Quality Checks Definition

Checks are the core of DQX, allowing you to define and apply data quality rules to your datasets.

There are several ways to define quality checks in DQX:

  • YAML or JSON file (declarative approach), suitable when applying checks programmatically and using DQX workflows.
  • Delta table (most scalable), suitable when applying checks programmatically and using DQX workflows.
  • Programmatically as a list of dictionaries or DQX objects, suitable when applying checks programmatically.

Checks can be saved and loaded from various storage systems as described here. The loaded checks can be applied to the data using methods as described here.

DQX provides a collection of predefined built-in quality rules (checks). The following is a quick guide on defining quality checks using YAML, JSON, Delta tables, or programmatically in code. For a complete list of check functions and detailed examples, see the full reference here.

The quality checks can be defined for simple and complex column types such as structs, maps, and arrays. Additionally, you can define custom checks using SQL or Python to meet specific requirements.

Code format (programmatic approach)

You can define quality checks programmatically using DQX rule classes (a list of DQRule instances) or declaratively as a list of dictionaries.

Checks defined with DQX classes

Checks can be defined using DQX classes such as DQRowRule, DQDatasetRule, and DQForEachColRule. This approach provides static type checking and autocompletion in IDEs, making it potentially easier to work with.

from pyspark.sql import functions as F

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule

checks = [
    DQRowRule(  # check for a single column
        name="col3_is_null_or_empty",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col3",
    ),

    *DQForEachColRule(  # check for multiple columns applied individually
        columns=["col1", "col2"],
        criticality="error",
        check_func=check_funcs.is_not_null,
    ).get_rules(),

    DQRowRule(  # check with a filter
        name="col_4_is_null_or_empty",
        criticality="warn",
        filter="col1 < 3",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col4",
    ),

    DQRowRule(  # check with user metadata
        name="col_5_is_null_or_empty",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col5",
        user_metadata={
            "check_type": "completeness",
            "responsible_data_steward": "someone@email.com",
        },
    ),

    DQRowRule(  # provide check func arguments using positional arguments
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col1",
        check_func_args=[[1, 2]],
    ),

    DQRowRule(  # provide check func arguments using keyword arguments
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col2",
        check_func_kwargs={"allowed": [1, 2]},
    ),

    DQRowRule(  # check for a struct field
        # "error" criticality is used if not provided
        check_func=check_funcs.is_not_null,
        column="col7.field1",
    ),

    DQRowRule(  # check for a map element
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col5", F.lit("key1")),
    ),

    DQRowRule(  # check for an array element
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col6", F.lit(1)),
    ),

    DQDatasetRule(  # check uniqueness of composite key
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["col1", "col2"],
    ),

    DQDatasetRule(  # dataset check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="col1",
        check_func_kwargs={"aggr_type": "count", "group_by": ["col2"], "limit": 10},
    ),

    DQDatasetRule(  # dataset check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_not_less_than,
        column="col1",
        check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 1.2},
    ),

    DQDatasetRule(  # dataset check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_equal,
        column="col1",
        check_func_kwargs={"aggr_type": "count", "group_by": ["col2"], "limit": 5},
    ),

    DQDatasetRule(  # dataset check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_not_equal,
        column="col1",
        check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 10.5},
    ),
]
Note

The validation of arguments and keyword arguments for the check function is automatically performed upon creating a DQRowRule.
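
Once defined, rules built from DQX classes are passed to the engine for application. The following is a minimal sketch, assuming the DQEngine apply_checks and apply_checks_and_split methods described in the application docs linked above; input_df is a DataFrame you already have.

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

# Annotate all rows with result columns describing any failed checks
annotated_df = dq_engine.apply_checks(input_df, checks)

# Or split the data into "good" and "bad" (quarantine) DataFrames
good_df, bad_df = dq_engine.apply_checks_and_split(input_df, checks)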

Checks defined using metadata (list of dictionaries)

Checks can be defined using declarative syntax as a list of dictionaries. In the example below, the list of dictionaries is created from a YAML definition for readability, but you can also define the list of dictionaries directly in Python (see the sketch after the field list below).

import yaml

# load a list of dicts from a YAML definition
checks: list[dict] = yaml.safe_load("""
# check for a single column
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3

# check for multiple columns applied individually
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2

# check with a filter
- criticality: warn
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col4

# check with user metadata
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col5
  user_metadata:
    check_category: completeness
    responsible_data_steward: someone@email.com

# check with auto-generated name
- criticality: warn
  check:
    function: is_in_list
    arguments:
      column: col1
      allowed:
      - 1
      - 2

# check for a struct field
- check:
    function: is_not_null
    arguments:
      column: col7.field1
  # "error" criticality used if not provided

# check for a map element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col5, 'key1')

# check for an array element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col6, 1)

# check uniqueness of composite key
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
      - col1
      - col2

# dataset checks working across groups of rows
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 10

- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 1.2

- criticality: error
  check:
    function: is_aggr_equal
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 5

- criticality: error
  check:
    function: is_aggr_not_equal
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 10.5
""")

Checks defined using declarative syntax must contain the following fields:

  • criticality: either "error" (data going only into the "bad/quarantine" dataframe) or "warn" (data going into both "good" and "bad" dataframes). If not provided, the default is "error".
  • check: the check definition, containing:
    • function: name of the check function to apply.
    • arguments: check function arguments, if any.
    • for_each_column: (optional) list of column names or expressions to which the check will be applied individually.
  • (optional) name: name of the check; autogenerated if not provided.
  • (optional) filter: Spark SQL expression to filter the rows for which the check is applied (e.g. "business_unit = 'Finance'"). The check function runs only on the rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
  • (optional) user_metadata: key-value pairs added to the row-level warnings and errors.
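
The same checks can also be written directly as Python dictionaries with the fields above. The following is a minimal sketch of the first check from the YAML example expressed as a plain dictionary:

# The first check from the YAML example above, written directly as a Python dict.
checks: list[dict] = [
    {
        "criticality": "warn",
        "check": {
            "function": "is_not_null_and_not_empty",
            "arguments": {"column": "col3"},
        },
    },
]

Dictionary-based checks are applied with the metadata variants of the engine methods (for example apply_checks_by_metadata), as described in the application docs.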

YAML format (declarative approach)

Checks can be defined in a YAML file using declarative syntax.

Checks can be saved and loaded from a YAML file using methods described here. Checks defined in YAML files are supported by the DQX workflows.

Example YAML file defining several checks:

# check for a single column
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3

# check for multiple columns applied individually
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2

# check with a filter
- criticality: warn
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col4

# check with user metadata
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col5
  user_metadata:
    check_category: completeness
    responsible_data_steward: someone@email.com

# check with auto-generated name
- criticality: warn
  check:
    function: is_in_list
    arguments:
      column: col1
      allowed:
      - 1
      - 2

# check for a struct field
- check:
    function: is_not_null
    arguments:
      column: col7.field1
  # "error" criticality used if not provided

# check for a map element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col5, 'key1')

# check for an array element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col6, 1)

# check uniqueness of composite key
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
      - col1
      - col2

# dataset checks working across groups of rows
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 10

- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 1.2

- criticality: error
  check:
    function: is_aggr_equal
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 5

- criticality: error
  check:
    function: is_aggr_not_equal
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 10.5
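
Such a file can be loaded with PyYAML and validated before use. The following is a minimal sketch; the file name is illustrative, and saving or loading through the DQX storage methods is covered in the docs linked above.

import yaml

from databricks.labs.dqx.engine import DQEngine

# Illustrative file name; adjust to where your YAML checks file is stored.
with open("checks.yml", encoding="utf-8") as f:
    checks = yaml.safe_load(f)

status = DQEngine.validate_checks(checks)
print(status)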

JSON format (declarative approach)

Checks can be defined in a JSON file using declarative syntax. The structure of the JSON file defining quality checks is the same as the YAML format, but uses JSON syntax.

Checks can be saved and loaded from a JSON file using methods described here. Checks defined in JSON files are supported by the DQX workflows.

[
  {
    "criticality": "warn",
    "check": {
      "function": "is_not_null_and_not_empty",
      "arguments": {
        "column": "col3"
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_not_null",
      "for_each_column": ["col1", "col2"]
    }
  },
  {
    "criticality": "warn",
    "filter": "col1 < 3",
    "check": {
      "function": "is_not_null_and_not_empty",
      "arguments": {
        "column": "col4"
      }
    }
  },
  {
    "criticality": "warn",
    "check": {
      "function": "is_not_null_and_not_empty",
      "arguments": {
        "column": "col5"
      }
    },
    "user_metadata": {
      "check_category": "completeness",
      "responsible_data_steward": "someone@email.com"
    }
  },
  {
    "criticality": "warn",
    "check": {
      "function": "is_in_list",
      "arguments": {
        "column": "col1",
        "allowed": [1, 2]
      }
    }
  },
  {
    "check": {
      "function": "is_not_null",
      "arguments": {
        "column": "col7.field1"
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_not_null",
      "arguments": {
        "column": "try_element_at(col5, 'key1')"
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_not_null",
      "arguments": {
        "column": "try_element_at(col6, 1)"
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_unique",
      "arguments": {
        "columns": ["col1", "col2"]
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_aggr_not_greater_than",
      "arguments": {
        "column": "col1",
        "aggr_type": "count",
        "group_by": ["col2"],
        "limit": 10
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_aggr_not_less_than",
      "arguments": {
        "column": "col1",
        "aggr_type": "avg",
        "group_by": ["col2"],
        "limit": 1.2
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_aggr_equal",
      "arguments": {
        "column": "col1",
        "aggr_type": "count",
        "group_by": ["col2"],
        "limit": 5
      }
    }
  },
  {
    "criticality": "error",
    "check": {
      "function": "is_aggr_not_equal",
      "arguments": {
        "column": "col1",
        "aggr_type": "avg",
        "group_by": ["col2"],
        "limit": 10.5
      }
    }
  }
]
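
Loading such a JSON file works the same way. A minimal sketch, with an illustrative file name:

import json

from databricks.labs.dqx.engine import DQEngine

# Illustrative file name; adjust to where your JSON checks file is stored.
with open("checks.json", encoding="utf-8") as f:
    checks = json.load(f)

print(DQEngine.validate_checks(checks))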

Delta table format (declarative approach)

Checks can be defined in a Delta table, where each row represents a single check described by the name, criticality, check, filter, run_config_name, and user_metadata fields.

The most convenient way to programmatically load and save checks in a Delta table is to use the load_checks and save_checks methods of the DQEngine. See more details here. Checks defined in Delta tables are supported by the DQX workflows.

The Delta table used to store checks has the following structure:

  • name (string): Name to use for the check. Optional.
  • criticality (string): Either "error" (rows go only to the "bad" dataset) or "warn" (rows go to both "good" and "bad"). Defaults to "error".
  • check (struct): Defines the DQX check to apply.
    • function (string): Name of the DQX check function to apply.
    • arguments (map<string, string>): Key-value pairs passed as keyword arguments to the check function, stored as JSON strings, so quotes in string arguments must be escaped.
    • for_each_column (array<string>): (Optional) List of column names or expressions to which the check will be applied individually.
  • filter (string): (Optional) Spark SQL expression to filter rows to which the check is applied, e.g. "business_unit = 'Finance'". The check function runs only on rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
  • run_config_name (string): Name of the run configuration. Can be any string, such as a workflow name. Useful for selecting the applicable checks. Defaults to "default".
  • user_metadata (map<string, string>): (Optional) Custom metadata added to any row-level warnings or errors generated by the check.

Example checks saved in the Delta table:

+------------------------+-------------+------------------------------------------------------------------------------------------------------------------------+----------+-----------------+-------------------------------------------+
| name | criticality | check | filter | run_config_name | user_metadata |
+------------------------+-------------+------------------------------------------------------------------------------------------------------------------------+----------+-----------------+-------------------------------------------+
| null | error | {function: "is_not_null", for_each_column: ["col1", "col2"], arguments: {}} | col1 > 0 | "default" | {"check_owner": "someone@email.com"} |
| column_not_less_than | warn | {function: "is_not_less_than", for_each_column: null, arguments: {column: "col_2", limit: "1"}} | null | "default" | null |
| column_in_list | warn | {function: "is_in_list", for_each_column: null, arguments: {column: "col_2", allowed: "[1, 2]"}} | null | "default" | null |
+------------------------+-------------+------------------------------------------------------------------------------------------------------------------------+----------+-----------------+-------------------------------------------+
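
For reference, the following is a minimal sketch of creating a checks table matching the structure above; the table name is illustrative and the DDL assumes an active Spark session (predefined as spark in Databricks notebooks).

# Illustrative table name; `spark` is the active SparkSession.
spark.sql("""
    CREATE TABLE IF NOT EXISTS main.dqx.checks (
        name STRING,
        criticality STRING,
        check STRUCT<function: STRING, arguments: MAP<STRING, STRING>, for_each_column: ARRAY<STRING>>,
        filter STRING,
        run_config_name STRING,
        user_metadata MAP<STRING, STRING>
    ) USING DELTA
""")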

Validating syntax of quality checks

You can validate the syntax of checks loaded from a storage system or checks defined programmatically before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.

Validation does not apply to checks defined programmatically with DQX classes: if the DQX objects are constructed incorrectly, the application fails immediately when the rules are created, so a separate syntax validation step is unnecessary.

import yaml
from databricks.labs.dqx.engine import DQEngine

checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2
""")

status = DQEngine.validate_checks(checks)
print(status)
Usage tips

Validation of quality rules is typically done as part of the CI/CD process to ensure the checks are ready to use in the application.
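
For example, a minimal sketch of a CI test that validates a checks file stored in the repository; the file path is illustrative, and the has_errors attribute on the validation status is assumed from the engine API.

import yaml

from databricks.labs.dqx.engine import DQEngine

def test_checks_are_valid():
    # Illustrative repo path for the checks file.
    with open("resources/checks.yml", encoding="utf-8") as f:
        checks = yaml.safe_load(f)

    status = DQEngine.validate_checks(checks)
    # `has_errors` is assumed here; printing the status also shows any problems.
    assert not status.has_errors, str(status)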