academic_observatory_workflows.workflows.crossref_events_telescope
Module Contents
Attributes
- academic_observatory_workflows.workflows.crossref_events_telescope.CROSSREF_EVENTS_HOST = 'https://api.eventdata.crossref.org/v1/events'
- academic_observatory_workflows.workflows.crossref_events_telescope.DATE_FORMAT = 'YYYY-MM-DD'
- class academic_observatory_workflows.workflows.crossref_events_telescope.CrossrefEventsRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_end_date: pendulum.DateTime, is_first_run: bool, mailto: str)
Bases: observatory.platform.workflows.workflow.ChangefileRelease
- class academic_observatory_workflows.workflows.crossref_events_telescope.CrossrefEventsTelescope(dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, events_start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 17), bq_dataset_id: str = 'crossref_events', bq_table_name: str = 'crossref_events', api_dataset_id: str = 'crossref_events', schema_folder: str = os.path.join(default_schema_folder(), 'crossref_events'), dataset_description: str = 'The Crossref Events dataset: https://www.eventdata.crossref.org/guide/', table_description: str = 'The Crossref Events dataset: https://www.eventdata.crossref.org/guide/', snapshot_expiry_days: int = 31, n_rows: int = 1000, max_threads: int = min(32, os.cpu_count() + 4), max_processes: int = os.cpu_count(), mailto: str = metadata('academic_observatory_workflows').get('Author-email'), primary_key: str = 'id', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 12), schedule: str = '@weekly', queue: str = 'remote_queue')
Bases: observatory.platform.workflows.workflow.Workflow
Crossref Events telescope.
- fetch_releases(**kwargs)
Return release information with the start and end dates. The release covers the interval [start date, end date): the start date is included and the end date is excluded.
- Parameters:
kwargs – the context passed from the Airflow Operator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.
- Returns:
True to continue, False to skip.
- make_release(**kwargs) → CrossrefEventsRelease
Make a CrossrefEventsRelease instance.
- create_datasets(release: CrossrefEventsRelease, **kwargs) → None
Create datasets.
- bq_create_main_table_snapshot(release: CrossrefEventsRelease, **kwargs)
Create a snapshot of each main table so that the table can be rolled back if something goes wrong. The snapshot expires after self.snapshot_expiry_days days.
- download(release: CrossrefEventsRelease, **kwargs)
Download the events for the given CrossrefEventsRelease.
- upload_downloaded(release: CrossrefEventsRelease, **kwargs)
Upload the downloaded files for the given release.
- transform_snapshot(release: CrossrefEventsRelease, **kwargs)
Transform the files of created events on the first run.
- transform_created_edited(release: CrossrefEventsRelease, **kwargs)
Transform the files of created and edited events on subsequent runs.
- transform_deleted(release: CrossrefEventsRelease, **kwargs)
Transform the files of deleted events into a file that contains only the primary key of each entry.
- upload_transformed(release: CrossrefEventsRelease, **kwargs) → None
Upload the transformed data to Cloud Storage.
- bq_load_main_table(release: CrossrefEventsRelease, **kwargs)
Load the main table.
- bq_load_upsert_table(release: CrossrefEventsRelease, **kwargs)
Load the upsert table.
- bq_upsert_records(release: CrossrefEventsRelease, **kwargs)
Upsert the records from the upsert table into the main table.
- bq_load_delete_table(release: CrossrefEventsRelease, **kwargs)
Load the delete table.
- bq_delete_records(release: CrossrefEventsRelease, **kwargs)
Delete the records in the main table that also appear in the delete table.
- add_new_dataset_releases(release: CrossrefEventsRelease, **kwargs) → None
Add the release information to the API.
- cleanup(release: CrossrefEventsRelease, **kwargs) → None
Delete all files, folders and XComs associated with this release.
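The transform_deleted, bq_upsert_records and bq_delete_records tasks follow a change-file pattern: deleted events are reduced to their primary key, upserts add or replace rows by key, and deletes remove rows by key. The following is a minimal in-memory sketch of that logic, assuming the main table can be modelled as a dict keyed by the default primary key `id`. The helper names are hypothetical, and the telescope itself performs these steps as BigQuery operations rather than in Python.

```python
from typing import Dict, Iterable, List

PRIMARY_KEY = "id"  # mirrors the telescope's default primary_key


def reduce_to_primary_key(deleted_events: Iterable[Dict]) -> List[Dict]:
    """Hypothetical helper: keep only the primary key of each deleted event."""
    return [{PRIMARY_KEY: event[PRIMARY_KEY]} for event in deleted_events]


def upsert_records(main: Dict[str, Dict], upserts: Iterable[Dict]) -> None:
    """Hypothetical helper: add new rows and overwrite rows with the same key."""
    for row in upserts:
        main[row[PRIMARY_KEY]] = row


def delete_records(main: Dict[str, Dict], deletes: Iterable[Dict]) -> None:
    """Hypothetical helper: drop rows whose key appears in the delete table."""
    for row in deletes:
        main.pop(row[PRIMARY_KEY], None)


main_table = {"a": {"id": "a", "v": 1}, "b": {"id": "b", "v": 1}}
upsert_records(main_table, [{"id": "b", "v": 2}, {"id": "c", "v": 1}])
delete_records(main_table, reduce_to_primary_key([{"id": "a", "source_id": "twitter"}]))
print(sorted(main_table))  # ['b', 'c']
```

Running the upsert before the delete means an event that was both edited and deleted in the same interval ends up removed, which matches the task order listed above.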
- academic_observatory_workflows.workflows.crossref_events_telescope.parse_release_msg(msg: Dict) → Tuple[pendulum.DateTime, pendulum.DateTime, bool, pendulum.DateTime]
- academic_observatory_workflows.workflows.crossref_events_telescope.get_event_date(event) → pendulum.DateTime
- class academic_observatory_workflows.workflows.crossref_events_telescope.DayRequest(day: pendulum.DateTime, mailto: str)
- academic_observatory_workflows.workflows.crossref_events_telescope.make_crossref_events_url(*, action: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, mailto: str, rows: int, cursor: str)
- class academic_observatory_workflows.workflows.crossref_events_telescope.EventRequest(action: str, day: pendulum.DateTime, mailto: str)
- make_url(rows: int = 1000, cursor: str = None) → str
Make a URL for this EventRequest. See the Crossref Event Data documentation for more details:
Example queries: https://www.eventdata.crossref.org/guide/service/query-api/#example-queries
Edited events: https://www.eventdata.crossref.org/guide/service/query-api/#edited-events
Deleted events: https://www.eventdata.crossref.org/guide/service/query-api/#deleted-events
- Parameters:
rows – the number of rows per page.
cursor – the cursor of the page to fetch, or None for the first page.
- Returns:
the URL.
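A minimal sketch of assembling such a URL with `urllib.parse.urlencode`. The `/edited` and `/deleted` endpoint paths, the `from-`/`until-collected-date` and `from-`/`until-updated-date` parameter names, and the helper `make_events_url_sketch` are assumptions drawn from the query API guide linked above, not a copy of make_crossref_events_url. The sketch also uses `datetime.date` in place of `pendulum.DateTime`.

```python
from datetime import date
from typing import Optional
from urllib.parse import urlencode

CROSSREF_EVENTS_HOST = "https://api.eventdata.crossref.org/v1/events"
DATE_FORMAT = "%Y-%m-%d"  # strftime equivalent of 'YYYY-MM-DD'

# Assumed endpoint for each action, following the query API guide.
ENDPOINTS = {
    "create": CROSSREF_EVENTS_HOST,
    "edit": f"{CROSSREF_EVENTS_HOST}/edited",
    "delete": f"{CROSSREF_EVENTS_HOST}/deleted",
}


def make_events_url_sketch(
    action: str,
    start_date: date,
    end_date: date,
    mailto: str,
    rows: int = 1000,
    cursor: Optional[str] = None,
) -> str:
    """Hypothetical helper: build a query URL for one action and date range."""
    if action not in ENDPOINTS:
        raise ValueError(f"unknown action: {action}")
    # Assumption: created events are filtered by collection date,
    # edited and deleted events by update date.
    date_type = "collected" if action == "create" else "updated"
    params = {
        "mailto": mailto,
        f"from-{date_type}-date": start_date.strftime(DATE_FORMAT),
        f"until-{date_type}-date": end_date.strftime(DATE_FORMAT),
        "rows": rows,
    }
    if cursor is not None:  # omit the cursor when requesting the first page
        params["cursor"] = cursor
    return f"{ENDPOINTS[action]}?{urlencode(params)}"


print(make_events_url_sketch("edit", date(2023, 1, 1), date(2023, 1, 2), "me@example.org", rows=10))
```

Passing every value through `urlencode` keeps the mailto address and any cursor percent-encoded.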
- academic_observatory_workflows.workflows.crossref_events_telescope.make_day_requests(start_date: pendulum.DateTime, end_date: pendulum.DateTime, mailto: str) → List[DayRequest]
Create the upsert and delete URLs. The date interval is [start_date, end_date). On the first run, only create events are returned in the upsert_events list; on subsequent runs, the create and edit event URLs are added to upsert_events and the delete event URLs to delete_events.
- Parameters:
start_date – the start date (inclusive).
end_date – the end date (exclusive).
mailto – the mailto email address.
- Returns:
the requests, as a list of DayRequest objects.
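Because the interval is half-open, a run over [start_date, end_date) covers every day up to but not including end_date, which the next run then picks up. A sketch of that day splitting, assuming one request per calendar day; `DayRequestSketch` and `make_day_requests_sketch` are hypothetical stand-ins for DayRequest and make_day_requests, and use `datetime.date` instead of pendulum.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import List


@dataclass(frozen=True)
class DayRequestSketch:
    """Hypothetical stand-in for DayRequest: one calendar day plus the contact email."""

    day: date
    mailto: str


def make_day_requests_sketch(start_date: date, end_date: date, mailto: str) -> List[DayRequestSketch]:
    """Return one request per day in the half-open interval [start_date, end_date)."""
    requests = []
    day = start_date
    while day < end_date:  # end_date itself is excluded
        requests.append(DayRequestSketch(day, mailto))
        day += timedelta(days=1)
    return requests


reqs = make_day_requests_sketch(date(2023, 1, 1), date(2023, 1, 4), "me@example.org")
print([r.day.isoformat() for r in reqs])  # ['2023-01-01', '2023-01-02', '2023-01-03']
```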
- academic_observatory_workflows.workflows.crossref_events_telescope.download_events(request: EventRequest, download_folder: str, n_rows: int)
Download one day of events. When the download finishes successfully, the generated cursor file is deleted. If a cursor file exists at the start, a previous download attempt failed. If an events file exists but no cursor file does, a previous download attempt succeeded, so these events are not downloaded again.
- Parameters:
request – the EventRequest to fetch.
download_folder – the folder to download the data into.
n_rows – the number of rows per page.
- Returns:
None.
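A sketch of the cursor-file resume logic described above. It is written against a hypothetical `fetch_page` callable that returns `(events, next_cursor)` for a cursor, so it runs without network access. The file layout (one JSON event per line, one cursor per line) and the helper name `download_day_sketch` are assumptions.

```python
import json
import os
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical page fetcher: takes a cursor (None for the first page) and
# returns (events, next_cursor); next_cursor is None when there are no more pages.
FetchPage = Callable[[Optional[str]], Tuple[List[Dict], Optional[str]]]


def download_day_sketch(fetch_page: FetchPage, events_path: str, cursor_path: str) -> None:
    """Download all pages into events_path, recording cursors so a failed run can resume."""
    if os.path.exists(events_path) and not os.path.exists(cursor_path):
        return  # a previous attempt completed: do not download this day again

    cursor = None
    if os.path.exists(cursor_path):
        # A previous attempt failed part-way: resume from its last recorded cursor.
        with open(cursor_path) as f:
            recorded = [line.strip() for line in f if line.strip()]
        cursor = recorded[-1] if recorded else None
    if cursor is None:
        # Starting from the first page: create the cursor file to mark the
        # download as in progress and discard any partial events file.
        open(cursor_path, "w").close()
        open(events_path, "w").close()

    while True:
        events, next_cursor = fetch_page(cursor)
        with open(events_path, "a") as f:
            for event in events:
                f.write(json.dumps(event) + "\n")
        if next_cursor is None:
            break
        with open(cursor_path, "a") as f:
            f.write(next_cursor + "\n")
        cursor = next_cursor

    os.remove(cursor_path)  # success: a missing cursor file now means "complete"
```

Because each page's events are appended before its next cursor is recorded, a crash between those two writes means that page is fetched again on resume, so duplicates would need to be dropped by primary key downstream.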
- academic_observatory_workflows.workflows.crossref_events_telescope.fetch_events(request: EventRequest, cursor: str = None, n_rows: int = 1000) → Tuple[Dict, str]
Fetch the events for an EventRequest until no new cursor is returned or a RetryError occurs. The extracted events are appended to a JSONL file and the cursors are written to a text file.
- Parameters:
request – the EventRequest to fetch.
cursor – the cursor.
n_rows – the number of rows per page.
- Returns:
the fetched events and the next cursor.
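fetch_events hands back the fetched events together with the next cursor. A sketch of extracting both from one response body, assuming the Event Data response shape `{"message": {"events": [...], "next-cursor": "..."}}`. `parse_events_page` is a hypothetical helper, and the HTTP request and RetryError handling are left out. A missing or repeated cursor is treated as "no new cursor", which ends paging.

```python
import json
from typing import Dict, List, Optional, Tuple


def parse_events_page(body: str, previous_cursor: Optional[str]) -> Tuple[List[Dict], Optional[str]]:
    """Return (events, next_cursor) from one response body.

    Assumes the body looks like {"message": {"events": [...], "next-cursor": "..."}}.
    next_cursor is None when the response carries no new cursor.
    """
    message = json.loads(body).get("message", {})
    events = message.get("events", [])
    next_cursor = message.get("next-cursor")
    if not next_cursor or next_cursor == previous_cursor:
        next_cursor = None  # no new cursor: stop paging
    return events, next_cursor
```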
- academic_observatory_workflows.workflows.crossref_events_telescope.crossref_events_limiter(calls_per_second: int = 10)
Throttle the calls to the Crossref Events API to at most calls_per_second calls per second.
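A standard-library sketch of throttling calls to at most `calls_per_second` per second by spacing them at least `1 / calls_per_second` seconds apart. The decorator name `throttle` is hypothetical, and crossref_events_limiter may be implemented differently, for example with a rate-limiting library.

```python
import functools
import threading
import time
from typing import Any, Callable


def throttle(calls_per_second: int = 10) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator: space calls at least 1 / calls_per_second seconds apart."""
    min_interval = 1.0 / calls_per_second
    lock = threading.Lock()
    last_call = float("-inf")  # monotonic time of the most recent permitted call

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            nonlocal last_call
            with lock:  # serialise callers so all threads share one budget
                wait = last_call + min_interval - time.monotonic()
                if wait > 0:
                    time.sleep(wait)
                last_call = time.monotonic()
            return func(*args, **kwargs)

        return wrapper

    return decorator


@throttle(calls_per_second=10)
def fetch_page(url: str) -> str:
    return url  # placeholder for a real HTTP request
```

Holding the lock while sleeping makes concurrent download threads queue behind one shared budget, which matters here because the telescope downloads with up to max_threads threads.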
- academic_observatory_workflows.workflows.crossref_events_telescope.transform_events(download_path: str, transform_folder: str)
Transform one day of events.
- Parameters:
download_path – the path to the downloaded file.
transform_folder – the transform folder.
- Returns:
None.
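A sketch of transforming one downloaded day, assuming the download is a JSONL file (one event per line) and that the output keeps the input's file name inside transform_folder. The per-event transform is passed in as a parameter, and `transform_events_sketch` is a hypothetical helper.

```python
import json
import os
from typing import Callable, Dict


def transform_events_sketch(download_path: str, transform_folder: str, transform: Callable[[Dict], Dict]) -> str:
    """Apply transform to every event in a JSONL file and return the output path."""
    os.makedirs(transform_folder, exist_ok=True)
    # Assumption: the output keeps the input's file name inside transform_folder.
    output_path = os.path.join(transform_folder, os.path.basename(download_path))
    with open(download_path) as src, open(output_path, "w") as dst:
        for line in src:
            if line.strip():  # skip blank lines
                dst.write(json.dumps(transform(json.loads(line))) + "\n")
    return output_path
```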
- academic_observatory_workflows.workflows.crossref_events_telescope.transform_event(event: Dict)
Transform the dictionary with event data: replace ‘-’ with ‘_’ in key names, convert all int values to strings except for the ‘total’ field, and parse the datetime fields so that they hold valid datetimes.
- Parameters:
event – the event dictionary.
- Returns:
the updated event dictionary.
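A sketch of the three rules above, applied recursively to nested dictionaries and lists. The set of datetime fields (`DATETIME_FIELDS`), the choice to normalise them to ISO 8601 strings, and the helper names are assumptions. An unparseable datetime raises `ValueError` in this sketch rather than being handled.

```python
from datetime import datetime
from typing import Any, Dict

# Hypothetical list of fields to normalise; the real module may use different ones.
DATETIME_FIELDS = {"timestamp", "occurred_at", "updated_date"}


def transform_value(key: str, value: Any) -> Any:
    """Transform one value; key is the already-renamed field name."""
    if isinstance(value, dict):
        return transform_event_sketch(value)
    if isinstance(value, list):
        return [transform_value(key, item) for item in value]
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(value, int) and not isinstance(value, bool) and key != "total":
        return str(value)
    if key in DATETIME_FIELDS and isinstance(value, str):
        # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on
        return datetime.fromisoformat(value.replace("Z", "+00:00")).isoformat()
    return value


def transform_event_sketch(event: Dict[str, Any]) -> Dict[str, Any]:
    """Replace '-' with '_' in keys and transform each value."""
    return {key.replace("-", "_"): transform_value(key.replace("-", "_"), value) for key, value in event.items()}
```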