academic_observatory_workflows.workflows.unpaywall_telescope

Module Contents

Classes

Changefile

UnpaywallRelease

UnpaywallTelescope

Functions

parse_release_msg(msg)

snapshot_url(→ str)

Snapshot URL

get_snapshot_file_name(→ str)

Get the Unpaywall snapshot filename.

changefiles_url(→ str)

Data Feed URL

changefile_download_url(filename, api_key)

get_unpaywall_changefiles(→ List[Changefile])

unpaywall_filename_to_datetime(→ pendulum.DateTime)

Parses a release date from a file name.

Attributes

SNAPSHOT_URL

CHANGEFILES_URL

CHANGEFILES_DOWNLOAD_URL

academic_observatory_workflows.workflows.unpaywall_telescope.SNAPSHOT_URL = 'https://api.unpaywall.org/feed/snapshot'[source]
academic_observatory_workflows.workflows.unpaywall_telescope.CHANGEFILES_URL = 'https://api.unpaywall.org/feed/changefiles'[source]
academic_observatory_workflows.workflows.unpaywall_telescope.CHANGEFILES_DOWNLOAD_URL = 'https://api.unpaywall.org/daily-feed/changefile'[source]
class academic_observatory_workflows.workflows.unpaywall_telescope.Changefile(filename: str, changefile_date: pendulum.DateTime, changefile_release: observatory.platform.workflows.workflow.ChangefileRelease = None)[source]
property download_file_path[source]
property extract_file_path[source]
property transform_file_path[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) Changefile[source]
to_dict() Dict[source]
class academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, is_first_run: bool, snapshot_date: pendulum.DateTime, start_date: pendulum.DateTime, end_date: pendulum.DateTime, changefiles: List[Changefile], prev_end_date: pendulum.DateTime)[source]

Bases: observatory.platform.workflows.workflow.Release

class academic_observatory_workflows.workflows.unpaywall_telescope.UnpaywallTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'unpaywall', bq_table_name: str = 'unpaywall', api_dataset_id: str = 'unpaywall', schema_folder: str = os.path.join(default_schema_folder(), 'unpaywall'), dataset_description: str = 'Unpaywall Data Feed: https://unpaywall.org/products/data-feed', table_description: str = 'Unpaywall Data Feed: https://unpaywall.org/products/data-feed', primary_key: str = 'doi', snapshot_expiry_days: int = 7, http_header: str = None, unpaywall_conn_id: str = 'unpaywall', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2021, 7, 2), schedule: str = '@daily')[source]

Bases: observatory.platform.workflows.workflow.Workflow

property api_key: str[source]

The API key for accessing Unpaywall.

fetch_releases(**kwargs) bool[source]

Fetches the release information. On the first DAG run gets the latest snapshot and the necessary changefiles required to get the dataset up to date. On subsequent runs it fetches unseen changefiles. It is possible for no changefiles to be found after the first run, in which case the rest of the tasks are skipped. Publish any available releases as an XCOM to avoid re-querying Unpaywall servers.

Parameters:

kwargs – the context passed from the Airflow Operator.

See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

True to continue, False to skip.

make_release(**kwargs) UnpaywallRelease[source]

Make a Release instance. Gets the list of releases available from the release check (setup task).

create_datasets(release: UnpaywallRelease, **kwargs) None[source]

Create datasets.

bq_create_main_table_snapshot(release: UnpaywallRelease, **kwargs) None[source]

Create a snapshot of the main table. The purpose of this table is to be able to roll back the table if something goes wrong. The snapshot expires after self.snapshot_expiry_days.

download_snapshot(release: UnpaywallRelease, **kwargs)[source]

Download the most recent Unpaywall snapshot.

upload_downloaded_snapshot(release: UnpaywallRelease, **kwargs)[source]

Upload the downloaded snapshot for the given release.

extract_snapshot(release: UnpaywallRelease, **kwargs)[source]

Gunzip the downloaded Unpaywall snapshot.

transform_snapshot(release: UnpaywallRelease, **kwargs)[source]

Transform the snapshot into the main table file. Find and replace the ‘authenticated-orcid’ string in the jsonl to ‘authenticated_orcid’.

upload_main_table_files(release: UnpaywallRelease, **kwargs) None[source]

Upload the main table files to Cloud Storage.

bq_load_main_table(release: UnpaywallRelease, **kwargs) None[source]

Load the main table.

download_change_files(release: UnpaywallRelease, **kwargs)[source]

Download the Unpaywall change files that are required for this release.

upload_downloaded_change_files(release: UnpaywallRelease, **kwargs)[source]

Upload the downloaded changefiles for the given release.

extract_change_files(release: UnpaywallRelease, **kwargs)[source]

Task to gunzip the downloaded Unpaywall changefiles.

transform_change_files(release: UnpaywallRelease, **kwargs)[source]

Task to transform the Unpaywall changefiles, merging them into the upsert file. Find and replace the ‘authenticated-orcid’ string in the jsonl to ‘authenticated_orcid’.

upload_upsert_files(release: UnpaywallRelease, **kwargs) None[source]

Upload the transformed data to Cloud Storage.

Raises:

AirflowException – Raised if the files to be uploaded are not found.

bq_load_upsert_table(release: UnpaywallRelease, **kwargs) None[source]

Load the upsert table.

bq_upsert_records(release: UnpaywallRelease, **kwargs) None[source]

Upsert the records from the upserts table into the main table.

add_new_dataset_releases(release: UnpaywallRelease, **kwargs) None[source]

Adds release information to API.

cleanup(release: UnpaywallRelease, **kwargs) None[source]

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.workflows.unpaywall_telescope.parse_release_msg(msg: Dict)[source]
academic_observatory_workflows.workflows.unpaywall_telescope.snapshot_url(api_key: str) str[source]

Snapshot URL

academic_observatory_workflows.workflows.unpaywall_telescope.get_snapshot_file_name(api_key: str) str[source]

Get the Unpaywall snapshot filename.

Returns:

Snapshot file name.

academic_observatory_workflows.workflows.unpaywall_telescope.changefiles_url(api_key: str) str[source]

Data Feed URL

academic_observatory_workflows.workflows.unpaywall_telescope.changefile_download_url(filename: str, api_key: str)[source]
academic_observatory_workflows.workflows.unpaywall_telescope.get_unpaywall_changefiles(api_key: str) List[Changefile][source]
academic_observatory_workflows.workflows.unpaywall_telescope.unpaywall_filename_to_datetime(file_name: str) pendulum.DateTime[source]

Parses a release date from a file name.

Parameters:

file_name – Unpaywall release file name (contains date string).

Returns:

The release date parsed from the file name.