Source code for academic_observatory_workflows.workflows.crossref_fundref_telescope

# Copyright 2020 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Aniek Roelofs, Richard Hosking, James Diprose

from __future__ import annotations

import json
import logging
import os
import random
import shutil
import tarfile
import xml.etree.ElementTree as ET
from typing import List, Dict, Tuple

import pendulum
from airflow.api.common.experimental.pool import create_pool
from airflow.models.taskinstance import TaskInstance
from google.cloud.bigquery import SourceFormat

from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag
from observatory.api.client.model.dataset_release import DatasetRelease
from observatory.platform.api import make_observatory_api
from observatory.platform.bigquery import (
    bq_find_schema,
    bq_sharded_table_id,
    bq_table_exists,
    bq_load_table,
    bq_create_dataset,
)
from observatory.platform.config import AirflowConns
from observatory.platform.files import save_jsonl_gz, clean_dir
from observatory.platform.gcs import gcs_blob_name_from_path, gcs_upload_files, gcs_blob_uri
from observatory.platform.observatory_config import CloudWorkspace
from observatory.platform.utils.url_utils import retry_get_url
from observatory.platform.workflows.workflow import Workflow, SnapshotRelease, cleanup, set_task_state

# GitLab API (v4) endpoint that lists the releases of the crossref/open_funder_registry project.
RELEASES_URL = "https://gitlab.com/api/v4/projects/crossref%2Fopen_funder_registry/releases"
class CrossrefFundrefRelease(SnapshotRelease):
    """A single Crossref Fundref snapshot: its source url plus the file names and paths used by each stage."""

    def __init__(self, *, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, url: str):
        """Construct a CrossrefFundrefRelease.

        :param dag_id: the DAG id.
        :param run_id: the DAG run id.
        :param snapshot_date: the release date.
        :param url: the url corresponding with this release date.
        """

        super().__init__(dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date)
        self.url = url

        # The download/extract/transform folders are attributes supplied by the base class.
        # Download stage: the gzipped tarball fetched from the release url
        self.download_file_name = "crossref_fundref.tar.gz"
        self.download_file_path = os.path.join(self.download_folder, self.download_file_name)

        # Extract stage: the RDF registry file taken out of the tarball
        self.extract_file_name = "crossref_fundref.rdf"
        self.extract_file_path = os.path.join(self.extract_folder, self.extract_file_name)

        # Transform stage: the gzipped JSON Lines file
        self.transform_file_name = "crossref_fundref.jsonl.gz"
        self.transform_file_path = os.path.join(self.transform_folder, self.transform_file_name)
class CrossrefFundrefTelescope(Workflow):
    """Workflow that fetches Crossref Funder Registry releases from Gitlab, converts the RDF registry into
    gzipped JSON Lines and loads each release into a sharded BigQuery table."""

    def __init__(
        self,
        *,
        dag_id: str,
        cloud_workspace: CloudWorkspace,
        bq_dataset_id: str = "crossref_fundref",
        bq_table_name: str = "crossref_fundref",
        api_dataset_id: str = "crossref_fundref",
        schema_folder: str = os.path.join(default_schema_folder(), "crossref_fundref"),
        dataset_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
        table_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
        observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
        start_date: pendulum.DateTime = pendulum.datetime(2014, 2, 23),
        schedule: str = "@weekly",
        catchup: bool = True,
        gitlab_pool_name: str = "gitlab_pool",
        gitlab_pool_slots: int = 2,
        gitlab_pool_description: str = "A pool to limit the connections to Gitlab",
    ):
        """Construct a CrossrefFundrefTelescope instance.

        :param dag_id: the id of the DAG.
        :param cloud_workspace: the cloud workspace settings.
        :param bq_dataset_id: the BigQuery dataset id.
        :param bq_table_name: the BigQuery table name.
        :param api_dataset_id: the Dataset ID to use when storing releases.
        :param schema_folder: the SQL schema path.
        :param dataset_description: description for the BigQuery dataset.
        :param table_description: description for the BigQuery table.
        :param observatory_api_conn_id: the Observatory API connection key.
        :param start_date: the start date of the DAG.
        :param schedule: the schedule interval of the DAG.
        :param catchup: whether to catchup the DAG or not.
        :param gitlab_pool_name: name of the Gitlab Pool.
        :param gitlab_pool_slots: number of slots for the Gitlab Pool.
        :param gitlab_pool_description: description for the Gitlab Pool.
        """

        super().__init__(
            dag_id=dag_id,
            start_date=start_date,
            schedule=schedule,
            catchup=catchup,
            airflow_conns=[observatory_api_conn_id],
            tags=[Tag.academic_observatory],
        )

        self.cloud_workspace = cloud_workspace
        self.bq_dataset_id = bq_dataset_id
        self.bq_table_name = bq_table_name
        self.api_dataset_id = api_dataset_id
        self.schema_folder = schema_folder
        self.dataset_description = dataset_description
        self.table_description = table_description
        self.observatory_api_conn_id = observatory_api_conn_id

        # Gitlab is very quick to block requests when too many arrive at once, so every task that talks to
        # Gitlab runs inside a dedicated pool that caps the number of concurrent connections.
        create_pool(gitlab_pool_name, gitlab_pool_slots, gitlab_pool_description)

        # Setup tasks
        self.add_setup_task(self.check_dependencies)
        self.add_setup_task(self.get_release_info, pool=gitlab_pool_name)

        # Release tasks: only the download contacts Gitlab and therefore needs the pool
        self.add_task(self.download, pool=gitlab_pool_name)
        for task in (
            self.upload_downloaded,
            self.extract,
            self.transform,
            self.upload_transformed,
            self.bq_load,
            self.add_new_dataset_releases,
            self.cleanup,
        ):
            self.add_task(task)

    def make_release(self, **kwargs) -> List[CrossrefFundrefRelease]:
        """Make release instances from the release info that get_release_info pushed to XCom.

        :param kwargs: the context passed from the PythonOperator. See
            https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are
            passed to this argument.
        :return: a list of CrossrefFundrefRelease instances.
        """

        ti: TaskInstance = kwargs["ti"]
        release_info = ti.xcom_pull(
            key=CrossrefFundrefTelescope.RELEASE_INFO,
            task_ids=self.get_release_info.__name__,
            include_prior_dates=False,
        )
        run_id = kwargs["run_id"]

        return [
            CrossrefFundrefRelease(
                dag_id=self.dag_id,
                run_id=run_id,
                snapshot_date=pendulum.parse(info["date"]),
                url=info["url"],
            )
            for info in release_info
        ]

    def get_release_info(self, **kwargs) -> bool:
        """Find the releases published inside this run's data interval that have no BigQuery table yet.

        The remaining releases are pushed to XCom for the downstream tasks.

        :param kwargs: the context passed from the PythonOperator.
        :return: True when at least one release needs processing (the DAG continues), otherwise False.
        """

        # List releases between a start date and an end date [ )
        start_date = pendulum.instance(kwargs["data_interval_start"])
        end_date = pendulum.instance(kwargs["data_interval_end"])
        releases_list = list_releases(start_date, end_date)
        logging.info(f"Releases between start_date={start_date} and end_date={end_date}")
        logging.info(releases_list)

        # Keep only the releases whose sharded BigQuery table does not exist yet
        releases_list_out = []
        for release in releases_list:
            table_id = bq_sharded_table_id(
                self.cloud_workspace.output_project_id,
                self.bq_dataset_id,
                self.bq_table_name,
                pendulum.parse(release["date"]),
            )
            logging.info("Checking if bigquery table already exists:")
            if bq_table_exists(table_id):
                logging.info(f"Skipping as table exists for {release['url']}: {table_id}")
                continue

            logging.info(f"Table does not exist yet, processing {release['url']} in this workflow")
            releases_list_out.append(release)

        # Nothing to process: signal that the DAG should stop, and push nothing to XCom
        if not releases_list_out:
            return False

        ti: TaskInstance = kwargs["ti"]
        ti.xcom_push(CrossrefFundrefTelescope.RELEASE_INFO, releases_list_out, kwargs["logical_date"])
        return True

    def download(self, releases: List[CrossrefFundrefRelease], **kwargs):
        """Downloads release tar.gz file from url.

        :param releases: the releases to download.
        :param kwargs: the context passed from the PythonOperator.
        """

        # A selection of headers to prevent 403/forbidden error; one is picked at random per download.
        headers_list = [
            {
                "authority": "gitlab.com",
                "upgrade-insecure-requests": "1",
                "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/83.0.4103.116 Safari/537.36",
                "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,"
                "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "sec-fetch-site": "none",
                "sec-fetch-mode": "navigate",
                "sec-fetch-dest": "document",
                "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
            },
            {
                "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0",
                "Accept": "*/*",
                "Accept-Language": "en-US,en;q=0.5",
                "Referer": "https://gitlab.com/",
            },
        ]

        for release in releases:
            logging.info(f"Downloading file: {release.download_file_path}, url: {release.url}")
            clean_dir(release.download_folder)

            # Stream the response body straight into the download file
            request_headers = random.choice(headers_list)
            with retry_get_url(release.url, headers=request_headers, stream=True) as response:
                with open(release.download_file_path, "wb") as file:
                    shutil.copyfileobj(response.raw, file)

    def upload_downloaded(self, releases: List[CrossrefFundrefRelease], **kwargs):
        """Upload the downloaded data to the download bucket in Cloud Storage.

        :param releases: the releases whose downloaded file is uploaded.
        :param kwargs: the context passed from the PythonOperator.
        """

        task_name = self.upload_downloaded.__name__
        for release in releases:
            success = gcs_upload_files(
                bucket_name=self.cloud_workspace.download_bucket,
                file_paths=[release.download_file_path],
            )
            set_task_state(success, task_name, release)

    def extract(self, releases: List[CrossrefFundrefRelease], **kwargs):
        """Extract the .rdf registry file of each release from its gzipped tar file.

        :param releases: the releases to extract.
        :param kwargs: the context passed from the PythonOperator.
        """

        for release in releases:
            logging.info(f"Extracting file: {release.download_file_path}")
            clean_dir(release.extract_folder)

            with tarfile.open(release.download_file_path, "r:gz") as tar:
                for member in tar.getmembers():
                    if not (member.isfile() and member.name.endswith(".rdf")):
                        continue

                    # Copy the archived RDF member to the extract path
                    rdf_file = tar.extractfile(member)
                    with open(release.extract_file_path, "wb") as output_file:
                        shutil.copyfileobj(rdf_file, output_file)

            logging.info(f"File extracted to: {release.extract_file_path}")

    def transform(self, releases: List[CrossrefFundrefRelease], **kwargs):
        """Transforms release by storing file content in gzipped json format. Relationships between funders
        are added.

        :param releases: the releases to transform.
        :param kwargs: the context passed from the PythonOperator.
        """

        for release in releases:
            clean_dir(release.transform_folder)

            # Parse the RDF registry, then attach the children/parents relationships to every funder
            parsed_funders, funders_by_key = parse_fundref_registry_rdf(release.extract_file_path)
            funders = add_funders_relationships(parsed_funders, funders_by_key)

            save_jsonl_gz(release.transform_file_path, funders)
            logging.info(f"Success transforming release: {release.url}")

    def upload_transformed(self, releases: List[CrossrefFundrefRelease], **kwargs) -> None:
        """Upload the transformed data to the transform bucket in Cloud Storage.

        :param releases: the releases whose transformed file is uploaded.
        :param kwargs: the context passed from the PythonOperator.
        """

        task_name = self.upload_transformed.__name__
        for release in releases:
            success = gcs_upload_files(
                bucket_name=self.cloud_workspace.transform_bucket,
                file_paths=[release.transform_file_path],
            )
            set_task_state(success, task_name, release)

    def bq_load(self, releases: List[CrossrefFundrefRelease], **kwargs) -> None:
        """Load the transformed data of each release into its own sharded BigQuery table.

        :param releases: the releases to load.
        :param kwargs: the context passed from the PythonOperator.
        """

        workspace = self.cloud_workspace
        bq_create_dataset(
            project_id=workspace.output_project_id,
            dataset_id=self.bq_dataset_id,
            location=workspace.data_location,
            description=self.dataset_description,
        )

        task_name = self.bq_load.__name__
        for release in releases:
            blob_name = gcs_blob_name_from_path(release.transform_file_path)
            uri = gcs_blob_uri(workspace.transform_bucket, blob_name)
            schema_file_path = bq_find_schema(
                path=self.schema_folder,
                table_name=self.bq_table_name,
                release_date=release.snapshot_date,
            )
            table_id = bq_sharded_table_id(
                workspace.output_project_id,
                self.bq_dataset_id,
                self.bq_table_name,
                release.snapshot_date,
            )
            success = bq_load_table(
                uri=uri,
                table_id=table_id,
                schema_file_path=schema_file_path,
                source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
                table_description=self.table_description,
                ignore_unknown_values=True,
            )
            set_task_state(success, task_name, release)

    def add_new_dataset_releases(self, releases: List[CrossrefFundrefRelease], **kwargs) -> None:
        """Adds release information to API.

        :param releases: the releases to record.
        :param kwargs: the context passed from the PythonOperator; the data interval bounds are read from it.
        """

        for release in releases:
            dataset_release = DatasetRelease(
                dag_id=self.dag_id,
                dataset_id=self.api_dataset_id,
                dag_run_id=release.run_id,
                snapshot_date=release.snapshot_date,
                data_interval_start=kwargs["data_interval_start"],
                data_interval_end=kwargs["data_interval_end"],
            )
            api = make_observatory_api(observatory_api_conn_id=self.observatory_api_conn_id)
            api.post_dataset_release(dataset_release)

    def cleanup(self, releases: List[CrossrefFundrefRelease], **kwargs) -> None:
        """Delete all files, folders and XComs associated with this release.

        :param releases: the releases to clean up.
        :param kwargs: the context passed from the PythonOperator.
        """

        # Inside this method the bare name `cleanup` resolves to the module-level function imported from
        # observatory.platform.workflows.workflow, not to this method.
        for release in releases:
            cleanup(
                dag_id=self.dag_id,
                execution_date=kwargs["execution_date"],
                workflow_folder=release.workflow_folder,
            )
def list_releases(start_date: pendulum.DateTime, end_date: pendulum.DateTime) -> List[dict]:
    """List all available CrossrefFundref releases between the start and end date.

    Every page of the Gitlab releases endpoint is fetched. For each release that offers a "tar.gz" source, the
    snapshot date is derived and the release is kept when start_date <= snapshot date < end_date.

    :param start_date: The start date of the period to look for releases (inclusive)
    :param end_date: The end date of the period to look for releases (exclusive)
    :return: list with dictionaries of release info (url and release date formatted as YYYYMMDD)
    """

    def parse_version(r: Dict) -> float:
        # The tag name is not always numeric (ValueError on "v.1.48"), in which case the release name is used
        try:
            return float(r["tag_name"].lower().strip("v"))
        except ValueError:
            return float(r["name"].lower().strip("v"))

    # A selection of headers to prevent 403/forbidden error.
    headers_list = [
        {
            "authority": "gitlab.com",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/84.0.4147.89 Safari/537.36",
            "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,"
            "*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "sec-fetch-site": "none",
            "sec-fetch-mode": "navigate",
            "sec-fetch-dest": "document",
            "accept-language": "en-GB,en-US;q=0.9,en;q=0.8",
        },
        {
            "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        },
    ]

    # The same randomly chosen header set is reused for every page
    headers = random.choice(headers_list)
    release_info = []
    current_page = 1

    while True:
        # Fetch page
        response = retry_get_url(f"{RELEASES_URL}?per_page=100&page={current_page}", headers=headers)

        # The total number of pages is reported in a response header
        num_pages = int(response.headers["X-Total-Pages"])

        for release in json.loads(response.text):
            version = parse_version(release)
            for source in release["assets"]["sources"]:
                if source["format"] != "tar.gz":
                    continue

                # Derive the snapshot date; where it comes from depends on the release version
                if version == 0.1:
                    snapshot_date = pendulum.datetime(year=2014, month=3, day=1)
                elif version < 1.0:
                    # The first line of the description is parsed as "<month name> <year>"
                    date_string = release["description"].split("\n")[0]
                    snapshot_date = pendulum.from_format("01 " + date_string, "DD MMMM YYYY")
                else:
                    snapshot_date = pendulum.parse(release["released_at"])

                # Only include release if it is within start and end dates
                if start_date <= snapshot_date < end_date:
                    release_info.append({"url": source["url"], "date": snapshot_date.format("YYYYMMDD")})

        # Stop after the last page, otherwise move on to the next one
        if num_pages <= current_page:
            break
        current_page += 1

    return release_info
def new_funder_template():
    """Helper Function for creating a new Funder.

    Every call builds a new dict with new, empty lists for the multi-valued fields, so templates never share
    state with each other.

    :return: a blank funder object.
    """

    # Field names in output order; the order of this tuple is the key order of the returned dict.
    field_names = (
        "funder",
        "pre_label",
        "alt_label",
        "narrower",
        "broader",
        "modified",
        "created",
        "funding_body_type",
        "funding_body_sub_type",
        "region",
        "country",
        "country_code",
        "state",
        "tax_id",
        "continuation_of",
        "renamed_as",
        "replaces",
        "affil_with",
        "merged_with",
        "incorporated_into",
        "is_replaced_by",
        "incorporates",
        "split_into",
        "status",
        "merger_of",
        "split_from",
        "formly_known_as",
        "notation",
    )

    # Fields that start as an empty list; all remaining fields start as None.
    list_fields = {
        "alt_label",
        "narrower",
        "broader",
        "continuation_of",
        "renamed_as",
        "replaces",
        "affil_with",
        "merged_with",
        "incorporated_into",
        "is_replaced_by",
        "incorporates",
        "split_into",
        "merger_of",
    }

    return {name: [] if name in list_fields else None for name in field_names}
# RDF tag -> funder key, for elements whose text content is stored as-is.
_FUNDREF_TEXT_FIELDS = {
    "modified": "modified",
    "created": "created",
    "fundingBodyType": "funding_body_type",
    "fundingBodySubType": "funding_body_sub_type",
    "region": "region",
    "country": "country",
    "state": "state",
    "taxId": "tax_id",
    "notation": "notation",
}

# RDF tag -> funder key, for elements whose "resource" attribute is appended to a list.
_FUNDREF_RESOURCE_LIST_FIELDS = {
    "narrower": "narrower",
    "broader": "broader",
    "continuationOf": "continuation_of",
    "renamedAs": "renamed_as",
    "replaces": "replaces",
    "affilWith": "affil_with",
    "mergedWith": "merged_with",
    "incorporatedInto": "incorporated_into",
    "isReplacedBy": "is_replaced_by",
    "incorporates": "incorporates",
    "splitInto": "split_into",
    "mergerOf": "merger_of",
}

# RDF tag -> funder key, for elements whose "resource" attribute is stored as a single value.
# "formly_known_as" is the existing output key and is kept unchanged.
_FUNDREF_RESOURCE_FIELDS = {
    "status": "status",
    "splitFrom": "split_from",
    "formerlyKnownAs": "formly_known_as",
}


def parse_fundref_registry_rdf(registry_file_path: str) -> Tuple[List, Dict]:
    """Helper function to parse a fundref registry rdf file and to return a python list containing each funder.

    A blank funder is registered for every "hasTopConcept" entry of a "ConceptScheme" record. Each "Concept"
    record then fills in the funder registered under its id.

    :param registry_file_path: the filename of the registry.rdf file to be parsed.
    :return: funders list containing all the funders parsed from the input rdf and dictionary of funders with their
        id as key.
    :raises KeyError: if a "Concept" record refers to a funder id that was not registered through a
        "hasTopConcept" entry earlier in the file.
    """

    funders = []
    funders_by_key = {}

    # Strip leading white space from file this is present in fundref release 2019-06-01. If not removed it will
    # give an XML ParseError. The encoding is given explicitly so parsing does not depend on the platform default.
    with open(registry_file_path, "r", encoding="utf-8") as f:
        xml_string = f.read().strip()

    root = ET.fromstring(xml_string)

    # The "resource" and "about" attributes are looked up in the same namespace as the root element.
    tag_prefix = root.tag.split("}")[0] + "}"
    resource_attr = tag_prefix + "resource"
    about_attr = tag_prefix + "about"

    for record in root:
        # Separate names for the record tag and the nested tag, so that looping over the children of a
        # ConceptScheme cannot change which branch handles the record itself.
        record_tag = record.tag.split("}")[-1]

        if record_tag == "ConceptScheme":
            for nested in record:
                if nested.tag.split("}")[-1] == "hasTopConcept":
                    funders_by_key[nested.attrib[resource_attr]] = new_funder_template()

        elif record_tag == "Concept":
            funder_id = record.attrib[about_attr]
            funder = funders_by_key[funder_id]
            funder["funder"] = funder_id

            for nested in record:
                tag = nested.tag.split("}")[-1]
                if tag == "inScheme":
                    continue

                if tag in _FUNDREF_TEXT_FIELDS:
                    funder[_FUNDREF_TEXT_FIELDS[tag]] = nested.text
                elif tag in _FUNDREF_RESOURCE_LIST_FIELDS:
                    funder[_FUNDREF_RESOURCE_LIST_FIELDS[tag]].append(nested.attrib[resource_attr])
                elif tag in _FUNDREF_RESOURCE_FIELDS:
                    funder[_FUNDREF_RESOURCE_FIELDS[tag]] = nested.attrib[resource_attr]
                elif tag == "prefLabel":
                    # The label text sits two element levels below the tag
                    funder["pre_label"] = nested[0][0].text
                elif tag == "altLabel":
                    alt_label = nested[0][0].text
                    if alt_label is not None:
                        funder["alt_label"].append(alt_label)
                elif tag == "address":
                    # The country code text sits two element levels below the tag
                    funder["country_code"] = nested[0][0].text
                else:
                    logging.info(f"Unrecognized tag for element: {nested}")

            funders.append(funder)

    return funders, funders_by_key
def add_funders_relationships(funders: List, funders_by_key: Dict) -> List:
    """Adds any children/parent relationships to funder instances in the funders list.

    Each funder dict is updated in place with four keys: "children" and "parents" (the nested relationship
    trees built by recursive_funders) and the boolean flags "bottom" and "top".

    NOTE(review): "bottom" is True when the funder has at least one child and "top" is True when it has at least
    one parent, which reads as the opposite of what the names suggest. Confirm against downstream consumers
    before changing it.

    :param funders: List of funders
    :param funders_by_key: Dictionary of funders with their id as key.
    :return: funders with added relationships (the same list object that was passed in).
    """

    for funder in funders:
        # The depth returned by recursive_funders is not needed here
        children, _ = recursive_funders(funders_by_key, funder, 0, "narrower", [])
        funder.update(children=children, bottom=bool(children))

        parents, _ = recursive_funders(funders_by_key, funder, 0, "broader", [])
        funder.update(parents=parents, top=bool(parents))

    return funders
def recursive_funders(
    funders_by_key: Dict, funder: Dict, depth: int, direction: str, sub_funders: List
) -> Tuple[List, int]:
    """Recursively build the tree of children or parents of a funder.

    The ids of the direct children/parents of a funder are already listed under its 'narrower'/'broader' key. Each
    id is looked up in funders_by_key and expanded recursively in the same direction. An id that is already on the
    current recursion path, or that cannot be found in funders_by_key, is added with the name "NA" and is not
    expanded further.

    :param funders_by_key: dictionary with id as key and funders object as value
    :param funder: dictionary of a given funder containing 'narrower' and 'broader' info
    :param depth: keeping track of nested depth
    :param direction: either 'narrower' or 'broader' to get 'children' or 'parents'
    :param sub_funders: list to keep track of which funder ids are on the current recursion path; it is modified
        while recursing and holds its initial content again when the function returns
    :return: list of children (or parents) and the deepest depth that was reached
    """

    starting_depth = depth
    # Key under which the nested relatives of an entry are stored
    nested_key = "children" if direction == "narrower" else "parent"
    relatives = []

    # Loop through funder_ids in 'narrower' or 'broader' info
    for funder_id in funder[direction]:
        # Values used when the funder is not expanded (already on the path, or unknown id)
        name, returned, returned_depth = "NA", [], depth

        if funder_id in sub_funders:
            # Stop recursion if funder is its own parent or child
            logging.info(f"Funder {funder_id} is it's own parent/child, skipping..")
            sub_funders.append(funder_id)
        else:
            try:
                sub_funder = funders_by_key[funder_id]
                # Put the sub funder on the path before descending into it
                sub_funders.append(sub_funder["funder"])
                label = sub_funder["pre_label"]
                nested, nested_depth = recursive_funders(
                    funders_by_key, sub_funder, starting_depth + 1, direction, sub_funders
                )
            except KeyError:
                logging.info(f"Could not find funder by id: {funder_id}, skipping..")
                sub_funders.append(funder_id)
            else:
                name, returned, returned_depth = label, nested, nested_depth

        # Add child/parent (containing nested children/parents) to list
        relatives.append({"funder": funder_id, "name": name, nested_key: returned})

        # Every branch above pushed exactly one id; take it off the path again
        sub_funders.pop(-1)
        depth = max(depth, returned_depth)

    return relatives, depth