academic_observatory_workflows.orcid_telescope.orcid_telescope

Module Contents

Classes

OrcidBatch

Describes a single ORCID batch and its related files/folders

OrcidRelease

Functions

create_dag(*, dag_id, cloud_workspace, ...)

Construct an ORCID telescope instance.

aws_orcid_key(→ Tuple[str, str])

Return API login and password

create_orcid_batch_manifest(→ None)

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory.

latest_modified_record_date(→ pendulum.DateTime)

Reads the manifest file and finds the most recent date of modification for the records

orcid_batch_names(→ List[str])

Create a list of all the possible ORCID directories

transform_orcid_record(→ Union[Dict, str])

Transform a single ORCID file/record.

Attributes

ORCID_AWS_SUMMARIES_BUCKET

ORCID_REGEX

ORCID_RECORD_REGEX

MANIFEST_HEADER

BATCH_REGEX

academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'
academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'
academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_RECORD_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\.xml$'
academic_observatory_workflows.orcid_telescope.orcid_telescope.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated']
academic_observatory_workflows.orcid_telescope.orcid_telescope.BATCH_REGEX = '^\\d{2}(\\d|X)$'
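
As an illustration of what the three patterns above accept, here is a minimal sketch using Python's re module. The pattern values are copied from the attributes listed above; the helper names (find_orcid, is_record_file, is_batch_name) and the example paths are hypothetical and are not part of the module.

```python
import re

# Pattern values copied from the module attributes (written as raw strings).
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"
BATCH_REGEX = r"^\d{2}(\d|X)$"


def find_orcid(text):
    """Return the first ORCID iD found in text, or None (hypothetical helper)."""
    match = re.search(ORCID_REGEX, text)
    return match.group(0) if match else None


def is_record_file(path):
    """True if the path ends with '<ORCID iD>.xml' (hypothetical helper)."""
    return re.search(ORCID_RECORD_REGEX, path) is not None


def is_batch_name(name):
    """True if name is two digits followed by a digit or 'X' (hypothetical helper)."""
    return re.match(BATCH_REGEX, name) is not None
```

Under these patterns a batch name such as '09X' is accepted, while 'X09' and '0097' are rejected.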
class academic_observatory_workflows.orcid_telescope.orcid_telescope.OrcidBatch(download_dir: str, transform_dir: str, batch_str: str)

Describes a single ORCID batch and its related files/folders

property existing_records: List[str]

List of existing ORCID records on disk for this ORCID directory.

property missing_records: List[str]

List of missing ORCID records on disk for this ORCID directory.

expected_records() → List[str]

List of expected ORCID records for this ORCID directory. Derived from the manifest file.

blob_uris() → List[str]

List of blob URIs from the manifest for this ORCID directory.
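
The relationship between the expected, existing and missing records can be sketched as a set difference. This is an assumption about how the three lists relate, based only on their descriptions above; the function name and the record names in the usage note are hypothetical.

```python
def missing_records(expected, existing):
    """Records listed in the manifest that are not yet on disk, sorted by name.

    Hypothetical helper: assumes both arguments are lists of record file names.
    """
    return sorted(set(expected) - set(existing))
```

For example, with two expected records of which one is already on disk, the helper returns only the record that still needs to be downloaded.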

class academic_observatory_workflows.orcid_telescope.orcid_telescope.OrcidRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_release_end: pendulum.DateTime, prev_latest_modified_record: pendulum.DateTime, is_first_run: bool)

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property upsert_files
property delete_files
property downloaded_records
property orcid_directory_paths: List[str]

Generates the paths to the ORCID directories in the download folder.

orcid_batches() → List[OrcidBatch]

Creates the ORCID directories in the download folder if they don’t exist and returns them.

to_dict() → Dict
static from_dict(dict_: Dict) → OrcidRelease
academic_observatory_workflows.orcid_telescope.orcid_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, orcid_bucket: str = 'ao-orcid', orcid_summaries_prefix: str = 'orcid_summaries', bq_dataset_id: str = 'orcid', bq_main_table_name: str = 'orcid', bq_upsert_table_name: str = 'orcid_upsert', bq_delete_table_name: str = 'orcid_delete', dataset_description: str = 'The ORCID dataset and supporting tables', snapshot_expiry_days: int = 31, schema_file_path: str = project_path('orcid_telescope', 'schema', 'orcid.json'), delete_schema_file_path: str = project_path('orcid_telescope', 'schema', 'orcid_delete.json'), transfer_attempts: int = 5, max_workers: int = os.cpu_count() * 2, api_dataset_id: str = 'orcid', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, aws_orcid_conn_id: str = 'aws_orcid', start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1), schedule: str = '0 0 * * 0', queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) → airflow.DAG

Construct an ORCID telescope instance.

Parameters:
  • dag_id – the id of the DAG.

  • cloud_workspace – the cloud workspace settings.

  • orcid_bucket – the Google Cloud Storage bucket where the ORCID files are stored.

  • orcid_summaries_prefix – the base folder containing the ORCID summaries.

  • bq_dataset_id – BigQuery dataset ID.

  • bq_main_table_name – BigQuery main table name for the ORCID table.

  • bq_upsert_table_name – BigQuery table name for the ORCID upsert table.

  • bq_delete_table_name – BigQuery table name for the ORCID delete table.

  • dataset_description – BigQuery dataset description.

  • snapshot_expiry_days – the number of days that a snapshot of each entity’s main table will take to expire, which is set to 31 days so there is some time to roll back after an update.

  • schema_file_path – the path to the schema file for the records produced by this workflow.

  • delete_schema_file_path – the path to the delete schema file for the records produced by this workflow.

  • transfer_attempts – the number of AWS to GCP transfer attempts.

  • max_workers – maximum processes to use when transforming files.

  • api_dataset_id – the Dataset ID to use when storing releases.

  • observatory_api_conn_id – the Observatory API Airflow Connection ID.

  • aws_orcid_conn_id – Airflow Connection ID for the AWS ORCID bucket.

  • start_date – the Apache Airflow DAG start date.

  • schedule – the Apache Airflow schedule interval.

  • queue – what Airflow queue this job runs on.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.

Returns:

an Airflow DAG.

academic_observatory_workflows.orcid_telescope.orcid_telescope.aws_orcid_key(conn_id: str) → Tuple[str, str]

Return API login and password

academic_observatory_workflows.orcid_telescope.orcid_telescope.create_orcid_batch_manifest(orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) → None

Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the modified files in the bucket’s orcid directory. Only blobs modified after the reference date are included.

Parameters:
  • orcid_batch – The OrcidBatch instance for this orcid directory

  • reference_date – The date to use as a reference for the manifest

  • bucket – The name of the bucket

  • bucket_prefix – The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located

Returns:

None. The manifest is written to a file rather than returned.
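
The filtering rule described above (only blobs modified after the reference date are included) can be sketched with the standard csv module. This is not the module's implementation: write_manifest is a hypothetical helper, the blobs are assumed to arrive as (bucket_name, blob_name, updated) tuples rather than from a storage client, the header row and ISO 8601 date format are assumptions, and the sketch returns a row count purely for illustration.

```python
import csv

# Column names copied from the MANIFEST_HEADER attribute.
MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]


def write_manifest(blobs, reference_date, manifest_path):
    """Write one CSV row per blob updated after reference_date.

    Hypothetical helper: blobs is an iterable of
    (bucket_name, blob_name, updated) tuples, where updated is a datetime.
    Returns the number of rows written.
    """
    rows = [
        {"bucket_name": bucket, "blob_name": name, "updated": updated.isoformat()}
        for bucket, name, updated in blobs
        if updated > reference_date  # strictly after the reference date
    ]
    with open(manifest_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=MANIFEST_HEADER)
        writer.writeheader()  # assumption: the manifest carries a header row
        writer.writerows(rows)
    return len(rows)
```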

academic_observatory_workflows.orcid_telescope.orcid_telescope.latest_modified_record_date(manifest_file_path: str | os.PathLike) → pendulum.DateTime

Reads the manifest file and finds the most recent date of modification for the records

Parameters:

manifest_file_path – the path to the manifest file

Returns:

the most recent date of modification for the records
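
A minimal sketch of this lookup, assuming the manifest is a CSV file with a header row containing an 'updated' column (as in MANIFEST_HEADER) and ISO 8601 timestamps. The documented function returns a pendulum.DateTime; this sketch uses the standard library's datetime instead, and latest_updated is a hypothetical name.

```python
import csv
from datetime import datetime


def latest_updated(manifest_path):
    """Return the most recent 'updated' timestamp in the manifest.

    Hypothetical helper: raises ValueError if the manifest has no data rows.
    """
    with open(manifest_path, newline="") as f:
        dates = [datetime.fromisoformat(row["updated"]) for row in csv.DictReader(f)]
    return max(dates)
```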

academic_observatory_workflows.orcid_telescope.orcid_telescope.orcid_batch_names() → List[str]

Create a list of all the possible ORCID directories

Returns:

A list of all the possible ORCID directories
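
One way to enumerate every name that BATCH_REGEX accepts (two digits followed by a digit or 'X') is a Cartesian product. This is a sketch consistent with the pattern, not necessarily how the function is implemented; batch_names is a hypothetical name.

```python
import itertools


def batch_names():
    """Enumerate all three-character names matching '^\\d{2}(\\d|X)$'."""
    digits = "0123456789"
    # First two positions are digits; the last is a digit or 'X'.
    return ["".join(p) for p in itertools.product(digits, digits, digits + "X")]
```

This yields 10 × 10 × 11 = 1100 names, from '000' to '99X'.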

academic_observatory_workflows.orcid_telescope.orcid_telescope.transform_orcid_record(record_path: str) → Dict | str

Transform a single ORCID file/record. Streams the content from the blob URI and converts the XML into a dictionary. A record should have either a valid ‘record’ section or an ‘error’ section. The keys of the dictionary are slightly changed so they are valid BigQuery fields. If the record is valid, the transformed record is returned. If it is an error, the ORCID ID is returned.

Parameters:
  • record_path – The path to the file with the ORCID record.

Returns:

The transformed ORCID record. If the record is an error, the ORCID ID is returned.

Raises:
  • KeyError – If the record is not valid - no ‘error’ or ‘record’ section found

  • ValueError – If the ORCID ID does not match the file name’s ID
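
The validation rules above can be sketched on an already-parsed dictionary. Everything here is illustrative: check_record and clean_keys are hypothetical helpers, the key names ('record', 'error', 'orcid-identifier', 'path') are simplified assumptions about the parsed XML, and the key rewriting only replaces characters other than letters, digits and underscores, which is a simplification of BigQuery's field-name rules.

```python
import os
import re

# Pattern value copied from the ORCID_REGEX attribute.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def clean_keys(obj):
    """Recursively replace characters that are not letters, digits or '_' in keys."""
    if isinstance(obj, dict):
        return {re.sub(r"[^0-9A-Za-z_]", "_", k): clean_keys(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_keys(v) for v in obj]
    return obj


def check_record(parsed, record_path):
    """Return the cleaned record, or the ORCID iD if the file holds an error.

    Hypothetical helper: parsed is a dict already built from the XML file.
    """
    match = re.search(ORCID_REGEX, os.path.basename(record_path))
    if match is None:
        raise ValueError(f"No ORCID iD in file name: {record_path}")
    file_id = match.group(0)

    if "error" in parsed:
        return file_id
    if "record" not in parsed:
        raise KeyError("Neither a 'record' nor an 'error' section was found")

    record = clean_keys(parsed["record"])
    # Assumed location of the iD inside the record; the real layout may differ.
    record_id = record.get("orcid_identifier", {}).get("path")
    if record_id != file_id:
        raise ValueError(f"ORCID iD {record_id} does not match file name {file_id}")
    return record
```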