academic_observatory_workflows.unpaywall_telescope.tasks

Attributes

UNPAYWALL_BASE_URL

SNAPSHOT_URL

CHANGEFILES_URL

CHANGEFILES_DOWNLOAD_URL

Functions

snapshot_url(→ str)

Generate the snapshot URL.

changefile_download_url(base_url, changefile, api_key)

Generate the changefile download URL.

fetch_release(→ str | None)

Fetches the release information. On the first DAG run, gets the latest snapshot and the changefiles required to bring the dataset up to date.

bq_create_main_table_snapshot(→ None)

Create a snapshot of the main table so that the table can be rolled back if something goes wrong.

load_snapshot_download(release_id, cloud_workspace, ...)

Downloads the snapshot file. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

load_snapshot_upload_downloaded(release_id, ...)

Uploads the downloaded snapshot file to the download bucket

load_snapshot_extract(release_id, cloud_workspace)

Extracts the downloaded snapshot file

load_snapshot_transform(release_id, cloud_workspace)

Transforms all of the snapshot files

load_snapshot_split_main_table_file(release_id, ...)

Splits the main snapshot table file into many smaller files

load_snapshot_upload_main_table_files(release_id, ...)

Uploads the snapshot files to the transform bucket

load_snapshot_bq_load(release_id, cloud_workspace, ...)

Loads the snapshot files into the main BigQuery table

load_changefiles_download(release_id, cloud_workspace, ...)

Downloads the changefiles. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

load_changefiles_upload_downloaded(release_id, ...)

Uploads the downloaded changefiles to the download bucket

load_changefiles_extract(release_id, cloud_workspace)

Extracts the downloaded changefiles

load_changefiles_transform(release_id, ...)

Transforms all of the changefiles

load_changefiles_upload(release_id, cloud_workspace)

Uploads the downloaded changefiles to the download bucket

load_changefiles_bq_load(release_id, cloud_workspace, ...)

Loads the changefiles into the 'upsert' BigQuery table

load_changefiles_bq_upsert(release_id, ...)

Upserts the changefiles into the main table from the upsert table

add_dataset_release(release_id, cloud_workspace, ...)

Adds a release to the API dataset

cleanup_workflow(release_id, cloud_workspace)

Cleans up the workflow, deleting folders and XComs

unpaywall_transform_file(→ None)

Transforms the Unpaywall .jsonl file and writes the transformed data to a new file

unpaywall_transform_row(→ dict)

Transforms a single Unpaywall row

get_snapshot_file_name(→ str)

Get the Unpaywall snapshot filename. Raises an error if responses from Unpaywall are unexpected.

changefiles_url(base_url, api_key)

get_unpaywall_changefiles(...)

Get all changefiles from unpaywall

unpaywall_filename_to_datetime(→ pendulum.DateTime)

Parses a release date from a file name, e.g. 2023-04-25T080001

Module Contents

academic_observatory_workflows.unpaywall_telescope.tasks.UNPAYWALL_BASE_URL = 'https://api.unpaywall.org'
academic_observatory_workflows.unpaywall_telescope.tasks.SNAPSHOT_URL = 'https://api.unpaywall.org/feed/snapshot'
academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_URL = 'https://api.unpaywall.org/feed/changefiles'
academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_DOWNLOAD_URL = 'https://api.unpaywall.org/daily-feed/changefile'

academic_observatory_workflows.unpaywall_telescope.tasks.snapshot_url(base_url: str, api_key: str) → str

Generate the snapshot URL.

Parameters:
  • base_url – The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.

  • api_key – The API key for the Unpaywall feed API.
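A minimal sketch of how such a URL could be assembled. It assumes the key is sent as an `api_key` query parameter and that the path is `/feed/snapshot`, as in `SNAPSHOT_URL`; `build_snapshot_url` is a hypothetical name, not part of this module.

```python
from urllib.parse import urlencode


def build_snapshot_url(base_url: str, api_key: str) -> str:
    """Hypothetical helper: snapshot feed URL with the key sent as an `api_key` query parameter."""
    # Drop a trailing slash so "https://host/" and "https://host" give the same result
    root = base_url.rstrip("/")
    # urlencode percent-escapes the key (spaces become "+", "&" becomes "%26")
    return f"{root}/feed/snapshot?{urlencode({'api_key': api_key})}"


if __name__ == "__main__":
    print(build_snapshot_url("https://api.unpaywall.org", "my-key"))
    # → https://api.unpaywall.org/feed/snapshot?api_key=my-key
```

Taking `base_url` as a parameter rather than always using `UNPAYWALL_BASE_URL` is what allows tests to point the code at a local server, as the parameter description notes.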

academic_observatory_workflows.unpaywall_telescope.tasks.changefile_download_url(base_url: str, changefile: str, api_key: str)

Generate the changefile download URL.

Parameters:
  • base_url – The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.

  • changefile – The name of the changefile to download.

  • api_key – The API key for the Unpaywall feed API.
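Along the same lines, a hedged sketch of a changefile download URL. It assumes the changefile name is appended to the `CHANGEFILES_DOWNLOAD_URL` path and that the key is again an `api_key` query parameter; the helper name and the example file name are hypothetical.

```python
from urllib.parse import quote, urlencode


def build_changefile_download_url(base_url: str, changefile: str, api_key: str) -> str:
    """Hypothetical helper: <base>/daily-feed/changefile/<name>?api_key=<key> (layout assumed)."""
    root = base_url.rstrip("/")
    # quote() percent-escapes characters that are unsafe in a URL path segment
    name = quote(changefile)
    return f"{root}/daily-feed/changefile/{name}?{urlencode({'api_key': api_key})}"


if __name__ == "__main__":
    print(
        build_changefile_download_url(
            "https://api.unpaywall.org",
            "changed_dois_with_versions_2023-04-25T080001.jsonl.gz",  # hypothetical name
            "my-key",
        )
    )
```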

academic_observatory_workflows.unpaywall_telescope.tasks.fetch_release(dag_id: str, run_id: str, dag_run: airflow.models.DagRun, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, api_bq_dataset_id: str, unpaywall_conn_id: str, base_url: str) → str | None

Fetches the release information. On the first DAG run, it gets the latest snapshot and the changefiles required to bring the dataset up to date. On subsequent runs, it fetches changefiles that have not been seen before. It is possible for no new changefiles to be found after the first run, in which case the remaining tasks are skipped.

The release is published to BigQuery with a unique identifier.

Parameters:
  • dag_id – The ID of the running DAG

  • run_id – The ID of the DAG run

  • dag_run – The DagRun object

  • cloud_workspace – The CloudWorkspace object

  • bq_dataset_id – The BigQuery dataset ID

  • bq_table_name – The BigQuery table name

  • api_bq_dataset_id – The ID of the API dataset

  • unpaywall_conn_id – The Airflow connection ID for Unpaywall

  • base_url – The Unpaywall base URL

Returns:

None if there are no release files to be processed, otherwise the release ID
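The first-run versus subsequent-run behaviour described above can be sketched as a pure function over a list of changefiles. This is an illustration under assumptions: `ChangefileStub` is a hypothetical stand-in for `release.Changefile`, and the cutoff rule (keep changefiles strictly newer than the snapshot date on the first run, or than the last processed date afterwards) is assumed rather than taken from the source.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class ChangefileStub:
    """Hypothetical stand-in for release.Changefile: a file name and its release date."""
    filename: str
    changefile_date: datetime


def select_changefiles(
    changefiles: List[ChangefileStub],
    is_first_run: bool,
    snapshot_date: Optional[datetime] = None,
    prev_end_date: Optional[datetime] = None,
) -> Optional[List[ChangefileStub]]:
    """Pick the changefiles a run should process, oldest first (assumed cutoff rule)."""
    # First run: catch up from the snapshot; later runs: continue from the last processed date
    cutoff = snapshot_date if is_first_run else prev_end_date
    if cutoff is None:
        raise ValueError("a cutoff date is required")
    selected = sorted(
        (cf for cf in changefiles if cf.changefile_date > cutoff),
        key=lambda cf: cf.changefile_date,
    )
    if not is_first_run and not selected:
        # Nothing new: analogous to fetch_release returning None so later tasks are skipped
        return None
    # On the first run an empty list is still a release (the snapshot on its own)
    return selected


if __name__ == "__main__":
    def day(n: int) -> datetime:
        return datetime(2023, 4, n, tzinfo=timezone.utc)

    files = [ChangefileStub(f"cf_{n}", day(n)) for n in (24, 25, 26)]
    print([cf.filename for cf in select_changefiles(files, True, snapshot_date=day(25))])
    # → ['cf_26']
    print(select_changefiles(files, False, prev_end_date=day(26)))
    # → None
```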

academic_observatory_workflows.unpaywall_telescope.tasks.bq_create_main_table_snapshot(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, snapshot_expiry_days: int) → None

Create a snapshot of the main table so that the table can be rolled back if something goes wrong.

Parameters:
  • release_id – The unique ID assigned to the release object. Used for retrieval from the download bucket.

  • cloud_workspace – The CloudWorkspace object

  • snapshot_expiry_days – Number of days before the created snapshot is automatically deleted.
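One way to take such a snapshot is BigQuery's `CREATE SNAPSHOT TABLE ... CLONE` DDL with an `expiration_timestamp` option, which makes BigQuery delete the snapshot automatically. The sketch below only builds that statement; whether this module issues DDL or calls a client-library helper is not stated here, and `build_snapshot_ddl` and the table IDs are hypothetical.

```python
def build_snapshot_ddl(main_table_id: str, snapshot_table_id: str, snapshot_expiry_days: int) -> str:
    """Hypothetical helper: BigQuery DDL for a table snapshot that expires after N days."""
    if snapshot_expiry_days <= 0:
        raise ValueError("snapshot_expiry_days must be a positive number of days")
    return (
        f"CREATE SNAPSHOT TABLE `{snapshot_table_id}`\n"
        f"CLONE `{main_table_id}`\n"
        # The expiry is computed server-side relative to when the statement runs
        "OPTIONS (expiration_timestamp = "
        f"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {snapshot_expiry_days} DAY));"
    )


if __name__ == "__main__":
    print(
        build_snapshot_ddl(
            "my-project.unpaywall.unpaywall",  # hypothetical main table
            "my-project.unpaywall.unpaywall_snapshot_2023_04_25",  # hypothetical snapshot
            7,
        )
    )
```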

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: dict | None = None)

Downloads the snapshot file. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

Parameters:
  • base_url – The Unpaywall base URL, e.g. https://api.unpaywall.org

  • http_header – A dictionary of headers to send with the HTTP request

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Uploads the downloaded snapshot file to the download bucket

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Extracts the downloaded snapshot file
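Assuming the snapshot arrives as a gzip-compressed JSON Lines file (for example `*.jsonl.gz`), extraction can stream the data rather than load it into memory. `gunzip_file` is a hypothetical helper, not this module's function.

```python
import gzip
import shutil
import tempfile
from pathlib import Path


def gunzip_file(gz_path: str, out_dir: str) -> str:
    """Hypothetical helper: decompress gz_path into out_dir and return the output path."""
    src = Path(gz_path)
    # "snapshot.jsonl.gz" -> "snapshot.jsonl"; names without ".gz" get ".out" appended
    name = src.name[: -len(".gz")] if src.name.endswith(".gz") else src.name + ".out"
    dst = Path(out_dir) / name
    dst.parent.mkdir(parents=True, exist_ok=True)
    # copyfileobj copies in fixed-size chunks, so memory use stays flat for large files
    with gzip.open(src, "rb") as f_in, open(dst, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    return str(dst)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        gz = Path(tmp) / "snapshot.jsonl.gz"
        with gzip.open(gz, "wt", encoding="utf-8") as f:
            f.write('{"doi": "10.1234/example"}\n')
        out = gunzip_file(str(gz), str(Path(tmp) / "extract"))
        print(Path(out).name, Path(out).read_text(encoding="utf-8").strip())
```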

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Transforms all of the snapshot files

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_split_main_table_file(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, **context)

Splits the main snapshot table file into many smaller files
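Splitting a large JSON Lines file can be done in a single streaming pass by reading fixed-size chunks of lines. The sketch assumes one JSON record per line, so splitting on line boundaries never breaks a record; `split_jsonl`, the chunk size and the part-file naming are hypothetical.

```python
import tempfile
from itertools import islice
from pathlib import Path
from typing import List


def split_jsonl(input_file: str, out_dir: str, lines_per_file: int) -> List[str]:
    """Hypothetical helper: split a JSON Lines file into parts of at most lines_per_file lines."""
    if lines_per_file <= 0:
        raise ValueError("lines_per_file must be positive")
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    parts: List[str] = []
    with open(input_file, "r", encoding="utf-8") as f_in:
        while True:
            # islice takes the next chunk from the open file; only one chunk is in memory at a time
            chunk = list(islice(f_in, lines_per_file))
            if not chunk:
                break
            part = out / f"{Path(input_file).stem}_{len(parts):05d}.jsonl"
            part.write_text("".join(chunk), encoding="utf-8")
            parts.append(str(part))
    return parts


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "main.jsonl"
        src.write_text("".join(f'{{"i": {i}}}\n' for i in range(5)), encoding="utf-8")
        print([Path(p).name for p in split_jsonl(str(src), str(Path(tmp) / "parts"), 2)])
```

Zero-padded part numbers keep the files in order when they are listed alphabetically, for example by a bucket listing.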

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_main_table_files(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Uploads the snapshot files to the transform bucket

academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)

Loads the snapshot files into the main BigQuery table

Parameters:
  • schema_file_path – The file path of the schema file to use

  • table_description – The description to give the main table

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: str | None = None)

Downloads the changefiles. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

Parameters:
  • base_url – The Unpaywall base URL, e.g. https://api.unpaywall.org

  • http_header – A dictionary of headers to send with the HTTP request

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Uploads the downloaded changefiles to the download bucket

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Extracts the downloaded changefiles

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)

Transforms all of the changefiles

Parameters:
  • primary_key – The primary key to use for merging the changefiles
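The source does not spell out how the primary key is used when merging. One plausible reading, sketched below, is that changefiles are applied oldest first and the newest row for each key wins; `merge_changefiles` is hypothetical and `doi` is only an example key.

```python
import json
from typing import Dict, Iterable, List


def merge_changefiles(changefile_lines: Iterable[Iterable[str]], primary_key: str) -> List[dict]:
    """Hypothetical helper: combine changefiles (oldest first), keeping the newest row per key."""
    latest: Dict[str, dict] = {}
    # Changefiles must be supplied in chronological order so later rows overwrite earlier ones
    for lines in changefile_lines:
        for line in lines:
            if not line.strip():
                continue  # tolerate blank lines
            row = json.loads(line)
            latest[row[primary_key]] = row
    # Dicts keep first-insertion order, so output follows first appearance of each key
    return list(latest.values())


if __name__ == "__main__":
    day1 = ['{"doi": "10.1/a", "is_oa": false}', '{"doi": "10.1/b", "is_oa": false}']
    day2 = ['{"doi": "10.1/a", "is_oa": true}']
    print(merge_changefiles([day1, day2], "doi"))
```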

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Uploads the downloaded changefiles to the download bucket

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)

Loads the changefiles into the ‘upsert’ BigQuery table

Parameters:
  • schema_file_path – The file path of the schema file to use

  • table_description – The description to give the main table

academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_upsert(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)

Upserts the changefiles into the main table from the upsert table

Parameters:
  • primary_key – A single key or a list of keys to use to determine which records to upsert.
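An upsert from a staging table is commonly written as a BigQuery `MERGE` statement. The sketch builds one that accepts either a single key or a list of keys, matching the parameter description. It assumes the upsert and main tables share a schema (which `INSERT ROW` requires), and `build_merge_sql`, the table IDs and the column list are hypothetical; it is not necessarily how this module performs the upsert.

```python
from typing import List, Sequence, Union


def build_merge_sql(
    main_table_id: str,
    upsert_table_id: str,
    primary_key: Union[str, Sequence[str]],
    columns: Sequence[str],
) -> str:
    """Hypothetical helper: MERGE that updates matching rows and inserts new ones."""
    keys: List[str] = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    if not keys:
        raise ValueError("at least one primary key column is required")
    # Rows match when every key column is equal
    on_clause = " AND ".join(f"T.{k} = S.{k}" for k in keys)
    # Key columns are only used for matching, so only non-key columns are updated
    set_clause = ", ".join(f"{c} = S.{c}" for c in columns if c not in keys)
    if not set_clause:
        raise ValueError("columns must include at least one non-key column")
    return (
        f"MERGE `{main_table_id}` T\n"
        f"USING `{upsert_table_id}` S\n"
        f"ON {on_clause}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        "WHEN NOT MATCHED THEN INSERT ROW"
    )


if __name__ == "__main__":
    print(build_merge_sql("p.unpaywall.main", "p.unpaywall.upsert", "doi", ["doi", "is_oa", "year"]))
```

BigQuery rejects a `MERGE` in which one target row matches more than one source row, so the upsert table should already hold at most one row per key.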

academic_observatory_workflows.unpaywall_telescope.tasks.add_dataset_release(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, api_bq_dataset_id: str)

Adds a release to the API dataset

Parameters:
  • api_bq_dataset_id – The ID of the API dataset

academic_observatory_workflows.unpaywall_telescope.tasks.cleanup_workflow(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

Cleans up the workflow, deleting folders and XComs

academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_file(input_file: str, output_file: str) → None

Transforms the Unpaywall .jsonl file and writes the transformed data to a new file

Parameters:
  • input_file – The Unpaywall file to transform

  • output_file – The file to write to
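Transforming a `.jsonl` file is usually a streaming read, transform, write loop. Because the per-row rules of `unpaywall_transform_row` are not documented here, the sketch takes the row transform as a parameter; `transform_jsonl_file` and the example lambda are hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def transform_jsonl_file(
    input_file: str, output_file: str, transform_row: Callable[[dict], dict]
) -> None:
    """Hypothetical helper: apply transform_row to every record of a JSON Lines file."""
    with open(input_file, "r", encoding="utf-8") as f_in, open(
        output_file, "w", encoding="utf-8"
    ) as f_out:
        for line in f_in:
            if not line.strip():
                continue  # skip blank lines rather than failing on them
            row = transform_row(json.loads(line))
            # One JSON object per output line keeps the result valid JSON Lines
            f_out.write(json.dumps(row) + "\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        src, dst = Path(tmp) / "in.jsonl", Path(tmp) / "out.jsonl"
        src.write_text('{"doi": "10.1/ABC"}\n', encoding="utf-8")
        transform_jsonl_file(str(src), str(dst), lambda r: {**r, "doi": r["doi"].lower()})
        print(dst.read_text(encoding="utf-8").strip())
```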

academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_row(row: dict) → dict

Transforms a single Unpaywall row

Parameters:
  • row – The Unpaywall entry to transform

Returns:

The transformed row.

academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name(base_url: str, api_key: str) → str

Get the Unpaywall snapshot filename. Raises an error if responses from Unpaywall are unexpected.

Parameters:
  • base_url – The Unpaywall feed base URL

  • api_key – The Unpaywall feed API key

Returns:

Snapshot file name.

academic_observatory_workflows.unpaywall_telescope.tasks.changefiles_url(base_url: str, api_key: str)

academic_observatory_workflows.unpaywall_telescope.tasks.get_unpaywall_changefiles(url: str) → List[academic_observatory_workflows.unpaywall_telescope.release.Changefile]

Get all changefiles from unpaywall
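A hedged sketch of turning a changefiles listing into an ordered list of file names. The response shape (`{"list": [{"filename": ...}, ...]}`), the `.jsonl.gz` filter and `list_changefiles` are assumptions for illustration; the function works on a response body that has already been fetched, so no HTTP call is shown.

```python
import json
import re
from typing import List

# Fixed-width stamp such as 2023-04-25T080001, so string order equals chronological order
TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2}T\d{6}")


def _timestamp(name: str) -> str:
    match = TIMESTAMP.search(name)
    if match is None:
        raise ValueError(f"no timestamp in changefile name {name!r}")
    return match.group(0)


def list_changefiles(payload: str) -> List[str]:
    """Hypothetical helper: JSON Lines changefile names from a listing, oldest first."""
    data = json.loads(payload)
    # Assumed response shape: {"list": [{"filename": "..."}, ...]}
    names = [e["filename"] for e in data["list"] if e["filename"].endswith(".jsonl.gz")]
    return sorted(names, key=_timestamp)


if __name__ == "__main__":
    body = json.dumps({"list": [
        {"filename": "changed_dois_with_versions_2023-04-26T080001.jsonl.gz"},
        {"filename": "changed_dois_with_versions_2023-04-25T080001.jsonl.gz"},
    ]})
    print(list_changefiles(body))
```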

academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_filename_to_datetime(file_name: str) → pendulum.DateTime

Parses a release date from a file name, e.g. 2023-04-25T080001

Parameters:

file_name – Unpaywall release file name (contains date string).

Returns:

The parsed release date.
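The timestamp format in the example (`2023-04-25T080001`, i.e. `YYYY-MM-DDTHHMMSS`) can be parsed with the standard library alone. The sketch below returns a stdlib `datetime` rather than `pendulum.DateTime`, assumes the timestamp is UTC, and uses the hypothetical name `filename_to_datetime`.

```python
import re
from datetime import datetime, timezone

# e.g. "2023-04-25T080001" -> year, month, day, hour, minute, second
_RELEASE_DATE = re.compile(r"(\d{4})-(\d{2})-(\d{2})T(\d{2})(\d{2})(\d{2})")


def filename_to_datetime(file_name: str) -> datetime:
    """Hypothetical helper: parse the first YYYY-MM-DDTHHMMSS stamp in file_name as UTC."""
    match = _RELEASE_DATE.search(file_name)
    if match is None:
        raise ValueError(f"no release date found in {file_name!r}")
    year, month, day, hour, minute, second = map(int, match.groups())
    # datetime() itself rejects impossible values such as month 13
    return datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)


if __name__ == "__main__":
    print(filename_to_datetime("changed_dois_with_versions_2023-04-25T080001.jsonl.gz"))
    # → 2023-04-25 08:00:01+00:00
```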