academic_observatory_workflows.workflows.openalex_telescope

Module Contents

Classes

OpenAlexEntity

OpenAlexRelease

OpenAlexTelescope

OpenAlex telescope

Meta

ManifestEntry

Manifest

MergedId

Functions

get_task_id(**kwargs)

make_no_updated_data_msg(→ str)

make_first_run_message(task_id)

make_no_merged_ids_msg(→ str)

make_download_bash_op(...)

Download files for an entity from the bucket.

parse_release_msg(msg)

s3_uri_parts(→ Tuple[str, str])

Extracts the S3 bucket name and object key from the given S3 URI.

fetch_manifest(→ Manifest)

Fetch OpenAlex manifests for a range of entity types.

fetch_merged_ids(→ List[MergedId])

transform_file(→ Tuple[collections.OrderedDict, list])

Transforms a single file.

transform_object(obj)

Transform an entry/object for one of the OpenAlex entities.

bq_compare_schemas(→ bool)

Compare two BigQuery-style schemas to check whether they have the same fields and/or data types.

merge_schema_maps(→ collections.OrderedDict)

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.

flatten_schema(→ dict)

A quick trick using the JSON encoder and load string function to convert from a nested OrderedDict object to a regular dictionary.

load_json(→ Any)

Read in a *.json file.

class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexEntity(entity_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, manifest: Manifest, merged_ids: List[MergedId], is_first_run: bool, prev_end_date: pendulum.DateTime | None, release: OpenAlexRelease = None)[source]
property table_description[source]
property schema_file_path[source]
property generated_schema_path[source]
property upsert_uri[source]
property merged_ids_uri[source]
property merged_ids_schema_file_path[source]
property bq_main_table_id[source]
property bq_upsert_table_id[source]
property bq_delete_table_id[source]
property bq_snapshot_table_id[source]
property current_entries[source]
property has_merged_ids[source]
property current_merged_ids[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) OpenAlexEntity[source]
to_dict() Dict[source]
class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, entities: List[OpenAlexEntity], start_date: pendulum.DateTime, end_date: pendulum.DateTime, is_first_run: bool, schema_folder: str)[source]

Bases: observatory.platform.workflows.workflow.ChangefileRelease

get_entity(entity_name: str) OpenAlexEntity | None[source]
academic_observatory_workflows.workflows.openalex_telescope.get_task_id(**kwargs)[source]
academic_observatory_workflows.workflows.openalex_telescope.make_no_updated_data_msg(task_id: str, entity_name: str) str[source]
academic_observatory_workflows.workflows.openalex_telescope.make_first_run_message(task_id: str)[source]
academic_observatory_workflows.workflows.openalex_telescope.make_no_merged_ids_msg(task_id: str, entity_name: str) str[source]
class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'openalex', entity_names: List[str] = None, schema_folder: str = os.path.join(default_schema_folder(), 'openalex'), dataset_description: str = 'The OpenAlex dataset: https://docs.openalex.org/', snapshot_expiry_days: int = 31, n_transfer_trys: int = 3, max_processes: int = os.cpu_count(), primary_key: str = 'id', aws_conn_id: str = 'aws_openalex', aws_openalex_bucket: str = 'openalex', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, slack_conn_id: str | None = AirflowConns.SLACK, start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1), schedule: str = '@weekly', queue: str = 'remote_queue')[source]

Bases: observatory.platform.workflows.workflow.Workflow

OpenAlex telescope

property aws_key: Tuple[str, str][source]

Get the AWS access key ID and secret access key from the aws_conn_id Airflow connection.

Returns:

access key id and secret access key

make_dag() airflow.DAG[source]
fetch_releases(**kwargs) bool[source]

Fetch OpenAlex releases.

Returns:

True to continue, False to skip.

make_release(**kwargs) OpenAlexRelease[source]

Make a Release instance. Gets the list of releases available from the release check (setup task).

Parameters:

kwargs – the context passed from the Airflow Operator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

an OpenAlexRelease instance.

create_datasets(**kwargs) None[source]

Create datasets.

aws_to_gcs_transfer(release: OpenAlexRelease, **kwargs)[source]

Transfer files from the AWS bucket to the Google Cloud bucket.

determine_branches(release: OpenAlexRelease, **kwargs)[source]

Determine which entity branches to follow.

bq_snapshot(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Create a snapshot of each main table. The snapshot makes it possible to roll back the main table if something goes wrong. The snapshot expires after self.snapshot_expiry_days.

transform(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Transform all files for the Work, Concept and Institution entities. Transforms one file per process.

This step also scans through each file and generates a BigQuery-style schema from the incoming data.

upload_schema(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Upload the generated schema from the transform step to GCS.

compare_schemas(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Compare the generated schema against the expected schema for each entity.

upload_upsert_files(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Upload the transformed data to Cloud Storage.

Raises:

AirflowException – raised if the files to be uploaded are not found.

bq_load_upsert_tables(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Load the upsert table for each entity.

bq_upsert_records(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Upsert the records from each upsert table into the main table.

bq_load_delete_tables(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Load the delete tables.

bq_delete_records(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]

Delete records from main tables that are in delete tables.

add_dataset_releases(release: OpenAlexRelease, entity_name: str = None, **kwargs) None[source]

Adds release information to API.

cleanup(release: OpenAlexRelease, **kwargs) None[source]

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.workflows.openalex_telescope.make_download_bash_op(workflow: observatory.platform.workflows.workflow.Workflow, entity_name: str, task_id: str, **kwargs) observatory.platform.workflows.workflow.WorkflowBashOperator[source]

Download files for an entity from the bucket.

gsutil is used instead of the standard Google Cloud Python library because it downloads files faster.

Parameters:
  • workflow – the workflow.

  • entity_name – the name of the OpenAlex entity, e.g. authors, institutions etc.

  • task_id – the task id.

Returns:

a WorkflowBashOperator instance.

academic_observatory_workflows.workflows.openalex_telescope.parse_release_msg(msg: Dict)[source]
academic_observatory_workflows.workflows.openalex_telescope.s3_uri_parts(s3_uri: str) Tuple[str, str][source]

Extracts the S3 bucket name and object key from the given S3 URI.

Parameters:

s3_uri – str, S3 URI in format s3://mybucketname/path/to/object

Returns:

tuple, (bucket_name, object_key)
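
The split described above can be illustrated with a minimal sketch built on the standard library. This is not the module's implementation: the function name split_s3_uri is hypothetical, and raising ValueError for malformed input is an assumption.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Hypothetical helper: split an S3 URI into (bucket_name, object_key)."""
    parsed = urlparse(s3_uri)
    # Assumption: anything that is not s3://<bucket>/... is treated as an error.
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not a valid S3 URI: {s3_uri}")
    # The bucket is the network location; the key is the path without its leading slash.
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, object_key = split_s3_uri("s3://mybucketname/path/to/object")
print(bucket_name, object_key)  # prints "mybucketname path/to/object"
```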

class academic_observatory_workflows.workflows.openalex_telescope.Meta(content_length, record_count)[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) Meta[source]
to_dict() Dict[source]
class academic_observatory_workflows.workflows.openalex_telescope.ManifestEntry(url: str, meta: Meta)[source]
property object_key[source]
property updated_date: pendulum.DateTime[source]
property file_name[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) ManifestEntry[source]
to_dict() Dict[source]
class academic_observatory_workflows.workflows.openalex_telescope.Manifest(entries: List[ManifestEntry], meta: Meta)[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) Manifest[source]
to_dict() Dict[source]
class academic_observatory_workflows.workflows.openalex_telescope.MergedId(url: str, content_length: int)[source]
property object_key[source]
property updated_date: pendulum.DateTime[source]
property file_name[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) MergedId[source]
to_dict() Dict[source]
academic_observatory_workflows.workflows.openalex_telescope.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) Manifest[source]

Fetch OpenAlex manifests for a range of entity types.

Parameters:
  • bucket – the OpenAlex AWS bucket.

  • aws_key – the aws_access_key_id and aws_secret_key as a tuple.

  • entity_name – the entity type.

Returns:

the Manifest for the given entity type.

academic_observatory_workflows.workflows.openalex_telescope.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') List[MergedId][source]
academic_observatory_workflows.workflows.openalex_telescope.transform_file(download_path: str, transform_path: str) Tuple[collections.OrderedDict, list][source]

Transforms a single file. Each entry/object in the gzip input file is transformed and the transformed object is immediately written out to a gzip file. For each entity only one field has to be transformed.

This function generates and returns a BigQuery-style schema from the transformed object, using the SchemaGenerator from the ‘bigquery_schema_generator’ package.

Parameters:
  • download_path – The path to the file with the OpenAlex entries.

  • transform_path – The path where transformed data will be saved.

Returns:

a tuple of (schema_map, error_logs). schema_map is a nested OrderedDict object produced by the SchemaGenerator; error_logs (schema_generator.error_logs) is a list of any error logs produced by the SchemaGenerator.
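
The read, transform, and write-immediately pattern described above can be sketched as follows. This is only an illustration under stated assumptions: the input is assumed to be gzip-compressed JSON Lines (one object per line), the name transform_gzip_jsonl and its transform callback are hypothetical, and schema generation with bigquery_schema_generator is left out entirely.

```python
import gzip
import json
from typing import Callable


def transform_gzip_jsonl(
    download_path: str, transform_path: str, transform: Callable[[dict], None]
) -> int:
    """Hypothetical helper: stream objects from one gzip file to another.

    Each object is transformed in place and written out straight away, so
    only one object is held in memory at a time. Returns the number of
    objects written.
    """
    count = 0
    with gzip.open(download_path, "rt", encoding="utf-8") as f_in, gzip.open(
        transform_path, "wt", encoding="utf-8"
    ) as f_out:
        for line in f_in:
            if not line.strip():
                continue  # Skip blank lines.
            obj = json.loads(line)
            transform(obj)  # Assumption: the callback mutates obj in place.
            f_out.write(json.dumps(obj) + "\n")
            count += 1
    return count
```

Streaming line by line keeps memory use flat regardless of file size, which matters when one file is handled per process.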

academic_observatory_workflows.workflows.openalex_telescope.transform_object(obj: dict)[source]

Transform an entry/object for one of the OpenAlex entities. For the Work entity only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities only the “international” field is transformed.

Parameters:
  • obj – Single object with entity information

  • field – The field of interest that is transformed.

Returns:

None.

academic_observatory_workflows.workflows.openalex_telescope.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) bool[source]

Compare two BigQuery-style schemas to check whether they have the same fields and/or data types.

Parameters:
  • expected – the expected schema.

  • actual – the actual schema.

  • check_types_match – optional, whether to also check that the data types of the fields match.

Returns:

whether the expected and actual match.
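
A comparison of this kind could be sketched as below. This is not the module's implementation: the function name schemas_match is hypothetical, and the sketch assumes each field is a dict with a "name", a "type", and, for nested records, a "fields" list, as in the BigQuery JSON schema format.

```python
from typing import List


def schemas_match(
    expected: List[dict], actual: List[dict], check_types: bool = False
) -> bool:
    """Hypothetical helper: recursively compare two BigQuery-style schemas."""
    if len(expected) != len(actual):
        return False
    # Index the actual fields by name so that field order does not matter.
    actual_by_name = {field["name"]: field for field in actual}
    for exp in expected:
        act = actual_by_name.get(exp["name"])
        if act is None:
            return False  # A field is missing from the actual schema.
        if check_types and exp.get("type") != act.get("type"):
            return False  # Same name but a different data type.
        # Recurse into nested RECORD fields; leaves have no "fields" key.
        if not schemas_match(exp.get("fields", []), act.get("fields", []), check_types):
            return False
    return True


expected = [{"name": "id", "type": "STRING"}, {"name": "year", "type": "INTEGER"}]
actual = [{"name": "year", "type": "STRING"}, {"name": "id", "type": "STRING"}]
print(schemas_match(expected, actual))                    # prints "True"
print(schemas_match(expected, actual, check_types=True))  # prints "False"
```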

academic_observatory_workflows.workflows.openalex_telescope.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) collections.OrderedDict[source]

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.

Parameters:
  • to_add – The incoming schema to add to the existing “old” schema.

  • old – The existing old schema with previously populated values.

Returns:

The old schema with newly added fields.

academic_observatory_workflows.workflows.openalex_telescope.flatten_schema(schema_map: collections.OrderedDict) dict[source]

A quick trick using the JSON encoder and load string function to convert from a nested OrderedDict object to a regular dictionary.

Parameters:

schema_map – The generated schema from SchemaGenerator.

Returns:

A Bigquery style schema.
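
The JSON round trip mentioned above can be shown in isolation. This is a minimal sketch of the trick only, not of flatten_schema itself: the name to_plain_dict and the contents of schema_map are hypothetical, and any further processing done by the real function is not shown.

```python
import json
from collections import OrderedDict


def to_plain_dict(nested: OrderedDict) -> dict:
    """Hypothetical helper: convert nested OrderedDicts to plain dicts.

    json.dumps serialises the whole structure to a string, and json.loads
    rebuilds it using ordinary dict objects at every nesting level.
    """
    return json.loads(json.dumps(nested))


# Illustrative data only; a real schema map has a different structure.
schema_map = OrderedDict(
    [("id", OrderedDict([("type", "STRING"), ("mode", "NULLABLE")]))]
)
plain = to_plain_dict(schema_map)
print(type(plain["id"]).__name__)  # prints "dict"
```

The round trip only works for JSON-serialisable values, and it turns tuples into lists and non-string keys into strings.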

academic_observatory_workflows.workflows.openalex_telescope.load_json(file_path: str) Any[source]

Read in a *.json file.