import itertools
import os
from typing import Dict, List
import pendulum
from academic_observatory_workflows.orcid_telescope.batch import OrcidBatch, ORCID_RECORD_REGEX
from observatory_platform.google.gcs import (
gcs_blob_name_from_path,
gcs_blob_uri,
)
from observatory_platform.airflow.release import ChangefileRelease
from observatory_platform.airflow.workflow import CloudWorkspace
from observatory_platform.google import bigquery as bq
from observatory_platform.files import list_files
class OrcidRelease(ChangefileRelease):
    """Changefile release for the ORCID telescope.

    Bundles the BigQuery table IDs, bucket URIs and local file locations that are derived from the
    constructor arguments, and can be converted to and from a plain dictionary.
    """

    def __init__(
        self,
        *,
        dag_id: str,
        run_id: str,
        cloud_workspace: CloudWorkspace,
        bq_dataset_id: str,
        bq_main_table_name: str,
        bq_upsert_table_name: str,
        bq_delete_table_name: str,
        start_date: pendulum.DateTime,
        end_date: pendulum.DateTime,
        prev_release_end: pendulum.DateTime,
        prev_latest_modified_record: pendulum.DateTime,
        is_first_run: bool,
    ):
        """Construct an OrcidRelease instance.

        :param dag_id: DAG ID.
        :param run_id: DAG run ID.
        :param cloud_workspace: Cloud workspace object for this release.
        :param bq_dataset_id: BigQuery dataset ID.
        :param bq_main_table_name: BigQuery main table name for the ORCID table.
        :param bq_upsert_table_name: BigQuery table name for the ORCID upsert table.
        :param bq_delete_table_name: BigQuery table name for the ORCID delete table.
        :param start_date: Start date for the release.
        :param end_date: End date for the release.
        :param prev_release_end: End date for the previous release. Used as the date of the snapshot table.
        :param prev_latest_modified_record: Latest modified record for the previous release. Used to decide
            which records to update.
        :param is_first_run: Whether this is the first run of the DAG.
        """
        super().__init__(
            dag_id=dag_id,
            run_id=run_id,
            start_date=start_date,
            end_date=end_date,
        )

        # Constructor arguments that are kept as-is (to_dict serialises these again).
        self.cloud_workspace = cloud_workspace
        self.bq_dataset_id = bq_dataset_id
        self.bq_main_table_name = bq_main_table_name
        self.bq_upsert_table_name = bq_upsert_table_name
        self.bq_delete_table_name = bq_delete_table_name
        self.prev_release_end = prev_release_end
        self.prev_latest_modified_record = prev_latest_modified_record
        self.is_first_run = is_first_run

        # Local manifest file inside the release folder.
        self.master_manifest_file = os.path.join(self.release_folder, "manifest.csv")

        # Blob globs and URIs for the transformed upsert/delete files in the transform bucket.
        transform_bucket = self.cloud_workspace.transform_bucket
        transform_prefix = gcs_blob_name_from_path(self.transform_folder)
        self.upsert_blob_glob = f"{transform_prefix}/*_upsert.jsonl.gz"
        self.upsert_table_uri = gcs_blob_uri(transform_bucket, self.upsert_blob_glob)
        self.delete_blob_glob = f"{transform_prefix}/*_delete.jsonl.gz"
        self.delete_table_uri = gcs_blob_uri(transform_bucket, self.delete_blob_glob)

        # Fully qualified BigQuery table IDs.
        project_id = cloud_workspace.project_id
        self.bq_main_table_id = bq.bq_table_id(project_id, bq_dataset_id, bq_main_table_name)
        self.bq_upsert_table_id = bq.bq_table_id(project_id, bq_dataset_id, bq_upsert_table_name)
        self.bq_delete_table_id = bq.bq_table_id(project_id, bq_dataset_id, bq_delete_table_name)
        # The snapshot table ID is built from the main table name and the previous release's end date.
        snapshot_table_name = f"{bq_main_table_name}_snapshot"
        self.bq_snapshot_table_id = bq.bq_sharded_table_id(
            project_id, bq_dataset_id, snapshot_table_name, prev_release_end
        )

    @property
    def upsert_files(self):
        """The 'upsert' files in the transform folder, as found by list_files."""
        upsert_pattern = r"^.*\d{2}(\d|X)_upsert\.jsonl\.gz$"
        return list_files(self.transform_folder, upsert_pattern)

    @property
    def delete_files(self):
        """The 'delete' files in the transform folder, as found by list_files."""
        delete_pattern = r"^.*\d{2}(\d|X)_delete\.jsonl\.gz$"
        return list_files(self.transform_folder, delete_pattern)

    @property
    def downloaded_records(self):
        """Every downloaded record: files in the download folder matching ORCID_RECORD_REGEX."""
        return list_files(self.download_folder, ORCID_RECORD_REGEX)

    @property
    def orcid_directory_paths(self) -> List[str]:
        """Paths of the ORCID batch directories inside the download folder, one per batch name."""
        download_folder = self.download_folder
        return [os.path.join(download_folder, batch_name) for batch_name in orcid_batch_names()]

    def orcid_batches(self) -> List[OrcidBatch]:
        """Build one OrcidBatch per possible batch name, using this release's download and transform folders.

        NOTE(review): the previous docstring said the batch directories are created in the download
        folder if they don't exist; any such creation would happen inside OrcidBatch - confirm there.
        """
        batches = []
        for batch_name in orcid_batch_names():
            batches.append(OrcidBatch(self.download_folder, self.transform_folder, batch_name))
        return batches

    def to_dict(self) -> Dict:
        """Convert the constructor arguments of this release into a dictionary.

        Datetimes are stored as the value of their ``timestamp()`` method and the cloud workspace as
        the result of its own ``to_dict``.

        :return: A dictionary that from_dict can turn back into an OrcidRelease.
        """
        return {
            "dag_id": self.dag_id,
            "run_id": self.run_id,
            "cloud_workspace": self.cloud_workspace.to_dict(),
            "bq_dataset_id": self.bq_dataset_id,
            "bq_main_table_name": self.bq_main_table_name,
            "bq_upsert_table_name": self.bq_upsert_table_name,
            "bq_delete_table_name": self.bq_delete_table_name,
            "start_date": self.start_date.timestamp(),
            "end_date": self.end_date.timestamp(),
            "prev_release_end": self.prev_release_end.timestamp(),
            "prev_latest_modified_record": self.prev_latest_modified_record.timestamp(),
            "is_first_run": self.is_first_run,
        }

    @staticmethod
    def from_dict(dict_: Dict):
        """Rebuild an OrcidRelease from a dictionary with the keys produced by to_dict.

        :param dict_: Dictionary holding the constructor arguments, with dates stored as timestamps.
        :return: The reconstructed OrcidRelease.
        """
        to_datetime = pendulum.from_timestamp  # dates are stored as timestamps by to_dict
        init_kwargs = {
            "dag_id": dict_["dag_id"],
            "run_id": dict_["run_id"],
            "cloud_workspace": CloudWorkspace.from_dict(dict_["cloud_workspace"]),
            "bq_dataset_id": dict_["bq_dataset_id"],
            "bq_main_table_name": dict_["bq_main_table_name"],
            "bq_upsert_table_name": dict_["bq_upsert_table_name"],
            "bq_delete_table_name": dict_["bq_delete_table_name"],
            "start_date": to_datetime(dict_["start_date"]),
            "end_date": to_datetime(dict_["end_date"]),
            "prev_release_end": to_datetime(dict_["prev_release_end"]),
            "prev_latest_modified_record": to_datetime(dict_["prev_latest_modified_record"]),
            "is_first_run": dict_["is_first_run"],
        }
        return OrcidRelease(**init_kwargs)
def orcid_batch_names() -> List[str]:
    """Build the name of every possible ORCID directory.

    Each name has three characters: two decimal digits followed by a decimal digit or "X",
    giving the "000" to "99X" directory structure.

    :return: A list of all the possible ORCID directory names, with the last character varying fastest.
    """
    digits = "0123456789"
    final_chars = digits + "X"  # the third position may also be "X"
    return [
        first + second + third
        for first, second, third in itertools.product(digits, digits, final_chars)
    ]