academic_observatory_workflows.pubmed_telescope.tasks
=====================================================

.. py:module:: academic_observatory_workflows.pubmed_telescope.tasks


Attributes
----------

.. autoapisummary::

   academic_observatory_workflows.pubmed_telescope.tasks.bad_list_fields


Classes
-------

.. autoapisummary::

   academic_observatory_workflows.pubmed_telescope.tasks.PMID
   academic_observatory_workflows.pubmed_telescope.tasks.PubmedUpdatefile
   academic_observatory_workflows.pubmed_telescope.tasks.PubMedCustomEncoder


Functions
---------

.. autoapisummary::

   academic_observatory_workflows.pubmed_telescope.tasks.fetch_release
   academic_observatory_workflows.pubmed_telescope.tasks.short_circuit
   academic_observatory_workflows.pubmed_telescope.tasks.branch_baseline_or_updatefiles
   academic_observatory_workflows.pubmed_telescope.tasks.baseline_download
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_download
   academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_downloaded
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_downloaded
   academic_observatory_workflows.pubmed_telescope.tasks.baseline_transform
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_transform
   academic_observatory_workflows.pubmed_telescope.tasks.baseline_upload_transformed
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_merge_upserts_deletes
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_upsert_records
   academic_observatory_workflows.pubmed_telescope.tasks.baseline_bq_load
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_upsert_table
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_upsert_records
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_upload_merged_delete_records
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_load_delete_table
   academic_observatory_workflows.pubmed_telescope.tasks.updatefiles_bq_delete_records
   academic_observatory_workflows.pubmed_telescope.tasks.add_dataset_releases
   academic_observatory_workflows.pubmed_telescope.tasks.cleanup_workflow
   academic_observatory_workflows.pubmed_telescope.tasks.login_to_ftp
   academic_observatory_workflows.pubmed_telescope.tasks.create_snapshot
   academic_observatory_workflows.pubmed_telescope.tasks.download_datafiles
   academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_jsonl
   academic_observatory_workflows.pubmed_telescope.tasks.parse_articles
   academic_observatory_workflows.pubmed_telescope.tasks.parse_deletes
   academic_observatory_workflows.pubmed_telescope.tasks.parse_pubmed_element
   academic_observatory_workflows.pubmed_telescope.tasks.init_logging
   academic_observatory_workflows.pubmed_telescope.tasks.transform_pubmed
   academic_observatory_workflows.pubmed_telescope.tasks.save_pubmed_merged_upserts
   academic_observatory_workflows.pubmed_telescope.tasks.merge_upserts_and_deletes
   academic_observatory_workflows.pubmed_telescope.tasks.add_attributes
   academic_observatory_workflows.pubmed_telescope.tasks.change_pubmed_list_structure
   academic_observatory_workflows.pubmed_telescope.tasks.bq_update_table_expiration


Module Contents
---------------

.. py:class:: PMID

   Object to identify a particular PMID record by the PMID value and Version.

   :param value: The PMID value.
   :param Version: The Version of the PMID record.

   .. py:attribute:: value
      :type: int

   .. py:attribute:: Version
      :type: int

   .. py:method:: __eq__(other)

   .. py:method:: __hash__()

   .. py:method:: to_dict() -> Dict

   .. py:method:: from_dict() -> PMID

   .. py:method:: to_str() -> str


.. py:class:: PubmedUpdatefile

   Object to hold the list of upserts and deletes for a single Pubmed updatefile.

   :param name: Filename of the updatefile.
   :param upserts: List of PMID objects that are to be upserted.
   :param deletes: List of PMID objects that are to be deleted.

   .. py:attribute:: name
      :type: str

   .. py:attribute:: upserts
      :type: List[PMID]

   .. py:attribute:: deletes
      :type: List[PMID]

   .. py:method:: __eq__(other)

   .. py:method:: to_dict() -> Dict

   .. py:method:: from_dict() -> PubmedUpdatefile


.. py:function:: fetch_release(dag_id: str, cloud_workspace: observatory_platform.airflow.workflow.CloudWorkspace, run_id: str, dag_run: airflow.models.DagRun, data_interval_end: pendulum.DateTime, bq_dataset_id: str, api_bq_dataset_id: str, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int) -> str

   Get a list of all files to process for this release. Determine whether the workflow needs to download the baseline files again because of a new yearly release.

   :param dag_id: The ID of the DAG
   :param cloud_workspace: The cloud workspace object
   :param run_id: The run ID of this DAG run
   :param dag_run: The DagRun object
   :param data_interval_end: The end of the data interval for this run
   :param bq_dataset_id: The BigQuery dataset ID
   :param api_bq_dataset_id: The BigQuery dataset ID of the API
   :param ftp_server_url: The host of the PubMed data
   :param ftp_port: The port to access the FTP server
   :param reset_ftp_counter: After this many files, reset the FTP connection


.. py:function:: short_circuit(release: dict) -> bool

   Don't skip this DAG run if:

   a) We are in the first run of the year
   b) We are not in the first run of the year and there are updatefiles

   :return: Whether to skip this DAG run or not


.. py:function:: branch_baseline_or_updatefiles(release: dict) -> None


.. py:function:: baseline_download(release: dict, ftp_server_url: str, ftp_port: str, reset_ftp_counter: int, max_download_attempt: int) -> None

   Download files from PubMed's FTP server for this release.
   Unable to do this in parallel because their FTP server is not able to handle many requests at once.

   :param ftp_server_url: The host of the PubMed data
   :param ftp_port: The port to access the FTP server
   :param max_download_attempt: Fail after this many unsuccessful download attempts


.. py:function:: updatefiles_download(release: dict, ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) -> None

   Download the updatefiles from PubMed's FTP server for this release.
   Unable to do this in parallel due to limitations of their FTP server.

   :param ftp_server_url: The host of the PubMed data
   :param ftp_port: The port to access the FTP server
   :param max_download_attempt: Fail after this many unsuccessful download attempts


.. py:function:: baseline_upload_downloaded(release: dict) -> None

   Upload downloaded baseline files to GCS.


.. py:function:: updatefiles_upload_downloaded(release: dict) -> None

   Upload downloaded updatefiles to GCS.


.. py:function:: baseline_transform(release: dict, max_processes: Optional[int] = None) -> None

   Transform the ``*.xml.gz`` files downloaded from PubMed into usable JSON files for BigQuery import.

   :param max_processes: The maximum number of processes to use for multiprocessing


.. py:function:: updatefiles_transform(release: dict, max_processes: Optional[int] = None) -> List[Dict]

   Transform the ``*.xml.gz`` files downloaded from PubMed's FTP server into usable JSON-like files for BigQuery import.
   This runs in parallel and pulls the PubmedArticle records from the downloaded XML files.

   :param max_processes: The number of processes to use for multiprocessing


.. py:function:: baseline_upload_transformed(release: dict) -> None

   Upload transformed baseline files to GCS.


.. py:function:: updatefiles_merge_upserts_deletes(release: dict, updatefiles: List[Dict], max_processes: Optional[int] = None) -> None

   Merge the upserts and deletes for this release period.

   :param max_processes: The number of processes to use for multiprocessing


.. py:function:: updatefiles_upload_merged_upsert_records(release: dict) -> None

   Upload the merged upsert records to GCS.


.. py:function:: baseline_bq_load(release: dict, bq_dataset_description: str, main_table_name: str, baseline_table_description: str) -> None

   Ingest the baseline table from GCS to BQ using a file pattern.

   :param bq_dataset_description: The description to give the BigQuery dataset
   :param main_table_name: The name of the table
   :param baseline_table_description: The description to give the table


.. py:function:: updatefiles_bq_load_upsert_table(release: dict, upsert_table_name: str, upsert_table_description: str) -> None

   Ingest the upsert records from GCS to BQ using a glob pattern.

   :param upsert_table_name: The name of the upsert table to upload
   :param upsert_table_description: The description to give the upsert table


.. py:function:: updatefiles_bq_upsert_records(release: dict, main_table_name: str, upsert_table_name: str) -> None

   Upsert records into the main table.
   Has to match on both the PMID value and the Version number, as there could be multiple different versions in the main table.

   :param main_table_name: The name of the main table to upsert to
   :param upsert_table_name: The name of the table containing the records to upsert


.. py:function:: updatefiles_upload_merged_delete_records(release: dict) -> None

   Upload the merged delete records to GCS.


.. py:function:: updatefiles_bq_load_delete_table(release: dict, delete_table_name: str, delete_table_description: str) -> None

   Ingest delete records from GCS to BQ.

   :param delete_table_name: The name of the delete table
   :param delete_table_description: The description to give the delete table


.. py:function:: updatefiles_bq_delete_records(release: dict, main_table_name: str, delete_table_name: str) -> None

   Remove records from the main table that are specified in the delete table.
   Has to match on both the PMID value and the Version number, as there could be multiple different versions in the main table.


   :param main_table_name: The name of the table to delete records from
   :param delete_table_name: The name of the table containing the records to delete


.. py:function:: add_dataset_releases(release: dict, api_bq_dataset_id: str) -> None

   Adds release information to the API.

   :param api_bq_dataset_id: BigQuery dataset ID of the API


.. py:function:: cleanup_workflow(release: dict) -> None

   Cleanup files from this workflow run. Delete local download files, transform files and current task instance.


.. py:function:: login_to_ftp(host: str, port: int) -> ftplib.FTP


.. py:function:: create_snapshot(release: dict, bq_dataset_id: str, bq_main_table_name: str, snapshot_expiry_days: int) -> None

   Create a snapshot of the main table as a backup in case something goes wrong when applying the upserts and deletes.


.. py:function:: download_datafiles(datafile_list: List[academic_observatory_workflows.pubmed_telescope.datafile.Datafile], ftp_server_url: str, ftp_port: int, reset_ftp_counter: int, max_download_attempt: int) -> bool

   Download a list of Pubmed datafiles from their FTP server.

   :param datafile_list: List of datafiles to download from their FTP server.
   :param ftp_server_url: FTP server URL.
   :param ftp_port: Port for the FTP connection.
   :param reset_ftp_counter: After this number of files, reset the FTP connection to make sure that the connection is not reset by the host.
   :param max_download_attempt: Maximum number of retries for downloading one datafile before throwing an error.
   :return download_success: True if downloading all of the datafiles was successful.


.. py:function:: save_pubmed_jsonl(output_path: str, data: List[Dict])

   Save a Pubmed jsonl to file using the custom encoder.

   :param output_path: Path of the output file.
   :param data: The data to write to file.
   :return: None.


.. py:function:: parse_articles(data: Dict) -> Union[List, List[Dict]]


.. py:function:: parse_deletes(data: Dict) -> Union[List, List[Dict]]


.. py:function:: parse_pubmed_element(elem, validate=False, ignore_errors=True)


.. py:function:: init_logging()


.. py:function:: transform_pubmed(input_path: str, upsert_path: str) -> Union[bool, str, PubmedUpdatefile]

   Convert a single Pubmed XML file to JSONL, pulling out any of the Pubmed upserts and/or deletes.
   Used in the parallelised transform task.

   :param input_path: Path to the downloaded xml.gz file.
   :param upsert_path: Output file path for the upserts.
   :return: Filename of the baseline file or a PubmedUpdatefile object if it is an updatefile.


.. py:function:: save_pubmed_merged_upserts(filename: str, upsert_index: Dict[PMID, str], input_path: str, output_path: str) -> str

   Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert records using the custom Pubmed encoder.

   :param filename: Name of the original updatefile used by the upsert_index.
   :param upsert_index: The upsert keys to pull out and write to file.
   :param input_path: Where the original upsert records are stored.
   :param output_path: Destination of where to write the merged upsert records.
   :return output_path: For logging purposes and ensuring the parallel task completed.


.. py:function:: merge_upserts_and_deletes(updatefiles: List[PubmedUpdatefile]) -> Tuple[Dict[PMID, str], Set[PMID]]

   Merge the Pubmed upserts and deletes and return them as a list of PMID value-Version pairs so that they can be easily pulled from file.

   :param updatefiles: List of PubmedUpdatefile objects to merge together.
   :return: The merged upserts and deletes.


.. py:function:: add_attributes(obj: Union[Bio.Entrez.Parser.StringElement, Bio.Entrez.Parser.DictionaryElement, Bio.Entrez.Parser.ListElement, Bio.Entrez.Parser.OrderedListElement, list, str])

   Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.

   Only pulling data from StringElements, DictionaryElements, ListElements and OrderedListElements.


   :param obj: Input object, being one of the Biopython data classes.
   :return new: Object with attributes added as keys to the dictionary.


.. py:data:: bad_list_fields


.. py:function:: change_pubmed_list_structure(obj: Union[dict, list, str, Bio.Entrez.Parser.DictionaryElement, Bio.Entrez.Parser.ListElement, Bio.Entrez.Parser.StringElement]) -> Union[dict, list, str, Bio.Entrez.Parser.DictionaryElement, Bio.Entrez.Parser.ListElement, Bio.Entrez.Parser.StringElement]

   Recursively travel down the Pubmed data tree to move the specified fields up one level to make it easier to query in BigQuery.

   For example, the original data can look something like::

      {
          "AuthorList": {
              "CompleteYN": "Y",
              "Author": [
                  { "First": "Foo", "Last": "Bar" },
                  { "First": "James", "Last": "Bond" }
              ]
          }
      }

   The "Author" field will be removed and the data from it will be moved up to the "List" level, along with any data specified in the "bad_list_fields" dictionary::

      {
          "AuthorListCompleteYN": "Y",
          "AuthorListType": None,
          "AuthorList": [
              { "First": "Foo", "Last": "Bar" },
              { "First": "James", "Last": "Bond" }
          ]
      }

   :param obj: Incoming data object.
   :return: Any type as it could be a Pubmed dataclass.


.. py:class:: PubMedCustomEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

   Bases: :py:obj:`json.JSONEncoder`

   Custom JSON encoder that writes a select set of fields in the Pubmed data as a single string of text.

   For example, the AbstractText field can be a string or an array containing background, methods, etc., but BigQuery requires a well-defined schema in which a field cannot be both an array and a string.

   This encoder forces the fields below to be strings when written to a JSON file.

   .. py:attribute:: write_as_single_field
      :value: ['AbstractText', 'Affiliation', 'ArticleTitle', 'b', 'BookTitle', 'Citation', 'CoiStatement',...

   .. py:method:: _transform_obj_data(obj)

   .. py:method:: encode(obj)

      Return a JSON string representation of a Python data structure.

      >>> from json.encoder import JSONEncoder
      >>> JSONEncoder().encode({"foo": ["bar", "baz"]})
      '{"foo": ["bar", "baz"]}'


.. py:function:: bq_update_table_expiration(full_table_id: str, expiration_date: pendulum.datetime) -> None

   Update a BigQuery table expiration date.

   :param full_table_id: Fully qualified table ID.
   :param expiration_date: Expiration date of the table.
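
The list restructuring described for ``change_pubmed_list_structure`` can be illustrated with a minimal sketch. The contents of ``bad_list_fields`` are not shown in this reference, so the ``ASSUMED_LIST_FIELDS`` mapping and the ``flatten_lists`` helper below are hypothetical stand-ins, not the module's implementation. The sketch only reproduces the documented ``AuthorList`` example.

```python
from typing import Any

# Hypothetical stand-in for ``bad_list_fields``; the real dictionary's
# contents are not shown in this reference, so this mapping is an assumption.
ASSUMED_LIST_FIELDS = {
    "AuthorList": {"child": "Author", "attributes": ["CompleteYN", "Type"]},
}


def flatten_lists(obj: Any) -> Any:
    """Recursively move the child records of known list fields up one level."""
    if isinstance(obj, list):
        return [flatten_lists(item) for item in obj]
    if not isinstance(obj, dict):
        return obj

    result = {}
    for key, value in obj.items():
        spec = ASSUMED_LIST_FIELDS.get(key)
        if spec is not None and isinstance(value, dict):
            # Attributes become sibling keys prefixed with the list name;
            # a missing attribute is written as None.
            for attr in spec["attributes"]:
                result[f"{key}{attr}"] = value.get(attr)
            # The child records replace the wrapper dictionary.
            result[key] = flatten_lists(value.get(spec["child"], []))
        else:
            result[key] = flatten_lists(value)
    return result


original = {
    "AuthorList": {
        "CompleteYN": "Y",
        "Author": [
            {"First": "Foo", "Last": "Bar"},
            {"First": "James", "Last": "Bond"},
        ],
    }
}
flattened = flatten_lists(original)
```

With this input, ``flattened`` has the same keys and values as the second structure shown in the ``change_pubmed_list_structure`` description.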
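
The return type of ``merge_upserts_and_deletes`` (``Tuple[Dict[PMID, str], Set[PMID]]``) relies on ``PMID`` being hashable on both its value and its Version. The sketch below shows one way such a merge could work. ``PmidKey``, ``Updatefile`` and ``merge`` are hypothetical stand-ins, the file names are made up, and the rule that the most recent action wins is an assumption rather than documented behaviour.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class PmidKey:
    """Hypothetical stand-in for PMID; equal and hashed on (value, Version)."""

    value: int
    Version: int


@dataclass
class Updatefile:
    """Hypothetical stand-in for PubmedUpdatefile."""

    name: str
    upserts: List[PmidKey]
    deletes: List[PmidKey]


def merge(updatefiles: List[Updatefile]) -> Tuple[Dict[PmidKey, str], Set[PmidKey]]:
    """Merge upserts and deletes; assumes the latest action on a key wins."""
    upserts: Dict[PmidKey, str] = {}
    deletes: Set[PmidKey] = set()
    # Assumption: files are applied in name order, upserts before deletes.
    for updatefile in sorted(updatefiles, key=lambda f: f.name):
        for pmid in updatefile.upserts:
            upserts[pmid] = updatefile.name  # remember which file holds the record
            deletes.discard(pmid)
        for pmid in updatefile.deletes:
            upserts.pop(pmid, None)
            deletes.add(pmid)
    return upserts, deletes


files = [
    Updatefile("update_a.xml.gz", upserts=[PmidKey(1, 1), PmidKey(2, 1)], deletes=[]),
    Updatefile(
        "update_b.xml.gz",
        upserts=[PmidKey(1, 1), PmidKey(1, 2)],
        deletes=[PmidKey(2, 1)],
    ),
]
merged_upserts, merged_deletes = merge(files)
```

Because the key includes the Version, ``PmidKey(1, 1)`` and ``PmidKey(1, 2)`` remain separate entries, which mirrors the requirement that upserts and deletes match on both the PMID value and the Version number.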
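
The idea behind ``PubMedCustomEncoder`` can be sketched with a small ``json.JSONEncoder`` subclass. ``SingleFieldEncoder`` and ``ASSUMED_SINGLE_FIELDS`` are hypothetical names, only two of the keys from the truncated ``write_as_single_field`` list are used, and serialising non-string values with ``json.dumps`` is an assumption about how the text is produced.

```python
import json
from typing import Any

# Two keys taken from the (truncated) write_as_single_field list above.
ASSUMED_SINGLE_FIELDS = {"AbstractText", "ArticleTitle"}


class SingleFieldEncoder(json.JSONEncoder):
    """Hypothetical encoder that writes selected keys as one string of text."""

    def _stringify(self, obj: Any) -> Any:
        if isinstance(obj, list):
            return [self._stringify(item) for item in obj]
        if isinstance(obj, dict):
            out = {}
            for key, value in obj.items():
                if key in ASSUMED_SINGLE_FIELDS and not isinstance(value, str):
                    # Assumption: non-string values are serialised to JSON text.
                    out[key] = json.dumps(value)
                else:
                    out[key] = self._stringify(value)
            return out
        return obj

    def encode(self, obj: Any) -> str:
        # Rewrite the data first, then defer to the standard encoder.
        return super().encode(self._stringify(obj))


record = {
    "ArticleTitle": "A title",
    "AbstractText": ["Background.", "Methods."],
}
line = json.dumps(record, cls=SingleFieldEncoder)
decoded = json.loads(line)
```

After the round trip, ``decoded["AbstractText"]`` is a string rather than an array, so the field has one type across all records. Note that ``json.dumps`` calls ``encode``, whereas ``json.dump`` uses ``iterencode``, so this sketch only affects the former.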