academic_observatory_workflows.openalex_telescope.tasks
=======================================================

.. py:module:: academic_observatory_workflows.openalex_telescope.tasks


Attributes
----------

.. autoapisummary::

   academic_observatory_workflows.openalex_telescope.tasks.DATASET_API_ENTITY_ID


Functions
---------

.. autoapisummary::

   academic_observatory_workflows.openalex_telescope.tasks.fetch_entities
   academic_observatory_workflows.openalex_telescope.tasks.aws_to_gcs_transfer
   academic_observatory_workflows.openalex_telescope.tasks.download
   academic_observatory_workflows.openalex_telescope.tasks.transform
   academic_observatory_workflows.openalex_telescope.tasks.upload_schema
   academic_observatory_workflows.openalex_telescope.tasks.compare_schemas
   academic_observatory_workflows.openalex_telescope.tasks.upload_files
   academic_observatory_workflows.openalex_telescope.tasks.bq_load_table
   academic_observatory_workflows.openalex_telescope.tasks.expire_previous_version
   academic_observatory_workflows.openalex_telescope.tasks.add_dataset_release
   academic_observatory_workflows.openalex_telescope.tasks.get_entity
   academic_observatory_workflows.openalex_telescope.tasks.get_task_id
   academic_observatory_workflows.openalex_telescope.tasks.make_no_updated_data_msg
   academic_observatory_workflows.openalex_telescope.tasks.make_first_run_message
   academic_observatory_workflows.openalex_telescope.tasks.make_no_merged_ids_msg
   academic_observatory_workflows.openalex_telescope.tasks.get_aws_key
   academic_observatory_workflows.openalex_telescope.tasks.fetch_manifest
   academic_observatory_workflows.openalex_telescope.tasks.fetch_merged_ids
   academic_observatory_workflows.openalex_telescope.tasks.transform_file
   academic_observatory_workflows.openalex_telescope.tasks.remove_none_from_array
   academic_observatory_workflows.openalex_telescope.tasks.safe_get_dict
   academic_observatory_workflows.openalex_telescope.tasks.transform_object
   academic_observatory_workflows.openalex_telescope.tasks.bq_compare_schemas
   academic_observatory_workflows.openalex_telescope.tasks.merge_schema_maps
   academic_observatory_workflows.openalex_telescope.tasks.flatten_schema
   academic_observatory_workflows.openalex_telescope.tasks.load_json
   academic_observatory_workflows.openalex_telescope.tasks.bq_set_table_expiry


Module Contents
---------------

.. py:data:: DATASET_API_ENTITY_ID
   :value: 'openalex'


.. py:function:: fetch_entities(*, dag_id: str, run_id: str, is_first_run: bool, entity_names: list[str], cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, schema_folder: str, bq_dataset_id: str, api_bq_dataset_id: str, aws_conn_id: str, aws_openalex_bucket: str, entity_id=DATASET_API_ENTITY_ID) -> dict[str, dict]

.. py:function:: aws_to_gcs_transfer(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, gc_project_id: str, aws_conn_id: str, n_transfer_trys: int, aws_openalex_bucket: str)

.. py:function:: download(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, **context)

.. py:function:: transform(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)

.. py:function:: upload_schema(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)

.. py:function:: compare_schemas(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str, slack_conn_id: str, **context)

.. py:function:: upload_files(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity, transform_bucket: str)

.. py:function:: bq_load_table(*, entity: academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity)

.. py:function:: expire_previous_version(dag_id: str, project_id: str, dataset_id: str, table_id: str, snapshot_date: pendulum.DateTime, expiry_days: int, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID, client: google.cloud.bigquery.Client = None)

.. py:function:: add_dataset_release(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, bq_project_id: str, api_bq_dataset_id: str, entity_id: str = DATASET_API_ENTITY_ID)

.. py:function:: get_entity(entity_index: dict, entity_name: str) -> Optional[academic_observatory_workflows.openalex_telescope.release.OpenAlexEntity]

.. py:function:: get_task_id(**context)

.. py:function:: make_no_updated_data_msg(task_id: str, entity_name: str) -> str

.. py:function:: make_first_run_message(task_id: str)

.. py:function:: make_no_merged_ids_msg(task_id: str, entity_name: str) -> str

.. py:function:: get_aws_key(aws_conn_id: str) -> Tuple[str, str]

   Get the AWS access key ID and secret access key from the aws_conn_id Airflow connection.

   :return: the access key ID and secret access key.


.. py:function:: fetch_manifest(*, bucket: str, aws_key: Tuple[str, str], entity_name: str) -> academic_observatory_workflows.openalex_telescope.release.Manifest

   Fetch the OpenAlex manifest for an entity type.

   :param bucket: the OpenAlex AWS bucket.
   :param aws_key: the aws_access_key_id and aws_secret_key as a tuple.
   :param entity_name: the entity type.
   :return: the manifest for the entity type.


.. py:function:: fetch_merged_ids(*, bucket: str, aws_key: Tuple[str, str], entity_name: str, prefix: str = 'data/merged_ids') -> List[academic_observatory_workflows.openalex_telescope.release.MergedId]

.. py:function:: transform_file(download_path: str, transform_path: str) -> Tuple[collections.OrderedDict, list]

   Transforms a single file.
   Each entry/object in the gzip input file is transformed and the transformed object is immediately written out to
   a gzip file. For each entity only one field has to be transformed.

   This function generates and returns a BigQuery-style schema from the transformed object,
   using the SchemaGenerator from the 'bigquery_schema_generator' package.

   :param download_path: The path to the file with the OpenAlex entries.
   :param transform_path: The path where transformed data will be saved.
   :return: a tuple of schema_map, a nested OrderedDict object produced by the SchemaGenerator, and
       schema_generator.error_logs, the possible error logs produced by the SchemaGenerator.


.. py:function:: remove_none_from_array(obj: dict, field: str)

   Remove None values from an array.

   :param obj: the dictionary containing the array field.
   :param field: the key for the array field.
   :return: None


.. py:function:: safe_get_dict(obj: dict, field: str) -> dict

.. py:function:: transform_object(obj: dict)

   Transform an entry/object for one of the OpenAlex entities.
   For the Work entity only the "abstract_inverted_index" field is transformed.
   For the Concept and Institution entities only the "international" field is transformed.

   :param obj: Single object with entity information.
   :return: None.


.. py:function:: bq_compare_schemas(expected: List[dict], actual: List[dict], check_types_match: Optional[bool] = False) -> bool

   Compare two BigQuery-style schemas to check whether they have the same fields and, optionally, the same data types.

   :param expected: the expected schema.
   :param actual: the actual schema.
   :param check_types_match: Optional, whether the data types of the fields should also be checked.
   :return: whether the expected and actual schemas match.


.. py:function:: merge_schema_maps(to_add: collections.OrderedDict, old: collections.OrderedDict) -> collections.OrderedDict

   Using the SchemaGenerator from the bigquery_schema_generator library, merge the schemas found
   when scanning through files into one large nested OrderedDict.

   :param to_add: The incoming schema to add to the existing "old" schema.
   :param old: The existing old schema with previously populated values.
   :return: The old schema with newly added fields.


.. py:function:: flatten_schema(schema_map: collections.OrderedDict) -> dict

   Convert a nested OrderedDict object to a regular dictionary by passing it through the JSON encoder
   and the load string function.

   :param schema_map: The generated schema from SchemaGenerator.
   :return: A BigQuery-style schema.


.. py:function:: load_json(file_path: str) -> Any

   Read in a ``*.json`` file.


.. py:function:: bq_set_table_expiry(*, table_id: str, days: int)

   Set the expiry time for a BigQuery table.

   :param table_id: the fully qualified BigQuery table identifier.
   :param days: the number of days from now until the table expires.
   :return:
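
The descriptions of ``flatten_schema`` and ``remove_none_from_array`` above name two small techniques: a JSON round trip that turns a nested OrderedDict into plain dictionaries, and dropping None values from an array field. The following is a minimal sketch of both under stated assumptions, not the module's actual implementation. The ``_sketch`` function names and the sample data are hypothetical, and the real functions may handle edge cases differently.

```python
import json
from collections import OrderedDict


def flatten_schema_sketch(schema_map: OrderedDict) -> dict:
    """Hypothetical helper: convert a nested OrderedDict into plain dicts.

    json.dumps serialises OrderedDicts as JSON objects, and json.loads
    rebuilds them as regular dicts at every nesting level.
    """
    return json.loads(json.dumps(schema_map))


def remove_none_from_array_sketch(obj: dict, field: str) -> None:
    """Hypothetical helper: drop None values from obj[field] in place.

    Assumption: a missing field or a non-list value is left untouched.
    """
    values = obj.get(field)
    if isinstance(values, list):
        obj[field] = [value for value in values if value is not None]


# Illustrative data only; not taken from an OpenAlex snapshot.
nested = OrderedDict(name=OrderedDict(type="STRING", mode="NULLABLE"))
flat = flatten_schema_sketch(nested)

record = {"ids": ["W1", None, "W2"]}
remove_none_from_array_sketch(record, "ids")
```

The round trip only works when every value in the schema map is JSON serialisable, which is the case for plain strings, numbers, lists and dictionaries.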