from __future__ import annotations
import os
from typing import Dict
import pendulum
from observatory_platform.google.gcs import gcs_blob_name_from_path, gcs_blob_uri
from observatory_platform.airflow.workflow import CloudWorkspace
from observatory_platform.airflow.release import SnapshotRelease
class RorRelease(SnapshotRelease):
    """A snapshot release describing one ROR snapshot archive and where it is staged."""

    def __init__(
        self,
        *,
        dag_id: str,
        run_id: str,
        snapshot_date: pendulum.DateTime,
        url: str,
        checksum: str,
        cloud_workspace: CloudWorkspace,
        data_interval_start: pendulum.DateTime,
        data_interval_end: pendulum.DateTime,
    ):
        """Construct a RorRelease.

        :param dag_id: the DAG id.
        :param run_id: the DAG run id.
        :param snapshot_date: the release date.
        :param url: The url to the ror snapshot.
        :param checksum: the file checksum.
        :param cloud_workspace: the cloud workspace settings.
        :param data_interval_start: The beginning of this release's data interval
        :param data_interval_end: The end of this release's data interval
        """
        super().__init__(dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date)

        # The base initialiser is not handed the url, so it has to be stored
        # here; to_dict() reads it back when serialising the release.
        self.url = url
        self.checksum = checksum
        self.download_file_name = "ror.zip"
        # download_folder is not assigned in this class; presumably it is
        # provided by SnapshotRelease -- confirm against the base class.
        self.download_file_path = os.path.join(self.download_folder, self.download_file_name)
        self.cloud_workspace = cloud_workspace
        self.data_interval_start = data_interval_start
        self.data_interval_end = data_interval_end

    @property
    def download_blob_name(self):
        """Blob name obtained by passing the local download path to gcs_blob_name_from_path."""
        return gcs_blob_name_from_path(self.download_file_path)

    @property
    def download_uri(self):
        """URI built by gcs_blob_uri from the workspace download bucket and the download blob name."""
        return gcs_blob_uri(self.cloud_workspace.download_bucket, self.download_blob_name)

    def to_dict(self) -> Dict:
        """Serialise the release into a dictionary that from_dict() can read back.

        This is a regular method, matching how CloudWorkspace.to_dict() is
        called below. The snapshot date is written as a date string and the
        data interval bounds as ISO-8601 strings.

        :return: a dictionary holding every argument needed to rebuild the release.
        """
        return dict(
            dag_id=self.dag_id,
            run_id=self.run_id,
            snapshot_date=self.snapshot_date.to_date_string(),
            url=self.url,
            checksum=self.checksum,
            cloud_workspace=self.cloud_workspace.to_dict(),
            # from_dict() requires both interval keys, so they must be emitted.
            data_interval_start=self.data_interval_start.isoformat(),
            data_interval_end=self.data_interval_end.isoformat(),
        )

    @staticmethod
    def from_dict(dict_: Dict) -> RorRelease:
        """Rebuild a RorRelease from a dictionary such as the one to_dict() produces.

        :param dict_: dictionary with the keys dag_id, run_id, snapshot_date, url,
            checksum, cloud_workspace, data_interval_start and data_interval_end.
        :return: the reconstructed release.
        :raises KeyError: if any of the required keys is missing.
        """

        def _as_datetime(value):
            # Strings (as written by to_dict) are parsed; any other value is
            # passed through untouched so callers that already supply
            # datetime objects keep working.
            return pendulum.parse(value) if isinstance(value, str) else value

        return RorRelease(
            dag_id=dict_["dag_id"],
            run_id=dict_["run_id"],
            snapshot_date=pendulum.parse(dict_["snapshot_date"]),
            url=dict_["url"],
            checksum=dict_["checksum"],
            cloud_workspace=CloudWorkspace.from_dict(dict_["cloud_workspace"]),
            data_interval_start=_as_datetime(dict_["data_interval_start"]),
            data_interval_end=_as_datetime(dict_["data_interval_end"]),
        )