Quality Check Definition and Application

DQX offers a set of predefined built-in quality rules (checks). See the details and the full list of check functions here.

Quality checking can be performed on simple column types as well as complex types like structs, maps, and arrays. Additionally, you can define custom checks to meet specific requirements.

There are several ways to define and apply quality checks in DQX:

Quality Rules defined in a File

Quality rules can be defined declaratively in a yaml or json file and stored in the installation folder, the workspace, or the local file system.

Below is an example yaml file ('checks.yml') defining several checks:

# check for a single column
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3
# check for multiple columns applied individually
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2
# check with a filter
- criticality: warn
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col4
# check with user metadata
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col5
  user_metadata:
    check_category: completeness
    responsible_data_steward: someone@email.com
# check with auto-generated name
- criticality: warn
  check:
    function: is_in_list
    arguments:
      column: col1
      allowed:
      - 1
      - 2
# check for a struct field
- check:
    function: is_not_null
    arguments:
      column: col7.field1
# "error" criticality used if not provided
# check for a map element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col5, 'key1')
# check for an array element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col6, 1)
# check uniqueness of composite key, multi-column rule
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
      - col1
      - col2
# aggregation checks working across group of rows
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 10
- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 1.2

Fields:

  • criticality: either "error" (invalid rows go only into the "bad"/quarantine dataframe) or "warn" (rows go into both the "good" and "bad" dataframes). If not provided, the default is "error".
  • check: the check to apply, containing:
    • function: name of the check function to apply.
    • arguments: arguments of the check function, if any.
    • for_each_column: (optional) list of column names or expressions to which the check is applied individually.
  • (optional) name: name of the check; auto-generated if not provided.
  • (optional) filter: Spark expression to filter the rows on which the check is applied (e.g. "business_unit = 'Finance'"). The check function runs only on the rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
  • (optional) user_metadata: key-value pairs added to the row-level warnings and errors.
Note

Checks are applied to the column specified in the check's arguments. This can be either a column name given as a string or a column expression. Some checks (e.g. is_unique) take columns instead of column, i.e. a list of column names or expressions. Other checks (e.g. sql_expression) require no column at all. Alternatively, for_each_column can be used to define a list of columns to which the check is applied one by one, as illustrated below. For details on each check, please refer to the documentation here.
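For illustration, the snippet below shows the three argument styles side by side (a sketch extending the example above; the sql_expression arguments are indicative, see the check functions reference for the exact parameters):

# single-column check: 'column' takes a column name or expression
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1
# multi-column check: 'columns' takes a list of column names or expressions
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
      - col1
      - col2
# check without a column argument (assumed arguments for sql_expression)
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col1 >= 0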

Applying Quality Rules defined in a File

Checks can be loaded from a file in the installation folder, workspace, or local file system. The checks can be applied using one of the following methods:

  • apply_checks_by_metadata_and_split: splits the input data into valid and invalid (quarantined) dataframes.
  • apply_checks_by_metadata: reports issues as additional columns.

The engine will raise an error if you try to load checks with an invalid definition. In addition, you can also perform a standalone syntax validation of the checks as described here.

Method 1: Loading and Applying Checks Defined in the Installation Folder

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# load check file defined in 'checks_file' in the run config
# only works if DQX is installed in the workspace
checks = dq_engine.load_checks_from_installation(method="file", assume_user=True, run_config_name="default")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, run_config_name="default")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, run_config_name="default")

Method 2: Loading and Applying Checks Defined in a Workspace File

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/checks.yml")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, output_table="catalog1.schema1.output", quarantine_table="catalog1.schema1.quarantine")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, output_table="catalog1.schema1.output")

Method 3: Loading and Applying Checks Defined in a Local File System

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

checks = DQEngine.load_checks_from_local_file("checks.yml")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, output_table="catalog1.schema1.output", quarantine_table="catalog1.schema1.quarantine")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, output_table="catalog1.schema1.output")

Quality Rules defined in a Delta Table

Quality rules can also be stored in a Delta table in Unity Catalog. Each row represents a check, with columns for the name, criticality, check, filter, run_config_name, and user_metadata.

Example:

# The checks table will contain the following columns:
# +----------------+-------------+--------------------------------+----------------------+-----------------+-------------------------+
# | name           | criticality | check                          | filter               | run_config_name | user_metadata           |
# +----------------+-------------+--------------------------------+----------------------+-----------------+-------------------------+
# | "city_is_null" | "warn"      | {function: 'is_not_null',      | "country = 'Poland'" | "default"       | {'key1': 'value1', ...} |
# |                |             | arguments: {'column': 'city'}} |                      |                 |                         |
# | ...            | ...         | ...                            | ...                  | ...             | ...                     |
# +----------------+-------------+--------------------------------+----------------------+-----------------+-------------------------+

Fields:

  • criticality: either "error" (invalid rows go only into the "bad"/quarantine dataframe) or "warn" (rows go into both the "good" and "bad" dataframes). If not provided, the default is "error".
  • check: a StructType value with the following fields:
    • function: name of the DQX check function to apply.
    • arguments: MapType value with the function's keyword arguments, if any, stored as json.
    • for_each_column: (optional) list of column names or expressions to which the check is applied individually.
  • (optional) name: name to use for the check.
  • (optional) filter: Spark expression to filter the rows on which the check is applied (e.g. "business_unit = 'Finance'"). The check function runs only on the rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
  • run_config_name: a run or workflow name. Can be used to load and apply a subset of checks to specific workflows. The default value is "default".
  • (optional) user_metadata: a MapType column with key-value pairs added to the row-level warnings and errors generated by DQX checks.
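Checks defined in code or loaded from a file can be written to such a table before being loaded by run config name. Below is a minimal sketch; save_checks_in_table and its parameters are assumptions modeled on the loading API shown in the next section and may differ in your DQX version:

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# checks defined declaratively (could also be loaded from a yaml/json file)
checks = yaml.safe_load("""
- criticality: warn
  name: city_is_null
  check:
    function: is_not_null
    arguments:
      column: city
""")

# write the checks to the Delta table under a specific run config name
# (method name and arguments are assumptions; adjust to the installed DQX version)
dq_engine.save_checks_in_table(
    checks,
    table_name="dq.config.checks_table",
    run_config_name="workflow_001",
    mode="overwrite",  # or "append"
)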

Applying Quality Rules defined in a Delta Table

Checks loaded from a table can be applied using one of the following methods:

  • apply_checks_by_metadata_and_split: splits the input data into valid and invalid (quarantined) dataframes.
  • apply_checks_by_metadata: reports issues as additional columns.

The engine will raise an error if you try to load checks with an invalid definition. In addition, you can also perform a standalone syntax validation of the checks as described here.

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# Load checks with the "default" run config:
default_checks = dq_engine.load_checks_from_table(table_name="dq.config.checks_table")

# Load checks with the "workflow_001" run config:
workflow_checks = dq_engine.load_checks_from_table(table_name="dq.config.checks_table", run_config_name="workflow_001")

# load checks from a table defined in 'checks_table' in the default run config
# only works if DQX is installed in the workspace
workflow_checks = dq_engine.load_checks_from_installation(method="table", assume_user=True, run_config_name="default")

checks = default_checks + workflow_checks
input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, output_table="catalog1.schema1.output", quarantine_table="catalog1.schema1.quarantine")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, output_table="catalog1.schema1.output")

Quality Rules defined in Code

Method 1: Using DQX Classes

Checks defined programmatically using DQX classes can be applied using one of the following methods:

  • apply_checks_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
  • apply_checks: if you want to report issues as additional columns.

Example:

from pyspark.sql import functions as F

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule, DQRowRuleForEachCol
from databricks.sdk import WorkspaceClient


dq_engine = DQEngine(WorkspaceClient())

checks = [
    DQRowRule(  # check for a single column
        name="col3_is_null_or_empty",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col3",
    ),
] + DQRowRuleForEachCol(  # check for multiple columns applied individually
    columns=["col1", "col2"],
    criticality="error",
    check_func=check_funcs.is_not_null,
).get_rules() + [
    DQRowRule(  # check with a filter
        name="col_4_is_null_or_empty",
        criticality="warn",
        filter="col1 < 3",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col4",
    ),
    DQRowRule(  # check with user metadata
        name="col_5_is_null_or_empty",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col5",
        user_metadata={
            "check_type": "completeness",
            "responsible_data_steward": "someone@email.com",
        },
    ),
    DQRowRule(  # provide check func arguments using positional arguments
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col1",
        check_func_args=[[1, 2]],
    ),
    DQRowRule(  # provide check func arguments using keyword arguments
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col2",
        check_func_kwargs={"allowed": [1, 2]},
    ),
    DQRowRule(  # check for a struct field
        # "error" criticality is used if not provided
        check_func=check_funcs.is_not_null,
        column="col7.field1",
    ),
    DQRowRule(  # check for a map element
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col5", F.lit("key1")),
    ),
    DQRowRule(  # check for an array element
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col6", F.lit(1)),
    ),
    DQRowRule(  # check uniqueness of composite key, multi-column rule
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["col1", "col2"],
    ),
    DQRowRule(  # aggregation check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        column="col1",
        check_func_kwargs={"aggr_type": "count", "group_by": ["col2"], "limit": 10},
    ),
    DQRowRule(  # aggregation check working across a group of rows
        criticality="error",
        check_func=check_funcs.is_aggr_not_less_than,
        column="col1",
        check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 1.2},
    ),
]

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, output_table="catalog1.schema1.output", quarantine_table="catalog1.schema1.quarantine")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, output_table="catalog1.schema1.output")
Note

The validation of arguments and keyword arguments for the check function is automatically performed upon creating a DQRowRule.
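For example, constructing a rule with an argument the check function does not accept fails immediately, which surfaces misconfigured rules at definition time rather than at apply time (a sketch; the concrete exception type depends on the DQX version):

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQRowRule

try:
    DQRowRule(  # 'not_a_real_argument' is not a parameter of is_in_list
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col1",
        check_func_kwargs={"not_a_real_argument": [1, 2]},
    )
except Exception as e:  # the exact exception type depends on the DQX version
    print(f"Invalid rule definition: {e}")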

Method 2: Using YAML/JSON Definition

Checks defined declaratively in yaml or json can be applied using one of the following methods:

  • apply_checks_by_metadata_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
  • apply_checks_by_metadata: if you want to report issues as additional columns.

Example:

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

checks = yaml.safe_load("""
# check for a single column
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3
# check for multiple columns applied individually
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2
# check with a filter
- criticality: warn
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col4
# check with user metadata
- criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col5
  user_metadata:
    check_category: completeness
    responsible_data_steward: someone@email.com
# check with auto-generated name
- criticality: warn
  check:
    function: is_in_list
    arguments:
      column: col1
      allowed:
      - 1
      - 2
# check for a struct field
- check:
    function: is_not_null
    arguments:
      column: col7.field1
# "error" criticality used if not provided
# check for a map element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col5, 'key1')
# check for an array element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col6, 1)
# check uniqueness of composite key, multi-column rule
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
      - col1
      - col2
# aggregation checks working across group of rows
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col1
      aggr_type: count
      group_by:
      - col2
      limit: 10
- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col1
      aggr_type: avg
      group_by:
      - col2
      limit: 1.2
""")

input_df = spark.read.table("catalog1.schema1.table1")

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, output_table="catalog1.schema1.output", quarantine_table="catalog1.schema1.quarantine")

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, output_table="catalog1.schema1.output")

Integration with DLT (Delta Live Tables)

DLT provides expectations to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail. The example below demonstrates how to integrate DQX with DLT to provide comprehensive quality information. The DQX integration with DLT does not use DLT expectations but relies on DQX's own methods.

Option 1: Apply quality rules and quarantine bad records

import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

checks = ... # quality rules / checks

@dlt.view
def bronze_dq_check():
    df = dlt.read_stream("bronze")
    return dq_engine.apply_checks_by_metadata(df, checks)


@dlt.table
def silver():
    df = dlt.read_stream("bronze_dq_check")
    # get rows without errors or warnings, and drop auxiliary columns
    return dq_engine.get_valid(df)


@dlt.table
def quarantine():
    df = dlt.read_stream("bronze_dq_check")
    # get only rows with errors or warnings
    return dq_engine.get_invalid(df)

Option 2: Apply quality rules and report issues as additional columns

import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())

@dlt.view
def bronze_dq_check():
    df = dlt.read_stream("bronze")
    return dq_engine.apply_checks_by_metadata(df, checks)


@dlt.table
def silver():
    df = dlt.read_stream("bronze_dq_check")
    return df

Validating syntax of Quality Checks

You can validate the syntax of checks defined declaratively in a Delta table or file (either yaml or json) before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine. The validation cannot be used for checks defined programmatically using DQX classes: in that case syntax validation is unnecessary, because incorrectly constructed DQX objects raise an error as soon as they are created.

import yaml
from databricks.labs.dqx.engine import DQEngine

checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    for_each_column:
    - col1
    - col2
""")

status = DQEngine.validate_checks(checks)
print(status)
Usage tips

Validating quality rules is typically done as part of the CI/CD process to ensure checks are ready to use in the application.
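For example, a CI step can fail fast when any check definition is invalid (a sketch; it assumes the validation status object exposes has_errors and errors):

import yaml
from databricks.labs.dqx.engine import DQEngine

# load the declarative checks kept under version control
with open("checks.yml", "r", encoding="utf-8") as f:
    checks = yaml.safe_load(f)

status = DQEngine.validate_checks(checks)

# fail the CI job if any check definition is invalid
# (has_errors and errors are assumed attributes of the validation status)
assert not status.has_errors, f"Invalid quality checks: {status.errors}"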

Quality Check Results

Quality check results are added as additional columns to the output DataFrame. These columns capture the outcomes of the checks performed on the input data. The reporting columns are named _errors and _warnings by default, but you can customize them as described in the Additional Configuration section. The reporting columns can be used to monitor and track data quality issues and for further processing, such as feeding a dashboard or other downstream applications. They can also be written to a separate table to store quality results independently from the transformed data, as sketched below.
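For example, the reporting columns can be split off into a dedicated results table with plain PySpark (a sketch; the id key column and table name are placeholders):

# checked_df is the output of apply_checks / apply_checks_by_metadata
results_only_df = checked_df.select(
    "id",  # placeholder business key of the validated dataset
    "_errors",
    "_warnings",
)

# store the quality results separately from the transformed data
results_only_df.write.mode("append").saveAsTable("catalog1.schema1.quality_results")

# keep the transformed data without the reporting columns
data_only_df = checked_df.drop("_errors", "_warnings")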

The quality check results (DataFrames) can be saved to one or more tables. DQX offers a convenient method to save the results in tables that are either specified explicitly or defined in the run configuration:

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# save results to provided tables in batch
dq_engine.save_results_in_table(
    output_df=valid_df,
    quarantine_df=quarantine_df,
    output_table="catalog1.schema1.output",
    quarantine_table="catalog1.schema1.quarantine",
    output_table_mode="append",  # or "overwrite"
    quarantine_table_mode="append",  # or "overwrite"
    output_table_options={"mergeSchema": "true"},  # or other options, default {}
    quarantine_table_options={"mergeSchema": "true"},  # or other options, default {}
)

# save results to tables from run configuration in batch
dq_engine.save_results_in_table(
    output_df=valid_df,
    quarantine_df=quarantine_df,
    run_config_name="default",  # use run config to get output and quarantine table names
)

# save results to tables from run configuration using streaming
dq_engine.save_results_in_table(
    output_df=valid_df,
    quarantine_df=quarantine_df,
    run_config_name="default",  # use run config to get output and quarantine table names
    options={
        "checkpointLocation": "/tmp/dqx/checkpoint",
    },
    trigger={"availableNow": True},
)

Below is a sample output of a check as stored in a reporting column:

[
  {
    "name": "col_city_is_null",
    "message": "Column 'city' is null",
    "columns": ["city"],
    "filter": "country = 'Poland'",
    "function": "is_not_null",
    "run_time": "2025-01-01 14:31:21",
    "user_metadata": {"key1": "value1", "key2": "value2"},
  },
]

The structure of the reporting columns is an array of structs containing the following fields (see the exact structure here):

  • name: name of the check (string type).
  • message: message describing the quality issue (string type).
  • columns: names of the column(s) where the quality issue was found (array of strings).
  • filter: filter applied to the check, if any (string type).
  • function: rule/check function applied (string type).
  • run_time: timestamp when the check was executed (timestamp type).
  • user_metadata: optional key-value custom metadata provided by the user (dictionary type).

The example below demonstrates how to extract the results from a reporting column in PySpark:

import pyspark.sql.functions as F

# apply quality checks
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)

# extract errors
results_df = quarantine_df.select(
    F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))

# extract warnings
results_df = quarantine_df.select(
    F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))

# The results_df will contain the following columns:
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+---------------+
# | name             | message               | columns  | filter             | function    | run_time            | user_metadata |
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+---------------+
# | col_city_is_null | Column 'city' is null | ['city'] | country = 'Poland' | is_not_null | 2025-01-01 14:31:21 | {}            |
# | ...              | ...                   | ...      | ...                | ...         | ...                 | ...           |
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+---------------+

An example of how to provide user metadata can be found in the Additional Configuration section.