Source code for academic_observatory_workflows.unpaywall_telescope.tests.test_telescope

# Copyright 2020 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Tuan Chien, James Diprose

import os
from unittest.mock import patch

import pendulum

from observatory_platform.dataset_api import DatasetAPI
from observatory_platform.airflow.airflow import clear_airflow_connections, upsert_airflow_connection
from academic_observatory_workflows.unpaywall_telescope.telescope import create_dag, DagParams

from academic_observatory_workflows.config import project_path, TestConfig
from observatory_platform.google.bigquery import bq_table_id, bq_sharded_table_id
from observatory_platform.airflow.workflow import Workflow
from observatory_platform.sandbox.test_utils import load_and_parse_json, SandboxTestCase
from observatory_platform.sandbox.sandbox_environment import SandboxEnvironment


[docs]FIXTURES_FOLDER = project_path("unpaywall_telescope", "tests", "fixtures")
[docs]class TestUnpaywallTelescope(SandboxTestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] self.dag_id = "unpaywall"
[docs] self.project_id = os.getenv("TEST_GCP_PROJECT_ID")
[docs] self.data_location = os.getenv("TEST_GCP_DATA_LOCATION")
[docs] def test_dag_structure(self): """Test that the DAG has the correct structure.""" dag = create_dag(DagParams(dag_id=self.dag_id, cloud_workspace=self.fake_cloud_workspace)) self.assert_dag_structure( { "wait_for_prev_dag_run": {"check_dependencies"}, "check_dependencies": {"fetch_release"}, "fetch_release": { "load_changefiles.bq_upsert", "load_snapshot.bq_load", "load_snapshot.download", "load_snapshot.upload_main_table_files", "load_snapshot.split_main_table_file", "load_snapshot.transform", "load_changefiles.download", "branch", "cleanup_workflow", "short_circuit", "load_changefiles.transform", "add_dataset_release", "load_changefiles.extract", "load_changefiles.upload", "bq_create_main_table_snapshot", "load_changefiles.bq_load", "load_snapshot.extract", }, "short_circuit": {"create_dataset"}, "create_dataset": {"bq_create_main_table_snapshot"}, "bq_create_main_table_snapshot": {"gke_create_storage"}, "gke_create_storage": {"branch"}, "branch": {"load_snapshot.download", "load_changefiles.download"}, "load_snapshot.download": {"load_snapshot.extract"}, "load_snapshot.extract": {"load_snapshot.transform"}, "load_snapshot.transform": {"load_snapshot.split_main_table_file"}, "load_snapshot.split_main_table_file": {"load_snapshot.upload_main_table_files"}, "load_snapshot.upload_main_table_files": {"load_snapshot.bq_load"}, "load_snapshot.bq_load": {"load_changefiles.download"}, "load_changefiles.download": {"load_changefiles.extract"}, "load_changefiles.extract": {"load_changefiles.transform"}, "load_changefiles.transform": {"load_changefiles.upload"}, "load_changefiles.upload": {"load_changefiles.bq_load"}, "load_changefiles.bq_load": {"load_changefiles.bq_upsert"}, "load_changefiles.bq_upsert": {"merge_branches"}, "merge_branches": {"gke_delete_storage"}, "gke_delete_storage": {"add_dataset_release"}, "add_dataset_release": {"cleanup_workflow"}, "cleanup_workflow": {"dag_run_complete"}, "dag_run_complete": {}, }, dag, )
[docs] def test_dag_load(self): """Test that workflow can be loaded from a DAG bag.""" env = SandboxEnvironment( workflows=[ Workflow( dag_id=self.dag_id, name="Unpaywall Telescope", class_name="academic_observatory_workflows.unpaywall_telescope.telescope", cloud_workspace=self.fake_cloud_workspace, ) ] ) with env.create(): dag_file = os.path.join(project_path(), "..", "..", "dags", "load_dags.py") self.assert_dag_load(self.dag_id, dag_file)
[docs] def test_telescope(self): """Test workflow end to end. The test files in fixtures/unpaywall have been carefully crafted to make sure that the data is loaded into BigQuery correctly. """ env = SandboxEnvironment(project_id=TestConfig.gcp_project_id, data_location=TestConfig.gcp_data_location) api_bq_dataset_id = env.add_dataset("unpaywall_api") bq_dataset_id = env.add_dataset("unpaywall") with env.create(task_logging=True): task_resources = { "load_snapshot_download": {"memory": "2G", "cpu": "2"}, "load_snapshot_extract": {"memory": "2G", "cpu": "2"}, "load_snapshot_transform": {"memory": "2G", "cpu": "2"}, "load_snapshot_split_main_table_file": {"memory": "2G", "cpu": "2"}, "load_snapshot_upload_main_table_files": {"memory": "2G", "cpu": "2"}, "load_changefiles_download": {"memory": "2G", "cpu": "2"}, "load_changefiles_extract": {"memory": "2G", "cpu": "2"}, "load_changefiles_transform": {"memory": "2G", "cpu": "2"}, "load_changefiles_upload": {"memory": "2G", "cpu": "2"}, } start_date = pendulum.datetime(2021, 7, 2) test_params = DagParams( dag_id="test_unpaywall", cloud_workspace=env.cloud_workspace, retries=0, bq_dataset_id=bq_dataset_id, api_bq_dataset_id=api_bq_dataset_id, start_date=start_date, unpaywall_base_url=f"http://{TestConfig.http_host_url}:{TestConfig.http_port}/unpaywall/1", # Add /1/ for proper routing in the http-server gke_image=TestConfig.gke_image, gke_namespace=TestConfig.gke_namespace, gke_resource_overrides=task_resources, gke_volume_size="500Mi", test_run=True, ) api = DatasetAPI(bq_project_id=env.cloud_workspace.project_id, bq_dataset_id=test_params.api_bq_dataset_id) main_table_id = bq_table_id( test_params.cloud_workspace.project_id, test_params.bq_dataset_id, test_params.bq_table_name ) upsert_table_id = f"{main_table_id}_upsert" ### First run: snapshot and initial changefiles data_interval_start = pendulum.datetime(2023, 4, 25) data_interval_end = data_interval_start.end_of("day") snapshot_date = pendulum.datetime(2023, 4, 25, 8, 30, 2) clear_airflow_connections() upsert_airflow_connection(conn_id="unpaywall", conn_type="http", password="secret") upsert_airflow_connection(**TestConfig.gke_cluster_connection) with patch("academic_observatory_workflows.unpaywall_telescope.tasks.get_http_response_json") as cfs, patch( "academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name" ) as ss: cfs.return_value = { "list": [{"filename": "changed_dois_with_versions_2023-04-25T080001.jsonl.gz", "filetype": "jsonl"}] } ss.return_value = f"unpaywall_snapshot_{snapshot_date.format('YYYY-MM-DDTHHmmss')}.jsonl.gz" dagrun = create_dag(dag_params=test_params).test(execution_date=data_interval_end) # Make assertions if not dagrun.state == "success": raise RuntimeError("First Dagrun did not complete successfully") self.assert_table_integrity(upsert_table_id, expected_rows=2) expected_content = load_and_parse_json( os.path.join(FIXTURES_FOLDER, "expected", "run1_main_table.json"), date_fields={"oa_date", "published_date"}, timestamp_fields={"updated"}, ) self.assert_table_content(main_table_id, expected_content, "doi") api_releases = api.get_dataset_releases(dag_id=test_params.dag_id, entity_id="unpaywall") self.assertEqual(len(api_releases), 1) ### Second Run: No new changefiles data_interval_start = pendulum.datetime(2023, 4, 26) data_interval_end = data_interval_start.end_of("day") with patch("academic_observatory_workflows.unpaywall_telescope.tasks.get_http_response_json") as cfs, patch( "academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name" ) as ss: cfs.return_value = {"list": []} ss.return_value = "filename" dagrun = create_dag(dag_params=test_params).test(execution_date=data_interval_end) # Make assertions if not dagrun.state == "success": raise RuntimeError("Second Dagrun did not complete successfully") # Check that only 1 dataset release exists api_releases = api.get_dataset_releases(dag_id=test_params.dag_id, entity_id="unpaywall") self.assertEqual(len(api_releases), 1) ## Third run: waiting a couple of days and applying multiple changefiles prev_end_date = pendulum.datetime(2023, 4, 25, 8, 0, 1) data_interval_start = pendulum.datetime(2023, 4, 27) data_interval_end = data_interval_start.end_of("day") with patch("academic_observatory_workflows.unpaywall_telescope.tasks.get_http_response_json") as cfs, patch( "academic_observatory_workflows.unpaywall_telescope.tasks.get_snapshot_file_name" ) as ss: cfs.return_value = { "list": [ {"filename": "changed_dois_with_versions_2023-04-27T080001.jsonl.gz", "filetype": "jsonl"}, {"filename": "changed_dois_with_versions_2023-04-26T080001.jsonl.gz", "filetype": "jsonl"}, ] } ss.return_value = f"unpaywall_snapshot_{snapshot_date.format('YYYY-MM-DDTHHmmss')}.jsonl.gz" dagrun = create_dag(dag_params=test_params).test(execution_date=data_interval_end) # Make assertions if not dagrun.state == "success": raise RuntimeError("Third Dagrun did not complete successfully") dst_table_id = bq_sharded_table_id( test_params.cloud_workspace.project_id, test_params.bq_dataset_id, "unpaywall_snapshot", prev_end_date, ) self.assert_table_integrity(dst_table_id, expected_rows=12) self.assert_table_integrity(upsert_table_id, expected_rows=4) self.assert_table_integrity(main_table_id, expected_rows=14) expected_content = load_and_parse_json( os.path.join(FIXTURES_FOLDER, "expected", "run3_main_table.json"), date_fields={"oa_date", "published_date"}, timestamp_fields={"updated"}, ) self.assert_table_content(main_table_id, expected_content, "doi") api_releases = api.get_dataset_releases(dag_id=test_params.dag_id, entity_id="unpaywall") self.assertEqual(len(api_releases), 2)