# Copyright 2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Keegan Smith
from __future__ import annotations
from functools import cached_property
import datetime
import time
import logging
import os
from os import PathLike
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from datetime import timedelta
from typing import List, Dict, Tuple, Union
import itertools
import csv
import re
import pendulum
import xmltodict
from airflow.hooks.base import BaseHook
from airflow.operators.dummy import DummyOperator
from google.auth import default
from google.cloud import bigquery
from google.cloud.bigquery import SourceFormat
from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag
from academic_observatory_workflows.s5cmd import S5Cmd
from observatory.api.client.model.dataset_release import DatasetRelease
from observatory.platform.airflow import PreviousDagRunSensor, is_first_dag_run
from observatory.platform.api import get_dataset_releases, get_latest_dataset_release
from observatory.platform.api import make_observatory_api
from observatory.platform.bigquery import (
bq_table_id,
bq_load_table,
bq_upsert_records,
bq_delete_records,
bq_sharded_table_id,
bq_create_dataset,
bq_snapshot,
)
from observatory.platform.config import AirflowConns
from observatory.platform.files import list_files, save_jsonl_gz, change_keys
from observatory.platform.gcs import (
gcs_upload_files,
gcs_blob_uri,
gcs_blob_name_from_path,
gcs_create_aws_transfer,
gcs_list_blobs,
gcs_hmac_key,
)
from observatory.platform.observatory_config import CloudWorkspace
from observatory.platform.workflows.workflow import (
Workflow,
ChangefileRelease,
cleanup,
set_task_state,
)
# The AWS S3 bucket that the ORCID summaries are transferred from
ORCID_AWS_SUMMARIES_BUCKET = "v2.0-summaries"
# Matches an ORCID iD, e.g. 0000-0002-1825-009X
ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b"
# Matches the file name of a downloaded ORCID record, e.g. 0000-0002-1825-009X.xml
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"
# Columns of the per-batch and master manifest CSV files: the bucket holding the blob, the blob name and the blob's
# last-updated timestamp. The manifest readers look these columns up by these exact names.
MANIFEST_HEADER = ["bucket_name", "blob_name", "updated"]
# Matches an ORCID batch directory name: two digits followed by a digit or "X" (000 to 99X)
BATCH_REGEX = r"^\d{2}(\d|X)$"
class OrcidBatch:
    """Describes a single ORCID batch and its related files/folders.

    A batch is identified by a three-character string such as "000" or "99X" (see BATCH_REGEX). Its records are
    downloaded into ``<download_dir>/<batch_str>``. Its manifest, download log/error files and transformed
    upsert/delete files are all named after the batch string.
    """

    def __init__(self, download_dir: str, transform_dir: str, batch_str: str):
        """Construct an OrcidBatch and create its download folder if it does not already exist.

        :param download_dir: Existing directory that holds the batch download folder, manifest and log files.
        :param transform_dir: Existing directory that the batch's transformed files are written to.
        :param batch_str: The batch identifier. Must match BATCH_REGEX.
        :raises NotADirectoryError: If download_dir or transform_dir is not an existing directory.
        :raises ValueError: If batch_str is not a valid batch string.
        """
        self.download_dir = download_dir
        self.transform_dir = transform_dir
        self.batch_str = batch_str
        self.download_batch_dir = os.path.join(self.download_dir, batch_str)
        self.download_log_file = os.path.join(self.download_dir, f"{self.batch_str}_log.txt")
        self.download_error_file = os.path.join(self.download_dir, f"{self.batch_str}_error.txt")
        self.manifest_file = os.path.join(self.download_dir, f"{self.batch_str}_manifest.csv")
        self.transform_upsert_file = os.path.join(self.transform_dir, f"{self.batch_str}_upsert.jsonl.gz")
        self.transform_delete_file = os.path.join(self.transform_dir, f"{self.batch_str}_delete.jsonl.gz")

        # isdir rather than exists: a regular file at either path is rejected here with the documented error,
        # instead of slipping through (transform_dir) or failing later inside os.makedirs (download_dir).
        if not os.path.isdir(self.download_dir):
            raise NotADirectoryError(f"Directory {self.download_dir} does not exist.")
        if not os.path.isdir(self.transform_dir):
            raise NotADirectoryError(f"Directory {self.transform_dir} does not exist.")
        if not re.match(BATCH_REGEX, self.batch_str):
            raise ValueError(f"Batch string {self.batch_str} is not valid.")

        os.makedirs(self.download_batch_dir, exist_ok=True)

    @property
    def existing_records(self) -> List[str]:
        """List of the ORCID record file names (basenames) currently on disk in this batch's download folder."""
        return [os.path.basename(path) for path in list_files(self.download_batch_dir, ORCID_RECORD_REGEX)]

    @property
    def missing_records(self) -> List[str]:
        """List of expected ORCID record file names that are not on disk yet. The order is not guaranteed."""
        return list(set(self.expected_records) - set(self.existing_records))

    @cached_property
    def expected_records(self) -> List[str]:
        """List of expected ORCID record file names (basenames) for this batch, read from the manifest file.

        The value is cached after the first access, so later changes to the manifest file are not picked up.
        """
        # newline="" is what the csv module requires for correct handling of quoted fields
        with open(self.manifest_file, "r", newline="") as f:
            return [os.path.basename(row["blob_name"]) for row in csv.DictReader(f)]

    @cached_property
    def blob_uris(self) -> List[str]:
        """List of the blob URIs listed in this batch's manifest file. Cached after the first access."""
        with open(self.manifest_file, "r", newline="") as f:
            return [
                gcs_blob_uri(bucket_name=row["bucket_name"], blob_name=row["blob_name"])
                for row in csv.DictReader(f)
            ]
class OrcidRelease(ChangefileRelease):
    """A changefile release of the ORCID dataset.

    Holds the manifest path, GCS URIs and BigQuery table IDs used by a single ORCID telescope DAG run.
    """

    def __init__(
        self,
        *,
        dag_id: str,
        run_id: str,
        cloud_workspace: CloudWorkspace,
        bq_dataset_id: str,
        bq_main_table_name: str,
        bq_upsert_table_name: str,
        bq_delete_table_name: str,
        start_date: pendulum.DateTime,
        end_date: pendulum.DateTime,
        prev_release_end: pendulum.DateTime,
        prev_latest_modified_record: pendulum.DateTime,
        is_first_run: bool,
    ):
        """Construct an OrcidRelease instance.

        :param dag_id: DAG ID.
        :param run_id: DAG run ID.
        :param cloud_workspace: Cloud workspace object for this release.
        :param bq_dataset_id: BigQuery dataset ID.
        :param bq_main_table_name: BigQuery main table name for the ORCID table.
        :param bq_upsert_table_name: BigQuery table name for the ORCID upsert table.
        :param bq_delete_table_name: BigQuery table name for the ORCID delete table.
        :param start_date: Start date for the release.
        :param end_date: End date for the release.
        :param prev_release_end: End date for the previous release. Used as the date shard of the snapshot table.
        :param prev_latest_modified_record: Latest modified record for the previous release. Used to decide which
            records to update.
        :param is_first_run: Whether this is the first run of the DAG.
        """
        super().__init__(
            dag_id=dag_id,
            run_id=run_id,
            start_date=start_date,
            end_date=end_date,
        )
        self.cloud_workspace = cloud_workspace
        self.bq_dataset_id = bq_dataset_id
        self.bq_main_table_name = bq_main_table_name
        self.bq_upsert_table_name = bq_upsert_table_name
        self.bq_delete_table_name = bq_delete_table_name
        self.prev_release_end = prev_release_end
        self.prev_latest_modified_record = prev_latest_modified_record
        self.is_first_run = is_first_run

        # Files/folders
        self.master_manifest_file = os.path.join(self.workflow_folder, "manifest.csv")

        # Table names and URIs. The globs match every batch's transformed file once uploaded to the transform bucket.
        self.upsert_blob_glob = f"{gcs_blob_name_from_path(self.transform_folder)}/*_upsert.jsonl.gz"
        self.upsert_table_uri = gcs_blob_uri(self.cloud_workspace.transform_bucket, self.upsert_blob_glob)
        self.delete_blob_glob = f"{gcs_blob_name_from_path(self.transform_folder)}/*_delete.jsonl.gz"
        self.delete_table_uri = gcs_blob_uri(self.cloud_workspace.transform_bucket, self.delete_blob_glob)
        self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_main_table_name)
        self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_upsert_table_name)
        self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_delete_table_name)
        # The snapshot captures the main table as of the previous release, so it is sharded by that release's end
        self.bq_snapshot_table_id = bq_sharded_table_id(
            cloud_workspace.project_id, bq_dataset_id, f"{bq_main_table_name}_snapshot", prev_release_end
        )

    @property
    def upsert_files(self):
        """The 'upsert' files (one per batch, e.g. 000_upsert.jsonl.gz) in the transform folder."""
        return list_files(self.transform_folder, r"^.*\d{2}(\d|X)_upsert\.jsonl\.gz$")

    @property
    def delete_files(self):
        """The 'delete' files (one per batch, e.g. 000_delete.jsonl.gz) in the transform folder."""
        return list_files(self.transform_folder, r"^.*\d{2}(\d|X)_delete\.jsonl\.gz$")

    @property
    def downloaded_records(self):
        """Every downloaded ORCID record file found under the download folder."""
        return list_files(self.download_folder, ORCID_RECORD_REGEX)

    @property
    def orcid_directory_paths(self) -> List[str]:
        """The paths of all possible ORCID batch directories in the download folder."""
        return [os.path.join(self.download_folder, folder) for folder in orcid_batch_names()]

    def orcid_batches(self) -> List[OrcidBatch]:
        """Create an OrcidBatch for every possible batch and return them.

        Constructing an OrcidBatch creates its download folder if it does not exist, so this is not a pure getter.
        """
        return [OrcidBatch(self.download_folder, self.transform_folder, batch) for batch in orcid_batch_names()]
class OrcidTelescope(Workflow):
    """ORCID telescope.

    Each run does the following:

    1. Transfers the ORCID summaries from AWS to a GCS bucket.
    2. Builds manifests of the records modified since the previous release.
    3. Downloads and transforms those records.
    4. Loads them into BigQuery.

    On the first run the main table is loaded directly. On later runs, changes are applied through the upsert and
    delete tables.
    """

    def __init__(
        self,
        dag_id: str,
        cloud_workspace: CloudWorkspace,
        orcid_bucket: str = "ao-orcid",
        orcid_summaries_prefix: str = "orcid_summaries",
        bq_dataset_id: str = "orcid",
        bq_main_table_name: str = "orcid",
        bq_upsert_table_name: str = "orcid_upsert",
        bq_delete_table_name: str = "orcid_delete",
        dataset_description: str = "The ORCID dataset and supporting tables",
        table_description: str = "The ORCID dataset",
        snapshot_expiry_days: int = 31,
        schema_file_path: str = os.path.join(default_schema_folder(), "orcid", "orcid.json"),
        delete_schema_file_path: str = os.path.join(default_schema_folder(), "orcid", "orcid_delete.json"),
        transfer_attempts: int = 5,
        # os.cpu_count() may return None; without the fallback this default would raise TypeError at import time
        max_workers: int = (os.cpu_count() or 1) * 2,
        api_dataset_id: str = "orcid",
        observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
        aws_orcid_conn_id: str = "aws_orcid",
        start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1),
        schedule: str = "0 0 * * 0",  # Midnight UTC every Sunday
        queue: str = "remote_queue",
    ):
        """Construct an ORCID telescope instance.

        :param dag_id: The ID of the DAG.
        :param cloud_workspace: The CloudWorkspace for this DAG (project ID, data location and transform bucket).
        :param orcid_bucket: The GCS bucket that the ORCID summaries are transferred into.
        :param orcid_summaries_prefix: The prefix within orcid_bucket under which the summaries are stored.
        :param bq_dataset_id: The BigQuery dataset ID.
        :param bq_main_table_name: The name of the main ORCID table.
        :param bq_upsert_table_name: The name of the upsert table.
        :param bq_delete_table_name: The name of the delete table.
        :param dataset_description: The description given to the BigQuery dataset.
        :param table_description: The description of the ORCID table.
        :param snapshot_expiry_days: Number of days after which the main table snapshot expires.
        :param schema_file_path: Schema file for the main and upsert tables.
        :param delete_schema_file_path: Schema file for the delete table.
        :param transfer_attempts: Maximum number of attempts at the AWS to GCS transfer.
        :param max_workers: Maximum number of parallel workers. Defaults to twice the CPU count.
        :param api_dataset_id: The dataset ID used for DatasetReleases in the Observatory API.
        :param observatory_api_conn_id: Airflow connection ID for the Observatory API.
        :param aws_orcid_conn_id: Airflow connection ID whose login/password are used as the AWS key.
        :param start_date: The DAG start date.
        :param schedule: The DAG schedule. The PreviousDagRunSensor below assumes a 7-day interval.
        :param queue: The Airflow queue passed to the Workflow.
        """
        super().__init__(
            dag_id=dag_id,
            start_date=start_date,
            schedule=schedule,
            catchup=False,
            airflow_conns=[observatory_api_conn_id, aws_orcid_conn_id],
            tags=[Tag.academic_observatory],
            queue=queue,
        )
        self.cloud_workspace = cloud_workspace
        self.orcid_bucket = orcid_bucket
        self.orcid_summaries_prefix = orcid_summaries_prefix
        self.bq_dataset_id = bq_dataset_id
        self.bq_main_table_name = bq_main_table_name
        self.bq_upsert_table_name = bq_upsert_table_name
        self.bq_delete_table_name = bq_delete_table_name
        self.api_dataset_id = api_dataset_id
        self.schema_file_path = schema_file_path
        self.delete_schema_file_path = delete_schema_file_path
        self.transfer_attempts = transfer_attempts
        self.dataset_description = dataset_description
        self.table_description = table_description
        self.snapshot_expiry_days = snapshot_expiry_days
        self.max_workers = max_workers
        self.observatory_api_conn_id = observatory_api_conn_id
        self.aws_orcid_conn_id = aws_orcid_conn_id

        # Wait for the previous DAG run to finish before starting this one
        external_task_id = "dag_run_complete"
        self.add_operator(
            PreviousDagRunSensor(
                dag_id=self.dag_id,
                external_task_id=external_task_id,
                execution_delta=timedelta(days=7),  # To match the @weekly schedule
            )
        )
        self.add_setup_task(self.check_dependencies)
        self.add_task(self.create_datasets)

        # Download the data
        self.add_task(self.transfer_orcid)

        # Create snapshots of main table in case we mess up
        self.add_task(self.bq_create_main_table_snapshot)

        # Scour the data for updates
        self.add_task(self.create_manifests)

        # Download and transform updated files
        self.add_task(self.download)
        self.add_task(self.transform)

        # Load the data to table
        self.add_task(self.upload_transformed)
        self.add_task(self.bq_load_main_table)
        self.add_task(self.bq_load_upsert_table)
        self.add_task(self.bq_load_delete_table)
        self.add_task(self.bq_upsert_records)
        self.add_task(self.bq_delete_records)

        # Finish
        self.add_task(self.add_new_dataset_release)
        self.add_task(self.cleanup)

        # The last task that the next DAG run's ExternalTaskSensor waits for.
        self.add_operator(DummyOperator(task_id=external_task_id))

    @property
    def aws_orcid_key(self) -> Tuple[str, str]:
        """Return the (login, password) of the AWS ORCID Airflow connection, used as the AWS key for the transfer."""
        connection = BaseHook.get_connection(self.aws_orcid_conn_id)
        return connection.login, connection.password

    def make_release(self, **kwargs) -> OrcidRelease:
        """Generate the OrcidRelease object for this DAG run.

        :param kwargs: The Airflow context. Reads "dag_run", "task", "run_id", "data_interval_start" and
            "data_interval_end".
        :raises ValueError: If the DatasetReleases in the Observatory API are inconsistent with whether this is the
            first DAG run.
        :return: The OrcidRelease.
        """
        dag_run = kwargs["dag_run"]
        is_first_run = is_first_dag_run(dag_run)
        releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=self.api_dataset_id)

        # Determine the modification cutoff for the new release
        if is_first_run:
            # cleanup runs after add_new_dataset_release, so on the first run a release may already exist by then
            if releases and kwargs["task"].task_id != self.cleanup.__name__:
                raise ValueError(
                    "fetch_releases: there should be no DatasetReleases stored in the Observatory API on the first DAG run."
                )
            # Earliest possible dates: every record counts as modified, and no snapshot is taken on the first run
            prev_latest_modified_record = pendulum.instance(datetime.datetime.min)
            prev_release_end = pendulum.instance(datetime.datetime.min)
        else:
            if not releases:
                raise ValueError(
                    "fetch_releases: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run"
                )
            prev_release = get_latest_dataset_release(releases, "changefile_end_date")
            prev_release_end = prev_release.changefile_end_date
            prev_latest_modified_record = pendulum.parse(prev_release.extra["latest_modified_record_date"])

        return OrcidRelease(
            dag_id=self.dag_id,
            run_id=kwargs["run_id"],
            cloud_workspace=self.cloud_workspace,
            bq_dataset_id=self.bq_dataset_id,
            bq_main_table_name=self.bq_main_table_name,
            bq_upsert_table_name=self.bq_upsert_table_name,
            bq_delete_table_name=self.bq_delete_table_name,
            prev_release_end=prev_release_end,
            start_date=kwargs["data_interval_start"],
            end_date=kwargs["data_interval_end"],
            prev_latest_modified_record=prev_latest_modified_record,
            is_first_run=is_first_run,
        )

    def create_datasets(self, release: OrcidRelease, **kwargs) -> None:
        """Create the BigQuery dataset."""
        bq_create_dataset(
            project_id=self.cloud_workspace.project_id,
            dataset_id=self.bq_dataset_id,
            location=self.cloud_workspace.data_location,
            description=self.dataset_description,
        )

    def transfer_orcid(self, release: OrcidRelease, **kwargs):
        """Sync files from the AWS bucket to the Google Cloud bucket.

        The transfer is retried up to self.transfer_attempts times. The task state is set from the final result.
        """
        success = False
        for i in range(self.transfer_attempts):
            logging.info(f"Beginning AWS to GCP transfer attempt no. {i+1}")
            success, objects_count = gcs_create_aws_transfer(
                aws_key=self.aws_orcid_key,
                aws_bucket=ORCID_AWS_SUMMARIES_BUCKET,
                include_prefixes=[],
                gc_project_id=self.cloud_workspace.project_id,
                gc_bucket_dst_uri=gcs_blob_uri(self.orcid_bucket, f"{self.orcid_summaries_prefix}/"),
                description="Transfer ORCID data from AWS to GCP",
            )
            logging.info(f"Attempt {i+1}: Total number of objects transferred: {objects_count}")
            logging.info(f"Attempt {i+1}: Success? {success}")
            if success:
                break
        set_task_state(success, self.transfer_orcid.__name__, release)

    def bq_create_main_table_snapshot(self, release: OrcidRelease, **kwargs):
        """Create a snapshot of the main table.

        The purpose of this table is to be able to roll back the main table if something goes wrong. The snapshot
        expires after self.snapshot_expiry_days. It is skipped on the first run.
        """
        if release.is_first_run:
            logging.info(f"bq_create_main_table_snapshots: skipping as snapshots are not created on the first run")
            return

        expiry_date = pendulum.now().add(days=self.snapshot_expiry_days)
        success = bq_snapshot(
            src_table_id=release.bq_main_table_id, dst_table_id=release.bq_snapshot_table_id, expiry_date=expiry_date
        )
        set_task_state(success, self.bq_create_main_table_snapshot.__name__, release)

    def create_manifests(self, release: OrcidRelease, **kwargs):
        """Create a manifest of all the modified files in the ORCID bucket.

        One manifest is built per batch in parallel. They are then concatenated into
        release.master_manifest_file.
        """
        logging.info("Creating manifest")
        orcid_batches = release.orcid_batches()
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
            futures = []
            for orcid_batch in orcid_batches:
                futures.append(
                    executor.submit(
                        create_orcid_batch_manifest,
                        orcid_batch=orcid_batch,
                        reference_date=release.prev_latest_modified_record,
                        bucket=self.orcid_bucket,
                        bucket_prefix=self.orcid_summaries_prefix,
                    )
                )
            # Re-raise any exception from the worker threads
            for future in as_completed(futures):
                future.result()

        # Open and write each batch manifest to the master manifest file
        logging.info("Joining manifest files")
        # newline="" as required by the csv module for both reading and writing
        with open(release.master_manifest_file, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=MANIFEST_HEADER)
            writer.writeheader()
            for orcid_batch in orcid_batches:
                with open(orcid_batch.manifest_file, "r", newline="") as df:
                    reader = csv.DictReader(df)
                    writer.writerows(reader)

    def download(self, release: OrcidRelease, **kwargs):
        """Read each batch's manifest and download the listed files from the GCS bucket with s5cmd.

        Batches whose records are all already on disk (e.g. from an earlier attempt of this task) are skipped.

        :raises RuntimeError: If s5cmd returns a non-zero exit code for a batch.
        :raises FileNotFoundError: If records are still missing after a batch download, or if the number of records
            on disk does not match the number expected across all batches.
        """
        gcs_creds, project_id = default()
        # HMAC key for the default credentials' service account, used as s5cmd's access credentials within this block
        with gcs_hmac_key(project_id, gcs_creds.service_account_email) as (key, secret):
            total_files = 0
            start_time = time.time()
            for orcid_batch in release.orcid_batches():
                if not orcid_batch.missing_records:
                    logging.info(f"All files present for {orcid_batch.batch_str}. Skipping download.")
                    # These records are on disk and are counted by the final check below, so they must be counted
                    # here too. Otherwise a retried task fails that check even though nothing is missing.
                    total_files += len(orcid_batch.expected_records)
                    continue

                logging.info(f"Downloading files for ORCID directory: {orcid_batch.batch_str}")
                with open(orcid_batch.download_log_file, "w") as f:
                    s5cmd = S5Cmd(access_credentials=(key.access_id, secret), out_stream=f)
                    returncode = s5cmd.download_from_bucket(
                        uris=orcid_batch.blob_uris, local_path=orcid_batch.download_batch_dir
                    )[-1]

                # Check for errors
                if returncode != 0:
                    raise RuntimeError(
                        f"Download attempt '{orcid_batch.batch_str}': returned non-zero exit code: {returncode}. See log file: {orcid_batch.download_log_file}"
                    )
                if orcid_batch.missing_records:
                    raise FileNotFoundError(f"All files were not downloaded for {orcid_batch.batch_str}. Aborting.")

                # Keep track of files downloaded
                total_files += len(orcid_batch.expected_records)
                logging.info(
                    f"Downloaded {len(orcid_batch.expected_records)} records for batch '{orcid_batch.batch_str}'"
                )
            total_time = time.time() - start_time

        # Check all files exist
        if total_files != len(release.downloaded_records):
            raise FileNotFoundError(
                f"Downloaded {total_files} files but found {len(release.downloaded_records)} records on disk."
            )
        logging.info(f"Completed download for {total_files} files in {str(datetime.timedelta(seconds=total_time))}")

    def bq_load_main_table(self, release: OrcidRelease, **kwargs):
        """Load the main table from the uploaded upsert files. Only runs on the first run.

        :raises ValueError: If the number of upsert blobs in GCS differs from the number of local upsert files.
        """
        if not release.is_first_run:
            logging.info(f"bq_load_main_table: skipping as the main table is only created on the first run")
            return

        # Check that the number of files matches the number of blobs
        blobs = gcs_list_blobs(self.cloud_workspace.transform_bucket, match_glob=release.upsert_blob_glob)
        if not len(blobs) == len(release.upsert_files):
            raise ValueError(
                f"Number of blobs ({len(blobs)}) does not match number of files ({len(release.upsert_files)})"
            )
        success = bq_load_table(
            uri=[gcs_blob_uri(blob.bucket.name, blob.name) for blob in blobs],
            table_id=release.bq_main_table_id,
            schema_file_path=self.schema_file_path,
            source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
            ignore_unknown_values=False,
        )
        set_task_state(success, self.bq_load_main_table.__name__, release)

    def bq_load_upsert_table(self, release: OrcidRelease, **kwargs):
        """Load the upsert table into BigQuery, replacing its contents. Skipped on the first run.

        :raises ValueError: If the number of upsert blobs in GCS differs from the number of local upsert files.
        """
        if release.is_first_run:
            logging.info(f"bq_load_upsert_table: skipping as no records are upserted on the first run")
            return

        # Check that the number of files matches the number of blobs
        blobs = gcs_list_blobs(self.cloud_workspace.transform_bucket, match_glob=release.upsert_blob_glob)
        if not len(blobs) == len(release.upsert_files):
            raise ValueError(
                f"Number of blobs ({len(blobs)}) does not match number of files ({len(release.upsert_files)})"
            )
        success = bq_load_table(
            uri=[gcs_blob_uri(blob.bucket.name, blob.name) for blob in blobs],
            table_id=release.bq_upsert_table_id,
            schema_file_path=self.schema_file_path,
            source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            ignore_unknown_values=True,
        )
        set_task_state(success, self.bq_load_upsert_table.__name__, release)

    def bq_load_delete_table(self, release: OrcidRelease, **kwargs):
        """Load the delete table into BigQuery, replacing its contents.

        Skipped on the first run and when there are no delete files.

        :raises ValueError: If the number of delete blobs in GCS differs from the number of local delete files.
        """
        if release.is_first_run:
            logging.info(f"bq_load_delete_table: skipping as no records are deleted on the first run")
            return
        if not len(release.delete_files):
            logging.info(f"bq_load_delete_table: skipping as no records require deleting")
            return

        # Check that the number of files matches the number of blobs
        blobs = gcs_list_blobs(self.cloud_workspace.transform_bucket, match_glob=release.delete_blob_glob)
        if not len(blobs) == len(release.delete_files):
            raise ValueError(
                f"Number of blobs ({len(blobs)}) does not match number of files ({len(release.delete_files)})"
            )
        success = bq_load_table(
            uri=[gcs_blob_uri(blob.bucket.name, blob.name) for blob in blobs],
            table_id=release.bq_delete_table_id,
            schema_file_path=self.delete_schema_file_path,
            source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
            ignore_unknown_values=True,
        )
        set_task_state(success, self.bq_load_delete_table.__name__, release)

    def bq_upsert_records(self, release: OrcidRelease, **kwargs):
        """Upsert the records from the upsert table into the main table, keyed on orcid_identifier.

        Skipped on the first run.
        """
        if release.is_first_run:
            logging.info("bq_upsert_records: skipping as no records are upserted on the first run")
            return

        bq_upsert_records(
            main_table_id=release.bq_main_table_id,
            upsert_table_id=release.bq_upsert_table_id,
            primary_key="orcid_identifier",
        )

    def bq_delete_records(self, release: OrcidRelease, **kwargs):
        """Delete the records in the delete table from the main table.

        Main table rows are matched on orcid_identifier.path against the delete table's id column. Skipped on the
        first run and when there are no delete files.
        """
        if release.is_first_run:
            logging.info("bq_delete_records: skipping as no records are deleted on the first run")
            return
        if not len(release.delete_files):
            logging.info(f"bq_delete_records: skipping as no records require deleting")
            return

        bq_delete_records(
            main_table_id=release.bq_main_table_id,
            delete_table_id=release.bq_delete_table_id,
            main_table_primary_key="orcid_identifier.path",
            delete_table_primary_key="id",
        )

    def add_new_dataset_release(self, release: OrcidRelease, **kwargs) -> None:
        """Add the release information to the Observatory API.

        The latest modification date found in the master manifest is stored in the release's extra field. The next
        run reads it back in make_release as its modification cutoff.
        """
        dataset_release = DatasetRelease(
            dag_id=self.dag_id,
            dataset_id=self.api_dataset_id,
            dag_run_id=release.run_id,
            changefile_start_date=release.start_date,
            changefile_end_date=release.end_date,
            extra={"latest_modified_record_date": latest_modified_record_date(release.master_manifest_file)},
        )
        api = make_observatory_api(observatory_api_conn_id=self.observatory_api_conn_id)
        api.post_dataset_release(dataset_release)

    def cleanup(self, release: OrcidRelease, **kwargs) -> None:
        """Delete all files, folders and XComs associated with this release."""
        # Calls the module-level cleanup function imported from the workflow module, not this method
        cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder)
def create_orcid_batch_manifest(
    orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None
) -> None:
    """Create a manifest (.csv) for one ORCID batch.

    The manifest lists the blobs in the bucket's batch directory that were updated on or after the reference date.
    It is written to orcid_batch.manifest_file. Its columns (MANIFEST_HEADER) hold the blob's bucket name, blob
    name and last-updated timestamp.

    :param orcid_batch: The OrcidBatch instance for this ORCID directory.
    :param reference_date: Only blobs updated on or after this date are included.
    :param bucket: The name of the bucket.
    :param bucket_prefix: The prefix to use when listing blobs in the bucket, i.e. where the ORCID directories are
        located. When empty or None, the batch directories are assumed to be at the bucket root.
    :return: None. The manifest is saved to orcid_batch.manifest_file.
    """
    prefix = f"{bucket_prefix}/{orcid_batch.batch_str}/" if bucket_prefix else f"{orcid_batch.batch_str}/"

    logging.info(f"Creating manifests for {orcid_batch.batch_str}")
    blobs = gcs_list_blobs(bucket, prefix=prefix)
    # Inclusive comparison: a blob updated exactly at the reference date is included (again)
    manifest = [
        {
            MANIFEST_HEADER[0]: blob.bucket.name,
            MANIFEST_HEADER[1]: blob.name,
            MANIFEST_HEADER[2]: blob.updated,
        }
        for blob in blobs
        if pendulum.instance(blob.updated) >= reference_date
    ]

    with open(orcid_batch.manifest_file, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=MANIFEST_HEADER)
        writer.writeheader()
        writer.writerows(manifest)

    logging.info(f"Manifest saved to {orcid_batch.manifest_file}")
def latest_modified_record_date(manifest_file_path: Union[str, PathLike]) -> pendulum.DateTime:
    """Read a manifest file and find the most recent modification date of its records.

    :param manifest_file_path: The path to the manifest file. It must have an "updated" column.
    :raises ValueError: If the manifest contains no records, so there is no modification date to return.
    :return: The most recent date of modification for the records.
    """
    # newline="" as required by the csv module
    with open(manifest_file_path, "r", newline="") as f:
        modified_dates = [pendulum.parse(row["updated"]) for row in csv.DictReader(f)]

    if not modified_dates:
        raise ValueError(f"No records found in manifest file: {manifest_file_path}")

    # max is O(n); sorting the whole list just to take the last element is unnecessary
    return max(modified_dates)
def orcid_batch_names() -> List[str]:
    """Build every possible ORCID batch directory name.

    A name is two decimal digits followed by a decimal digit or "X". The names are ordered "000", "001", ...,
    "009", "00X", "010", ..., "99X" (1100 names in total).

    :return: A list of all the possible ORCID directories
    """
    digits = "0123456789"
    final_chars = digits + "X"
    return [first + second + last for first in digits for second in digits for last in final_chars]