academic_observatory_workflows.openalex_telescope.openalex_telescope
Module Contents
Classes
- OpenAlexEntity
- Meta
- ManifestEntry
- Manifest
- MergedId
Functions
- create_dag – Construct an OpenAlexTelescope instance.
- get_entity
- get_task_id
- make_no_updated_data_msg
- make_first_run_message
- make_no_merged_ids_msg
- get_aws_key – Get the AWS access key id and secret access key from the aws_conn_id airflow connection.
- s3_uri_parts – Extracts the S3 bucket name and object key from the given S3 URI.
- fetch_manifest – Fetch the OpenAlex manifest for an entity type.
- fetch_merged_ids
- transform_file – Transforms a single file.
- clean_array_field
- transform_object – Transform an entry/object for one of the OpenAlex entities.
- bq_compare_schemas – Compare two BigQuery-style schemas for matching fields and/or data types.
- merge_schema_maps – Merge the schemas found while scanning through files into one nested OrderedDict.
- flatten_schema – Convert a nested OrderedDict schema into a regular dictionary.
- load_json – Read in a *.json file.
- bq_set_table_expiry – Set the expiry time for a BigQuery table.
Attributes
- academic_observatory_workflows.openalex_telescope.openalex_telescope.TEMP_TABLE_DESCRIPTION = 'Temporary table for internal use. Do not use.'[source]
- class academic_observatory_workflows.openalex_telescope.openalex_telescope.OpenAlexEntity(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, entity_name: str, bq_dataset_id: str, schema_folder: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, manifest: Manifest, merged_ids: List[MergedId], is_first_run: bool, prev_end_date: pendulum.DateTime | None)[source]
Bases:
observatory.platform.workflows.workflow.ChangefileRelease
- property release_folder[source]
Get the path to the release folder, which resides inside the workflow folder.
- Returns:
path to folder.
- static from_dict(dict_: Dict) OpenAlexEntity [source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'openalex', entity_names: List[str] = None, schema_folder: str = project_path('openalex_telescope', 'schema'), dataset_description: str = 'The OpenAlex dataset: https://docs.openalex.org/', temp_table_expiry_days: int = 7, snapshot_expiry_days: int = 31, n_transfer_trys: int = 3, max_processes: int = os.cpu_count(), primary_key: str = 'id', aws_conn_id: str = 'aws_openalex', aws_openalex_bucket: str = 'openalex', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, slack_conn_id: str | None = AirflowConns.SLACK, start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1), schedule: str = '@weekly', queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) airflow.DAG [source]
Construct an OpenAlexTelescope instance.
- Parameters:
dag_id – the id of the DAG.
cloud_workspace – the cloud workspace settings.
bq_dataset_id – the BigQuery dataset id.
entity_names – the names of the OpenAlex entities to process.
schema_folder – the SQL schema path.
dataset_description – description for the BigQuery dataset.
temp_table_expiry_days – the number of days until the upsert or delete tables will expire.
snapshot_expiry_days – the number of days until a snapshot of each entity’s main table expires, set to 31 days so there is some time to roll back after an update.
n_transfer_trys – how many times to try transferring data from AWS to GCS.
max_processes – the maximum number of processes to use when transforming data.
aws_conn_id – the AWS Airflow Connection ID.
aws_openalex_bucket – the OpenAlex AWS bucket name.
observatory_api_conn_id – the Observatory API Airflow Connection ID.
slack_conn_id – the Slack Connection ID.
start_date – the Apache Airflow DAG start date.
schedule – the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly, they are not released on any particular day of the month, so the workflow instead runs weekly on a Sunday, which picks up new updates regularly. See here for past release dates: https://openalex.s3.amazonaws.com/RELEASE_NOTES.txt
queue – the Airflow queue this job runs on.
max_active_runs – the maximum number of DAG runs that can be run at once.
retries – the number of times to retry a task.
- academic_observatory_workflows.openalex_telescope.openalex_telescope.get_entity(entity_index: dict, entity_name: str) OpenAlexEntity [source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.get_task_id(**context)[source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.make_no_updated_data_msg(task_id: str, entity_name: str) str [source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.make_first_run_message(task_id: str)[source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.make_no_merged_ids_msg(task_id: str, entity_name: str) str [source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.get_aws_key(aws_conn_id: str) Tuple[str, str] [source]
Get the AWS access key id and secret access key from the aws_conn_id airflow connection.
- Returns:
access key id and secret access key
- academic_observatory_workflows.openalex_telescope.openalex_telescope.s3_uri_parts(s3_uri: str) Tuple[str, str] [source]
Extracts the S3 bucket name and object key from the given S3 URI.
- Parameters:
s3_uri – str, S3 URI in format s3://mybucketname/path/to/object
- Returns:
tuple, (bucket_name, object_key)
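The parsing described above can be sketched with the standard library's urllib.parse; this is an illustration of the documented behaviour, not necessarily the module's exact implementation:

```python
from urllib.parse import urlparse
from typing import Tuple


def s3_uri_parts(s3_uri: str) -> Tuple[str, str]:
    """Split an S3 URI of the form s3://mybucketname/path/to/object
    into (bucket_name, object_key).
    """
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    # netloc holds the bucket name; the path (minus its leading slash) is the key
    return parsed.netloc, parsed.path.lstrip("/")
```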
- class academic_observatory_workflows.openalex_telescope.openalex_telescope.Meta(content_length, record_count)[source]
- class academic_observatory_workflows.openalex_telescope.openalex_telescope.ManifestEntry(url: str, meta: Meta)[source]
-
- static from_dict(dict_: Dict) ManifestEntry [source]
- class academic_observatory_workflows.openalex_telescope.openalex_telescope.Manifest(entries: List[ManifestEntry], meta: Meta)[source]
- class academic_observatory_workflows.openalex_telescope.openalex_telescope.MergedId(url: str, content_length: int)[source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) Manifest [source]
Fetch the OpenAlex manifest for a given entity type.
- Parameters:
bucket – the OpenAlex AWS bucket.
aws_key – the aws_access_key_id and aws_secret_key as a tuple.
entity_name – the entity type.
- Returns:
the Manifest for the given entity type.
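The Meta/ManifestEntry/Manifest structure above can be illustrated by parsing a manifest dictionary of the shape OpenAlex publishes (a list of entries, each with a url and content_length/record_count metadata, plus aggregate meta). The dataclasses here are simplified stand-ins for the module's own classes:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Meta:
    content_length: int
    record_count: int


@dataclass
class ManifestEntry:
    url: str
    meta: Meta


@dataclass
class Manifest:
    entries: List[ManifestEntry]
    meta: Meta


def manifest_from_dict(dict_: Dict) -> Manifest:
    # One entry per data file, plus aggregate meta for the whole snapshot
    entries = [
        ManifestEntry(url=e["url"], meta=Meta(**e["meta"])) for e in dict_["entries"]
    ]
    return Manifest(entries=entries, meta=Meta(**dict_["meta"]))
```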
- academic_observatory_workflows.openalex_telescope.openalex_telescope.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') List[MergedId] [source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.transform_file(download_path: str, transform_path: str) Tuple[collections.OrderedDict, list] [source]
Transforms a single file. Each entry/object in the gzip input file is transformed, and the transformed object is immediately written out to a gzip file. For each entity only one field has to be transformed.
This function generates and returns a BigQuery-style schema from the transformed objects, using the SchemaGenerator from the bigquery_schema_generator package.
- Parameters:
download_path – The path to the file with the OpenAlex entries.
transform_path – The path where transformed data will be saved.
- Returns:
a tuple of (schema_map, error_logs): schema_map is a nested OrderedDict produced by the SchemaGenerator, and error_logs contains any error logs produced by the SchemaGenerator.
- academic_observatory_workflows.openalex_telescope.openalex_telescope.clean_array_field(obj: dict, field: str)[source]
- academic_observatory_workflows.openalex_telescope.openalex_telescope.transform_object(obj: dict)[source]
Transform an entry/object for one of the OpenAlex entities. For the Work entity only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities only the “international” field is transformed.
- Parameters:
obj – a single object with entity information.
- Returns:
None.
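The docstring above does not show what the transformation looks like. One plausible sketch — an assumption for illustration, not the module's exact code — converts the dynamic keys of a Work's abstract_inverted_index into parallel keys/values lists, since a BigQuery schema cannot accommodate object keys that vary from record to record:

```python
import json


def transform_abstract_inverted_index(obj: dict) -> None:
    """Replace the dynamic-key abstract_inverted_index mapping with
    parallel keys/values lists so the field has a fixed schema.

    Hypothetical sketch: the output field layout is an assumption.
    """
    index = obj.get("abstract_inverted_index")
    if index is None:
        return
    obj["abstract_inverted_index"] = {
        "keys": list(index.keys()),
        # Serialise each position list to a string, since BigQuery does not
        # support arrays of arrays directly
        "values": [json.dumps(positions) for positions in index.values()],
    }
```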
- academic_observatory_workflows.openalex_telescope.openalex_telescope.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) bool [source]
Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.
- Parameters:
expected – the expected schema.
actual – the actual schema.
check_types_match – Optional; whether the data types of fields must also match.
- Returns:
whether the expected and actual match.
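The comparison described can be sketched as a recursive walk over two lists of BigQuery-style field dicts (name/type/fields), recursing into nested RECORD columns. A simplified illustration, not the module's exact logic:

```python
from typing import List


def bq_compare_schemas(
    expected: List[dict], actual: List[dict], check_types_match: bool = False
) -> bool:
    """Return True when both schemas contain the same field names (and,
    optionally, the same data types), recursing into nested RECORD fields."""
    exp = {f["name"]: f for f in expected}
    act = {f["name"]: f for f in actual}
    if exp.keys() != act.keys():
        return False
    for name, e in exp.items():
        a = act[name]
        if check_types_match and e.get("type") != a.get("type"):
            return False
        # Recurse into nested fields (RECORD columns); missing means empty
        if not bq_compare_schemas(e.get("fields", []), a.get("fields", []), check_types_match):
            return False
    return True
```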
- academic_observatory_workflows.openalex_telescope.openalex_telescope.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) collections.OrderedDict [source]
Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found while scanning through files into one large nested OrderedDict.
- Parameters:
to_add – The incoming schema to add to the existing “old” schema.
old – The existing old schema with previously populated values.
- Returns:
The old schema with newly added fields.
- academic_observatory_workflows.openalex_telescope.openalex_telescope.flatten_schema(schema_map: collections.OrderedDict) dict [source]
A quick trick using the JSON encoder and load-string function to convert a nested OrderedDict object into a regular dictionary.
- Parameters:
schema_map – The generated schema from SchemaGenerator.
- Returns:
a BigQuery-style schema.
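The "JSON encoder and load-string" trick referred to above is essentially a round trip through json.dumps and json.loads, which turns every nested OrderedDict into a plain dict:

```python
import json
from collections import OrderedDict


def flatten_schema(schema_map: OrderedDict) -> dict:
    """Convert a nested OrderedDict (as produced by the SchemaGenerator)
    into a regular dictionary by round-tripping through the JSON encoder."""
    return json.loads(json.dumps(schema_map))
```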
- academic_observatory_workflows.openalex_telescope.openalex_telescope.load_json(file_path: str) Any [source]
Read in a *.json file.
- Parameters:
file_path – the path to the *.json file.
- Returns:
the parsed JSON data.
- academic_observatory_workflows.openalex_telescope.openalex_telescope.bq_set_table_expiry(*, table_id: str, days: int)[source]
Set the expiry time for a BigQuery table.
- Parameters:
table_id – the fully qualified BigQuery table identifier.
days – the number of days from now until the table expires.
- Returns:
None.