# Copyright 2020-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Tuan Chien, James Diprose
from __future__ import annotations

import datetime
import logging
import os
import re
import shlex
from concurrent.futures import ThreadPoolExecutor
from typing import List, Any, Optional
from urllib.parse import urlparse

import jsonlines
import pendulum
import requests
from airflow.exceptions import AirflowException
from airflow.models import DagRun
from airflow.operators.bash import BashOperator
from google.cloud import bigquery
from google.cloud.bigquery import SourceFormat

from academic_observatory_workflows.unpaywall_telescope.release import Changefile, UnpaywallRelease
from observatory_platform.airflow.airflow import get_airflow_connection_password, is_first_dag_run
from observatory_platform.airflow.release import release_from_bucket, release_to_bucket
from observatory_platform.airflow.workflow import cleanup, CloudWorkspace
from observatory_platform.dataset_api import DatasetAPI, DatasetRelease
from observatory_platform.files import clean_dir, gunzip_files, list_files, merge_update_files, yield_jsonl
from observatory_platform.google.bigquery import bq_load_table, bq_snapshot, bq_upsert_records
from observatory_platform.google.gcs import gcs_upload_files
from observatory_platform.http_download import download_file, download_files, DownloadInfo
from observatory_platform.url_utils import get_http_response_json
# Unpaywall data feed endpoints, all rooted at the public API host.
# See https://unpaywall.org/products/data-feed for details of available APIs
UNPAYWALL_BASE_URL = "https://api.unpaywall.org"
SNAPSHOT_URL = f"{UNPAYWALL_BASE_URL}/feed/snapshot"
CHANGEFILES_URL = f"{UNPAYWALL_BASE_URL}/feed/changefiles"
CHANGEFILES_DOWNLOAD_URL = f"{UNPAYWALL_BASE_URL}/daily-feed/changefile"
def snapshot_url(base_url: str, api_key: str) -> str:
    """Build the URL of the Unpaywall snapshot feed endpoint.

    :param base_url: The base url. e.g. https://api.unpaywall.org. Can be changed for testing purposes.
    :param api_key: The api key for the unpaywall feed api.
    :return: The snapshot feed URL with the api key attached as the ``api_key`` query parameter.
    """
    endpoint = f"{base_url}/feed/snapshot"
    return f"{endpoint}?api_key={api_key}"
def changefile_download_url(base_url: str, changefile: str, api_key: str) -> str:
    """Generate the download url for a single Unpaywall changefile.

    :param base_url: The base url. e.g. https://api.unpaywall.org. Can be changed for testing purposes.
    :param changefile: The changefile name, i.e. the ``filename`` of a changefile listed by the changefiles feed.
    :param api_key: The api key for the unpaywall feed api.
    :return: The changefile download URL with the api key attached as the ``api_key`` query parameter.
    """
    return f"{base_url}/daily-feed/changefile/{changefile}?api_key={api_key}"
def fetch_release(
    dag_id: str,
    run_id: str,
    dag_run: DagRun,
    cloud_workspace: CloudWorkspace,
    bq_dataset_id: str,
    bq_table_name: str,
    api_bq_dataset_id: str,
    unpaywall_conn_id: str,
    base_url: str,
) -> str | None:
    """Fetches the release information.

    On the first DAG run it gets the latest snapshot and the changefiles needed to bring the dataset up to date.
    On subsequent runs it fetches changefiles newer than the previous release's changefile end date. After the
    first run it is possible for no changefiles to be found, in which case None is returned so the rest of the
    tasks can be skipped. Otherwise the release is serialised to the download bucket (via release_to_bucket)
    and the ID it is stored under is returned.

    :param dag_id: The ID of the dag running
    :param run_id: The ID of the dag run
    :param dag_run: The DagRun object
    :param cloud_workspace: The CloudWorkspace Object
    :param bq_dataset_id: The bigquery dataset id
    :param bq_table_name: The bigquery table name
    :param api_bq_dataset_id: The name of the api dataset
    :param unpaywall_conn_id: The airflow connection ID for unpaywall
    :param base_url: The unpaywall base url
    :return: None if there are no release files to be processed, otherwise the release ID
    """
    api = DatasetAPI(bq_project_id=cloud_workspace.project_id, bq_dataset_id=api_bq_dataset_id)
    prev_release = api.get_dataset_releases(dag_id=dag_id, entity_id="unpaywall", limit=1)

    # Get Unpaywall changefiles and sort from newest to oldest
    api_key = get_airflow_connection_password(unpaywall_conn_id)
    all_changefiles = get_unpaywall_changefiles(changefiles_url(base_url, api_key))
    all_changefiles.sort(key=lambda c: c.changefile_date, reverse=True)
    logging.info(f"fetch_release: {len(all_changefiles)} JSONL changefiles discovered")

    changefiles = []
    is_first_run = is_first_dag_run(dag_run)
    # Earliest representable date; only replaced on subsequent runs by the previous release's end date
    prev_end_date = pendulum.instance(datetime.datetime.min)
    if is_first_run:
        assert (
            len(prev_release) == 0
        ), "fetch_release: there should be no DatasetReleases stored in the Observatory API on the first DAG run."

        # Get snapshot date as this is used to determine what changefile to get
        snapshot_file_name = get_snapshot_file_name(base_url, api_key)
        snapshot_date = unpaywall_filename_to_datetime(snapshot_file_name)

        # On first run, add changefiles from present until the changefile before the snapshot_date
        # As per Unpaywall changefiles documentation: https://unpaywall.org/products/data-feed/changefiles
        for changefile in all_changefiles:
            changefiles.append(changefile)
            if changefile.changefile_date < snapshot_date:
                break

        # Assert that there is at least 1 changefile
        assert len(changefiles) >= 1, "fetch_release: there should be at least 1 changefile when loading a snapshot"
    else:
        assert (
            len(prev_release) >= 1
        ), "fetch_release: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run"

        # On subsequent runs, fetch changefiles from after the previous changefile date.
        # The snapshot date is carried forward so that we can easily see what snapshot is being used.
        snapshot_date = pendulum.instance(prev_release[0].snapshot_date)
        prev_end_date = pendulum.instance(prev_release[0].changefile_end_date)
        changefiles = [c for c in all_changefiles if prev_end_date < c.changefile_date]

    # Sort from oldest to newest
    changefiles.sort(key=lambda c: c.changefile_date, reverse=False)
    if not changefiles:
        logging.info("fetch_release: no changefiles found, skipping")
        return None

    # Print summary information
    logging.info(f"is_first_run: {is_first_run}")
    logging.info(f"snapshot_date: {snapshot_date}")
    logging.info(f"changefiles: {changefiles}")
    logging.info(f"prev_end_date: {prev_end_date}")

    release_id = release_to_bucket(
        UnpaywallRelease(
            dag_id=dag_id,
            run_id=run_id,
            cloud_workspace=cloud_workspace,
            bq_dataset_id=bq_dataset_id,
            bq_table_name=bq_table_name,
            is_first_run=is_first_run,
            snapshot_date=snapshot_date,
            changefiles=changefiles,
            prev_end_date=prev_end_date,
        ).to_dict(),
        cloud_workspace.download_bucket,
    )
    return release_id
def bq_create_main_table_snapshot(release_id: str, cloud_workspace: CloudWorkspace, snapshot_expiry_days: int) -> None:
    """Snapshot the main BigQuery table so it can be rolled back if something goes wrong.

    Does nothing on the first run of the workflow.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param snapshot_expiry_days: Number of days before the created snapshot is automatically deleted.
    :raises AirflowException: if bq_snapshot reports failure.
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))

    # Guard clause: no snapshot is taken on the first run
    if release.is_first_run:
        logging.info("bq_create_main_table_snapshots: skipping as snapshots are not created on the first run")
        return

    expires_at = pendulum.now().add(days=snapshot_expiry_days)
    snapshot_created = bq_snapshot(
        src_table_id=release.bq_main_table_id,
        dst_table_id=release.bq_snapshot_table_id,
        expiry_date=expires_at,
    )
    if not snapshot_created:
        raise AirflowException("bq_create_main_table_snapshot: failed to create BigQuery snapshot")
def load_snapshot_download(
    release_id: str,
    cloud_workspace: CloudWorkspace,
    base_url: str,
    http_header: Optional[dict] = None,
):
    """Downloads the Unpaywall snapshot file into the snapshot release's download folder.

    Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param base_url: The unpaywall base url. e.g. https://api.unpaywall.org
    :param http_header: A dictionary of headers to send with the http request
    :raises AirflowException: if the API key is missing, if the current snapshot's date differs from the
        release's snapshot date, or if the download fails.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)

    # Clean all files
    clean_dir(release.snapshot_release.download_folder)

    api_key = os.environ.get("UNPAYWALL_API_KEY")
    if not api_key:
        raise AirflowException("API key 'UNPAYWALL_API_KEY' not found")

    # Assert that the date on the filename matches the snapshot date stored in the release object as there is a
    # small chance that the snapshot changed between when we collated the releases and when we downloaded the snapshot
    snapshot_date = unpaywall_filename_to_datetime(get_snapshot_file_name(base_url, api_key))
    if release.snapshot_release.snapshot_date != snapshot_date:
        raise AirflowException(
            f"download: release snapshot_date {release.snapshot_release.snapshot_date} != snapshot_date of current snapshot file {snapshot_date}. This can happen because the snapshot was updated between fetch_release() and download()"
        )

    # Download the most recent Unpaywall snapshot.
    # Use a read buffer size of 8MiB (2**23 bytes) as we are downloading a large file.
    success, download_info = download_file(
        url=snapshot_url(base_url, api_key),
        headers=http_header,
        prefix_dir=release.snapshot_release.download_folder,
        read_buffer_size=2**23,
    )
    if not success:
        raise AirflowException("download: failed to download snapshot")

    # Rename file so that it is easier to deal with
    os.rename(download_info.file_path, release.snapshot_download_file_path)
def load_snapshot_upload_downloaded(release_id: str, cloud_workspace: CloudWorkspace):
    """Uploads the downloaded snapshot file to the download bucket.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :raises AirflowException: if gcs_upload_files reports failure.
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))
    uploaded = gcs_upload_files(
        bucket_name=cloud_workspace.download_bucket,
        file_paths=[release.snapshot_download_file_path],
    )
    if not uploaded:
        raise AirflowException("gcs_upload_files: failed to upload snapshot")
def load_snapshot_split_main_table_file(release_id: str, cloud_workspace: CloudWorkspace, **context):
    """Splits the main snapshot table file into many smaller files.

    Runs GNU ``split`` inside the snapshot transform folder on ``main_table.jsonl``. It writes chunks of at most
    4G named ``main_table<12-digit number>.jsonl``, numbered from 1.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param context: The Airflow task context, passed through to BashOperator.execute.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)

    # Quote the folder so paths with spaces or shell metacharacters are passed to `cd` safely.
    # shlex.quote leaves ordinary paths (letters, digits, / . _ - etc.) unchanged.
    transform_folder = shlex.quote(release.snapshot_release.transform_folder)
    op = BashOperator(
        task_id="split_main_table_file",
        bash_command=f"cd {transform_folder} && split -C 4G --numeric-suffixes=1 --suffix-length=12 --additional-suffix=.jsonl main_table.jsonl main_table",
        do_xcom_push=False,
    )
    op.execute(context)
def load_snapshot_upload_main_table_files(release_id: str, cloud_workspace: CloudWorkspace):
    """Uploads the snapshot main table files to the transform bucket.

    The uploaded files are those in the snapshot transform folder matching ``release.main_table_files_regex``.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :raises AirflowException: if gcs_upload_files reports failure.
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))
    main_table_files = list_files(release.snapshot_release.transform_folder, release.main_table_files_regex)
    if not gcs_upload_files(bucket_name=cloud_workspace.transform_bucket, file_paths=main_table_files):
        raise AirflowException("upload_main_table_files: failed to upload main table files")
def load_snapshot_bq_load(
    release_id: str, cloud_workspace: CloudWorkspace, schema_file_path: str, table_description: str
):
    """Loads the snapshot files into the main bigquery table, overwriting its contents (WRITE_TRUNCATE).

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param schema_file_path: The file path of the schema file to use
    :param table_description: The description to give the main table
    :raises AirflowException: if bq_load_table reports failure.
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))
    load_options = dict(
        uri=release.main_table_uri,
        table_id=release.bq_main_table_id,
        schema_file_path=schema_file_path,
        source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
        table_description=table_description,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        ignore_unknown_values=True,
    )
    if not bq_load_table(**load_options):
        raise AirflowException("bq_load: failed to load main table")
def load_changefiles_download(
    release_id: str,
    cloud_workspace: CloudWorkspace,
    base_url: str,
    http_header: Optional[dict] = None,
):
    """Downloads the release's changefiles into the changefile release's download folder.

    Expects the API key to be set in the environment as UNPAYWALL_API_KEY.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param base_url: The unpaywall base url. e.g. https://api.unpaywall.org
    :param http_header: A dictionary of headers to send with the http request
    :raises AirflowException: if the API key is missing.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)
    clean_dir(release.changefile_release.download_folder)

    api_key = os.environ.get("UNPAYWALL_API_KEY")
    if not api_key:
        raise AirflowException("API key 'UNPAYWALL_API_KEY' not found")

    # TODO: it is a bit confusing that you have to set prefix_dir and filename, but can't just directly set filepath
    download_list = [
        DownloadInfo(
            url=changefile_download_url(base_url, changefile.filename, api_key),
            filename=changefile.filename,
            prefix_dir=release.changefile_release.download_folder,
            retry=True,
        )
        for changefile in release.changefiles
    ]
    # NOTE(review): the return value of download_files is not checked here; confirm it raises on failure.
    download_files(download_list=download_list, headers=http_header)
def load_changefiles_upload_downloaded(release_id: str, cloud_workspace: CloudWorkspace):
    """Uploads the downloaded changefiles to the download bucket.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :raises AirflowException: if gcs_upload_files reports failure.
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))
    downloaded_paths = list(map(lambda cf: cf.download_file_path, release.changefiles))
    if not gcs_upload_files(bucket_name=cloud_workspace.download_bucket, file_paths=downloaded_paths):
        raise AirflowException("upload_downloaded: failed to upload downloaded changefiles")
def load_changefiles_upload(release_id: str, cloud_workspace: CloudWorkspace):
    """Uploads the upsert table file (built from the changefiles) to the transform bucket.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :raises AirflowException: if gcs_upload_files reports failure.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)
    success = gcs_upload_files(
        bucket_name=cloud_workspace.transform_bucket, file_paths=[release.upsert_table_file_path]
    )
    if not success:
        raise AirflowException("upload: failed to upload upsert files")
def load_changefiles_bq_load(
    release_id: str, cloud_workspace: CloudWorkspace, schema_file_path: str, table_description: str
):
    """Loads the changefiles into the 'upsert' bigquery table.

    The upsert table is overwritten (WRITE_TRUNCATE), so any existing upsert table contents are replaced.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param schema_file_path: The file path of the schema file to use
    :param table_description: The description to give the upsert table
    :raises AirflowException: if bq_load_table reports failure.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)
    success = bq_load_table(
        uri=release.upsert_table_uri,
        table_id=release.bq_upsert_table_id,
        schema_file_path=schema_file_path,
        source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
        table_description=table_description,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        ignore_unknown_values=True,
    )
    if not success:
        raise AirflowException("bq_load: failed to load upsert table")
def load_changefiles_bq_upsert(release_id: str, cloud_workspace: CloudWorkspace, primary_key: str | List[str]):
    """Upserts the changefiles into the main table from the upsert table.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    :param primary_key: A single key or a list of keys to use to determine which records to upsert.
    """
    release = release_from_bucket(cloud_workspace.download_bucket, release_id)
    release = UnpaywallRelease.from_dict(release)
    # NOTE(review): the return value of bq_upsert_records is not checked; confirm it raises on failure.
    bq_upsert_records(
        main_table_id=release.bq_main_table_id,
        upsert_table_id=release.bq_upsert_table_id,
        primary_key=primary_key,
    )
def add_dataset_release(release_id: str, cloud_workspace: CloudWorkspace, api_bq_dataset_id: str):
    """Records this run's release as a DatasetRelease in the API dataset.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object (used only to locate the stored release)
    :param api_bq_dataset_id: The ID of the API dataset
    """
    release = UnpaywallRelease.from_dict(release_from_bucket(cloud_workspace.download_bucket, release_id))
    # The API project comes from the release's own workspace, not the cloud_workspace argument
    api = DatasetAPI(bq_project_id=release.cloud_workspace.project_id, bq_dataset_id=api_bq_dataset_id)
    timestamp = pendulum.now()
    api.add_dataset_release(
        DatasetRelease(
            dag_id=release.dag_id,
            entity_id="unpaywall",
            dag_run_id=release.run_id,
            created=timestamp,
            modified=timestamp,
            snapshot_date=release.snapshot_date,
            changefile_start_date=release.changefile_release.start_date,
            changefile_end_date=release.changefile_release.end_date,
        )
    )
def cleanup_workflow(release_id: str, cloud_workspace: CloudWorkspace):
    """Cleans up after the workflow by calling ``cleanup`` with the release's dag_id and workflow folder.

    :param release_id: The unique ID assigned to the release object. Used for retrieval from the download bucket.
    :param cloud_workspace: The CloudWorkspace object
    """
    stored = release_from_bucket(cloud_workspace.download_bucket, release_id)
    unpaywall_release = UnpaywallRelease.from_dict(stored)
    cleanup(dag_id=unpaywall_release.dag_id, workflow_folder=unpaywall_release.workflow_folder)
def get_snapshot_file_name(base_url: str, api_key: str, timeout: Optional[float] = 60.0) -> str:
    """Get the Unpaywall snapshot filename. Raises errors if responses from unpaywall are unexpected.

    Sends a HEAD request to the snapshot feed. It expects a 302 redirect whose ``location`` header points to
    the snapshot file, and takes the filename from the last segment of that location's path.

    :param base_url: The Unpaywall feed base url
    :param api_key: The Unpaywall feed api key
    :param timeout: Seconds to wait for the server before giving up (passed to requests). None waits forever.
    :return: The snapshot file name.
    :raises AirflowException: if the status code is not 302 or the "location" header is missing.
    """
    url = snapshot_url(base_url, api_key)
    response = requests.head(url, timeout=timeout)
    if response.status_code != 302:  # Expect a redirect attempt
        # Do not log response.url: it is the request URL and contains the api key
        redacted_url = snapshot_url(base_url, "<redacted>")
        raise AirflowException(f"Unexpected status code: url={redacted_url}, status_code={response.status_code}")
    if not response.headers.get("location"):  # The filename is in the "location" header item
        raise AirflowException(
            f'Could not determine filename. Missing "location" in header: {response.headers.items()}'
        )
    parsed_url = urlparse(response.headers["location"])
    fname = parsed_url.path.split("/")[-1]
    logging.info(f"Found snapshot filename: {fname}")
    return fname
def changefiles_url(base_url: str, api_key: str) -> str:
    """Build the URL that lists the daily Unpaywall changefiles.

    :param base_url: The base url. e.g. https://api.unpaywall.org. Can be changed for testing purposes.
    :param api_key: The api key for the unpaywall feed api.
    :return: The changefiles feed URL with ``interval=day`` and the api key as query parameters.
    """
    return f"{base_url}/feed/changefiles?interval=day&api_key={api_key}"
def get_unpaywall_changefiles(url: str) -> List[Changefile]:
    """Get all JSONL changefiles listed by the Unpaywall changefiles feed.

    Only entries whose ``filetype`` is ``jsonl`` are kept. From each entry only the filename is retained,
    together with the date parsed from it; the entry's other fields are dropped.

    :param url: The changefiles feed url (including the api key).
    :return: Changefile objects sorted from oldest to newest by changefile date.
    """
    feed = get_http_response_json(url)
    jsonl_changefiles = [
        Changefile(entry["filename"], unpaywall_filename_to_datetime(entry["filename"]))
        for entry in feed["list"]
        if entry["filetype"] == "jsonl"
    ]
    return sorted(jsonl_changefiles, key=lambda cf: cf.changefile_date)
def unpaywall_filename_to_datetime(file_name: str) -> pendulum.DateTime:
    """Parse the release date embedded in an Unpaywall file name.

    The first ``YYYY-MM-DD`` date found is used, with an optional ``THHmmss`` time suffix
    (e.g. ``2023-04-25T080001``).

    :param file_name: Unpaywall release file name (contains date string).
    :return: The parsed date (or date-time when a time suffix is present).
    :raises AirflowException: if no date string is found in the file name.
    """
    found = re.search(r"\d{4}-\d{2}-\d{2}(T\d{6})?", file_name)
    if found is None:
        raise AirflowException(f"Could not find date in file name: {file_name}")
    stamp = found.group(0)

    # The full date-time format is tried first; if the string does not match it, fall back to date only.
    try:
        parsed = pendulum.from_format(stamp, "YYYY-MM-DDTHHmmss")
    except ValueError:
        parsed = pendulum.from_format(stamp, "YYYY-MM-DD")
    return parsed