academic_observatory_workflows.workflows.pubmed_telescope

Module Contents

Classes

Datafile

PMID

Object to identify a particular PMID record by the PMID value and Version.

PubmedUpdatefile

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

PubMedRelease

PubMedTelescope

PubMed Telescope

PubMedCustomEncoder

Custom encoder for json.dump that writes a dictionary field as a single string of text for a select number of keys in the Pubmed data.

Functions

download_datafiles(→ bool)

Download a list of Pubmed datafiles from their FTP server.

load_datafile(→ List[Dict])

Read in a Pubmed XML file and return it in a well-defined dictionary/json object.

save_pubmed_jsonl(output_path, data)

Save a Pubmed jsonl to file using the custom encoder.

parse_articles(→ Union[List, List[Dict]])

parse_deletes(→ Union[List, List[Dict]])

transform_pubmed(→ Union[bool, str, PubmedUpdatefile])

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes.

save_pubmed_merged_upserts(→ str)

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

merge_upserts_and_deletes(→ Tuple[Dict[PMID, str], ...)

Merge the Pubmed upserts and deletes and return them as collections of PMID value-Version pairs so that they can be easily pulled from file.

add_attributes(obj)

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

change_pubmed_list_structure(→ Union[dict, list, str, ...)

Recursively travel down the Pubmed data tree to move the specified fields up one level to make them easier to query in BigQuery.

bq_update_table_expiration(full_table_id, expiration_date)

Update a BigQuery table expiration date.

Attributes

bad_list_fields

class academic_observatory_workflows.workflows.pubmed_telescope.Datafile(filename: str, file_index: int, baseline: bool, path_on_ftp: str, datafile_date: pendulum.DateTime, datafile_release: observatory.platform.workflows.workflow.ChangefileRelease = None)
property download_file_path
property transform_baseline_file_path
property transform_upsert_file_path
property transform_delete_file_path
property merged_upsert_file_path
__eq__(other)

Return self==value.

from_dict() → Datafile
to_dict() → Dict
class academic_observatory_workflows.workflows.pubmed_telescope.PMID

Object to identify a particular PMID record by the PMID value and Version.

Parameters:
  • value – The PMID value.

  • Version – The Version of the PMID record.

value: int
Version: int
__eq__(other)

Return self==value.

__hash__()

Return hash(self).

to_dict() → Dict
from_dict() → PMID
to_str() → str
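Because PMID defines __eq__ and __hash__, PMID objects can serve as set members and dictionary keys, which is what allows merge_upserts_and_deletes to return a Dict[PMID, str] and a Set[PMID]. A frozen dataclass gives the same behaviour; the sketch below is a hypothetical stand-in named PMIDKey, not the module's class, and the "value-Version" string produced by to_str is an assumed format.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class PMIDKey:
    """Hypothetical stand-in for PMID: a record is identified by value AND Version."""

    value: int
    Version: int

    # frozen=True together with the default eq=True generates __eq__ and __hash__
    # from (value, Version), so equal keys collapse in sets and dicts.

    def to_dict(self) -> Dict[str, int]:
        return {"value": self.value, "Version": self.Version}

    @staticmethod
    def from_dict(d: Dict[str, int]) -> "PMIDKey":
        return PMIDKey(value=int(d["value"]), Version=int(d["Version"]))

    def to_str(self) -> str:
        # Assumed string format; the real to_str() output may differ.
        return f"{self.value}-{self.Version}"
```

Two versions of the same PMID value remain distinct keys, which matches the later notes that records must be matched on both value and Version.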
class academic_observatory_workflows.workflows.pubmed_telescope.PubmedUpdatefile

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

Parameters:
  • name – Filename of the updatefile.

  • upserts – List of PMID objects that are to be upserted.

  • deletes – List of PMID objects that are to be deleted.

name: str
upserts: List[PMID]
deletes: List[PMID]
__eq__(other)

Return self==value.

to_dict() → Dict
from_dict() → PubmedUpdatefile
class academic_observatory_workflows.workflows.pubmed_telescope.PubMedRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, start_date: pendulum.DateTime, end_date: pendulum.DateTime, year_first_run: bool, datafile_list: List[Datafile])

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property merged_upsert_uri_blob_pattern: str

Create a URI blob pattern for importing the transformed merged upserts from GCS into BigQuery.

Returns:

URI pattern for the merged upsert transform files.

property merged_delete_transfer_uri: str

Create a URI for importing the transformed merged deletes from GCS into BigQuery.

Returns:

URI for the merged delete transform file.

property merged_delete_file_path
schema_file_path(record_type: str) → str
transfer_blob_pattern(table_type: str) → str

Create a blob pattern for importing the transformed unmerged records from GCS into BigQuery.

Parameters:

table_type – Type of the record.

Returns:

URI pattern for transformed files.

class academic_observatory_workflows.workflows.pubmed_telescope.PubMedTelescope(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, bq_dataset_id: str = 'pubmed', bq_table_id: str = 'pubmed', bq_dataset_description: str = 'Pubmed Medline database, only PubmedArticle records: https://pubmed.ncbi.nlm.nih.gov/about/', start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=8), schedule: str = '@weekly', ftp_server_url: str = 'ftp.ncbi.nlm.nih.gov', ftp_port: int = 21, reset_ftp_counter: int = 40, max_download_retry: int = 5, queue: str = 'remote_queue', snapshot_expiry_days: int = 31, max_processes: int = 4)

Bases: observatory.platform.workflows.workflow.Workflow

PubMed Telescope

Please visit: https://pubmed.ncbi.nlm.nih.gov/

list_datafiles_for_release(**kwargs) → bool

Get a list of all files to process for this release.

Also determine whether the workflow needs to re-download the baseline files because of a new yearly release.

make_release(**kwargs) → PubMedRelease

Make a Release instance. Gets the list of releases available from the release check (setup task).

Parameters:

kwargs – the context passed from the Airflow Operator.

See https://airflow.apache.org/docs/stable/macros-ref.html for a list of the keyword arguments that are passed to this argument.

Returns:

PubMedRelease.

create_snapshot(release: PubMedRelease, **kwargs)

Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.

download_baseline(release: PubMedRelease, **kwargs)

Download files from PubMed’s FTP server for this release.

This cannot be done in parallel because PubMed’s FTP server cannot handle many simultaneous requests.

upload_downloaded_baseline(release: PubMedRelease, **kwargs)

Upload downloaded baseline files to GCS.

transform_baseline(release: PubMedRelease, **kwargs)

Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import.

upload_transformed_baseline(release: PubMedRelease, **kwargs)

Upload transformed baseline files to GCS.

bq_load_main_table(release: PubMedRelease, **kwargs)

Ingest the baseline table from GCS to BQ using a file pattern.

download_updatefiles(release: PubMedRelease, **kwargs)

Download the updatefiles from PubMed’s FTP server for this release.

This cannot be done in parallel due to limitations of PubMed’s FTP server.

upload_downloaded_updatefiles(release: PubMedRelease, **kwargs)

Upload the downloaded updatefiles to GCS.

transform_updatefiles(release: PubMedRelease, **kwargs)

Transform the *.xml.gz files downloaded from PubMed’s FTP server into usable json-like files for BigQuery import.

This task is multithreaded and pulls the PubmedArticle records out of the downloaded XML files.

merge_upserts_and_deletes(release: PubMedRelease, **kwargs)

Merge the upserts and deletes for this release period.

upload_merged_upsert_records(release: PubMedRelease, **kwargs)

Upload the merged upsert records to GCS.

bq_load_upsert_table(release: PubMedRelease, **kwargs)

Ingest the upsert records from GCS to BQ using a glob pattern.

bq_upsert_records(release: PubMedRelease, **kwargs)

Upsert records into the main table.

Records are matched on both the PMID value and the Version number, because the main table can contain multiple versions of the same PMID.
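The matching rule can be illustrated with an in-memory sketch in which rows are keyed by the (PMID value, Version) pair, so an upsert replaces only the row with the same value and Version and leaves other versions of that PMID untouched. The flat "value" and "Version" row keys and the function name upsert_rows are hypothetical; the real task operates on BigQuery tables.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def upsert_rows(main: List[Row], upserts: List[Row]) -> List[Row]:
    """Hypothetical in-memory upsert keyed on (value, Version)."""

    def key(row: Row) -> Tuple[object, object]:
        return row["value"], row["Version"]

    # Index existing rows by their composite key; dicts keep insertion order.
    merged: Dict[Tuple[object, object], Row] = {key(r): r for r in main}
    for row in upserts:
        # Same (value, Version) -> replace; new pair -> append.
        merged[key(row)] = row
    return list(merged.values())
```

Matching on value alone would instead overwrite every stored version of a PMID with a single incoming record.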

upload_merged_delete_records(release: PubMedRelease, **kwargs)

Upload the merged delete records to GCS.

bq_load_delete_table(release: PubMedRelease, **kwargs)

Ingest delete records from GCS to BQ.

bq_delete_records(release: PubMedRelease, **kwargs)

Remove records from the main table that are specified in the delete table.

Records are matched on both the PMID value and the Version number, because the main table can contain multiple versions of the same PMID.
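The same composite-key rule applied to deletes can be sketched in memory: only rows whose (value, Version) pair appears in the delete list are dropped, so other versions of a deleted PMID survive. The row layout and the name delete_rows are hypothetical.

```python
from typing import Dict, Iterable, List, Set, Tuple

Row = Dict[str, object]


def delete_rows(main: List[Row], deletes: Iterable[Tuple[int, int]]) -> List[Row]:
    """Hypothetical in-memory delete: drop rows whose (value, Version) is listed."""
    # A set gives O(1) membership checks on the composite key.
    to_delete: Set[Tuple[int, int]] = set(deletes)
    return [row for row in main if (row["value"], row["Version"]) not in to_delete]
```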

add_new_dataset_release(release: PubMedRelease, **kwargs)

Adds release information to the API.

cleanup(release: PubMedRelease, **kwargs)

Cleanup files from this workflow run.

Delete the local download files, transform files and the current task instance.

academic_observatory_workflows.workflows.pubmed_telescope.download_datafiles(datafile_list: List[Datafile], ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_retry: int) → bool

Download a list of Pubmed datafiles from their FTP server.

Parameters:
  • datafile_list – List of datafiles to download from their FTP server.

  • ftp_server_url – FTP server URL.

  • ftp_port – Port for the FTP connection.

  • reset_ftp_counter – After this number of files, reset the FTP connection to make sure that the connection is not reset by the host.

  • max_download_retry – Maximum number of retries for downloading one datafile before throwing an error.

Return download_success:

True if all of the datafiles were downloaded successfully.
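The reconnect-and-retry behaviour that reset_ftp_counter and max_download_retry describe can be sketched with the standard library's ftplib. This is a minimal sketch, not the module's implementation: the function names, the flat local file layout and the choice to open a new session before each retry are assumptions, and the connection is passed in as a connect callable so the logic can be exercised without a network.

```python
import ftplib
import os
from typing import Callable, List


def make_ftp_connection(host: str, port: int) -> ftplib.FTP:
    """Open an anonymous FTP session, e.g. make_ftp_connection("ftp.ncbi.nlm.nih.gov", 21)."""
    ftp = ftplib.FTP()
    ftp.connect(host, port)
    ftp.login()  # no arguments means an anonymous login
    return ftp


def _safe_quit(ftp) -> None:
    """Send QUIT, falling back to closing the socket if the server has gone away."""
    try:
        ftp.quit()
    except ftplib.all_errors:
        ftp.close()


def download_files(
    paths: List[str],
    dest_dir: str,
    connect: Callable[[], ftplib.FTP],
    reset_ftp_counter: int,
    max_download_retry: int,
) -> bool:
    """Download files one at a time, reconnecting periodically and retrying failures."""
    ftp = connect()
    try:
        for i, path in enumerate(paths):
            # Start a fresh session every `reset_ftp_counter` files.
            if i > 0 and i % reset_ftp_counter == 0:
                stale, ftp = ftp, connect()
                _safe_quit(stale)
            local_path = os.path.join(dest_dir, os.path.basename(path))
            for attempt in range(1, max_download_retry + 1):
                try:
                    with open(local_path, "wb") as f:
                        ftp.retrbinary(f"RETR {path}", f.write)
                    break
                except ftplib.all_errors:
                    if attempt == max_download_retry:
                        raise  # give up on this file after the final retry
                    # Assumption: open a new session before retrying, in case the old one died.
                    stale, ftp = ftp, connect()
                    stale.close()
    finally:
        _safe_quit(ftp)
    return True
```

For example, download_files(paths, dest_dir, lambda: make_ftp_connection("ftp.ncbi.nlm.nih.gov", 21), 40, 5) would mirror the PubMedTelescope defaults for ftp_port, reset_ftp_counter and max_download_retry. Files are fetched sequentially, in line with the note that PubMed's FTP server cannot handle many requests at once.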

academic_observatory_workflows.workflows.pubmed_telescope.load_datafile(input_path: str) → List[Dict]

Read in a Pubmed XML file and return it in a well-defined dictionary/json object.

Parameters:

input_path – Path to the Pubmed xml.gz file.

Return data:

A list of Pubmed records.

academic_observatory_workflows.workflows.pubmed_telescope.save_pubmed_jsonl(output_path: str, data: List[Dict])

Save a Pubmed jsonl to file using the custom encoder.

Parameters:
  • output_path – Path of the output file.

  • data – The data to write to file.

Returns:

None.
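Writing JSON Lines with a custom encoder generally means serialising each record on its own line with json.dumps(record, cls=...). A minimal sketch, assuming uncompressed UTF-8 output; the name save_jsonl is hypothetical, and it defaults to the standard json.JSONEncoder because PubMedCustomEncoder itself is not reproduced here.

```python
import json
from typing import Dict, List, Type


def save_jsonl(output_path: str, data: List[Dict], encoder: Type[json.JSONEncoder] = json.JSONEncoder) -> None:
    """Hypothetical JSON Lines writer: one JSON object per line, encoded with `encoder`."""
    with open(output_path, "w", encoding="utf-8") as f:
        for record in data:
            # json.dumps never emits raw newlines by default, so each record stays on one line.
            f.write(json.dumps(record, cls=encoder) + "\n")
```

BigQuery's newline-delimited JSON load format expects exactly this one-object-per-line layout.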

academic_observatory_workflows.workflows.pubmed_telescope.parse_articles(data: Dict) → List | List[Dict]
academic_observatory_workflows.workflows.pubmed_telescope.parse_deletes(data: Dict) → List | List[Dict]
academic_observatory_workflows.workflows.pubmed_telescope.transform_pubmed(input_path: str, upsert_path: str) → bool | str | PubmedUpdatefile

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes. Used in the parallelised transform tasks.

Parameters:
  • input_path – Path to the downloaded xml.gz file.

  • upsert_path – Output file path for the upserts.

Returns:

filename of the baseline file or a PubmedUpdatefile object if it is an updatefile.
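In PubMed's XML, each PubmedArticle carries its PMID (with a Version attribute) under MedlineCitation, while updatefiles can also contain DeleteCitation elements that list PMIDs to remove. The sketch below extracts (PMID value, Version) pairs for both using the standard library's xml.etree.ElementTree instead of the Biopython parser this module is built on, so it only illustrates the upsert/delete split; the function name and the default Version of 1 are assumptions.

```python
import gzip
import xml.etree.ElementTree as ET
from typing import List, Tuple

PMIDPair = Tuple[int, int]


def extract_upserts_and_deletes(input_path: str) -> Tuple[List[PMIDPair], List[PMIDPair]]:
    """Return (upserts, deletes) as (PMID value, Version) pairs from a PubMed .xml.gz file."""
    with gzip.open(input_path, "rb") as f:
        root = ET.parse(f).getroot()  # <PubmedArticleSet>

    def pair(pmid: ET.Element) -> PMIDPair:
        # Assumption: a missing Version attribute is treated as version 1.
        return int(pmid.text.strip()), int(pmid.get("Version", "1"))

    upserts = [pair(p) for p in root.findall("./PubmedArticle/MedlineCitation/PMID")]
    deletes = [pair(p) for p in root.findall("./DeleteCitation/PMID")]
    return upserts, deletes
```

ET.parse loads the whole document into memory, which is acceptable for a sketch but worth keeping in mind for large files.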

academic_observatory_workflows.workflows.pubmed_telescope.save_pubmed_merged_upserts(filename: str, upsert_index: Dict[PMID, str], input_path: str, output_path: str) → str

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

Parameters:
  • filename – Name of the original updatefile used by the upsert_index.

  • upsert_index – Index of the upsert keys (PMID to updatefile name) to pull out and write to file.

  • input_path – Where the original upsert records are stored.

  • output_path – Destination of where to write the merged upsert records.

Return output_path:

For logging purposes and ensuring the multithreaded task completed.
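Reading the parameters together, one plausible interpretation is that upsert_index maps each PMID to the name of the updatefile holding its most recent record, so each parallel worker keeps only those records from its own file whose index entry points back at that file. The sketch follows that interpretation; the (value, Version) tuple keys, the top-level "value"/"Version" record fields and the name write_merged_upserts are hypothetical.

```python
import json
from typing import Dict, Tuple

PMIDPair = Tuple[int, int]


def write_merged_upserts(filename: str, upsert_index: Dict[PMIDPair, str], input_path: str, output_path: str) -> str:
    """Hypothetical filter: copy only the records whose latest version lives in `filename`."""
    with open(input_path, encoding="utf-8") as src, open(output_path, "w", encoding="utf-8") as dst:
        for line in src:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            key = (record["value"], record["Version"])
            # Keep the record only if this file holds the winning upsert for that PMID.
            if upsert_index.get(key) == filename:
                dst.write(line if line.endswith("\n") else line + "\n")
    return output_path
```

Returning output_path mirrors the documented return value, which is used for logging and for confirming that each worker finished.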

academic_observatory_workflows.workflows.pubmed_telescope.merge_upserts_and_deletes(updatefiles: List[PubmedUpdatefile]) → Tuple[Dict[PMID, str], Set[PMID]]

Merge the Pubmed upserts and deletes and return them as collections of PMID value-Version pairs so that they can be easily pulled from file.

Parameters:

updatefiles – List of PubmedUpdatefile objects to merge together.

Returns:

A tuple of the merged upserts (a dictionary mapping each PMID to an updatefile name) and the merged deletes (a set of PMIDs).
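A minimal sketch of the merging step, assuming the updatefiles arrive oldest first and that upserts are applied to the main table before deletes (the order in which bq_upsert_records and bq_delete_records are listed above suggests this, but it is an assumption here). A later upsert removes a PMID from the deletes, and a later delete removes it from the upserts. PMIDs are modelled as (value, Version) tuples and updatefiles as hypothetical (name, upserts, deletes) tuples rather than the module's PubmedUpdatefile class.

```python
from typing import Dict, List, Set, Tuple

PMIDPair = Tuple[int, int]
# Hypothetical stand-in for PubmedUpdatefile: (name, upserts, deletes).
Updatefile = Tuple[str, List[PMIDPair], List[PMIDPair]]


def merge_updates(updatefiles: List[Updatefile]) -> Tuple[Dict[PMIDPair, str], Set[PMIDPair]]:
    """Return {PMID: name of the updatefile with its latest upsert} and the set of PMIDs to delete."""
    upserts: Dict[PMIDPair, str] = {}
    deletes: Set[PMIDPair] = set()
    # Assumption: files are sorted oldest first, and within a file deletes follow upserts.
    for name, file_upserts, file_deletes in updatefiles:
        for pmid in file_upserts:
            upserts[pmid] = name  # the newest file wins
            deletes.discard(pmid)  # re-added after an earlier delete
        for pmid in file_deletes:
            upserts.pop(pmid, None)  # deleted after an earlier upsert
            deletes.add(pmid)
    return upserts, deletes
```

Keeping the file name as the dictionary value is what lets each worker later pull only its own winning records from file.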

academic_observatory_workflows.workflows.pubmed_telescope.add_attributes(obj: Bio.Entrez.Parser.StringElement | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.OrderedListElement | list | str)

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

Only pulling data from StringElements, DictionaryElements, ListElements and OrderedListElements.

Parameters:

obj – Input object, being one of the Biopython data classes.

Return new:

Object with attributes added as keys to the dictionary.
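Biopython's Entrez parser returns str, dict and list subclasses that carry the XML attributes in an attributes dictionary. The sketch below shows one way such attributes could be folded into plain data. The output shapes are inferred from this page and are assumptions: an attributed string becomes {"value": ..., attribute: ...} (matching the PMID value/Version fields), and an attributed list becomes a dictionary holding its attributes plus the items (matching the AuthorList example under change_pubmed_list_structure). The function duck-types on .attributes and an assumed .tag element name, so it runs on stand-in objects without Biopython; its name is hypothetical.

```python
from typing import Any


def add_attributes_sketch(obj: Any) -> Any:
    """Hypothetical recursion that folds Biopython-style `.attributes` into plain data."""
    attributes = dict(getattr(obj, "attributes", None) or {})
    if isinstance(obj, str):
        # e.g. a PMID element with Version="1" -> {"value": "123", "Version": "1"}
        return {"value": str(obj), **attributes} if attributes else str(obj)
    if isinstance(obj, dict):
        new = {key: add_attributes_sketch(value) for key, value in obj.items()}
        new.update(attributes)  # attributes become ordinary keys
        return new
    if isinstance(obj, list):
        items = [add_attributes_sketch(item) for item in obj]
        tag = getattr(obj, "tag", None)
        if attributes and tag and tag.endswith("List"):
            # e.g. AuthorList(CompleteYN="Y") -> {"CompleteYN": "Y", "Author": [...]}
            return {**attributes, tag[: -len("List")]: items}
        return items
    return obj
```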

academic_observatory_workflows.workflows.pubmed_telescope.bad_list_fields
academic_observatory_workflows.workflows.pubmed_telescope.change_pubmed_list_structure(obj: dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement) → dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement

Recursively travel down the Pubmed data tree to move the specified fields up one level to make them easier to query in BigQuery.

For example, the original data can look something like:

    {
        "AuthorList": {
            "CompleteYN": "Y",
            "Author": [
                {"First": "Foo", "Last": "Bar"},
                {"First": "James", "Last": "Bond"}
            ]
        }
    }

The “Author” field will be removed and the data from it will be moved up to the “List” level, along with any data specified in the “bad_list_fields” dictionary:

    {
        "AuthorListCompleteYN": "Y",
        "AuthorListType": None,
        "AuthorList": [
            {"First": "Foo", "Last": "Bar"},
            {"First": "James", "Last": "Bond"}
        ]
    }

Parameters:

obj – Incoming data object.

Returns:

Any type as it could be a Pubmed dataclass.
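The transformation in the example above can be reproduced with a short recursive sketch: for a key ending in "List" whose value holds a child named by dropping "List" (AuthorList holding Author), the child list replaces the wrapper and each side field named in bad_list_fields is hoisted as "<Key><Field>", defaulting to None. The contents of BAD_LIST_FIELDS and the function name are hypothetical; the module's real bad_list_fields is not reproduced on this page.

```python
from typing import Any, Dict, List

# Hypothetical contents chosen to reproduce the documented AuthorList example.
BAD_LIST_FIELDS: Dict[str, List[str]] = {"AuthorList": ["CompleteYN", "Type"]}


def flatten_lists(obj: Any, bad_list_fields: Dict[str, List[str]] = BAD_LIST_FIELDS) -> Any:
    """Hoist e.g. AuthorList.Author up to AuthorList, prefixing the listed side fields."""
    if isinstance(obj, list):
        return [flatten_lists(item, bad_list_fields) for item in obj]
    if not isinstance(obj, dict):
        return obj
    new: Dict[str, Any] = {}
    for key, value in obj.items():
        child = key[: -len("List")] if key.endswith("List") else None
        if child and isinstance(value, dict) and child in value:
            # Move listed side fields up as "<Key><Field>", defaulting to None when absent.
            for field in bad_list_fields.get(key, []):
                new[key + field] = value.get(field)
            new[key] = flatten_lists(value[child], bad_list_fields)
        else:
            new[key] = flatten_lists(value, bad_list_fields)
    return new
```

Filling absent side fields with None keeps every AuthorList-derived column present, which helps keep a single, stable BigQuery schema across records.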

class academic_observatory_workflows.workflows.pubmed_telescope.PubMedCustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: json.JSONEncoder

Custom encoder for json.dump that writes a dictionary field as a single string of text for a select number of keys in the Pubmed data.

For example, the AbstractText field can be either a string or an array containing sections such as background and methods, but BigQuery requires a well-defined schema in which a field cannot be both an array and a string. This encoder forces the fields below to be written as strings in the JSON file.

write_as_single_field = ['AbstractText', 'Affiliation', 'ArticleTitle', 'b', 'BookTitle', 'Citation', 'CoiStatement',...
_transform_obj_data(obj)
encode(obj)

Return a JSON string representation of a Python data structure.

>>> from json.encoder import JSONEncoder
>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'
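The idea of forcing selected fields to a single string can be sketched as a json.JSONEncoder subclass. The field subset, the class name and the use of json.dumps to turn non-string values into text are assumptions; the real encoder overrides encode, and its exact conversion is not shown on this page. The sketch overrides iterencode instead, because json.dump calls iterencode directly, whereas json.dumps goes through encode, which in turn calls iterencode for non-string input.

```python
import json
from typing import Any, Iterable

# Hypothetical subset of write_as_single_field; the full list is elided on this page.
SINGLE_FIELDS = {"AbstractText", "ArticleTitle"}


class SingleFieldEncoder(json.JSONEncoder):
    """Sketch: write selected keys as one string so BigQuery sees a single type."""

    def _flatten(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            new = {}
            for key, value in obj.items():
                if key in SINGLE_FIELDS and not isinstance(value, (str, type(None))):
                    # e.g. a list of abstract sections becomes one JSON-formatted string
                    new[key] = json.dumps(value)
                else:
                    new[key] = self._flatten(value)
            return new
        if isinstance(obj, list):
            return [self._flatten(item) for item in obj]
        return obj

    def iterencode(self, o: Any, _one_shot: bool = False) -> Iterable[str]:
        # json.dump calls iterencode directly and json.dumps reaches it via encode,
        # so transforming here covers both entry points.
        return super().iterencode(self._flatten(o), _one_shot)
```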
academic_observatory_workflows.workflows.pubmed_telescope.bq_update_table_expiration(full_table_id: str, expiration_date: pendulum.datetime)

Update a BigQuery table expiration date.

Parameters:
  • full_table_id – Fully qualified table ID.

  • expiration_date – Expiration date of the table.

Returns:

None.
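With the google-cloud-bigquery client library, changing a table's expiry typically means fetching the table with Client.get_table, setting Table.expires and sending only that field with Client.update_table. The sketch below takes the client as a parameter so it can be tried with a stand-in object; whether the module's function is implemented this way is an assumption, and update_table_expiration is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Any


def update_table_expiration(client: Any, full_table_id: str, expiration_date: datetime) -> None:
    """Set a table's expiry using a google.cloud.bigquery.Client-like object."""
    table = client.get_table(full_table_id)  # e.g. "project.dataset.table"
    table.expires = expiration_date
    # Only the listed field is sent to the API; other table properties are left alone.
    client.update_table(table, ["expires"])
```

With a real client this would be called as update_table_expiration(bigquery.Client(), "project.dataset.table", expiration_date) after `from google.cloud import bigquery`. A pendulum.DateTime can be passed directly because it subclasses datetime.datetime.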