academic_observatory_workflows.workflows.orcid_telescope
Module Contents
Classes
OrcidBatch | Describes a single ORCID batch and its related files/folders
OrcidRelease |
OrcidTelescope | ORCID telescope
Functions
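create_orcid_batch_manifest(orcid_batch, reference_date, bucket, bucket_prefix) | Create a manifest (.csv) for an ORCID batch containing the blob names and modification dates of all the modified files in the bucket’s ORCID directory.
latest_modified_record_date(manifest_file_path) | Reads the manifest file and finds the most recent date of modification for the records.
orcid_batch_names() | Create a list of all the possible ORCID directories.
transform_orcid_record(record_path) | Transform a single ORCID file/record.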
Attributes
- academic_observatory_workflows.workflows.orcid_telescope.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'
- academic_observatory_workflows.workflows.orcid_telescope.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'
- academic_observatory_workflows.workflows.orcid_telescope.ORCID_RECORD_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\.xml$'
- academic_observatory_workflows.workflows.orcid_telescope.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated']
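The two regular expressions above can be exercised with Python's standard re module. The sketch below copies the listed pattern strings as raw strings; the helper orcid_from_blob_name and the example blob names are hypothetical and only illustrate how a record file name might be recognised and its ORCID iD extracted.

```python
import re
from typing import Optional

# Pattern strings copied from the module attributes above (written as raw strings)
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"


def orcid_from_blob_name(blob_name: str) -> Optional[str]:
    """Hypothetical helper: return the ORCID iD of a record file, or None if it is not one."""
    # Only names ending in "<ORCID iD>.xml" count as ORCID record files
    if re.search(ORCID_RECORD_REGEX, blob_name) is None:
        return None
    # The first ORCID-shaped token in the name is taken as the iD
    return re.search(ORCID_REGEX, blob_name).group(0)
```

Only the final character of an iD may be `X` in both patterns, so `orcid_from_blob_name("orcid_summaries/70X/0000-0001-5109-370X.xml")` yields `"0000-0001-5109-370X"`, while a `.json` file is rejected.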
- class academic_observatory_workflows.workflows.orcid_telescope.OrcidBatch(download_dir: str, transform_dir: str, batch_str: str)
Describes a single ORCID batch and its related files/folders.
- property existing_records: List[str]
List of ORCID records that already exist on disk for this ORCID directory.
- property missing_records: List[str]
List of ORCID records for this ORCID directory that are missing from disk.
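A minimal sketch of how existing_records and missing_records could be computed. It assumes that the records expected for a batch are the file names listed in the batch's manifest (using the MANIFEST_HEADER columns) and that downloaded records live in a sub-directory named after the batch. The class BatchSketch, its attributes and the manifest file name are hypothetical and are not the real OrcidBatch layout.

```python
import csv
import os
from typing import List

MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]


class BatchSketch:
    """Hypothetical stand-in for OrcidBatch; the on-disk layout is an assumption."""

    def __init__(self, download_dir: str, batch_str: str):
        self.batch_str = batch_str
        # Assumed layout: <download_dir>/<batch_str>/ holds the downloaded .xml records
        self.download_batch_dir = os.path.join(download_dir, batch_str)
        # Assumed manifest location: <download_dir>/<batch_str>_manifest.csv
        self.manifest_file = os.path.join(download_dir, f"{batch_str}_manifest.csv")

    @property
    def expected_records(self) -> List[str]:
        # File names of every record listed in the batch manifest
        with open(self.manifest_file, newline="") as f:
            return [os.path.basename(row["blob_name"]) for row in csv.DictReader(f)]

    @property
    def existing_records(self) -> List[str]:
        # Record files already present on disk
        return sorted(os.listdir(self.download_batch_dir))

    @property
    def missing_records(self) -> List[str]:
        # Listed in the manifest but not yet downloaded
        return sorted(set(self.expected_records) - set(self.existing_records))
```

Computing missing_records as a set difference makes a re-run of the download step cheap: only the records that are still absent need to be fetched.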
- class academic_observatory_workflows.workflows.orcid_telescope.OrcidRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_release_end: pendulum.DateTime, prev_latest_modified_record: pendulum.DateTime, is_first_run: bool)
Bases: observatory.platform.workflows.workflow.ChangefileRelease
- property orcid_directory_paths: List[str]
Generates the paths to the ORCID directories in the download folder.
- orcid_batches() → List[OrcidBatch]
Creates the ORCID directories in the download folder if they don’t exist and returns them.
- class academic_observatory_workflows.workflows.orcid_telescope.OrcidTelescope(dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, orcid_bucket: str = 'ao-orcid', orcid_summaries_prefix: str = 'orcid_summaries', bq_dataset_id: str = 'orcid', bq_main_table_name: str = 'orcid', bq_upsert_table_name: str = 'orcid_upsert', bq_delete_table_name: str = 'orcid_delete', dataset_description: str = 'The ORCID dataset and supporting tables', table_description: str = 'The ORCID dataset', snapshot_expiry_days: int = 31, schema_file_path: str = os.path.join(default_schema_folder(), 'orcid', 'orcid.json'), delete_schema_file_path: str = os.path.join(default_schema_folder(), 'orcid', 'orcid_delete.json'), transfer_attempts: int = 5, max_workers: int = os.cpu_count() * 2, api_dataset_id: str = 'orcid', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, aws_orcid_conn_id: str = 'aws_orcid', start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1), schedule: str = '0 0 * * 0', queue: str = 'remote_queue')
Bases: observatory.platform.workflows.workflow.Workflow
ORCID telescope
- make_release(**kwargs) → OrcidRelease
Generates the OrcidRelease object.
- create_datasets(release: OrcidRelease, **kwargs) → None
Create datasets.
- transfer_orcid(release: OrcidRelease, **kwargs)
Sync files from the AWS bucket to the Google Cloud bucket.
- bq_create_main_table_snapshot(release: OrcidRelease, **kwargs)
Create a snapshot of the main table. The snapshot makes it possible to roll back the table if something goes wrong. The snapshot expires after the number of days given by self.snapshot_expiry_days.
- create_manifests(release: OrcidRelease, **kwargs)
Create a manifest of all the modified files in the ORCID bucket.
- download(release: OrcidRelease, **kwargs)
Reads each batch’s manifest and downloads the files from the GCS bucket.
- transform(release: OrcidRelease, **kwargs)
Transforms the downloaded files into several BigQuery-compatible .jsonl files.
- upload_transformed(release: OrcidRelease, **kwargs)
Uploads the upsert and delete files to the transform bucket.
- bq_load_main_table(release: OrcidRelease, **kwargs)
Load the main table.
- bq_load_upsert_table(release: OrcidRelease, **kwargs)
Load the upsert table into BigQuery.
- bq_load_delete_table(release: OrcidRelease, **kwargs)
Load the delete table into BigQuery.
- bq_upsert_records(release: OrcidRelease, **kwargs)
Upsert the records from the upsert table into the main table.
- bq_delete_records(release: OrcidRelease, **kwargs)
Delete the records in the delete table from the main table.
- add_new_dataset_release(release: OrcidRelease, **kwargs) → None
Adds release information to the API.
- cleanup(release: OrcidRelease, **kwargs) → None
Delete all files, folders and XComs associated with this release.
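Conceptually, the bq_upsert_records and bq_delete_records steps can be read as: rows in the upsert table insert or replace rows in the main table, then IDs in the delete table are removed. The sketch below illustrates only those semantics on in-memory dictionaries, not the BigQuery operations themselves; the helper apply_changes and the assumption that rows are keyed by an ORCID iD column named orcid are hypothetical.

```python
from typing import Dict, Iterable


def apply_changes(
    main: Dict[str, dict], upserts: Iterable[dict], deletes: Iterable[str]
) -> Dict[str, dict]:
    """Hypothetical in-memory illustration of upsert-then-delete on a table keyed by ORCID iD."""
    result = dict(main)  # shallow copy so the input table is left unchanged
    for row in upserts:
        # Insert new iDs and overwrite existing ones (assumed key column: "orcid")
        result[row["orcid"]] = row
    for orcid in deletes:
        # Deleting an iD that is not present is a no-op
        result.pop(orcid, None)
    return result
```

Applying the upserts before the deletes mirrors the task order listed above, so a record that appears in both files of one release ends up deleted.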
- academic_observatory_workflows.workflows.orcid_telescope.create_orcid_batch_manifest(orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) → None
Create a manifest (.csv) for an ORCID batch containing the blob names and modification dates of all the modified files in the bucket’s ORCID directory. Only blobs modified after the reference date are included.
- Parameters:
orcid_batch – The OrcidBatch instance for this ORCID directory
reference_date – The date to use as a reference for the manifest
bucket – The name of the bucket
bucket_prefix – The prefix to use when listing blobs in the bucket, i.e. where the ORCID directories are located
- Returns:
None. The manifest is written to a .csv file rather than returned.
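A minimal, stdlib-only sketch of the filtering described above. It assumes that the blob listing (in the real function presumably obtained from Google Cloud Storage) is available as (blob_name, updated) pairs, that a batch's blobs live under "<bucket_prefix>/<batch_str>/", and that dates are written in ISO 8601 form. write_batch_manifest and its parameters are hypothetical, and timezone-aware datetime objects stand in for pendulum.DateTime.

```python
import csv
import re
from datetime import datetime
from typing import Iterable, Optional, Tuple

MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"


def write_batch_manifest(
    blobs: Iterable[Tuple[str, datetime]],
    batch_str: str,
    reference_date: datetime,
    bucket: str,
    manifest_path: str,
    bucket_prefix: Optional[str] = None,
) -> None:
    """Hypothetical sketch: write one manifest row per record blob modified after reference_date."""
    # Blobs for this batch are assumed to sit under "<bucket_prefix>/<batch_str>/"
    batch_prefix = f"{bucket_prefix}/{batch_str}/" if bucket_prefix else f"{batch_str}/"
    with open(manifest_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(MANIFEST_HEADER)
        for blob_name, updated in blobs:
            in_batch = blob_name.startswith(batch_prefix)
            is_record = re.search(ORCID_RECORD_REGEX, blob_name) is not None
            # Keep only ORCID record files in this batch that changed after the reference date
            if in_batch and is_record and updated > reference_date:
                writer.writerow([bucket, blob_name, updated.isoformat()])
```

Like the documented function, the sketch returns None; the result is the file at manifest_path.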
- academic_observatory_workflows.workflows.orcid_telescope.latest_modified_record_date(manifest_file_path: str | os.PathLike) → pendulum.DateTime
Reads the manifest file and finds the most recent date of modification for the records
- Parameters:
manifest_file_path – the path to the manifest file
- Returns:
the most recent date of modification for the records
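A minimal sketch of this lookup, assuming the manifest uses the MANIFEST_HEADER columns and stores the updated column as ISO 8601 strings. The helper latest_updated is hypothetical, and it returns a standard-library datetime where the documented function returns a pendulum.DateTime.

```python
import csv
from datetime import datetime


def latest_updated(manifest_file_path: str) -> datetime:
    """Hypothetical sketch: return the newest 'updated' value in a manifest CSV."""
    with open(manifest_file_path, newline="") as f:
        # Each row is a dict keyed by the header columns (bucket_name, blob_name, updated)
        dates = [datetime.fromisoformat(row["updated"]) for row in csv.DictReader(f)]
    # max() raises ValueError if the manifest has no data rows
    return max(dates)
```

Taking the maximum over parsed datetimes, rather than over the raw strings, avoids depending on every timestamp sharing the same textual format and UTC offset.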
- academic_observatory_workflows.workflows.orcid_telescope.orcid_batch_names() → List[str]
Create a list of all the possible ORCID directories
- Returns:
A list of all the possible ORCID directories
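A minimal sketch of such a list, assuming that each batch directory is named after the last three characters of an ORCID iD. Because only the final character of an iD can be X (as in ORCID_REGEX), this gives 10 × 10 × 11 = 1100 names. The helper batch_names_sketch is hypothetical.

```python
import itertools
from typing import List


def batch_names_sketch() -> List[str]:
    """Hypothetical sketch: every possible 3-character batch directory name."""
    digits = "0123456789"
    # First two characters: digits only; last character: a digit or the check character "X"
    return ["".join(chars) for chars in itertools.product(digits, digits, digits + "X")]
```

itertools.product advances its last argument fastest, so the names come out in order "000", "001", ..., "009", "00X", "010", ..., "99X".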
- academic_observatory_workflows.workflows.orcid_telescope.transform_orcid_record(record_path: str) → Dict | str
Transform a single ORCID file/record. The XML record is read and parsed into a dictionary; a record should have either a valid ‘record’ section or an ‘error’ section. The keys of the dictionary are slightly changed so that they are valid BigQuery field names. If the record is valid, the transformed dictionary is returned. If it is an error, the ORCID ID is returned.
- Parameters:
record_path – The path to the file with the ORCID record.
- Returns:
The transformed ORCID record. If the record is an error, the ORCID ID is returned.
- Raises:
KeyError – If the record is not valid (no ‘error’ or ‘record’ section found)
ValueError – If the ORCID ID does not match the file name’s ID
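A minimal sketch of the record/error logic and key renaming described above. To stay self-contained it works on a dictionary that has already been parsed from XML, with keys in the style produced by a parser such as xmltodict ("ns:tag" element names, "@" for attributes, "#" for text). The key names "record:record", "error:error", "common:orcid-identifier" and "common:path", the renaming rules, and the helpers to_bq_key, rename_keys and transform_parsed_record are all assumptions, not the module's actual implementation.

```python
import re
from typing import Any, Dict, Union

ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def to_bq_key(key: str) -> str:
    """Hypothetical key cleaner: turn an xmltodict-style key into a valid BigQuery field name."""
    key = key.split(":")[-1]  # drop a namespace prefix, e.g. "common:path" -> "path"
    key = key.lstrip("@#")  # drop attribute/text markers, e.g. "@path" -> "path"
    key = re.sub(r"[^A-Za-z0-9_]", "_", key)  # e.g. "orcid-identifier" -> "orcid_identifier"
    # Field names may not start with a digit
    return f"_{key}" if key[:1].isdigit() else key


def rename_keys(obj: Any) -> Any:
    """Recursively apply to_bq_key to every dictionary key."""
    if isinstance(obj, dict):
        return {to_bq_key(k): rename_keys(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [rename_keys(v) for v in obj]
    return obj


def transform_parsed_record(record_path: str, orcid_dict: Dict[str, Any]) -> Union[Dict[str, Any], str]:
    """Hypothetical sketch of the record/error handling on an already-parsed dictionary."""
    expected_orcid = re.search(ORCID_REGEX, record_path).group(0)
    if "record:record" not in orcid_dict:
        # Raises KeyError when there is neither a 'record' nor an 'error' section
        _ = orcid_dict["error:error"]
        return expected_orcid
    record = orcid_dict["record:record"]
    found_orcid = record["common:orcid-identifier"]["common:path"]
    if found_orcid != expected_orcid:
        raise ValueError(f"ORCID ID {found_orcid} does not match file name ID {expected_orcid}")
    return rename_keys(record)
```

Returning the bare ORCID iD for error records gives the caller something it can route to the delete file, while valid records go on to the upsert file; that routing is an assumption based on the upsert and delete tables described above.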