# Module code: academic_observatory_workflows.orcid_telescope.batch
#
# Source code for academic_observatory_workflows.orcid_telescope.batch
import csv
from typing import List
import os
import re
from functools import cached_property
from observatory_platform.files import list_files
from observatory_platform.google.gcs import gcs_blob_uri
# A batch identifier is exactly three characters: two digits followed by
# either a third digit or the letter "X".
BATCH_REGEX = r"^\d{2}(\d|X)$"

# Matches names that END with an ORCID record file name: four hyphen-separated
# groups of four characters, all digits except that the very last character may
# be "X", followed by ".xml". There is no start anchor, so a leading directory
# path in front of the file name is accepted.
ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$"
class OrcidBatch:
    """Describes a single ORCID batch and its related files/folders.

    All download artefacts of a batch are located relative to ``download_dir``
    and are named after the batch string: the batch folder itself
    (``<batch_str>/``), a log file (``<batch_str>_log.txt``), an error file
    (``<batch_str>_error.txt``) and a manifest (``<batch_str>_manifest.csv``).
    """

    def __init__(self, download_dir: str, transform_dir: str, batch_str: str):
        """Derive the batch paths, validate the inputs and create the batch folder.

        :param download_dir: Directory holding the batch folder, log, error and manifest files. Must already exist.
        :param transform_dir: Transform directory for the batch. Must already exist.
        :param batch_str: Batch identifier. Must match ``BATCH_REGEX``.
        :raises NotADirectoryError: If ``download_dir`` or ``transform_dir`` does not exist.
        :raises ValueError: If ``batch_str`` does not match ``BATCH_REGEX``.
        """
        self.download_dir = download_dir
        # Must be stored on the instance: the existence check below reads it.
        self.transform_dir = transform_dir
        self.batch_str = batch_str
        self.download_batch_dir = os.path.join(self.download_dir, batch_str)
        self.download_log_file = os.path.join(self.download_dir, f"{self.batch_str}_log.txt")
        self.download_error_file = os.path.join(self.download_dir, f"{self.batch_str}_error.txt")
        self.manifest_file = os.path.join(self.download_dir, f"{self.batch_str}_manifest.csv")

        if not os.path.exists(self.download_dir):
            raise NotADirectoryError(f"Directory {self.download_dir} does not exist.")
        if not os.path.exists(self.transform_dir):
            raise NotADirectoryError(f"Directory {self.transform_dir} does not exist.")
        if not re.match(BATCH_REGEX, self.batch_str):
            raise ValueError(f"Batch string {self.batch_str} is not valid.")

        # Only create the batch folder once every input has been validated.
        os.makedirs(self.download_batch_dir, exist_ok=True)

    @property
    def existing_records(self) -> List[str]:
        """List of existing ORCID records on disk for this ORCID directory.

        Not cached: the batch folder is re-listed on every access.

        :return: Base names of the files that ``list_files`` yields for the batch download folder
            and ``ORCID_RECORD_REGEX``.
        """
        return [os.path.basename(path) for path in list_files(self.download_batch_dir, ORCID_RECORD_REGEX)]

    @property
    def missing_records(self) -> List[str]:
        """List of missing ORCID records on disk for this ORCID directory.

        :return: Names that are in ``expected_records`` but not in ``existing_records``. The result is
            built from a set difference, so it holds no duplicates and its order is unspecified.
        """
        return list(set(self.expected_records) - set(self.existing_records))

    @cached_property
    def expected_records(self) -> List[str]:
        """List of expected ORCID records for this ORCID directory. Derived from the manifest file.

        The manifest is read once; the result is cached on the instance.

        :return: Base name of the ``blob_name`` column for every manifest row, in file order.
        :raises FileNotFoundError: If the manifest file does not exist.
        :raises KeyError: If the manifest has no ``blob_name`` column.
        """
        # newline="" lets the csv module do its own newline handling, as its documentation recommends.
        with open(self.manifest_file, "r", newline="") as f:
            reader = csv.DictReader(f)
            return [os.path.basename(row["blob_name"]) for row in reader]

    @cached_property
    def blob_uris(self) -> List[str]:
        """List of blob URIs from the manifest of this ORCID directory.

        The manifest is read once; the result is cached on the instance.

        :return: One ``gcs_blob_uri`` result per manifest row, built from its ``bucket_name`` and
            ``blob_name`` columns, in file order.
        :raises FileNotFoundError: If the manifest file does not exist.
        :raises KeyError: If the manifest lacks a ``bucket_name`` or ``blob_name`` column.
        """
        # newline="" lets the csv module do its own newline handling, as its documentation recommends.
        with open(self.manifest_file, "r", newline="") as f:
            reader = csv.DictReader(f)
            return [gcs_blob_uri(bucket_name=row["bucket_name"], blob_name=row["blob_name"]) for row in reader]