academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope

Module Contents

Classes

TestOpenAlexUtils

TestOpenAlexTelescope

Tests for the OpenAlex telescope

Functions

upload_folder_to_s3(bucket_name, folder_path[, s3_prefix])

create_openalex_dataset(→ Dict)

Create an OpenAlex dataset based on data from the fixtures folder, uploading it to AWS S3 storage.

get_table_expiry(table_id)

Attributes

FIXTURES_FOLDER

academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.FIXTURES_FOLDER[source]
class academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.TestOpenAlexUtils(*args, **kwargs)[source]

Bases: observatory.platform.observatory_environment.ObservatoryTestCase

test_s3_uri_parts()[source]
test_meta()[source]
test_manifest_entry()[source]
test_manifest()[source]
test_merged_id()[source]
test_openalex_entity(m_variable_get)[source]
test_fetch_manifest()[source]
test_fetch_merged_ids()[source]
test_transform_object()[source]

Test the transform_object function.

test_bq_compare_schemas()[source]
test_merge_schema_maps()[source]
academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.upload_folder_to_s3(bucket_name: str, folder_path: str, s3_prefix=None)[source]
academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.create_openalex_dataset(input_path: pathlib.Path, bucket_name: str) Dict[source]

Create an OpenAlex dataset based on data from the fixtures folder, uploading it to AWS S3 storage.

Parameters:
  • input_path – the input path of the dataset in the fixtures folder.

  • bucket_name – the AWS S3 bucket name.

Returns:

a manifest index, with entity name as key and manfiest as value, to use for testing.

class academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.TestOpenAlexTelescope(*args, **kwargs)[source]

Bases: observatory.platform.observatory_environment.ObservatoryTestCase

Tests for the OpenAlex telescope

test_dag_structure()[source]

Test that the DAG has the correct structure.

test_dag_load()[source]

Test that the OpenAlex DAG can be loaded from a DAG bag.

test_telescope(m_send_slack_msg)[source]

Test the OpenAlex telescope end to end.

academic_observatory_workflows.openalex_telescope.tests.test_openalex_telescope.get_table_expiry(table_id: str)[source]