Source code for academic_observatory_workflows.crossref_metadata_telescope.release

import os
import pendulum
from observatory_platform.airflow.release import SnapshotRelease
from observatory_platform.airflow.workflow import CloudWorkspace


[docs]class CrossrefMetadataRelease(SnapshotRelease): def __init__( self, *, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime, cloud_workspace: CloudWorkspace, ): """Construct a Crossref Metadata Release Object. :param dag_id: the DAG id. :param run_id: the DAG run id. :param data_interval_start: The start of the data interval :param data_interval_end: The end of the data interval :param snapshot_date: the release date. :param cloud_workspace: the cloud workspace settings. """ super().__init__(dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date)
[docs] self.cloud_workspace = cloud_workspace
[docs] self.download_file_name = "crossref_metadata.json.tar.gz"
[docs] self.extract_files_regex = r".*\.json$"
[docs] self.transform_files_regex = r".*\.jsonl$"
[docs] self.data_interval_start = data_interval_start
[docs] self.data_interval_end = data_interval_end
@property
[docs] def download_file_path(self): return os.path.join(self.download_folder, self.download_file_name)
@staticmethod
[docs] def from_dict(dict_: dict): dag_id = dict_["dag_id"] run_id = dict_["run_id"] snapshot_date = pendulum.parse(dict_["snapshot_date"]) data_interval_start = pendulum.parse(dict_["data_interval_start"]) data_interval_end = pendulum.parse(dict_["data_interval_end"]) cloud_workspace = CloudWorkspace.from_dict(dict_["cloud_workspace"]) return CrossrefMetadataRelease( dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date, data_interval_start=data_interval_start, data_interval_end=data_interval_end, cloud_workspace=cloud_workspace, )
[docs] def to_dict(self) -> dict: return dict( dag_id=self.dag_id, run_id=self.run_id, snapshot_date=self.snapshot_date.to_datetime_string(), data_interval_start=self.data_interval_start.to_datetime_string(), data_interval_end=self.data_interval_end.to_datetime_string(), cloud_workspace=self.cloud_workspace.to_dict(), )