academic_observatory_workflows.orcid_telescope.orcid_telescope
Module Contents
Classes

- OrcidBatch – Describes a single ORCID batch and its related files/folders.
- OrcidRelease

Functions

- create_dag – Construct an ORCID telescope instance.
- aws_orcid_key – Return the API login and password.
- create_orcid_batch_manifest – Create a manifest (.csv) for each ORCID batch containing the blob names and modification dates of all the modified files in the bucket's ORCID directory.
- latest_modified_record_date – Read the manifest file and find the most recent modification date of the records.
- orcid_batch_names – Create a list of all possible ORCID directory names.
- transform_orcid_record – Transform a single ORCID file/record.
Attributes
- academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_AWS_SUMMARIES_BUCKET = 'v2.0-summaries'[source]
- academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\b'[source]
- academic_observatory_workflows.orcid_telescope.orcid_telescope.ORCID_RECORD_REGEX = '\\d{4}-\\d{4}-\\d{4}-\\d{3}(\\d|X)\\.xml$'[source]
- academic_observatory_workflows.orcid_telescope.orcid_telescope.MANIFEST_HEADER = ['bucket_name', 'blob_name', 'updated'][source]
- academic_observatory_workflows.orcid_telescope.orcid_telescope.BATCH_REGEX = '^\\d{2}(\\d|X)$'[source]
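The regular expression attributes above can be exercised directly. A quick sanity check, using the public example iDs from ORCID's documentation and patterns copied verbatim from the attributes:

```python
import re

# Patterns copied from the module attributes above.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"
BATCH_REGEX = r"^\d{2}(\d|X)$"

# An ORCID iD is four groups of four characters; the final character is a
# checksum that may be the letter "X".
assert re.search(ORCID_REGEX, "0000-0002-1825-0097")
assert re.search(ORCID_REGEX, "0000-0002-1694-233X")

# Record files are named after the iD, with an .xml extension.
assert re.search(ORCID_RECORD_REGEX, "0000-0002-1825-0097.xml")
assert not re.search(ORCID_RECORD_REGEX, "0000-0002-1825-0097.json")

# Batch directories are three characters: two digits plus a digit or "X".
assert re.match(BATCH_REGEX, "097")
assert re.match(BATCH_REGEX, "33X")
```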
- class academic_observatory_workflows.orcid_telescope.orcid_telescope.OrcidBatch(download_dir: str, transform_dir: str, batch_str: str)[source]
Describes a single ORCID batch and its related files/folders
- property existing_records: List[str][source]
List of existing ORCID records on disk for this ORCID directory.
- property missing_records: List[str][source]
List of missing ORCID records on disk for this ORCID directory.
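The implementation of these properties is not shown here; a minimal sketch of the idea, assuming (hypothetically) that existing records are the `.xml` files already present in the batch's download directory and missing records are expected files not yet on disk:

```python
import os
import re
from typing import List

# Pattern copied from the module attributes above.
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"


def existing_records(batch_download_dir: str) -> List[str]:
    """Hypothetical helper: ORCID record files already present on disk."""
    return [f for f in os.listdir(batch_download_dir) if re.match(ORCID_RECORD_REGEX, f)]


def missing_records(expected: List[str], batch_download_dir: str) -> List[str]:
    """Hypothetical helper: expected record files not yet on disk."""
    return sorted(set(expected) - set(existing_records(batch_download_dir)))
```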
- class academic_observatory_workflows.orcid_telescope.orcid_telescope.OrcidRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_main_table_name: str, bq_upsert_table_name: str, bq_delete_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_release_end: pendulum.DateTime, prev_latest_modified_record: pendulum.DateTime, is_first_run: bool)[source]
Bases:
observatory.platform.workflows.workflow.ChangefileRelease
- property orcid_directory_paths: List[str][source]
Generates the paths to the ORCID directories in the download folder.
- orcid_batches() List[OrcidBatch] [source]
Creates the ORCID directories in the download folder if they don’t exist and returns them.
- static from_dict(dict_: Dict) OrcidRelease [source]
- academic_observatory_workflows.orcid_telescope.orcid_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, orcid_bucket: str = 'ao-orcid', orcid_summaries_prefix: str = 'orcid_summaries', bq_dataset_id: str = 'orcid', bq_main_table_name: str = 'orcid', bq_upsert_table_name: str = 'orcid_upsert', bq_delete_table_name: str = 'orcid_delete', dataset_description: str = 'The ORCID dataset and supporting tables', snapshot_expiry_days: int = 31, schema_file_path: str = project_path('orcid_telescope', 'schema', 'orcid.json'), delete_schema_file_path: str = project_path('orcid_telescope', 'schema', 'orcid_delete.json'), transfer_attempts: int = 5, max_workers: int = os.cpu_count() * 2, api_dataset_id: str = 'orcid', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, aws_orcid_conn_id: str = 'aws_orcid', start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1), schedule: str = '0 0 * * 0', queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) airflow.DAG [source]
Construct an ORCID telescope instance.
- Parameters:
dag_id – the id of the DAG.
cloud_workspace – the cloud workspace settings.
orcid_bucket – the Google Cloud Storage bucket where the ORCID files are stored.
orcid_summaries_prefix – the base folder containing the ORCID summaries.
bq_dataset_id – BigQuery dataset ID.
bq_main_table_name – BigQuery main table name for the ORCID table.
bq_upsert_table_name – BigQuery table name for the ORCID upsert table.
bq_delete_table_name – BigQuery table name for the ORCID delete table.
dataset_description – BigQuery dataset description.
snapshot_expiry_days – the number of days until a snapshot of each entity’s main table expires, set to 31 days so there is some time to roll back after an update.
schema_file_path – the path to the schema file for the records produced by this workflow.
delete_schema_file_path – the path to the delete schema file for the records produced by this workflow.
transfer_attempts – the number of AWS to GCP transfer attempts.
max_workers – the maximum number of processes to use when transforming files.
api_dataset_id – the Dataset ID to use when storing releases.
observatory_api_conn_id – the Observatory API Airflow Connection ID.
aws_orcid_conn_id – the Airflow Connection ID for the AWS ORCID bucket.
start_date – the Apache Airflow DAG start date.
schedule – the Apache Airflow schedule interval.
queue – the Airflow queue this job runs on.
max_active_runs – the maximum number of DAG runs that can run at once.
retries – the number of times to retry a task.
- Returns:
an Airflow DAG.
- academic_observatory_workflows.orcid_telescope.orcid_telescope.aws_orcid_key(conn_id: str) Tuple[str, str] [source]
Return the API login and password from the given Airflow connection.
- academic_observatory_workflows.orcid_telescope.orcid_telescope.create_orcid_batch_manifest(orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None) None [source]
Create a manifest (.csv) for the given ORCID batch containing the blob names and modification dates of all the modified files in the bucket’s ORCID directory. Only blobs modified after the reference date are included.
- Parameters:
orcid_batch – The OrcidBatch instance for this orcid directory
reference_date – The date to use as a reference for the manifest
bucket – The name of the bucket
bucket_prefix – The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located
- Returns:
None.
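The real function lists blobs in the Google Cloud Storage bucket; a minimal sketch of the filtering and CSV-writing step, assuming a hypothetical iterable of (blob name, last-updated) pairs and using the MANIFEST_HEADER attribute defined above:

```python
import csv
from datetime import datetime
from typing import Iterable, Tuple

# Header copied from the module attributes above.
MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]


def write_manifest(
    manifest_path: str,
    bucket_name: str,
    blobs: Iterable[Tuple[str, datetime]],  # hypothetical (blob_name, updated) pairs
    reference_date: datetime,
) -> None:
    """Sketch: write a manifest CSV containing only blobs modified after
    the reference date."""
    with open(manifest_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(MANIFEST_HEADER)
        for blob_name, updated in blobs:
            if updated > reference_date:
                writer.writerow([bucket_name, blob_name, updated.isoformat()])
```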
- academic_observatory_workflows.orcid_telescope.orcid_telescope.latest_modified_record_date(manifest_file_path: str | os.PathLike) pendulum.DateTime [source]
Reads the manifest file and finds the most recent modification date of the records.
- Parameters:
manifest_file_path – the path to the manifest file
- Returns:
the most recent date of modification for the records
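Given the manifest layout above (a CSV with an `updated` column), the lookup reduces to a max over parsed timestamps. A sketch using the standard library's `datetime` in place of `pendulum`:

```python
import csv
from datetime import datetime


def latest_modified(manifest_file_path: str) -> datetime:
    """Sketch: return the most recent 'updated' timestamp in a manifest CSV."""
    with open(manifest_file_path, newline="") as f:
        return max(datetime.fromisoformat(row["updated"]) for row in csv.DictReader(f))
```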
- academic_observatory_workflows.orcid_telescope.orcid_telescope.orcid_batch_names() List[str] [source]
Create a list of all possible ORCID directory names.
- Returns:
A list of all possible ORCID directory names.
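Per the BATCH_REGEX attribute above, a directory name is two digits followed by a digit or "X", giving 10 × 10 × 11 = 1,100 possibilities. A sketch consistent with that pattern (the real implementation may differ):

```python
from itertools import product
from typing import List


def orcid_batch_names() -> List[str]:
    """All possible three-character ORCID batch directories: two digits
    followed by a digit or 'X' (the final checksum character of an iD)."""
    digits = "0123456789"
    return ["".join(p) for p in product(digits, digits, digits + "X")]
```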
- academic_observatory_workflows.orcid_telescope.orcid_telescope.transform_orcid_record(record_path: str) Dict | str [source]
Transform a single ORCID file/record. Streams the content from the blob URI and turns it into a dictionary: the XML file is parsed, and a record should contain either a valid ‘record’ section or an ‘error’ section. The dictionary keys are changed slightly so that they are valid BigQuery field names. If the record is valid, it is returned; if it is an error, the ORCID ID is returned.
- Parameters:
record_path – The path to the file with the ORCID record.
- Returns:
The transformed ORCID record. If the record is an error, the ORCID ID is returned.
- Raises:
KeyError – If the record is not valid (no ‘record’ or ‘error’ section found).
ValueError – If the ORCID ID does not match the file name’s ID.
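The transform logic above can be sketched with the standard library's XML parser. This is illustrative only, assuming a flat record structure and that BigQuery-invalid characters (hyphens, colons) in element names are replaced with underscores:

```python
import re
import xml.etree.ElementTree as ET
from typing import Dict, Union

# Pattern copied from the module attributes above.
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"


def transform_record(record_path: str) -> Union[Dict, str]:
    """Sketch: parse an ORCID XML file into a dict with BigQuery-safe keys,
    or return the ORCID iD from the file name for error records."""
    root = ET.parse(record_path).getroot()
    tag = re.sub(r"\{.*\}", "", root.tag)  # strip any XML namespace
    if tag == "error":
        # For error records, return the ORCID iD taken from the file name.
        match = re.search(ORCID_REGEX, record_path)
        if match is None:
            raise ValueError(f"No ORCID iD found in file name: {record_path}")
        return match.group(0)
    if tag != "record":
        raise KeyError(f"No 'record' or 'error' section found in {record_path}")
    # Rename child element tags so they are valid BigQuery field names.
    return {
        re.sub(r"[-:.]", "_", re.sub(r"\{.*\}", "", child.tag)): (child.text or "").strip()
        for child in root
    }
```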