academic_observatory_workflows.doi_workflow.doi_workflow

Module Contents

Classes

Table

SQLQuery

Aggregation

DOIRelease

Functions

create_dag(*, schedule, sensor_dag_ids, ...)

Create the DoiWorkflow.

make_sql_queries(→ List[List[SQLQuery]])

Generate the batches of SQLQuery objects that the workflow runs.

fetch_ror_affiliations(→ Dict)

Fetch the ROR affiliations for a given affiliation string.

get_snapshot_date(project_id, dataset_id, table_id, ...)

Get the snapshot date for a BigQuery table.

traverse_ancestors(index, child_ids)

Traverse all of the ancestors of a set of child ROR ids.

ror_to_ror_hierarchy_index(→ Dict)

Make an index of child to ancestor relationships.

Attributes

MAX_QUERIES

SENSOR_DAG_IDS

AGGREGATIONS

academic_observatory_workflows.doi_workflow.doi_workflow.MAX_QUERIES = 100[source]
class academic_observatory_workflows.doi_workflow.doi_workflow.Table[source]
property table_id[source]

Generates the BigQuery table_id for both sharded and non-sharded tables.

Returns:

BigQuery table_id.

project_id: str[source]
dataset_id: str[source]
table_name: str[source]
sharded: bool = False[source]
snapshot_date: pendulum.DateTime[source]
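
For illustration, a minimal sketch of constructing Table objects. It assumes Table is a dataclass and that table_id appends a date shard suffix derived from snapshot_date when sharded is True:

    import pendulum
    from academic_observatory_workflows.doi_workflow.doi_workflow import Table

    # Sharded table: table_id is assumed to carry a date shard suffix.
    sharded = Table(
        project_id="my-project",  # illustrative project id
        dataset_id="observatory",
        table_name="doi",
        sharded=True,
        snapshot_date=pendulum.datetime(2023, 1, 1),
    )
    print(sharded.table_id)  # e.g. "doi20230101" if the suffix format is YYYYMMDD

    # Non-sharded table: table_id is assumed to be just the table name.
    plain = Table(
        project_id="my-project",
        dataset_id="observatory",
        table_name="doi",
        sharded=False,
        snapshot_date=pendulum.datetime(2023, 1, 1),
    )
    print(plain.table_id)  # "doi"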
class academic_observatory_workflows.doi_workflow.doi_workflow.SQLQuery[source]
name: str[source]
inputs: Dict[source]
output_table: Table[source]
output_clustering_fields: List[source]
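
A hypothetical SQLQuery definition, assuming dataclass-style construction; the query name, inputs mapping, and clustering field are illustrative, not taken from make_sql_queries:

    import pendulum
    from academic_observatory_workflows.doi_workflow.doi_workflow import SQLQuery, Table

    output = Table(
        project_id="my-project",
        dataset_id="observatory_intermediate",
        table_name="doi",
        sharded=True,
        snapshot_date=pendulum.datetime(2023, 1, 1),
    )

    query = SQLQuery(
        name="doi",  # illustrative query name
        inputs={"crossref_metadata": ...},  # assumed shape: template variable -> input Table
        output_table=output,
        output_clustering_fields=["doi"],  # illustrative clustering field
    )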
class academic_observatory_workflows.doi_workflow.doi_workflow.Aggregation[source]
table_name: str[source]
aggregation_field: str[source]
group_by_time_field: str = 'published_year'[source]
relate_to_institutions: bool = False[source]
relate_to_countries: bool = False[source]
relate_to_groups: bool = False[source]
relate_to_members: bool = False[source]
relate_to_journals: bool = False[source]
relate_to_funders: bool = False[source]
relate_to_publishers: bool = False[source]
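
A sketch of a single Aggregation entry, mirroring the defaults listed above; the table and field names are illustrative rather than copied from AGGREGATIONS:

    from academic_observatory_workflows.doi_workflow.doi_workflow import Aggregation

    agg = Aggregation(
        table_name="institution",  # illustrative aggregation table
        aggregation_field="institutions.id",  # illustrative field to aggregate on
        relate_to_countries=True,  # remaining relate_to_* flags keep their False defaults
    )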
academic_observatory_workflows.doi_workflow.doi_workflow.SENSOR_DAG_IDS = ['crossref_metadata', 'crossref_fundref', 'geonames', 'ror', 'open_citations', 'unpaywall',...[source]
academic_observatory_workflows.doi_workflow.doi_workflow.AGGREGATIONS[source]
class academic_observatory_workflows.doi_workflow.doi_workflow.DOIRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)[source]

Bases: observatory.platform.workflows.workflow.SnapshotRelease

static from_dict(dict_: dict)[source]
to_dict() → dict[source]
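
A round-trip sketch of DOIRelease serialisation; the exact dictionary keys are an assumption, taken to mirror the constructor arguments:

    import pendulum
    from academic_observatory_workflows.doi_workflow.doi_workflow import DOIRelease

    release = DOIRelease(
        dag_id="doi",
        run_id="scheduled__2023-01-01T00:00:00+00:00",  # illustrative Airflow run id
        snapshot_date=pendulum.datetime(2023, 1, 1),
    )
    # from_dict and to_dict are assumed to be inverses, e.g. for passing
    # releases between tasks.
    restored = DOIRelease.from_dict(release.to_dict())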
academic_observatory_workflows.doi_workflow.doi_workflow.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_intermediate_dataset_id: str = 'observatory_intermediate', bq_dashboards_dataset_id: str = 'coki_dashboards', bq_observatory_dataset_id: str = 'observatory', bq_unpaywall_dataset_id: str = 'unpaywall', bq_ror_dataset_id: str = 'ror', api_dataset_id: str = 'doi', sql_queries: List[List[SQLQuery]] = None, max_fetch_threads: int = 4, observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime | None = pendulum.datetime(2020, 8, 30), schedule: str | None = '@weekly', sensor_dag_ids: List[str] = None, max_active_runs: int = 1, retries: int = 3) → airflow.DAG[source]

Create the DoiWorkflow.

Parameters:
  • dag_id – the DAG ID.

  • cloud_workspace – the cloud workspace settings.

  • bq_intermediate_dataset_id – the BigQuery intermediate dataset id.

  • bq_dashboards_dataset_id – the BigQuery dashboards dataset id.

  • bq_observatory_dataset_id – the BigQuery observatory dataset id.

  • bq_unpaywall_dataset_id – the BigQuery Unpaywall dataset id.

  • bq_ror_dataset_id – the BigQuery ROR dataset id.

  • api_dataset_id – the DOI dataset id.

  • sql_queries – a list of batches of SQLQuery objects.

  • max_fetch_threads – maximum number of threads to use when fetching.

  • observatory_api_conn_id – the Observatory API connection key.

  • start_date – the start date.

  • schedule – the schedule interval.

  • sensor_dag_ids – a list of DAG IDs to wait for with sensors.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.
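A sketch of calling the factory from an Airflow DAG file; cloud_workspace is a placeholder for however your deployment loads its CloudWorkspace configuration:

    from academic_observatory_workflows.doi_workflow.doi_workflow import create_dag

    cloud_workspace = ...  # replace with an observatory CloudWorkspace instance

    dag = create_dag(
        dag_id="doi",
        cloud_workspace=cloud_workspace,
        schedule="@weekly",
        sensor_dag_ids=["crossref_metadata", "ror", "unpaywall"],  # illustrative subset of SENSOR_DAG_IDS
        retries=3,
    )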

academic_observatory_workflows.doi_workflow.doi_workflow.make_sql_queries(input_project_id: str, output_project_id: str, dataset_id_crossref_events: str = 'crossref_events', dataset_id_crossref_metadata: str = 'crossref_metadata', dataset_id_crossref_fundref: str = 'crossref_fundref', dataset_id_ror: str = 'ror', dataset_id_orcid: str = 'orcid', dataset_id_open_citations: str = 'open_citations', dataset_id_unpaywall: str = 'unpaywall', dataset_id_scihub: str = 'scihub', dataset_id_openalex: str = 'openalex', dataset_id_pubmed: str = 'pubmed', dataset_id_settings: str = 'settings', dataset_id_observatory: str = 'observatory', dataset_id_observatory_intermediate: str = 'observatory_intermediate') → List[List[SQLQuery]][source]
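
Calling the query factory directly; only the two required project ids are passed and the dataset ids keep their defaults. That the outer list is a sequence of ordered batches is inferred from the create_dag docstring ("a list of batches of SQLQuery objects"):

    from academic_observatory_workflows.doi_workflow.doi_workflow import make_sql_queries

    batches = make_sql_queries(
        input_project_id="my-input-project",  # illustrative project ids
        output_project_id="my-output-project",
    )
    # Assumed: each inner list is a batch of queries, with batches run in order.
    for batch in batches:
        for query in batch:
            print(query.name, query.output_table.table_name)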
academic_observatory_workflows.doi_workflow.doi_workflow.fetch_ror_affiliations(repository_institution: str, num_retries: int = 3) → Dict[source]

Fetch the ROR affiliations for a given affiliation string.

Parameters:
  • repository_institution – the affiliation string to search with.

  • num_retries – the number of retries.

Returns:

the list of ROR affiliations.
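
Example call; a reasonable assumption is that this queries the ROR affiliation-matching service, retrying failed requests up to num_retries times:

    from academic_observatory_workflows.doi_workflow.doi_workflow import fetch_ror_affiliations

    # "Curtin University" is an illustrative affiliation string.
    result = fetch_ror_affiliations("Curtin University", num_retries=3)
    print(result)  # a Dict describing the matched ROR affiliations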

academic_observatory_workflows.doi_workflow.doi_workflow.get_snapshot_date(project_id: str, dataset_id: str, table_id: str, snapshot_date: pendulum.DateTime)[source]
academic_observatory_workflows.doi_workflow.doi_workflow.traverse_ancestors(index: Dict, child_ids: Set)[source]

Traverse all of the ancestors of a set of child ROR ids.

Parameters:
  • index – the index.

  • child_ids – the child ids.

Returns:

all of the ancestors of all child ids.
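
A toy example with a hand-built index; the index shape shown (child id mapped to the ids of its immediate parents) is an assumption consistent with ror_to_ror_hierarchy_index below:

    from academic_observatory_workflows.doi_workflow.doi_workflow import traverse_ancestors

    # Hypothetical hierarchy: "a" is a child of "b", which is a child of "c".
    index = {"a": {"b"}, "b": {"c"}}
    ancestors = traverse_ancestors(index, {"a"})
    print(ancestors)  # expected to contain both "b" and "c"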

academic_observatory_workflows.doi_workflow.doi_workflow.ror_to_ror_hierarchy_index(ror: List[Dict]) → Dict[source]

Make an index of child to ancestor relationships.

Parameters:

ror – the ROR dataset as a list of dicts.

Returns:

the index.
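
Putting the two functions together: build the index from the ROR dataset, then walk a record's ancestors. The ROR id shown is illustrative:

    from academic_observatory_workflows.doi_workflow.doi_workflow import (
        ror_to_ror_hierarchy_index,
        traverse_ancestors,
    )

    ror = [...]  # replace with the ROR dataset loaded as a list of dicts
    index = ror_to_ror_hierarchy_index(ror)
    ancestors = traverse_ancestors(index, {"https://ror.org/02n415q13"})  # illustrative ROR id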