academic_observatory_workflows.workflows.orcid_telescope

Module Contents

Classes

OrcidBatch

Describes a single ORCID batch and its related files/folders

OrcidRelease

OrcidTelescope

ORCID telescope

Functions

create_orcid_batch_manifest(→ None)

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory.

latest_modified_record_date(→ pendulum.DateTime)

Reads the manifest file and finds the most recent date of modification for the records

orcid_batch_names(→ List[str])

Create a list of all the possible ORCID directories

transform_orcid_record(→ Union[Dict, str])

Transform a single ORCID file/record.

Attributes

ORCID_AWS_SUMMARIES_BUCKET

ORCID_REGEX

ORCID_RECORD_REGEX

MANIFEST_HEADER

BATCH_REGEX

academic_observatory_workflows.workflows.orcid_telescope.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'
academic_observatory_workflows.workflows.orcid_telescope.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'
academic_observatory_workflows.workflows.orcid_telescope.ORCID_RECORD_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\.xml$'
academic_observatory_workflows.workflows.orcid_telescope.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated']
academic_observatory_workflows.workflows.orcid_telescope.BATCH_REGEX = '^\\d{2}(\\d|X)$'
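
To show what the three patterns accept, here is a small sketch that copies them into raw strings (equivalent to the escaped literals listed above) and applies them with the standard re module. The blob name is hypothetical and used only for illustration; how the module itself applies these patterns is not shown in this reference.

```python
import re

# Patterns copied from the module attributes, written as raw strings.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"
BATCH_REGEX = r"^\d{2}(\d|X)$"

# Hypothetical blob name, for illustration only.
blob_name = "orcid_summaries/097/0000-0002-1825-0097.xml"

# Pull the ORCID ID out of the blob name.
orcid_id = re.search(ORCID_REGEX, blob_name).group(0)

# A record file is an ORCID ID followed by ".xml" at the end of the name.
is_record = re.search(ORCID_RECORD_REGEX, blob_name) is not None

# A batch name is exactly two digits followed by a digit or "X".
is_batch = re.match(BATCH_REGEX, "09X") is not None
too_short = re.match(BATCH_REGEX, "9X") is not None
```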
class academic_observatory_workflows.workflows.orcid_telescope.OrcidBatch(download_dir: str, transform_dir: str, batch_str: str)

Describes a single ORCID batch and its related files/folders

property existing_records: List[str]

List of existing ORCID records on disk for this ORCID directory.

property missing_records: List[str]

List of ORCID records that are expected for this ORCID directory but missing from disk.

expected_records() → List[str]

List of expected ORCID records for this ORCID directory. Derived from the manifest file.

blob_uris() → List[str]

List of blob URIs from the manifest for this ORCID directory.
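
The relationship between the manifest and these record lists can be sketched with the standard library. This is an illustration under stated assumptions, not the class's code: the manifest rows are hypothetical, the gs:// URI scheme is assumed, records are represented here by file names, and the helper functions are stand-ins rather than OrcidBatch members.

```python
import csv
import io

# Header copied from the MANIFEST_HEADER attribute of this module.
MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]

# Hypothetical manifest content for a batch named "097".
MANIFEST_TEXT = (
    ",".join(MANIFEST_HEADER) + "\n"
    "ao-orcid,orcid_summaries/097/0000-0002-1825-0097.xml,2023-06-02T00:00:00+00:00\n"
    "ao-orcid,orcid_summaries/097/0000-0001-5109-3097.xml,2023-06-03T00:00:00+00:00\n"
)


def read_manifest(text):
    """Parse manifest text into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))


def blob_uris(rows):
    """Build one URI per manifest row (the gs:// scheme is an assumption)."""
    return [f"gs://{row['bucket_name']}/{row['blob_name']}" for row in rows]


def expected_records(rows):
    """Expected record file names, taken from the last path component."""
    return [row["blob_name"].rsplit("/", 1)[-1] for row in rows]


def missing_records(expected, existing):
    """Records named in the manifest that are not on disk yet."""
    return sorted(set(expected) - set(existing))


rows = read_manifest(MANIFEST_TEXT)
expected = expected_records(rows)
existing = ["0000-0002-1825-0097.xml"]  # pretend only one file was downloaded
missing = missing_records(expected, existing)
```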

class academic_observatory_workflows.workflows.orcid_telescope.OrcidRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_release_end: pendulum.DateTime, prev_latest_modified_record: pendulum.DateTime, is_first_run: bool)

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property upsert_files
property delete_files
property downloaded_records
property orcid_directory_paths: List[str]

Generates the paths to the orcid directories in the download folder

orcid_batches() → List[OrcidBatch]

Creates the orcid directories in the download folder if they don’t exist and returns them

class academic_observatory_workflows.workflows.orcid_telescope.OrcidTelescope(dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, orcid_bucket: str = 'ao-orcid', orcid_summaries_prefix: str = 'orcid_summaries', bq_dataset_id: str = 'orcid', bq_main_table_name: str = 'orcid', bq_upsert_table_name: str = 'orcid_upsert', bq_delete_table_name: str = 'orcid_delete', dataset_description: str = 'The ORCID dataset and supporting tables', table_description: str = 'The ORCID dataset', snapshot_expiry_days: int = 31, schema_file_path: str = os.path.join(default_schema_folder(), 'orcid', 'orcid.json'), delete_schema_file_path: str = os.path.join(default_schema_folder(), 'orcid', 'orcid_delete.json'), transfer_attempts: int = 5, max_workers: int = os.cpu_count() * 2, api_dataset_id: str = 'orcid', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, aws_orcid_conn_id: str = 'aws_orcid', start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1), schedule: str = '0 0 * * 0', queue: str = 'remote_queue')

Bases: observatory.platform.workflows.workflow.Workflow

ORCID telescope

property aws_orcid_key: Tuple[str, str]

Return API login and password

make_release(**kwargs) → OrcidRelease

Generates the OrcidRelease object.

create_datasets(release: OrcidRelease, **kwargs) → None

Create datasets

transfer_orcid(release: OrcidRelease, **kwargs)

Sync files from AWS bucket to Google Cloud bucket.

bq_create_main_table_snapshot(release: OrcidRelease, **kwargs)

Create a snapshot of each main table. The purpose of the snapshot is to make it possible to roll back the table if something goes wrong. The snapshot expires after self.snapshot_expiry_days days.

create_manifests(release: OrcidRelease, **kwargs)

Create a manifest of all the modified files in the orcid bucket.

download(release: OrcidRelease, **kwargs)

Reads each batch’s manifest and downloads the files from the GCS bucket.

transform(release: OrcidRelease, **kwargs)

Transforms the downloaded files into several BigQuery-compatible .jsonl files.

upload_transformed(release: OrcidRelease, **kwargs)

Uploads the upsert and delete files to the transform bucket.

bq_load_main_table(release: OrcidRelease, **kwargs)

Load the main table.

bq_load_upsert_table(release: OrcidRelease, **kwargs)

Load the upsert table into BigQuery.

bq_load_delete_table(release: OrcidRelease, **kwargs)

Load the delete table into BigQuery.

bq_upsert_records(release: OrcidRelease, **kwargs)

Upsert the records from the upsert table into the main table.

bq_delete_records(release: OrcidRelease, **kwargs)

Delete the records in the delete table from the main table.
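
The effect of the upsert and delete steps can be pictured with plain Python collections. This is only an in-memory sketch of the semantics, not the BigQuery logic the workflow runs; the key name "orcid", the row contents, and both helper functions are hypothetical.

```python
def apply_upserts(main_rows, upsert_rows, key="orcid"):
    """Insert new rows and replace existing rows that share the same key."""
    merged = {row[key]: row for row in main_rows}
    for row in upsert_rows:
        merged[row[key]] = row  # overwrite if present, add if not
    return list(merged.values())


def apply_deletes(main_rows, delete_rows, key="orcid"):
    """Drop every row whose key appears in the delete table."""
    to_delete = {row[key] for row in delete_rows}
    return [row for row in main_rows if row[key] not in to_delete]


main = [
    {"orcid": "0000-0002-1825-0097", "name": "old"},
    {"orcid": "0000-0001-5109-3097", "name": "unchanged"},
]
upserts = [
    {"orcid": "0000-0002-1825-0097", "name": "new"},       # update
    {"orcid": "0000-0003-0000-000X", "name": "inserted"},  # insert
]
deletes = [{"orcid": "0000-0001-5109-3097"}]

main = apply_upserts(main, upserts)
main = apply_deletes(main, deletes)
```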

add_new_dataset_release(release: OrcidRelease, **kwargs) → None

Adds release information to API.

cleanup(release: OrcidRelease, **kwargs) → None

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.workflows.orcid_telescope.create_orcid_batch_manifest(orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) → None

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory. Only blobs modified after the reference date are included.

Parameters:
  • orcid_batch – The OrcidBatch instance for this orcid directory

  • reference_date – The date to use as a reference for the manifest

  • bucket – The name of the bucket

  • bucket_prefix – The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located

Returns:

None, according to the signature. The manifest is written as a side effect rather than returned.
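
The filtering rule described above (only blobs modified after the reference date are included) can be sketched as follows. This is an illustration, not the function's code: blobs are represented by plain dictionaries instead of cloud storage objects, timestamps use the standard datetime module rather than pendulum, and write_manifest is a hypothetical helper.

```python
import csv
import io
from datetime import datetime, timezone

# Header copied from the MANIFEST_HEADER attribute of this module.
MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]


def write_manifest(blobs, reference_date, out):
    """Write one CSV row per blob modified after reference_date.

    Returns the number of rows written (excluding the header).
    """
    writer = csv.DictWriter(out, fieldnames=MANIFEST_HEADER)
    writer.writeheader()
    written = 0
    for blob in blobs:
        if blob["updated"] > reference_date:
            writer.writerow(
                {
                    "bucket_name": blob["bucket_name"],
                    "blob_name": blob["blob_name"],
                    "updated": blob["updated"].isoformat(),
                }
            )
            written += 1
    return written


# Hypothetical blob listing for one batch.
blobs = [
    {
        "bucket_name": "ao-orcid",
        "blob_name": "orcid_summaries/097/0000-0002-1825-0097.xml",
        "updated": datetime(2023, 6, 2, tzinfo=timezone.utc),
    },
    {
        "bucket_name": "ao-orcid",
        "blob_name": "orcid_summaries/097/0000-0001-5109-3097.xml",
        "updated": datetime(2023, 5, 1, tzinfo=timezone.utc),
    },
]

buffer = io.StringIO()
count = write_manifest(blobs, datetime(2023, 6, 1, tzinfo=timezone.utc), buffer)
manifest_lines = buffer.getvalue().splitlines()
```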

academic_observatory_workflows.workflows.orcid_telescope.latest_modified_record_date(manifest_file_path: str | os.PathLike) → pendulum.DateTime

Reads the manifest file and finds the most recent date of modification for the records.

Parameters:

manifest_file_path – the path to the manifest file

Returns:

the most recent date of modification for the records
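
A minimal sketch of the idea, assuming the manifest uses the MANIFEST_HEADER columns and stores ISO 8601 timestamps in the updated column. It uses the standard datetime module instead of pendulum, reads from a file-like object instead of a path, and the helper name is hypothetical.

```python
import csv
import io
from datetime import datetime


def latest_updated(manifest_file):
    """Return the most recent 'updated' timestamp in a manifest.

    Raises ValueError if the manifest has no data rows, because max()
    is called on an empty sequence.
    """
    reader = csv.DictReader(manifest_file)
    return max(datetime.fromisoformat(row["updated"]) for row in reader)


# Hypothetical manifest content.
manifest = io.StringIO(
    "bucket_name,blob_name,updated\n"
    "ao-orcid,orcid_summaries/097/0000-0002-1825-0097.xml,2023-06-02T00:00:00+00:00\n"
    "ao-orcid,orcid_summaries/097/0000-0001-5109-3097.xml,2023-06-05T12:30:00+00:00\n"
)
latest = latest_updated(manifest)
```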

academic_observatory_workflows.workflows.orcid_telescope.orcid_batch_names() → List[str]

Create a list of all the possible ORCID directories

Returns:

A list of all the possible ORCID directories
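
Read literally, BATCH_REGEX describes a batch name as two digits followed by a digit or X. The sketch below enumerates every such name; it is an assumption that orcid_batch_names() builds its list this way, and the helper name is hypothetical.

```python
import itertools
import re

# Pattern copied from the BATCH_REGEX attribute, written as a raw string.
BATCH_REGEX = r"^\d{2}(\d|X)$"


def all_batch_names():
    """Enumerate every three-character name that BATCH_REGEX accepts."""
    digits = [str(i) for i in range(10)]
    last_chars = digits + ["X"]  # the final character may also be "X"
    return ["".join(parts) for parts in itertools.product(digits, digits, last_chars)]


names = all_batch_names()
```

Under this reading there are 10 × 10 × 11 = 1100 names, from 000 to 99X.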

academic_observatory_workflows.workflows.orcid_telescope.transform_orcid_record(record_path: str) → Dict | str

Transform a single ORCID file/record. Reads the XML file at record_path and turns it into a dictionary. A record should have either a valid ‘record’ section or an ‘error’ section. The keys of the dictionary are changed slightly so that they are valid BigQuery field names. If the record is valid, the transformed record is returned. If it is an error, the ORCID ID is returned.

Parameters:

record_path – The path to the file with the ORCID record.

Returns:

The transformed ORCID record. If the record is an error, the ORCID ID is returned.

Raises:
  • KeyError – If the record is not valid, i.e. no ‘error’ or ‘record’ section is found

  • ValueError – If the ORCID ID does not match the file name’s ID
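
The return and error behaviour described above can be sketched on an already-parsed dictionary. Everything specific here is an assumption for illustration: the XML parsing step is omitted, the ‘orcid-id’ field and the record layout are hypothetical, and the key-cleaning rule (replace any character other than letters, digits and underscores with an underscore) is a stand-in for whatever rule the module applies.

```python
import os
import re

# Pattern copied from the ORCID_REGEX attribute, written as a raw string.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def clean_keys(value):
    """Recursively rewrite dictionary keys (hypothetical cleaning rule)."""
    if isinstance(value, dict):
        return {re.sub(r"[^0-9A-Za-z_]", "_", k): clean_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [clean_keys(item) for item in value]
    return value


def transform_parsed_record(record_path, parsed):
    """Return the cleaned record, or the ORCID ID for an error record."""
    match = re.search(ORCID_REGEX, os.path.basename(record_path))
    if match is None:
        raise ValueError(f"No ORCID ID found in file name: {record_path}")
    file_id = match.group(0)

    if "record" in parsed:
        record = clean_keys(parsed["record"])
        # "orcid_id" is the cleaned form of the hypothetical "orcid-id" field.
        if record.get("orcid_id") != file_id:
            raise ValueError(f"ORCID ID does not match file name: {record_path}")
        return record
    if "error" in parsed:
        return file_id
    raise KeyError(f"No 'record' or 'error' section found: {record_path}")


path = "/tmp/0000-0002-1825-0097.xml"  # hypothetical path
valid = transform_parsed_record(
    path,
    {"record": {"orcid-id": "0000-0002-1825-0097", "person:name": {"given-names": "A"}}},
)
errored = transform_parsed_record(path, {"error": {"code": "409"}})
```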