academic_observatory_workflows.scopus_telescope.tasks
Classes
API throttling constants for ScopusClient. |
|
Handles URL fetching of SCOPUS search. |
|
Worker class |
|
Handles the SCOPUS interactions. |
|
Helper methods to process the json from SCOPUS into desired structure. |
Functions
|
Fetch the release |
|
Download snapshot from SCOPUS for the given institution. |
|
Transform the data into database format. |
|
Task to load each transformed release to BigQuery. |
|
Adds release information to API. |
|
Delete all files, folders and XComs associated with this release. |
|
Convert the json response to the expected schema. |
Module Contents
- academic_observatory_workflows.scopus_telescope.tasks.fetch_release(dag_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, run_id: str, data_interval_start: str, data_interval_end: str) dict[source]
Fetch the release
- academic_observatory_workflows.scopus_telescope.tasks.download(release: dict, earliest_date: pendulum.DateTime, scopus_conn_ids: List[str], view: Literal['standard', 'complete'], institution_ids: List[str])[source]
Download snapshot from SCOPUS for the given institution.
- academic_observatory_workflows.scopus_telescope.tasks.transform(release: dict, institution_ids: List[str])[source]
Transform the data into database format.
- academic_observatory_workflows.scopus_telescope.tasks.bq_load(release: dict, bq_dataset_id: str, dataset_description: str, bq_table_name: str, table_description: str, schema_folder: str)[source]
Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.
- academic_observatory_workflows.scopus_telescope.tasks.add_dataset_release(release: dict, api_bq_dataset_id: str) None[source]
Adds release information to API.
- academic_observatory_workflows.scopus_telescope.tasks.cleanup_workflow(release: dict) None[source]
Delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.scopus_telescope.tasks.transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) List[dict][source]
Convert the json response to the expected schema. :param records: List of the records as json. :param snapshot_date: release date. :param institution_ids: list of institutions. :return: List of transformed entries.
- class academic_observatory_workflows.scopus_telescope.tasks.ScopusClientThrottleLimits[source]
API throttling constants for ScopusClient.
- class academic_observatory_workflows.scopus_telescope.tasks.ScopusClient(*, api_key: str, view: str = 'standard')[source]
Handles URL fetching of SCOPUS search.
- _url(query: str) str[source]
Get the query url.
- Parameters:
query – Query string.
- Returns:
Query url.
- static get_reset_date_from_error(msg: str) int[source]
Get the reset date timestamp in seconds from the exception message. According to https://dev.elsevier.com/api_key_settings.html it is meant to be seconds, but milliseconds were observed the last time it was checked in Oct 2020.
- Parameters:
msg – exception message.
- Returns:
Reset date timestamp in seconds.
- class academic_observatory_workflows.scopus_telescope.tasks.ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)[source]
Worker class
- class academic_observatory_workflows.scopus_telescope.tasks.ScopusUtility[source]
Handles the SCOPUS interactions.
- static build_query(*, institution_ids: List[str], period: pendulum.Interval) str[source]
Build a SCOPUS API query.
- Parameters:
institution_ids – List of Institutional ID to query, e.g, [“60031226”] (Curtin University)
period – A schedule period.
- Returns:
Constructed web query.
- static download_period(*, worker: ScopusUtilWorker, conn: str, period: pendulum.Interval, institution_ids: List[str], download_dir: str)[source]
Download records for a stated date range. The elsapy package currently has a cap of 5000 results per query. So in the unlikely event any institution has more than 5000 entries per month, this will present a problem.
- Parameters:
worker – Worker that will do the downloading.
conn – Connection ID from Airflow (minus scopus_)
period – Period to download.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)[source]
Sleep until reset_date.
- Parameters:
reset_date – Date(time) to sleep to.
conn – Connection id from Airflow.
- static update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)[source]
Update the reset date to closest date that will make a worker available.
- Parameters:
conn – Airflow connection ID.
error_msg – Error message from quota exceeded exception.
worker – Worker that will do the downloading.
- static clear_task_queue(queue: queue.Queue)[source]
Clear a queue.
- Parameters:
queue – Queue to clear.
- static download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download worker method used by parallel downloader.
- Parameters:
worker – worker to use.
exit_event – exit event to monitor.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download SCOPUS snapshot with parallel sessions. Tasks will be distributed in parallel to the available keys. Each key will independently fetch a task from the queue when it’s free so there’s no guarantee of load balance.
- Parameters:
workers – List of workers available.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static make_query(*, worker: ScopusUtilWorker, query: str) Tuple[str, int][source]
Throttling wrapper for the API call. This is a global limit for this API when called from a program on the same machine. Limits specified in ScopusUtilConst class.
Throttle limits may or may not be enforced. Probably depends on how executors spin up tasks.
- Parameters:
worker – ScopusUtilWorker object.
query – Query object.
- Returns:
Query results.
- class academic_observatory_workflows.scopus_telescope.tasks.ScopusJsonParser[source]
Helper methods to process the json from SCOPUS into desired structure.
- static get_affiliations(data: Dict[str, Any]) None | List[Dict[str, Any]][source]
Get the affiliation field.
- Parameters:
data – json response from SCOPUS.
:return list of affiliation details.
- static get_authors(data: Dict[str, Any]) None | List[Dict[str, Any]][source]
- Get the author field. Won’t know if this parser is going to throw error unless we get access to api key
with complete view access.
- Parameters:
data – json response from SCOPUS.
:return list of authors’ details.
- static get_identifier_list(data: dict, id_type: str) None | List[str][source]
Get the list of document identifiers or null of it does not exist. This string/list behaviour was observed for ISBNs so using it for other identifiers just in case.
- Parameters:
data – json response from SCOPUS.
id_type – type of identifier, e.g., ‘isbn’
- Returns:
List of identifiers.
- static parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) dict[source]
Turn json data into db schema format.
- Parameters:
data – json response from SCOPUS.
harvest_datetime – isoformat string of time the fetch took place.
snapshot_date – DAG execution date.
institution_ids – List of institution ids used in the query.
- Returns:
dict of data in right field format.