academic_observatory_workflows.orcid_telescope.tasks

Attributes

MANIFEST_HEADER

ORCID_AWS_SUMMARIES_BUCKET

ORCID_REGEX

Functions

fetch_release(→ dict)

Generates the OrcidRelease object.

create_dataset(→ None)

Create datasets

transfer_orcid(release, aws_orcid_conn_id, ...)

Sync files from AWS bucket to Google Cloud bucket.

bq_create_main_table_snapshot(release, ...)

Create a snapshot of each main table, so that the table can be rolled back if something goes wrong.

create_manifests(→ str)

Create a manifest of all the modified files in the orcid bucket.

download(release)

Reads each batch's manifest and downloads the files from the gcs bucket.

transform(release[, max_workers])

Transforms the downloaded files into several bigquery-compatible .jsonl files

clean_downloads(release)

Removes all files in the downloads folder of the release

upload_transformed(release)

Uploads the upsert and delete files to the transform bucket.

bq_load_main_table(release, schema_file_path)

Load the main table.

bq_load_upsert_table(release, schema_file_path)

Load the upsert table into bigquery

bq_load_delete_table(release, delete_schema_file_path)

Load the delete table into bigquery

bq_upsert_records(release)

Upsert the records from the upserts table into the main table.

bq_delete_records(release)

Delete the records in the delete table from the main table.

add_dataset_release(→ None)

Adds release information to API.

cleanup_workflow(→ None)

Delete all files, folders and XComs associated with this release.

aws_orcid_key(→ Tuple[str, str])

Return API login and password

create_orcid_batch_manifest(→ None)

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory.

latest_modified_record_date(→ str)

Reads the manifest file and finds the most recent date of modification for the records

transform_orcid_record(→ Union[Dict, str])

Transform a single ORCID file/record.

Module Contents

academic_observatory_workflows.orcid_telescope.tasks.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated']
academic_observatory_workflows.orcid_telescope.tasks.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'
academic_observatory_workflows.orcid_telescope.tasks.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'
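As an illustration of what ORCID_REGEX matches, the following minimal sketch applies the same pattern to a file path. The helper name `find_orcid_id` and the example blob name are hypothetical and not part of this module.

```python
import re
from typing import Optional

# Same pattern as ORCID_REGEX above, written as a raw string.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def find_orcid_id(text: str) -> Optional[str]:
    """Return the first ORCID iD found in the text, or None if there is none.

    Hypothetical helper for illustration only.
    """
    match = re.search(ORCID_REGEX, text)
    return match.group(0) if match else None


# Hypothetical blob name; the final character of an ORCID iD may be a digit or "X".
blob_name = "orcid_summaries/33X/0000-0002-1694-233X.xml"
print(find_orcid_id(blob_name))
```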
academic_observatory_workflows.orcid_telescope.tasks.fetch_release(dag_id: str, run_id: str, dag_run: airflow.models.DagRun, data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, api_bq_dataset_id: str, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str) → dict

Generates the OrcidRelease object.

Parameters:
  • dag_id – The ID of the dag

  • run_id – The ID of this dag run

  • dag_run – This dag run’s DagRun object

  • data_interval_start – The start of the data interval

  • data_interval_end – The end of the data interval

  • cloud_workspace – The CloudWorkspace object

  • api_bq_dataset_id – The bigquery dataset ID for the API

  • bq_dataset_id – The bigquery dataset ID for the telescope

  • bq_main_table_name – The name of the main bigquery table

  • bq_upsert_table_name – The name of the upsert table

  • bq_delete_table_name – The name of the delete table

academic_observatory_workflows.orcid_telescope.tasks.create_dataset(release: dict, dataset_description: str) → None

Create datasets

Parameters:
  • release – The Orcid Release object

  • dataset_description – The description to give the Orcid dataset

academic_observatory_workflows.orcid_telescope.tasks.transfer_orcid(release: dict, aws_orcid_conn_id: str, transfer_attempts: int, orcid_bucket: str, orcid_summaries_prefix: str)

Sync files from AWS bucket to Google Cloud bucket.

Parameters:
  • aws_orcid_conn_id – The airflow connection ID for the AWS ORCID bucket

  • transfer_attempts – The number of times to attempt the transfer job before giving up

  • orcid_bucket – The name of the gcs bucket to store the Orcid data

  • orcid_summaries_prefix – The prefix in which to store the summaries in the bucket

academic_observatory_workflows.orcid_telescope.tasks.bq_create_main_table_snapshot(release: dict, snapshot_expiry_days: int)

Create a snapshot of each main table. The purpose of the snapshot is to allow the table to be rolled back if something goes wrong. The snapshot expires after snapshot_expiry_days days.

Parameters:

snapshot_expiry_days – The number of days to keep the snapshot

academic_observatory_workflows.orcid_telescope.tasks.create_manifests(release: dict, orcid_bucket: str, orcid_summaries_prefix: str, max_workers: int | None = None) → str

Create a manifest of all the modified files in the orcid bucket.

Parameters:
  • orcid_bucket – The name of the gcs bucket to store the Orcid data

  • orcid_summaries_prefix – The prefix in which to store the summaries in the bucket

  • max_workers – Number of process pools to use. If None, uses the cpu count

Returns:

The datetime of the last modified ORCID record in the manifest as a string

academic_observatory_workflows.orcid_telescope.tasks.download(release: dict)

Reads each batch’s manifest and downloads the files from the gcs bucket.

academic_observatory_workflows.orcid_telescope.tasks.transform(release: dict, max_workers: int | None = None)

Transforms the downloaded files into several bigquery-compatible .jsonl files

Parameters:

max_workers – Number of process pools to use. If None, uses the cpu count

academic_observatory_workflows.orcid_telescope.tasks.clean_downloads(release: dict)

Removes all files in the downloads folder of the release

academic_observatory_workflows.orcid_telescope.tasks.upload_transformed(release: dict)

Uploads the upsert and delete files to the transform bucket.

academic_observatory_workflows.orcid_telescope.tasks.bq_load_main_table(release: dict, schema_file_path: str)

Load the main table.

Parameters:

schema_file_path – The path to the schema file to use for table load

academic_observatory_workflows.orcid_telescope.tasks.bq_load_upsert_table(release: dict, schema_file_path: str)

Load the upsert table into bigquery

Parameters:

schema_file_path – The path to the schema file to use for table load

academic_observatory_workflows.orcid_telescope.tasks.bq_load_delete_table(release: dict, delete_schema_file_path: str)

Load the delete table into bigquery

Parameters:

delete_schema_file_path – The path to the ‘delete’ schema file to use for table load

academic_observatory_workflows.orcid_telescope.tasks.bq_upsert_records(release: dict)

Upsert the records from the upserts table into the main table.

academic_observatory_workflows.orcid_telescope.tasks.bq_delete_records(release: dict)

Delete the records in the delete table from the main table.
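The combined effect of the upsert and delete steps can be pictured with plain Python dictionaries. This is only a sketch of the semantics: the actual tasks operate on BigQuery tables, and the key column name `orcid_id` and both function names below are assumptions made for illustration.

```python
from typing import Dict, Iterable


def upsert_records(main: Dict[str, dict], upserts: Iterable[dict], key: str = "orcid_id") -> None:
    """Insert rows that are new and overwrite rows whose key already exists."""
    for row in upserts:
        main[row[key]] = row


def delete_records(main: Dict[str, dict], deletes: Iterable[dict], key: str = "orcid_id") -> None:
    """Remove every row whose key appears in the delete rows; unknown keys are ignored."""
    for row in deletes:
        main.pop(row[key], None)


# Hypothetical main table contents, keyed by the assumed 'orcid_id' column.
main_table = {
    "0000-0002-1825-0097": {"orcid_id": "0000-0002-1825-0097", "name": "old"},
    "0000-0002-1694-233X": {"orcid_id": "0000-0002-1694-233X", "name": "kept"},
}
upsert_records(main_table, [{"orcid_id": "0000-0002-1825-0097", "name": "new"}])
delete_records(main_table, [{"orcid_id": "0000-0002-1694-233X"}])
print(main_table)
```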

academic_observatory_workflows.orcid_telescope.tasks.add_dataset_release(release: dict, *, api_bq_dataset_id: str, latest_modified_date: str) → None

Adds release information to API.

Parameters:
  • api_bq_dataset_id – The bigquery dataset ID of the API

  • latest_modified_date – The modification datetime of the last modified record of this release

academic_observatory_workflows.orcid_telescope.tasks.cleanup_workflow(release: dict) → None

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.orcid_telescope.tasks.aws_orcid_key(conn_id: str) → Tuple[str, str]

Return API login and password

academic_observatory_workflows.orcid_telescope.tasks.create_orcid_batch_manifest(orcid_batch: academic_observatory_workflows.orcid_telescope.batch.OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) → None

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory. Only blobs modified after the reference date are included.

Parameters:
  • orcid_batch – The OrcidBatch instance for this orcid directory

  • reference_date – The date to use as a reference for the manifest

  • bucket – The name of the bucket

  • bucket_prefix – The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located

Returns:

The path to the date manifest, the path to the uri manifest
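The filtering rule described above (only blobs modified after the reference date are included) can be sketched as follows. This is a simplified illustration and not the module's implementation: `BlobInfo` and `write_manifest` are hypothetical names, and the blob listing is passed in as plain data rather than read from a bucket.

```python
import csv
import io
from datetime import datetime, timezone
from typing import Iterable, NamedTuple, TextIO

MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]


class BlobInfo(NamedTuple):
    """Hypothetical stand-in for one entry of a bucket listing."""

    bucket_name: str
    blob_name: str
    updated: datetime


def write_manifest(blobs: Iterable[BlobInfo], reference_date: datetime, out: TextIO) -> int:
    """Write a CSV manifest of the blobs modified after reference_date.

    Returns the number of rows written (excluding the header).
    """
    writer = csv.DictWriter(out, fieldnames=MANIFEST_HEADER)
    writer.writeheader()
    count = 0
    for blob in blobs:
        if blob.updated > reference_date:
            writer.writerow(
                {
                    "bucket_name": blob.bucket_name,
                    "blob_name": blob.blob_name,
                    "updated": blob.updated.isoformat(),
                }
            )
            count += 1
    return count


# Hypothetical listing: only the second blob is newer than the reference date.
listing = [
    BlobInfo("orcid-bucket", "summaries/097/old.xml", datetime(2023, 5, 1, tzinfo=timezone.utc)),
    BlobInfo("orcid-bucket", "summaries/097/new.xml", datetime(2023, 6, 2, tzinfo=timezone.utc)),
]
buffer = io.StringIO()
write_manifest(listing, datetime(2023, 6, 1, tzinfo=timezone.utc), buffer)
print(buffer.getvalue())
```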

academic_observatory_workflows.orcid_telescope.tasks.latest_modified_record_date(release: dict) → str

Reads the manifest file and finds the most recent date of modification for the records

Parameters:

release – The Orcid Release object whose manifest file is read

Returns:

the most recent date of modification for the records
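A minimal sketch of finding the most recent modification date in a manifest, assuming the manifest is a CSV with the MANIFEST_HEADER columns and ISO 8601 timestamps in the `updated` column. The function name `latest_updated` is hypothetical; the real function takes the release and locates the manifest itself.

```python
import csv
import io
from datetime import datetime
from typing import TextIO


def latest_updated(manifest_file: TextIO) -> str:
    """Return the most recent 'updated' value in the manifest as an ISO 8601 string.

    Assumes every row stores 'updated' in a format that datetime.fromisoformat accepts.
    """
    reader = csv.DictReader(manifest_file)
    dates = [datetime.fromisoformat(row["updated"]) for row in reader]
    if not dates:
        raise ValueError("The manifest contains no records")
    return max(dates).isoformat()


# Hypothetical manifest contents.
manifest = io.StringIO(
    "bucket_name,blob_name,updated\n"
    "orcid-bucket,summaries/097/a.xml,2023-06-02T00:00:00+00:00\n"
    "orcid-bucket,summaries/33X/b.xml,2023-06-05T12:30:00+00:00\n"
)
print(latest_updated(manifest))
```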

academic_observatory_workflows.orcid_telescope.tasks.transform_orcid_record(record_path: str) → Dict | str

Transform a single ORCID file/record. Streams the XML content from the blob URI and turns it into a dictionary. A record should have either a valid ‘record’ section or an ‘error’ section. The keys of the dictionary are slightly changed so that they are valid BigQuery field names. If the record is valid, the transformed record is returned. If it is an error, the ORCID ID is returned.

Parameters:

record_path – The path to the file with the ORCID record.

Returns:

The transformed ORCID record. If the record is an error, the ORCID ID is returned.

Raises:
  • KeyError – If the record is not valid - no ‘error’ or ‘record’ section found

  • ValueError – If the ORCID ID does not match the file name’s ID
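The return and error behaviour described above can be sketched on an already-parsed dictionary. This is an illustration under stated assumptions, not the module's code: the key names `record`, `error`, `orcid-identifier` and `path`, the key-cleaning rule, and the names `sanitize_keys` and `check_record` are all hypothetical, and XML parsing is left out.

```python
import os
import re
from typing import Any, Dict, Union

ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def sanitize_keys(value: Any) -> Any:
    """Recursively replace characters that are not letters, digits or underscores in dict keys.

    Assumed cleaning rule; the real key transformation may differ.
    """
    if isinstance(value, dict):
        return {re.sub(r"[^0-9A-Za-z_]", "_", key): sanitize_keys(item) for key, item in value.items()}
    if isinstance(value, list):
        return [sanitize_keys(item) for item in value]
    return value


def check_record(parsed: Dict[str, Any], record_path: str) -> Union[Dict, str]:
    """Return the cleaned record, or the ORCID iD if the file only holds an error section."""
    match = re.search(ORCID_REGEX, os.path.basename(record_path))
    if match is None:
        raise ValueError(f"No ORCID iD found in file name: {record_path}")
    expected_id = match.group(0)

    if "record" in parsed:
        record = parsed["record"]
        found_id = record["orcid-identifier"]["path"]  # assumed layout of the parsed record
        if found_id != expected_id:
            raise ValueError(f"Expected ORCID iD {expected_id} but the record holds {found_id}")
        return sanitize_keys(record)
    if "error" in parsed:
        return expected_id
    raise KeyError(f"No 'record' or 'error' section found in {record_path}")


# Hypothetical parsed record and path.
parsed_record = {"record": {"orcid-identifier": {"path": "0000-0002-1825-0097"}}}
print(check_record(parsed_record, "/tmp/0000-0002-1825-0097.xml"))
```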