academic_observatory_workflows.openalex_telescope.openalex_telescope

Module Contents

Classes

OpenAlexEntity

Meta

ManifestEntry

Manifest

MergedId

Functions

create_dag(*, dag_id, cloud_workspace, ...)

Construct an OpenAlexTelescope instance.

get_entity(→ OpenAlexEntity)

get_task_id(**context)

make_no_updated_data_msg(→ str)

make_first_run_message(task_id)

make_no_merged_ids_msg(→ str)

get_aws_key(→ Tuple[str, str])

Get the AWS access key id and secret access key from the aws_conn_id airflow connection.

s3_uri_parts(→ Tuple[str, str])

Extracts the S3 bucket name and object key from the given S3 URI.

fetch_manifest(→ Manifest)

Fetch the OpenAlex manifest for a given entity type.

fetch_merged_ids(→ List[MergedId])

transform_file(→ Tuple[collections.OrderedDict, list])

Transforms a single file.

clean_array_field(obj, field)

transform_object(obj)

Transform an entry/object for one of the OpenAlex entities.

bq_compare_schemas(→ bool)

Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.

merge_schema_maps(→ collections.OrderedDict)

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found

flatten_schema(→ dict)

A quick trick using the JSON encoder and load string function to convert from a nested

load_json(→ Any)

Read in a *.json file.

bq_set_table_expiry(*, table_id, days)

Set the expiry time for a BigQuery table.

Attributes

TEMP_TABLE_DESCRIPTION

UPSERT_BYTE_LIMIT

academic_observatory_workflows.openalex_telescope.openalex_telescope.TEMP_TABLE_DESCRIPTION = 'Temporary table for internal use. Do not use.'[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.UPSERT_BYTE_LIMIT[source]
class academic_observatory_workflows.openalex_telescope.openalex_telescope.OpenAlexEntity(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, entity_name: str, bq_dataset_id: str, schema_folder: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, manifest: Manifest, merged_ids: List[MergedId], is_first_run: bool, prev_end_date: pendulum.DateTime | None)[source]

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property release_folder[source]

Get the path to the release folder, which resides inside the workflow folder.

Returns:

path to folder.

property table_description[source]
property schema_file_path[source]
property generated_schema_path[source]
property data_uri[source]
property merged_ids_uri[source]
property merged_ids_schema_file_path[source]
property bq_main_table_id[source]
property bq_upsert_table_id[source]
property bq_delete_table_id[source]
property bq_snapshot_table_id[source]
property current_entries[source]
property has_merged_ids[source]
property current_merged_ids[source]
static from_dict(dict_: Dict) OpenAlexEntity[source]
to_dict() Dict[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'openalex', entity_names: List[str] = None, schema_folder: str = project_path('openalex_telescope', 'schema'), dataset_description: str = 'The OpenAlex dataset: https://docs.openalex.org/', temp_table_expiry_days: int = 7, snapshot_expiry_days: int = 31, n_transfer_trys: int = 3, max_processes: int = os.cpu_count(), primary_key: str = 'id', aws_conn_id: str = 'aws_openalex', aws_openalex_bucket: str = 'openalex', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, slack_conn_id: str | None = AirflowConns.SLACK, start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1), schedule: str = '@weekly', queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) airflow.DAG[source]

Construct an OpenAlexTelescope instance.

Parameters:
  • dag_id – the id of the DAG.

  • cloud_workspace – the cloud workspace settings.

  • bq_dataset_id – the BigQuery dataset id.

  • entity_names – the names of the OpenAlex entities to process.

  • schema_folder – the SQL schema path.

  • dataset_description – description for the BigQuery dataset.

  • temp_table_expiry_days – the number of days until the upsert or delete tables will expire.

  • snapshot_expiry_days – the number of days that a snapshot of each entity’s main table will take to expire, which is set to 31 days so there is some time to rollback after an update.

  • n_transfer_trys – how many times to attempt to transfer data from AWS to GCS.

  • max_processes – the maximum number of processes to use when transforming data.

  • aws_conn_id – the AWS Airflow Connection ID.

  • aws_openalex_bucket – the OpenAlex AWS bucket name.

  • observatory_api_conn_id – the Observatory API Airflow Connection ID.

  • slack_conn_id – the Slack Connection ID.

  • start_date – the Apache Airflow DAG start date.

  • schedule – the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly, they are not released on any particular day of the month, so we instead simply run the workflow weekly on a Sunday, as this will pick up new updates regularly. See here for past release dates: https://openalex.s3.amazonaws.com/RELEASE_NOTES.txt

  • queue – the Airflow queue this job runs on.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.

academic_observatory_workflows.openalex_telescope.openalex_telescope.get_entity(entity_index: dict, entity_name: str) OpenAlexEntity[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.get_task_id(**context)[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.make_no_updated_data_msg(task_id: str, entity_name: str) str[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.make_first_run_message(task_id: str)[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.make_no_merged_ids_msg(task_id: str, entity_name: str) str[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.get_aws_key(aws_conn_id: str) Tuple[str, str][source]

Get the AWS access key id and secret access key from the aws_conn_id airflow connection.

Returns:

the access key id and secret access key as a tuple.

academic_observatory_workflows.openalex_telescope.openalex_telescope.s3_uri_parts(s3_uri: str) Tuple[str, str][source]

Extracts the S3 bucket name and object key from the given S3 URI.

Parameters:

s3_uri – str, S3 URI in format s3://mybucketname/path/to/object

Returns:

tuple, (bucket_name, object_key)
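A minimal sketch of how such a parser could look, using only the standard library; the function name is hypothetical and the real implementation may differ:

```python
from urllib.parse import urlparse

def s3_uri_parts_sketch(s3_uri: str) -> tuple:
    # Split "s3://mybucketname/path/to/object" into
    # ("mybucketname", "path/to/object").
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")
```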

class academic_observatory_workflows.openalex_telescope.openalex_telescope.Meta(content_length, record_count)[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) Meta[source]
to_dict() Dict[source]
class academic_observatory_workflows.openalex_telescope.openalex_telescope.ManifestEntry(url: str, meta: Meta)[source]
property object_key[source]
property updated_date: pendulum.DateTime[source]
property file_name[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) ManifestEntry[source]
to_dict() Dict[source]
class academic_observatory_workflows.openalex_telescope.openalex_telescope.Manifest(entries: List[ManifestEntry], meta: Meta)[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) Manifest[source]
to_dict() Dict[source]
class academic_observatory_workflows.openalex_telescope.openalex_telescope.MergedId(url: str, content_length: int)[source]
property object_key[source]
property updated_date: pendulum.DateTime[source]
property file_name[source]
__eq__(other)[source]

Return self==value.

static from_dict(dict_: Dict) MergedId[source]
to_dict() Dict[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) Manifest[source]

Fetch the OpenAlex manifest for a given entity type.

Parameters:
  • bucket – the OpenAlex AWS bucket.

  • aws_key – the aws_access_key_id and aws_secret_key as a tuple.

  • entity_name – the entity type.

Returns:

the manifest for the given entity type.
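A hedged sketch of how the fetch could work. The per-entity manifest location (data/{entity}/manifest.json) follows the documented OpenAlex S3 bucket layout; the use of boto3 and the function names are assumptions, not the confirmed implementation:

```python
import json

def manifest_key(entity_name: str) -> str:
    # Per-entity manifests live at data/{entity}/manifest.json
    # in the OpenAlex bucket.
    return f"data/{entity_name}/manifest.json"

def fetch_manifest_sketch(bucket: str, aws_key: tuple, entity_name: str) -> dict:
    # boto3 is a third-party dependency, assumed available
    # in the workflow environment.
    import boto3

    access_key_id, secret_access_key = aws_key
    client = boto3.client(
        "s3",
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
    )
    response = client.get_object(Bucket=bucket, Key=manifest_key(entity_name))
    return json.loads(response["Body"].read())
```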

academic_observatory_workflows.openalex_telescope.openalex_telescope.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') List[MergedId][source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.transform_file(download_path: str, transform_path: str) Tuple[collections.OrderedDict, list][source]

Transforms a single file. Each entry/object in the gzip input file is transformed and the transformed object is immediately written out to a gzip file. For each entity only one field has to be transformed.

This function generates and returns a BigQuery-style schema from the transformed objects, using the SchemaGenerator from the ‘bigquery_schema_generator’ package.

Parameters:
  • download_path – The path to the file with the OpenAlex entries.

  • transform_path – The path where transformed data will be saved.

Returns:

schema_map, a nested OrderedDict object produced by the SchemaGenerator, and schema_generator.error_logs, any error logs produced by the SchemaGenerator.

academic_observatory_workflows.openalex_telescope.openalex_telescope.clean_array_field(obj: dict, field: str)[source]
academic_observatory_workflows.openalex_telescope.openalex_telescope.transform_object(obj: dict)[source]

Transform an entry/object for one of the OpenAlex entities. For the Work entity only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities only the “international” field is transformed.

Parameters:

obj – a single object with entity information.

Returns:

None.
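As an illustration of what such a transform might do, the sketch below serialises the variable-key “abstract_inverted_index” mapping to a JSON string, since BigQuery cannot load objects whose keys vary per record. The function name and the exact transformation are assumptions, not the confirmed implementation:

```python
import json

def transform_object_sketch(obj: dict) -> None:
    # Hypothetical in-place transform: replace the variable-key mapping
    # with a JSON string so it fits a fixed BigQuery schema.
    field = "abstract_inverted_index"
    if obj.get(field) is not None:
        obj[field] = json.dumps(obj[field])
```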

academic_observatory_workflows.openalex_telescope.openalex_telescope.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) bool[source]

Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.

Parameters:
  • expected – the expected schema.

  • actual – the actual schema.

  • check_types_match – optional; whether to also check that the data types of fields match.

Returns:

whether the expected and actual match.
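A minimal sketch of such a comparison, matching fields by name and recursing into nested RECORD fields; the function name is hypothetical and the real implementation may differ:

```python
def compare_schemas_sketch(expected: list, actual: list, check_types_match: bool = False) -> bool:
    # Index both schemas (lists of field dicts) by field name.
    exp = {f["name"]: f for f in expected}
    act = {f["name"]: f for f in actual}
    if exp.keys() != act.keys():
        return False
    for name, e in exp.items():
        a = act[name]
        # Optionally require matching data types.
        if check_types_match and e.get("type") != a.get("type"):
            return False
        # Recurse into nested RECORD fields.
        if e.get("type") == "RECORD":
            if not compare_schemas_sketch(e.get("fields", []), a.get("fields", []), check_types_match):
                return False
    return True
```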

academic_observatory_workflows.openalex_telescope.openalex_telescope.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) collections.OrderedDict[source]

Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.

Parameters:
  • to_add – The incoming schema to add to the existing “old” schema.

  • old – The existing old schema with previously populated values.

Returns:

The old schema with newly added fields.

academic_observatory_workflows.openalex_telescope.openalex_telescope.flatten_schema(schema_map: collections.OrderedDict) dict[source]

Convert a nested OrderedDict object to a regular dictionary by round-tripping it through the JSON encoder and decoder.

Parameters:

schema_map – The generated schema from SchemaGenerator.

Returns:

a BigQuery-style schema.
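The JSON round-trip described above can be sketched as follows (function name hypothetical):

```python
import json
from collections import OrderedDict

def flatten_schema_sketch(schema_map: OrderedDict) -> dict:
    # json.dumps serialises the nested OrderedDicts; json.loads
    # parses them back as plain dicts.
    return json.loads(json.dumps(schema_map))
```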

academic_observatory_workflows.openalex_telescope.openalex_telescope.load_json(file_path: str) Any[source]

Read in a *.json file.
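A plausible one-liner for this helper, assuming standard json parsing (name hypothetical):

```python
import json

def load_json_sketch(file_path: str):
    # Read and parse a *.json file from disk.
    with open(file_path) as f:
        return json.load(f)
```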

academic_observatory_workflows.openalex_telescope.openalex_telescope.bq_set_table_expiry(*, table_id: str, days: int)[source]

Set the expiry time for a BigQuery table.

Parameters:
  • table_id – the fully qualified BigQuery table identifier.

  • days – the number of days from now until the table expires.

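A hedged sketch of how this could be done with the google-cloud-bigquery client, whose Table objects expose an expires attribute that update_table can persist; the function names here are assumptions, not the confirmed implementation:

```python
import datetime

def expiry_time(days: int) -> datetime.datetime:
    # Expiry timestamp `days` from now, as a timezone-aware UTC datetime.
    return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=days)

def bq_set_table_expiry_sketch(table_id: str, days: int) -> None:
    # google-cloud-bigquery is a third-party dependency,
    # assumed available in the workflow environment.
    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table(table_id)  # table_id: "project.dataset.table"
    table.expires = expiry_time(days)
    client.update_table(table, ["expires"])
```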