academic_observatory_workflows.pubmed_telescope.tasks
Attributes
Classes
PMID | Object to identify a particular PMID record by the PMID value and Version.
PubmedUpdatefile | Object to hold the list of upserts and deletes for a single Pubmed updatefile.
PubMedCustomEncoder | Custom encoder for json dump for it to write a dictionary field as a string of text for a select number of key values in the Pubmed data.
Functions
fetch_release | Get a list of all files to process for this release.
short_circuit | Don't skip this DAG run if it is the first run of the year, or if it is not and there are updatefiles.
branch_baseline_or_updatefiles
baseline_download | Download files from PubMed's FTP server for this release.
updatefiles_download | Download the updatefiles from PubMed's FTP server for this release.
baseline_upload_downloaded | Upload downloaded baseline files to GCS.
updatefiles_upload_downloaded | Upload downloaded updatefiles to GCS.
baseline_transform | Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import.
updatefiles_transform | Transform the *.xml.gz files downloaded from PubMed's FTP server into usable json-like files for BigQuery import.
baseline_upload_transformed | Upload transformed baseline files to GCS.
updatefiles_merge_upserts_deletes | Merge the upserts and deletes for this release period.
updatefiles_upload_merged_upsert_records | Upload the merged upsert records to GCS.
baseline_bq_load | Ingest the baseline table from GCS to BQ using a file pattern.
updatefiles_bq_load_upsert_table | Ingest the upsert records from GCS to BQ using a glob pattern.
updatefiles_bq_upsert_records | Upsert records into the main table.
updatefiles_upload_merged_delete_records | Upload the merged delete records to GCS.
updatefiles_bq_load_delete_table | Ingest delete records from GCS to BQ.
updatefiles_bq_delete_records | Remove records from the main table that are specified in the delete table.
add_dataset_releases | Adds release information to the API.
cleanup_workflow | Cleanup files from this workflow run.
login_to_ftp
create_snapshot | Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.
download_datafiles | Download a list of Pubmed datafiles from their FTP server.
save_pubmed_jsonl | Save a Pubmed jsonl to file using the custom encoder.
parse_articles
parse_deletes
parse_pubmed_element
transform_pubmed | Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes.
save_pubmed_merged_upserts | Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.
merge_upserts_and_deletes | Merge the Pubmed upserts and deletes and return them as a list of PMID value-Version pairs so that they can be easily pulled from file.
add_attributes | Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.
change_pubmed_list_structure | Recursively travel down the Pubmed data tree to move the specified fields up one level to make it easier to query in Bigquery.
Update a Bigquery table expiration date. |
Module Contents
- class academic_observatory_workflows.pubmed_telescope.tasks.PMID
Object to identify a particular PMID record by the PMID value and Version.
- Parameters:
value – The PMID value.
Version – The Version of the PMID record.
- class academic_observatory_workflows.pubmed_telescope.tasks.PubmedUpdatefile
Object to hold the list of upserts and deletes for a single Pubmed updatefile.
- Parameters:
name – Filename of the updatefile.
upserts – List of PMID objects that are to be upserted.
deletes – List of PMID objects that are to be deleted.
- from_dict() → PubmedUpdatefile
- academic_observatory_workflows.pubmed_telescope.tasks.fetch_release(dag_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, run_id: str, dag_run: airflow.models.DagRun, data_interval_end: pendulum.DateTime, bq_dataset_id: str, api_bq_dataset_id: str, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int) → str
Get a list of all files to process for this release.
Determine if workflow needs to redownload the baseline files again because of a new yearly release.
- Parameters:
dag_id – The ID of the dag
cloud_workspace – The cloud workspace object
run_id – The run ID of this dagrun
dag_run – The DagRun object
data_interval_end – The end of the data interval for this run
bq_dataset_id – The BigQuery dataset ID
api_bq_dataset_id – The BigQuery dataset ID of the API.
ftp_server_url – The host of the pubmed data
ftp_port – The port to access the ftp server
reset_ftp_counter – After this many files, reset the ftp connection
- academic_observatory_workflows.pubmed_telescope.tasks.short_circuit(release: dict) → bool
Don’t skip this DAG run if:
a) We are in the first run of the year
b) We are not in the first run of the year and there are updatefiles
- Returns:
Whether to skip this dag run or not
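The two conditions above reduce to a small boolean check, sketched below. The function name and both parameters are hypothetical stand-ins for whatever the release dictionary actually carries, and the sketch assumes that a True result lets the run continue, which is how Airflow's ShortCircuitOperator treats a truthy return value.

```python
def should_continue(is_first_run_of_year: bool, updatefile_count: int) -> bool:
    """Return True when the DAG run should proceed (hypothetical helper)."""
    # a) The first run of the year always proceeds, since the baseline must be processed.
    if is_first_run_of_year:
        return True
    # b) Any later run proceeds only when there are updatefiles to apply.
    return updatefile_count > 0
```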
- academic_observatory_workflows.pubmed_telescope.tasks.branch_baseline_or_updatefiles(release: dict) → None
- academic_observatory_workflows.pubmed_telescope.tasks.baseline_download(release: dict, ftp_server_url: str, ftp_port: str, reset_ftp_counter: int, max_download_attempt: int) → None
Download files from PubMed’s FTP server for this release. Unable to do this in parallel because their FTP server is not able to handle many requests at once.
- Parameters:
ftp_server_url – The host of the pubmed data
ftp_port – The port to access the ftp server
max_download_attempt – Fail after this many unsuccessful download attempts
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_download(release: dict, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) → None
Download the updatefiles from PubMed’s FTP server for this release. Unable to do this in parallel due to limitations of their FTP server.
- Parameters:
ftp_server_url – The host of the pubmed data
ftp_port – The port to access the ftp server
max_download_attempt – Fail after this many unsuccessful download attempts
- academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_downloaded(release: dict) → None
Upload downloaded baseline files to GCS.
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_downloaded(release: dict) → None
Upload downloaded updatefiles to GCS.
- academic_observatory_workflows.pubmed_telescope.tasks.baseline_transform(release: dict, max_processes: int | None = None) → None
Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import.
- Parameters:
max_processes – The max number of processes to use for multiprocessing
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_transform(release: dict, max_processes: int | None = None) → List[Dict]
Transform the *.xml.gz files downloaded from PubMed’s FTP server into usable json-like files for BigQuery import. This task is multithreaded and pulls the PubmedArticle records from the downloaded XML files.
- Parameters:
max_processes – The number of processes to use for multithreading
- academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_transformed(release: dict) → None
Upload transformed baseline files to GCS.
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_merge_upserts_deletes(release: dict, updatefiles: List[Dict], max_processes: int | None = None) → None
Merge the upserts and deletes for this release period.
- Parameters:
max_processes – The number of processes to use for multithreading
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_upsert_records(release: dict) → None
Upload the merged upsert records to GCS.
- academic_observatory_workflows.pubmed_telescope.tasks.baseline_bq_load(release: dict, bq_dataset_description: str, main_table_name: str, baseline_table_description: str) → None
Ingest the baseline table from GCS to BQ using a file pattern.
- Parameters:
bq_dataset_description – The description to give the BigQuery dataset
main_table_name – The name of the table
baseline_table_description – The description to give the table
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_upsert_table(release: dict, upsert_table_name: str, upsert_table_description: str) → None
Ingest the upsert records from GCS to BQ using a glob pattern.
- Parameters:
upsert_table_name – The name of the upsert table to upload
upsert_table_description – The description to give the upsert table
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_upsert_records(release: dict, main_table_name: str, upsert_table_name: str) → None
Upsert records into the main table. Has to match on both the PMID value and the Version number, as there could be multiple different versions in the main table.
- Parameters:
main_table_name – The name of the main table to upsert to
upsert_table_name – The name of the table containing the records to upsert
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_delete_records(release: dict) → None
Upload the merged delete records to GCS.
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_delete_table(release: dict, delete_table_name: str, delete_table_description: str) → None
Ingest delete records from GCS to BQ.
- Parameters:
delete_table_name – The name of the delete table
delete_table_description – The description to give the delete table
- academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_delete_records(release: dict, main_table_name: str, delete_table_name: str) → None
Remove records from the main table that are specified in the delete table. Has to match on both the PMID value and the Version number, as there could be multiple different versions in the main table.
- Parameters:
main_table_name – The name of the table to delete records from
delete_table_name – The name of the table containing the records to delete
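Both the upsert and delete steps match on the PMID value and the Version together. The sketch below shows that composite-key matching on in-memory records rather than in BigQuery. The record layout (a "PMID" field holding "value" and "Version") and the helper names are assumptions made for illustration, not the workflow's actual queries.

```python
from typing import Dict, Iterable, List, Tuple

Key = Tuple[str, str]  # (PMID value, Version)


def record_key(record: dict) -> Key:
    """Build the composite key from a record (hypothetical record layout)."""
    pmid = record["PMID"]
    return (pmid["value"], pmid["Version"])


def apply_upserts(main: List[dict], upserts: Iterable[dict]) -> List[dict]:
    """Replace records whose key already exists and append the rest."""
    by_key: Dict[Key, dict] = {record_key(r): r for r in main}
    for record in upserts:
        by_key[record_key(record)] = record
    return list(by_key.values())


def apply_deletes(main: List[dict], deletes: Iterable[Key]) -> List[dict]:
    """Drop only the records whose value and Version both match a delete key."""
    to_delete = set(deletes)
    return [r for r in main if record_key(r) not in to_delete]
```

Matching on the value alone would overwrite or remove every version of a record, which is why both fields take part in the key.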
- academic_observatory_workflows.pubmed_telescope.tasks.add_dataset_releases(release: dict, api_bq_dataset_id: str) → None
Adds release information to the API.
- Parameters:
api_bq_dataset_id – BigQuery dataset ID of the API
- academic_observatory_workflows.pubmed_telescope.tasks.cleanup_workflow(release: dict) → None
Cleanup files from this workflow run.
Delete local download files, transform files and current task instance.
- academic_observatory_workflows.pubmed_telescope.tasks.login_to_ftp(host: str, port: int) → ftplib.FTP
- academic_observatory_workflows.pubmed_telescope.tasks.create_snapshot(release: dict, bq_dataset_id: str, bq_main_table_name: str, snapshot_expiry_days: int) → None
Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.
- academic_observatory_workflows.pubmed_telescope.tasks.download_datafiles(datafile_list: List[academic_observatory_workflows.pubmed_telescope.datafile.Datafile], ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) → bool
Download a list of Pubmed datafiles from their FTP server.
- Parameters:
datafile_list – List of datafiles to download from their FTP server.
ftp_server_url – FTP server URL.
ftp_port – Port for the FTP connection.
reset_ftp_counter – After this number of files, reset the FTP connection to make sure that the connection is not reset by the host.
max_download_attempt – Maximum number of retries for downloading one datafile before throwing an error.
- Return download_success:
True if all of the datafiles were downloaded successfully.
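The interplay of reset_ftp_counter and max_download_attempt can be sketched as a sequential loop that reconnects every N files and retries each file a bounded number of times. This is a minimal sketch, not the workflow's implementation: connect_ftp, download_files and the connect factory are hypothetical, and returning False once the attempts are exhausted is an assumption.

```python
import ftplib
import os
from typing import Callable, List


def connect_ftp(host: str, port: int) -> ftplib.FTP:
    """Open an anonymous FTP session (hypothetical helper)."""
    ftp = ftplib.FTP()
    ftp.connect(host=host, port=port)
    ftp.login()
    return ftp


def download_files(
    remote_paths: List[str],
    local_dir: str,
    connect: Callable[[], ftplib.FTP],
    reset_ftp_counter: int,
    max_download_attempt: int,
) -> bool:
    """Download files one at a time; return True only if every file succeeds."""
    ftp = connect()
    try:
        for i, remote_path in enumerate(remote_paths):
            # Proactively reconnect every reset_ftp_counter files.
            if i > 0 and i % reset_ftp_counter == 0:
                ftp.close()
                ftp = connect()
            local_path = os.path.join(local_dir, os.path.basename(remote_path))
            for attempt in range(1, max_download_attempt + 1):
                try:
                    with open(local_path, "wb") as f:
                        ftp.retrbinary(f"RETR {remote_path}", f.write)
                    break
                except ftplib.all_errors:
                    if attempt == max_download_attempt:
                        return False  # assumption: report failure instead of raising
                    # Reconnect before the next attempt in case the session dropped.
                    ftp.close()
                    ftp = connect()
    finally:
        ftp.close()
    return True
```

A caller might pass `lambda: connect_ftp(ftp_server_url, ftp_port)` as the connect argument; taking a factory keeps the loop independent of how the session is opened.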
- academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_jsonl(output_path: str, data: List[Dict])
Save a Pubmed jsonl to file using the custom encoder.
- Parameters:
output_path – Path of the output file.
data – The data to write to file.
- Returns:
None.
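Writing JSONL with a custom encoder amounts to serialising one record per line and passing the encoder class to json.dumps. The sketch below is illustrative only: StringFieldEncoder, STRING_FIELDS and save_jsonl are hypothetical names, the field list is an assumed example, and stringifying with json.dumps is one possible choice rather than the behaviour of PubMedCustomEncoder.

```python
import json
from typing import Any, Dict, List

# Assumed example; the real encoder defines its own set of fields.
STRING_FIELDS = {"AbstractText"}


class StringFieldEncoder(json.JSONEncoder):
    """Force selected dictionary fields to be written as strings."""

    def _coerce(self, obj: Any) -> Any:
        if isinstance(obj, dict):
            return {
                key: json.dumps(value)
                if key in STRING_FIELDS and not isinstance(value, str)
                else self._coerce(value)
                for key, value in obj.items()
            }
        if isinstance(obj, list):
            return [self._coerce(item) for item in obj]
        return obj

    def iterencode(self, o, _one_shot=False):
        # encode() delegates to iterencode(), so this covers json.dump and json.dumps.
        return super().iterencode(self._coerce(o), _one_shot)


def save_jsonl(output_path: str, data: List[Dict]) -> None:
    """Write one JSON document per line using the custom encoder."""
    with open(output_path, "w", encoding="utf-8") as f:
        for record in data:
            f.write(json.dumps(record, cls=StringFieldEncoder) + "\n")
```

The coercion happens before encoding because JSONEncoder.default is only consulted for objects the encoder cannot serialise, and lists and dictionaries never reach it.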
- academic_observatory_workflows.pubmed_telescope.tasks.parse_articles(data: Dict) → List | List[Dict]
- academic_observatory_workflows.pubmed_telescope.tasks.parse_deletes(data: Dict) → List | List[Dict]
- academic_observatory_workflows.pubmed_telescope.tasks.parse_pubmed_element(elem, validate=False, ignore_errors=True)
- academic_observatory_workflows.pubmed_telescope.tasks.transform_pubmed(input_path: str, upsert_path: str) → bool | str | PubmedUpdatefile
Convert a single Pubmed XML file to JSONL, pulling out any Pubmed upserts and/or deletes. Used in the parallelised transform task.
- Parameters:
input_path – Path to the downloaded xml.gz file.
upsert_path – Output file path for the upserts.
- Returns:
Filename of the baseline file or a PubmedUpdatefile object if it is an updatefile.
- academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_merged_upserts(filename: str, upsert_index: Dict[PMID, str], input_path: str, output_path: str) → str
Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.
- Parameters:
filename – Name of the original updatefile used by the upsert_index.
upserts – A list of upsert keys to pull out and write to file.
input_path – Where the original upsert records are stored.
output_path – Destination of where to write the merged upsert records.
- Return output_path:
For logging purposes and ensuring the multithreaded task completed.
- academic_observatory_workflows.pubmed_telescope.tasks.merge_upserts_and_deletes(updatefiles: List[PubmedUpdatefile]) → Tuple[Dict[PMID, str], Set[PMID]]
Merge the Pubmed upserts and deletes and return them as a list of PMID value-Version pairs so that they can be easily pulled from file.
- Parameters:
updatefiles – List of PubmedUpdatefile objects to merge together.
- Returns:
The merged upserts and deletes.
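One way to produce the Dict[PMID, str] and Set[PMID] pair in the return type is to replay the updatefiles in order, letting later files win. The sketch below is written under stated assumptions rather than taken from the source: Pmid and Updatefile are simplified stand-ins for PMID and PubmedUpdatefile, sorting by filename is assumed to give chronological order, and deletes within a file are assumed to apply after that file's upserts.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class Pmid:
    """Hashable stand-in for PMID so it can be a dict key or set member."""
    value: int
    version: int


@dataclass
class Updatefile:
    """Stand-in for PubmedUpdatefile."""
    name: str
    upserts: List[Pmid] = field(default_factory=list)
    deletes: List[Pmid] = field(default_factory=list)


def merge(updatefiles: List[Updatefile]) -> Tuple[Dict[Pmid, str], Set[Pmid]]:
    """Return (PMID -> file holding its latest upsert, PMIDs to delete)."""
    upsert_index: Dict[Pmid, str] = {}
    deletes: Set[Pmid] = set()
    for updatefile in sorted(updatefiles, key=lambda u: u.name):
        for pmid in updatefile.upserts:
            upsert_index[pmid] = updatefile.name  # a later file overrides an earlier one
            deletes.discard(pmid)  # a re-added record is no longer deleted
        for pmid in updatefile.deletes:
            upsert_index.pop(pmid, None)  # a deleted record needs no upsert
            deletes.add(pmid)
    return upsert_index, deletes
```

Mapping each PMID to a filename means only the file containing the latest copy of a record has to be read when the merged upserts are written out.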
- academic_observatory_workflows.pubmed_telescope.tasks.add_attributes(obj: Bio.Entrez.Parser.StringElement | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.OrderedListElement | list | str)
Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.
Only pulling data from StringElements, DictionaryElements, ListElements and OrderedListElements.
- Parameters:
obj – Input object, being one of the Biopython data classes.
- Return new:
Object with attributes added as keys to the dictionary.
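The recursion described above can be illustrated without Biopython by using a stand-in string class that carries an attributes dictionary. Everything in this sketch is an assumption made for illustration: AttrStr mimics a parsed string element, add_attrs is a hypothetical name, and storing the text under a "value" key is chosen to mirror the PMID value and Version pairing used elsewhere in this module.

```python
from typing import Any, Dict, Optional


class AttrStr(str):
    """Stand-in for a parsed string element that carries XML attributes."""

    def __new__(cls, value: str, attributes: Optional[Dict[str, str]] = None):
        obj = super().__new__(cls, value)
        obj.attributes = attributes or {}
        return obj


def add_attrs(obj: Any) -> Any:
    """Recursively fold element attributes into plain key-value pairs."""
    attributes = dict(getattr(obj, "attributes", None) or {})
    if isinstance(obj, str):
        # A string with attributes becomes a dictionary; a bare string stays a string.
        return {"value": str(obj), **attributes} if attributes else str(obj)
    if isinstance(obj, dict):
        new = {key: add_attrs(value) for key, value in obj.items()}
        new.update(attributes)
        return new
    if isinstance(obj, list):
        # This sketch ignores attributes attached to the list itself.
        return [add_attrs(item) for item in obj]
    return obj
```

With this stand-in, `add_attrs({"PMID": AttrStr("12345", {"Version": "1"})})` yields a nested dictionary holding both the text and the Version attribute.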
- academic_observatory_workflows.pubmed_telescope.tasks.change_pubmed_list_structure(obj: dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement) → dict | list | str | Bio.Entrez.Parser.DictionaryElement | Bio.Entrez.Parser.ListElement | Bio.Entrez.Parser.StringElement
Recursively travel down the Pubmed data tree to move the specified fields up one level to make it easier to query in Bigquery.
For example, the original data can look something like:
    {
        "AuthorList": {
            "CompleteYN": "Y",
            "Author": [
                { "First": "Foo", "Last": "Bar" },
                { "First": "James", "Last": "Bond" }
            ]
        }
    }
The “Author” field will be removed and the data from it will be moved up to the “List” level, along with any data specified in the “bad_list_fields” dictionary:
    {
        "AuthorListCompleteYN": "Y",
        "AuthorListType": None,
        "AuthorList": [
            { "First": "Foo", "Last": "Bar" },
            { "First": "James", "Last": "Bond" }
        ]
    }
- Parameters:
obj – Incoming data object.
- Returns:
Any type as it could be a Pubmed dataclass.
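The restructuring in the example above can be reproduced on plain dictionaries with a short recursive function. This is a minimal sketch rather than the function's actual logic: BAD_LIST_FIELDS is a hypothetical stand-in for the “bad_list_fields” dictionary, and the rule that a "...List" field wraps a child named after its singular form is inferred from the AuthorList example only.

```python
from typing import Any, Dict, List

# Hypothetical stand-in: extra fields to hoist next to each list.
BAD_LIST_FIELDS: Dict[str, List[str]] = {"AuthorList": ["CompleteYN", "Type"]}


def flatten_lists(obj: Any) -> Any:
    """Move the inner list up to the "...List" key and hoist its sibling fields."""
    if isinstance(obj, list):
        return [flatten_lists(item) for item in obj]
    if not isinstance(obj, dict):
        return obj
    result: Dict[str, Any] = {}
    for key, value in obj.items():
        item_key = key[: -len("List")]  # "AuthorList" -> "Author"
        if key.endswith("List") and isinstance(value, dict) and item_key in value:
            for name in BAD_LIST_FIELDS.get(key, []):
                # Missing fields are written as None so every record has the same keys.
                result[key + name] = value.get(name)
            result[key] = flatten_lists(value[item_key])
        else:
            result[key] = flatten_lists(value)
    return result
```

Applied to the first structure shown above, this sketch returns the second one, including the AuthorListType entry set to None.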
- class academic_observatory_workflows.pubmed_telescope.tasks.PubMedCustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: json.JSONEncoder
Custom encoder for json dump for it to write a dictionary field as a string of text for a select number of key values in the Pubmed data.
For example, the AbstractText field can be a string or an array containing background, methods, etc, but Bigquery requires it to be well defined and it can’t be both an array and a string in the schema. This encoder forces the below fields to be a string when written to a json file.