academic_observatory_workflows.workflows.crossref_events_telescope

Module Contents

Classes

CrossrefEventsRelease

CrossrefEventsTelescope

Crossref Events telescope

Action

DayRequest

EventRequest

Functions

parse_release_msg(→ Tuple[pendulum.DateTime, ...)

get_event_date(→ pendulum.DateTime)

make_crossref_events_url(*, action, start_date, ...)

make_day_requests(→ List[DayRequest])

Create the upsert and delete URLs. The interval for dates is [start_date, end_date).

download_events(request, download_folder, n_rows)

Download one day of events. When the download finishes successfully, the generated cursor file is deleted.

fetch_events(→ Tuple[Dict, str])

Fetch the events for an EventRequest from the given URL until no new cursor is returned or a RetryError occurs.

crossref_events_limiter([calls_per_second])

Function to throttle the calls to the Crossref Events API.

transform_events(download_path, transform_folder)

Transform one day of events.

transform_event(event)

Transform the dictionary with event data by replacing '-' with '_' in key names, converting all int values to strings except for the 'total' field, and parsing datetime columns for a valid datetime.

Attributes

CROSSREF_EVENTS_HOST

DATE_FORMAT

BACKEND

MOVING_WINDOW

academic_observatory_workflows.workflows.crossref_events_telescope.CROSSREF_EVENTS_HOST = 'https://api.eventdata.crossref.org/v1/events'
academic_observatory_workflows.workflows.crossref_events_telescope.DATE_FORMAT = 'YYYY-MM-DD'
academic_observatory_workflows.workflows.crossref_events_telescope.BACKEND
academic_observatory_workflows.workflows.crossref_events_telescope.MOVING_WINDOW
class academic_observatory_workflows.workflows.crossref_events_telescope.CrossrefEventsRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str, bq_table_name: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, prev_end_date: pendulum.DateTime, is_first_run: bool, mailto: str)

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property created_files
property edited_files
property deleted_files
property has_created
property has_edited
property has_deleted
class academic_observatory_workflows.workflows.crossref_events_telescope.CrossrefEventsTelescope(dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, events_start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 17), bq_dataset_id: str = 'crossref_events', bq_table_name: str = 'crossref_events', api_dataset_id: str = 'crossref_events', schema_folder: str = os.path.join(default_schema_folder(), 'crossref_events'), dataset_description: str = 'The Crossref Events dataset: https://www.eventdata.crossref.org/guide/', table_description: str = 'The Crossref Events dataset: https://www.eventdata.crossref.org/guide/', snapshot_expiry_days: int = 31, n_rows: int = 1000, max_threads: int = min(32, os.cpu_count() + 4), max_processes: int = os.cpu_count(), mailto: str = metadata('academic_observatory_workflows').get('Author-email'), primary_key: str = 'id', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 12), schedule: str = '@weekly', queue: str = 'remote_queue')

Bases: observatory.platform.workflows.workflow.Workflow

Crossref Events telescope

fetch_releases(**kwargs)

Return release information with the start and end dates. The interval is [start date, end date): it includes the start date and excludes the end date.

Parameters:

kwargs – the context passed from the Airflow Operator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

True to continue, False to skip.

make_release(**kwargs) → CrossrefEventsRelease

Make a Release instance.

create_datasets(release: CrossrefEventsRelease, **kwargs) → None

Create datasets.

bq_create_main_table_snapshot(release: CrossrefEventsRelease, **kwargs)

Create a snapshot of each main table. The purpose of the snapshot is to allow the table to be rolled back if something goes wrong. The snapshot expires after self.snapshot_expiry_days.

download(release: CrossrefEventsRelease, **kwargs)

Task to download the CrossrefEventsRelease release.

upload_downloaded(release: CrossrefEventsRelease, **kwargs)

Upload the downloaded files for the given release.

transform_snapshot(release: CrossrefEventsRelease, **kwargs)

Transform the created files on the first run.

transform_created_edited(release: CrossrefEventsRelease, **kwargs)

Transform the created and edited files on subsequent runs.

transform_deleted(release: CrossrefEventsRelease, **kwargs)

Transform the deleted files into a file that contains only the primary key of each entry.

upload_transformed(release: CrossrefEventsRelease, **kwargs) → None

Upload the transformed data to Cloud Storage.

bq_load_main_table(release: CrossrefEventsRelease, **kwargs)

Load the main table.

bq_load_upsert_table(release: CrossrefEventsRelease, **kwargs)

Load the upsert table.

bq_upsert_records(release: CrossrefEventsRelease, **kwargs)

Upsert the records from the upsert table into the main table.

bq_load_delete_table(release: CrossrefEventsRelease, **kwargs)

Load the delete table.

bq_delete_records(release: CrossrefEventsRelease, **kwargs)

Delete records from the main table that are in the delete table.

add_new_dataset_releases(release: CrossrefEventsRelease, **kwargs) → None

Add release information to the API.

cleanup(release: CrossrefEventsRelease, **kwargs) → None

Delete all files, folders and XComs associated with this release.

academic_observatory_workflows.workflows.crossref_events_telescope.parse_release_msg(msg: Dict) → Tuple[pendulum.DateTime, pendulum.DateTime, bool, pendulum.DateTime]
academic_observatory_workflows.workflows.crossref_events_telescope.get_event_date(event) → pendulum.DateTime
class academic_observatory_workflows.workflows.crossref_events_telescope.Action
create = 'created'
edit = 'edited'
delete = 'deleted'
class academic_observatory_workflows.workflows.crossref_events_telescope.DayRequest(day: pendulum.DateTime, mailto: str)
academic_observatory_workflows.workflows.crossref_events_telescope.make_crossref_events_url(*, action: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, mailto: str, rows: int, cursor: str)
class academic_observatory_workflows.workflows.crossref_events_telescope.EventRequest(action: str, day: pendulum.DateTime, mailto: str)
property cursor_file_name
property data_file_name
make_url(rows: int = 1000, cursor: str = None) → str

Make a URL for this EventRequest. See the Crossref Event Data documentation for more details:

  • Example queries: https://www.eventdata.crossref.org/guide/service/query-api/#example-queries

  • Edited events: https://www.eventdata.crossref.org/guide/service/query-api/#edited-events

  • Deleted events: https://www.eventdata.crossref.org/guide/service/query-api/#deleted-events

Parameters:
  • rows – the number of rows.

  • cursor – the cursor.

Returns:

the URL.
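As a rough illustration of how such a URL might be assembled, the sketch below builds a query string for each action. It is not the module's implementation: the function name `sketch_events_url` is hypothetical, and the query parameter names and the `/edited` and `/deleted` sub-paths are assumptions based on the Crossref Event Data query API guide linked above.

```python
from datetime import date
from typing import Optional
from urllib.parse import urlencode

CROSSREF_EVENTS_HOST = "https://api.eventdata.crossref.org/v1/events"


def sketch_events_url(
    *,
    action: str,
    start_date: date,
    end_date: date,
    mailto: str,
    rows: int = 1000,
    cursor: Optional[str] = None,
) -> str:
    """Hypothetical stand-in for make_crossref_events_url (parameter names are assumptions)."""
    if action not in {"created", "edited", "deleted"}:
        raise ValueError(f"unknown action: {action}")

    start, end = start_date.isoformat(), end_date.isoformat()
    if action == "created":
        # Assumption: newly created events are filtered by collection date.
        base = CROSSREF_EVENTS_HOST
        params = {"mailto": mailto, "from-collected-date": start, "until-collected-date": end, "rows": rows}
    else:
        # Assumption: edited and deleted events use a sub-path and are filtered by update date.
        base = f"{CROSSREF_EVENTS_HOST}/{action}"
        params = {"mailto": mailto, "from-updated-date": start, "until-updated-date": end, "rows": rows}

    # Only add the cursor when resuming from a previous page.
    if cursor is not None:
        params["cursor"] = cursor
    return f"{base}?{urlencode(params)}"
```

Using `urlencode` rather than string concatenation keeps the mailto address and cursor safely escaped.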

__str__()

Return str(self).

__repr__()

Return repr(self).

academic_observatory_workflows.workflows.crossref_events_telescope.make_day_requests(start_date: pendulum.DateTime, end_date: pendulum.DateTime, mailto: str) → List[DayRequest]

Create the upsert and delete URLs. The interval for dates is [start_date, end_date). On the first run, only create events are returned in the upsert_events list; on subsequent runs, create and edit event URLs are populated into upsert_events and delete event URLs into delete_events.

Parameters:
  • start_date – the start date. Inclusive.

  • end_date – the end date. Exclusive.

  • mailto – the mailto email address.

Returns:

the URLs.
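The half-open interval [start_date, end_date) can be illustrated with a small sketch that lists one day per request. The helper name `days_in_interval` is hypothetical and the standard library `datetime.date` is used in place of `pendulum.DateTime`; this only shows the date arithmetic, not how DayRequest objects are built.

```python
from datetime import date, timedelta
from typing import List


def days_in_interval(start_date: date, end_date: date) -> List[date]:
    """Return every day in [start_date, end_date): start included, end excluded."""
    n_days = (end_date - start_date).days
    # An empty or reversed interval yields no days.
    return [start_date + timedelta(days=i) for i in range(max(n_days, 0))]
```

With a start of 2023-01-01 and an end of 2023-01-04, the helper yields three days: the 1st, 2nd and 3rd.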

academic_observatory_workflows.workflows.crossref_events_telescope.download_events(request: EventRequest, download_folder: str, n_rows: int)

Download one day of events. When the download finishes successfully, the generated cursor file is deleted. If a cursor file is available at the start, a previous download attempt failed. If an events file is available and there is no cursor file, a previous download attempt was successful, so these events will not be downloaded again.

Parameters:
  • request – the event request to fetch.

  • download_folder – the folder we will download the data into.

  • n_rows – the number of rows per page.

Returns:

None.
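The decision described above (resume, skip, or start fresh) can be sketched as a small function over the two file paths. The name `plan_download` and the assumption that the cursor file holds one cursor per line, most recent last, are illustrative only and are not taken from the module.

```python
import os
from typing import Optional, Tuple


def plan_download(data_path: str, cursor_path: str) -> Tuple[str, Optional[str]]:
    """Return (state, cursor) where state is 'resume', 'skip' or 'start'."""
    if os.path.isfile(cursor_path):
        # A leftover cursor file means a previous attempt did not finish.
        with open(cursor_path) as f:
            cursors = [line.strip() for line in f if line.strip()]
        # Assumption: the last non-empty line is the most recent cursor.
        return "resume", (cursors[-1] if cursors else None)
    if os.path.isfile(data_path):
        # Events file present and no cursor file: the previous download succeeded.
        return "skip", None
    return "start", None
```

Checking for the cursor file first matters: a failed attempt can leave both files behind, and that case should resume rather than skip.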

academic_observatory_workflows.workflows.crossref_events_telescope.fetch_events(request: EventRequest, cursor: str = None, n_rows: int = 1000) → Tuple[Dict, str]

Fetch the events for an EventRequest from the given URL until no new cursor is returned or a RetryError occurs. The extracted events are appended to a jsonl file and the cursors are written to a text file.

Parameters:
  • request – the event request used to build the URL.

  • cursor – the cursor.

  • n_rows – number of rows per page.

Returns:

the fetched events and the next_cursor.
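Cursor-based pagination of this kind can be sketched independently of the HTTP layer. In the sketch below, `fetch_all_pages` and the `fetch_page` callable are hypothetical: `fetch_page` stands in for a single API request and returns the events on that page together with the next cursor, or None when there are no more pages. Retry handling and writing to the jsonl and cursor files are left out.

```python
from typing import Callable, Dict, List, Optional, Tuple

# One page of results: the events plus the cursor for the next page (None when done).
Page = Tuple[List[Dict], Optional[str]]


def fetch_all_pages(fetch_page: Callable[[Optional[str]], Page], cursor: Optional[str] = None) -> List[Dict]:
    """Follow cursors until the source stops returning a new one."""
    events: List[Dict] = []
    while True:
        page_events, next_cursor = fetch_page(cursor)
        events.extend(page_events)
        if next_cursor is None:
            # No new cursor: the last page has been reached.
            return events
        cursor = next_cursor
```

Passing the page fetcher in as a callable keeps the loop testable with an in-memory fake, for example a dictionary that maps each cursor to its page.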

academic_observatory_workflows.workflows.crossref_events_telescope.crossref_events_limiter(calls_per_second: int = 10)

Function to throttle the calls to the Crossref Events API.
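The MOVING_WINDOW attribute suggests that the module uses a moving-window rate-limiting strategy. The sketch below swaps in a simpler minimum-interval throttle to show the general idea of capping calls per second; the class name `MinIntervalThrottle` and its injectable `clock` and `sleep` arguments are hypothetical and not part of the module.

```python
import time
from typing import Callable, Optional


class MinIntervalThrottle:
    """Block so that consecutive calls are at least 1 / calls_per_second apart."""

    def __init__(
        self,
        calls_per_second: int = 10,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self.min_interval = 1.0 / calls_per_second
        self._clock = clock
        self._sleep = sleep
        self._last_call: Optional[float] = None

    def wait(self) -> None:
        """Call before each API request; sleeps only when the previous call was too recent."""
        now = self._clock()
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last_call = now
```

A minimum-interval throttle spaces calls evenly, whereas a moving-window limiter allows short bursts as long as the count within the window stays under the limit.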

academic_observatory_workflows.workflows.crossref_events_telescope.transform_events(download_path: str, transform_folder: str)

Transform one day of events.

Parameters:
  • download_path – The path to the downloaded file.

  • transform_folder – the transform folder.

Returns:

None.

academic_observatory_workflows.workflows.crossref_events_telescope.transform_event(event: Dict)

Transform the dictionary with event data by replacing ‘-’ with ‘_’ in key names, converting all int values to strings except for the ‘total’ field, and parsing datetime columns for a valid datetime.

Parameters:

event – The event dictionary.

Returns:

The updated event dictionary.
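The first two steps of this transformation (key renaming and int-to-string conversion) can be sketched as a recursive function. The name `sketch_transform` is hypothetical, recursing into nested dictionaries and lists is an assumption, and the datetime parsing step is omitted because the source does not say which columns it applies to.

```python
from typing import Any


def sketch_transform(value: Any, key: str = "") -> Any:
    """Rename keys ('-' to '_') and convert ints to strings, except under the 'total' key."""
    if isinstance(value, dict):
        # Rename each key, then transform its value with the new key as context.
        return {k.replace("-", "_"): sketch_transform(v, k.replace("-", "_")) for k, v in value.items()}
    if isinstance(value, list):
        # List items inherit the key of the list that holds them.
        return [sketch_transform(v, key) for v in value]
    # bool is a subclass of int in Python, so it is excluded explicitly.
    if isinstance(value, int) and not isinstance(value, bool) and key != "total":
        return str(value)
    return value
```

For instance, an input of `{"source-id": "twitter", "total": 1, "obj": {"issue-number": 5}}` becomes `{"source_id": "twitter", "total": 1, "obj": {"issue_number": "5"}}`.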