academic_observatory_workflows.workflows.openalex_telescope
Module Contents
Classes
- OpenAlexTelescope: OpenAlex telescope
Functions
- make_download_bash_op: Download files for an entity from the bucket.
- s3_uri_parts: Extracts the S3 bucket name and object key from the given S3 URI.
- fetch_manifest: Fetch the OpenAlex manifest for an entity type.
- transform_file: Transforms a single file.
- transform_object: Transform an entry/object for one of the OpenAlex entities.
- bq_compare_schemas: Compare two BigQuery-style schemas to check whether they have the same fields and/or data types.
- merge_schema_maps: Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.
- flatten_schema: A quick trick using the JSON encoder and load string function to convert from a nested OrderedDict object to a regular dictionary.
- Read in a *.json file.
- class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexEntity(entity_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, manifest: Manifest, merged_ids: List[MergedId], is_first_run: bool, prev_end_date: pendulum.DateTime | None, release: OpenAlexRelease = None)[source]
- static from_dict(dict_: Dict) OpenAlexEntity[source]
- class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, entities: List[OpenAlexEntity], start_date: pendulum.DateTime, end_date: pendulum.DateTime, is_first_run: bool, schema_folder: str)[source]
Bases: observatory.platform.workflows.workflow.ChangefileRelease
- get_entity(entity_name: str) OpenAlexEntity | None[source]
- academic_observatory_workflows.workflows.openalex_telescope.make_no_updated_data_msg(task_id: str, entity_name: str) str[source]
- academic_observatory_workflows.workflows.openalex_telescope.make_first_run_message(task_id: str)[source]
- academic_observatory_workflows.workflows.openalex_telescope.make_no_merged_ids_msg(task_id: str, entity_name: str) str[source]
- class academic_observatory_workflows.workflows.openalex_telescope.OpenAlexTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'openalex', entity_names: List[str] = None, schema_folder: str = os.path.join(default_schema_folder(), 'openalex'), dataset_description: str = 'The OpenAlex dataset: https://docs.openalex.org/', snapshot_expiry_days: int = 31, n_transfer_trys: int = 3, max_processes: int = os.cpu_count(), primary_key: str = 'id', aws_conn_id: str = 'aws_openalex', aws_openalex_bucket: str = 'openalex', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, slack_conn_id: str | None = AirflowConns.SLACK, start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1), schedule: str = '@weekly', queue: str = 'remote_queue')[source]
Bases: observatory.platform.workflows.workflow.Workflow
OpenAlex telescope
- property aws_key: Tuple[str, str][source]
Get the AWS access key id and secret access key from the aws_conn_id airflow connection.
- Returns:
access key id and secret access key
- fetch_releases(**kwargs) bool[source]
Fetch OpenAlex releases.
- Returns:
True to continue, False to skip.
- make_release(**kwargs) OpenAlexRelease[source]
Make a Release instance. Gets the list of releases available from the release check (setup task).
- Parameters:
kwargs – the context passed from the Airflow Operator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.
- Returns:
OpenAlexRelease.
- aws_to_gcs_transfer(release: OpenAlexRelease, **kwargs)[source]
Transfer files from AWS bucket to Google Cloud bucket
- determine_branches(release: OpenAlexRelease, **kwargs)[source]
Determine which entity branches to follow
- bq_snapshot(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Create a snapshot of each main table. The snapshot makes it possible to roll back the table if something goes wrong. The snapshot expires after self.snapshot_expiry_days.
- transform(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Transform all files for the Work, Concept and Institution entities. Transforms one file per process.
This step also scans through each file and generates a BigQuery-style schema from the incoming data.
- upload_schema(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Upload the generated schema from the transform step to GCS.
- compare_schemas(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Compare the generated schema against the expected schema for each entity.
- upload_upsert_files(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Upload the transformed data to Cloud Storage.
- Raises:
AirflowException – raised if the files to be uploaded are not found.
- bq_load_upsert_tables(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Load the upsert table for each entity.
- bq_upsert_records(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Upsert the records from each upsert table into the main table.
- bq_load_delete_tables(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Load the delete tables.
- bq_delete_records(release: OpenAlexRelease, entity_name: str = None, **kwargs)[source]
Delete records from main tables that are in delete tables.
- add_dataset_releases(release: OpenAlexRelease, entity_name: str = None, **kwargs) None[source]
Adds release information to API.
- cleanup(release: OpenAlexRelease, **kwargs) None[source]
Delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.workflows.openalex_telescope.make_download_bash_op(workflow: observatory.platform.workflows.workflow.Workflow, entity_name: str, task_id: str, **kwargs) observatory.platform.workflows.workflow.WorkflowBashOperator[source]
Download files for an entity from the bucket.
gsutil is used instead of the standard Google Cloud Python library because it downloads files faster.
- Parameters:
workflow – the workflow.
entity_name – the name of the OpenAlex entity, e.g. authors, institutions etc.
task_id – the task id.
- Returns:
a WorkflowBashOperator instance.
- academic_observatory_workflows.workflows.openalex_telescope.s3_uri_parts(s3_uri: str) Tuple[str, str][source]
Extracts the S3 bucket name and object key from the given S3 URI.
- Parameters:
s3_uri – str, S3 URI in format s3://mybucketname/path/to/object
- Returns:
tuple, (bucket_name, object_key)
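The split described above can be sketched with the standard library. This is a minimal illustration, not the module's implementation: the name s3_uri_parts_sketch and the ValueError on a non-S3 scheme are assumptions made for the example.

```python
from typing import Tuple
from urllib.parse import urlparse


def s3_uri_parts_sketch(s3_uri: str) -> Tuple[str, str]:
    """Split an S3 URI into (bucket_name, object_key). Hypothetical helper."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3":
        # Assumed behaviour: reject anything that is not an s3:// URI
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    # netloc holds the bucket name; path holds the key with a leading slash
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, object_key = s3_uri_parts_sketch("s3://mybucketname/path/to/object")
```

Here bucket_name is "mybucketname" and object_key is "path/to/object".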
- class academic_observatory_workflows.workflows.openalex_telescope.Meta(content_length, record_count)[source]
- class academic_observatory_workflows.workflows.openalex_telescope.ManifestEntry(url: str, meta: Meta)[source]
- static from_dict(dict_: Dict) ManifestEntry[source]
- class academic_observatory_workflows.workflows.openalex_telescope.Manifest(entries: List[ManifestEntry], meta: Meta)[source]
- class academic_observatory_workflows.workflows.openalex_telescope.MergedId(url: str, content_length: int)[source]
- academic_observatory_workflows.workflows.openalex_telescope.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) Manifest[source]
Fetch the OpenAlex manifest for an entity type.
- Parameters:
bucket – the OpenAlex AWS bucket.
aws_key – the aws_access_key_id and aws_secret_key as a tuple.
entity_name – the entity type.
- Returns:
the Manifest for the entity.
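The Meta, ManifestEntry and Manifest classes listed above suggest how a manifest is modelled once it has been fetched. The sketch below mirrors those constructor signatures with plain dataclasses; the *Sketch class names, the from_dict on the manifest class, and the sample values (URL, sizes, counts) are made up for illustration and are not taken from this module.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class MetaSketch:
    content_length: int
    record_count: int


@dataclass
class ManifestEntrySketch:
    url: str
    meta: MetaSketch

    @staticmethod
    def from_dict(dict_: Dict) -> "ManifestEntrySketch":
        meta = dict_["meta"]
        return ManifestEntrySketch(dict_["url"], MetaSketch(meta["content_length"], meta["record_count"]))


@dataclass
class ManifestSketch:
    entries: List[ManifestEntrySketch]
    meta: MetaSketch

    @staticmethod
    def from_dict(dict_: Dict) -> "ManifestSketch":
        # Hypothetical helper: build the entries first, then the overall totals
        entries = [ManifestEntrySketch.from_dict(entry) for entry in dict_["entries"]]
        meta = dict_["meta"]
        return ManifestSketch(entries, MetaSketch(meta["content_length"], meta["record_count"]))


# Illustrative manifest content; the values are invented for this example
raw = {
    "entries": [
        {
            "url": "s3://openalex/data/works/updated_date=2023-01-01/part_000.gz",
            "meta": {"content_length": 1024, "record_count": 10},
        }
    ],
    "meta": {"content_length": 1024, "record_count": 10},
}
manifest = ManifestSketch.from_dict(raw)
```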
- academic_observatory_workflows.workflows.openalex_telescope.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') List[MergedId][source]
- academic_observatory_workflows.workflows.openalex_telescope.transform_file(download_path: str, transform_path: str) Tuple[collections.OrderedDict, list][source]
Transforms a single file. Each entry/object in the gzip input file is transformed and the transformed object is immediately written out to a gzip file. For each entity only one field has to be transformed.
This function generates and returns a BigQuery-style schema from the transformed object, using the SchemaGenerator from the ‘bigquery_schema_generator’ package.
- Parameters:
download_path – The path to the file with the OpenAlex entries.
transform_path – The path where transformed data will be saved.
- Returns:
schema_map: a nested OrderedDict object produced by the SchemaGenerator.
schema_generator.error_logs: possible error logs produced by the SchemaGenerator.
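The read, transform and write-out loop described above can be sketched with the standard library alone. The sketch assumes the input is gzipped JSON Lines (one object per line) and leaves out schema generation entirely; transform_file_sketch and add_marker are hypothetical names, and the record count returned here stands in for the real return values.

```python
import gzip
import json
import os
import tempfile
from typing import Callable


def transform_file_sketch(download_path: str, transform_path: str, transform: Callable[[dict], None]) -> int:
    """Stream a gzipped JSON Lines file, transform each object in place and
    write it straight to a gzipped output file. Returns the number of records."""
    count = 0
    with gzip.open(download_path, "rt", encoding="utf-8") as f_in:
        with gzip.open(transform_path, "wt", encoding="utf-8") as f_out:
            for line in f_in:
                if not line.strip():
                    continue  # skip blank lines
                obj = json.loads(line)
                transform(obj)  # mutates obj in place
                f_out.write(json.dumps(obj) + "\n")
                count += 1
    return count


def add_marker(obj: dict) -> None:
    # Stand-in transformation used only for this demonstration
    obj["transformed"] = True


with tempfile.TemporaryDirectory() as tmp_dir:
    src = os.path.join(tmp_dir, "part_000.gz")
    dst = os.path.join(tmp_dir, "part_000_transformed.gz")
    with gzip.open(src, "wt", encoding="utf-8") as f:
        f.write('{"id": "W1"}\n{"id": "W2"}\n')
    n_records = transform_file_sketch(src, dst, add_marker)
    with gzip.open(dst, "rt", encoding="utf-8") as f:
        results = [json.loads(line) for line in f]
```

Because each object is written as soon as it is transformed, memory use stays roughly constant regardless of file size.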
- academic_observatory_workflows.workflows.openalex_telescope.transform_object(obj: dict)[source]
Transform an entry/object for one of the OpenAlex entities. For the Work entity only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities only the “international” field is transformed.
- Parameters:
obj – Single object with entity information
field – The field of interest that is transformed.
- Returns:
None.
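The description above does not say how the fields are rewritten. In OpenAlex data, abstract_inverted_index maps each word to the positions where it occurs, so its keys differ from record to record and cannot serve as fixed column names. The sketch below shows one possible restructuring, assuming (this is not confirmed by the documentation) that the mapping is turned into parallel lists of keys and values; inverted_index_to_lists is a hypothetical name.

```python
def inverted_index_to_lists(obj: dict, field: str = "abstract_inverted_index") -> None:
    """Rewrite obj[field] in place from {word: [positions]} to
    {"keys": [...], "values": [...]}. Assumed approach, for illustration only."""
    value = obj.get(field)
    if not isinstance(value, dict):
        return  # field missing or null: nothing to transform
    keys = list(value.keys())
    # Positions are joined into strings because BigQuery does not allow arrays of arrays
    values = [", ".join(str(position) for position in positions) for positions in value.values()]
    obj[field] = {"keys": keys, "values": values}


work = {"id": "W1", "abstract_inverted_index": {"open": [0], "data": [1, 3]}}
inverted_index_to_lists(work)
```

Like the documented function, the sketch modifies the object in place and returns None.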
- academic_observatory_workflows.workflows.openalex_telescope.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) bool[source]
Compare two BigQuery-style schemas to check whether they have the same fields and/or data types.
- Parameters:
expected – the expected schema.
actual – the actual schema.
check_types_match – optional, whether to also check that the data types of fields match.
- Returns:
whether the expected and actual match.
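A comparison of this kind can be sketched as a recursive walk over the two field lists. This is a minimal illustration under stated assumptions: schemas are lists of dicts with "name", "type" and optional nested "fields" keys, field order is ignored, and compare_schemas_sketch is a hypothetical name rather than the module's function.

```python
from typing import List


def compare_schemas_sketch(expected: List[dict], actual: List[dict], check_types_match: bool = False) -> bool:
    """Return True if both schemas contain the same field names (recursively),
    and, when check_types_match is set, the same data types."""
    exp = {field["name"]: field for field in expected}
    act = {field["name"]: field for field in actual}
    if exp.keys() != act.keys():
        return False
    for name, exp_field in exp.items():
        act_field = act[name]
        if check_types_match and exp_field.get("type") != act_field.get("type"):
            return False
        # Recurse into nested RECORD fields when either side has them
        if "fields" in exp_field or "fields" in act_field:
            nested_ok = compare_schemas_sketch(
                exp_field.get("fields", []), act_field.get("fields", []), check_types_match
            )
            if not nested_ok:
                return False
    return True


expected_schema = [
    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
    {"name": "ids", "type": "RECORD", "mode": "NULLABLE", "fields": [{"name": "doi", "type": "STRING"}]},
]
actual_schema = [
    {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
    {"name": "ids", "type": "RECORD", "mode": "NULLABLE", "fields": [{"name": "doi", "type": "STRING"}]},
]
names_match = compare_schemas_sketch(expected_schema, actual_schema)
types_match = compare_schemas_sketch(expected_schema, actual_schema, check_types_match=True)
```

In this example the field names agree but the type of "id" differs, so names_match is True and types_match is False.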
- academic_observatory_workflows.workflows.openalex_telescope.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) collections.OrderedDict[source]
Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found when scanning through files into one large nested OrderedDict.
- Parameters:
to_add – The incoming schema to add to the existing “old” schema.
old – The existing old schema with previously populated values.
- Returns:
The old schema with newly added fields.
- academic_observatory_workflows.workflows.openalex_telescope.flatten_schema(schema_map: collections.OrderedDict) dict[source]
Convert a nested OrderedDict object to a regular dictionary using a quick trick: encode it with the JSON encoder, then load the resulting string.
- Parameters:
schema_map – The generated schema from SchemaGenerator.
- Returns:
A BigQuery-style schema.
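The JSON round trip mentioned above can be shown in isolation. The sketch covers only the OrderedDict-to-dict conversion, not any SchemaGenerator step; to_plain_dict and the sample data are hypothetical.

```python
import json
from collections import OrderedDict


def to_plain_dict(nested: OrderedDict) -> dict:
    """Encode to a JSON string, then decode again: every nested OrderedDict
    comes back as a regular dict, with key order preserved."""
    return json.loads(json.dumps(nested))


nested = OrderedDict(
    [("id", OrderedDict([("type", "STRING")])), ("ids", OrderedDict([("type", "RECORD")]))]
)
plain = to_plain_dict(nested)
```

The trick works only for JSON-serialisable content: tuples come back as lists and non-string keys come back as strings.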