academic_observatory_workflows.workflows.scopus_telescope

Module Contents

Classes

ScopusRelease

ScopusTelescope

ScopusClientThrottleLimits

API throttling constants for ScopusClient.

ScopusClient

Handles URL fetching of SCOPUS search.

ScopusUtilWorker

Worker class

ScopusUtility

Handles the SCOPUS interactions.

ScopusJsonParser

Helper methods to process the json from SCOPUS into desired structure.

Functions

transform_to_db_format(→ List[dict])

Convert the json response to the expected schema.

class academic_observatory_workflows.workflows.scopus_telescope.ScopusRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)

Bases: observatory.platform.workflows.workflow.SnapshotRelease

class academic_observatory_workflows.workflows.scopus_telescope.ScopusTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, institution_ids: List[str], scopus_conn_ids: List[str], view: str = 'STANDARD', earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1), bq_dataset_id: str = 'scopus', bq_table_name: str = 'scopus', api_dataset_id: str = 'scopus', schema_folder: str = os.path.join(default_schema_folder(), 'scopus'), dataset_description: str = 'The Scopus citation database: https://www.scopus.com', table_description: str = 'The Scopus citation database: https://www.scopus.com', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), schedule: str = '@monthly')

Bases: observatory.platform.workflows.workflow.Workflow

property api_keys: List[str]

Get the API keys to use for downloading SCOPUS data.

Returns:

List of API keys to use.

SCHEMA_VERSION_ALT = 'http://www.elsevier.com/xml/ani/ani515.xsd'

make_release(**kwargs) → ScopusRelease

Make a release instance. The release is passed as an argument to the function (TelescopeFunction) that is called in 'task_callable'.

Parameters:

kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

A ScopusRelease instance.

download(release: ScopusRelease, **kwargs)

Download snapshot from SCOPUS for the given institution.

upload_downloaded(release: ScopusRelease, **kwargs)

Upload data to Cloud Storage.

transform(release: ScopusRelease, **kwargs)

Transform the data into database format.

upload_transformed(release: ScopusRelease, **kwargs) → None

Upload the transformed data to Cloud Storage.

bq_load(release: ScopusRelease, **kwargs)

Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.

add_new_dataset_releases(release: ScopusRelease, **kwargs) → None

Adds release information to API.

cleanup(release: ScopusRelease, **kwargs) → None

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.workflows.scopus_telescope.transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) → List[dict]

Convert the json response to the expected schema.

Parameters:
  • records – List of the records as json.

  • snapshot_date – release date.

  • institution_ids – list of institutions.

Returns:

List of transformed entries.

class academic_observatory_workflows.workflows.scopus_telescope.ScopusClientThrottleLimits

API throttling constants for ScopusClient.

CALL_LIMIT = 2

CALL_PERIOD = 1
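
As a rough illustration of what these constants express (at most CALL_LIMIT calls per CALL_PERIOD seconds), here is a minimal sliding-window throttle. It is a standalone sketch, not the module's implementation: the class name SlidingWindowThrottle and the injectable clock and sleep arguments are hypothetical and exist only to make the example testable.

```python
import time
from collections import deque

CALL_LIMIT = 2   # calls allowed per period (value documented above)
CALL_PERIOD = 1  # period length in seconds (value documented above)


class SlidingWindowThrottle:
    """Hypothetical helper: block so that at most `limit` calls happen per `period`."""

    def __init__(self, limit=CALL_LIMIT, period=CALL_PERIOD,
                 clock=time.monotonic, sleep=time.sleep):
        self.limit = limit
        self.period = period
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()  # timestamps of recent calls

    def _expire(self, now):
        # Forget calls that have left the current window.
        while self.calls and now - self.calls[0] >= self.period:
            self.calls.popleft()

    def wait(self):
        now = self.clock()
        self._expire(now)
        if len(self.calls) >= self.limit:
            # Sleep until the oldest call in the window expires.
            self.sleep(self.period - (now - self.calls[0]))
            now = self.clock()
            self._expire(now)
        self.calls.append(now)
```

Calling wait() before each request would hold back a third call made within the same second until the window has moved on.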
class academic_observatory_workflows.workflows.scopus_telescope.ScopusClient(*, api_key: str, view: str = 'standard')

Handles URL fetching of SCOPUS search.

RESULTS_PER_PAGE = 25

MAX_RESULTS = 5000

QUOTA_EXCEED_ERROR_PREFIX = 'QuotaExceeded. Resets at: '

_url(query: str) → str

Get the query url.

Parameters:

query – Query string.

Returns:

Query url.

static get_reset_date_from_error(msg: str) → int

Get the reset date timestamp in seconds from the exception message. According to https://dev.elsevier.com/api_key_settings.html it is meant to be seconds, but milliseconds were observed the last time it was checked in Oct 2020.

Parameters:

msg – exception message.

Returns:

Reset date timestamp in seconds.
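
The parsing described above can be sketched as follows. This is an illustration only: it assumes the message consists of QUOTA_EXCEED_ERROR_PREFIX followed by an integer timestamp in milliseconds (the unit the note above says was observed), and the function name reset_timestamp_seconds is hypothetical.

```python
QUOTA_EXCEED_ERROR_PREFIX = "QuotaExceeded. Resets at: "  # value documented above


def reset_timestamp_seconds(msg: str) -> int:
    """Hypothetical sketch: extract the reset timestamp and convert it to seconds."""
    if not msg.startswith(QUOTA_EXCEED_ERROR_PREFIX):
        raise ValueError(f"Unexpected message: {msg!r}")
    # Assumption: the remainder of the message is a millisecond timestamp.
    millis = int(msg[len(QUOTA_EXCEED_ERROR_PREFIX):].strip())
    return millis // 1000
```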

static get_next_page_url(links: List[dict]) → None | str

Get the URL for the next result page.

Parameters:

links – The list of links returned from the last query.

Returns:

None if the next page is not found, otherwise the URL of the next page.
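
A minimal sketch of this lookup, assuming each link entry is a dict with "@ref" and "@href" keys (an assumption about the shape of the SCOPUS search response, not something stated on this page). The function name next_page_url is hypothetical.

```python
from typing import List, Optional


def next_page_url(links: List[dict]) -> Optional[str]:
    """Hypothetical sketch: return the href of the link marked as 'next', if any."""
    for link in links:
        # Assumption: "@ref" names the relation and "@href" holds the URL.
        if link.get("@ref") == "next":
            return link.get("@href")
    return None
```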

retrieve(query: str) → Tuple[List[Dict[str, Any]], int, int]

Execute the query.

Parameters:

query – Query string.

Returns:

(results of query, quota remaining, quota reset date timestamp in seconds)
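
To make the relationship between RESULTS_PER_PAGE and MAX_RESULTS concrete, the sketch below follows next-page links until none remain or the cap is reached. It is not the client's actual retrieve logic: fetch_page is a hypothetical callable returning (entries, next_url), and truncating at the cap is an assumption made for illustration.

```python
from typing import Callable, List, Optional, Tuple

RESULTS_PER_PAGE = 25  # value documented above
MAX_RESULTS = 5000     # value documented above

# Number of page requests needed to reach the cap.
MAX_PAGES = MAX_RESULTS // RESULTS_PER_PAGE

# Hypothetical page fetcher: takes a URL, returns (entries, next page URL or None).
FetchPage = Callable[[str], Tuple[List[dict], Optional[str]]]


def collect_results(first_url: str, fetch_page: FetchPage) -> List[dict]:
    """Hypothetical sketch: gather entries page by page, up to MAX_RESULTS."""
    results: List[dict] = []
    url: Optional[str] = first_url
    while url is not None and len(results) < MAX_RESULTS:
        entries, url = fetch_page(url)
        results.extend(entries)
    return results[:MAX_RESULTS]
```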

class academic_observatory_workflows.workflows.scopus_telescope.ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)

Worker class

DEFAULT_KEY_QUOTA = 20000

QUEUE_WAIT_TIME = 20

class academic_observatory_workflows.workflows.scopus_telescope.ScopusUtility

Handles the SCOPUS interactions.

static build_query(*, institution_ids: List[str], period: Type[pendulum.Period]) → str

Build a SCOPUS API query.

Parameters:
  • institution_ids – List of institution IDs to query, e.g., ["60031226"] (Curtin University)

  • period – A schedule period.

Returns:

Constructed web query.
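
For orientation, a query of this kind could look like the sketch below. The exact query string that build_query produces is not shown on this page, so everything here is an assumption: the function name sketch_build_query is hypothetical, plain start and end dates stand in for the pendulum period, and AF-ID and PUBDATETXT are Scopus advanced-search field codes chosen for illustration.

```python
from datetime import date
from typing import List

MONTHS = ("January", "February", "March", "April", "May", "June", "July",
          "August", "September", "October", "November", "December")


def sketch_build_query(institution_ids: List[str], start: date, end: date) -> str:
    """Hypothetical sketch: OR the institutions together, then restrict by month."""
    orgs = " OR ".join(f"AF-ID({inst})" for inst in institution_ids)

    # List every calendar month touched by the [start, end] range.
    months = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        months.append('"{} {}"'.format(MONTHS[month - 1], year))
        month += 1
        if month > 12:
            month, year = 1, year + 1

    return "({}) AND PUBDATETXT({})".format(orgs, " OR ".join(months))
```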

static download_period(*, worker: ScopusUtilWorker, conn: str, period: Type[pendulum.Period], institution_ids: List[str], download_dir: str)

Download records for a stated date range. The elsapy package currently has a cap of 5000 results per query. So in the unlikely event any institution has more than 5000 entries per month, this will present a problem.

Parameters:
  • worker – Worker that will do the downloading.

  • conn – Connection ID from Airflow (minus scopus_)

  • period – Period to download.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.

static sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)

Sleep until reset_date.

Parameters:
  • reset_date – Date(time) to sleep to.

  • conn – Connection id from Airflow.
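
The idea of sleeping until a reset date can be sketched with the standard library alone. This is an illustration under assumptions, not the module's code: it uses timezone-aware datetime objects in place of pendulum.DateTime, and the helper names seconds_until and sleep_until are hypothetical.

```python
import time
from datetime import datetime, timezone
from typing import Callable, Optional


def seconds_until(reset_date: datetime, now: Optional[datetime] = None) -> float:
    """Seconds remaining until reset_date, never negative."""
    now = now or datetime.now(timezone.utc)
    return max(0.0, (reset_date - now).total_seconds())


def sleep_until(reset_date: datetime, sleep: Callable[[float], None] = time.sleep) -> float:
    """Hypothetical sketch: sleep only if reset_date is still in the future."""
    delay = seconds_until(reset_date)
    if delay > 0:
        sleep(delay)
    return delay
```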

static update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)

Update the reset date to closest date that will make a worker available.

Parameters:
  • conn – Airflow connection ID.

  • error_msg – Error message from quota exceeded exception.

  • worker – Worker that will do the downloading.

static clear_task_queue(queue: queue.Queue)

Clear a queue.

Parameters:

queue – Queue to clear.
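
One common way to clear a queue.Queue is to drain it with non-blocking gets, as in the sketch below. Whether clear_task_queue works this way is an assumption; the function name drain is hypothetical.

```python
import queue


def drain(q: queue.Queue) -> int:
    """Hypothetical sketch: remove every pending item and return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return removed
        # Mark the item as handled so that q.join() does not block on it.
        q.task_done()
        removed += 1
```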

static download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)

Download worker method used by parallel downloader.

Parameters:
  • worker – worker to use.

  • exit_event – exit event to monitor.

  • taskq – tasks queue.

  • conn – Airflow connection ID.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.

static download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)

Download SCOPUS snapshot with parallel sessions. Tasks will be distributed in parallel to the available keys. Each key will independently fetch a task from the queue when it’s free so there’s no guarantee of load balance.

Parameters:
  • workers – List of workers available.

  • taskq – tasks queue.

  • conn – Airflow connection ID.

  • institution_ids – List of institutions to query concurrently.

  • download_dir – Path to save downloaded files to.
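
The pattern described above, where each worker pulls its next task from a shared queue whenever it is free, can be sketched as follows. This is a simplified illustration: run_workers and its arguments are hypothetical, tasks are plain values, and the exit event and quota handling of the real workers are left out.

```python
import queue
import threading
from typing import Dict, List


def run_workers(worker_names: List[str], tasks: List[str]) -> Dict[str, List[str]]:
    """Hypothetical sketch: distribute tasks to threads via a shared queue."""
    taskq: queue.Queue = queue.Queue()
    for task in tasks:
        taskq.put(task)

    # Each worker records the tasks it handled in its own list.
    done: Dict[str, List[str]] = {name: [] for name in worker_names}

    def work(name: str) -> None:
        while True:
            try:
                task = taskq.get_nowait()
            except queue.Empty:
                return  # nothing left to do
            done[name].append(task)
            taskq.task_done()

    threads = [threading.Thread(target=work, args=(name,)) for name in worker_names]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return done
```

Because workers take tasks as they become free, the split between them varies from run to run, which is the lack of load-balancing guarantee noted above.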

static make_query(*, worker: ScopusUtilWorker, query: str) → Tuple[str, int]

Throttling wrapper for the API call. This is a global limit for this API when called from a program on the same machine. Limits are specified in the ScopusClientThrottleLimits class.

Throttle limits may or may not be enforced. Probably depends on how executors spin up tasks.

Parameters:
  • worker – ScopusUtilWorker object.

  • query – Query string.

Returns:

Query results.

class academic_observatory_workflows.workflows.scopus_telescope.ScopusJsonParser

Helper methods to process the json from SCOPUS into desired structure.

static get_affiliations(data: Dict[str, Any]) → None | List[Dict[str, Any]]

Get the affiliation field.

Parameters:

data – json response from SCOPUS.

Returns:

List of affiliation details.

static get_authors(data: Dict[str, Any]) → None | List[Dict[str, Any]]

Get the author field. We won't know whether this parser throws an error until we have access to an API key with complete view access.

Parameters:

data – json response from SCOPUS.

Returns:

List of authors' details.

static get_identifier_list(data: dict, id_type: str) → None | List[str]

Get the list of document identifiers, or null if it does not exist. The field can arrive as either a single string or a list; this behaviour was observed for ISBNs, so it is handled for other identifiers as well, just in case.

Parameters:
  • data – json response from SCOPUS.

  • id_type – type of identifier, e.g., ‘isbn’

Returns:

List of identifiers.
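
The string-or-list handling mentioned above can be sketched as a small normaliser. The shape of the list items is an assumption (dicts carrying the value under a "$" key), as is looking the field up directly by key; the function name identifier_list is hypothetical.

```python
from typing import Any, Dict, List, Optional


def identifier_list(data: Dict[str, Any], key: str) -> Optional[List[str]]:
    """Hypothetical sketch: always return a list of strings, or None if absent."""
    value = data.get(key)
    if value is None:
        return None
    if isinstance(value, str):
        # A single identifier delivered as a bare string.
        return [value]
    # Assumption: list items are either plain values or dicts with a "$" key.
    return [item["$"] if isinstance(item, dict) else str(item) for item in value]
```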

static parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) → dict

Turn json data into db schema format.

Parameters:
  • data – json response from SCOPUS.

  • harvest_datetime – isoformat string of time the fetch took place.

  • snapshot_date – DAG execution date.

  • institution_ids – List of institution ids used in the query.

Returns:

dict of data in right field format.