academic_observatory_workflows.workflows.doi_workflow
Module Contents
- class academic_observatory_workflows.workflows.doi_workflow.Table
- academic_observatory_workflows.workflows.doi_workflow.make_sql_queries(input_project_id: str, output_project_id: str, dataset_id_crossref_events: str = 'crossref_events', dataset_id_crossref_metadata: str = 'crossref_metadata', dataset_id_crossref_fundref: str = 'crossref_fundref', dataset_id_ror: str = 'ror', dataset_id_orcid: str = 'orcid', dataset_id_open_citations: str = 'open_citations', dataset_id_unpaywall: str = 'unpaywall', dataset_id_scihub: str = 'scihub', dataset_id_openalex: str = 'openalex', dataset_id_pubmed: str = 'pubmed', dataset_id_settings: str = 'settings', dataset_id_observatory: str = 'observatory', dataset_id_observatory_intermediate: str = 'observatory_intermediate') -> List[List[SQLQuery]]
- academic_observatory_workflows.workflows.doi_workflow.fetch_ror_affiliations(repository_institution: str, num_retries: int = 3) -> Dict
Fetch the ROR affiliations for a given affiliation string.
- Parameters:
repository_institution – the affiliation string to search with.
num_retries – the number of retries.
- Returns:
the list of ROR affiliations.
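The following is a minimal sketch of the idea, not the module's implementation. It assumes the ROR affiliation-matching response is a JSON object whose "items" entries carry a "chosen" flag and an "organization" with "id" and "name". The names pick_chosen_rors and fetch_with_retries, the injected fetch callable, and the shape of the returned dict are all hypothetical; the HTTP call is injected so the retry logic can be exercised without network access.

```python
import time
from typing import Callable, Dict, List


def pick_chosen_rors(payload: Dict) -> List[Dict]:
    """Return the id and name of every match flagged as chosen in the payload."""
    rors = []
    for item in payload.get("items", []):
        if item.get("chosen"):
            org = item["organization"]
            rors.append({"id": org["id"], "name": org["name"]})
    return rors


def fetch_with_retries(
    fetch: Callable[[str], Dict],
    repository_institution: str,
    num_retries: int = 3,
    wait: float = 0.0,
) -> Dict:
    """Call fetch once, then retry up to num_retries times if it raises.

    fetch is a hypothetical callable that takes the affiliation string and
    returns the decoded JSON payload. If every attempt fails, the result
    carries an empty list of matches (an assumption made for this sketch).
    """
    rors: List[Dict] = []
    for attempt in range(num_retries + 1):
        try:
            rors = pick_chosen_rors(fetch(repository_institution))
            break
        except Exception:
            # Pause between attempts only when a wait was requested.
            if wait > 0 and attempt < num_retries:
                time.sleep(wait)
    return {"repository_institution": repository_institution, "rors": rors}
```

Keeping the payload parsing separate from the retry loop means each part can be checked on its own with a canned response.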
- academic_observatory_workflows.workflows.doi_workflow.get_snapshot_date(project_id: str, dataset_id: str, table_id: str, snapshot_date: pendulum.DateTime)
- academic_observatory_workflows.workflows.doi_workflow.traverse_ancestors(index: Dict, child_ids: Set)
Traverse all of the ancestors of a set of child ROR ids.
- Parameters:
index – the index.
child_ids – the child ids.
- Returns:
all of the ancestors of all child ids.
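As an illustration of this kind of traversal, here is a small sketch under stated assumptions: the index is taken to map each ROR id to the set of its direct parent ids, and the result excludes the starting ids unless they are reached again through a cycle. Both choices, and the name collect_ancestors, are assumptions made for the example rather than documented behaviour of traverse_ancestors.

```python
from typing import Dict, Set


def collect_ancestors(index: Dict[str, Set[str]], child_ids: Set[str]) -> Set[str]:
    """Collect every id reachable by following parent links from child_ids."""
    ancestors: Set[str] = set()
    stack = list(child_ids)
    while stack:
        current = stack.pop()
        # Ids missing from the index are treated as having no parents.
        for parent in index.get(current, set()):
            if parent not in ancestors:
                ancestors.add(parent)
                stack.append(parent)
    return ancestors
```

An explicit stack is used instead of recursion so that deep hierarchies cannot hit the recursion limit, and the membership check keeps the loop finite if the data contains a cycle.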
- academic_observatory_workflows.workflows.doi_workflow.ror_to_ror_hierarchy_index(ror: List[Dict]) -> Dict
Make an index of child to ancestor relationships.
- Parameters:
ror – the ROR dataset as a list of dicts.
- Returns:
the index.
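One way such an index could be built is sketched below. It assumes each ROR record is a dict with an "id" and a "relationships" list whose entries have a "type" (for example "Parent") and an "id", and that the index maps every record id to the set of all its ancestors. The name build_hierarchy_index and the exact shape of the index are assumptions for illustration only.

```python
from typing import Dict, List, Set


def build_hierarchy_index(ror: List[Dict]) -> Dict[str, Set[str]]:
    """Map each ROR id to the set of all of its ancestor ids."""
    # First pass: record the direct parents of every organisation.
    parents: Dict[str, Set[str]] = {}
    for record in ror:
        parents[record["id"]] = {
            rel["id"]
            for rel in record.get("relationships", [])
            if rel.get("type") == "Parent"
        }

    # Second pass: follow parent links transitively for each organisation.
    index: Dict[str, Set[str]] = {}
    for child_id in parents:
        seen: Set[str] = set()
        stack = [child_id]
        while stack:
            for parent in parents.get(stack.pop(), set()):
                if parent not in seen:
                    seen.add(parent)
                    stack.append(parent)
        index[child_id] = seen
    return index
```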
- class academic_observatory_workflows.workflows.doi_workflow.DoiWorkflow(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_intermediate_dataset_id: str = 'observatory_intermediate', bq_dashboards_dataset_id: str = 'coki_dashboards', bq_observatory_dataset_id: str = 'observatory', bq_unpaywall_dataset_id: str = 'unpaywall', bq_ror_dataset_id: str = 'ror', api_dataset_id: str = 'doi', sql_queries: List[List[SQLQuery]] = None, max_fetch_threads: int = 4, observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime | None = pendulum.datetime(2020, 8, 30), schedule: str | None = '@weekly', sensor_dag_ids: List[str] = None)
Bases: observatory.platform.workflows.workflow.Workflow
- SENSOR_DAG_IDS = ['crossref_metadata', 'crossref_fundref', 'geonames', 'ror', 'open_citations', 'unpaywall',...
- make_release(**kwargs) -> observatory.platform.workflows.workflow.SnapshotRelease
Make a release instance. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.
- Parameters:
kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.
- Returns:
A release instance or list of release instances.
- create_datasets(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Create required BigQuery datasets.
- create_repo_institution_to_ror_table(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Create the repository_institution_to_ror_table.
- create_ror_hierarchy_table(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Create the ROR hierarchy table.
- create_intermediate_table(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Create an intermediate table.
- create_aggregate_table(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Run the aggregate table query.
- update_table_descriptions(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Update descriptions for tables.
- copy_to_dashboards(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Copy tables to dashboards dataset.
- create_dashboard_views(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Create views for dashboards dataset.
- add_new_dataset_releases(release: observatory.platform.workflows.workflow.SnapshotRelease, **kwargs)
Add release information to the API.
- remove_aggregations(input_aggregations: List[Aggregation], aggregations_to_remove: Set[str]) -> List[Aggregation]
Remove a set of aggregations from a given list. Removal is based on matching the table_name string in the Aggregation object.
When removing items from the Aggregation list, you will need to reflect the same changes in the unit test for the DOI Workflow DAG structure.
This function actually works in reverse: it builds a new list from the aggregations whose table_name is not in the “aggregations_to_remove” set. This works around a strange memory bug that prevents the AGGREGATIONS list from being modified with .remove().
- Parameters:
input_aggregations – List of Aggregations to have elements removed.
aggregations_to_remove – Set of aggregations to remove, matching on “table_name” in the Aggregation class.
- Returns:
aggregations_removed – a list of Aggregations with the desired items removed.
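The build-a-new-list approach described above can be sketched as follows. The Aggregation dataclass here is a hypothetical stand-in that carries only table_name (the real class is not shown on this page), and filter_aggregations is an illustrative name, not the method itself.

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass
class Aggregation:
    """Hypothetical stand-in: only the field used for matching is modelled."""

    table_name: str


def filter_aggregations(
    input_aggregations: List[Aggregation], aggregations_to_remove: Set[str]
) -> List[Aggregation]:
    """Return a new list without the aggregations whose table_name is listed."""
    return [
        agg
        for agg in input_aggregations
        if agg.table_name not in aggregations_to_remove
    ]
```

Because a new list is returned, the input list is left untouched, which avoids mutating a shared module-level list while iterating over it.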