academic_observatory_workflows.unpaywall_telescope.tasks
========================================================

.. py:module:: academic_observatory_workflows.unpaywall_telescope.tasks


Attributes
----------

.. autoapisummary::

   academic_observatory_workflows.unpaywall_telescope.tasks.UNPAYWALL_BASE_URL
   academic_observatory_workflows.unpaywall_telescope.tasks.SNAPSHOT_URL
   academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_URL
   academic_observatory_workflows.unpaywall_telescope.tasks.CHANGEFILES_DOWNLOAD_URL


Functions
---------

.. autoapisummary::

   academic_observatory_workflows.unpaywall_telescope.tasks.snapshot_url
   academic_observatory_workflows.unpaywall_telescope.tasks.changefile_download_url
   academic_observatory_workflows.unpaywall_telescope.tasks.fetch_release
   academic_observatory_workflows.unpaywall_telescope.tasks.bq_create_main_table_snapshot
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_download
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_downloaded
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_extract
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_transform
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_split_main_table_file
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_upload_main_table_files
   academic_observatory_workflows.unpaywall_telescope.tasks.load_snapshot_bq_load
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_download
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload_downloaded
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_extract
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_transform
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_upload
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_load
   academic_observatory_workflows.unpaywall_telescope.tasks.load_changefiles_bq_upsert
   academic_observatory_workflows.unpaywall_telescope.tasks.add_dataset_release
   academic_observatory_workflows.unpaywall_telescope.tasks.cleanup_workflow
   academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_file
   academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_transform_row
   academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name
   academic_observatory_workflows.unpaywall_telescope.tasks.changefiles_url
   academic_observatory_workflows.unpaywall_telescope.tasks.get_unpaywall_changefiles
   academic_observatory_workflows.unpaywall_telescope.tasks.unpaywall_filename_to_datetime


Module Contents
---------------

.. py:data:: UNPAYWALL_BASE_URL
   :value: 'https://api.unpaywall.org'


.. py:data:: SNAPSHOT_URL
   :value: 'https://api.unpaywall.org/feed/snapshot'


.. py:data:: CHANGEFILES_URL
   :value: 'https://api.unpaywall.org/feed/changefiles'


.. py:data:: CHANGEFILES_DOWNLOAD_URL
   :value: 'https://api.unpaywall.org/daily-feed/changefile'


.. py:function:: snapshot_url(base_url: str, api_key: str) -> str

   Generate the snapshot URL.

   :param base_url: The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.
   :param api_key: The API key for the Unpaywall feed API.


.. py:function:: changefile_download_url(base_url: str, changefile: str, api_key: str)

   Generate the changefile download URL.

   :param base_url: The base URL, e.g. https://api.unpaywall.org. Can be changed for testing purposes.
   :param changefile: The name of the changefile to download.
   :param api_key: The API key for the Unpaywall feed API.


.. py:function:: fetch_release(dag_id: str, run_id: str, dag_run: airflow.models.DagRun, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, api_bq_dataset_id: str, unpaywall_conn_id: str, base_url: str) -> str | None

   Fetches the release information.
   On the first DAG run, it gets the latest snapshot and the changefiles needed to bring the dataset up to date. On subsequent runs, it fetches unseen changefiles. It is possible for no changefiles to be found after the first run, in which case the rest of the tasks are skipped. The release will be published to BigQuery with a unique identifier.

   :param dag_id: The ID of the running DAG.
   :param run_id: The ID of the DAG run.
   :param dag_run: The DagRun object.
   :param cloud_workspace: The CloudWorkspace object.
   :param bq_dataset_id: The BigQuery dataset ID.
   :param bq_table_name: The BigQuery table name.
   :param api_bq_dataset_id: The name of the API dataset.
   :param unpaywall_conn_id: The Airflow connection ID for Unpaywall.
   :param base_url: The Unpaywall base URL.
   :return: None if there are no release files to be processed, otherwise the release ID.


.. py:function:: bq_create_main_table_snapshot(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, snapshot_expiry_days: int) -> None

   Create a snapshot of the main table. The snapshot makes it possible to roll back the main table if something goes wrong.

   :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
   :param cloud_workspace: The CloudWorkspace object.
   :param snapshot_expiry_days: Number of days before the created snapshot is automatically deleted.


.. py:function:: load_snapshot_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: Optional[dict] = None)

   Downloads the snapshot file. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

   :param base_url: The Unpaywall base URL, e.g. https://api.unpaywall.org.
   :param http_header: A dictionary of headers to send with the HTTP request.


.. py:function:: load_snapshot_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Uploads the downloaded snapshot file to the download bucket.


.. py:function:: load_snapshot_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Extracts the downloaded snapshot file.


.. py:function:: load_snapshot_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Transforms all of the snapshot files.


.. py:function:: load_snapshot_split_main_table_file(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, **context)

   Splits the main snapshot table file into many smaller files.


.. py:function:: load_snapshot_upload_main_table_files(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Uploads the snapshot files to the transform bucket.


.. py:function:: load_snapshot_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)

   Loads the snapshot files into the main BigQuery table.

   :param schema_file_path: The file path of the schema file to use.
   :param table_description: The description to give the main table.


.. py:function:: load_changefiles_download(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, base_url: str, http_header: Optional[str] = None)

   Downloads the changefiles. Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

   :param base_url: The Unpaywall base URL, e.g. https://api.unpaywall.org.
   :param http_header: A dictionary of headers to send with the HTTP request.


.. py:function:: load_changefiles_upload_downloaded(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Uploads the downloaded changefiles to the download bucket.


.. py:function:: load_changefiles_extract(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Extracts the downloaded changefiles.


.. py:function:: load_changefiles_transform(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)

   Transforms all of the changefiles.

   :param primary_key: The primary key to use for merging the changefiles.


.. py:function:: load_changefiles_upload(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Uploads the downloaded changefiles to the download bucket.


.. py:function:: load_changefiles_bq_load(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_file_path: str, table_description: str)

   Loads the changefiles into the 'upsert' BigQuery table.

   :param schema_file_path: The file path of the schema file to use.
   :param table_description: The description to give the main table.


.. py:function:: load_changefiles_bq_upsert(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, primary_key: str)

   Upserts the changefiles from the upsert table into the main table.

   :param primary_key: A single key or a list of keys used to determine which records to upsert.


.. py:function:: add_dataset_release(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, api_bq_dataset_id: str)

   Adds a release to the API dataset.

   :param api_bq_dataset_id: The ID of the API dataset.


.. py:function:: cleanup_workflow(release_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace)

   Cleans up the workflow by deleting its folders and XComs.


.. py:function:: unpaywall_transform_file(input_file: str, output_file: str) -> None

   Transforms an Unpaywall .jsonl file and writes the transformed data to a new file.

   :param input_file: The Unpaywall file to transform.
   :param output_file: The file to write to.


.. py:function:: unpaywall_transform_row(row: dict) -> dict

   Transforms a single Unpaywall row.

   :param row: The Unpaywall entry to transform.
   :return: The transformed row.


.. py:function:: get_snapshot_file_name(base_url: str, api_key: str) -> str

   Get the Unpaywall snapshot filename. Raises errors if responses from Unpaywall are unexpected.

   :param base_url: The Unpaywall feed base URL.
   :param api_key: The Unpaywall feed API key.
   :return: The snapshot file name.


.. py:function:: changefiles_url(base_url: str, api_key: str)

   Generate the changefiles URL.

   :param base_url: The Unpaywall feed base URL.
   :param api_key: The Unpaywall feed API key.


.. py:function:: get_unpaywall_changefiles(url: str) -> List[academic_observatory_workflows.unpaywall_telescope.release.Changefile]

   Get all changefiles from Unpaywall.

   :param url: The changefiles URL.


.. py:function:: unpaywall_filename_to_datetime(file_name: str) -> pendulum.DateTime

   Parses the release date from a file name that contains a date string such as 2023-04-25T080001.

   :param file_name: Unpaywall release file name (contains a date string).
   :return: The release date.
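The changefile handling described above can be sketched in plain Python. The sketch first orders changefiles by the timestamp in their names, as ``unpaywall_filename_to_datetime`` does. It then merges the changefiles on a primary key, as ``load_changefiles_transform`` does with ``primary_key``.

This is a minimal illustration under stated assumptions, not the module's implementation. The assumptions are:

* The helper names ``filename_to_datetime`` and ``merge_changefiles`` are hypothetical.
* The changefile names in the example are illustrative.
* Records are assumed to be JSON Lines already loaded into memory.
* ``doi`` is assumed to be the primary key.
* A standard-library ``datetime`` stands in for ``pendulum.DateTime``.

```python
import json
import re
from datetime import datetime, timezone
from typing import Dict, List

# Hypothetical pattern for the timestamp embedded in a changefile name,
# e.g. "changed_dois_with_versions_2023-04-25T080001.jsonl.gz" (illustrative name).
_TIMESTAMP_RE = re.compile(r"\d{4}-\d{2}-\d{2}T\d{6}")


def filename_to_datetime(file_name: str) -> datetime:
    """Parse the embedded timestamp (e.g. 2023-04-25T080001) as a UTC datetime.

    Hypothetical stand-in for unpaywall_filename_to_datetime, which returns a
    pendulum.DateTime rather than a standard-library datetime.
    """
    match = _TIMESTAMP_RE.search(file_name)
    if match is None:
        raise ValueError(f"No timestamp found in file name: {file_name}")
    parsed = datetime.strptime(match.group(0), "%Y-%m-%dT%H%M%S")
    return parsed.replace(tzinfo=timezone.utc)


def merge_changefiles(files: Dict[str, List[str]], primary_key: str = "doi") -> List[dict]:
    """Merge JSON Lines changefiles so that each primary key appears once.

    `files` maps a changefile name to its lines. Files are applied oldest first,
    so a record in a newer file replaces any earlier record with the same key.
    """
    merged: Dict[str, dict] = {}
    for name in sorted(files, key=filename_to_datetime):
        for line in files[name]:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            merged[record[primary_key]] = record  # the newer version wins
    return list(merged.values())


if __name__ == "__main__":
    example = {
        "changed_dois_with_versions_2023-04-26T080001.jsonl": ['{"doi": "10.1/a", "is_oa": true}'],
        "changed_dois_with_versions_2023-04-25T080001.jsonl": [
            '{"doi": "10.1/a", "is_oa": false}',
            '{"doi": "10.1/b", "is_oa": true}',
        ],
    }
    for row in merge_changefiles(example):
        print(row)
    # prints {'doi': '10.1/a', 'is_oa': True} then {'doi': '10.1/b', 'is_oa': True}
```

Because files are applied oldest first, a record in a newer changefile overwrites the earlier version with the same key. The merged output therefore holds one row per key, ready to be upserted into the main table.

Holding every record in a dictionary keeps the sketch short. For large changefiles, a production task would more likely stream records to disk.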