academic_observatory_workflows.scopus_telescope.scopus_telescope

Module Contents

Classes

ScopusRelease

ScopusClientThrottleLimits

API throttling constants for ScopusClient.

ScopusClient

Handles URL fetching of SCOPUS search.

ScopusUtilWorker

Worker that pairs a ScopusClient (one API key) with its quota state.

ScopusUtility

Handles the SCOPUS interactions.

ScopusJsonParser

Helper methods to process the json from SCOPUS into desired structure.

Functions

create_dag(*, dag_id, cloud_workspace, ...[, view, ...])

Scopus telescope.

transform_to_db_format(→ List[dict])

Convert the json response to the expected schema.

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)

Bases: observatory.platform.workflows.workflow.SnapshotRelease

property transform_blob_name
static from_dict(dict_: dict)
to_dict() → dict

academic_observatory_workflows.scopus_telescope.scopus_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, institution_ids: List[str], scopus_conn_ids: List[str], view: str = 'STANDARD', earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1), bq_dataset_id: str = 'scopus', bq_table_name: str = 'scopus', api_dataset_id: str = 'scopus', schema_folder: str = project_path('scopus_telescope', 'schema'), dataset_description: str = 'The Scopus citation database: https://www.scopus.com', table_description: str = 'The Scopus citation database: https://www.scopus.com', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), schedule: str = '@monthly', max_active_runs: int = 1, retries: int = 3)

Scopus telescope.

Parameters:
  • dag_id – the ID of the DAG.

  • cloud_workspace – the cloud workspace settings.

  • institution_ids – list of institution IDs to use for the Scopus search query.

  • scopus_conn_ids – list of Scopus Airflow connection IDs.

  • view – the view type, standard or complete. See https://dev.elsevier.com/sc_search_views.html

  • earliest_date – the earliest date to query for results.

  • bq_dataset_id – the BigQuery dataset ID.

  • bq_table_name – the BigQuery table name.

  • api_dataset_id – the dataset ID to use when storing releases.

  • schema_folder – the path to the schema folder.

  • dataset_description – the description for the BigQuery dataset.

  • table_description – the description for the BigQuery table.

  • observatory_api_conn_id – the Observatory API connection key.

  • start_date – the start date of the DAG.

  • schedule – the schedule interval of the DAG.

  • max_active_runs – the maximum number of DAG runs that can run concurrently.

  • retries – the number of times to retry a task.

academic_observatory_workflows.scopus_telescope.scopus_telescope.transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) → List[dict]

Convert the JSON response to the expected schema.

Parameters:
  • records – list of the records as JSON.

  • snapshot_date – the release date.

  • institution_ids – list of institution IDs used in the query.

Returns:

List of transformed entries.

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusClientThrottleLimits

API throttling constants for ScopusClient.

CALL_LIMIT = 2
CALL_PERIOD = 1

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusClient(*, api_key: str, view: str = 'standard')

Handles URL fetching of SCOPUS search.

RESULTS_PER_PAGE = 25
MAX_RESULTS = 5000
QUOTA_EXCEED_ERROR_PREFIX = 'QuotaExceeded. Resets at: '
_url(query: str) → str

Get the query url.

Parameters:

query – Query string.

Returns:

Query url.
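A minimal sketch of what _url might produce, assuming the public Scopus Search API endpoint https://api.elsevier.com/content/search/scopus and its query, view and count parameters. The helper name build_search_url is hypothetical, and the real method may differ, for example by sending the API key in a request header rather than in the URL.

```python
from urllib.parse import urlencode

# Assumed Scopus Search API endpoint (not taken from the module source).
SCOPUS_SEARCH_URL = "https://api.elsevier.com/content/search/scopus"


def build_search_url(query: str, view: str = "standard", count: int = 25) -> str:
    """Hypothetical helper: build a Scopus search URL for one page of results."""
    params = {"query": query, "view": view.upper(), "count": count}
    # urlencode percent-encodes reserved characters, e.g. "(" becomes "%28".
    return f"{SCOPUS_SEARCH_URL}?{urlencode(params)}"
```

For example, build_search_url("AF-ID(60031226)") percent-encodes the parentheses in the query and requests 25 results per page, matching RESULTS_PER_PAGE.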

static get_reset_date_from_error(msg: str) → int

Get the reset date timestamp, in seconds, from the exception message. According to https://dev.elsevier.com/api_key_settings.html the value should be in seconds, but milliseconds were observed when it was last checked (October 2020).

Parameters:

msg – exception message.

Returns:

Reset date timestamp in seconds.
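A sketch of the parsing step, assuming the message is QUOTA_EXCEED_ERROR_PREFIX followed by an integer epoch timestamp. Because the unit has been seen to vary, the hypothetical helper below treats implausibly large values as milliseconds; the threshold is an assumption, not taken from the module.

```python
QUOTA_EXCEED_ERROR_PREFIX = "QuotaExceeded. Resets at: "

# Hypothetical cut-off: 10**11 seconds lies thousands of years in the future,
# so any larger value is assumed to be in milliseconds.
MS_THRESHOLD = 10**11


def parse_reset_timestamp(msg: str) -> int:
    """Extract the quota reset timestamp (in seconds) from a quota-exceeded message."""
    if not msg.startswith(QUOTA_EXCEED_ERROR_PREFIX):
        raise ValueError(f"Unexpected error message: {msg!r}")
    value = int(msg[len(QUOTA_EXCEED_ERROR_PREFIX):].strip())
    # Seconds are documented, but milliseconds have been observed in practice.
    if value > MS_THRESHOLD:
        value //= 1000
    return value
```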

static get_next_page_url(links: List[dict]) → None | str

Get the URL for the next result page.

Parameters:

links – The list of links returned from the last query.

Returns:

None if there is no next page, otherwise the URL of the next page.
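A minimal sketch of the lookup, assuming each entry in links is a dict with "@ref" and "@href" keys (the shape the Scopus Search API uses for its paging links in JSON responses). The function mirrors the documented contract but is not the module's code.

```python
from typing import Dict, List, Optional


def get_next_page_url(links: List[Dict[str, str]]) -> Optional[str]:
    """Return the href of the link whose @ref is "next", or None if there is none."""
    for link in links:
        if link.get("@ref") == "next":
            return link.get("@href")
    return None
```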

retrieve(query: str) → Tuple[List[Dict[str, Any]], int, int]

Execute the query.

Parameters:

query – Query string.

Returns:

(results of query, quota remaining, quota reset date timestamp in seconds)
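The paging behaviour implied by RESULTS_PER_PAGE and MAX_RESULTS can be sketched as a loop that follows next-page URLs until none remain or 5000 results have been collected. The fetch_page callable is a hypothetical stand-in for one HTTP request that returns a page of entries and the next URL; quota tracking is omitted.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

MAX_RESULTS = 5000  # Mirrors ScopusClient.MAX_RESULTS.

# A page is (entries, next_url), where next_url is None on the last page.
Page = Tuple[List[Dict[str, Any]], Optional[str]]


def retrieve_all(first_url: str, fetch_page: Callable[[str], Page]) -> List[Dict[str, Any]]:
    """Follow next-page URLs, accumulating entries up to MAX_RESULTS."""
    results: List[Dict[str, Any]] = []
    url: Optional[str] = first_url
    while url is not None and len(results) < MAX_RESULTS:
        entries, url = fetch_page(url)
        results.extend(entries)
    # Trim in case the last page pushed the total past the cap.
    return results[:MAX_RESULTS]
```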

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)

Worker that pairs a ScopusClient (one API key) with its quota state.

DEFAULT_KEY_QUOTA = 20000
QUEUE_WAIT_TIME = 20

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusUtility

Handles the SCOPUS interactions.

static build_query(*, institution_ids: List[str], period: Type[pendulum.Period]) → str

Build a SCOPUS API query.

Parameters:
  • institution_ids – List of institution IDs to query, e.g., [“60031226”] (Curtin University).

  • period – A schedule period.

Returns:

Constructed web query.
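A sketch of a query for a list of affiliation IDs and a date window. AF-ID is the Scopus affiliation-ID field code; the ORIG-LOAD-DATE AFT/BEF date clause, and the assumption that AFT and BEF are exclusive, are illustrative guesses and may not match the query the telescope actually sends. The function name and the use of datetime.date instead of a pendulum period are also hypothetical.

```python
from datetime import date, timedelta
from typing import List


def build_query(institution_ids: List[str], start: date, end: date) -> str:
    """Hypothetical builder: OR the affiliation IDs together and add a date window."""
    affiliations = " OR ".join(f"AF-ID({inst_id})" for inst_id in institution_ids)
    # Assumed date clause: AFT/BEF treated as exclusive, so widen by one day
    # on each side to include both start and end.
    after = (start - timedelta(days=1)).strftime("%Y%m%d")
    before = (end + timedelta(days=1)).strftime("%Y%m%d")
    return f"({affiliations}) AND ORIG-LOAD-DATE AFT {after} AND ORIG-LOAD-DATE BEF {before}"
```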

static download_period(*, worker: ScopusUtilWorker, conn: str, period: Type[pendulum.Period], institution_ids: List[str], download_dir: str)

Download records for a stated date range. The elsapy package currently caps results at 5000 per query, so in the unlikely event that an institution has more than 5000 entries in a single month, the records beyond the cap will not be retrieved.

Parameters:
  • worker – Worker that will do the downloading.

  • conn – Airflow connection ID (without the scopus_ prefix).

  • period – Period to download.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.
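Because the cap applies per query, downloads are split into short periods. The hypothetical helper below, written with the standard library rather than pendulum, splits a date range into calendar-month periods so that each query covers at most one month; it is an illustration, not the module's period logic.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple


def month_periods(start: date, end: date) -> Iterator[Tuple[date, date]]:
    """Yield (first_day, last_day) pairs for each calendar month overlapping [start, end]."""
    current = start
    while current <= end:
        # First day of the following month.
        if current.month == 12:
            next_month = date(current.year + 1, 1, 1)
        else:
            next_month = date(current.year, current.month + 1, 1)
        yield current, min(next_month - timedelta(days=1), end)
        current = next_month
```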

static sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)

Sleep until reset_date.

Parameters:
  • reset_date – Datetime to sleep until.

  • conn – Connection id from Airflow.
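A minimal sketch of sleeping until a reset time, assuming timezone-aware datetimes. The clock and sleep function are injectable (a design choice for testing, not something the original signature offers), and the helper name sleep_until is hypothetical.

```python
import time
from datetime import datetime, timezone
from typing import Callable, Optional


def sleep_until(reset_date: datetime, now: Optional[datetime] = None,
                sleep: Callable[[float], None] = time.sleep) -> float:
    """Sleep until reset_date if it lies in the future; return the seconds slept."""
    if now is None:
        now = datetime.now(timezone.utc)
    seconds = (reset_date - now).total_seconds()
    if seconds <= 0:
        return 0.0  # Already past the reset time: no need to wait.
    sleep(seconds)
    return seconds
```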

static update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)

Update the reset date to the earliest date at which a worker will become available.

Parameters:
  • conn – Airflow connection ID.

  • error_msg – Error message from quota exceeded exception.

  • worker – Worker that will do the downloading.

static clear_task_queue(queue: queue.Queue)

Clear a queue.

Parameters:

queue – Queue to clear.
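Draining a queue.Queue without blocking can be sketched as below. Calling task_done() for each discarded item keeps Queue.join() from waiting on tasks that will never run; whether the original method does this is not documented. The parameter is named q here (rather than queue) to avoid shadowing the queue module.

```python
import queue


def clear_task_queue(q: queue.Queue) -> int:
    """Remove every pending item from q without blocking; return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return removed
        # Balance each get() so that q.join() does not wait on discarded tasks.
        q.task_done()
        removed += 1
```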

static download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)

Download worker method used by the parallel downloader.

Parameters:
  • worker – worker to use.

  • exit_event – exit event to monitor.

  • taskq – task queue.

  • conn – Airflow connection ID.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.

static download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)

Download a SCOPUS snapshot with parallel sessions. Tasks are distributed across the available API keys: each key independently fetches a task from the queue when it is free, so there is no guarantee of load balance.

Parameters:
  • workers – List of workers available.

  • taskq – task queue.

  • conn – Airflow connection ID.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.
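The pull-based scheduling described above can be sketched with one thread per worker sharing a queue.Queue and a threading.Event. The function run_parallel_downloads and the handle callback (standing in for downloading one period) are hypothetical; this sketch stops all workers after the first failure and returns the collected exceptions.

```python
import queue
import threading
from typing import Callable, List


def run_parallel_downloads(worker_ids: List[int], taskq: queue.Queue,
                           handle: Callable[[int, str], None]) -> List[BaseException]:
    """Run one thread per worker; each pulls tasks from taskq until it is empty.

    Returns the exceptions raised by handle (empty if every task succeeded).
    """
    exit_event = threading.Event()
    errors: List[BaseException] = []

    def work(worker_id: int) -> None:
        # A free worker grabs the next task, so load is not balanced explicitly.
        while not exit_event.is_set():
            try:
                task = taskq.get_nowait()
            except queue.Empty:
                return
            try:
                handle(worker_id, task)
            except Exception as exc:  # Stop all workers on the first failure.
                errors.append(exc)
                exit_event.set()
            finally:
                taskq.task_done()

    threads = [threading.Thread(target=work, args=(wid,)) for wid in worker_ids]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors
```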

static make_query(*, worker: ScopusUtilWorker, query: str) → Tuple[str, int]

Throttling wrapper for the API call. This is a global limit for this API when called from a program on the same machine. The limits are specified in the ScopusClientThrottleLimits class.

The throttle limits may not always be enforced; this probably depends on how the executor spins up tasks.

Parameters:
  • worker – ScopusUtilWorker object.

  • query – Query string.

Returns:

Query results.
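A sketch of a process-wide sliding-window limiter, assuming CALL_LIMIT = 2 and CALL_PERIOD = 1 mean at most two calls per second. The Throttle class is hypothetical; the module may use a decorator-based rate-limiting library instead. Because the state lives in one process, separate worker processes would each get their own limit, which is one plausible reason the limit is not always enforced.

```python
import threading
import time
from collections import deque
from typing import Callable, Deque, TypeVar

T = TypeVar("T")

CALL_LIMIT = 2   # Mirrors ScopusClientThrottleLimits.CALL_LIMIT.
CALL_PERIOD = 1  # Mirrors ScopusClientThrottleLimits.CALL_PERIOD (assumed seconds).


class Throttle:
    """Sliding-window limiter: at most `limit` calls in any `period`-second window."""

    def __init__(self, limit: int = CALL_LIMIT, period: float = CALL_PERIOD,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.limit = limit
        self.period = period
        self.clock = clock
        self.sleep = sleep
        self.calls: Deque[float] = deque()
        self.lock = threading.Lock()  # Shared by all threads in this process.

    def call(self, fn: Callable[[], T]) -> T:
        with self.lock:
            now = self.clock()
            # Forget calls that have left the window.
            while self.calls and now - self.calls[0] >= self.period:
                self.calls.popleft()
            if len(self.calls) >= self.limit:
                # Wait until the oldest call leaves the window, then drop it.
                self.sleep(self.period - (now - self.calls[0]))
                self.calls.popleft()
                now = self.clock()
            self.calls.append(now)
        return fn()
```

Usage: create one Throttle per process and route every request through it, e.g. limiter.call(lambda: client.retrieve(query)).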

class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusJsonParser

Helper methods to process the json from SCOPUS into desired structure.

static get_affiliations(data: Dict[str, Any]) → None | List[Dict[str, Any]]

Get the affiliation field.

Parameters:

data – json response from SCOPUS.

Returns:

List of affiliation details.

static get_authors(data: Dict[str, Any]) → None | List[Dict[str, Any]]

Get the author field. It is not known whether this parser will raise an error until it can be tested with an API key that has complete view access.

Parameters:

data – json response from SCOPUS.

Returns:

List of authors’ details.

static get_identifier_list(data: dict, id_type: str) → None | List[str]

Get the list of document identifiers, or None if it does not exist. ISBNs were observed to be returned either as a single string or as a list, so the same handling is applied to other identifiers just in case.

Parameters:
  • data – json response from SCOPUS.

  • id_type – type of identifier, e.g., ‘isbn’

Returns:

List of identifiers.
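A sketch of the string/list normalisation, assuming identifiers are stored under keys such as "prism:isbn" and that list elements are either plain strings or dicts holding the value under "$". Both the key naming and the element shape are assumptions about the Scopus JSON, not taken from the module.

```python
from typing import Any, Dict, List, Optional


def get_identifier_list(data: Dict[str, Any], id_type: str) -> Optional[List[str]]:
    """Normalise an identifier field that may be missing, a string, or a list."""
    # Assumed key naming, e.g. "prism:isbn" for id_type="isbn".
    value = data.get(f"prism:{id_type}")
    if value is None:
        return None
    if not isinstance(value, list):
        value = [value]
    # Assumed element shape: a plain string or a dict holding the text under "$".
    return [item["$"] if isinstance(item, dict) else item for item in value]
```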

static parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) → dict

Convert the JSON data into the database schema format.

Parameters:
  • data – json response from SCOPUS.

  • snapshot_date – DAG execution date.

  • institution_ids – List of institution ids used in the query.

Returns:

Dict of the data in the schema's field format.