academic_observatory_workflows.workflows.scopus_telescope
Module Contents
Classes
ScopusClientThrottleLimits | API throttling constants for ScopusClient.
ScopusClient | Handles URL fetching of SCOPUS search.
ScopusUtilWorker | Worker class.
ScopusUtility | Handles the SCOPUS interactions.
ScopusJsonParser | Helper methods to process the json from SCOPUS into desired structure.
Functions
transform_to_db_format(records, snapshot_date, institution_ids) | Convert the json response to the expected schema.
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)[source]
Bases:
observatory.platform.workflows.workflow.SnapshotRelease
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, institution_ids: List[str], scopus_conn_ids: List[str], view: str = 'STANDARD', earliest_date: pendulum.DateTime = pendulum.datetime(1800, 1, 1), bq_dataset_id: str = 'scopus', bq_table_name: str = 'scopus', api_dataset_id: str = 'scopus', schema_folder: str = os.path.join(default_schema_folder(), 'scopus'), dataset_description: str = 'The Scopus citation database: https://www.scopus.com', table_description: str = 'The Scopus citation database: https://www.scopus.com', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), schedule: str = '@monthly')[source]
Bases:
observatory.platform.workflows.workflow.Workflow
- property api_keys: List[str][source]
Get the API keys to use for downloading SCOPUS data.
- Returns:
List of API keys to use.
- make_release(**kwargs) ScopusRelease[source]
Make release instances. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.
- Parameters:
kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.
- Returns:
A ScopusRelease instance.
- download(release: ScopusRelease, **kwargs)[source]
Download snapshot from SCOPUS for the given institution.
- upload_downloaded(release: ScopusRelease, **kwargs)[source]
Upload data to Cloud Storage.
- transform(release: ScopusRelease, **kwargs)[source]
Transform the data into database format.
- upload_transformed(release: ScopusRelease, **kwargs) None[source]
Upload the transformed data to Cloud Storage.
- bq_load(release: ScopusRelease, **kwargs)[source]
Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.
- add_new_dataset_releases(release: ScopusRelease, **kwargs) None[source]
Adds release information to API.
- cleanup(release: ScopusRelease, **kwargs) None[source]
Delete all files, folders and XComs associated with this release.
- academic_observatory_workflows.workflows.scopus_telescope.transform_to_db_format(records: List[dict], snapshot_date: pendulum.Date, institution_ids: List[str]) List[dict][source]
Convert the json response to the expected schema.
- Parameters:
records – List of the records as json.
snapshot_date – Release date.
institution_ids – List of institutions.
- Returns:
List of transformed entries.
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusClientThrottleLimits[source]
API throttling constants for ScopusClient.
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusClient(*, api_key: str, view: str = 'standard')[source]
Handles URL fetching of SCOPUS search.
- _url(query: str) str[source]
Get the query url.
- Parameters:
query – Query string.
- Returns:
Query url.
- static get_reset_date_from_error(msg: str) int[source]
Get the reset date timestamp in seconds from the exception message. According to https://dev.elsevier.com/api_key_settings.html it is meant to be seconds, but milliseconds were observed the last time it was checked in Oct 2020.
- Parameters:
msg – exception message.
- Returns:
Reset date timestamp in seconds.
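The seconds-versus-milliseconds ambiguity noted above can be handled defensively. The following is a minimal sketch, not the module's implementation: the function name `reset_timestamp_seconds`, the message format, and the 10**12 threshold are all assumptions made for illustration.

```python
import re


def reset_timestamp_seconds(msg: str) -> int:
    """Return the first run of 10 or more digits in msg as a Unix timestamp in seconds.

    Hypothetical helper: the real exception message format is not shown in
    this reference, so the pattern below is an assumption.
    """
    match = re.search(r"\d{10,}", msg)
    if match is None:
        raise ValueError("no timestamp found in message")
    value = int(match.group())
    # Assumption: values of 10**12 or more are milliseconds, because a
    # seconds-based timestamp will not reach that size for tens of
    # thousands of years.
    if value >= 10**12:
        value //= 1000
    return value


# Both hypothetical messages normalise to the same seconds value.
seconds_form = reset_timestamp_seconds("quota exceeded, reset at 1602201600")
millis_form = reset_timestamp_seconds("quota exceeded, reset at 1602201600000")
```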
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusUtilWorker(*, client_id: int, client: ScopusClient, quota_reset_date: pendulum.DateTime, quota_remaining: int)[source]
Worker class
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusUtility[source]
Handles the SCOPUS interactions.
- static build_query(*, institution_ids: List[str], period: Type[pendulum.Period]) str[source]
Build a SCOPUS API query.
- Parameters:
institution_ids – List of institution IDs to query, e.g., ["60031226"] (Curtin University).
period – A schedule period.
- Returns:
Constructed web query.
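As an illustration of what such a query might look like, here is a minimal sketch that uses only the standard library instead of pendulum. It assumes the query combines the Scopus search field codes `AF-ID` and `PUBDATETXT`; whether build_query produces exactly this string is an assumption, and `build_query_sketch` and `months_between` are hypothetical names.

```python
from datetime import date
from typing import List

MONTH_NAMES = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]


def months_between(start: date, end: date) -> List[date]:
    """Return the first day of every month from start to end, inclusive."""
    months = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        months.append(date(year, month, 1))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return months


def build_query_sketch(institution_ids: List[str], start: date, end: date) -> str:
    """Combine institution IDs and publication months into one search string."""
    # Assumption: affiliations are OR-ed together with the AF-ID field code.
    organisations = " OR ".join(f"AF-ID({i})" for i in institution_ids)
    # Assumption: publication months are matched as text, e.g. "January 2021".
    months = " or ".join(
        f"{MONTH_NAMES[m.month - 1]} {m.year}" for m in months_between(start, end)
    )
    return f"({organisations}) AND PUBDATETXT({months})"


example_query = build_query_sketch(["60031226"], date(2021, 1, 1), date(2021, 2, 28))
```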
- static download_period(*, worker: ScopusUtilWorker, conn: str, period: Type[pendulum.Period], institution_ids: List[str], download_dir: str)[source]
Download records for a stated date range. The elsapy package currently caps results at 5000 per query, so the unlikely case of an institution with more than 5000 entries in one month will present a problem.
- Parameters:
worker – Worker that will do the downloading.
conn – Connection ID from Airflow (minus scopus_)
period – Period to download.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static sleep_if_needed(*, reset_date: pendulum.DateTime, conn: str)[source]
Sleep until reset_date.
- Parameters:
reset_date – Date(time) to sleep to.
conn – Connection id from Airflow.
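The waiting logic can be sketched as computing a non-negative delay and passing it to `time.sleep`. This is a sketch under assumptions: it uses the standard library's `datetime` in place of pendulum, and `seconds_until` is a hypothetical helper, not part of the module.

```python
from datetime import datetime, timedelta, timezone


def seconds_until(reset_date: datetime, now: datetime) -> float:
    """Return how long to wait until reset_date, or 0.0 if it has already passed."""
    return max(0.0, (reset_date - now).total_seconds())


# A fixed "now" keeps the example deterministic.
now = datetime(2020, 10, 1, 12, 0, 0, tzinfo=timezone.utc)
wait_future = seconds_until(now + timedelta(minutes=5), now)
wait_past = seconds_until(now - timedelta(minutes=5), now)
```

A caller would then run `time.sleep(seconds_until(reset_date, datetime.now(timezone.utc)))`, which returns immediately when the reset date is already in the past.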
- static update_reset_date(*, conn: str, error_msg: str, worker: ScopusUtilWorker)[source]
Update the reset date to the closest date that will make a worker available.
- Parameters:
conn – Airflow connection ID.
error_msg – Error message from quota exceeded exception.
worker – Worker that will do the downloading.
- static clear_task_queue(queue: queue.Queue)[source]
Clear a queue.
- Parameters:
queue – Queue to clear.
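One common way to clear a `queue.Queue` is to pop items without blocking until `queue.Empty` is raised. The sketch below shows that pattern; `drain` is a hypothetical name and the module's own method may differ in detail.

```python
import queue


def drain(q: queue.Queue) -> int:
    """Remove every pending item from q and return how many were removed."""
    removed = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            break
        # Mark the item as handled so a later q.join() does not block on it.
        q.task_done()
        removed += 1
    return removed


pending = queue.Queue()
for item in ("2021-01", "2021-02", "2021-03"):
    pending.put(item)
removed_count = drain(pending)
```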
- static download_worker(*, worker: ScopusUtilWorker, exit_event: threading.Event, taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download worker method used by parallel downloader.
- Parameters:
worker – worker to use.
exit_event – exit event to monitor.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
- static download_parallel(*, workers: List[ScopusUtilWorker], taskq: queue.Queue, conn: str, institution_ids: List[str], download_dir: str)[source]
Download SCOPUS snapshot with parallel sessions. Tasks will be distributed in parallel to the available keys. Each key will independently fetch a task from the queue when it’s free so there’s no guarantee of load balance.
- Parameters:
workers – List of workers available.
taskq – tasks queue.
conn – Airflow connection ID.
institution_ids – List of institutions to query concurrently.
download_dir – Path to save downloaded files to.
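The "each key fetches a task when it is free" behaviour can be sketched with threads that share one queue and watch an exit event. This is an illustration only: worker names stand in for ScopusUtilWorker objects, recording a tuple stands in for the download, and `worker_loop` and `run_parallel` are hypothetical names.

```python
import queue
import threading
from typing import List, Tuple


def worker_loop(name: str, taskq: queue.Queue, exit_event: threading.Event,
                results: List[Tuple[str, str]], lock: threading.Lock) -> None:
    """Pull tasks until the exit event is set; a free worker takes the next task."""
    while not exit_event.is_set():
        try:
            task = taskq.get(timeout=0.1)
        except queue.Empty:
            continue
        with lock:
            # Stand-in for the real download of one period.
            results.append((name, task))
        taskq.task_done()


def run_parallel(worker_names: List[str], tasks: List[str]) -> List[Tuple[str, str]]:
    """Distribute tasks over one thread per worker and wait for completion."""
    taskq: queue.Queue = queue.Queue()
    for task in tasks:
        taskq.put(task)
    exit_event = threading.Event()
    results: List[Tuple[str, str]] = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker_loop,
                         args=(name, taskq, exit_event, results, lock),
                         daemon=True)
        for name in worker_names
    ]
    for thread in threads:
        thread.start()
    taskq.join()      # Block until every queued task has been marked done.
    exit_event.set()  # Tell idle workers to stop polling.
    for thread in threads:
        thread.join()
    return results


completed = run_parallel(["key-1", "key-2"], ["2021-01", "2021-02", "2021-03"])
```

Which worker handles which task is not deterministic, which matches the note above that load balance is not guaranteed.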
- static make_query(*, worker: ScopusUtilWorker, query: str) Tuple[str, int][source]
Throttling wrapper for the API call. This is a global limit for this API when called from a program on the same machine. The limits are specified in the ScopusClientThrottleLimits class.
Throttle limits may or may not be enforced; this probably depends on how executors spin up tasks.
- Parameters:
worker – ScopusUtilWorker object.
query – Query string.
- Returns:
Query results.
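A throttling wrapper of this kind can be approximated by enforcing a minimum interval between calls. The sketch below is not the module's mechanism: `MinIntervalThrottle` and `FakeTime` are hypothetical, and the rate of two calls per second is an arbitrary value, since the actual constants in ScopusClientThrottleLimits are not listed here.

```python
import time
from typing import Callable, Optional


class MinIntervalThrottle:
    """Delay callers so that calls are at least 1 / calls_per_second apart."""

    def __init__(self, calls_per_second: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self._interval = 1.0 / calls_per_second
        self._clock = clock
        self._sleep = sleep
        self._next_allowed: Optional[float] = None

    def wait(self) -> float:
        """Sleep if the previous call was too recent; return the delay applied."""
        now = self._clock()
        delay = 0.0
        if self._next_allowed is not None and now < self._next_allowed:
            delay = self._next_allowed - now
            self._sleep(delay)
        # The call proceeds at now + delay; the next one must wait one interval.
        self._next_allowed = now + delay + self._interval
        return delay


class FakeTime:
    """Deterministic clock for the demonstration: sleeping advances time."""

    def __init__(self) -> None:
        self.now = 0.0

    def clock(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


fake = FakeTime()
throttle = MinIntervalThrottle(2.0, clock=fake.clock, sleep=fake.sleep)
delays = [throttle.wait() for _ in range(3)]
```

In real use the default `time.monotonic` and `time.sleep` would apply, with `throttle.wait()` called immediately before each API request.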
- class academic_observatory_workflows.workflows.scopus_telescope.ScopusJsonParser[source]
Helper methods to process the json from SCOPUS into desired structure.
- static get_affiliations(data: Dict[str, Any]) None | List[Dict[str, Any]][source]
Get the affiliation field.
- Parameters:
data – json response from SCOPUS.
- Returns:
List of affiliation details.
- static get_authors(data: Dict[str, Any]) None | List[Dict[str, Any]][source]
Get the author field. We won't know whether this parser throws an error until we have access to an API key with complete view access.
- Parameters:
data – json response from SCOPUS.
- Returns:
List of authors' details.
- static get_identifier_list(data: dict, id_type: str) None | List[str][source]
Get the list of document identifiers, or null if it does not exist. The field can arrive as either a single string or a list. This string/list behaviour was observed for ISBNs, so it is handled for other identifiers as well, just in case.
- Parameters:
data – json response from SCOPUS.
id_type – type of identifier, e.g., ‘isbn’
- Returns:
List of identifiers.
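The string-or-list normalisation described above can be sketched as follows. This is an assumption-laden illustration: `identifier_list` is a hypothetical name, the key `"prism:isbn"` is used only as an example, and the real response may nest values differently from the flat strings shown here.

```python
from typing import Any, Dict, List, Optional


def identifier_list(data: Dict[str, Any], key: str) -> Optional[List[str]]:
    """Return the identifiers stored under key as a list, or None if absent."""
    if key not in data:
        return None
    value = data[key]
    if isinstance(value, str):
        # A single identifier is wrapped so callers always receive a list.
        return [value]
    if isinstance(value, list):
        return [str(item) for item in value]
    return None


single = identifier_list({"prism:isbn": "9780000000001"}, "prism:isbn")
several = identifier_list({"prism:isbn": ["9780000000001", "9780000000002"]}, "prism:isbn")
missing = identifier_list({}, "prism:isbn")
```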
- static parse_json(*, data: dict, snapshot_date: pendulum.DateTime, institution_ids: List[str]) dict[source]
Turn json data into db schema format.
- Parameters:
data – json response from SCOPUS.
harvest_datetime – isoformat string of time the fetch took place.
snapshot_date – DAG execution date.
institution_ids – List of institution ids used in the query.
- Returns:
dict of data in right field format.