academic_observatory_workflows.openalex_telescope.tasks

Attributes

DATASET_API_ENTITY_ID

Functions

fetch_entities(→ dict[str, dict])

aws_to_gcs_transfer(*, entity, gc_project_id, ...)

download(*, entity, **context)

transform(*, entity)

upload_schema(*, entity, transform_bucket)

compare_schemas(*, entity, transform_bucket, ...)

upload_files(*, entity, transform_bucket)

bq_load_table(*, entity)

expire_previous_version(dag_id, project_id, ...[, ...])

add_dataset_release(*, dag_id, run_id, snapshot_date, ...)

get_entity(...)

get_task_id(**context)

make_no_updated_data_msg(→ str)

make_first_run_message(task_id)

make_no_merged_ids_msg(→ str)

get_aws_key(→ Tuple[str, str])

Get the AWS access key id and secret access key from the aws_conn_id airflow connection.

fetch_manifest(...)

Fetch the OpenAlex manifest for an entity type.

fetch_merged_ids(...)

transform_file(→ Tuple[collections.OrderedDict, list])

Transforms a single file.

remove_none_from_array(obj, field)

Remove None values from an array.

safe_get_dict(→ dict)

transform_object(obj)

Transform an entry/object for one of the OpenAlex entities.

bq_compare_schemas(→ bool)

Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.

merge_schema_maps(→ collections.OrderedDict)

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.

flatten_schema(→ dict)

A quick trick using the JSON encoder and load string function to convert from a nested OrderedDict object to a regular dictionary.

load_json(→ Any)

Read in a *.json file.

bq_set_table_expiry(*, table_id, days)

Set the expiry time for a BigQuery table.

Module Contents

academic_observatory_workflows.openalex_telescope.tasks.DATASET_API_ENTITY_ID = 'openalex'
academic_observatory_workflows.openalex_telescope.tasks.fetch_entities(*, dag_id: str, run_id: str, is_first_run: bool, entity_names: list[str], cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_folder: str, bq_dataset_id: str, api_bq_dataset_id: str, aws_conn_id: str, aws_openalex_bucket: str, entity_id=DATASET_API_ENTITY_ID) → dict[str, dict]
academic_observatory_workflows.openalex_telescope.tasks.aws_to_gcs_transfer(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, gc_project_id: str, aws_conn_id: str, n_transfer_trys: int, aws_openalex_bucket: str)
academic_observatory_workflows.openalex_telescope.tasks.download(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, **context)
academic_observatory_workflows.openalex_telescope.tasks.transform(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)
academic_observatory_workflows.openalex_telescope.tasks.upload_schema(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)
academic_observatory_workflows.openalex_telescope.tasks.compare_schemas(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str, slack_conn_id: str, **context)
academic_observatory_workflows.openalex_telescope.tasks.upload_files(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)
academic_observatory_workflows.openalex_telescope.tasks.bq_load_table(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)
academic_observatory_workflows.openalex_telescope.tasks.expire_previous_version(dag_id: str, project_id: str, dataset_id: str, table_id: str, snapshot_date: pendulum.DateTime, expiry_days: int, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID, client: google.cloud.bigquery.Client = None)
academic_observatory_workflows.openalex_telescope.tasks.add_dataset_release(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, bq_project_id: str, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID)
academic_observatory_workflows.openalex_telescope.tasks.get_entity(entity_index: dict, entity_name: str) → academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity | None
academic_observatory_workflows.openalex_telescope.tasks.get_task_id(**context)
academic_observatory_workflows.openalex_telescope.tasks.make_no_updated_data_msg(task_id: str, entity_name: str) → str
academic_observatory_workflows.openalex_telescope.tasks.make_first_run_message(task_id: str)
academic_observatory_workflows.openalex_telescope.tasks.make_no_merged_ids_msg(task_id: str, entity_name: str) → str
academic_observatory_workflows.openalex_telescope.tasks.get_aws_key(aws_conn_id: str) → Tuple[str, str]

Get the AWS access key id and secret access key from the aws_conn_id airflow connection.

Returns:

access key id and secret access key

academic_observatory_workflows.openalex_telescope.tasks.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) → academic_observatory_workflows.openalex_telescope.release.Manifest

Fetch the OpenAlex manifest for an entity type.

Parameters:
  • bucket – the OpenAlex AWS bucket.

  • aws_key – the aws_access_key_id and aws_secret_key as a tuple.

  • entity_name – the entity type.

Returns:

the Manifest for the given entity type.

academic_observatory_workflows.openalex_telescope.tasks.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') → List[academic_observatory_workflows.openalex_telescope.release.MergedId]
academic_observatory_workflows.openalex_telescope.tasks.transform_file(download_path: str, transform_path: str) → Tuple[collections.OrderedDict, list]

Transforms a single file. Each entry/object in the gzip input file is transformed and the transformed object is immediately written out to a gzip file. For each entity only one field has to be transformed.

This function generates and returns a BigQuery-style schema from the transformed objects, using the SchemaGenerator from the ‘bigquery_schema_generator’ package.

Parameters:
  • download_path – The path to the file with the OpenAlex entries.

  • transform_path – The path where transformed data will be saved.

Returns:

a tuple of (schema_map, error_logs). schema_map is the nested OrderedDict object produced by the SchemaGenerator; error_logs (schema_generator.error_logs) is the list of any error logs produced by the SchemaGenerator.
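
The read-transform-write loop described for transform_file can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the module's implementation: stream_transform and its transform argument are hypothetical names, the input is assumed to be a gzip file with one JSON object per line, and the schema generation step with SchemaGenerator is left out.

```python
import gzip
import json
from typing import Callable


def stream_transform(download_path: str, transform_path: str, transform: Callable[[dict], None]) -> int:
    """Hypothetical helper: read a gzip file of JSON lines, transform each
    object in place and write it straight to a gzip output file.

    Returns the number of objects written.
    """
    count = 0
    with gzip.open(download_path, "rt", encoding="utf-8") as f_in, gzip.open(
        transform_path, "wt", encoding="utf-8"
    ) as f_out:
        for line in f_in:
            if not line.strip():
                continue  # skip blank lines
            obj = json.loads(line)
            transform(obj)  # assumed to modify obj in place
            # Write immediately so only one object is held in memory at a time.
            f_out.write(json.dumps(obj) + "\n")
            count += 1
    return count
```

Writing each object as soon as it is transformed keeps memory use flat regardless of file size, which matters for large snapshot parts.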

academic_observatory_workflows.openalex_telescope.tasks.remove_none_from_array(obj: dict, field: str)

Remove None values from an array.

Parameters:
  • obj – the dictionary containing the array field.

  • field – the key for the array field.

Returns:

None
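
A minimal sketch of the in-place behaviour documented for remove_none_from_array, assuming the array is stored as a plain list under the given key. The name remove_none_sketch and the sample record are hypothetical, and how the real function treats a missing or non-list field is not stated here, so the sketch simply leaves such fields untouched.

```python
def remove_none_sketch(obj: dict, field: str) -> None:
    """Hypothetical helper: drop None entries from obj[field] in place."""
    values = obj.get(field)
    if isinstance(values, list):
        # Keep falsy values such as 0 or ""; only None is removed.
        obj[field] = [value for value in values if value is not None]


record = {"ids": ["a", None, "b", None]}  # made-up example record
remove_none_sketch(record, "ids")
print(record)  # {'ids': ['a', 'b']}
```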

academic_observatory_workflows.openalex_telescope.tasks.safe_get_dict(obj: dict, field: str) → dict
academic_observatory_workflows.openalex_telescope.tasks.transform_object(obj: dict)

Transform an entry/object for one of the OpenAlex entities. For the Work entity only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities only the “international” field is transformed.

Parameters:
  • obj – Single object with entity information

  • field – The field of interest that is transformed.

Returns:

None.

academic_observatory_workflows.openalex_telescope.tasks.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) → bool

Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.

Parameters:
  • expected – the expected schema.

  • actual – the actual schema.

  • check_types_match – optional; whether the data types of the fields must also match.

Returns:

whether the expected and actual schemas match.
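
The comparison described for bq_compare_schemas can be sketched as a recursive check over field names. This is an illustration only: compare_schemas_sketch is a hypothetical name, each field is assumed to be a dict with "name", "type" and (for RECORD fields) a nested "fields" list, and the real function may differ in details such as logging or handling of "mode".

```python
from typing import List


def compare_schemas_sketch(expected: List[dict], actual: List[dict], check_types_match: bool = False) -> bool:
    """Hypothetical helper: True when both schemas contain the same field
    names at every nesting level and, if requested, the same types."""
    exp = {field["name"]: field for field in expected}
    act = {field["name"]: field for field in actual}
    if exp.keys() != act.keys():
        return False
    for name, exp_field in exp.items():
        act_field = act[name]
        if check_types_match and exp_field.get("type") != act_field.get("type"):
            return False
        # RECORD fields carry their own "fields" list; compare it recursively.
        if not compare_schemas_sketch(
            exp_field.get("fields", []), act_field.get("fields", []), check_types_match
        ):
            return False
    return True


expected = [{"name": "id", "type": "STRING"}, {"name": "count", "type": "INTEGER"}]
actual = [{"name": "count", "type": "STRING"}, {"name": "id", "type": "STRING"}]
print(compare_schemas_sketch(expected, actual))  # True: same field names
print(compare_schemas_sketch(expected, actual, check_types_match=True))  # False: "count" types differ
```

Keying the fields by name makes the check independent of field order, which is a design choice of this sketch rather than something the documentation states.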

academic_observatory_workflows.openalex_telescope.tasks.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) → collections.OrderedDict

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.

Parameters:
  • to_add – The incoming schema to add to the existing “old” schema.

  • old – The existing old schema with previously populated values.

Returns:

The old schema with newly added fields.

academic_observatory_workflows.openalex_telescope.tasks.flatten_schema(schema_map: collections.OrderedDict) → dict

A quick trick using the JSON encoder and load string function to convert from a nested OrderedDict object to a regular dictionary.

Parameters:
  • schema_map – The generated schema from SchemaGenerator.

Returns:

A BigQuery-style schema.
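
The JSON round trip mentioned for flatten_schema can be shown in isolation. The sketch below covers only the OrderedDict-to-dict conversion; to_plain_dict and the sample data are hypothetical, and any flattening that the SchemaGenerator itself performs beforehand is not shown.

```python
import json
from collections import OrderedDict


def to_plain_dict(nested: OrderedDict) -> dict:
    """Hypothetical helper: serialise to a JSON string and parse it back,
    which turns every nested OrderedDict into a regular dict."""
    return json.loads(json.dumps(nested))


nested = OrderedDict(id=OrderedDict(type="STRING", mode="NULLABLE"))  # made-up data
plain = to_plain_dict(nested)
print(type(plain["id"]) is dict)  # True
```

The round trip only works for values that JSON can represent, and key order is preserved because regular dicts keep insertion order.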

academic_observatory_workflows.openalex_telescope.tasks.load_json(file_path: str) → Any

Read in a *.json file.

academic_observatory_workflows.openalex_telescope.tasks.bq_set_table_expiry(*, table_id: str, days: int)

Set the expiry time for a BigQuery table.

Parameters:
  • table_id – the fully qualified BigQuery table identifier.

  • days – the number of days from now until the table expires.

Returns: