Quality Check Definition and Application
DQX offers a set of predefined, built-in quality rules (checks). See the details and the full list of check functions here.
Quality checking can be applied to simple column types as well as complex types such as structs, maps, and arrays. Additionally, you can define custom checks to meet specific requirements.
There are several ways to define and apply quality checks in DQX:
- Quality rules defined in a file: quality rules can be stored in a yaml or json file.
- Quality rules defined in a Delta table: quality rules can also be stored in a Delta table in Unity Catalog.
- Quality rules defined in code: quality rules can be defined using DQX classes or metadata.
Quality Rules defined in a File
Quality rules can be defined declaratively in a yaml or json file and stored in the installation folder, workspace, or local file system.
Below is an example yaml file ('checks.yml') defining several checks:
# check for a single column
- criticality: warn
check:
function: is_not_null_and_not_empty
arguments:
column: col3
# check for multiple columns applied individually
- criticality: error
check:
function: is_not_null
for_each_column:
- col1
- col2
# check with a filter
- criticality: warn
filter: col1 < 3
check:
function: is_not_null_and_not_empty
arguments:
column: col4
# check with user metadata
- criticality: warn
check:
function: is_not_null_and_not_empty
arguments:
column: col5
user_metadata:
check_category: completeness
responsible_data_steward: someone@email.com
# check with auto-generated name
- criticality: warn
check:
function: is_in_list
arguments:
column: col1
allowed:
- 1
- 2
# check for a struct field
- check:
function: is_not_null
arguments:
column: col7.field1
# "error" criticality used if not provided
# check for a map element
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col5, 'key1')
# check for an array element
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col6, 1)
# check uniqueness of composite key, multi-column rule
- criticality: error
check:
function: is_unique
arguments:
columns:
- col1
- col2
# aggregation checks working across group of rows
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: col1
aggr_type: count
group_by:
- col2
limit: 10
- criticality: error
check:
function: is_aggr_not_less_than
arguments:
column: col1
aggr_type: avg
group_by:
- col2
limit: 1.2
- criticality: error
check:
function: is_aggr_equal
arguments:
column: col1
aggr_type: count
group_by:
- col2
limit: 5
- criticality: error
check:
function: is_aggr_not_equal
arguments:
column: col1
aggr_type: avg
group_by:
- col2
limit: 10.5
Fields:
- criticality: either "error" (data goes only into the "bad/quarantine" dataframe) or "warn" (data goes into both the "good" and "bad" dataframes). If not provided, the default is "error".
- check: the check definition, containing:
  - function: name of the check function to apply.
  - arguments: check function arguments, if any.
  - for_each_column: (optional) list of column names or expressions to which the check will be applied individually.
- (optional) name: name of the check; autogenerated if not provided.
- (optional) filter: Spark expression to filter the rows for which the check is applied (e.g. "business_unit = 'Finance'"). The check function will run only on the rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
- (optional) user_metadata: key-value pairs added to the row-level warnings and errors.
Checks are applied to the column specified in the arguments of the check. This can be either a column name given as a string or a column expression.
Some checks (e.g. is_unique) take columns instead of column, i.e. a list of column names or expressions. Other checks require no columns at all (e.g. sql_expression); see the sketch below.
Alternatively, for_each_column can be used to define a list of columns to which the check should be applied, one by one.
For details on each check, please refer to the documentation here.
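For illustration, below is a short, hedged yaml sketch of a check that takes no column at all, using sql_expression; the argument names shown (expression, msg) are assumptions and may differ from the exact function signature, so verify them against the check function reference.
# dataset-level check with no column argument (argument names are assumptions)
- criticality: warn
  check:
    function: sql_expression
    arguments:
      expression: col1 <= col2
      msg: col1 is greater than col2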
Applying Quality Rules defined in a File
Checks can be loaded from a file in the installation folder, workspace, or local file system. The checks can be applied using one of the following methods:
- apply_checks_by_metadata_and_split: splits the input data into valid and invalid (quarantined) dataframes.
- apply_checks_by_metadata: reports issues as additional columns.
The engine will raise an error if you try to load checks with an invalid definition. In addition, you can also perform a standalone syntax validation of the checks as described here.
Method 1: Loading and Applying Checks Defined in the Installation Folder
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Load checks from the installation (from the file defined in 'checks_file' in the run config)
# Only works if DQX is installed in the workspace
default_checks = dq_engine.load_checks_from_installation(method="file", assume_user=True, run_config_name="default")
workflow_checks = dq_engine.load_checks_from_installation(method="file", assume_user=True, run_config_name="workflow_001")
checks = default_checks + workflow_checks
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_df, quarantine_df=quarantine_df, run_config_name="default")
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(output_df=valid_and_quarantine_df, run_config_name="default")
Method 2: Loading and Applying Checks Defined in a Workspace File
- Python
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Load checks from multiple files and merge
default_checks = dq_engine.load_checks_from_workspace_file("/Shared/App1/default_checks.yml")
workflow_checks = dq_engine.load_checks_from_workspace_file("/Shared/App1/workflow_checks.yml")
checks = default_checks + workflow_checks
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig("catalog.schema.valid_data"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(
    output_df=valid_and_quarantine_df,
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data")
)
Method 3: Loading and Applying Checks Defined in a Local File System
- Python
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Load checks from multiple files and merge
default_checks = DQEngine.load_checks_from_local_file("default_checks.yml")
workflow_checks = DQEngine.load_checks_from_local_file("workflow_checks.yml")
checks = default_checks + workflow_checks
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig("catalog.schema.valid_data"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_and_quarantine_df,
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data")
)
Quality Rules defined in a Delta Table
Quality rules can also be stored in a Delta table in Unity Catalog. Each row represents a check, with columns for name, criticality, check, filter, run_config_name, and user_metadata.
Example:
# The checks table will contain the following columns:
# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+-------------------------+
# | name | criticality | check | filter | run_config_name | user_metadata |
# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+-------------------------+
# | "city_is_null" | "warn" | {function: 'is_not_null', | "country = 'Poland'" | "default" | {'key1': 'value1', ...} |
# | | | arguments: {'column': 'city'}} | | | |
# | ... | ... | ... | ... | ... | ... |
# +------------------+-----------------------+---------------------------------------+----------------------+--------------------+-------------------------+
Fields:
- criticality: either "error" (data goes only into the "bad/quarantine" dataframe) or "warn" (data goes into both the "good" and "bad" dataframes). If not provided, the default is "error".
- check: a StructType value with the following fields:
  - function: name of the DQX check function to apply.
  - arguments: a MapType value with the function's keyword arguments, if any, stored as json.
  - for_each_column: (optional) list of column names or expressions to which the check will be applied individually.
- (optional) name: name to use for the check.
- (optional) filter: Spark expression to filter the rows for which the check is applied (e.g. "business_unit = 'Finance'"). The check function will run only on the rows matching the filter condition. The condition can reference any column of the validated dataset, not only the one the check function is applied to.
- run_config_name: a run or workflow name. Can be used to load and apply a subset of checks to specific workflows. Default value is "default".
- (optional) user_metadata: a MapType column with key-value pairs added to the row-level warnings and errors generated by DQX checks.
Applying Quality Rules defined in a Delta Table
Checks loaded from a table can be applied using one of the following methods:
- apply_checks_by_metadata_and_split: splits the input data into valid and invalid (quarantined) dataframes.
- apply_checks_by_metadata: reports issues as additional columns.
The engine will raise an error if you try to load checks with an invalid definition. In addition, you can also perform a standalone syntax validation of the checks as described here.
- Python
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Load checks from the "default" run config
default_checks = dq_engine.load_checks_from_table(table_name="dq.config.checks_table")
# Load checks from the "workflow_001" run config
workflow_checks = dq_engine.load_checks_from_table(table_name="dq.config.checks_table", run_config_name="workflow_001")
# Load checks from the installation (from a table defined in 'checks_table' in the run config)
# Only works if DQX is installed in the workspace
workflow_checks = dq_engine.load_checks_from_installation(method="table", assume_user=True, run_config_name="workflow_001")
checks = default_checks + workflow_checks
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig("catalog.schema.validdata"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(
    output_df=valid_and_quarantine_df,
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data")
)
Quality Rules defined in Code
Method 1: Using DQX Classes
Checks defined programmatically using DQX classes can be applied using one of the following methods:
- apply_checks_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
- apply_checks: if you want to report issues as additional columns.
Example:
- Python
import pyspark.sql.functions as F
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = [
DQRowRule( # check for a single column
name="col3_is_null_or_empty",
criticality="warn",
check_func=check_funcs.is_not_null_and_not_empty,
column="col3",
),
*DQForEachColRule( # check for multiple columns applied individually
columns=["col1", "col2"],
criticality="error",
check_func=check_funcs.is_not_null).get_rules(),
DQRowRule( # check with a filter
name="col_4_is_null_or_empty",
criticality="warn",
filter="col1 < 3",
check_func=check_funcs.is_not_null_and_not_empty,
column="col4",
),
DQRowRule( # check with user metadata
name="col_5_is_null_or_empty",
criticality="warn",
check_func=check_funcs.is_not_null_and_not_empty,
column="col5",
user_metadata={
"check_type": "completeness",
"responsible_data_steward": "someone@email.com"
},
),
DQRowRule( # provide check func arguments using positional arguments
criticality="warn",
check_func=check_funcs.is_in_list,
column="col1",
check_func_args=[[1, 2]],
),
DQRowRule( # provide check func arguments using keyword arguments
criticality="warn",
check_func=check_funcs.is_in_list,
column="col2",
check_func_kwargs={"allowed": [1, 2]},
),
DQRowRule( # check for a struct field
# use "error" criticality if not provided
check_func=check_funcs.is_not_null,
column="col7.field1",
),
DQRowRule( # check for a map element
criticality="error",
check_func=check_funcs.is_not_null,
column=F.try_element_at("col5", F.lit("key1")),
),
DQRowRule( # check for an array element
criticality="error",
check_func=check_funcs.is_not_null,
column=F.try_element_at("col6", F.lit(1)),
),
DQDatasetRule( # check uniqueness of composite key, multi-column rule
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1", "col2"]
),
DQDatasetRule( # aggregation check working across group of rows
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="col1",
check_func_kwargs={"aggr_type": "count", "group_by": ["col2"], "limit": 10},
),
DQDatasetRule( # aggregation check working across group of rows
criticality="error",
check_func=check_funcs.is_aggr_not_less_than,
column="col1",
check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 1.2},
),
DQDatasetRule( # aggregation check working across group of rows
criticality="error",
check_func=check_funcs.is_aggr_equal,
column="col1",
check_func_kwargs={"aggr_type": "count", "group_by": ["col2"], "limit": 5},
),
DQDatasetRule( # aggregation check working across group of rows
criticality="error",
check_func=check_funcs.is_aggr_not_equal,
column="col1",
check_func_kwargs={"aggr_type": "avg", "group_by": ["col2"], "limit": 10.5},
),
]
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig("catalog.schema.validdata"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_and_quarantine_df,
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data")
)
The arguments and keyword arguments of the check function are validated automatically when a DQRowRule is created.
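As a hypothetical illustration (continuing the example above; the concrete exception type is an assumption), a misspelled keyword argument is expected to be rejected when the rule is constructed rather than when the checks are applied:
# Hypothetical example: "allowedd" is a deliberate typo for the "allowed" argument.
try:
    DQRowRule(
        criticality="warn",
        check_func=check_funcs.is_in_list,
        column="col1",
        check_func_kwargs={"allowedd": [1, 2]},  # invalid keyword argument
    )
except Exception as e:  # the concrete exception type may differ
    print(f"Invalid check definition: {e}")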
Method 2: Using YAML/JSON Definition
Checks defined declaratively in yaml or json can be applied using one of the following methods:
- apply_checks_by_metadata_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
- apply_checks_by_metadata: if you want to report issues as additional columns.
Example:
- Python
import yaml
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = yaml.safe_load("""
# check for a single column
- criticality: warn
check:
function: is_not_null_and_not_empty
arguments:
column: col3
# check for multiple columns applied individually
- criticality: error
check:
function: is_not_null
for_each_column:
- col1
- col2
# check with a filter
- criticality: warn
filter: col1 < 3
check:
function: is_not_null_and_not_empty
arguments:
column: col4
# check with user metadata
- criticality: warn
check:
function: is_not_null_and_not_empty
arguments:
column: col5
user_metadata:
check_category: completeness
responsible_data_steward: someone@email.com
# check with auto-generated name
- criticality: warn
check:
function: is_in_list
arguments:
column: col1
allowed:
- 1
- 2
# check for a struct field
- check:
function: is_not_null
arguments:
column: col7.field1
# "error" criticality used if not provided
# check for a map element
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col5, 'key1')
# check for an array element
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col6, 1)
# check uniqueness of composite key, multi-column rule
- criticality: error
check:
function: is_unique
arguments:
columns:
- col1
- col2
# aggregation checks working across group of rows
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: col1
aggr_type: count
group_by:
- col2
limit: 10
- criticality: error
check:
function: is_aggr_not_less_than
arguments:
column: col1
aggr_type: avg
group_by:
- col2
limit: 1.2
- criticality: error
check:
function: is_aggr_equal
arguments:
column: col1
aggr_type: count
group_by:
- col2
limit: 5
- criticality: error
check:
function: is_aggr_not_equal
arguments:
column: col1
aggr_type: avg
group_by:
- col2
limit: 10.5
""")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig("catalog.schema.validdata"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_and_quarantine_df,
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data")
)
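Since the checks are plain metadata, the same definitions can equally be supplied as json; below is a minimal sketch, assuming the same dq_engine and input table as in the example above.
import json

# JSON equivalent of a single check; the parsed structure is the same list of
# dictionaries that yaml.safe_load produces.
checks_json = """
[
  {
    "criticality": "error",
    "check": {
      "function": "is_not_null",
      "arguments": {"column": "col1"}
    }
  }
]
"""
checks = json.loads(checks_json)

input_df = spark.read.table("catalog1.schema1.table1")
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)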
End-to-end Quality Checking
Data can be loaded from Delta tables or files, checked by DQX, and written directly to Delta tables with a single method call.
Method 1: Using DQX Classes
The apply_checks_and_save_in_table method reads data from an input table or set of files, applies checks defined with DQX classes, and writes the checked data to output tables.
- Python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Define some checks using DQRule classes
checks = [
DQRowRule(
name="id_not_null",
criticality="error",
check_func=check_funcs.is_not_null,
column="id",
),
DQRowRule(
name="amount_positive",
criticality="warn",
check_func=check_funcs.is_greater_than,
column="amount",
        check_func_kwargs={"limit": 0},
),
]
# Option 1: Split data and write to separate output tables (validated and quarantined data)
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig("catalog.schema.input_data"),
output_config=OutputConfig("catalog.schema.valid_data"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: Write all data with error/warning columns to a single output table
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig("catalog.schema.input_data")
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data"),
)
Method 2: Using YAML/JSON Definitions
The apply_checks_by_metadata_and_save_in_table method reads data from an input table or set of files, applies checks loaded from configuration, and writes the checked data to output tables.
- Python
import yaml
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Define checks using YAML
checks = yaml.safe_load("""
- name: id_is_null
criticality: error
check:
function: is_not_null
arguments:
column: id
- name: email_invalid_format
criticality: warn
check:
function: regex_match
arguments:
column: email
pattern: ^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$
""")
# Option 1: Split data and write to separate output tables (validated and quarantined data)
dq_engine.apply_checks_by_metadata_and_save_in_table(
checks=checks,
input_config=InputConfig("catalog.schema.input_data")
output_config=OutputConfig("catalog.schema.valid_data"),
quarantine_config=OutputConfig("catalog.schema.quarantine_data")
)
# Option 2: Write all data with error/warning columns to a single output table
dq_engine.apply_checks_by_metadata_and_save_in_table(
checks=checks,
input_config=InputConfig("catalog.schema.input_data")
output_config=OutputConfig("catalog.schema.valid_and_quarantine_data"),
)
End-to-end Checking with Streaming
Both end-to-end quality checking methods support streaming writes.
- Python
# Read from Delta as a streaming source, apply checks, and write outputs to a Delta table
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig(
"catalog.schema.input_data",
is_streaming=True
),
output_config=OutputConfig(
"catalog.schema.valid_and_quarantine_data",
trigger={"availableNow": True},
options={"checkpointLocation": "/path/to/checkpoint", "mergeSchema": "true"}
),
)
Advanced Configuration Options
Data source and sink options can be set for reading input data and writing to output tables.
- Python
# Use custom output modes and options for output Delta tables
dq_engine.apply_checks_by_metadata_and_save_in_table(
checks=checks,
input_config=InputConfig(
"catalog.schema.input_data",
options={"versionAsOf": "10"}
),
output_config=OutputConfig(
"catalog.schema.valid_data",
mode="append",
options={"overwriteSchema": "true"}
),
quarantine_config=OutputConfig(
"catalog.schema.quarantine_data",
mode="append",
options={"mergeSchema": "true"}
)
)
Integration with Lakeflow Pipelines (formerly Delta Live Tables / DLT)
Lakeflow Pipelines provides expectations to enforce data quality constraints; however, expectations don't offer detailed insights into why certain checks fail. The example below demonstrates how to integrate DQX with Lakeflow Pipelines to provide comprehensive quality information. The integration does not use expectations but relies on DQX's own methods.
Option 1: Apply quality rules and quarantine bad records
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
# get rows without errors or warnings, and drop auxiliary columns
return dq_engine.get_valid(df)
@dlt.table
def quarantine():
df = dlt.read_stream("bronze_dq_check")
# get only rows with errors or warnings
return dq_engine.get_invalid(df)
Option 2: Apply quality rules and report issues as additional columns
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
return df
Validating syntax of Quality Checks
You can validate the syntax of checks defined declaratively in a Delta table or file (either yaml or json) before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.
The validation cannot be used for checks defined programmatically using DQX classes. When checks are defined programmatically, a separate syntax validation is unnecessary because the application will fail if the DQX objects are constructed incorrectly.
- Python
- CLI
import yaml
from databricks.labs.dqx.engine import DQEngine
checks = yaml.safe_load("""
- criticality: error
check:
function: is_not_null
for_each_column:
- col1
- col2
""")
status = DQEngine.validate_checks(checks)
print(status)
Validate checks stored in the installation folder:
databricks labs dqx validate-checks --run-config "default"
The following DQX configuration from 'config.yml' will be used by default:
- 'checks_file': relative location of the quality rules defined declaratively as yaml or json inside the installation folder (default: checks.yml).
Validating quality rules is typically done as part of the CI/CD process to ensure the checks are ready to use in the application.
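For example, below is a hedged sketch of a CI test (e.g. with pytest), assuming the returned status object exposes has_errors and errors attributes; adjust to the actual API if it differs, and note that checks.yml is just an example path.
import yaml
from databricks.labs.dqx.engine import DQEngine

def test_checks_are_valid():
    # Load the declarative checks committed to the repository (example path).
    with open("checks.yml", "r", encoding="utf-8") as f:
        checks = yaml.safe_load(f)

    status = DQEngine.validate_checks(checks)
    # Fail the build if any check definition is invalid (attribute names assumed).
    assert not status.has_errors, f"Invalid checks: {status.errors}"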
Quality Check Results
Quality check results are added as additional columns to the output DataFrame. These columns capture the outcomes of the checks performed on the input data.
The result columns are named _errors and _warnings by default, but you can customize them as described in the Additional Configuration section.
The result columns can be used to monitor and track data quality issues and for further processing, for example in a dashboard or other downstream applications.
The result columns can also be written to a separate table to store quality results independently from the transformed data.
The quality check results (DataFrames) can be saved to tables. DQX offers a convenient method to save the results, either to tables specified by the user or to tables defined in the run configuration:
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# save results to provided tables in batch
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
output_config=OutputConfig(
location="catalog1.schema1.output",
mode="append", # or "overwrite"
options={"mergeSchema", "true"} # or other options, default {}
),
quarantine_config=OutputConfig(
location="catalog1.schema1.quarantine",
mode="append", # or "overwrite"
options={"mergeSchema", "true"} # or other options, default {}
),
)
# save results to tables from run configuration in batch
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
run_config_name="default", # use run config to get output table and quarantine table names
)
# save results to tables from run configuration using streaming
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
run_config_name="default", # use run config to get output table and quarantine table names
options={
"checkpointLocation": "/tmp/dqx/checkpoint",
},
trigger={"availableNow": True}
)
Below is a sample output of a check as stored in a result column:
[
{
"name": "col_city_is_null",
"message": "Column 'city' is null",
"columns": ["city"],
"filter": "country = 'Poland'",
"function": "is_not_null",
"run_time": "2025-01-01 14:31:21",
"user_metadata": {"key1": "value1", "key2": "value2"},
},
]
The structure of the result columns is an array of structs containing the following fields (see the exact structure here):
- name: name of the check (string type).
- message: message describing the quality issue (string type).
- columns: name(s) of the column(s) where the quality issue was found (array of strings).
- filter: filter applied to the check, if any (string type).
- function: rule/check function applied (string type).
- run_time: timestamp when the check was executed (timestamp type).
- user_metadata: optional key-value custom metadata provided by the user (dictionary type).
The example below demonstrates how to extract the results from a result column in PySpark:
- Python
import pyspark.sql.functions as F
# apply quality checks
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
# extract errors
results_df = quarantine_df.select(
F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))
# extract warnings
results_df = quarantine_df.select(
F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))
# The results_df will contain the following columns:
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
# | name | message | columns | filter | function | run_time | user_metadata |
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
# | col_city_is_null | Column 'city' is null | ['city'] | country = 'Poland' | is_not_null | 2025-01-01 14:31:21 | {} |
# | ... | ... | ... | ... | ... | ... | ... |
# +------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
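As a follow-up, the extracted results can be aggregated for monitoring, for example counting issues per check to feed a dashboard; this is plain PySpark on the results_df from the example above and assumes no DQX-specific API.
# Count quality issues per check name and function, e.g. to feed a dashboard.
summary_df = (
    results_df
    .groupBy("name", "function")
    .count()
    .orderBy(F.col("count").desc())
)
summary_df.show(truncate=False)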
An example of how to provide user metadata can be found in the Additional Configuration section.