Source code for academic_observatory_workflows.scopus_telescope.telescope

# Copyright 2020-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Author: Tuan Chien, James Diprose

from typing import List

import pendulum
from airflow.decorators import dag, task

from academic_observatory_workflows.config import project_path
from academic_observatory_workflows.scopus_telescope import tasks
from observatory_platform.airflow.airflow import on_failure_callback
from observatory_platform.airflow.workflow import CloudWorkspace
from observatory_platform.airflow.tasks import check_dependencies


[docs]class DagParams: """ :param dag_id: the id of the DAG. :param cloud_workspace: the cloud workspace settings. :param institution_ids: list of institution IDs to use for the Scopus search query. :param scopus_conn_ids: list of Scopus Airflow Connection IDs. :param view: The view type. Standard or complete. See https://dev.elsevier.com/sc_search_views.html :param earliest_date: earliest date to query for results. :param bq_dataset_id: the BigQuery dataset id. :param bq_table_name: the BigQuery table name. :param api_bq_dataset_id: the Dataset ID to use when storing releases. :param schema_folder: the SQL schema path. :param dataset_description: description for the BigQuery dataset. :param table_description: description for the BigQuery table. :param observatory_api_conn_id: the Observatory API connection key. :param start_date: the start date of the DAG. :param schedule: the schedule interval of the DAG. :param max_active_runs: the maximum number of DAG runs that can be run at once. :param retries: the number of times to retry a task. """ def __init__( self, *, dag_id: str, cloud_workspace: CloudWorkspace, institution_ids: List[str], scopus_conn_ids: List[str], view: str = "STANDARD", earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1), bq_dataset_id: str = "scopus", bq_table_name: str = "scopus", api_bq_dataset_id: str = "dataset_api", schema_folder: str = project_path("scopus_telescope", "schema"), dataset_description: str = "The Scopus citation database: https://www.scopus.com", table_description: str = "The Scopus citation database: https://www.scopus.com", start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), schedule: str = "@monthly", max_active_runs: int = 1, retries: int = 3, ):
[docs] self.dag_id = dag_id
[docs] self.cloud_workspace = cloud_workspace
[docs] self.institution_ids = institution_ids
[docs] self.scopus_conn_ids = scopus_conn_ids
[docs] self.view = view
[docs] self.earliest_date = earliest_date
[docs] self.bq_dataset_id = bq_dataset_id
[docs] self.bq_table_name = bq_table_name
[docs] self.api_bq_dataset_id = api_bq_dataset_id
[docs] self.schema_folder = schema_folder
[docs] self.dataset_description = dataset_description
[docs] self.table_description = table_description
[docs] self.start_date = start_date
[docs] self.schedule = schedule
[docs] self.max_active_runs = max_active_runs
[docs] self.retries = retries
[docs]def create_dag(dag_params: DagParams): """Scopus telescope.""" @dag( dag_id=dag_params.dag_id, start_date=dag_params.start_date, schedule=dag_params.schedule, catchup=False, max_active_runs=dag_params.max_active_runs, tags=["academic-observatory"], default_args={ "owner": "airflow", "on_failure_callback": on_failure_callback, "retries": dag_params.retries, }, ) def scopus(): @task def fetch_release(dag_params: DagParams, **context) -> dict: """Fetch the release""" return tasks.fetch_release( dag_id=dag_params.dag_id, cloud_workspace=dag_params.cloud_workspace, run_id=context["run_id"], data_interval_start=context["data_interval_start"], data_interval_end=context["data_interval_end"], ) @task def download(release: dict, dag_params: DagParams, **context): """Download snapshot from SCOPUS for the given institution.""" tasks.download( release, earliest_date=dag_params.earliest_date, scopus_conn_ids=dag_params.scopus_conn_ids, view=dag_params.view, institution_ids=dag_params.institution_ids, ) @task def transform(release: dict, dag_params: DagParams, **context): """Transform the data into database format.""" tasks.transform(release, institution_ids=dag_params.institution_ids) @task def bq_load(release: dict, dag_params: DagParams, **context): """Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.""" tasks.bq_load( release=release, bq_dataset_id=dag_params.bq_dataset_id, dataset_description=dag_params.dataset_description, bq_table_name=dag_params.bq_table_name, table_description=dag_params.table_description, schema_folder=dag_params.schema_folder, ) @task def add_dataset_release(release: dict, **context) -> None: """Adds release information to API.""" tasks.add_dataset_release(release, api_bq_dataset_id=dag_params.api_bq_dataset_id) @task def cleanup_workflow(release: dict, **context) -> None: """Delete all files, folders and XComs associated with this release.""" tasks.cleanup_workflow(release) # Define task connections task_check_dependencies = check_dependencies(airflow_conns=dag_params.scopus_conn_ids) xcom_release = fetch_release(dag_params) task_download = download(xcom_release, dag_params) task_transform = transform(xcom_release, dag_params) task_bq_load = bq_load(xcom_release, dag_params) task_add_dataset_release = add_dataset_release(xcom_release) task_cleanup_workflow = cleanup_workflow(xcom_release) ( task_check_dependencies >> xcom_release >> task_download >> task_transform >> task_bq_load >> task_add_dataset_release >> task_cleanup_workflow ) return scopus()