academic_observatory_workflows.data_quality_workflow.data_quality_workflow

Module Contents

Classes

Table

Functions

create_dag(→ airflow.DAG)

Create the DataQualityCheck Workflow.

create_data_quality_record(→ Dict[str, Union[str, ...)

Perform novel data quality checks on a given table in BigQuery.

create_table_hash_id(→ str)

Create a unique table identifier based on the input parameters for a table in BigQuery.

is_in_dqc_table(→ bool)

Checks if a table has already been processed by checking if the table's hash_id is in the main DQC table.

bq_count_distinct_records(→ int)

Finds the number of distinct records for the given fields.

bq_count_nulls(→ int)

Return the number of nulls for a single field or multiple fields.

bq_count_duplicate_records(→ int)

Query a table in BigQuery and return the number of duplicate records for the given field/s.

class academic_observatory_workflows.data_quality_workflow.data_quality_workflow.Table[source]
property full_table_id[source]
project_id: str[source]
dataset_id: str[source]
table_id: str[source]
primary_key: List[str][source]
is_sharded: bool[source]
shard_limit: int | bool | None[source]

Metadata class for tables to be processed by this workflow.

Parameters:
  • project_id – The Google project_id of where the tables are located.

  • dataset_id – The dataset that the table is under.

  • table_id – The name of the table (not the fully qualified table name).

  • primary_key – The location of the primary key in the table, e.g. [“doi”]. This can be multiple identifiers, as for Pubmed: [“MedlineCitation.PMID.value”, “MedlineCitation.PMID.Version”].

  • is_sharded – Whether the table is sharded.

  • shard_limit – The number of shards to process for this series of tables.

__post_init__()[source]
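
A hedged sketch of constructing Table metadata entries; the project, dataset, table names, and shard settings below are illustrative assumptions, not the workflow's actual configuration.

    from academic_observatory_workflows.data_quality_workflow.data_quality_workflow import Table

    # Illustrative only: an unsharded table with a single-field primary key,
    # and a sharded table with a compound primary key and a shard limit.
    doi_table = Table(
        project_id="my-gcp-project",   # assumption: any valid Google Cloud project id
        dataset_id="observatory",      # assumption
        table_id="doi",                # table name only, not the fully qualified id
        primary_key=["doi"],
        is_sharded=False,
        shard_limit=None,
    )
    pubmed_table = Table(
        project_id="my-gcp-project",
        dataset_id="pubmed",
        table_id="pubmed",
        primary_key=["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"],
        is_sharded=True,
        shard_limit=5,                 # assumption: only the five most recent shards are checked
    )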
academic_observatory_workflows.data_quality_workflow.data_quality_workflow.create_dag(*, dag_id: str, cloud_workspace: observatory.platform.observatory_config.CloudWorkspace, datasets: Dict, sensor_dag_ids: List[str] | None = None, bq_dataset_id: str = 'data_quality_checks', bq_dataset_description: str = "This dataset holds metadata about the tables that the Academic Observatory Worflows produce. If there are multiple shards tables, it will go back on the table and check if it hasn't done that table previously.", bq_table_id: str = 'data_quality', schema_path: str = project_path('data_quality_workflow', 'schema', 'data_quality.json'), start_date: pendulum.DateTime | None = pendulum.datetime(2020, 1, 1), schedule: str = '@weekly', max_active_runs: int = 1, retries: int = 3) airflow.DAG[source]

Create the DataQualityCheck Workflow.

This workflow creates metadata for all the tables defined in the “datasets” dictionary. If a table has already been checked before, it will not check it again. This is determined by whether the number of columns, rows, or bytes of data stored in the table has changed. We cannot use the “date modified” field from the BigQuery table object because it changes whenever any other metadata is modified, e.g. the description.
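
A rough sketch of the skip logic described above, composed from the functions documented on this page; the table ids, primary key, and exact orchestration shown are assumptions for illustration only.

    from google.cloud import bigquery

    from academic_observatory_workflows.data_quality_workflow.data_quality_workflow import (
        create_data_quality_record,
        create_table_hash_id,
        is_in_dqc_table,
    )

    client = bigquery.Client()
    # Hypothetical table and DQC table ids, for illustration only.
    table_in_bq = client.get_table("my-gcp-project.observatory.doi")
    dqc_full_table_id = "my-gcp-project.data_quality_checks.data_quality"
    full_table_id = f"{table_in_bq.project}.{table_in_bq.dataset_id}.{table_in_bq.table_id}"

    # Hash the table's shape; if that hash is already recorded, the table is skipped.
    hash_id = create_table_hash_id(
        full_table_id=full_table_id,
        num_bytes=table_in_bq.num_bytes,
        nrows=table_in_bq.num_rows,
        ncols=len(table_in_bq.schema),
    )
    if not is_in_dqc_table(hash_id, dqc_full_table_id):
        record = create_data_quality_record(
            hash_id=hash_id,
            full_table_id=full_table_id,
            primary_key=["doi"],
            is_sharded=False,
            table_in_bq=table_in_bq,
        )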

Parameters:
  • dag_id – the DAG ID.

  • cloud_workspace – the cloud workspace settings.

  • datasets – A dictionary of datasets holding tables that will be processed by this workflow.

  • sensor_dag_ids – List of DAGs that this workflow will wait on to finish before running.

  • bq_dataset_id – The dataset_id of where the data quality records will be stored.

  • bq_dataset_description – Description of the data quality check dataset.

  • bq_table_id – The name of the table in BigQuery.

  • schema_path – The path to the schema file for the records produced by this workflow.

  • start_date – The start date of the workflow.

  • schedule – Schedule of how often the workflow runs.

  • max_active_runs – the maximum number of DAG runs that can be run at once.

  • retries – the number of times to retry a task.
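
A hedged example of registering the DAG in an Airflow DAGs file. The CloudWorkspace fields and the structure of the datasets dictionary are not documented on this page, so the shapes shown are assumptions for illustration only.

    from academic_observatory_workflows.data_quality_workflow.data_quality_workflow import create_dag
    from observatory.platform.observatory_config import CloudWorkspace

    # Assumed CloudWorkspace fields; adjust to your deployment.
    cloud_workspace = CloudWorkspace(
        project_id="my-gcp-project",
        download_bucket="my-download-bucket",
        transform_bucket="my-transform-bucket",
        data_location="us",
    )
    datasets = {
        # Assumed shape: dataset_id -> table settings consumed by the workflow.
        "observatory": {
            "tables": [
                {"table_id": "doi", "primary_key": ["doi"], "is_sharded": False},
            ],
        },
    }
    dag = create_dag(
        dag_id="data_quality_checks",
        cloud_workspace=cloud_workspace,
        datasets=datasets,
        sensor_dag_ids=["doi_workflow"],  # wait for this DAG to finish before running
    )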

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.create_data_quality_record(hash_id: str, full_table_id: str, primary_key: List[str], is_sharded: bool, table_in_bq: google.cloud.bigquery.Table) Dict[str, str | List[str] | float | bool | int][source]

Perform novel data quality checks on a given table in BigQuery.

Parameters:
  • hash_id – Unique md5 style identifier of the table.

  • full_table_id – The fully qualified table id, including the shard date suffix.

  • primary_key – The key identifier columns for the table.

  • is_sharded – Whether the table is expected to be sharded.

  • table_in_bq – Table metadata object retrieved from the BigQuery API.

Returns:

Dictionary of values from the data quality check.
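
The count helpers documented further down this page are the kind of measurements such a record can contain; below is a hedged sketch of that composition, with assumed dictionary keys (the real record schema is defined in data_quality.json and may differ).

    from academic_observatory_workflows.data_quality_workflow.data_quality_workflow import (
        bq_count_distinct_records,
        bq_count_duplicate_records,
        bq_count_nulls,
    )

    # Hypothetical table and primary key; the dictionary keys below are illustrative.
    full_table_id = "my-gcp-project.observatory.doi"
    primary_key = ["doi"]
    checks = {
        "distinct_records": bq_count_distinct_records(full_table_id, fields=primary_key),
        "null_primary_keys": bq_count_nulls(full_table_id, fields=primary_key),
        "duplicate_records": bq_count_duplicate_records(full_table_id, fields=primary_key),
    }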

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.create_table_hash_id(full_table_id: str, num_bytes: int, nrows: int, ncols: int) str[source]

Create a unique table identifier based on the input parameters for a table in BigQuery.

Parameters:
  • full_table_id – The fully qualified table name.

  • num_bytes – Number of bytes stored in the table.

  • nrows – Number of rows/records in the table.

  • ncols – Number of columns/fields in the table.

Returns:

An md5 hash based on the given input parameters.
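
The exact hashing scheme is internal to the workflow; the following is a minimal sketch of how such an identifier could be derived with hashlib, assuming the inputs are simply concatenated before hashing.

    import hashlib

    def table_hash_id_sketch(full_table_id: str, num_bytes: int, nrows: int, ncols: int) -> str:
        # Assumption: the identifier is an md5 digest of the concatenated inputs.
        payload = f"{full_table_id}{num_bytes}{nrows}{ncols}"
        return hashlib.md5(payload.encode("utf-8")).hexdigest()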

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.is_in_dqc_table(hash_to_check: str, dqc_full_table_id: str) bool[source]

Checks if a table has already been processed by checking if the table's hash_id is in the main DQC table.

Parameters:
  • hash_to_check – The hash of the table, used to check whether the data quality checks have been performed before.

  • dqc_full_table_id – The fully qualified name of the table that holds all of the DQC records in BigQuery.

Returns:

True if the check has been done before, otherwise false.
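
A sketch of the kind of BigQuery lookup this function performs; the column name hash_id is an assumption (the real column name comes from the data_quality.json schema).

    from google.cloud import bigquery

    def is_in_dqc_table_sketch(hash_to_check: str, dqc_full_table_id: str) -> bool:
        # Assumption: the DQC table stores the table hash in a column named hash_id.
        client = bigquery.Client()
        job_config = bigquery.QueryJobConfig(
            query_parameters=[bigquery.ScalarQueryParameter("hash_id", "STRING", hash_to_check)]
        )
        query = f"SELECT COUNT(*) AS n FROM `{dqc_full_table_id}` WHERE hash_id = @hash_id"
        rows = list(client.query(query, job_config=job_config).result())
        return rows[0].n > 0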

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.bq_count_distinct_records(full_table_id: str, fields: str | List[str]) int[source]

Finds the number of distinct records for the given fields.

Parameters:
  • full_table_id – The fully qualified table id.

  • fields – Singular or list of fields to determine the distinct records.
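
Roughly, this corresponds to counting distinct combinations of the given field/s; a hedged sketch of the query it might run (the exact SQL is an assumption):

    def count_distinct_records_sql(full_table_id: str, fields) -> str:
        # Count distinct combinations of the given field/s.
        if isinstance(fields, str):
            fields = [fields]
        keys = ", ".join(fields)
        return f"SELECT COUNT(*) AS n FROM (SELECT DISTINCT {keys} FROM `{full_table_id}`)"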

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.bq_count_nulls(full_table_id: str, fields: str | List[str]) int[source]
Return the number of nulls for a single field or multiple fields.

The fields are combined with an OR condition, so a record is counted if any of the listed fields is null/empty.

Parameters:
  • full_table_id – The fully qualified table id.

  • fields – A single string or list of strings to have the number of nulls checked.

Returns:

The integer number of nulls present in the given fields.
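
A sketch of the OR-combined null check described above; the query shape is an assumption consistent with the description.

    def count_nulls_sql(full_table_id: str, fields) -> str:
        # A record is counted if ANY of the listed fields is NULL (OR-combined).
        if isinstance(fields, str):
            fields = [fields]
        condition = " OR ".join(f"{field} IS NULL" for field in fields)
        return f"SELECT COUNT(*) AS n FROM `{full_table_id}` WHERE {condition}"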

academic_observatory_workflows.data_quality_workflow.data_quality_workflow.bq_count_duplicate_records(full_table_id: str, fields: str | List[str]) int[source]

Query a table in BigQuery and return the number of duplicate records for the given field/s (key/s).

Parameters:
  • fields – String or list of strings of the keys to query the table.

  • full_table_id – Fully qualified table name.
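
A sketch of how duplicates over the given key/s can be counted; both the exact SQL and the definition of "duplicate" used here (all records belonging to duplicated key combinations) are assumptions.

    def count_duplicate_records_sql(full_table_id: str, fields) -> str:
        # Count records whose key combination occurs more than once.
        if isinstance(fields, str):
            fields = [fields]
        keys = ", ".join(fields)
        return (
            f"SELECT COALESCE(SUM(n), 0) AS num_duplicates "
            f"FROM (SELECT COUNT(*) AS n FROM `{full_table_id}` GROUP BY {keys} HAVING COUNT(*) > 1)"
        )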