CloudDQ is a Command-Line Interface (CLI) application. It takes as input YAML Data Quality configurations, generates and executes SQL code in BigQuery using provided connection configurations, and writes the resulting Data Quality summary statistics to a BigQuery table of your choice.
CloudDQ is currently only tested on Ubuntu/Debian Linux distributions. It may not work properly on other operating systems such as MacOS, Windows/Cygwin, or CentOS/Fedora/FreeBSD.
For development or trying out CloudDQ, we recommend using either Cloud Shell or a Google Cloud Compute Engine VM with the Debian 11 OS distribution.
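For instance, you could create such a VM with gcloud; a minimal sketch, where the instance name and zone are placeholders to adapt:
#!/bin/bash
# Create a Debian 11 Compute Engine VM for trying out CloudDQ
gcloud compute instances create clouddq-vm \
  --image-family=debian-11 \
  --image-project=debian-cloud \
  --zone=europe-west1-b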
CloudDQ requires the command `python3` to point to a Python interpreter of version 3.8.x or 3.9.x. To install the correct Python version, please refer to the script `scripts/poetry_install.sh` for an interactive installation or `scripts/install_python3.sh` for a non-interactive installation intended for automated build/test processes.
For example, on Cloud Shell, you could install Python 3.9 by running:
#!/bin/bash
export CLOUDDQ_RELEASE_VERSION="1.0.0"
git clone -b "v${CLOUDDQ_RELEASE_VERSION}" https://github.com/GoogleCloudPlatform/cloud-data-quality.git
source cloud-data-quality/scripts/install_python3.sh "3.9.7"
python3 --version
The simplest way to run CloudDQ is to use one of the pre-built executables provided on the GitHub releases page: https://github.com/GoogleCloudPlatform/cloud-data-quality/releases
We currently provide pre-built executables for Debian 11 + Python 3.9, built for execution on Dataplex Tasks/Dataproc Serverless Batches (this executable also works on Debian 10 in Cloud Shell), and for Ubuntu 18 + Python 3.8, built for execution as Dataproc Jobs/Workflows on a compatible Ubuntu 18 OS image.
For example, from Cloud Shell, you can download the executable with the following commands:
#!/bin/bash
export CLOUDDQ_RELEASE_VERSION="1.0.0"
export TARGET_OS="debian_11" # can be either "debian_11" or "ubuntu_18"
export TARGET_PYTHON_INTERPRETER="3.9" # can be either "3.8" or "3.9"
wget -O clouddq_executable.zip https://github.com/GoogleCloudPlatform/cloud-data-quality/releases/download/v"${CLOUDDQ_RELEASE_VERSION}"/clouddq_executable_v"${CLOUDDQ_RELEASE_VERSION}"_"${TARGET_OS}"_python"${TARGET_PYTHON_INTERPRETER}".zip
If you do not have Python 3.9 installed, you can install it on an Ubuntu or Debian machine by running:
#!/bin/bash
export CLOUDDQ_RELEASE_VERSION="1.0.0"
git clone -b "v${CLOUDDQ_RELEASE_VERSION}" https://github.com/GoogleCloudPlatform/cloud-data-quality.git
source cloud-data-quality/scripts/install_python3.sh "3.9.7"
python3 --version
You can then use the CLI by passing the zip into a Python interpreter:
#!/bin/bash
python3 clouddq_executable.zip --help
This should show you the help text.
In the below examples, we will use the example YAML configs provided in this project.
The example commands will authenticate to GCP with your user's credentials via Application Default Credentials (ADC). Ensure you have a GCP project and that your user's GCP credentials have at minimum project-level IAM permission to run BigQuery jobs (`roles/bigquery.jobUser`) and to create new BigQuery datasets (`roles/bigquery.dataEditor`) in that project.
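If your account is missing these roles, a project administrator can grant them. A sketch using gcloud, where the `GCP_USER` variable is a placeholder for your user email:
#!/bin/bash
export GOOGLE_CLOUD_PROJECT="<your_GCP_project_id>"
export GCP_USER="<your_user_email>" # placeholder
# Grant the minimum project-level roles CloudDQ needs
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} --member="user:${GCP_USER}" --role="roles/bigquery.jobUser"
gcloud projects add-iam-policy-binding ${GOOGLE_CLOUD_PROJECT} --member="user:${GCP_USER}" --role="roles/bigquery.dataEditor"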
From either Cloud Shell or an Ubuntu/Debian machine, clone the CloudDQ project to get the sample config directory:
#!/bin/bash
export CLOUDDQ_RELEASE_VERSION="1.0.0"
git clone -b "v${CLOUDDQ_RELEASE_VERSION}" https://github.com/GoogleCloudPlatform/cloud-data-quality.git
Then change directory to the git project and get the pre-built executable from GitHub:
#!/bin/bash
export CLOUDDQ_RELEASE_VERSION="1.0.0"
export TARGET_OS="debian_11" # can be either "debian_11" or "ubuntu_18"
export TARGET_PYTHON_INTERPRETER="3.9" # can be either "3.8" or "3.9"
cd cloud-data-quality
wget -O clouddq_executable.zip https://github.com/GoogleCloudPlatform/cloud-data-quality/releases/download/v"${CLOUDDQ_RELEASE_VERSION}"/clouddq_executable_v"${CLOUDDQ_RELEASE_VERSION}"_"${TARGET_OS}"_python"${TARGET_PYTHON_INTERPRETER}".zip
If you do not have Python 3.9 installed, you can install it on an Ubuntu or Debian machine by running:
#!/bin/bash
source cloud-data-quality/scripts/install_python3.sh "3.9.7"
python3 --version
Run the following command to authenticate to GCP using application-default credentials. The command will prompt you to log in and to paste a verification code back into the console.
#!/bin/bash
gcloud auth application-default login
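To confirm that ADC is set up before moving on, you can ask gcloud for an access token:
#!/bin/bash
# Prints "ADC OK" only if application-default credentials are available
gcloud auth application-default print-access-token > /dev/null && echo "ADC OK"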
Then locate your GCP project ID and set it to a local variable for later use. We will also configure `gcloud` to use this GCP project ID.
#!/bin/bash
export GOOGLE_CLOUD_PROJECT="<your_GCP_project_id>"
gcloud config set project ${GOOGLE_CLOUD_PROJECT}
Now we'll create a new dataset with a custom name in a location of our choice and load some sample data into it:
#!/bin/bash
# Create a BigQuery Dataset in a region of your choice and load data
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export CLOUDDQ_BIGQUERY_REGION=EU
export CLOUDDQ_BIGQUERY_DATASET=clouddq
# Create BigQuery Dataset. Skip this step if `CLOUDDQ_BIGQUERY_DATASET` already exists
bq --location=${CLOUDDQ_BIGQUERY_REGION} mk --dataset ${GOOGLE_CLOUD_PROJECT}:${CLOUDDQ_BIGQUERY_DATASET}
# Fetch the example csv file
curl -LO https://raw.githubusercontent.com/GoogleCloudPlatform/cloud-data-quality/main/tests/data/contact_details.csv
# Load sample data to the dataset
bq load --source_format=CSV --autodetect ${CLOUDDQ_BIGQUERY_DATASET}.contact_details contact_details.csv
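You can preview a few rows to confirm that the load succeeded:
#!/bin/bash
export CLOUDDQ_BIGQUERY_DATASET=clouddq
# Show the first 5 rows of the freshly loaded table
bq head -n 5 ${CLOUDDQ_BIGQUERY_DATASET}.contact_details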
Before running CloudDQ, we need to edit the table configurations in `configs/entities/test-data.yml` to use our project ID and BigQuery dataset ID.
#!/bin/bash
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export CLOUDDQ_BIGQUERY_REGION=EU
export CLOUDDQ_BIGQUERY_DATASET=clouddq
sed -i s/\<your_gcp_project_id\>/${GOOGLE_CLOUD_PROJECT}/g configs/entities/test-data.yml
sed -i s/\<your_bigquery_dataset_id\>/${CLOUDDQ_BIGQUERY_DATASET}/g configs/entities/test-data.yml
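You can verify that the substitutions took effect by searching the file for your project ID:
#!/bin/bash
# Any matching lines confirm the placeholders were replaced
grep -n "${GOOGLE_CLOUD_PROJECT}" configs/entities/test-data.yml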
Using the same project ID, GCP region, and BigQuery dataset ID as defined before, we will attempt to execute two `Rule Binding`s with the unique identifiers `T2_DQ_1_EMAIL` and `T3_DQ_1_EMAIL_DUPLICATE` from the `configs` directory containing the complete YAML configurations:
#!/bin/bash
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export CLOUDDQ_BIGQUERY_REGION=EU
export CLOUDDQ_BIGQUERY_DATASET=clouddq
export CLOUDDQ_TARGET_BIGQUERY_TABLE="<project-id>.<dataset-id>.<table-id>"
python3 clouddq_executable.zip \
T2_DQ_1_EMAIL,T3_DQ_1_EMAIL_DUPLICATE \
configs \
--gcp_project_id="${GOOGLE_CLOUD_PROJECT}" \
--gcp_bq_dataset_id="${CLOUDDQ_BIGQUERY_DATASET}" \
--gcp_region_id="${CLOUDDQ_BIGQUERY_REGION}" \
--target_bigquery_summary_table="${CLOUDDQ_TARGET_BIGQUERY_TABLE}"
By running this CLI command, CloudDQ will:
- validate that the YAML configs are valid, i.e. every `Rule`, `Entity`, and `Row Filter` referenced in each `Rule Binding` exists and there are no duplicated configuration IDs. CloudDQ will attempt to parse all files with extensions `yml` and `yaml` discovered in a path and use the top-level keys (e.g. `rules`, `rule_bindings`, `row_filters`, `entities`) to determine the config type. Each configuration item must have a globally unique identifier in the `config` path.
- validate that the dataset `--gcp_bq_dataset_id` and the table `--target_bigquery_summary_table` (if it exists) are located in the region `--gcp_region_id`
- resolve the `entity_uris` defined in all `Rule Binding`s to retrieve the corresponding table and column names for validation
- generate BigQuery SQL code for each `Rule Binding` and validate that it is valid BigQuery SQL using the BigQuery dry-run feature
- create a BigQuery view for each generated BigQuery SQL `Rule Binding` in the BigQuery dataset specified in `--gcp_bq_dataset_id`
- create a BigQuery job to execute all `Rule Binding` SQL views. The BigQuery jobs will be created in the GCP project specified in `--gcp_project_id` and the GCP region specified in `--gcp_region_id`
- aggregate the validation outcomes and write the Data Quality summary results into a table called `dq_summary` in the BigQuery dataset specified in `--gcp_bq_dataset_id`
- if the CLI argument `--target_bigquery_summary_table` is defined, CloudDQ will append all rows created in the `dq_summary` table in the last run to the BigQuery table specified in `--target_bigquery_summary_table`. `--target_bigquery_summary_table` is currently an optional argument to maintain backwards compatibility, but it will become a required argument in version 1.0.0.
- if the CLI flag `--summary_to_stdout` is set, CloudDQ will retrieve all validation result rows created in the `dq_summary` table in the last run and log them as JSON records to Cloud Logging and stdout
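After a run completes, you can list the dataset to see the per-`Rule Binding` views and the `dq_summary` table described above (assuming the default dataset name from this walkthrough):
#!/bin/bash
export CLOUDDQ_BIGQUERY_DATASET=clouddq
# List the views and tables CloudDQ created in the dataset
bq ls ${CLOUDDQ_BIGQUERY_DATASET}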
To execute all `Rule Binding`s discovered in the config directory, use `ALL` as the `RULE_BINDING_IDS`:
#!/bin/bash
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export CLOUDDQ_BIGQUERY_REGION=EU
export CLOUDDQ_BIGQUERY_DATASET=clouddq
export CLOUDDQ_TARGET_BIGQUERY_TABLE="<project-id>.<dataset-id>.<table-id>"
python3 clouddq_executable.zip \
ALL \
configs \
--gcp_project_id="${GOOGLE_CLOUD_PROJECT}" \
--gcp_bq_dataset_id="${CLOUDDQ_BIGQUERY_DATASET}" \
--gcp_region_id="${CLOUDDQ_BIGQUERY_REGION}" \
--target_bigquery_summary_table="${CLOUDDQ_TARGET_BIGQUERY_TABLE}"
To see the resulting Data Quality validation summary statistics in the BigQuery table `$CLOUDDQ_TARGET_BIGQUERY_TABLE`, run:
#!/bin/bash
export GOOGLE_CLOUD_PROJECT=$(gcloud config get-value project)
export CLOUDDQ_BIGQUERY_REGION=EU
export CLOUDDQ_TARGET_BIGQUERY_TABLE="<project-id>.<dataset-id>.<table-id>"
echo "select * from \`${CLOUDDQ_TARGET_BIGQUERY_TABLE}\`" | bq query --location=${CLOUDDQ_BIGQUERY_REGION} --nouse_legacy_sql
To see the BigQuery SQL logic generated by CloudDQ for each `Rule Binding`, stored as a view in `--gcp_bq_dataset_id`, run:
#!/bin/bash
export CLOUDDQ_BIGQUERY_DATASET=clouddq
bq show --view ${CLOUDDQ_BIGQUERY_DATASET}.T3_DQ_1_EMAIL_DUPLICATE
In this section, we will provide some example implementations of data quality checks that are common requirements.
Check that a column only contains values from a fixed set of allowed values. The set is hardcoded into the rule.
rules:
  ACCEPTED_CURRENCY:
    rule_type: CUSTOM_SQL_EXPR
    params:
      custom_sql_expr: |-
        $column in ('USD', 'EUR', 'CNY')
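When authoring a CUSTOM_SQL_EXPR rule like this, it can help to sanity-check the SQL expression with a BigQuery dry run before wiring it into a rule; a minimal sketch, substituting a literal for `$column`:
#!/bin/bash
# Dry-run the expression with a literal standing in for $column (illustrative only)
echo "select 'USD' in ('USD', 'EUR', 'CNY')" | bq query --dry_run --nouse_legacy_sql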
The subquery is coded into the `custom_sql_expr` field. This example assumes a reference table with a column of allowed values, but the query could be any query that returns a list of values.
rules:
  REF_SUBQUERY:
    rule_type: CUSTOM_SQL_EXPR
    custom_sql_arguments:
      - start_date
    params:
      custom_sql_expr: |-
        $column in (select cast(unique_key as string) from <dataset_id>.`<ref_table_id>` where timestamp >= "$start_date")

rule_bindings:
  REF_SUBQUERY:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - REF_SUBQUERY:
          start_date: '2016-01-01'
Full example: rule and rule binding
Check that all values in the referenced column exist in a column in another table (which can be interpreted as the primary key).
This is a special case of the previous example.
rules:
  FOREIGN_KEY_VALID:
    rule_type: CUSTOM_SQL_EXPR
    params:
      custom_sql_expr: |-
        $column in (select distinct unique_key from <dataset_id>.`<pk_table_id>`)
Full example: rule and rule binding
In this test we check that two columns have an equal set of values. In this example we do not consider the ordering, meaning that if one column has `[a, a, b, b, c]` and another has `[a, b, c, a, b]`, then we will consider them equal.
rules:
  EQUAL_COLUMNS:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - ref_table_id
      - ref_column_id
    params:
      custom_sql_statement: |-
        with t1 as (
          select $column as col, count(*) as t1_n
          from data
          group by $column),
        t2 as (
          select $ref_column_id as col, count(*) as t2_n
          from <dataset_id>.`$ref_table_id`
          group by $ref_column_id
        )
        select t1.*, t2.t2_n
        from t1
        full outer join t2 using(col)
        where not t1_n = t2_n

rule_bindings:
  EQUAL_COLUMNS:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - EQUAL_COLUMNS:
          ref_table_id: <reference_table_id>
          ref_column_id: <reference_column_id>
Full example: Rule and rule binding
For this test we compare two columns that we know have distinct values (for example because we have checked this separately).
rules:
  EQUAL_COLUMNS_UNIQUE_VALUES:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - ref_table_id
      - ref_column_id
    params:
      custom_sql_statement: |-
        with t1 as (
          select
            $column as t1_col,
            row_number() over (order by $column) as n
          from data),
        t2 as (
          select
            $ref_column_id as t2_col,
            row_number() over (order by $ref_column_id) as n
          from <dataset_id>.`$ref_table_id`
        )
        select t1.t1_col, t2.t2_col
        from t1
        full outer join t2 using(n)
        where not t1_col = t2_col

rule_bindings:
  EQUAL_COLUMNS:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - EQUAL_COLUMNS_UNIQUE_VALUES:
          ref_table_id: <reference_table_id>
          ref_column_id: <reference_column_id>
Full example: Rule and rule binding
In this example we check that the aggregate in one table equals a value in another table. For instance, the sum over line items in an order should be equal to the total invoice amount in another table.
This example uses a rule of type CUSTOM_SQL_STATEMENT with five parameters. We assume there are two tables: one that we aggregate, grouping by a key column and summing a value column, and another that holds the expected totals for each key. The parameters are:
- the key column ID and
- the value column ID in the table being aggregated,
- the key column ID and
- the value column ID in the table that contains the totals, and
- the table ID of the table containing the totals.
As we specify everything we need through these custom SQL arguments, the `column_id` field in the rule binding has no effect.
rules:
  EQUAL_AGGREGATE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - key_column_id
      - value_column_id
      - ref_table_id
      - ref_key_column_id
      - ref_value_column_id
    params:
      custom_sql_statement: |-
        with t1 as (
          select $key_column_id as key, sum($value_column_id) as total
          from data
          group by $key_column_id),
        t2 as (
          select $ref_key_column_id as key, sum($ref_value_column_id) as ref_total
          from <dataset_id>.`$ref_table_id`
          group by $ref_key_column_id
        )
        select t1.*, t2.*
        from t1
        full outer join t2 using(key)
        where not total = ref_total

rule_bindings:
  EQUAL_AGGREGATE:
    entity_id: TAXI_TRIPS
    column_id: TAXI_ID
    row_filter_id: NONE
    rule_ids:
      - EQUAL_AGGREGATE:
          key_column_id: <key_column_id>
          value_column_id: <value_column_id>
          ref_table_id: <reference_table_id>
          ref_key_column_id: <reference_key_column_id>
          ref_value_column_id: <reference_value_column_id>
Full example: Rule and rule binding
Sometimes there is business logic that can be validated at the row level. For example, a transaction with payment "completed" should have an amount greater than or equal to zero.
rules:
  TRIP_AMOUNT_NO_CHARGE:
    rule_type: CUSTOM_SQL_EXPR
    params:
      # Rows pass when this expression is true: every 'No Charge' trip
      # must have a non-null, non-negative total.
      custom_sql_expr: |-
        payment_type != 'No Charge' or ($column is not null and $column >= 0)

rule_bindings:
  TRIP_AMOUNT_NO_CHARGE:
    entity_id: TAXI_TRIPS
    column_id: TRIP_TOTAL
    row_filter_id: NONE
    rule_ids:
      - TRIP_AMOUNT_NO_CHARGE
Full example: Rule and rule binding
Set-based validation tests the properties of sets, such as the total row count, the average, etc.
Similar to the total row count, but for unique values.
Note that this example will not count “NULL” as one of the distinct values.
rules:
  DISTINCT_VALUES:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - n_max
    params:
      custom_sql_statement: |-
        select count(distinct $column) as n
        from data
        having n > $n_max

rule_bindings:
  DISTINCT_VALUES:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - DISTINCT_VALUES:
          n_max: 1000
Full example: Rule and rule binding
Ensures that all values in a column are unique.
rules:
  UNIQUE:
    rule_type: CUSTOM_SQL_STATEMENT
    params:
      custom_sql_statement: |-
        select $column from data
        group by $column
        having count(*) > 1

rule_bindings:
  UNIQUENESS:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - UNIQUE
Full example: Rule and rule binding
Ensures that the combination of values from several columns are all unique across the table.
Note that the row count that is reported for this rule will be the number of elements that are duplicated. If one element is duplicated three times, the row count returned will be 1.
rules:
  UNIQUE_MULTIPLE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - columns
    params:
      custom_sql_statement: |-
        select $columns from data
        group by $columns
        having count(*) > 1

rule_bindings:
  UNIQUENESS_MULTIPLE:
    entity_id: TEST_DATA
    column_id: UNIQUE_KEY
    row_filter_id: NONE
    rule_ids:
      - UNIQUE_MULTIPLE:
          columns: unique_key, date
Full example: Rule and rule binding
Do not allow the gap between two subsequent values to be greater than a threshold.
In this example, “two subsequent values” are two rows that are adjacent when sorted by the column. This works well when, for example, the column is a message timestamp and we don’t want the time between messages to be too large. The example also assumes that the column being inspected is of type `TIMESTAMP`, which requires the function `TIMESTAMP_DIFF` to calculate the interval.
If instead we have a measured value (for example, an electric current) recorded at different points in time, and we don’t want that value to jump by more than a threshold between two consecutive measurements, we need to change the `order by` clause in the `custom_sql_statement` to order by the time column instead.
rules:
  GAP:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - max_gap
    params:
      custom_sql_statement: |-
        with lagged_data as (
          select $column, lag($column) over (order by $column ASC) as prev
          from data
        )
        select $column from lagged_data
        where prev is not null and timestamp_diff($column, prev, HOUR) > $max_gap

rule_bindings:
  GAP:
    entity_id: TEST_DATA
    column_id: TIMESTAMP
    row_filter_id: NONE
    rule_ids:
      - GAP:
          max_gap: 24
Full example: Rule and rule binding
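If you want to see how `TIMESTAMP_DIFF` behaves before adopting the rule, here is a quick standalone check (the timestamps are illustrative; the result should be 24):
#!/bin/bash
# Number of whole hours between two timestamps, as used in the GAP rule
echo "select timestamp_diff(timestamp '2016-01-02 00:00:00', timestamp '2016-01-01 00:00:00', HOUR) as gap_hours" | bq query --nouse_legacy_sql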
The z-score is the deviation from the mean in units of the standard deviation. If the values in a column have a mean of 10 and a standard deviation of 2, then a value of 16 in this column has a z-score of (16 - 10)/2 = 3, i.e. it lies 3 standard deviations above the mean.
The z-score is frequently used to identify outliers.
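To see the arithmetic in isolation before reading the rule below, you can run a small standalone query (the three sample values are illustrative; their mean is 10 and sample standard deviation is 6):
#!/bin/bash
# Compute abs(mu - x)/sigma by hand, mirroring the z-score logic in the rule below
echo "with data as (select x from unnest([4, 10, 16]) as x),
stats as (select avg(x) as mu, stddev(x) as sigma from data)
select x, abs(mu - x)/sigma as z from data join stats on true" | bq query --nouse_legacy_sql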
rules:
  Z_SCORE_OUTLIER:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - z_limit
    params:
      custom_sql_statement: |-
        with stats as (
          select avg($column) as mu, stddev($column) as sigma
          from data
        ),
        z_scores as (
          select $column, mu, sigma, abs(mu - $column)/sigma as z
          from data
          join stats on true
        )
        select *
        from z_scores
        where z > $z_limit

rule_bindings:
  OUTLIER_DETECTION:
    entity_id: TAXI_TRIPS
    column_id: TIPS
    row_filter_id: NONE
    rule_ids:
      - Z_SCORE_OUTLIER:
          z_limit: 3
Full example: Rule and rule binding
CloudDQ supports the following methods for authenticating to GCP:
- OAuth via locally discovered Application Default Credentials (ADC), if only the arguments `--gcp_project_id`, `--gcp_bq_dataset_id`, and `--gcp_region_id` are provided.
- An exported JSON service account key, if the argument `--gcp_service_account_key_path` is provided (see the example after this list).
- Service account impersonation via source credentials, if the argument `--gcp_impersonation_credentials` is provided. The source credentials can be obtained from either `--gcp_service_account_key_path` or from the local ADC credentials.
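For example, a run authenticated with an exported service-account key might look like this (the key path is a placeholder):
#!/bin/bash
python3 clouddq_executable.zip \
  ALL \
  configs \
  --gcp_project_id="${GOOGLE_CLOUD_PROJECT}" \
  --gcp_bq_dataset_id="${CLOUDDQ_BIGQUERY_DATASET}" \
  --gcp_region_id="${CLOUDDQ_BIGQUERY_REGION}" \
  --gcp_service_account_key_path="/path/to/service-account-key.json"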
- The `--intermediate_table_expiration_hours` CLI argument sets the expiration, in hours, of the intermediate tables that CloudDQ's BigQuery jobs create in the dataset `--gcp_bq_dataset_id`. The default is 24 hours.
- The `--num_threads` CLI argument sets the number of concurrent BigQuery operations; increasing it can reduce run time. It is currently an optional argument and defaults to 8 threads. A good starting point is one worker thread per core of the machine running CloudDQ; tune it from there based on other factors, such as other applications and services running on the same machine. An example invocation follows below.
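For example, to run with a larger thread pool and a longer intermediate-table expiry (the flag values are illustrative):
#!/bin/bash
python3 clouddq_executable.zip \
  ALL \
  configs \
  --gcp_project_id="${GOOGLE_CLOUD_PROJECT}" \
  --gcp_bq_dataset_id="${CLOUDDQ_BIGQUERY_DATASET}" \
  --gcp_region_id="${CLOUDDQ_BIGQUERY_REGION}" \
  --intermediate_table_expiration_hours=48 \
  --num_threads=16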