academic_observatory_workflows.crossref_fundref_telescope.tasks

Functions

fetch_releases(→ List[Dict])

Based on a list of all releases, checks which ones were released between the previous and the current execution date of the DAG.

download(→ None)

Task to download the release tar.gz file from its URL.

upload_downloaded(→ None)

Task to upload the downloaded Fundref data to GCS bucket

extract(→ None)

Task to extract a Fundref data release

transform(→ None)

Task to transform a Fundref data release

upload_transformed(→ None)

Task to upload the transformed Fundref data to GCS bucket

bq_load(→ None)

Task to load the Fundref release data into a sharded bigquery table

add_dataset_releases(→ None)

Task to add the release to the api dataset

cleanup_workflow(→ None)

Task to clean up the workflow

list_releases(→ List[dict])

List all available CrossrefFundref releases between the start and end date

parse_fundref_registry_rdf(→ Tuple[List, dict])

Helper function to parse a fundref registry rdf file and to return a python list containing each funder.

add_funders_relationships(→ List)

Adds any children/parent relationships to funder instances in the funders list.

new_funder_template()

Helper Function for creating a new Funder.

recursive_funders(→ Tuple[List, int])

Recursively goes through a funder/sub_funder dict, looking up funder properties in the funders_by_key dictionary.

Module Contents

academic_observatory_workflows.crossref_fundref_telescope.tasks.fetch_releases(cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, dag_id: str, run_id: str, data_interval_start: str, data_interval_end: str, bq_dataset_id: str, bq_table_name: str) List[Dict][source]

Based on a list of all releases, checks which ones were released between the previous and the current execution date of the DAG. For each release that falls within this period, checks whether a BigQuery table already exists for it. Only releases that fall within the period and do not yet have a table are passed to the next tasks. If the resulting list is empty, the workflow stops.

Parameters:
  • cloud_workspace – The cloud workspace object to work with

  • dag_id – The ID of the dag

  • run_id – The ID of this dag run

  • data_interval_start – The start of the data interval for this run

  • data_interval_end – The end of the data interval for this run

  • bq_dataset_id – The ID of the bigquery dataset containing existing fundref data

  • bq_table_name – The name of the bigquery table (without the shard date)
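
The two checks described for this task can be sketched in plain Python. This is only an illustration, not the workflow's implementation: the helper select_new_releases, the existing_table_ids set, the example URLs, the half-open interval, and the "<table name><YYYYMMDD>" shard naming are all assumptions made for the example, and the standard library datetime stands in for the date type the task uses internally.

```python
from datetime import datetime
from typing import Dict, List, Set


def select_new_releases(
    releases: List[Dict],
    interval_start: datetime,
    interval_end: datetime,
    bq_table_name: str,
    existing_table_ids: Set[str],
) -> List[Dict]:
    """Keep releases inside the data interval that have no table shard yet."""
    selected = []
    for release in releases:
        date = release["date"]
        # Check 1: released within the data interval
        # (start inclusive, end exclusive is an assumption of this sketch).
        if not (interval_start <= date < interval_end):
            continue
        # Check 2: no sharded table exists yet for this release date
        # (hypothetical shard naming: table name followed by YYYYMMDD).
        shard_id = f"{bq_table_name}{date.strftime('%Y%m%d')}"
        if shard_id in existing_table_ids:
            continue
        selected.append(release)
    return selected


# Hypothetical release info: a url and a release date per release.
releases = [
    {"url": "https://example.org/v1.tar.gz", "date": datetime(2023, 1, 10)},
    {"url": "https://example.org/v2.tar.gz", "date": datetime(2023, 2, 10)},
    {"url": "https://example.org/v3.tar.gz", "date": datetime(2023, 2, 20)},
]
new_releases = select_new_releases(
    releases,
    interval_start=datetime(2023, 2, 1),
    interval_end=datetime(2023, 3, 1),
    bq_table_name="crossref_fundref",
    existing_table_ids={"crossref_fundref20230210"},
)
print([r["url"] for r in new_releases])  # ['https://example.org/v3.tar.gz']
```

In this example the first release falls outside the interval and the second already has a shard, so only the third remains. An empty result corresponds to the case where the workflow stops.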

academic_observatory_workflows.crossref_fundref_telescope.tasks.download(release: Dict) None[source]

Task to download the release tar.gz file from its URL.

Parameters:

release – The Fundref release object in dictionary form

academic_observatory_workflows.crossref_fundref_telescope.tasks.upload_downloaded(release: Dict) None[source]

Task to upload the downloaded Fundref data to GCS bucket

academic_observatory_workflows.crossref_fundref_telescope.tasks.extract(release: Dict | academic_observatory_workflows.crossref_fundref_telescope.release.CrossrefFundrefRelease) None[source]

Task to extract a Fundref data release

academic_observatory_workflows.crossref_fundref_telescope.tasks.transform(release: Dict | academic_observatory_workflows.crossref_fundref_telescope.release.CrossrefFundrefRelease) None[source]

Task to transform a Fundref data release

academic_observatory_workflows.crossref_fundref_telescope.tasks.upload_transformed(release: Dict | academic_observatory_workflows.crossref_fundref_telescope.release.CrossrefFundrefRelease) None[source]

Task to upload the transformed Fundref data to GCS bucket

academic_observatory_workflows.crossref_fundref_telescope.tasks.bq_load(release: Dict, bq_dataset_id: str, dataset_description: str, bq_table_name: str, table_description: str, schema_folder: str) None[source]

Task to load the Fundref release data into a sharded bigquery table

Parameters:
  • bq_dataset_id – The bigquery dataset ID

  • dataset_description – The description to give the dataset

  • bq_table_name – The name of the bigquery table to create

  • table_description – The description to give the table

  • schema_folder – The location of the schema file to use

academic_observatory_workflows.crossref_fundref_telescope.tasks.add_dataset_releases(release: Dict, api_bq_dataset_id: str) None[source]

Task to add the release to the api dataset

Parameters:

api_bq_dataset_id – The dataset containing the api table

academic_observatory_workflows.crossref_fundref_telescope.tasks.cleanup_workflow(release: Dict) None[source]

Task to clean up the workflow

academic_observatory_workflows.crossref_fundref_telescope.tasks.list_releases(start_date: pendulum.DateTime, end_date: pendulum.DateTime) List[dict][source]

List all available CrossrefFundref releases between the start and end date

Parameters:
  • start_date – The start date of the period to look for releases

  • end_date – The end date of the period to look for releases

Returns:

list with dictionaries of release info (url and release date)

academic_observatory_workflows.crossref_fundref_telescope.tasks.parse_fundref_registry_rdf(registry_file_path: str) Tuple[List, dict][source]

Helper function to parse a fundref registry rdf file and to return a python list containing each funder.

Parameters:

registry_file_path – the filename of the registry.rdf file to be parsed.

Returns:

funders list containing all the funders parsed from the input rdf and dictionary of funders with their id as key.
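
The shape of the return value (a list of funders plus a dictionary keyed by funder id) can be illustrated with a small sketch. It assumes a SKOS-style layout in which each funder is a Concept element carrying 'narrower' and 'broader' links; the sample document, the funder ids, the parse_registry helper, and the three dictionary fields are hypothetical and much smaller than a real registry. The sketch parses a string rather than a file path to stay self-contained.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List, Tuple

RDF_NS = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
SKOS_NS = "http://www.w3.org/2004/02/skos/core#"

# Hypothetical two-funder registry, used only for illustration.
SAMPLE_RDF = f"""<rdf:RDF xmlns:rdf="{RDF_NS}" xmlns:skos="{SKOS_NS}">
  <skos:Concept rdf:about="http://dx.doi.org/10.13039/1">
    <skos:narrower rdf:resource="http://dx.doi.org/10.13039/2"/>
  </skos:Concept>
  <skos:Concept rdf:about="http://dx.doi.org/10.13039/2">
    <skos:broader rdf:resource="http://dx.doi.org/10.13039/1"/>
  </skos:Concept>
</rdf:RDF>"""


def parse_registry(xml_text: str) -> Tuple[List[Dict], Dict[str, Dict]]:
    """Return (funders, funders_by_key) for a SKOS-style registry string."""
    root = ET.fromstring(xml_text)
    funders: List[Dict] = []
    funders_by_key: Dict[str, Dict] = {}
    for concept in root.findall(f"{{{SKOS_NS}}}Concept"):
        funder_id = concept.get(f"{{{RDF_NS}}}about")
        funder = {"funder": funder_id, "narrower": [], "broader": []}
        # Collect the ids this funder links to in each direction.
        for relation in ("narrower", "broader"):
            for element in concept.findall(f"{{{SKOS_NS}}}{relation}"):
                funder[relation].append(element.get(f"{{{RDF_NS}}}resource"))
        funders.append(funder)
        # The same dict object is stored under its id for later lookups.
        funders_by_key[funder_id] = funder
    return funders, funders_by_key


funders, funders_by_key = parse_registry(SAMPLE_RDF)
print(len(funders), sorted(funders_by_key))
```

Storing the same dictionary object in both containers means a later pass that resolves relationships can look a funder up by id without copying it.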

academic_observatory_workflows.crossref_fundref_telescope.tasks.add_funders_relationships(funders: List, funders_by_key: Dict) List[source]

Adds any children/parent relationships to funder instances in the funders list.

Parameters:
  • funders – List of funders

  • funders_by_key – Dictionary of funders with their id as key.

Returns:

funders with added relationships.

academic_observatory_workflows.crossref_fundref_telescope.tasks.new_funder_template()[source]

Helper Function for creating a new Funder.

Returns:

a blank funder object.
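
One likely reason to build the blank funder in a function rather than share a module-level constant is that each call yields fresh mutable fields. The sketch below illustrates that design choice only; blank_funder and its four field names are hypothetical and do not reflect the actual template's fields.

```python
from typing import Dict


def blank_funder() -> Dict:
    """Return a fresh funder dict; the field names are illustrative only."""
    return {"funder": None, "name": None, "narrower": [], "broader": []}


first = blank_funder()
second = blank_funder()
first["narrower"].append("http://dx.doi.org/10.13039/2")
print(second["narrower"])  # [] because each call builds its own lists
```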

academic_observatory_workflows.crossref_fundref_telescope.tasks.recursive_funders(funders_by_key: Dict, funder: Dict, depth: int, direction: str, sub_funders: List) Tuple[List, int][source]

Recursively goes through a funder/sub_funder dict. The funder properties can be looked up with the funders_by_key dictionary, which stores the properties per funder id. Any children/parents for the funder are already given in the xml element with the ‘narrower’ and ‘broader’ tags. For each funder id listed under ‘narrower’ or ‘broader’, the function recursively adds that funder's own children/parents together with its funder properties.

Parameters:
  • funders_by_key – dictionary with id as key and funders object as value

  • funder – dictionary of a given funder containing ‘narrower’ and ‘broader’ info

  • depth – keeping track of nested depth

  • direction – either ‘narrower’ or ‘broader’ to get ‘children’ or ‘parents’

  • sub_funders – list to keep track of which funder ids are parents

Returns:

list of children and current depth
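
The recursive traversal can be sketched as follows. This is a simplified illustration under stated assumptions, not the function's actual body: collect_related is a hypothetical name, funders carry only a 'name' property, the visited list guards against cycles along the current path, and the returned integer is the deepest recursion level reached, which may differ from how the real function reports depth.

```python
from typing import Dict, List, Tuple


def collect_related(
    funders_by_key: Dict[str, Dict],
    funder: Dict,
    depth: int,
    direction: str,
    visited: List[str],
) -> Tuple[List[Dict], int]:
    """Build a nested list of related funders by following `direction` links."""
    nested_key = "children" if direction == "narrower" else "parents"
    related = []
    deepest = depth
    for funder_id in funder[direction]:
        if funder_id in visited:
            # A funder already on the current path would recurse forever.
            continue
        linked = funders_by_key[funder_id]
        nested, nested_depth = collect_related(
            funders_by_key, linked, depth + 1, direction, visited + [funder_id]
        )
        deepest = max(deepest, nested_depth)
        related.append({"funder": funder_id, "name": linked["name"], nested_key: nested})
    return related, deepest


# Hypothetical three-level hierarchy: A is the parent of B, B the parent of C.
funders_by_key = {
    "A": {"name": "Agency", "narrower": ["B"], "broader": []},
    "B": {"name": "Bureau", "narrower": ["C"], "broader": ["A"]},
    "C": {"name": "Centre", "narrower": [], "broader": ["B"]},
}
children, child_depth = collect_related(funders_by_key, funders_by_key["A"], 0, "narrower", ["A"])
parents, parent_depth = collect_related(funders_by_key, funders_by_key["C"], 0, "broader", ["C"])
print(children)
print(parents)
```

Passing 'narrower' yields nested 'children' entries and passing 'broader' yields nested 'parents' entries, mirroring the direction parameter described above.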