academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope
Module Contents
Classes
Functions
|
The Crossref Metadata telescope |
|
|
|
Return API token |
|
Check if a release exists. |
|
Transform a single Crossref Metadata json file. |
|
Transform a single Crossref Metadata JSON value. |
Attributes
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.SNAPSHOT_URL = 'https://api.crossref.org/snapshots/monthly/{year}/{month:02d}/all.json.tar.gz'[source]
- class academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.CrossrefMetadataRelease(*, dag_id: str, run_id: str, snapshot_date: pendulum.DateTime, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, batch_size: int)[source]
Bases:
observatory.platform.workflows.workflow.SnapshotRelease
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, bq_dataset_id: str = 'crossref_metadata', bq_table_name: str = 'crossref_metadata', api_dataset_id: str = 'crossref_metadata', schema_folder: str = project_path('crossref_metadata_telescope', 'schema'), dataset_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', table_description: str = 'The Crossref Metadata Plus dataset: https://www.crossref.org/services/metadata-retrieval/metadata-plus/', crossref_metadata_conn_id: str = 'crossref_metadata', observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, max_processes: int = os.cpu_count(), batch_size: int = 20, start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7), schedule: str = '0 0 7 * *', catchup: bool = True, queue: str = 'remote_queue', max_active_runs: int = 1, retries: int = 3) airflow.DAG [source]
The Crossref Metadata telescope
- Parameters:
dag_id – the id of the DAG.
cloud_workspace – the cloud workspace settings.
bq_dataset_id – the BigQuery dataset id.
bq_table_name – the BigQuery table name.
api_dataset_id – the Dataset ID to use when storing releases.
schema_folder – the SQL schema path.
dataset_description – description for the BigQuery dataset.
table_description – description for the BigQuery table.
crossref_metadata_conn_id – the Crossref Metadata Airflow connection key.
observatory_api_conn_id – the Observatory API connection key.
max_processes – the number of processes used with ProcessPoolExecutor to transform files in parallel.
batch_size – the number of files to send to ProcessPoolExecutor at one time.
start_date – the start date of the DAG.
schedule – the schedule interval of the DAG.
catchup – whether to catchup the DAG or not.
queue – what Airflow queue this job runs on.
max_active_runs – the maximum number of DAG runs that can be run at once.
retries – the number of times to retry a task.
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.make_snapshot_url(snapshot_date: pendulum.DateTime) str [source]
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.get_api_key(crossref_metadata_conn_id: str)[source]
Return API token
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.check_release_exists(month: pendulum.DateTime, api_key: str) bool [source]
Check if a release exists.
- Parameters:
month – the month of the release given as a datetime.
api_key – the Crossref Metadata API key.
- Returns:
if release exists or not.
- academic_observatory_workflows.crossref_metadata_telescope.crossref_metadata_telescope.transform_file(input_file_path: str, output_file_path: str)[source]
Transform a single Crossref Metadata json file. The json file is converted to a jsonl file and field names are transformed so they are accepted by BigQuery.
- Parameters:
input_file_path – the path of the file to transform.
output_file_path – where to save the transformed file.
- Returns:
None.