academic_observatory_workflows.workflows.crossref_metadata_telescope

Module Contents

Classes

CrossrefMetadataRelease

CrossrefMetadataTelescope

The Crossref Metadata Telescope

Functions

make_snapshot_url(→ str)

check_release_exists(→ bool)

Check if a release exists.

transform_file(input_file_path, output_file_path)

Transform a single Crossref Metadata JSON file.

transform_item(item)

Transform a single Crossref Metadata JSON value.

Attributes

SNAPSHOT_URL

academic_observatory_workflows.workflows.crossref_metadata_telescope.SNAPSHOT_URL = 'https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz'
academic_observatory_workflows.workflows.crossref_metadata_telescope.make_snapshot_url(snapshot_date: pendulum.DateTime) → str
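
The signature above carries no description. Given the SNAPSHOT_URL template, a plausible reading is that the function fills in the year and month of the snapshot date. The following is a minimal sketch under that assumption; it uses the standard-library datetime in place of pendulum.DateTime, and make_snapshot_url_sketch is a hypothetical name, not the module's implementation.

```python
from datetime import datetime

# Template copied from the SNAPSHOT_URL attribute documented above.
SNAPSHOT_URL = "https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz"


def make_snapshot_url_sketch(snapshot_date: datetime) -> str:
    """Fill the template with the year and zero-padded month (assumed behaviour)."""
    return SNAPSHOT_URL.format(year=snapshot_date.year, month=snapshot_date.month)


print(make_snapshot_url_sketch(datetime(2023, 1, 7)))
# https://api.crossref.org/snapshots/monthly/2023/01/all.json.tar.gz
```

The {month:02d} format specifier is what pads single-digit months to two digits, so January becomes 01.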
class academic_observatory_workflows.workflows.crossref_metadata_telescope.CrossrefMetadataRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime)

Bases: observatory.platform.workflows.workflow.SnapshotRelease

class academic_observatory_workflows.workflows.crossref_metadata_telescope.CrossrefMetadataTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'crossref_metadata', bq_table_name: str = 'crossref_metadata', api_dataset_id: str = 'crossref_metadata', schema_folder: str = os.path.join(default_schema_folder(), 'crossref_metadata'), dataset_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', table_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', crossref_metadata_conn_id: str = 'crossref_metadata', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, max_processes: int = os.cpu_count(), batch_size: int = 20, start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7), schedule: str = '0 0 7 * *', catchup: bool = True, queue: str = 'remote_queue', max_active_runs: int = 1)

Bases: observatory.platform.workflows.workflow.Workflow

The Crossref Metadata Telescope

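Saved to the BigQuery table: <project_id>.<bq_dataset_id>.<bq_table_name>YYYYMMDD. With the default arguments shown in the signature, this is <project_id>.crossref_metadata.crossref_metadataYYYYMMDD.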
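
The YYYYMMDD suffix indicates a date-sharded table name. As an illustration only, the table ID for a given snapshot date could be assembled as below. Here sharded_table_id is a hypothetical helper that is not part of this module, and the project name is made up.

```python
from datetime import datetime


def sharded_table_id(project_id: str, dataset_id: str, table_name: str, snapshot_date: datetime) -> str:
    """Build a fully qualified, date-sharded BigQuery table ID (hypothetical helper)."""
    # The format spec appends the snapshot date as YYYYMMDD directly after the table name.
    return f"{project_id}.{dataset_id}.{table_name}{snapshot_date:%Y%m%d}"


print(sharded_table_id("my-project", "crossref_metadata", "crossref_metadata", datetime(2023, 1, 7)))
# my-project.crossref_metadata.crossref_metadata20230107
```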

property api_key

Return the API token.

make_release(**kwargs) → CrossrefMetadataRelease

Make a release instance. The release is passed as an argument to the function (TelescopeFunction) that is called in ‘task_callable’.

Parameters:

kwargs – the context passed from the PythonOperator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

a CrossrefMetadataRelease instance.

check_release_exists(**kwargs)

Check that the release for this month exists.

download(release: CrossrefMetadataRelease, **kwargs)

Task to download the CrossrefMetadataRelease release for a given month.

upload_downloaded(release: CrossrefMetadataRelease, **kwargs)

Upload data to Cloud Storage.

transform(release: CrossrefMetadataRelease, **kwargs)

Task to transform the CrossrefMetadataRelease release for a given month. Each extracted file is transformed.

upload_transformed(release: CrossrefMetadataRelease, **kwargs) → None

Upload the transformed data to Cloud Storage.

bq_load(release: CrossrefMetadataRelease, **kwargs)

Task to load each transformed release to BigQuery. The table_id is set to the file name without the extension.

add_new_dataset_releases(release: CrossrefMetadataRelease, **kwargs) → None

Add release information to the API.

cleanup(release: CrossrefMetadataRelease, **kwargs) → None

Delete all files, folders and XComs associated with this release.

Parameters:
  • release – the release instance.

  • kwargs – the context passed from the Airflow Operator. See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

None.

academic_observatory_workflows.workflows.crossref_metadata_telescope.check_release_exists(month: pendulum.DateTime, api_key: str) → bool

Check if a release exists.

Parameters:
  • month – the month of the release given as a datetime.

  • api_key – the Crossref Metadata API key.

Returns:

True if the release exists, False otherwise.
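
The documentation does not say how existence is determined. One plausible approach is to send a HEAD request to the snapshot URL and inspect the status code, which avoids downloading the archive. The sketch below rests on several assumptions that are not confirmed by this page: the key is sent as a bearer token in a Crossref-Plus-API-Token header, any status below 400 counts as "exists", and the names head_status and check_release_exists_sketch are hypothetical. The status lookup is injectable so the logic can be exercised offline.

```python
import urllib.error
import urllib.request
from datetime import datetime
from typing import Callable, Dict

SNAPSHOT_URL = "https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz"


class _NoRedirect(urllib.request.HTTPRedirectHandler):
    """Surface 3xx responses as HTTPError instead of following them to the archive."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None


def head_status(url: str, headers: Dict[str, str]) -> int:
    """Send a HEAD request and return the HTTP status code."""
    request = urllib.request.Request(url, headers=headers, method="HEAD")
    opener = urllib.request.build_opener(_NoRedirect)
    try:
        with opener.open(request, timeout=30) as response:
            return response.status
    except urllib.error.HTTPError as error:
        # 3xx (not followed), 4xx and 5xx responses all arrive here.
        return error.code


def check_release_exists_sketch(
    month: datetime,
    api_key: str,
    fetch_status: Callable[[str, Dict[str, str]], int] = head_status,
) -> bool:
    url = SNAPSHOT_URL.format(year=month.year, month=month.month)
    # Assumption: the key is passed as a bearer token in this header.
    headers = {"Crossref-Plus-API-Token": f"Bearer {api_key}"}
    # Assumption: a success or redirect status means the snapshot is published.
    return fetch_status(url, headers) < 400


# Offline usage with a stubbed status function standing in for the network call.
print(check_release_exists_sketch(datetime(2023, 1, 1), "my-key", fetch_status=lambda url, headers: 404))
# False
```

Connection failures (urllib.error.URLError) are deliberately left to propagate in this sketch; a production task would likely retry before giving up.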

academic_observatory_workflows.workflows.crossref_metadata_telescope.transform_file(input_file_path: str, output_file_path: str)

Transform a single Crossref Metadata JSON file. The JSON file is converted to a JSON Lines (.jsonl) file, and field names are transformed so that they are accepted by BigQuery.

Parameters:
  • input_file_path – the path of the file to transform.

  • output_file_path – where to save the transformed file.

Returns:

None.
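
The JSON to JSON Lines conversion described above can be sketched as follows. This is not the module's implementation: transform_file_sketch is a hypothetical name, the per-record transform is passed in as a callable (identity by default), and the assumption that records live under a top-level "items" key is not stated on this page.

```python
import json
from typing import Any, Callable


def transform_file_sketch(
    input_file_path: str,
    output_file_path: str,
    transform: Callable[[Any], Any] = lambda item: item,
) -> None:
    """Convert a JSON file to JSON Lines, applying `transform` to each record."""
    with open(input_file_path, mode="r", encoding="utf-8") as f_in:
        data = json.load(f_in)

    # Assumption: records sit in a top-level "items" list; a bare list is also accepted.
    items = data["items"] if isinstance(data, dict) else data

    with open(output_file_path, mode="w", encoding="utf-8") as f_out:
        for item in items:
            # JSON Lines stores exactly one JSON value per line.
            f_out.write(json.dumps(transform(item)) + "\n")
```

Writing one record per line matters because BigQuery load jobs expect newline-delimited JSON rather than a single JSON document.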

academic_observatory_workflows.workflows.crossref_metadata_telescope.transform_item(item)

Transform a single Crossref Metadata JSON value.

Parameters:

item – a JSON value.

Returns:

the transformed item.
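
The page does not list which transformations are applied. One that is likely involved, given that field names must be accepted by BigQuery, is renaming hyphenated keys such as container-title, since standard BigQuery column names do not allow hyphens. The sketch below shows only that renaming step, applied recursively; transform_item_sketch is a hypothetical name and the real function may do more.

```python
from typing import Any


def transform_item_sketch(item: Any) -> Any:
    """Recursively replace hyphens with underscores in dictionary keys."""
    if isinstance(item, dict):
        return {key.replace("-", "_"): transform_item_sketch(value) for key, value in item.items()}
    if isinstance(item, list):
        return [transform_item_sketch(value) for value in item]
    # Scalars (str, int, float, bool, None) are returned unchanged.
    return item


print(transform_item_sketch({"container-title": ["Nature"], "issued": {"date-parts": [[2020, 6]]}}))
# {'container_title': ['Nature'], 'issued': {'date_parts': [[2020, 6]]}}
```

Only keys are rewritten; string values that contain hyphens, such as DOIs or dates, pass through untouched.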