academic_observatory_workflows.orcid_telescope.tasks
Attributes
Functions
|
Generates the OrcidRelease object. |
|
Create datasets |
|
Sync files from AWS bucket to Google Cloud bucket. |
|
Create a snapshot of each main table. The purpose of this table is to be able to rollback the table |
|
Create a manifest of all the modified files in the orcid bucket. |
|
Reads each batch's manifest and downloads the files from the gcs bucket. |
|
Transforms the downloaded files into serveral bigquery-compatible .jsonl files |
|
Removes all files in the downloads folder of the release |
|
Uploads the upsert and delete files to the transform bucket. |
|
Load the main table. |
|
Load the upsert table into bigquery |
|
Load the delete table into bigquery |
|
Upsert the records from the upserts table into the main table. |
|
Delete the records in the delete table from the main table. |
|
Adds release information to API. |
|
Delete all files, folders and XComs associated with this release. |
|
Return API login and password |
|
Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the |
|
Reads the manifest file and finds the most recent date of modification for the records |
|
Transform a single ORCID file/record. |
Module Contents
- academic_observatory_workflows.orcid_telescope.tasks.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated'][source]
- academic_observatory_workflows.orcid_telescope.tasks.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'[source]
- academic_observatory_workflows.orcid_telescope.tasks.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'[source]
- academic_observatory_workflows.orcid_telescope.tasks.fetch_release(dag_id: str, run_id: str, dag_run: airflow.models.DagRun, data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, api_bq_dataset_id: str, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str) dict[source]
Generates the OrcidRelease object.
- Parameters:
dag_id – The ID of the dag
run_id – The ID of this dag run
dag_run – This dag run’s DagRun object
data_interval_start – The start of the data interval
data_interval_end – The end of the data interval
cloud_workspace – The CloudWorkspace object
api_bq_dataset_id – The bigquery dataset ID for the API
bq_dataset_id – The bigquery dataset ID for the telescope
bq_main_table_name – The name of the main bigquery table
bq_upsert_table_name – The name of the upsert table
bq_delete_table_name – The name of the delete table
- academic_observatory_workflows.orcid_telescope.tasks.create_dataset(release: dict, dataset_description: str) None[source]
Create datasets
- Parameters:
release – The Orcid Release object
dataset_description – The description to give the Orcid dataset
- academic_observatory_workflows.orcid_telescope.tasks.transfer_orcid(release: dict, aws_orcid_conn_id: str, transfer_attempts: int, orcid_bucket: str, orcid_summaries_prefix: str)[source]
Sync files from AWS bucket to Google Cloud bucket.
- Parameters:
aws_orcid_conn_id – The airflow connection ID for the AWS ORCID bucket
transfer_attempts – The number of times to attempt the transfer job before giving up
orcid_bucket – The name of the gcs bucket to store the Orcid data
orcid_summaries_prefix – The prefix in which to store the summaries in the bucket
- academic_observatory_workflows.orcid_telescope.tasks.bq_create_main_table_snapshot(release: dict, snapshot_expiry_days: int)[source]
Create a snapshot of each main table. The purpose of this table is to be able to rollback the table if something goes wrong. The snapshot expires after snapshot_expiry_days.
- Parameters:
snapshot_expiry_days – The number of days to keep the snapshot
- academic_observatory_workflows.orcid_telescope.tasks.create_manifests(release: dict, orcid_bucket: str, orcid_summaries_prefix: str, max_workers: int | None = None) str[source]
Create a manifest of all the modified files in the orcid bucket.
- Parameters:
orcid_bucket – The name of the gcs bucket to store the Orcid data
orcid_summaries_prefix – The prefix in which to store the summaries in the bucket
max_workers – Number of process pools to use. If None, uses the cpu count
- Returns:
The datetime of the last modified ORCID record in the manifest as a string
- academic_observatory_workflows.orcid_telescope.tasks.download(release: dict)[source]
Reads each batch’s manifest and downloads the files from the gcs bucket.
- academic_observatory_workflows.orcid_telescope.tasks.transform(release: dict, max_workers: int | None = None)[source]
Transforms the downloaded files into serveral bigquery-compatible .jsonl files
- Parameters:
max_workers – Number of process pools to use. If None, uses the cpu count
- academic_observatory_workflows.orcid_telescope.tasks.clean_downloads(release: dict)[source]
Removes all files in the downloads folder of the release
- academic_observatory_workflows.orcid_telescope.tasks.upload_transformed(release: dict)[source]
Uploads the upsert and delete files to the transform bucket.
- academic_observatory_workflows.orcid_telescope.tasks.bq_load_main_table(release: dict, schema_file_path: str)[source]
Load the main table.
- Parameters:
schema_file_path – The path to the schema file to use for table load
- academic_observatory_workflows.orcid_telescope.tasks.bq_load_upsert_table(release: dict, schema_file_path: str)[source]
Load the upsert table into bigquery
- Parameters:
schema_file_path – The path to the schema file to use for table load
- academic_observatory_workflows.orcid_telescope.tasks.bq_load_delete_table(release: dict, delete_schema_file_path: str)[source]
Load the delete table into bigquery
- Parameters:
schema_file_path – The path to the ‘delete’ schema file to use for table load
- academic_observatory_workflows.orcid_telescope.tasks.bq_upsert_records(release: dict)[source]
Upsert the records from the upserts table into the main table.
- academic_observatory_workflows.orcid_telescope.tasks.bq_delete_records(release: dict)[source]
Delete the records in the delete table from the main table.
- academic_observatory_workflows.orcid_telescope.tasks.add_dataset_release(release: dict, *, api_bq_dataset_id: str, latest_modified_date: str) None[source]
Adds release information to API.
- Parameters:
api_bq_datastet_id – bigquery dataset ID of the API
last_modified_release_date – The modification datetime of the last modified record of this release
- academic_observatory_workflows.orcid_telescope.tasks.cleanup_workflow(release: dict) None[source]
Delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.orcid_telescope.tasks.aws_orcid_key(conn_id: str) Tuple[str, str][source]
Return API login and password
- academic_observatory_workflows.orcid_telescope.tasks.create_orcid_batch_manifest(orcid_batch: academic_observatory_workflows.orcid_telescope.batch.OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) None[source]
Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory. Only blobs modified after the reference date are included.
- Parameters:
orcid_batch – The OrcidBatch instance for this orcid directory
reference_date – The date to use as a reference for the manifest
bucket – The name of the bucket
bucket_prefix – The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located
- Returns:
The path to the date manifest, the path to the uri manifest
- academic_observatory_workflows.orcid_telescope.tasks.latest_modified_record_date(release: dict) str[source]
Reads the manifest file and finds the most recent date of modification for the records
- Parameters:
manifest_file_path – the path to the manifest file
- Returns:
the most recent date of modification for the records
- academic_observatory_workflows.orcid_telescope.tasks.transform_orcid_record(record_path: str) Dict | str[source]
Transform a single ORCID file/record. Streams the content from the blob URI and turns this into a dictionary. The xml file is turned into a dictionary, a record should have either a valid ‘record’ section or an ‘error’section. The keys of the dictionary are slightly changed so they are valid BigQuery fields. If the record is valid, it is returned. If it is an error, the ORCID ID is returned.
- Parameters:
download_path – The path to the file with the ORCID record.
transform_folder – The path where transformed files will be saved.
- Returns:
The transformed ORCID record. If the record is an error, the ORCID ID is returned.
- Raises:
KeyError – If the record is not valid - no ‘error’ or ‘record’ section found
ValueError – If the ORCID ID does not match the file name’s ID