academic_observatory_workflows.openalex_telescope.tasks
Attributes
Functions
- get_aws_key: Get the AWS access key ID and secret access key from the aws_conn_id Airflow connection.
- fetch_manifest: Fetch the OpenAlex manifest for the given entity type.
- transform_file: Transforms a single file.
- remove_none_from_array: Remove None values from an array.
- transform_object: Transform an entry/object for one of the OpenAlex entities.
- bq_compare_schemas: Compare two BigQuery-style schemas to determine whether they have the same fields and, optionally, the same data types.
- merge_schema_maps: Using the SchemaGenerator from the bigquery_schema_generator library, merge the schema maps found while scanning through files into one large nested OrderedDict.
- flatten_schema: Use a JSON round trip (json.dumps followed by json.loads) to convert a nested OrderedDict object into a regular dictionary.
- Read in a *.json file.
- Set the expiry time for a BigQuery table.
Module Contents
- academic_observatory_workflows.openalex_telescope.tasks.fetch_entities(*, dag_id: str, run_id: str, is_first_run: bool, entity_names: list[str], cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_folder: str, bq_dataset_id: str, api_bq_dataset_id: str, aws_conn_id: str, aws_openalex_bucket: str, entity_id=DATASET_API_ENTITY_ID) -> dict[str, dict]
- academic_observatory_workflows.openalex_telescope.tasks.aws_to_gcs_transfer(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, gc_project_id: str, aws_conn_id: str, n_transfer_trys: int, aws_openalex_bucket: str)
- academic_observatory_workflows.openalex_telescope.tasks.download(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, **context)
- academic_observatory_workflows.openalex_telescope.tasks.transform(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)
- academic_observatory_workflows.openalex_telescope.tasks.upload_schema(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)
- academic_observatory_workflows.openalex_telescope.tasks.compare_schemas(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str, slack_conn_id: str, **context)
- academic_observatory_workflows.openalex_telescope.tasks.upload_files(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)
- academic_observatory_workflows.openalex_telescope.tasks.bq_load_table(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)
- academic_observatory_workflows.openalex_telescope.tasks.expire_previous_version(dag_id: str, project_id: str, dataset_id: str, table_id: str, snapshot_date: pendulum.DateTime, expiry_days: int, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID, client: google.cloud.bigquery.Client = None)
- academic_observatory_workflows.openalex_telescope.tasks.add_dataset_release(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, bq_project_id: str, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID)
- academic_observatory_workflows.openalex_telescope.tasks.get_entity(entity_index: dict, entity_name: str) -> academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity | None
- academic_observatory_workflows.openalex_telescope.tasks.make_no_updated_data_msg(task_id: str, entity_name: str) -> str
- academic_observatory_workflows.openalex_telescope.tasks.make_first_run_message(task_id: str)
- academic_observatory_workflows.openalex_telescope.tasks.make_no_merged_ids_msg(task_id: str, entity_name: str) -> str
- academic_observatory_workflows.openalex_telescope.tasks.get_aws_key(aws_conn_id: str) -> Tuple[str, str]
Get the AWS access key ID and secret access key from the aws_conn_id Airflow connection.
- Returns:
a tuple of (access key ID, secret access key).
- academic_observatory_workflows.openalex_telescope.tasks.fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) -> academic_observatory_workflows.openalex_telescope.release.Manifest
Fetch the OpenAlex manifest for the given entity type.
- Parameters:
bucket – the OpenAlex AWS bucket.
aws_key – the AWS access key ID and secret access key as a tuple.
entity_name – the entity type.
- Returns:
the manifest for the entity type, as a Manifest object.
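OpenAlex publishes a manifest file alongside each entity's snapshot data, which lists the data files to download. The following is a minimal sketch of turning a manifest's JSON body into Python objects. It assumes the layout {"entries": [{"url": ..., "meta": {"content_length": ..., "record_count": ...}}], "meta": {...}}. The dataclasses ManifestEntry and ManifestSketch and the function parse_manifest are hypothetical stand-ins, not the project's release.Manifest. Fetching the object from S3 with aws_key is deliberately left out.

```python
import json
from dataclasses import dataclass
from typing import List


@dataclass
class ManifestEntry:
    # Hypothetical container for one manifest entry (one data file).
    url: str
    content_length: int
    record_count: int


@dataclass
class ManifestSketch:
    # Hypothetical stand-in for release.Manifest.
    entries: List[ManifestEntry]
    content_length: int
    record_count: int


def parse_manifest(text: str) -> ManifestSketch:
    """Parse the JSON body of a manifest file (assumed layout, see lead-in)."""
    data = json.loads(text)
    entries = [
        ManifestEntry(
            url=entry["url"],
            content_length=entry["meta"]["content_length"],
            record_count=entry["meta"]["record_count"],
        )
        for entry in data["entries"]
    ]
    meta = data["meta"]
    return ManifestSketch(entries, meta["content_length"], meta["record_count"])
```

The totals in the top-level "meta" object give a cheap sanity check that the per-entry record counts add up.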
- academic_observatory_workflows.openalex_telescope.tasks.fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') -> List[academic_observatory_workflows.openalex_telescope.release.MergedId]
- academic_observatory_workflows.openalex_telescope.tasks.transform_file(download_path: str, transform_path: str) -> Tuple[collections.OrderedDict, list]
Transforms a single file. Each entry/object in the gzipped input file is transformed, and the transformed object is immediately written out to a gzipped output file. For each entity, only one field has to be transformed.
This function also generates and returns a BigQuery-style schema map for the transformed objects, using the SchemaGenerator from the ‘bigquery_schema_generator’ package.
- Parameters:
download_path – The path to the file with the OpenAlex entries.
transform_path – The path where the transformed data will be saved.
- Returns:
a tuple of (schema_map, schema_generator.error_logs): schema_map is the nested OrderedDict produced by the SchemaGenerator, and error_logs holds any error logs the SchemaGenerator produced.
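The streaming pattern described above (read one gzipped JSON object, transform it, write it out straight away) can be sketched with the standard library alone. This is a simplified sketch, not the task's implementation. It assumes newline-delimited JSON, and it takes the per-object transform as a parameter instead of calling transform_object directly. The SchemaGenerator step is omitted. transform_file_sketch is a hypothetical name.

```python
import gzip
import json
from typing import Callable


def transform_file_sketch(
    download_path: str, transform_path: str, transform: Callable[[dict], None]
) -> int:
    """Stream a gzipped JSON Lines file, transforming each object in place.

    Returns the number of records written. Schema generation is omitted.
    """
    count = 0
    with gzip.open(download_path, "rt", encoding="utf-8") as f_in, gzip.open(
        transform_path, "wt", encoding="utf-8"
    ) as f_out:
        for line in f_in:
            if not line.strip():
                continue  # skip blank lines
            obj = json.loads(line)
            transform(obj)  # mutates obj in place, like transform_object
            f_out.write(json.dumps(obj) + "\n")
            count += 1
    return count
```

Because only one object is held in memory at a time, this approach works for snapshot files that are much larger than RAM.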
- academic_observatory_workflows.openalex_telescope.tasks.remove_none_from_array(obj: dict, field: str)
Remove None values, in place, from the array stored under field in obj.
- Parameters:
obj – the dictionary containing the array field.
field – the key for the array field.
- Returns:
None
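The following is a minimal sketch of the in-place behaviour described above. It assumes that a missing or non-list field should be left untouched, which is a guess about edge cases the documentation does not cover. remove_none_sketch is a hypothetical name, not the module's function.

```python
def remove_none_sketch(obj: dict, field: str) -> None:
    """Drop None entries from obj[field] in place, if it is a list."""
    value = obj.get(field)
    if isinstance(value, list):
        # Rebuild the list without None; other falsy values (0, "") are kept.
        obj[field] = [item for item in value if item is not None]
```

Note the `is not None` test: a plain truthiness filter would also discard legitimate values such as 0 or an empty string.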
- academic_observatory_workflows.openalex_telescope.tasks.safe_get_dict(obj: dict, field: str) -> dict
- academic_observatory_workflows.openalex_telescope.tasks.transform_object(obj: dict)
Transform an entry/object for one of the OpenAlex entities in place. For the Work entity, only the “abstract_inverted_index” field is transformed. For the Concept and Institution entities, only the “international” field is transformed.
- Parameters:
obj – a single object with entity information.
- Returns:
None.
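Both fields named above are free-form maps whose keys vary from record to record (words in an abstract, language codes), so they cannot be described by a fixed BigQuery schema. The following sketch shows one common workaround, which is to rewrite each map as parallel "keys" and "values" arrays. This layout is an assumption, and so is storing each word's position list as a comma-separated string, because BigQuery does not allow an array of arrays. transform_object_sketch is a hypothetical name.

```python
def transform_object_sketch(obj: dict) -> None:
    """Rewrite free-form map fields in place (assumed keys/values layout)."""
    # Work: abstract_inverted_index maps word -> list of token positions.
    aii = obj.get("abstract_inverted_index")
    if isinstance(aii, dict):
        obj["abstract_inverted_index"] = {
            "keys": list(aii.keys()),
            # Assumption: flatten each position list to a string such as "1, 3".
            "values": [", ".join(str(p) for p in positions) for positions in aii.values()],
        }

    # Concept/Institution: international holds maps such as language -> name.
    international = obj.get("international")
    if isinstance(international, dict):
        for name, mapping in international.items():
            if isinstance(mapping, dict):
                # Replacing the value of an existing key is safe during iteration.
                international[name] = {
                    "keys": list(mapping.keys()),
                    "values": list(mapping.values()),
                }
```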
- academic_observatory_workflows.openalex_telescope.tasks.bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: bool | None = False) -> bool
Compare two BigQuery-style schemas to determine whether they have the same fields and, optionally, the same data types.
- Parameters:
expected – the expected schema.
actual – the actual schema.
check_types_match – optional; whether the data types of the fields must also match.
- Returns:
whether the expected and actual schemas match.
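A BigQuery-style schema is a list of field dicts with "name" and "type" keys, and RECORD fields nest a further "fields" list. The following is a minimal recursive sketch of such a comparison. It assumes that field order does not matter, and it recurses into nested RECORD fields. Whether the module's function makes the same choices is not stated above. compare_schemas_sketch is a hypothetical name.

```python
from typing import List


def compare_schemas_sketch(
    expected: List[dict], actual: List[dict], check_types_match: bool = False
) -> bool:
    """Recursively compare BigQuery-style schemas by field name (and optionally type)."""
    if len(expected) != len(actual):
        return False
    exp_by_name = {f["name"]: f for f in expected}
    act_by_name = {f["name"]: f for f in actual}
    if exp_by_name.keys() != act_by_name.keys():
        return False
    for name, exp in exp_by_name.items():
        act = act_by_name[name]
        if check_types_match and exp.get("type") != act.get("type"):
            return False
        # RECORD fields carry a nested "fields" list; compare those recursively.
        if not compare_schemas_sketch(exp.get("fields", []), act.get("fields", []), check_types_match):
            return False
    return True
```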
- academic_observatory_workflows.openalex_telescope.tasks.merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) -> collections.OrderedDict
Using the SchemaGenerator from the bigquery_schema_generator library, merge the schema maps found while scanning through files into one large nested OrderedDict.
- Parameters:
to_add – The incoming schema map to add to the existing “old” schema map.
old – The existing schema map with previously populated values.
- Returns:
The old schema map with the newly added fields.
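The merge described above folds each file's schema map into a running total. The following is a library-free sketch of that top-level loop. Fields that exist only in to_add are appended, and fields that exist in both are passed to a merge_entry callback. In the real task, that step would use a merge routine from bigquery_schema_generator, but the exact call is not shown in this reference. merge_schema_maps_sketch and merge_entry are hypothetical names. With no callback, the sketch keeps the existing entry.

```python
from collections import OrderedDict
from typing import Callable, Optional


def merge_schema_maps_sketch(
    to_add: OrderedDict,
    old: OrderedDict,
    merge_entry: Optional[Callable[[dict, dict], dict]] = None,
) -> OrderedDict:
    """Fold the fields of to_add into old and return old."""
    if not old:
        # First file: start from a copy of the incoming schema map.
        return OrderedDict(to_add)
    for key, entry in to_add.items():
        if key not in old:
            old[key] = entry  # new top-level field
        elif merge_entry is not None:
            old[key] = merge_entry(old[key], entry)  # combine two views of one field
    return old
```

Because insertion order is preserved, fields keep the order in which they were first seen across files.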
- academic_observatory_workflows.openalex_telescope.tasks.flatten_schema(schema_map: collections.OrderedDict) -> dict
Use a JSON round trip (json.dumps followed by json.loads) to convert a nested OrderedDict object into a regular dictionary.
- Parameters:
schema_map – The schema map generated by the SchemaGenerator.
- Returns:
A BigQuery-style schema.
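The JSON round trip works because json.dumps serialises any dict subclass as a plain JSON object, and json.loads always rebuilds objects as built-in dicts, at every level of nesting. The following is a minimal illustration with a hypothetical helper name.

```python
import json
from collections import OrderedDict


def to_plain_dict(schema_map: OrderedDict) -> dict:
    """Convert nested OrderedDicts into plain dicts via a JSON round trip."""
    return json.loads(json.dumps(schema_map))
```

The trick has side effects worth knowing about. Tuples come back as lists, non-string keys become strings, and values that are not JSON-serialisable (for example, datetime objects) raise a TypeError.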