academic_observatory_workflows.workflows.unpaywall_telescope
Module Contents
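Classes
- Changefile
- UnpaywallRelease
- UnpaywallTelescope
Functions
- snapshot_url: Snapshot URL
- get_snapshot_file_name: Get the Unpaywall snapshot filename.
- changefiles_url: Data Feed URL
- changefile_download_url
- get_unpaywall_changefiles
- Parses a release date from a file name.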
Attributes
- academic_observatory_workflows.workflows.unpaywall_telescope.SNAPSHOT_URL = 'https://api.unpaywall.org/feed/snapshot'[source]
- academic_observatory_workflows.workflows.unpaywall_telescope.CHANGEFILES_URL = 'https://api.unpaywall.org/feed/changefiles'[source]
- academic_observatory_workflows.workflows.unpaywall_telescope.CHANGEFILES_DOWNLOAD_URL = 'https://api.unpaywall.org/daily-feed/changefile'[source]
- class academic_observatory_workflows.workflows.unpaywall_telescope.Changefile(filename: str, changefile_date: pendulum.DateTime, changefile_release: observatory.platform.workflows.workflow.ChangefileRelease = None)[source]
- static from_dict(dict_: Dict) Changefile[source]
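A minimal sketch of how a class like Changefile might round-trip through a dictionary, for example when it is published as an XCom. The class name `ChangefileSketch`, the dictionary keys, and the `to_dict` method are assumptions made for illustration, and the standard-library `datetime` stands in for `pendulum.DateTime`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict


@dataclass
class ChangefileSketch:
    """Hypothetical stand-in for Changefile; not the library's class."""

    filename: str
    changefile_date: datetime

    def to_dict(self) -> Dict:
        # Store the date as an ISO 8601 string so the dict is JSON-serialisable.
        return {
            "filename": self.filename,
            "changefile_date": self.changefile_date.isoformat(),
        }

    @staticmethod
    def from_dict(dict_: Dict) -> "ChangefileSketch":
        # Rebuild the instance from the (assumed) dictionary layout above.
        return ChangefileSketch(
            filename=dict_["filename"],
            changefile_date=datetime.fromisoformat(dict_["changefile_date"]),
        )
```

Keeping `from_dict` static mirrors the signature shown above: it needs no existing instance, only the dictionary.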
- class academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, is_first_run: bool, snapshot_date: pendulum.DateTime, start_date: pendulum.DateTime, end_date: pendulum.DateTime, changefiles: List[Changefile], prev_end_date: pendulum.DateTime)[source]
Bases:
observatory.platform.workflows.workflow.Release
- class academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'unpaywall', bq_table_name: str = 'unpaywall', api_dataset_id: str = 'unpaywall', schema_folder: str = os.path.join(default_schema_folder(), 'unpaywall'), dataset_description: str = 'Unpaywall Data Feed: https://unpaywall.org/products/data-feed', table_description: str = 'Unpaywall Data Feed: https://unpaywall.org/products/data-feed', primary_key: str = 'doi', snapshot_expiry_days: int = 7, http_header: str = None, unpaywall_conn_id: str = 'unpaywall', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2021, 7, 2), schedule: str = '@daily')[source]
Bases:
observatory.platform.workflows.workflow.Workflow
- fetch_releases(**kwargs) bool[source]
Fetches the release information. On the first DAG run, it gets the latest snapshot and the changefiles required to bring the dataset up to date. On subsequent runs, it fetches unseen changefiles. It is possible for no changefiles to be found after the first run, in which case the rest of the tasks are skipped. Any available releases are published as an XCom to avoid re-querying the Unpaywall servers.
- Parameters:
kwargs – the context passed from the Airflow Operator.
See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.
- Returns:
True to continue, False to skip.
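The selection logic described for fetch_releases can be sketched roughly as follows. This is an illustration under stated assumptions, not the workflow's actual code: `ChangefileInfo`, `select_changefiles`, and `should_continue` are hypothetical names, the strict "later than" comparison is a guess, and `datetime` stands in for `pendulum.DateTime`.

```python
from datetime import datetime
from typing import List, NamedTuple, Optional


class ChangefileInfo(NamedTuple):
    """Hypothetical minimal record describing one available changefile."""

    filename: str
    changefile_date: datetime


def select_changefiles(
    available: List[ChangefileInfo],
    is_first_run: bool,
    snapshot_date: datetime,
    prev_end_date: Optional[datetime],
) -> List[ChangefileInfo]:
    # Assumption: on the first run the cutoff is the snapshot date;
    # on later runs it is the end date of the previous release.
    cutoff = snapshot_date if is_first_run else prev_end_date
    if cutoff is None:
        raise ValueError("prev_end_date is required after the first run")
    # Keep only changefiles newer than the cutoff, oldest first.
    selected = [cf for cf in available if cf.changefile_date > cutoff]
    return sorted(selected, key=lambda cf: cf.changefile_date)


def should_continue(is_first_run: bool, changefiles: List[ChangefileInfo]) -> bool:
    # The first run always has a snapshot to process; later runs are
    # skipped when no unseen changefiles were found.
    return is_first_run or len(changefiles) > 0
```

The boolean from `should_continue` corresponds to the documented return value: True to continue, False to skip the remaining tasks.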
- make_release(**kwargs) UnpaywallRelease[source]
Make a Release instance. Gets the list of releases available from the release check (setup task).
- create_datasets(release: UnpaywallRelease, **kwargs) None[source]
Create datasets.
- bq_create_main_table_snapshot(release: UnpaywallRelease, **kwargs) None[source]
Create a snapshot of the main table. The purpose of this snapshot is to make it possible to roll back the table if something goes wrong. The snapshot expires after self.snapshot_expiry_days days.
- download_snapshot(release: UnpaywallRelease, **kwargs)[source]
Download the most recent Unpaywall snapshot.
- upload_downloaded_snapshot(release: UnpaywallRelease, **kwargs)[source]
Upload the downloaded snapshot for the given release.
- extract_snapshot(release: UnpaywallRelease, **kwargs)[source]
Gunzip the downloaded Unpaywall snapshot.
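As an illustration of the gunzip step, a minimal sketch using only the Python standard library. The helper name `gunzip_file` and the choice of `gzip` plus `shutil` are assumptions; the workflow may well use a different mechanism.

```python
import gzip
import shutil


def gunzip_file(src_path: str, dst_path: str) -> None:
    """Decompress a .gz file to dst_path (hypothetical helper)."""
    # Stream in chunks so a large snapshot is never held fully in memory.
    with gzip.open(src_path, "rb") as f_in, open(dst_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
```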
- transform_snapshot(release: UnpaywallRelease, **kwargs)[source]
Transform the snapshot into the main table file. Replaces the ‘authenticated-orcid’ string in the JSONL with ‘authenticated_orcid’.
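The find-and-replace described here can be sketched as a line-by-line pass over the JSONL file. The function names and the streaming approach are assumptions made for illustration.

```python
def transform_line(line: str) -> str:
    # Rename the hyphenated key so it uses an underscore instead.
    return line.replace("authenticated-orcid", "authenticated_orcid")


def transform_file(input_path: str, output_path: str) -> None:
    """Rewrite a JSONL file one line at a time (hypothetical helper)."""
    with open(input_path, "r", encoding="utf-8") as f_in, open(
        output_path, "w", encoding="utf-8"
    ) as f_out:
        for line in f_in:
            f_out.write(transform_line(line))
```

A plain string replacement avoids parsing each record as JSON, at the cost of also rewriting the string if it ever appeared inside a value rather than as a key.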
- upload_main_table_files(release: UnpaywallRelease, **kwargs) None[source]
Upload the main table files to Cloud Storage.
- bq_load_main_table(release: UnpaywallRelease, **kwargs) None[source]
Load the main table.
- download_change_files(release: UnpaywallRelease, **kwargs)[source]
Download the Unpaywall change files that are required for this release.
- upload_downloaded_change_files(release: UnpaywallRelease, **kwargs)[source]
Upload the downloaded changefiles for the given release.
- extract_change_files(release: UnpaywallRelease, **kwargs)[source]
Task to gunzip the downloaded Unpaywall changefiles.
- transform_change_files(release: UnpaywallRelease, **kwargs)[source]
Task to transform the Unpaywall changefiles, merging them into the upsert file. Replaces the ‘authenticated-orcid’ string in the JSONL with ‘authenticated_orcid’.
- upload_upsert_files(release: UnpaywallRelease, **kwargs) None[source]
Upload the transformed data to Cloud Storage.
- Raises:
AirflowException – Raised if the files to be uploaded are not found.
- bq_load_upsert_table(release: UnpaywallRelease, **kwargs) None[source]
Load the upsert table.
- bq_upsert_records(release: UnpaywallRelease, **kwargs) None[source]
Upsert the records from the upsert table into the main table.
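To illustrate what an upsert keyed on the primary key (`doi` by default) means, here is an in-memory sketch. It is not the BigQuery operation the workflow performs; `upsert_records` here is a hypothetical function working on plain lists of dictionaries.

```python
from typing import Dict, List


def upsert_records(
    main: List[Dict], upserts: List[Dict], primary_key: str = "doi"
) -> List[Dict]:
    """Return main with upserts applied: update on key match, else insert."""
    # Index the existing rows by primary key (insertion order is preserved).
    merged = {row[primary_key]: row for row in main}
    for row in upserts:
        # An existing key is overwritten in place; a new key is appended.
        merged[row[primary_key]] = row
    return list(merged.values())
```

For example, upserting a changed record for an existing DOI replaces that row, while a record with an unseen DOI is added to the end.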
- add_new_dataset_releases(release: UnpaywallRelease, **kwargs) None[source]
Add the release information to the API.
- cleanup(release: UnpaywallRelease, **kwargs) None[source]
Delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.workflows.unpaywall_telescope.snapshot_url(api_key: str) str[source]
Snapshot URL
- academic_observatory_workflows.workflows.unpaywall_telescope.get_snapshot_file_name(api_key: str) str[source]
Get the Unpaywall snapshot filename.
- Returns:
Snapshot file name.
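Unpaywall file names typically embed a timestamp, which is what makes it possible to derive a release date from a file name. A minimal sketch of such parsing follows; the function name `parse_release_date`, the example file name, and the assumed `YYYY-MM-DDTHHMMSS` timestamp pattern are illustrative assumptions rather than the module's confirmed behaviour.

```python
import re
from datetime import datetime

# Assumed timestamp pattern, e.g. 2023-04-25T083002
_TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}T\d{6}")


def parse_release_date(file_name: str) -> datetime:
    """Extract the first timestamp found in file_name (hypothetical helper)."""
    match = _TIMESTAMP.search(file_name)
    if match is None:
        raise ValueError(f"No timestamp found in file name: {file_name}")
    return datetime.strptime(match.group(0), "%Y-%m-%dT%H%M%S")
```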
- academic_observatory_workflows.workflows.unpaywall_telescope.changefiles_url(api_key: str) str[source]
Data Feed URL
- academic_observatory_workflows.workflows.unpaywall_telescope.changefile_download_url(filename: str, api_key: str)[source]
- academic_observatory_workflows.workflows.unpaywall_telescope.get_unpaywall_changefiles(api_key: str) List[Changefile][source]
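The URL helpers above all take an API key. A minimal sketch of how such URLs could be assembled from the module constants, assuming the key is passed as an `api_key` query parameter and the changefile name is appended as a path segment; the `build_*` function names are hypothetical and the real helpers may add other parameters.

```python
from urllib.parse import quote, urlencode

# Constants as listed under Attributes.
SNAPSHOT_URL = "https://api.unpaywall.org/feed/snapshot"
CHANGEFILES_URL = "https://api.unpaywall.org/feed/changefiles"
CHANGEFILES_DOWNLOAD_URL = "https://api.unpaywall.org/daily-feed/changefile"


def build_snapshot_url(api_key: str) -> str:
    # Assumption: the API key is sent as a query parameter.
    return f"{SNAPSHOT_URL}?{urlencode({'api_key': api_key})}"


def build_changefiles_url(api_key: str) -> str:
    return f"{CHANGEFILES_URL}?{urlencode({'api_key': api_key})}"


def build_changefile_download_url(filename: str, api_key: str) -> str:
    # Assumption: the file name is a path segment after the base URL.
    return f"{CHANGEFILES_DOWNLOAD_URL}/{quote(filename)}?{urlencode({'api_key': api_key})}"
```

Using `urlencode` and `quote` keeps the URLs valid even if the key or file name contains characters that need escaping.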