academic_observatory_workflows.crossref_metadata_telescope.tasks
Functions
- fetch_release: Task to retrieve the release for the DAG run's data interval.
- download: Task to download the Crossref Metadata dataset.
- upload_downloaded: Task to upload downloaded data to Cloud Storage.
- extract: Task to extract the downloaded metadata.
- transform: Task to transform the CrossrefMetadataRelease for a given month.
- upload_transformed: Task to upload the transformed data to Cloud Storage.
- bq_load: Task to load each transformed release into BigQuery.
- add_dataset_release: Task to add release information to the API.
- cleanup_workflow: Task to delete all files, folders and XComs associated with this release.
- make_snapshot_url: Creates the URL to the snapshot.
- get_api_key: Returns the Crossref Metadata API key from Airflow connections.
- check_release_exists: Checks whether a release exists.
- transform_file: Transforms a single Crossref Metadata JSON file.
- Transform a single Crossref Metadata JSON value.
Module Contents
- academic_observatory_workflows.crossref_metadata_telescope.tasks.fetch_release(*, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, crossref_metadata_conn_id: str, dag_id: str, run_id: str, data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) → dict
Task to retrieve the release for the DAG run's data interval.
- Parameters:
cloud_workspace – The cloud workspace object for the DAG run
crossref_metadata_conn_id – The Airflow connection ID for Crossref Metadata
dag_id – The DAG ID for the DAG run
run_id – The run ID for the DAG run
data_interval_start – The start of the data interval for this DAG run
data_interval_end – The end of the data interval for this DAG run
- Returns:
The release object in dictionary form
- academic_observatory_workflows.crossref_metadata_telescope.tasks.download(release: dict, base_url: str = 'https://api.crossref.org') → None
Task to download the Crossref Metadata dataset. Expects the API key to be set in an environment variable named CROSSREF_METADATA_API_KEY.
- Parameters:
base_url – The base URL of the Crossref Metadata API
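Because the download task reads its API key from the environment, an authenticated request might be prepared as shown below. This is a minimal sketch. It assumes the key is sent as a bearer token in Crossref's `Crossref-Plus-API-Token` header. `build_download_request` is a hypothetical helper, and the actual task may build its request differently.

```python
import os
import urllib.request


def build_download_request(snapshot_url: str) -> urllib.request.Request:
    """Build an authenticated GET request for a Crossref Metadata snapshot.

    Hypothetical helper: assumes the API key is passed as a bearer token
    in the Crossref-Plus-API-Token header.
    """
    # The task documentation says the key comes from this environment variable.
    api_key = os.environ.get("CROSSREF_METADATA_API_KEY")
    if not api_key:
        raise RuntimeError("CROSSREF_METADATA_API_KEY is not set")
    return urllib.request.Request(
        snapshot_url,
        headers={"Crossref-Plus-API-Token": f"Bearer {api_key}"},
    )
```

A caller could then stream the response to disk with `urllib.request.urlopen(request)` and `shutil.copyfileobj`.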
- academic_observatory_workflows.crossref_metadata_telescope.tasks.upload_downloaded(release: dict) → None
Task to upload downloaded data to Cloud Storage.
- academic_observatory_workflows.crossref_metadata_telescope.tasks.extract(release, **context) → None
Task to extract the downloaded metadata.
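The extraction step can be sketched with the standard-library `tarfile` module. This assumes the download is a gzipped tar archive of JSON files. `extract_snapshot` is a hypothetical helper, not part of this module's API.

```python
import os
import tarfile


def extract_snapshot(archive_path: str, extract_dir: str) -> list[str]:
    """Extract regular files from a .tar.gz archive; return their paths, sorted."""
    os.makedirs(extract_dir, exist_ok=True)
    with tarfile.open(archive_path, mode="r:gz") as tar:
        # Only extract regular files; skip directories, links and devices.
        members = [m for m in tar.getmembers() if m.isfile()]
        if hasattr(tarfile, "data_filter"):
            # Newer Pythons: the "data" filter rejects absolute paths and path traversal.
            tar.extractall(path=extract_dir, members=members, filter="data")
        else:
            tar.extractall(path=extract_dir, members=members)
    return sorted(os.path.join(extract_dir, m.name) for m in members)
```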
- academic_observatory_workflows.crossref_metadata_telescope.tasks.transform(release: dict, *, batch_size: int, max_processes: int | None = None) → None
Task to transform the CrossrefMetadataRelease for a given month. Each extracted file is transformed.
- Parameters:
max_processes – the number of processes used with ProcessPoolExecutor to transform files in parallel.
batch_size – the number of files to send to ProcessPoolExecutor at one time.
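The `batch_size` and `max_processes` parameters suggest a specific pattern. Files are submitted to a `ProcessPoolExecutor` in groups of `batch_size`, and each group finishes before the next is sent. The sketch below follows that assumption. `run_in_batches` is hypothetical, and the executor class is injectable so the logic can also be exercised with threads.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, as_completed
from typing import Callable, Optional, Sequence, Type


def run_in_batches(
    fn: Callable[[str, str], None],
    jobs: Sequence[tuple[str, str]],
    batch_size: int,
    max_processes: Optional[int] = None,
    executor_cls: Type[Executor] = ProcessPoolExecutor,
) -> int:
    """Call fn(input_path, output_path) for every job, batch_size jobs at a time.

    Returns the number of completed jobs. Exceptions raised by fn propagate.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    done = 0
    with executor_cls(max_workers=max_processes) as executor:
        for start in range(0, len(jobs), batch_size):
            batch = jobs[start : start + batch_size]
            futures = [executor.submit(fn, src, dst) for src, dst in batch]
            # Wait for the whole batch (surfacing any errors) before submitting more.
            for future in as_completed(futures):
                future.result()
                done += 1
    return done
```

With a process pool, `fn` must be a picklable, module-level function such as `transform_file`. Bounding each batch limits how many pending futures exist at once.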
- academic_observatory_workflows.crossref_metadata_telescope.tasks.upload_transformed(release: dict) → None
Task to upload the transformed data to Cloud Storage.
- academic_observatory_workflows.crossref_metadata_telescope.tasks.bq_load(release: dict, *, bq_dataset_id: str, bq_table_name: str, dataset_description: str, table_description: str, schema_folder: str) → None
Task to load each transformed release into BigQuery. The table_id is set to the file name without its extension.
- Parameters:
bq_dataset_id – The BigQuery dataset ID
bq_table_name – The BigQuery table name
dataset_description – The description to use when creating the dataset
table_description – The description to use when creating the table
schema_folder – The path to the schema folder
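The table ID rule above (the file name without its extension) can be sketched as follows. Here it is combined into a fully qualified `project.dataset.table` ID. `project_id` and `table_id_for_file` are hypothetical. "The extension" is read as the final suffix only.

```python
import os


def table_id_for_file(project_id: str, bq_dataset_id: str, file_path: str) -> str:
    """Return 'project.dataset.table', where table is the file name minus its last extension."""
    table_name, _ext = os.path.splitext(os.path.basename(file_path))
    return f"{project_id}.{bq_dataset_id}.{table_name}"
```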
- academic_observatory_workflows.crossref_metadata_telescope.tasks.add_dataset_release(release: dict, *, api_bq_dataset_id: str) → None
Task to add release information to the API.
- Parameters:
api_bq_dataset_id – The BigQuery dataset ID for the API
- academic_observatory_workflows.crossref_metadata_telescope.tasks.cleanup_workflow(release: dict) → None
Task to delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.crossref_metadata_telescope.tasks.make_snapshot_url(snapshot_date: pendulum.DateTime, base_url: str = 'https://api.crossref.org') → str
Creates the URL to the snapshot.
- Parameters:
snapshot_date – The date of the snapshot
base_url – The base URL of the Crossref Metadata API
- Returns:
The snapshot url
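The URL construction might look like the sketch below. It assumes the module follows Crossref's public monthly snapshot layout (`/snapshots/monthly/{year}/{month}/all.json.tar.gz`). `build_snapshot_url` is a hypothetical stand-in, not this module's source. It accepts `datetime` to stay dependency-free. Because `pendulum.DateTime` subclasses `datetime.datetime`, a pendulum value works too.

```python
from datetime import datetime


def build_snapshot_url(snapshot_date: datetime, base_url: str = "https://api.crossref.org") -> str:
    """Build a monthly snapshot URL; the year/month path layout is an assumption."""
    return (
        f"{base_url.rstrip('/')}/snapshots/monthly/"
        f"{snapshot_date.year}/{snapshot_date.month:02d}/all.json.tar.gz"
    )
```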
- academic_observatory_workflows.crossref_metadata_telescope.tasks.get_api_key(crossref_metadata_conn_id: str) → str
Returns the Crossref Metadata API key from Airflow connections.
- Parameters:
crossref_metadata_conn_id – The Airflow connection ID for Crossref Metadata
- Returns:
The Crossref Metadata API key
- academic_observatory_workflows.crossref_metadata_telescope.tasks.check_release_exists(month: pendulum.DateTime, api_key: str) → bool
Checks whether a release exists.
- Parameters:
month – The month of the release, given as a datetime
api_key – The Crossref Metadata API key
- Returns:
Whether the release exists.
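One way to implement this check is a `HEAD` request to the snapshot URL with the API key attached. Any non-error response is treated as "exists". This is an assumption, not necessarily how the module does it. `release_exists` and its `opener` parameter are hypothetical, and the opener is injectable so the logic can be tested offline.

```python
import urllib.error
import urllib.request
from typing import Callable


def release_exists(
    snapshot_url: str,
    api_key: str,
    opener: Callable = urllib.request.urlopen,
    timeout: float = 30.0,
) -> bool:
    """Return True if a HEAD request for the snapshot succeeds, False on an HTTP error."""
    request = urllib.request.Request(
        snapshot_url,
        method="HEAD",
        headers={"Crossref-Plus-API-Token": f"Bearer {api_key}"},
    )
    try:
        with opener(request, timeout=timeout) as response:
            return 200 <= response.status < 400
    except urllib.error.HTTPError:
        # 4xx/5xx, e.g. a month whose snapshot has not been published yet.
        # Network failures (URLError) are deliberately left to propagate.
        return False
```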
- academic_observatory_workflows.crossref_metadata_telescope.tasks.transform_file(input_file_path: str, output_file_path: str) → None
Transforms a single Crossref Metadata JSON file. The JSON file is converted to a JSON Lines (JSONL) file, and field names are transformed so that BigQuery accepts them.
- Parameters:
input_file_path – the path of the file to transform.
output_file_path – where to save the transformed file.
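Here is a minimal sketch of the JSON-to-JSONL conversion. It assumes each input file holds a top-level object with an `items` array. It also assumes every field-name character other than letters, digits and underscores becomes an underscore, so `is-referenced-by-count` becomes `is_referenced_by_count`, and a leading digit gets a `_` prefix. `transform_file_sketch` and `normalise_keys` are hypothetical, and the module's actual renaming rules may differ.

```python
import json
import re
from typing import Any

# Any character outside letters, digits and underscore is replaced (assumed rule).
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_]")


def normalise_keys(value: Any) -> Any:
    """Recursively rename dict keys so they contain only letters, digits and underscores."""
    if isinstance(value, dict):
        out = {}
        for key, item in value.items():
            new_key = _INVALID_CHARS.sub("_", key)
            if new_key[:1].isdigit():
                new_key = "_" + new_key
            out[new_key] = normalise_keys(item)
        return out
    if isinstance(value, list):
        return [normalise_keys(item) for item in value]
    return value


def transform_file_sketch(input_file_path: str, output_file_path: str) -> None:
    """Convert a {"items": [...]} JSON file into JSON Lines with normalised keys."""
    with open(input_file_path, encoding="utf-8") as f:
        data = json.load(f)
    with open(output_file_path, "w", encoding="utf-8") as f:
        for item in data.get("items", []):
            f.write(json.dumps(normalise_keys(item)) + "\n")
```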