academic_observatory_workflows.crossref_metadata_telescope.tasks

Functions

fetch_release(→ dict)

Task to retrieve the release for the given start date

download(→ None)

Task to download the Crossref Metadata dataset.

upload_downloaded(→ None)

Task to upload downloaded data to Cloud Storage.

extract(→ None)

Task to extract the downloaded metadata.

transform(→ None)

Task to transform the CrossrefMetadataRelease release for a given month.

upload_transformed(→ None)

Task to upload the transformed data to Cloud Storage.

bq_load(→ None)

Task to load each transformed release to BigQuery.

add_dataset_release(→ None)

Task to add release information to API.

cleanup_workflow(→ None)

Task to delete all files, folders and XComs associated with this release.

make_snapshot_url(→ str)

Creates the url to the snapshot

get_api_key(→ str)

Return Crossref Metadata API token from Airflow connections

check_release_exists(→ bool)

Check if a release exists.

transform_file(→ None)

Transform a single Crossref Metadata JSON file.

transform_item(item)

Transform a single Crossref Metadata JSON value.

Module Contents

academic_observatory_workflows.crossref_metadata_telescope.tasks.fetch_release(*, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, crossref_metadata_conn_id: str, dag_id: str, run_id: str, data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) dict[source]

Task to retrieve the release for the given start date

Parameters:
  • cloud_workspace – The cloud workspace object for the dag run

  • crossref_metadata_conn_id – The connection ID for crossref metadata

  • dag_id – The dag ID for the dag run

  • run_id – The run ID for the dag run

  • start_date – The earliest date to retrieve a release for

  • data_interval_start – The start of the data interval for this dag run

  • data_interval_end – The end of the data interval for this dag run

Returns:

The release object in dictionary form

academic_observatory_workflows.crossref_metadata_telescope.tasks.download(release: dict, base_url: str = 'https://api.crossref.org') None[source]

Task to download the Crossref Metadata dataset. Expects the API key to be set as an environment variable named CROSSREF_METADATA_API_KEY.

Parameters:

base_url – The base URL of the Crossref Metadata API
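
As a minimal sketch of the environment-variable requirement described above, the key could be read and validated before any download starts. The helper name `read_api_key` is hypothetical and is not part of this module; only the variable name comes from the description.

```python
import os
from typing import Mapping

# Variable name taken from the task description above.
ENV_VAR = "CROSSREF_METADATA_API_KEY"


def read_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Hypothetical helper: return the API key, or fail with a clear message."""
    api_key = env.get(ENV_VAR, "").strip()
    if not api_key:
        raise RuntimeError(f"Environment variable {ENV_VAR} is not set or is empty")
    return api_key
```

Failing early in this way keeps a missing key from surfacing later as an opaque authentication error.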

academic_observatory_workflows.crossref_metadata_telescope.tasks.upload_downloaded(release: dict) None[source]

Task to upload downloaded data to Cloud Storage.

academic_observatory_workflows.crossref_metadata_telescope.tasks.extract(release, **context) None[source]

Task to extract the downloaded metadata.

academic_observatory_workflows.crossref_metadata_telescope.tasks.transform(release: dict, *, batch_size: int, max_processes: int | None = None) None[source]

Task to transform the CrossrefMetadataRelease release for a given month. Each extracted file is transformed.

Parameters:
  • max_processes – the number of processes used with ProcessPoolExecutor to transform files in parallel.

  • batch_size – the number of files to send to ProcessPoolExecutor at one time.
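
The interaction between batch_size and max_processes can be sketched as follows. This is an illustration of the general batching pattern with ProcessPoolExecutor, not the module's actual implementation; `get_batches`, `transform_in_batches` and the `worker` argument are hypothetical names.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterator, List, Optional, Sequence


def get_batches(items: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


def transform_in_batches(
    files: Sequence[str],
    worker: Callable[[str], None],
    *,
    batch_size: int,
    max_processes: Optional[int] = None,
) -> None:
    """Submit one batch of files at a time to a shared process pool."""
    with ProcessPoolExecutor(max_workers=max_processes) as executor:
        for batch in get_batches(files, batch_size):
            futures = [executor.submit(worker, path) for path in batch]
            for future in as_completed(futures):
                # Re-raise any exception that occurred in the worker process.
                future.result()
```

Submitting in batches bounds the number of pending futures held in memory at once. The `worker` passed in must be a module-level function so that it can be pickled and sent to the worker processes.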

academic_observatory_workflows.crossref_metadata_telescope.tasks.upload_transformed(release: dict) None[source]

Task to upload the transformed data to Cloud Storage.

academic_observatory_workflows.crossref_metadata_telescope.tasks.bq_load(release: dict, *, bq_dataset_id: str, bq_table_name: str, dataset_description: str, table_description: str, schema_folder: str) None[source]

Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.

Parameters:
  • bq_dataset_id – The BigQuery dataset ID

  • bq_table_name – The BigQuery table name

  • dataset_description – The description to use when creating the dataset

  • table_description – The description to use when creating the table

  • schema_folder – The path to the schema folder

academic_observatory_workflows.crossref_metadata_telescope.tasks.add_dataset_release(release: dict, *, api_bq_dataset_id: str) None[source]

Task to add release information to API.

Parameters:
  • dag_id – The DAG ID

  • api_bq_dataset_id – The BigQuery dataset ID for the API

academic_observatory_workflows.crossref_metadata_telescope.tasks.cleanup_workflow(release: dict) None[source]

Task to delete all files, folders and XComs associated with this release.

academic_observatory_workflows.crossref_metadata_telescope.tasks.make_snapshot_url(snapshot_date: pendulum.DateTime, base_url: str = 'https://api.crossref.org') str[source]

Creates the url to the snapshot

Parameters:

snapshot_date – The date of the snapshot

Returns:

The snapshot url
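
A snapshot URL of this kind could be assembled as in the sketch below. The path layout `/snapshots/monthly/<year>/<month>/all.json.tar.gz` is an assumption that this reference does not confirm, and the function name `build_snapshot_url` is hypothetical. The standard library `datetime` stands in for `pendulum.DateTime` to keep the example self-contained.

```python
from datetime import datetime


def build_snapshot_url(snapshot_date: datetime, base_url: str = "https://api.crossref.org") -> str:
    """Hypothetical sketch: build a monthly snapshot URL from a date."""
    # Assumed path layout; only the year and month of the date are used.
    return (
        f"{base_url.rstrip('/')}/snapshots/monthly/"
        f"{snapshot_date.year}/{snapshot_date.month:02d}/all.json.tar.gz"
    )
```

For example, `build_snapshot_url(datetime(2023, 1, 1))` yields `https://api.crossref.org/snapshots/monthly/2023/01/all.json.tar.gz` under the assumed layout.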

academic_observatory_workflows.crossref_metadata_telescope.tasks.get_api_key(crossref_metadata_conn_id: str) str[source]

Return Crossref Metadata API token from Airflow connections

Parameters:

crossref_metadata_conn_id – The Airflow connection ID for Crossref Metadata

Returns:

The crossref metadata api key

academic_observatory_workflows.crossref_metadata_telescope.tasks.check_release_exists(month: pendulum.DateTime, api_key: str) bool[source]

Check if a release exists.

Parameters:
  • month – the month of the release given as a datetime.

  • api_key – the Crossref Metadata API key.

Returns:

Whether the release exists.
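
One plausible way to perform such a check is a HEAD request against the snapshot URL, as sketched below. This is an assumption about the approach, not the module's confirmed behaviour: the helper names are hypothetical, the sketch takes a URL rather than a month, and it assumes the key is sent in a `Crossref-Plus-API-Token` header as a bearer token.

```python
import urllib.error
import urllib.request


def build_head_request(url: str, api_key: str) -> urllib.request.Request:
    """Build a HEAD request that carries the API key (assumed header format)."""
    headers = {"Crossref-Plus-API-Token": f"Bearer {api_key}"}
    return urllib.request.Request(url, method="HEAD", headers=headers)


def status_means_exists(status: int) -> bool:
    """Map an HTTP status code to an exists / does-not-exist answer."""
    if status == 200:
        return True
    if status == 404:
        return False
    raise RuntimeError(f"Unexpected HTTP status while checking release: {status}")


def snapshot_exists(url: str, api_key: str, timeout: float = 30.0) -> bool:
    """Hypothetical sketch: return whether the snapshot at url exists."""
    request = build_head_request(url, api_key)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return status_means_exists(response.status)
    except urllib.error.HTTPError as error:
        # urlopen raises HTTPError for 4xx and 5xx responses.
        return status_means_exists(error.code)
```

Using HEAD avoids downloading the archive just to learn whether it is available, and raising on unexpected status codes prevents an authentication failure from being mistaken for a missing release.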

academic_observatory_workflows.crossref_metadata_telescope.tasks.transform_file(input_file_path: str, output_file_path: str) None[source]

Transform a single Crossref Metadata JSON file. The JSON file is converted to a JSON Lines (.jsonl) file, and field names are transformed so that BigQuery accepts them.

Parameters:
  • input_file_path – the path of the file to transform.

  • output_file_path – where to save the transformed file.
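
The JSON to JSON Lines conversion can be sketched as follows. The assumption that records sit in a top-level `items` list is not stated in this reference, and `json_to_jsonl` and its `transform` argument are hypothetical names used for illustration only.

```python
import json
from typing import Any, Callable


def json_to_jsonl(
    input_file_path: str,
    output_file_path: str,
    transform: Callable[[Any], Any] = lambda item: item,
    items_key: str = "items",
) -> None:
    """Hypothetical sketch: write each record of a JSON file as one JSON line."""
    with open(input_file_path, "r", encoding="utf-8") as f:
        document = json.load(f)

    with open(output_file_path, "w", encoding="utf-8") as f:
        # Assumes the records live in a list under items_key.
        for item in document[items_key]:
            f.write(json.dumps(transform(item)) + "\n")
```

Writing one record per line matches the newline-delimited JSON format that BigQuery load jobs accept.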

academic_observatory_workflows.crossref_metadata_telescope.tasks.transform_item(item)[source]

Transform a single Crossref Metadata JSON value.

Parameters:

item – a JSON value.

Returns:

the transformed item.
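
A recursive transformation over a JSON value might look like the sketch below. It covers only one kind of change, replacing hyphens in field names with underscores, on the assumption that hyphenated names such as `container-title` are what BigQuery would reject. The actual function may apply other rules, and `rename_keys` is a hypothetical name.

```python
from typing import Any


def rename_keys(value: Any) -> Any:
    """Hypothetical sketch: recursively replace '-' with '_' in dictionary keys."""
    if isinstance(value, dict):
        return {key.replace("-", "_"): rename_keys(child) for key, child in value.items()}
    if isinstance(value, list):
        return [rename_keys(child) for child in value]
    # Strings, numbers, booleans and None are returned unchanged.
    return value
```

Because the function handles dictionaries, lists and scalars alike, it can be applied to a whole record or to any nested value within it.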