academic_observatory_workflows.scopus_telescope.tasks
=====================================================

.. py:module:: academic_observatory_workflows.scopus_telescope.tasks


Classes
-------

.. autoapisummary::

   academic_observatory_workflows.scopus_telescope.tasks.ScopusClientThrottleLimits
   academic_observatory_workflows.scopus_telescope.tasks.ScopusClient
   academic_observatory_workflows.scopus_telescope.tasks.ScopusUtilWorker
   academic_observatory_workflows.scopus_telescope.tasks.ScopusUtility
   academic_observatory_workflows.scopus_telescope.tasks.ScopusJsonParser


Functions
---------

.. autoapisummary::

   academic_observatory_workflows.scopus_telescope.tasks.fetch_release
   academic_observatory_workflows.scopus_telescope.tasks.download
   academic_observatory_workflows.scopus_telescope.tasks.transform
   academic_observatory_workflows.scopus_telescope.tasks.bq_load
   academic_observatory_workflows.scopus_telescope.tasks.add_dataset_release
   academic_observatory_workflows.scopus_telescope.tasks.cleanup_workflow
   academic_observatory_workflows.scopus_telescope.tasks.transform_to_db_format


Module Contents
---------------

.. py:function:: fetch_release(dag_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, run_id: str, data_interval_start: str, data_interval_end: str) -> dict

   Fetch the release.


.. py:function:: download(release: dict, earliest_date: pendulum.DateTime, scopus_conn_ids: List[str], view: Literal['standard', 'complete'], institution_ids: List[str])

   Download a snapshot from SCOPUS for the given institutions.


.. py:function:: transform(release: dict, institution_ids: List[str])

   Transform the data into database format.


.. py:function:: bq_load(release: dict, bq_dataset_id: str, dataset_description: str, bq_table_name: str, table_description: str, schema_folder: str)

   Task to load each transformed release to BigQuery.
   The table_id is set to the file name without the extension.


.. py:function:: add_dataset_release(release: dict, api_bq_dataset_id: str) -> None

   Add release information to the API.


.. py:function:: cleanup_workflow(release: dict) -> None

   Delete all files, folders and XComs associated with this release.


.. py:function:: transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) -> List[dict]

   Convert the JSON response to the expected schema.

   :param records: List of the records as JSON.
   :param snapshot_date: Release date.
   :param institution_ids: List of institutions.
   :return: List of transformed entries.


.. py:class:: ScopusClientThrottleLimits

   API throttling constants for ScopusClient.


   .. py:attribute:: CALL_LIMIT
      :value: 2


   .. py:attribute:: CALL_PERIOD
      :value: 1


.. py:class:: ScopusClient(*, api_key: str, view: str = 'standard')

   Handles URL fetching of SCOPUS search.


   .. py:attribute:: RESULTS_PER_PAGE
      :value: 25


   .. py:attribute:: MAX_RESULTS
      :value: 5000


   .. py:attribute:: QUOTA_EXCEED_ERROR_PREFIX
      :value: 'QuotaExceeded. Resets at: '


   .. py:attribute:: _headers


   .. py:attribute:: _view
      :value: 'standard'


   .. py:method:: _url(query: str) -> str

      Get the query URL.

      :param query: Query string.
      :return: Query URL.


   .. py:method:: get_reset_date_from_error(msg: str) -> int
      :staticmethod:

      Get the reset date timestamp in seconds from the exception message.
      According to https://dev.elsevier.com/api_key_settings.html the value is meant to be in seconds,
      but milliseconds were observed the last time it was checked in Oct 2020.

      :param msg: Exception message.
      :return: Reset date timestamp in seconds.


   .. py:method:: get_next_page_url(links: List[dict]) -> Union[None, str]
      :staticmethod:

      Get the URL for the next result page.

      :param links: The list of links returned from the last query.
      :return: None if the next page is not found, otherwise the URL of the next page.


   .. py:method:: retrieve(query: str) -> Tuple[List[Dict[str, Any]], int, int]

      Execute the query.

      :param query: Query string.
      :return: (results of query, quota remaining, quota reset date timestamp in seconds)


.. py:class:: ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)

   Worker class.


   .. py:attribute:: DEFAULT_KEY_QUOTA
      :value: 20000


   .. py:attribute:: QUEUE_WAIT_TIME
      :value: 20


   .. py:attribute:: client_id


   .. py:attribute:: client


   .. py:attribute:: quota_reset_date


   .. py:attribute:: quota_remaining


.. py:class:: ScopusUtility

   Handles the SCOPUS interactions.


   .. py:method:: build_query(*, institution_ids: List[str], period: pendulum.Interval) -> str
      :staticmethod:

      Build a SCOPUS API query.

      :param institution_ids: List of institutional IDs to query, e.g., ["60031226"] (Curtin University).
      :param period: A schedule period.
      :return: Constructed web query.


   .. py:method:: download_period(*, worker: ScopusUtilWorker, conn: str, period: pendulum.Interval, institution_ids: List[str], download_dir: str)
      :staticmethod:

      Download records for a stated date range.
      The elsapy package currently has a cap of 5000 results per query, so in the unlikely event that
      any institution has more than 5000 entries per month, this will present a problem.

      :param worker: Worker that will do the downloading.
      :param conn: Connection ID from Airflow (minus scopus_).
      :param period: Period to download.
      :param institution_ids: List of institutions to query concurrently.
      :param download_dir: Path to save downloaded files to.


   .. py:method:: sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)
      :staticmethod:

      Sleep until reset_date.

      :param reset_date: Date and time to sleep until.
      :param conn: Connection ID from Airflow.


   .. py:method:: update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)
      :staticmethod:

      Update the reset date to the closest date that will make a worker available.

      :param conn: Airflow connection ID.
      :param error_msg: Error message from the quota exceeded exception.
      :param worker: Worker that will do the downloading.


   .. py:method:: clear_task_queue(queue: queue.Queue)
      :staticmethod:

      Clear a queue.

      :param queue: Queue to clear.


   .. py:method:: download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)
      :staticmethod:

      Download worker method used by the parallel downloader.

      :param worker: Worker to use.
      :param exit_event: Exit event to monitor.
      :param taskq: Tasks queue.
      :param conn: Airflow connection ID.
      :param institution_ids: List of institutions to query concurrently.
      :param download_dir: Path to save downloaded files to.


   .. py:method:: download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)
      :staticmethod:

      Download a SCOPUS snapshot with parallel sessions. Tasks are distributed in parallel to the available keys.
      Each key independently fetches a task from the queue when it is free, so there is no guarantee of load balance.

      :param workers: List of workers available.
      :param taskq: Tasks queue.
      :param conn: Airflow connection ID.
      :param institution_ids: List of institutions to query concurrently.
      :param download_dir: Path to save downloaded files to.


   .. py:method:: make_query(*, worker: ScopusUtilWorker, query: str) -> Tuple[str, int]
      :staticmethod:

      Throttling wrapper for the API call. This is a global limit for this API when called from a program on the
      same machine. Limits are specified in the ScopusClientThrottleLimits class.

      Throttle limits may or may not be enforced; this probably depends on how executors spin up tasks.

      :param worker: ScopusUtilWorker object.
      :param query: Query object.
      :returns: Query results.


.. py:class:: ScopusJsonParser

   Helper methods to process the JSON from SCOPUS into the desired structure.


   .. py:method:: get_affiliations(data: Dict[str, Any]) -> Union[None, List[Dict[str, Any]]]
      :staticmethod:

      Get the affiliation field.

      :param data: JSON response from SCOPUS.
      :return: List of affiliation details.


   .. py:method:: get_authors(data: Dict[str, Any]) -> Union[None, List[Dict[str, Any]]]
      :staticmethod:

      Get the author field. It is not known whether this parser will raise an error until an API key with
      complete view access is available.

      :param data: JSON response from SCOPUS.
      :return: List of authors' details.


   .. py:method:: get_identifier_list(data: dict, id_type: str) -> Union[None, List[str]]
      :staticmethod:

      Get the list of document identifiers, or None if it does not exist. This string/list behaviour was
      observed for ISBNs, so it is used for other identifiers just in case.

      :param data: JSON response from SCOPUS.
      :param id_type: Type of identifier, e.g., 'isbn'.
      :return: List of identifiers.


   .. py:method:: parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) -> dict
      :staticmethod:

      Turn JSON data into db schema format.

      :param data: JSON response from SCOPUS.
      :param snapshot_date: DAG execution date.
      :param institution_ids: List of institution IDs used in the query.
      :return: Dict of data in the right field format.
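
The string-or-list behaviour described for ``get_identifier_list`` could be handled along the following lines. This is a minimal sketch for illustration, not the module's implementation: the function name ``normalise_identifiers``, the plain ``'isbn'`` key, and the assumption that list entries are plain strings are all hypothetical, and the real SCOPUS response may use different keys and nest its values differently.

.. code-block:: python

   from typing import Any, Dict, List, Optional


   def normalise_identifiers(data: Dict[str, Any], key: str) -> Optional[List[str]]:
       """Return the identifiers stored under ``key`` as a list, or None if absent.

       Hypothetical helper: assumes the value is either a single string or a
       list of strings, mirroring the behaviour observed for ISBNs.
       """
       value = data.get(key)
       if value is None:
           # The field is missing from the record.
           return None
       if isinstance(value, str):
           # A single identifier is wrapped so callers always receive a list.
           return [value]
       # Assumed to be an iterable of identifiers; coerce each entry to str.
       return [str(entry) for entry in value]


   single = normalise_identifiers({"isbn": "9780000000002"}, "isbn")
   several = normalise_identifiers({"isbn": ["9780000000002", "9780000000019"]}, "isbn")
   missing = normalise_identifiers({}, "isbn")

Returning ``None`` rather than an empty list keeps a missing field distinguishable from one that is present but empty, which matches the ``Union[None, List[str]]`` return type documented above.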