academic_observatory_workflows.unpaywall_telescope.tasks
Attributes
- UNPAYWALL_BASE_URL
- SNAPSHOT_URL
- CHANGEFILES_URL
- CHANGEFILES_DOWNLOAD_URL
Functions
- snapshot_url – Returns the Unpaywall snapshot feed URL.
- changefile_download_url – Generates the changefile download URL.
- fetch_release – Fetches the release information. On the first DAG run, gets the latest snapshot and the changefiles needed to bring the dataset up to date.
- bq_create_main_table_snapshot – Creates a snapshot of the main table so that the table can be rolled back.
- load_snapshot_download – Downloads the snapshot file. Expects the API key to be set in the environment as UNPAWAYWALL_API_KEY.
- load_snapshot_upload_downloaded – Uploads the downloaded snapshot file to the download bucket
- load_snapshot_extract – Extracts the downloaded snapshot file
- load_snapshot_transform – Transforms all of the snapshot files
- load_snapshot_split_main_table_file – Splits the main snapshot table file into many smaller files
- load_snapshot_upload_main_table_files – Uploads the snapshot files to the transform bucket
- load_snapshot_bq_load – Loads the snapshot files into the main BigQuery table
- load_changefiles_download – Downloads the changefiles. Expects the API key to be set in the environment as UNPAWAYWALL_API_KEY.
- load_changefiles_upload_downloaded – Uploads the downloaded changefiles to the download bucket
- load_changefiles_extract – Extracts the downloaded changefiles
- load_changefiles_transform – Transforms all of the changefiles
- load_changefiles_upload – Uploads the transformed changefiles to the transform bucket
- load_changefiles_bq_load – Loads the changefiles into the 'upsert' BigQuery table
- load_changefiles_bq_upsert – Upserts the changefiles from the upsert table into the main table
- add_dataset_release – Adds a release to the API dataset
- cleanup_workflow – Cleans up the workflow, deleting folders and XComs
- unpaywall_transform_file – Transforms the Unpaywall .jsonl file and writes the transformed data to a new file
- unpaywall_transform_row – Transforms a single Unpaywall row
- get_snapshot_file_name – Gets the Unpaywall snapshot file name. Raises errors if responses from Unpaywall are unexpected.
- changefiles_url
- get_unpaywall_changefiles – Gets all changefiles from Unpaywall
- Parses a release date from a file name, e.g. 2023-04-25T080001
Module Contents
- academic_observatory_workflows.unpaywall_telescope.tasks.UNPAYWALL_BASE_URL = 'https://api.unpaywall.org'
- academic_observatory_workflows.unpaywall_telescope.tasks.SNAPSHOT_URL = 'https://api.unpaywall.org/feed/snapshot'
- academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_URL = 'https://api.unpaywall.org/feed/changefiles'
- academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_DOWNLOAD_URL = 'https://api.unpaywall.org/daily-feed/changefile'
- academic_observatory_workflows.unpaywall_telescope.tasks.snapshot_url(base_url: str, api_key: str) -> str
Returns the Unpaywall snapshot feed URL for the given base URL and API key.
- Parameters:
base_url – The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.
api_key – The API key for the Unpaywall feed API.
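As an illustration only, the sketch below shows one way such a URL could be assembled. It assumes the API key is sent as an `api_key` query parameter on the `/feed/snapshot` path used by `SNAPSHOT_URL`; the name `build_snapshot_url` is hypothetical and the real implementation may differ.

```python
from urllib.parse import urlencode


def build_snapshot_url(base_url: str, api_key: str) -> str:
    """Hypothetical sketch: snapshot feed URL with the API key as a query parameter."""
    # Strip a trailing slash so "https://host/" and "https://host" give the same result
    base = base_url.rstrip("/")
    # urlencode escapes any characters in the key that are not URL-safe
    return f"{base}/feed/snapshot?{urlencode({'api_key': api_key})}"


print(build_snapshot_url("https://api.unpaywall.org", "my-key"))
# https://api.unpaywall.org/feed/snapshot?api_key=my-key
```

Pointing `base_url` at a local mock server (for example `http://localhost:5000`) is the kind of substitution the testing note above refers to.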
- academic_observatory_workflows.unpaywall_telescope.tasks.changefile_download_url(base_url: str, changefile: str, api_key: str)
Generates the download URL for a changefile.
- Parameters:
base_url – The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.
changefile – The name of the changefile to download.
api_key – The API key for the Unpaywall feed API.
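A similar hedged sketch for changefiles, assuming the file name is appended to the `/daily-feed/changefile` path from `CHANGEFILES_DOWNLOAD_URL` and the key is again passed as an `api_key` query parameter. The helper name `build_changefile_download_url` and the example file name are hypothetical.

```python
from urllib.parse import quote, urlencode


def build_changefile_download_url(base_url: str, changefile: str, api_key: str) -> str:
    """Hypothetical sketch: download URL for a single daily changefile."""
    base = base_url.rstrip("/")
    # Escape the file name in case it contains characters that are unsafe in a URL path
    name = quote(changefile)
    return f"{base}/daily-feed/changefile/{name}?{urlencode({'api_key': api_key})}"


url = build_changefile_download_url(
    "https://api.unpaywall.org",
    "changed_dois_with_versions_2023-04-25T080001.jsonl.gz",  # assumed example name
    "my-key",
)
```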
- academic_observatory_workflows.unpaywall_telescope.tasks.fetch_release(dag_id: str, run_id: str, dag_run: airflow.models.DagRun, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, api_bq_dataset_id: str, unpaywall_conn_id: str, base_url: str) -> str | None
Fetches the release information. On the first DAG run, it gets the latest snapshot and the changefiles required to bring the dataset up to date. On subsequent runs, it fetches any changefiles that have not yet been seen. After the first run it is possible that no new changefiles are found, in which case the rest of the tasks are skipped.
The release is published to BigQuery with a unique identifier.
- Parameters:
dag_id – The ID of the DAG being run
run_id – The ID of the DAG run
dag_run – The DagRun object
cloud_workspace – The CloudWorkspace object
bq_dataset_id – The BigQuery dataset ID
bq_table_name – The BigQuery table name
api_bq_dataset_id – The ID of the API dataset
unpaywall_conn_id – The Airflow connection ID for Unpaywall
base_url – The Unpaywall base URL
- Returns:
None if there are no release files to be processed, otherwise the release ID
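The changefile selection rule described above can be sketched as a pure function. `ChangefileInfo` and `select_changefiles` are hypothetical stand-ins (the module's own `Changefile` class lives in `academic_observatory_workflows.unpaywall_telescope.release`), and treating the cutoff date as exclusive is an assumption.

```python
import datetime as dt
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ChangefileInfo:
    """Hypothetical stand-in for a changefile entry: name plus release timestamp."""
    filename: str
    changefile_date: dt.datetime


def select_changefiles(
    available: List[ChangefileInfo],
    snapshot_date: Optional[dt.datetime],
    prev_end_date: Optional[dt.datetime],
) -> List[ChangefileInfo]:
    """Pick the changefiles still to be applied, oldest first.

    First run (prev_end_date is None): everything newer than the snapshot.
    Later runs: everything newer than the last processed changefile.
    """
    cutoff = prev_end_date if prev_end_date is not None else snapshot_date
    if cutoff is None:
        raise ValueError("Need either a snapshot date or a previous end date")
    selected = [c for c in available if c.changefile_date > cutoff]
    # Apply in chronological order so later changes overwrite earlier ones
    return sorted(selected, key=lambda c: c.changefile_date)
```

On any run after the first, an empty result corresponds to the case where no new changefiles exist and the remaining tasks are skipped.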
- academic_observatory_workflows.unpaywall_telescope.tasks.bq_create_main_table_snapshot(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, snapshot_expiry_days: int) -> None
Creates a snapshot of the main table so that the table can be rolled back if something goes wrong.
- Parameters:
release_id – The unique ID assigned to the release object. Used for retrieval from the download bucket.
cloud_workspace – The CloudWorkspace object
snapshot_expiry_days – Number of days before the created snapshot is automatically deleted.
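One way to create such a snapshot is BigQuery's `CREATE SNAPSHOT TABLE ... CLONE ...` DDL with an `expiration_timestamp` option, which makes BigQuery delete the snapshot once the expiry passes. This is a sketch under that assumption: `snapshot_ddl` and the table IDs are hypothetical, and the function may create the snapshot differently.

```python
def snapshot_ddl(table_id: str, snapshot_id: str, expiry_days: int) -> str:
    """Build a BigQuery CREATE SNAPSHOT TABLE statement (sketch, not the module's code)."""
    if expiry_days <= 0:
        raise ValueError("expiry_days must be positive")
    return (
        f"CREATE SNAPSHOT TABLE `{snapshot_id}`\n"
        f"CLONE `{table_id}`\n"
        # BigQuery deletes the snapshot automatically once this timestamp passes
        f"OPTIONS (expiration_timestamp = "
        f"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {expiry_days} DAY))"
    )
```

The resulting string could then be executed with `google.cloud.bigquery.Client().query(sql).result()`.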
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: dict | None = None)
Downloads the snapshot file. Expects the API key to be set in the environment as UNPAWAYWALL_API_KEY.
- Parameters:
base_url – The Unpaywall base URL, e.g. https://api.unpaywall.org
http_header – A dictionary of headers to send with the HTTP request
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Uploads the downloaded snapshot file to the download bucket
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Extracts the downloaded snapshot file
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Transforms all of the snapshot files
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_split_main_table_file(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, **context)
Splits the main snapshot table file into many smaller files
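Splitting on line boundaries keeps every JSON record intact, so each part is still valid JSON Lines. The sketch below assumes the main table file is JSON Lines and uses a fixed number of lines per part; `split_jsonl`, the part naming scheme and the chunk size are hypothetical.

```python
import os
from typing import List


def split_jsonl(input_file: str, output_dir: str, lines_per_file: int) -> List[str]:
    """Split a JSON Lines file into numbered parts of at most lines_per_file lines."""
    if lines_per_file <= 0:
        raise ValueError("lines_per_file must be positive")
    os.makedirs(output_dir, exist_ok=True)
    stem = os.path.splitext(os.path.basename(input_file))[0]
    parts: List[str] = []
    out = None
    try:
        with open(input_file, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                # Start a new part every lines_per_file lines
                if i % lines_per_file == 0:
                    if out is not None:
                        out.close()
                    path = os.path.join(output_dir, f"{stem}_{len(parts):06d}.jsonl")
                    out = open(path, "w", encoding="utf-8")
                    parts.append(path)
                out.write(line)
    finally:
        # Close the last open part even if reading fails part-way
        if out is not None:
            out.close()
    return parts
```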
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_main_table_files(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Uploads the snapshot files to the transform bucket
- academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)
Loads the snapshot files into the main BigQuery table
- Parameters:
schema_file_path – The file path of the schema file to use
table_description – The description to give the main table
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: str | None = None)
Downloads the changefiles. Expects the API key to be set in the environment as UNPAWAYWALL_API_KEY.
- Parameters:
base_url – The Unpaywall base URL, e.g. https://api.unpaywall.org
http_header – A dictionary of headers to send with the HTTP request
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Uploads the downloaded changefiles to the download bucket
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Extracts the downloaded changefiles
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)
Transforms all of the changefiles
- Parameters:
primary_key – The primary key to use for merging the changefiles
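Assuming that merging means keeping only the newest version of each record (changefiles applied oldest first, later versions winning), the idea can be sketched in memory as follows. `merge_changefiles` is a hypothetical name, and `primary_key` is assumed to name a top-level field such as `doi`.

```python
import json
from typing import Dict, Iterable, List


def merge_changefiles(changefile_lines: Iterable[Iterable[str]], primary_key: str) -> List[dict]:
    """Merge JSON Lines changefiles, keeping the newest record per primary key.

    changefile_lines must be ordered oldest changefile first, so a record seen
    later replaces any earlier record with the same key.
    """
    merged: Dict[str, dict] = {}
    for lines in changefile_lines:
        for line in lines:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            record = json.loads(line)
            merged[record[primary_key]] = record
    return list(merged.values())
```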
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Uploads the transformed changefiles to the transform bucket
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)
Loads the changefiles into the ‘upsert’ BigQuery table
- Parameters:
schema_file_path – The file path of the schema file to use
table_description – The description to give the main table
- academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_upsert(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)
Upserts the changefiles into the main table from the upsert table
- Parameters:
primary_key – A single key or a list of keys to use to determine which records to upsert.
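In BigQuery an upsert like this is usually written as a `MERGE` statement. The sketch below only models the intended semantics in memory: rows whose key matches are replaced, the rest are inserted. `upsert_rows` is hypothetical; it accepts either a single key or a list of keys, matching the parameter description, and assumes keys are unique in the main table.

```python
from typing import Dict, Iterable, List, Sequence, Tuple, Union


def upsert_rows(
    main: List[dict],
    upserts: Iterable[dict],
    primary_key: Union[str, Sequence[str]],
) -> List[dict]:
    """Return main with upsert rows applied: replace rows whose key matches, insert the rest."""
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)

    def key_of(row: dict) -> Tuple:
        # A composite key is the tuple of all key columns
        return tuple(row[k] for k in keys)

    index: Dict[Tuple, dict] = {key_of(row): row for row in main}
    for row in upserts:
        index[key_of(row)] = row  # update if present, insert if new
    return list(index.values())
```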
- academic_observatory_workflows.unpaywall_telescope.tasks.add_dataset_release(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, api_bq_dataset_id: str)
Adds a release to the API dataset
- Parameters:
api_bq_dataset_id – The ID of the API dataset
- academic_observatory_workflows.unpaywall_telescope.tasks.cleanup_workflow(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)
Cleans up the workflow, deleting folders and XComs
- academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_file(input_file: str, output_file: str) -> None
Transforms the Unpaywall .jsonl file and writes the transformed data to a new file
- Parameters:
input_file – The unpaywall file to transform
output_file – The file to write to
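Because Unpaywall files are large, a transform like this is typically streamed line by line rather than loaded into memory. The sketch assumes JSON Lines input and output; `transform_jsonl_file` is hypothetical, and any row function passed to it (such as the lower-casing lambda in a quick test) is purely illustrative, not what `unpaywall_transform_row` does.

```python
import json
from typing import Callable


def transform_jsonl_file(input_file: str, output_file: str, transform_row: Callable[[dict], dict]) -> int:
    """Stream a JSON Lines file through transform_row and write the results; return the row count."""
    count = 0
    with open(input_file, "r", encoding="utf-8") as src, open(output_file, "w", encoding="utf-8") as dst:
        for line in src:
            if not line.strip():
                continue  # ignore blank lines
            row = transform_row(json.loads(line))
            dst.write(json.dumps(row) + "\n")
            count += 1
    return count
```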
- academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_row(row: dict) -> dict
Transforms a single Unpaywall row
- Parameters:
row – The Unpaywall entry to transform
- Returns:
The transformed row
- academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name(base_url: str, api_key: str) -> str
Gets the Unpaywall snapshot file name. Raises errors if responses from Unpaywall are unexpected.
- Parameters:
base_url – The Unpaywall feed base URL
api_key – The Unpaywall feed API key
- Returns:
The snapshot file name.
- academic_observatory_workflows.unpaywall_telescope.tasks.changefiles_url(base_url: str, api_key: str)
- academic_observatory_workflows.unpaywall_telescope.tasks.get_unpaywall_changefiles(url: str) -> List[academic_observatory_workflows.unpaywall_telescope.release.Changefile]
Gets all changefiles from Unpaywall
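The module parses release dates from file names that carry a timestamp such as 2023-04-25T080001 (date, `T`, then HHMMSS). A hedged sketch of extracting it follows; the regular expression, the function name `parse_release_date_sketch` and the example file name are assumptions, and the module's own parser may return a different datetime type.

```python
import datetime as dt
import re

# Matches timestamps such as 2023-04-25T080001 (date, "T", then HHMMSS)
_RELEASE_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}T\d{6}")


def parse_release_date_sketch(file_name: str) -> dt.datetime:
    """Extract the release timestamp embedded in an Unpaywall file name."""
    match = _RELEASE_DATE_RE.search(file_name)
    if match is None:
        raise ValueError(f"No release date found in {file_name!r}")
    return dt.datetime.strptime(match.group(0), "%Y-%m-%dT%H%M%S")
```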