academic_observatory_workflows.pubmed_telescope.tasks

Attributes

bad_list_fields

Classes

PMID

Object to identify a particular PMID record by the PMID value and Version.

PubmedUpdatefile

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

PubMedCustomEncoder

Custom encoder for json dump that writes a dictionary field as a string of text for a select number of keys in the Pubmed data.

Functions

fetch_release(→ str)

Get a list of all files to process for this release.

short_circuit(→ bool)

Don't skip this DAG run if it is the first run of the year, or if there are updatefiles to process.

branch_baseline_or_updatefiles(→ None)

baseline_download(→ None)

Download files from PubMed's FTP server for this release.

updatefiles_download(→ None)

Download the updatefiles from PubMed's FTP server for this release.

baseline_upload_downloaded(→ None)

Upload downloaded baseline files to GCS.

updatefiles_upload_downloaded(→ None)

Upload the downloaded updatefiles to GCS.

baseline_transform(→ None)

Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import.

updatefiles_transform(→ List[Dict])

Transform the *.xml.gz files downloaded from PubMed's FTP server into usable json-like files for BigQuery import.

baseline_upload_transformed(→ None)

Upload transformed baseline files to GCS.

updatefiles_merge_upserts_deletes(→ None)

Merge the upserts and deletes for this release period.

updatefiles_upload_merged_upsert_records(→ None)

Upload the merged upsert records to GCS.

baseline_bq_load(→ None)

Ingest the baseline table from GCS to BQ using a file pattern.

updatefiles_bq_load_upsert_table(→ None)

Ingest the upsert records from GCS to BQ using a glob pattern.

updatefiles_bq_upsert_records(→ None)

Upsert records into the main table.

updatefiles_upload_merged_delete_records(→ None)

Upload the merged delete records to GCS.

updatefiles_bq_load_delete_table(→ None)

Ingest delete records from GCS to BQ.

updatefiles_bq_delete_records(→ None)

Remove records from the main table that are specified in the delete table.

add_dataset_releases(→ None)

Adds release information to the API.

cleanup_workflow(→ None)

Clean up files from this workflow run.

login_to_ftp(→ ftplib.FTP)

create_snapshot(→ None)

Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.

download_datafiles(→ bool)

Download a list of Pubmed datafiles from their FTP server.

save_pubmed_jsonl(output_path, data)

Save a Pubmed jsonl to file using the custom encoder.

parse_articles(→ Union[List, List[Dict]])

parse_deletes(→ Union[List, List[Dict]])

parse_pubmed_element(elem[, validate, ignore_errors])

init_logging()

transform_pubmed(→ Union[bool, str, PubmedUpdatefile])

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes.

save_pubmed_merged_upserts(→ str)

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

merge_upserts_and_deletes(→ Tuple[Dict[PMID, str], ...)

Merge the Pubmed upserts and deletes and return them keyed by PMID value-Version pairs so that they can be easily pulled from file.

add_attributes(obj)

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

change_pubmed_list_structure(→ Union[dict, list, str, ...)

Recursively travel down the Pubmed data tree to move the specified fields up one level to make it easier to query in Bigquery.

bq_update_table_expiration(→ None)

Update a Bigquery table expiration date.

Module Contents

class academic_observatory_workflows.pubmed_telescope.tasks.PMID

Object to identify a particular PMID record by the PMID value and Version.

Parameters:
  • value – The PMID value.

  • Version – The Version of the PMID record.

value: int
Version: int
__eq__(other)
__hash__()
to_dict() → Dict
from_dict() → PMID
to_str() → str
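Because PMID defines __eq__ and __hash__, instances can be used as dictionary keys and set members, which is how merge_upserts_and_deletes returns them. A minimal sketch of an equivalent key type, assuming a frozen dataclass is acceptable in place of the hand-written methods; PMIDSketch is a hypothetical name, and the "value.Version" format used by to_str here is an assumption, not taken from the source.

```python
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass(frozen=True)
class PMIDSketch:
    """Hypothetical stand-in for PMID: identifies a record by value and Version."""

    value: int
    Version: int

    # frozen=True (with the default eq=True) makes the dataclass generate __eq__
    # and __hash__ from (value, Version), so equal keys collide in dicts and sets.

    def to_dict(self) -> Dict[str, int]:
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict[str, int]) -> "PMIDSketch":
        return PMIDSketch(value=int(d["value"]), Version=int(d["Version"]))

    def to_str(self) -> str:
        # Assumed string format, for illustration only.
        return f"{self.value}.{self.Version}"
```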
class academic_observatory_workflows.pubmed_telescope.tasks.PubmedUpdatefile

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

Parameters:
  • name – Filename of the updatefile.

  • upserts – List of PMID objects that are to be upserted.

  • deletes – List of PMID objects that are to be deleted.

name: str
upserts: List[PMID]
deletes: List[PMID]
__eq__(other)
to_dict() → Dict
from_dict() → PubmedUpdatefile
academic_observatory_workflows.pubmed_telescope.tasks.fetch_release(dag_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, run_id: str, dag_run: airflow.models.DagRun, data_interval_end: pendulum.DateTime, bq_dataset_id: str, api_bq_dataset_id: str, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int) → str

Get a list of all files to process for this release.

Determine whether the workflow needs to redownload the baseline files because of a new yearly release.

Parameters:
  • dag_id – The ID of the dag

  • cloud_workspace – The cloud workspace object

  • run_id – The run ID of this dagrun

  • dag_run – The DagRun object

  • data_interval_end – The end of the data interval for this run

  • bq_dataset_id – The BigQuery dataset ID

  • api_bq_dataset_id – The BigQuery dataset ID of the API

  • ftp_server_url – The host of the PubMed data

  • ftp_port – The port to access the FTP server

  • reset_ftp_counter – After this many files, reset the FTP connection

academic_observatory_workflows.pubmed_telescope.tasks.short_circuit(release: dict) → bool

Don’t skip this DAG run if:

  • a) we are in the first run of the year, or

  • b) we are not in the first run of the year and there are updatefiles.

Returns:

True if this DAG run should continue, or False if it should be skipped.
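Assuming short_circuit is wired up as an Airflow short-circuit task (where a falsy return value skips the downstream tasks), the rule above reduces to a single boolean expression. A minimal sketch; the release keys is_first_run_of_year and updatefile_list are hypothetical, since the structure of the release dict is not documented here.

```python
from typing import Any, Dict


def should_run(release: Dict[str, Any]) -> bool:
    """Return True to continue the DAG run, False to skip downstream tasks.

    The keys "is_first_run_of_year" and "updatefile_list" are hypothetical.
    """
    first_run_of_year = bool(release.get("is_first_run_of_year", False))
    has_updatefiles = len(release.get("updatefile_list", [])) > 0
    # a) first run of the year -> always run
    # b) otherwise -> run only if there are updatefiles to apply
    return first_run_of_year or has_updatefiles
```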

academic_observatory_workflows.pubmed_telescope.tasks.branch_baseline_or_updatefiles(release: dict) → None
academic_observatory_workflows.pubmed_telescope.tasks.baseline_download(release: dict, ftp_server_url: str, ftp_port: str, reset_ftp_counter: int, max_download_attempt: int) → None

Download files from PubMed’s FTP server for this release. This cannot be done in parallel because their FTP server cannot handle many requests at once.

Parameters:
  • ftp_server_url – The host of the PubMed data

  • ftp_port – The port to access the FTP server

  • reset_ftp_counter – After this many files, reset the FTP connection

  • max_download_attempt – Fail after this many unsuccessful download attempts

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_download(release: dict, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) → None

Download the updatefiles from PubMed’s FTP server for this release. This cannot be done in parallel due to limitations of their FTP server.

Parameters:
  • ftp_server_url – The host of the PubMed data

  • ftp_port – The port to access the FTP server

  • reset_ftp_counter – After this many files, reset the FTP connection

  • max_download_attempt – Fail after this many unsuccessful download attempts

academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_downloaded(release: dict) → None

Upload downloaded baseline files to GCS.

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_downloaded(release: dict) → None

Upload the downloaded updatefiles to GCS.

academic_observatory_workflows.pubmed_telescope.tasks.baseline_transform(release: dict, max_processes: int | None = None) → None

Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import.

Parameters:

max_processes – The maximum number of processes to use for multiprocessing

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_transform(release: dict, max_processes: int | None = None) → List[Dict]

Transform the *.xml.gz files downloaded from PubMed’s FTP server into usable json-like files for BigQuery import. This runs in parallel and pulls the PubmedArticle records from the downloaded XML files.

Parameters:

max_processes – The maximum number of processes to use for parallel processing

academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_transformed(release: dict) → None

Upload transformed baseline files to GCS.

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_merge_upserts_deletes(release: dict, updatefiles: List[Dict], max_processes: int | None = None) → None

Merge the upserts and deletes for this release period.

Parameters:

max_processes – The maximum number of processes to use for parallel processing

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_upsert_records(release: dict) → None

Upload the merged upsert records to GCS.

academic_observatory_workflows.pubmed_telescope.tasks.baseline_bq_load(release: dict, bq_dataset_description: str, main_table_name: str, baseline_table_description: str) → None

Ingest the baseline table from GCS to BQ using a file pattern.

Parameters:
  • bq_dataset_description – The description to give the bigquery dataset

  • main_table_name – The name of the table

  • baseline_table_description – The description to give the table

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_upsert_table(release: dict, upsert_table_name: str, upsert_table_description: str) → None

Ingest the upsert records from GCS to BQ using a glob pattern.

Parameters:
  • upsert_table_name – The name of the upsert table to upload

  • upsert_table_description – The description to give the upsert table

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_upsert_records(release: dict, main_table_name: str, upsert_table_name: str) → None

Upsert records into the main table. Records have to match on both the PMID value and the Version number, as the main table could contain multiple versions of the same PMID.

Parameters:
  • main_table_name – The name of the main table to upsert to

  • upsert_table_name – The name of the table containing the records to upsert
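The reason for the composite match can be shown in memory: keying on the PMID value alone would overwrite other versions of the same PMID. A minimal sketch of upsert semantics, assuming hypothetical pmid_value and pmid_version fields on each record; the real task performs the upsert inside BigQuery, and that query is not reproduced here.

```python
from typing import Dict, List, Tuple

Key = Tuple[int, int]  # (PMID value, Version)


def upsert_records(main: List[dict], upserts: List[dict]) -> List[dict]:
    """Replace rows that match on (value, Version) and append the rest.

    The field names pmid_value and pmid_version are hypothetical.
    """

    def key(rec: dict) -> Key:
        return (rec["pmid_value"], rec["pmid_version"])

    # Index the main table by the composite key; dicts preserve insertion order.
    index: Dict[Key, dict] = {key(r): r for r in main}
    for rec in upserts:
        # Existing key: replaced in place. New key: appended at the end.
        index[key(rec)] = rec
    return list(index.values())
```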

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_delete_records(release: dict) → None

Upload the merged delete records to GCS.

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_delete_table(release: dict, delete_table_name: str, delete_table_description: str) → None

Ingest delete records from GCS to BQ.

Parameters:
  • delete_table_name – The name of the delete table

  • delete_table_description – The description to give the delete table

academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_delete_records(release: dict, main_table_name: str, delete_table_name: str) → None

Remove records from the main table that are specified in the delete table. Records have to match on both the PMID value and the Version number, as the main table could contain multiple versions of the same PMID.

Parameters:
  • main_table_name – The name of the table to delete records from

  • delete_table_name – The name of the table containing the records to delete
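In BigQuery, the same two-column match can be expressed as a DELETE with a correlated EXISTS subquery. A minimal sketch that only builds the SQL string; the column names pmid_value and pmid_version and the table IDs are hypothetical placeholders, and the query actually issued by this task is not shown in the source.

```python
def build_delete_sql(
    main_table: str,
    delete_table: str,
    value_col: str = "pmid_value",  # hypothetical column name
    version_col: str = "pmid_version",  # hypothetical column name
) -> str:
    """Build a BigQuery DELETE that removes rows matching on both value and Version."""
    return (
        f"DELETE FROM `{main_table}` m\n"
        f"WHERE EXISTS (\n"
        f"  SELECT 1 FROM `{delete_table}` AS d\n"
        f"  WHERE d.{value_col} = m.{value_col}\n"
        f"    AND d.{version_col} = m.{version_col}\n"
        f")"
    )
```

Matching on value alone in the WHERE clause would also delete every other version of the same PMID.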

academic_observatory_workflows.pubmed_telescope.tasks.add_dataset_releases(release: dict, api_bq_dataset_id: str) → None

Adds release information to the API.

Parameters:

api_bq_dataset_id – The BigQuery dataset ID of the API

academic_observatory_workflows.pubmed_telescope.tasks.cleanup_workflow(release: dict) → None

Clean up files from this workflow run.

Delete local download files, transform files and current task instance.

academic_observatory_workflows.pubmed_telescope.tasks.login_to_ftp(host: str, port: int) → ftplib.FTP
academic_observatory_workflows.pubmed_telescope.tasks.create_snapshot(release: dict, bq_dataset_id: str, bq_main_table_name: str, snapshot_expiry_days: int) → None

Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.
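BigQuery supports table snapshots with an expiry through the CREATE SNAPSHOT TABLE ... CLONE DDL statement. A minimal sketch that builds such a statement; the snapshot naming scheme is a hypothetical choice, and the source does not say whether this task uses DDL or a copy job.

```python
import datetime as dt


def build_snapshot_sql(bq_dataset: str, main_table: str, snapshot_date: dt.date, expiry_days: int) -> str:
    """Build DDL for a snapshot of main_table that expires after expiry_days days.

    bq_dataset is "project.dataset"; the "<table>_snapshot<YYYYMMDD>" name is hypothetical.
    """
    if expiry_days <= 0:
        raise ValueError("expiry_days must be positive")
    snapshot = f"{bq_dataset}.{main_table}_snapshot{snapshot_date:%Y%m%d}"
    return (
        f"CREATE SNAPSHOT TABLE `{snapshot}`\n"
        f"CLONE `{bq_dataset}.{main_table}`\n"
        f"OPTIONS (expiration_timestamp = "
        f"TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {int(expiry_days)} DAY))"
    )
```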

academic_observatory_workflows.pubmed_telescope.tasks.download_datafiles(datafile_list: List[academic_observatory_workflows.pubmed_telescope.datafile.Datafile], ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) → bool

Download a list of Pubmed datafiles from their FTP server.

Parameters:
  • datafile_list – List of datafiles to download from their FTP server.

  • ftp_server_url – FTP server URL.

  • ftp_port – Port for the FTP connection.

  • reset_ftp_counter – After this number of files, reset the FTP connection to make sure that the connection is not reset by the host.

  • max_download_attempt – Maximum number of retries for downloading one datafile before throwing an error.

Return download_success:

True if all of the datafiles were downloaded successfully.
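The connection reset counter and the retry limit can be combined in one sequential loop. A minimal sketch, assuming each datafile is a hypothetical (remote_path, local_path) pair rather than a Datafile object, and that a caller-supplied connect function opens a logged-in session (login_sketch shows one way to do that with ftplib; it is not the module's login_to_ftp).

```python
import ftplib
from typing import Callable, List, Tuple


def login_sketch(host: str, port: int) -> ftplib.FTP:
    """Open an anonymous FTP session (hypothetical helper)."""
    ftp = ftplib.FTP()
    ftp.connect(host, port)
    ftp.login()  # no arguments: anonymous login
    return ftp


def download_all(
    files: List[Tuple[str, str]],
    connect: Callable[[], ftplib.FTP],
    reset_ftp_counter: int,
    max_download_attempt: int,
) -> bool:
    """Download files one at a time, reconnecting every reset_ftp_counter files."""
    ftp = connect()
    try:
        for i, (remote_path, local_path) in enumerate(files):
            # Periodically reconnect so the host does not drop a long-lived session.
            if i > 0 and i % reset_ftp_counter == 0:
                ftp.quit()
                ftp = connect()
            for attempt in range(1, max_download_attempt + 1):
                try:
                    with open(local_path, "wb") as f:
                        ftp.retrbinary(f"RETR {remote_path}", f.write)
                    break
                except ftplib.all_errors:
                    if attempt == max_download_attempt:
                        raise  # give up on this file after the last attempt
                    ftp.close()  # discard the broken session and start a fresh one
                    ftp = connect()
    finally:
        ftp.close()
    return True
```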

academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_jsonl(output_path: str, data: List[Dict])

Save a Pubmed jsonl to file using the custom encoder.

Parameters:
  • output_path – Path of the output file.

  • data – The data to write to file.

Returns:

None.
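JSON Lines stores one JSON object per line, the newline-delimited format that BigQuery can load directly. A minimal sketch, assuming the custom encoder is applied by passing it as cls to json.dumps; save_jsonl and its encoder parameter are hypothetical names.

```python
import json
from typing import Dict, List, Optional, Type


def save_jsonl(output_path: str, data: List[Dict], encoder: Optional[Type[json.JSONEncoder]] = None) -> None:
    """Write one JSON object per line, optionally through a custom encoder class."""
    with open(output_path, "w", encoding="utf-8") as f:
        for row in data:
            # cls=None falls back to the standard json.JSONEncoder.
            f.write(json.dumps(row, cls=encoder) + "\n")
```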

academic_observatory_workflows.pubmed_telescope.tasks.parse_articles(data: Dict) → List | List[Dict]
academic_observatory_workflows.pubmed_telescope.tasks.parse_deletes(data: Dict) → List | List[Dict]
academic_observatory_workflows.pubmed_telescope.tasks.parse_pubmed_element(elem, validate=False, ignore_errors=True)
academic_observatory_workflows.pubmed_telescope.tasks.init_logging()
academic_observatory_workflows.pubmed_telescope.tasks.transform_pubmed(input_path: str, upsert_path: str) → bool | str | PubmedUpdatefile

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes. Used in the parallelised transform tasks.

Parameters:
  • input_path – Path to the downloaded xml.gz file.

  • upsert_path – Output file path for the upserts.

Returns:

The filename of the baseline file, or a PubmedUpdatefile object if the input is an updatefile.
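This module parses the XML with Biopython, but the split into upserts and deletes can be illustrated with the standard library. A minimal sketch that swaps in xml.etree.ElementTree.iterparse, assuming the usual PubMed layout where upserts are PubmedArticle/MedlineCitation/PMID elements and deletes are PMID elements inside DeleteCitation, each PMID carrying a Version attribute (defaulted to 1 here when absent, which is an assumption).

```python
import gzip
import xml.etree.ElementTree as ET
from typing import IO, List, Tuple, Union

Key = Tuple[int, int]  # (PMID value, Version)


def extract_keys(source: Union[str, IO[bytes]]) -> Tuple[List[Key], List[Key]]:
    """Return (upsert keys, delete keys) from a gzipped PubMed XML file."""
    upserts: List[Key] = []
    deletes: List[Key] = []
    with gzip.open(source, "rb") as f:
        # "end" events fire once an element and all of its children are parsed.
        for _, elem in ET.iterparse(f, events=("end",)):
            if elem.tag == "PubmedArticle":
                pmid = elem.find("MedlineCitation/PMID")
                if pmid is not None:
                    upserts.append((int(pmid.text), int(pmid.get("Version", "1"))))
                elem.clear()  # release memory held by the finished article
            elif elem.tag == "DeleteCitation":
                for pmid in elem.findall("PMID"):
                    deletes.append((int(pmid.text), int(pmid.get("Version", "1"))))
                elem.clear()
    return upserts, deletes
```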

academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_merged_upserts(filename: str, upsert_index: Dict[PMID, str], input_path: str, output_path: str) → str

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

Parameters:
  • filename – Name of the original updatefile used by the upsert_index.

  • upsert_index – Index of the upsert keys to pull out and write to file, mapping each PMID to the name of an updatefile.

  • input_path – Where the original upsert records are stored.

  • output_path – Destination of where to write the merged upsert records.

Return output_path:

For logging purposes and to confirm that the parallel task completed.
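A minimal sketch of the per-file filter, assuming upsert_index maps each (value, Version) key to the name of the updatefile holding the record to keep, consistent with the Dict[PMID, str] returned by merge_upserts_and_deletes, and assuming hypothetical pmid_value and pmid_version fields in each JSONL record. Each worker copies only the records its own file is responsible for, so a record superseded by a later updatefile is not written.

```python
import json
from typing import Dict, Tuple

Key = Tuple[int, int]  # (PMID value, Version)


def write_merged_upserts(filename: str, upsert_index: Dict[Key, str], input_path: str, output_path: str) -> str:
    """Copy to output_path only the records that upsert_index assigns to filename."""
    with open(input_path, encoding="utf-8") as src, open(output_path, "w", encoding="utf-8") as dst:
        for line in src:
            record = json.loads(line)
            key = (record["pmid_value"], record["pmid_version"])  # hypothetical field names
            if upsert_index.get(key) == filename:
                dst.write(line if line.endswith("\n") else line + "\n")
    return output_path
```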

academic_observatory_workflows.pubmed_telescope.tasks.merge_upserts_and_deletes(updatefiles: List[PubmedUpdatefile]) → Tuple[Dict[PMID, str], Set[PMID]]

Merge the Pubmed upserts and deletes and return them keyed by PMID value-Version pairs so that they can be easily pulled from file.

Parameters:

updatefiles – List of PubmedUpdatefile objects to merge together.

Returns:

A tuple of the merged upserts, as a dictionary keyed by PMID, and the merged deletes, as a set of PMIDs.
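A minimal sketch of the merge, assuming the updatefiles are supplied in publication order, that a later upsert or delete for the same key supersedes an earlier one, and that within a file upserts are applied before deletes; these ordering rules are assumptions rather than documented behaviour. Plain (value, Version) tuples stand in for PMID, and Updatefile is a hypothetical stand-in for PubmedUpdatefile.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

Key = Tuple[int, int]  # (PMID value, Version)


@dataclass
class Updatefile:
    """Hypothetical stand-in for PubmedUpdatefile."""

    name: str
    upserts: List[Key] = field(default_factory=list)
    deletes: List[Key] = field(default_factory=list)


def merge(updatefiles: List[Updatefile]) -> Tuple[Dict[Key, str], Set[Key]]:
    upsert_index: Dict[Key, str] = {}
    deletes: Set[Key] = set()
    for uf in updatefiles:  # assumed to be in publication order
        for key in uf.upserts:
            upsert_index[key] = uf.name  # the newest file holding the key wins
            deletes.discard(key)  # re-added after an earlier delete
        for key in uf.deletes:
            upsert_index.pop(key, None)  # a delete cancels any pending upsert
            deletes.add(key)
    return upsert_index, deletes
```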

academic_observatory_workflows.pubmed_telescope.tasks.add_attributes(obj: Bio.Entrez.Parser.StringElement | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.OrderedListElement | list | str)

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

Only data from StringElements, DictionaryElements, ListElements and OrderedListElements is pulled.

Parameters:

obj – Input object, being one of the Biopython data classes.

Return new:

Object with attributes added as keys to the dictionary.
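Biopython's Entrez parser returns str, dict and list subclasses that carry the XML attributes of each element in an attributes mapping. A minimal sketch of the flattening, assuming a string with attributes becomes {"value": ..., **attributes} (the shape suggested by the PMID value and Version fields) and that attributes on lists are dropped for brevity; AttrStr and AttrDict are hypothetical stand-ins so the example runs without Biopython.

```python
from typing import Any, Optional


class AttrStr(str):
    """Hypothetical stand-in for Bio.Entrez.Parser.StringElement."""

    def __new__(cls, value: str, attributes: dict):
        obj = super().__new__(cls, value)
        obj.attributes = attributes
        return obj


class AttrDict(dict):
    """Hypothetical stand-in for Bio.Entrez.Parser.DictionaryElement."""

    def __init__(self, *args, attributes: Optional[dict] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.attributes = attributes or {}


def add_attributes_sketch(obj: Any) -> Any:
    attrs = getattr(obj, "attributes", None) or {}
    if isinstance(obj, str):
        # Plain strings stay strings; strings with attributes become a small dict.
        return {"value": str(obj), **attrs} if attrs else str(obj)
    if isinstance(obj, dict):
        new = {key: add_attributes_sketch(val) for key, val in obj.items()}
        new.update(attrs)  # promote element attributes to sibling keys
        return new
    if isinstance(obj, list):
        return [add_attributes_sketch(item) for item in obj]
    return obj
```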

academic_observatory_workflows.pubmed_telescope.tasks.bad_list_fields
academic_observatory_workflows.pubmed_telescope.tasks.change_pubmed_list_structure(obj: dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement) → dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement

Recursively travel down the Pubmed data tree to move the specified fields up one level to make it easier to query in Bigquery.

For example, the original data can look something like:

    {
        "AuthorList": {
            "CompleteYN": "Y",
            "Author": [
                {"First": "Foo", "Last": "Bar"},
                {"First": "James", "Last": "Bond"}
            ]
        }
    }

The "Author" field is removed and its data is moved up to the "AuthorList" level, and any fields specified in the "bad_list_fields" dictionary are promoted to keys prefixed with the list name:

    {
        "AuthorListCompleteYN": "Y",
        "AuthorListType": None,
        "AuthorList": [
            {"First": "Foo", "Last": "Bar"},
            {"First": "James", "Last": "Bond"}
        ]
    }

Parameters:

obj – Incoming data object.

Returns:

Any type as it could be a Pubmed dataclass.
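A minimal sketch that reproduces the example above, assuming bad_list_fields maps each *List field name to the attribute names to promote (the hypothetical {"AuthorList": ["CompleteYN", "Type"]} below; the real contents are not shown) and that the child key is the field name without its List suffix. Missing attributes are filled with None, which accounts for AuthorListType in the output.

```python
from typing import Any, Dict, List

# Hypothetical contents: which "*List" fields to flatten and which attributes to promote.
BAD_LIST_FIELDS: Dict[str, List[str]] = {"AuthorList": ["CompleteYN", "Type"]}


def flatten_list_fields(obj: Any, bad_list_fields: Dict[str, List[str]] = BAD_LIST_FIELDS) -> Any:
    """Move the child list of each bad list field up one level and promote its attributes."""
    if isinstance(obj, list):
        return [flatten_list_fields(item, bad_list_fields) for item in obj]
    if not isinstance(obj, dict):
        return obj
    out: Dict[str, Any] = {}
    for key, value in obj.items():
        if key in bad_list_fields and key.endswith("List") and isinstance(value, dict):
            child_key = key[: -len("List")]  # e.g. "AuthorList" -> "Author"
            # Promote attributes as prefixed keys; missing ones become None.
            for attr in bad_list_fields[key]:
                out[key + attr] = value.get(attr)
            out[key] = flatten_list_fields(value.get(child_key, []), bad_list_fields)
        else:
            out[key] = flatten_list_fields(value, bad_list_fields)
    return out
```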

class academic_observatory_workflows.pubmed_telescope.tasks.PubMedCustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: json.JSONEncoder

Custom encoder for json dump that writes a dictionary field as a string of text for a select number of keys in the Pubmed data.

For example, the AbstractText field can be either a string or an array containing sections such as background and methods, but BigQuery requires each field to have a single, well-defined type in the schema: it cannot be both an array and a string. This encoder forces the fields below to be written as strings in the JSON file.

write_as_single_field = ['AbstractText', 'Affiliation', 'ArticleTitle', 'b', 'BookTitle', 'Citation', 'CoiStatement',...
_transform_obj_data(obj)
encode(obj)

Return a JSON string representation of a Python data structure.

>>> from json.encoder import JSONEncoder
>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'
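The encoder idea can be sketched by overriding encode so that the selected keys are rewritten before delegating to json.JSONEncoder. A minimal sketch, assuming non-string values of those keys are serialised with json.dumps (the real conversion is not documented here) and using a shortened, hypothetical field list.

```python
import json
from typing import Any


class SingleFieldEncoder(json.JSONEncoder):
    # Hypothetical subset of write_as_single_field.
    write_as_single_field = {"AbstractText", "ArticleTitle"}

    def _transform(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            return {
                # Selected keys with non-string values are flattened to a JSON string.
                k: (json.dumps(v) if k in self.write_as_single_field and not isinstance(v, str)
                    else self._transform(v))
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [self._transform(v) for v in obj]
        return obj

    def encode(self, obj: Any) -> str:
        return super().encode(self._transform(obj))
```

Overriding encode only affects json.dumps; json.dump streams through iterencode instead, so a writer built on this pattern should call json.dumps.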
academic_observatory_workflows.pubmed_telescope.tasks.bq_update_table_expiration(full_table_id: str, expiration_date: pendulum.datetime) → None

Update a Bigquery table expiration date.

Parameters:
  • full_table_id – Fully qualified table ID.

  • expiration_date – Expiration date of the table.
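A minimal sketch using the google-cloud-bigquery client (Client.get_table, the Table.expires property and Client.update_table with a field list). Whether this module calls the client directly is not stated, and the optional client parameter is a hypothetical addition so the function can be exercised with a fake. A pendulum.DateTime can be passed because it subclasses datetime.datetime.

```python
import datetime as dt
from typing import Any, Optional


def update_table_expiration(full_table_id: str, expiration_date: dt.datetime, client: Optional[Any] = None) -> None:
    """Set a BigQuery table's expiration time."""
    if client is None:
        # Imported lazily so that a fake client can be injected without the library installed.
        from google.cloud import bigquery

        client = bigquery.Client()
    table = client.get_table(full_table_id)
    table.expires = expiration_date
    # Only the listed property is sent in the update request.
    client.update_table(table, ["expires"])
```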