academic_observatory_workflows.scopus_telescope.scopus_telescope
Module Contents
Classes
- ScopusRelease
- ScopusClientThrottleLimits: API throttling constants for ScopusClient.
- ScopusClient: Handles URL fetching of SCOPUS search.
- ScopusUtilWorker: Worker class.
- ScopusUtility: Handles the SCOPUS interactions.
- ScopusJsonParser: Helper methods to process the json from SCOPUS into desired structure.
Functions
- create_dag: Scopus telescope.
- transform_to_db_format: Convert the json response to the expected schema.
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)[source]
Bases: observatory.platform.workflows.workflow.SnapshotRelease
- academic_observatory_workflows.scopus_telescope.scopus_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, institution_ids: List[str], scopus_conn_ids: List[str], view: str = 'STANDARD', earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1), bq_dataset_id: str = 'scopus', bq_table_name: str = 'scopus', api_dataset_id: str = 'scopus', schema_folder: str = project_path('scopus_telescope', 'schema'), dataset_description: str = 'The Scopus citation database: https://www.scopus.com', table_description: str = 'The Scopus citation database: https://www.scopus.com', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), schedule: str = '@monthly', max_active_runs: int = 1, retries: int = 3)[source]
Scopus telescope.
- Parameters:
dag_id – the id of the DAG.
cloud_workspace – the cloud workspace settings.
institution_ids – list of institution IDs to use for the Scopus search query.
scopus_conn_ids – list of Scopus Airflow Connection IDs.
view – the view type, standard or complete. See https://dev.elsevier.com/sc_search_views.html
earliest_date – earliest date to query for results.
bq_dataset_id – the BigQuery dataset id.
bq_table_name – the BigQuery table name.
api_dataset_id – the Dataset ID to use when storing releases.
schema_folder – the SQL schema path.
dataset_description – description for the BigQuery dataset.
table_description – description for the BigQuery table.
observatory_api_conn_id – the Observatory API connection key.
start_date – the start date of the DAG.
schedule – the schedule interval of the DAG.
max_active_runs – the maximum number of DAG runs that can be run at once.
retries – the number of times to retry a task.
- academic_observatory_workflows.scopus_telescope.scopus_telescope.transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) -> List[dict] [source]
Convert the json response to the expected schema.
- Parameters:
records – List of the records as json.
snapshot_date – release date.
institution_ids – list of institutions.
- Returns:
List of transformed entries.
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusClientThrottleLimits[source]
API throttling constants for ScopusClient.
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusClient(*, api_key: str, view: str = 'standard')[source]
Handles URL fetching of SCOPUS search.
- _url(query: str) -> str [source]
Get the query url.
- Parameters:
query – Query string.
- Returns:
Query url.
- static get_reset_date_from_error(msg: str) -> int [source]
Get the reset date timestamp in seconds from the exception message. According to https://dev.elsevier.com/api_key_settings.html it is meant to be seconds, but milliseconds were observed the last time it was checked in Oct 2020.
- Parameters:
msg – exception message.
- Returns:
Reset date timestamp in seconds.
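The seconds-versus-milliseconds ambiguity described above can be handled by normalising on magnitude. The following is a minimal sketch only, not the module's implementation: the function name `reset_timestamp_seconds` is hypothetical, and it assumes the exception message contains the epoch timestamp as a plain run of 10 to 13 digits.

```python
import re


def reset_timestamp_seconds(msg: str) -> int:
    """Extract an epoch timestamp from msg and normalise it to seconds.

    Assumption: the timestamp appears in the message as a run of 10-13 digits.
    """
    match = re.search(r"\d{10,13}", msg)
    if match is None:
        raise ValueError(f"No timestamp found in message: {msg!r}")
    value = int(match.group())
    # Values of 10**12 or more are treated as milliseconds and converted;
    # smaller values are taken to be seconds already.
    if value >= 10**12:
        value //= 1000
    return value
```

Both a millisecond value and a second value then resolve to the same number of seconds, so callers do not need to know which form the API returned.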
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)[source]
Worker class
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusUtility[source]
Handles the SCOPUS interactions.
- static build_query(*, institution_ids: List[str], period: Type[pendulum.Period]) -> str [source]
Build a SCOPUS API query.
- Parameters:
institution_ids – List of institution IDs to query, e.g., ["60031226"] (Curtin University).
period – A schedule period.
- Returns:
Constructed web query.
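As an illustration of what such a query might look like, here is a hedged sketch that combines institution IDs with a month range. It is not the module's actual logic: `build_query_sketch` is a hypothetical name, plain `datetime.date` values stand in for the `pendulum.Period` argument, and the use of the `AF-ID` and `PUBDATETXT` Scopus field codes is an assumption about how the query is composed.

```python
from datetime import date
from typing import List

MONTHS = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)


def build_query_sketch(institution_ids: List[str], start: date, end: date) -> str:
    """Build a search string matching any institution in any month of the range."""
    # One AF-ID clause per institution, OR-ed together.
    affiliations = " OR ".join(f"AF-ID({inst_id})" for inst_id in institution_ids)

    # Enumerate every calendar month from start to end, inclusive.
    months = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        months.append(f'"{MONTHS[month - 1]} {year}"')
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    return f"({affiliations}) AND PUBDATETXT({' OR '.join(months)})"
```

Under these assumptions, `build_query_sketch(["60031226"], date(2021, 1, 1), date(2021, 1, 31))` yields `(AF-ID(60031226)) AND PUBDATETXT("January 2021")`.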
- static download_period(*, worker: ScopusUtilWorker, conn: str, period: Type[pendulum.Period], institution_ids: List[str], download_dir: str)[source]
Download records for a stated date range. The elsapy package currently caps results at 5000 per query, so an institution with more than 5000 entries in a single month (an unlikely event) will present a problem.
- Parameters:
worker – Worker that will do the downloading.
conn – Airflow connection ID, without the scopus_ prefix.
period – Period to download.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)[source]
Sleep until reset_date.
- Parameters:
reset_date – Datetime to sleep until.
conn – Connection id from Airflow.
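Sleeping until a reset date reduces to computing a non-negative delay. The sketch below uses the standard library rather than pendulum, and the helper names `seconds_until` and `sleep_until` are hypothetical; it assumes timezone-aware datetimes.

```python
import time
from datetime import datetime, timedelta, timezone


def seconds_until(reset_date: datetime, now: datetime) -> float:
    """Return the delay until reset_date, or 0.0 if it has already passed."""
    return max(0.0, (reset_date - now).total_seconds())


def sleep_until(reset_date: datetime) -> None:
    """Block the current thread until reset_date (no-op if it is in the past)."""
    delay = seconds_until(reset_date, datetime.now(timezone.utc))
    if delay > 0:
        time.sleep(delay)


# Usage of the pure helper with fixed timestamps, so nothing actually sleeps.
now = datetime(2020, 10, 1, tzinfo=timezone.utc)
print(seconds_until(now + timedelta(seconds=90), now))  # 90.0
print(seconds_until(now - timedelta(seconds=90), now))  # 0.0
```

Separating the delay calculation from the sleep call keeps the time arithmetic testable without waiting.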
- static update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)[source]
Update the reset date to the closest date that will make a worker available.
- Parameters:
conn – Airflow connection ID.
error_msg – Error message from quota exceeded exception.
worker – Worker that will do the downloading.
- static clear_task_queue(queue: queue.Queue)[source]
Clear a queue.
- Parameters:
queue – Queue to clear.
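One common way to clear a `queue.Queue` is to drain it with non-blocking gets. This is a sketch of that general technique under the hypothetical name `drain`, not necessarily how `clear_task_queue` is written.

```python
import queue


def drain(q: queue.Queue) -> int:
    """Remove every pending item from q and return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return removed
        # Mark the item as done so a pending q.join() is not left waiting.
        q.task_done()
        removed += 1
```

Calling `task_done()` for each discarded item matters when another thread is blocked on `q.join()`; without it, the join would never return.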
- static download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download worker method used by parallel downloader.
- Parameters:
worker – worker to use.
exit_event – exit event to monitor.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download SCOPUS snapshot with parallel sessions. Tasks are distributed in parallel across the available API keys. Each key independently fetches a task from the queue when it is free, so load balancing is not guaranteed.
- Parameters:
workers – List of workers available.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
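The pull-based distribution described above can be sketched with a shared queue, one thread per key, and an exit event. All names here (`worker_loop`, `run_parallel`, `handle`) are hypothetical, and the real methods take additional arguments such as the connection ID and download directory.

```python
import queue
import threading
from typing import Any, Callable, List


def worker_loop(name: str, taskq: queue.Queue, exit_event: threading.Event,
                handle: Callable[[str, Any], None]) -> None:
    """Pull tasks until the exit event is set; each worker takes work when free."""
    while not exit_event.is_set():
        try:
            task = taskq.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued right now; re-check the exit event
        try:
            handle(name, task)
        finally:
            taskq.task_done()


def run_parallel(worker_names: List[str], tasks: List[Any],
                 handle: Callable[[str, Any], None]) -> None:
    """Run all tasks across one thread per worker, then shut the workers down."""
    taskq: queue.Queue = queue.Queue()
    for task in tasks:
        taskq.put(task)
    exit_event = threading.Event()
    threads = [
        threading.Thread(target=worker_loop, args=(name, taskq, exit_event, handle), daemon=True)
        for name in worker_names
    ]
    for thread in threads:
        thread.start()
    taskq.join()      # wait until every task has been marked done
    exit_event.set()  # tell idle workers to stop polling
    for thread in threads:
        thread.join()
```

Because each worker fetches the next task as soon as it finishes the previous one, a fast worker may process far more tasks than a slow one, which is the absence of load balancing the description mentions.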
- static make_query(*, worker: ScopusUtilWorker, query: str) -> Tuple[str, int] [source]
Throttling wrapper for the API call. This is a global limit for this API when called from a program on the same machine. The limits are specified in the ScopusClientThrottleLimits class.
Throttle limits may or may not be enforced, which probably depends on how executors spin up tasks.
- Parameters:
worker – ScopusUtilWorker object.
query – Query string.
- Returns:
Query results.
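A throttling wrapper of this kind can be sketched as a minimum-interval gate around any callable. This is an illustration only: `MinIntervalThrottle` is a hypothetical class, the interval value is made up, and the module's actual throttling mechanism and limits are not shown in this reference.

```python
import threading
import time
from typing import Any, Callable, Optional


class MinIntervalThrottle:
    """Ensure at least min_interval seconds pass between wrapped calls in this process."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None
        self._lock = threading.Lock()

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # The lock makes the limit shared by all threads using this instance.
        with self._lock:
            if self._last_call is not None:
                wait = self._min_interval - (self._clock() - self._last_call)
                if wait > 0:
                    self._sleep(wait)
            self._last_call = self._clock()
        return func(*args, **kwargs)
```

A single instance only throttles callers inside one process, which is consistent with the caveat above that limits may not be enforced when tasks are spun up by separate executors.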
- class academic_observatory_workflows.scopus_telescope.scopus_telescope.ScopusJsonParser[source]
Helper methods to process the json from SCOPUS into desired structure.
- static get_affiliations(data: Dict[str, Any]) -> None | List[Dict[str, Any]] [source]
Get the affiliation field.
- Parameters:
data – json response from SCOPUS.
- Returns:
List of affiliation details.
- static get_authors(data: Dict[str, Any]) -> None | List[Dict[str, Any]] [source]
Get the author field. Whether this parser raises an error cannot be confirmed without an API key that has complete view access.
- Parameters:
data – json response from SCOPUS.
- Returns:
List of authors' details.
- static get_identifier_list(data: dict, id_type: str) -> None | List[str] [source]
Get the list of document identifiers, or None if it does not exist. The string/list behaviour was observed for ISBNs, so it is handled for other identifiers as well, just in case.
- Parameters:
data – json response from SCOPUS.
id_type – type of identifier, e.g., ‘isbn’
- Returns:
List of identifiers.
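The string-or-list handling mentioned above can be sketched as a small normaliser. The response shape here is an assumption, not taken from this reference: the sketch supposes the identifier lives under a `prism:<id_type>` key and that list entries are dicts holding the value under `"$"`. The function name `identifier_list` is hypothetical.

```python
from typing import Any, Dict, List, Optional


def identifier_list(data: Dict[str, Any], id_type: str) -> Optional[List[str]]:
    """Return identifiers of the given type as a list, or None if absent."""
    value = data.get(f"prism:{id_type}")  # assumed key naming
    if value is None:
        return None
    if isinstance(value, str):
        # A single identifier returned as a bare string.
        return [value]
    if isinstance(value, list):
        # Multiple identifiers; assumed to be dicts with the value under "$".
        return [entry["$"] if isinstance(entry, dict) else str(entry) for entry in value]
    return None
```

Normalising both forms to a list means downstream code can treat every identifier field uniformly.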
- static parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) -> dict [source]
Turn json data into db schema format.
- Parameters:
data – json response from SCOPUS.
harvest_datetime – ISO-format string of the time the fetch took place.
snapshot_date – DAG execution date.
institution_ids – List of institution ids used in the query.
- Returns:
Dict of data in the correct field format.