academic_observatory_workflows.pubmed_telescope.pubmed_telescope

Module Contents

Classes

Datafile

PMID

Object to identify a particular PMID record by the PMID value and Version.

PubmedUpdatefile

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

PubMedRelease

PubMedCustomEncoder

Custom JSON encoder that writes a dictionary field as a string of text for a select number of keys in the Pubmed data.

Functions

create_dag(*, dag_id, cloud_workspace, ...)

Construct a PubMed Telescope instance.

download_datafiles(→ bool)

Download a list of Pubmed datafiles from their FTP server.

load_datafile(→ List[Dict])

Read in a Pubmed XML file and return it in a well-defined dictionary/json object.

save_pubmed_jsonl(output_path, data)

Save a Pubmed jsonl to file using the custom encoder.

parse_articles(→ Union[List, List[Dict]])

parse_deletes(→ Union[List, List[Dict]])

transform_pubmed(→ Union[bool, str, PubmedUpdatefile])

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes.

save_pubmed_merged_upserts(→ str)

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

merge_upserts_and_deletes(→ Tuple[Dict[PMID, str], ...)

Merge the Pubmed upserts and deletes and return them as PMID value-Version pairs so that they can be easily pulled from file.

add_attributes(obj)

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

change_pubmed_list_structure(→ Union[dict, list, str, ...)

Recursively travel down the Pubmed data tree to move the specified fields up one level, making them easier to query in BigQuery.

bq_update_table_expiration(full_table_id, expiration_date)

Update a BigQuery table’s expiration date.

Attributes

bad_list_fields

class academic_observatory_workflows.pubmed_telescope.pubmed_telescope.Datafile(filename: str, file_index: int, baseline: bool, path_on_ftp: str, datafile_date: pendulum.DateTime, datafile_release: observatory.platform.workflows.workflow.ChangefileRelease = None)[source]
property download_file_path[source]
property transform_baseline_file_path[source]
property transform_upsert_file_path[source]
property transform_delete_file_path[source]
property merged_upsert_file_path[source]
__eq__(other)[source]

Return self==value.

from_dict() Datafile[source]
to_dict() Dict[source]
class academic_observatory_workflows.pubmed_telescope.pubmed_telescope.PMID[source]

Object to identify a particular PMID record by the PMID value and Version.

Parameters:
  • value – The PMID value.

  • Version – The Version of the PMID record.

value: int[source]
Version: int[source]
__eq__(other)[source]

Return self==value.

__hash__()[source]

Return hash(self).

to_dict() Dict[source]
from_dict() PMID[source]
to_str() str[source]
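
PMID defines __eq__ and __hash__ so that records can be deduplicated by (value, Version) in sets and as dictionary keys. A minimal sketch; the keyword-argument constructor is an assumption based on the field definitions above:

from academic_observatory_workflows.pubmed_telescope.pubmed_telescope import PMID

# Constructor shape assumed from the field definitions above.
a = PMID(value=12345, Version=1)
b = PMID(value=12345, Version=1)
c = PMID(value=12345, Version=2)

assert a == b  # equal by (value, Version)
assert len({a, b, c}) == 2  # hashable, so usable in sets and as dict keys
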
class academic_observatory_workflows.pubmed_telescope.pubmed_telescope.PubmedUpdatefile[source]

Object to hold the list of upserts and deletes for a single Pubmed updatefile.

Parameters:
  • name – Filename of the updatefile.

  • upserts – List of PMID objects that are to be upserted.

  • deletes – List of PMID objects that are to be deleted.

name: str[source]
upserts: List[PMID][source]
deletes: List[PMID][source]
__eq__(other)[source]

Return self==value.

to_dict() Dict[source]
from_dict() PubmedUpdatefile[source]
class academic_observatory_workflows.pubmed_telescope.pubmed_telescope.PubMedRelease(*, dag_id: str, run_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, start_date: pendulum.DateTime, end_date: pendulum.DateTime, year_first_run: bool, datafile_list: List[Datafile], baseline_upload_date: pendulum.DateTime)[source]

Bases: observatory.platform.workflows.workflow.ChangefileRelease

property baseline_files: List[Datafile][source]

Return a list of the “baseline” datafiles for this release.

property updatefiles: List[Datafile][source]

Return a list of “updatefile” datafiles for this release.

property merged_upsert_uri_blob_pattern: str[source]

Create a URI blob pattern for importing the transformed merged upserts from GCS into BigQuery.

Returns:

URI pattern for merged transform files.

property merged_delete_transfer_uri: str[source]

Create a URI for importing the transformed merged deletes from GCS into BigQuery.

Returns:

URI for merged transform files.

property merged_delete_file_path[source]
schema_file_path(record_type: str) str[source]
transfer_blob_pattern(table_type: str) str[source]

Create a blob pattern for importing the transformed unmerged records from GCS into BigQuery.

Parameters:

table_type – Type of the record.

Returns:

URI pattern for transformed files.

static from_dict(dict_: dict) PubMedRelease[source]
to_dict() dict[source]
academic_observatory_workflows.pubmed_telescope.pubmed_telescope.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, bq_dataset_id: str = 'pubmed', bq_table_id: str = 'pubmed', bq_dataset_description: str = 'Pubmed Medline database, only PubmedArticle records: https://pubmed.ncbi.nlm.nih.gov/about/', start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=8), schedule: str = '@weekly', ftp_server_url: str = 'ftp.ncbi.nlm.nih.gov', ftp_port: int = 21, reset_ftp_counter: int = 40, max_download_retry: int = 5, queue: str = 'remote_queue', snapshot_expiry_days: int = 31, max_processes: int = 4, max_active_runs: int = 1, retries: int = 3) airflow.DAG[source]

Construct a PubMed Telescope instance.

https://pubmed.ncbi.nlm.nih.gov/

Parameters:
  • dag_id – the id of the DAG.

  • cloud_workspace – Cloud settings.

  • observatory_api_conn_id – Observatory API connection for Airflow.

  • bq_dataset_id – Dataset name for final tables.

  • bq_table_id – Table name of the final Pubmed table.

  • bq_dataset_description – Description of the Pubmed dataset.

  • start_date – The start date of the DAG.

  • schedule – How often the DAG should run.

  • ftp_server_url – Server address of Pubmed’s FTP server.

  • ftp_port – Port for connecting to Pubmed’s FTP server.

  • reset_ftp_counter – Reset the FTP connection after downloading this many files.

  • max_download_retry – Maximum number of retries for downloading a single Pubmed file from the FTP server before raising an error.

  • queue – The queue that the tasks should run on, “default” or “remote_queue”.

  • snapshot_expiry_days – How many days the backup snapshot of the Pubmed table (taken before this release’s upserts and deletes are applied) exists in BigQuery.

  • max_processes – Max number of parallel processes.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.
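
A minimal sketch of registering the DAG in an Airflow DAG file. The CloudWorkspace field names below are assumptions for illustration, not the documented constructor:

from academic_observatory_workflows.pubmed_telescope.pubmed_telescope import create_dag
from observatory.platform.observatory_config import CloudWorkspace

# CloudWorkspace field names are assumed for illustration.
dag = create_dag(
    dag_id="pubmed",
    cloud_workspace=CloudWorkspace(
        project_id="my-project",
        download_bucket="my-download-bucket",
        transform_bucket="my-transform-bucket",
        data_location="us",
    ),
)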

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.download_datafiles(datafile_list: List[Datafile], ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_retry: int) bool[source]

Download a list of Pubmed datafiles from their FTP server.

Parameters:
  • datafile_list – List of datafiles to download from their FTP server.

  • ftp_server_url – FTP server URL.

  • ftp_port – Port for the FTP connection.

  • reset_ftp_counter – After this number of files, reset the FTP connection to make sure that the connection is not dropped by the host.

  • max_download_retry – Maximum number of retries for downloading one datafile before raising an error.

Return download_success:

True if downloading all of the datafiles was successful.
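
The reset_ftp_counter behaviour can be pictured with plain ftplib. A sketch only, assuming anonymous login; it is not the module’s implementation:

import ftplib

def download_with_resets(paths, host, port, reset_every):
    # Reconnect every `reset_every` files so the host does not drop a
    # long-lived FTP connection mid-run.
    ftp = ftplib.FTP()
    ftp.connect(host=host, port=port)
    ftp.login()  # anonymous login, assumed for illustration
    for i, path in enumerate(paths, start=1):
        with open(path.split("/")[-1], "wb") as file:
            ftp.retrbinary(f"RETR {path}", file.write)
        if i % reset_every == 0:
            ftp.quit()
            ftp = ftplib.FTP()
            ftp.connect(host=host, port=port)
            ftp.login()
    ftp.quit()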

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.load_datafile(input_path: str) List[Dict][source]

Read in a Pubmed XML file and return it in a well-defined dictionary/json object.

Parameters:

input_path – Path to the Pubmed xml.gz file.

Return data:

A list of Pubmed records.
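
Since add_attributes and change_pubmed_list_structure below operate on Biopython parser classes, reading a file plausibly looks like this sketch; the gzip handling and validate flag are assumptions:

import gzip

from Bio import Entrez

def read_pubmed_xml(input_path):
    # Parse a Pubmed xml.gz file with Biopython's Entrez parser, whose element
    # classes the module's transform helpers are written against.
    with gzip.open(input_path, "rb") as file:
        return Entrez.read(file, validate=False)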

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.save_pubmed_jsonl(output_path: str, data: List[Dict])[source]

Save a Pubmed jsonl to file using the custom encoder.

Parameters:
  • output_path – Path of the output file.

  • data – The data to write to file.

Returns:

None.
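
A sketch of the jsonl convention this implies, one record per line written through the custom encoder; the record content is hypothetical:

import json

from academic_observatory_workflows.pubmed_telescope.pubmed_telescope import PubMedCustomEncoder

records = [{"MedlineCitation": {"PMID": "12345"}}]  # hypothetical record
with open("output.jsonl", "w") as file:
    for record in records:
        file.write(json.dumps(record, cls=PubMedCustomEncoder) + "\n")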

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.parse_articles(data: Dict) List | List[Dict][source]
academic_observatory_workflows.pubmed_telescope.pubmed_telescope.parse_deletes(data: Dict) List | List[Dict][source]
academic_observatory_workflows.pubmed_telescope.pubmed_telescope.transform_pubmed(input_path: str, upsert_path: str) bool | str | PubmedUpdatefile[source]

Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes. Used in the parallelised transform task.

Parameters:
  • input_path – Path to the downloaded xml.gz file.

  • upsert_path – Output file path for the upserts.

Returns:

Filename of the baseline file, or a PubmedUpdatefile object if the input is an updatefile.
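
The three possible return types suggest dispatching on the result. A sketch with hypothetical file paths, assuming a bool return signals a failed transform:

from academic_observatory_workflows.pubmed_telescope.pubmed_telescope import (
    PubmedUpdatefile,
    transform_pubmed,
)

result = transform_pubmed("pubmed24n1100.xml.gz", "upserts.jsonl")  # hypothetical paths
if isinstance(result, PubmedUpdatefile):
    print(f"{result.name}: {len(result.upserts)} upserts, {len(result.deletes)} deletes")
elif isinstance(result, str):
    print(f"baseline file transformed: {result}")
else:
    print("transform failed")  # assumption: a bool return signals failure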

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.save_pubmed_merged_upserts(filename: str, upsert_index: Dict[PMID, str], input_path: str, output_path: str) str[source]

Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

Parameters:
  • filename – Name of the original updatefile used by the upsert_index.

  • upsert_index – The upsert keys to pull out and write to file.

  • input_path – Where the original upsert records are stored.

  • output_path – Destination of where to write the merged upsert records.

Return output_path:

Returned for logging purposes and to confirm that the multithreaded task completed.

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.merge_upserts_and_deletes(updatefiles: List[PubmedUpdatefile]) Tuple[Dict[PMID, str], Set[PMID]][source]

Merge the Pubmed upserts and deletes and return them as PMID value-Version pairs so that they can be easily pulled from file.

Parameters:

updatefiles – List of PubmedUpdatefile objects to merge together.

Returns:

The merged upserts and deletes.
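
One plausible reading of the merge semantics, as a sketch rather than the actual implementation: later updatefiles take precedence, and a delete cancels any pending upsert for the same PMID:

from typing import Dict, List, Set, Tuple

from academic_observatory_workflows.pubmed_telescope.pubmed_telescope import PMID, PubmedUpdatefile

def merge_sketch(updatefiles: List[PubmedUpdatefile]) -> Tuple[Dict[PMID, str], Set[PMID]]:
    # Assumes updatefiles are ordered oldest to newest.
    upsert_index: Dict[PMID, str] = {}  # PMID -> updatefile that last upserted it
    deletes: Set[PMID] = set()
    for updatefile in updatefiles:
        for pmid in updatefile.upserts:
            upsert_index[pmid] = updatefile.name  # a newer upsert wins
            deletes.discard(pmid)  # an upsert revives a previously deleted record
        for pmid in updatefile.deletes:
            upsert_index.pop(pmid, None)  # a delete cancels a pending upsert
            deletes.add(pmid)
    return upsert_index, deletes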

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.add_attributes(obj: Bio.Entrez.Parser.StringElement | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.OrderedListElement | list | str)[source]

Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

Only pulls data from StringElements, DictionaryElements, ListElements and OrderedListElements.

Parameters:

obj – Input object, being one of the Biopython data classes.

Return new:

Object with attributes added as keys to the dictionary.
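
A sketch of the recursion, relying on the fact that the Bio.Entrez.Parser element classes expose their XML attributes via an .attributes mapping; the real function’s output layout may differ:

from Bio.Entrez.Parser import (
    DictionaryElement,
    ListElement,
    OrderedListElement,
    StringElement,
)

def add_attributes_sketch(obj):
    # StringElement: keep the text and lift its XML attributes alongside it.
    if isinstance(obj, StringElement):
        if not obj.attributes:
            return str(obj)
        new = dict(obj.attributes)
        new["value"] = str(obj)
        return new
    # DictionaryElement: recurse into values and merge in the attributes.
    if isinstance(obj, DictionaryElement):
        new = {key: add_attributes_sketch(value) for key, value in obj.items()}
        new.update(obj.attributes)
        return new
    # ListElement / OrderedListElement / plain list: recurse into each item.
    if isinstance(obj, (ListElement, OrderedListElement, list)):
        return [add_attributes_sketch(item) for item in obj]
    return obj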

academic_observatory_workflows.pubmed_telescope.pubmed_telescope.bad_list_fields[source]
academic_observatory_workflows.pubmed_telescope.pubmed_telescope.change_pubmed_list_structure(obj: dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement) dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement[source]

Recursively travel down the Pubmed data tree to move the specified fields up one level, making them easier to query in BigQuery.

For example, the original data can look something like

{
    "AuthorList": {
        "CompleteYN": "Y",
        "Author": [
            {"First": "Foo", "Last": "Bar"},
            {"First": "James", "Last": "Bond"}
        ]
    }
}

The “Author” field will be removed and its data moved up one level, along with any data specified in the “bad_list_fields” dictionary:

{
    "AuthorListCompleteYN": "Y",
    "AuthorListType": None,
    "AuthorList": [
        {"First": "Foo", "Last": "Bar"},
        {"First": "James", "Last": "Bond"}
    ]
}

Parameters:

obj – Incoming data object.

Returns:

The restructured object, which can be any of the Pubmed data classes.
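
A sketch of the flattening for the single AuthorList/Author case from the example above; the real function recurses over the whole tree and is driven by the bad_list_fields mapping:

def flatten_author_list(record: dict) -> dict:
    # Hard-coded to the example above for illustration only.
    inner = record.get("AuthorList")
    if isinstance(inner, dict):
        for key, value in inner.items():
            if key != "Author":
                record["AuthorList" + key] = value  # e.g. AuthorListCompleteYN
        record["AuthorList"] = inner.get("Author", [])
    return record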

class academic_observatory_workflows.pubmed_telescope.pubmed_telescope.PubMedCustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: json.JSONEncoder

Custom JSON encoder that writes a dictionary field as a string of text for a select number of keys in the Pubmed data.

For example, the AbstractText field can be either a string or an array containing background, methods, etc., but BigQuery requires the schema to be well defined: a field cannot be both an array and a string. This encoder forces the fields listed below to be written as strings in the JSON file.

write_as_single_field = ['AbstractText', 'Affiliation', 'ArticleTitle', 'b', 'BookTitle', 'Citation', 'CoiStatement',...[source]
_transform_obj_data(obj)[source]
encode(obj)[source]

Return a JSON string representation of a Python data structure.

>>> from json.encoder import JSONEncoder
>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'
academic_observatory_workflows.pubmed_telescope.pubmed_telescope.bq_update_table_expiration(full_table_id: str, expiration_date: pendulum.datetime)[source]

Update a BigQuery table’s expiration date.

Parameters:
  • full_table_id – Fully qualified table ID.

  • expiration_date – Expiration date of the table.

Returns:

None.
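
A sketch with the google-cloud-bigquery client, which presumably backs this helper; the real implementation may differ:

import pendulum
from google.cloud import bigquery

def set_table_expiry(full_table_id: str, expiration_date: pendulum.DateTime):
    # Fetch the table, set its expiry, and persist only that one field.
    client = bigquery.Client()
    table = client.get_table(full_table_id)
    table.expires = expiration_date  # pendulum.DateTime is a datetime subclass
    client.update_table(table, ["expires"])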