academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope

Module Contents

Classes

CrossrefMetadataRelease

Functions

create_dag(*, dag_id, cloud_workspace, ...)

Create the DAG for the Crossref Metadata telescope.

make_snapshot_url(snapshot_date) → str

get_api_key(crossref_metadata_conn_id)

Return the Crossref Metadata API token stored in the given Airflow connection.

check_release_exists(month, api_key) → bool

Check if a release exists.

transform_file(input_file_path, output_file_path)

Transform a single Crossref Metadata JSON file.

transform_item(item)

Transform a single Crossref Metadata JSON value.

Attributes

SNAPSHOT_URL

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.SNAPSHOT_URL = 'https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz'

class academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.CrossrefMetadataRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, batch_size: int)

Bases: observatory.platform.workflows.workflow.SnapshotRelease

static from_dict(dict_: dict)

to_dict() → dict

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'crossref_metadata', bq_table_name: str = 'crossref_metadata', api_dataset_id: str = 'crossref_metadata', schema_folder: str = project_path('crossref_metadata_telescope', 'schema'), dataset_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', table_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', crossref_metadata_conn_id: str = 'crossref_metadata', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, max_processes: int = os.cpu_count(), batch_size: int = 20, start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7), schedule: str = '0 0 7 * *', catchup: bool = True, queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) → airflow.DAG

Create the DAG for the Crossref Metadata telescope.

Parameters:
  • dag_id – the id of the DAG.

  • cloud_workspace – the cloud workspace settings.

  • bq_dataset_id – the BigQuery dataset id.

  • bq_table_name – the BigQuery table name.

  • api_dataset_id – the dataset id to use when storing releases.

  • schema_folder – the SQL schema path.

  • dataset_description – description for the BigQuery dataset.

  • table_description – description for the BigQuery table.

  • crossref_metadata_conn_id – the Crossref Metadata Airflow connection key.

  • observatory_api_conn_id – the Observatory API connection key.

  • max_processes – the number of processes used with ProcessPoolExecutor to transform files in parallel.

  • batch_size – the number of files to send to ProcessPoolExecutor at one time.

  • start_date – the start date of the DAG.

  • schedule – the schedule interval of the DAG.

  • catchup – whether to catch up on (backfill) missed DAG runs.

  • queue – what Airflow queue this job runs on.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.
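
The way max_processes and batch_size interact can be sketched as follows. This is an illustrative sketch rather than the telescope's own code: the helpers batched and transform_in_batches are hypothetical names, and it assumes each batch is submitted to the pool and fully awaited before the next batch is queued.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Callable, Iterator, List, Sequence


def batched(items: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start : start + batch_size])


def transform_in_batches(
    file_paths: Sequence[str],
    transform: Callable[[str], str],
    max_processes: int,
    batch_size: int,
) -> List[str]:
    """Run transform over file_paths one batch at a time (hypothetical helper)."""
    results: List[str] = []
    with ProcessPoolExecutor(max_workers=max_processes) as executor:
        for batch in batched(file_paths, batch_size):
            # Only batch_size tasks are queued at once, which bounds memory use.
            futures = [executor.submit(transform, path) for path in batch]
            # Wait for the whole batch before queueing more work.
            for future in as_completed(futures):
                results.append(future.result())
    return results
```

Because ProcessPoolExecutor pickles the callable, transform should be a module-level function, and on platforms that spawn worker processes the call to transform_in_batches belongs under an `if __name__ == "__main__":` guard.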

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.make_snapshot_url(snapshot_date: pendulum.DateTime) → str
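
No description is given for make_snapshot_url. A minimal sketch, assuming it fills the SNAPSHOT_URL template with the year and month of the snapshot date: the name sketch_snapshot_url is hypothetical, and the standard-library datetime stands in for pendulum.DateTime.

```python
from datetime import datetime

# Template copied from the SNAPSHOT_URL attribute documented on this page.
SNAPSHOT_URL = "https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz"


def sketch_snapshot_url(snapshot_date: datetime) -> str:
    """Hypothetical stand-in for make_snapshot_url (assumed behaviour)."""
    # {month:02d} zero-pads the month, so January becomes "01".
    return SNAPSHOT_URL.format(year=snapshot_date.year, month=snapshot_date.month)


url = sketch_snapshot_url(datetime(2023, 1, 7))
print(url)  # https://api.crossref.org/snapshots/monthly/2023/01/all.json.tar.gz
```
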
academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.get_api_key(crossref_metadata_conn_id: str)

Return the Crossref Metadata API token stored in the given Airflow connection.

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.check_release_exists(month: pendulum.DateTime, api_key: str) → bool

Check if a release exists.

Parameters:
  • month – the month of the release given as a datetime.

  • api_key – the Crossref Metadata API key.

Returns:

True if the release exists, otherwise False.
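
One way such a check could work is sketched below. It rests on two assumptions that this page does not confirm: that the check is an HTTP HEAD request against the snapshot URL, and that the key is sent as a bearer token in a Crossref-Plus-API-Token header. The names build_head_request and release_exists_sketch are hypothetical, and the standard-library datetime stands in for pendulum.DateTime.

```python
import urllib.error
import urllib.request
from datetime import datetime

# Template copied from the SNAPSHOT_URL attribute documented on this page.
SNAPSHOT_URL = "https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz"


def build_head_request(month: datetime, api_key: str) -> urllib.request.Request:
    """Build a HEAD request for the snapshot of the given month."""
    url = SNAPSHOT_URL.format(year=month.year, month=month.month)
    # Assumption: the API key is passed as a bearer token in this header.
    headers = {"Crossref-Plus-API-Token": f"Bearer {api_key}"}
    return urllib.request.Request(url, method="HEAD", headers=headers)


def release_exists_sketch(month: datetime, api_key: str, timeout: float = 30.0) -> bool:
    """Return True if the HEAD request succeeds with status 200 (assumed behaviour)."""
    request = build_head_request(month, api_key)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status == 200
    except urllib.error.HTTPError:
        # 4xx and 5xx responses are treated as "release not available".
        return False
```

A HEAD request avoids downloading the archive just to learn whether it exists. Network failures other than HTTP error responses are left to propagate so the caller can retry.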

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.transform_file(input_file_path: str, output_file_path: str)

Transform a single Crossref Metadata JSON file. The JSON file is converted to a JSON Lines (jsonl) file, and field names are transformed so that BigQuery accepts them.

Parameters:
  • input_file_path – the path of the file to transform.

  • output_file_path – where to save the transformed file.

Returns:

None.
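
The JSON to JSON Lines conversion described above can be sketched as follows. This is a hedged illustration, not the module's implementation: sketch_transform_file and its transform parameter are hypothetical, the input is assumed to hold its records in a list under a top-level "items" key, and the DOIs in the demo are made-up sample values.

```python
import json
import os
import tempfile
from typing import Any, Callable


def sketch_transform_file(
    input_file_path: str,
    output_file_path: str,
    transform: Callable[[Any], Any] = lambda item: item,
) -> None:
    """Hypothetical stand-in for transform_file."""
    with open(input_file_path, mode="r", encoding="utf-8") as in_file:
        data = json.load(in_file)

    # Assumption: records live in a list under a top-level "items" key.
    with open(output_file_path, mode="w", encoding="utf-8") as out_file:
        for item in data["items"]:
            # JSON Lines: one JSON document per line.
            out_file.write(json.dumps(transform(item)) + "\n")


# Demo with a throwaway input file.
with tempfile.TemporaryDirectory() as tmp_dir:
    src = os.path.join(tmp_dir, "0.json")
    dst = os.path.join(tmp_dir, "0.jsonl")
    with open(src, mode="w", encoding="utf-8") as f:
        json.dump({"items": [{"DOI": "10.1000/a"}, {"DOI": "10.1000/b"}]}, f)
    sketch_transform_file(src, dst)
    with open(dst, mode="r", encoding="utf-8") as f:
        lines = f.read().splitlines()

print(lines)
```

Newline-delimited JSON is one of the formats BigQuery load jobs accept, which is why each record is written on its own line.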

academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.transform_item(item)

Transform a single Crossref Metadata JSON value.

Parameters:

item – a JSON value.

Returns:

the transformed item.
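
A recursive transform of this kind might look like the sketch below. It assumes the field-name change is a replacement of hyphens with underscores, which is an assumption suggested by BigQuery's column naming rules and not confirmed by this page. The name sketch_transform_item is hypothetical, and the real function may apply further conversions.

```python
from typing import Any


def sketch_transform_item(item: Any) -> Any:
    """Hypothetical stand-in for transform_item (assumed behaviour)."""
    if isinstance(item, dict):
        # Rename keys, then recurse into the values.
        return {key.replace("-", "_"): sketch_transform_item(value) for key, value in item.items()}
    if isinstance(item, list):
        # Lists keep their order; each element is transformed in turn.
        return [sketch_transform_item(value) for value in item]
    # Scalars (str, int, float, bool, None) are returned unchanged.
    return item


record = {"container-title": ["Example"], "issued": {"date-parts": [[2020, 6]]}}
cleaned = sketch_transform_item(record)
print(cleaned)  # {'container_title': ['Example'], 'issued': {'date_parts': [[2020, 6]]}}
```

Handling dicts, lists, and scalars separately lets the same function accept any JSON value, which matches the parameter description above.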