Applying Quality Checks
You can apply quality checks to your data using the following approaches:
- Programmatic approach: you can apply checks programmatically to DataFrames or tables. This method is required if you want to perform data quality checks for data in-transit.
- No-code approach (Workflows): you can apply checks for data at-rest using workflows if DQX is installed in the workspace as a tool. This does not require code-level integration and is suitable for data already stored in Delta tables or files.
DQX offers a set of predefined built-in quality rules (checks) as described here.
Programmatic approach
Checks can be applied to the input data by one of the following methods of the DQEngine class:
- For checks defined with DQX classes:
  - apply_checks: apply quality checks to a DataFrame and report issues as additional columns.
  - apply_checks_and_split: apply quality checks and split the input DataFrame into valid and invalid (quarantined) DataFrames.
  - apply_checks_and_save_in_table: end-to-end approach to apply quality checks to a table and save results to the output table(s) via a single method call.
- For checks defined as metadata (list of dictionaries, or loaded from storage):
  - apply_checks_by_metadata: apply quality checks to a DataFrame and report issues as additional columns.
  - apply_checks_by_metadata_and_split: apply quality checks and split the input DataFrame into valid and invalid (quarantined) DataFrames.
  - apply_checks_by_metadata_and_save_in_table: end-to-end approach to apply quality checks to a table and save results to the output table(s) via a single method call.
You can see the full list of DQEngine methods here.
The engine will raise an error if you try to apply checks with an invalid definition. In addition, you can perform a standalone syntax validation of the checks as described here.
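For example, a minimal validation sketch (it assumes a checks list defined as in the examples below; the validation helper and the shape of its returned status are taken from the DQX validation docs, so verify them against your installed version):
from databricks.labs.dqx.engine import DQEngine
# standalone syntax validation of the check definitions
status = DQEngine.validate_checks(checks)
if status.has_errors:
    print(status)  # describes the problems found in the check definitions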
You can apply quality checks to streaming pipelines using the same methods as for batch processing.
You can either use the end-to-end methods or manage the input stream and output directly with native Spark APIs (e.g. spark.readStream and writeStream).
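The example below is a minimal streaming sketch that manages the stream with native Spark APIs (table names, the checkpoint path, and the checks variable are placeholders; spark refers to the active SparkSession):
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# read the input as a stream and apply the checks exactly as in batch mode
input_stream_df = spark.readStream.table("catalog.schema.input")
checked_stream_df = dq_engine.apply_checks_by_metadata(input_stream_df, checks)
# write the checked stream to the output table
(
    checked_stream_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/catalog/schema/volume/checkpoint")
    .trigger(availableNow=True)
    .toTable("catalog.schema.output")
)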
You can find ready-to-use examples of applying checks with different methods in the demo section.
The results are reported as additional columns in the output DataFrame (by default the _errors and _warnings columns).
See quality check results for details on the structure of the results.
You can customize the reporting columns as described in the additional configuration section.
Applying checks defined with DQX classes
- Python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = [
DQRowRule(
criticality="warn",
check_func=check_funcs.is_not_null,
column="col3",
),
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1", "col2"],
),
DQRowRule(
name="email_invalid_format",
criticality="error",
check_func=check_funcs.regex_match,
column="email",
        check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
),
]
input_df = spark.read.table("catalog.schema.input")
# Option 1: apply quality checks on the DataFrame and output results to a single DataFrame
valid_and_invalid_df = dq_engine.apply_checks(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_and_invalid_df,
output_config=OutputConfig(location="catalog.schema.output"),
)
# Option 2: apply quality checks on the DataFrame and provide valid and invalid (quarantined) DataFrames
valid_df, invalid_df = dq_engine.apply_checks_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=invalid_df,
output_config=OutputConfig(location="catalog.schema.valid"),
quarantine_config=OutputConfig(location="catalog.schema.quarantine"),
)
# Option 3 End-to-End approach: apply quality checks to a table and save results to valid and invalid (quarantined) tables
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig(location="catalog.schema.input"),
output_config=OutputConfig(location="catalog.schema.valid"),
quarantine_config=OutputConfig(location="catalog.schema.quarantine"),
)
# Option 4 End-to-End approach: apply quality checks to a table and save results to an output table
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig(location="catalog.schema.input"),
output_config=OutputConfig(location="catalog.schema.output"),
)
Applying checks defined using metadata
Users can save and load checks as metadata (a list of dictionaries) in storage through several supported methods, as described here. When loading checks from storage, they are always returned as metadata (a list of dictionaries). You can convert checks from metadata to classes and back using the serialization methods described here.
In the example below, checks are defined in YAML syntax for convenience and then loaded into a list of dictionaries.
- Python
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# checks can also be loaded from storage
checks: list[dict] = yaml.safe_load("""
- criticality: warn
check:
function: is_not_null
arguments:
column: col3
- criticality: error
check:
function: is_unique
arguments:
columns:
- col1
- col2
- name: email_invalid_format
criticality: error
check:
function: regex_match
arguments:
column: email
      regex: ^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$
""")
input_df = spark.read.table("catalog.schema.input")
# Option 1: apply quality checks on the DataFrame and output results as a single DataFrame
valid_and_invalid_df = dq_engine.apply_checks_by_metadata(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_and_invalid_df,
output_config=OutputConfig(location="catalog.schema.output"),
)
# Option 2: apply quality checks on the DataFrame and provide valid and invalid (quarantined) DataFrames
valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=invalid_df,
output_config=OutputConfig(location="catalog.schema.valid"),
quarantine_config=OutputConfig(location="catalog.schema.quarantine"),
)
# Option 3 End-to-End approach: apply quality checks on the input table and save results to valid and invalid (quarantined) tables
dq_engine.apply_checks_by_metadata_and_save_in_table(
checks=checks,
input_config=InputConfig(location="catalog.schema.input"),
output_config=OutputConfig(location="catalog.schema.valid"),
quarantine_config=OutputConfig(location="catalog.schema.quarantine"),
)
# Option 4 End-to-End approach: apply quality checks on the input table and save results to an output table
dq_engine.apply_checks_by_metadata_and_save_in_table(
checks=checks,
input_config=InputConfig(location="catalog.schema.input"),
output_config=OutputConfig(location="catalog.schema.output"),
)
Checks are applied to a column specified in the arguments of the check. This can be either a column name provided as a string or a column expression. Some checks (e.g. is_unique) require columns instead of column, which is a list of column names or expressions. There are also checks that require no columns at all (e.g. sql_expression).
Alternatively, for_each_column can be used to define a list of columns that the check should be applied to, one by one, as sketched below.
For details on each check, please refer to the documentation here.
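The sketch below shows how to expand one rule definition into per-column rules (the column names are placeholders; the get_rules call on DQForEachColRule is an assumption based on the class imported in the example above, so verify it against your DQX version):
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQForEachColRule
# expand a single definition into a separate is_not_null rule per column
checks = [
    *DQForEachColRule(
        columns=["col1", "col2", "col3"],  # placeholder column names
        criticality="error",
        check_func=check_funcs.is_not_null,
    ).get_rules(),
]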
Applying checks in Lakeflow Pipelines
Lakeflow Pipelines (formerly DLT - Delta Live Tables) provides expectations to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail and don't provide native quarantine functionality.
The examples below demonstrate how to integrate DQX with Lakeflow Pipelines to provide comprehensive quality information. The integration does not use expectations; it relies on DQX's own methods.
Option 1: Apply quality checks and quarantine bad records
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
# get rows without errors or warnings, and drop auxiliary columns
return dq_engine.get_valid(df)
@dlt.table
def quarantine():
df = dlt.read_stream("bronze_dq_check")
# get only rows with errors or warnings
return dq_engine.get_invalid(df)
Option 2: Apply quality checks and report issues as additional columns
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
return df
Saving quality results
The quality check results (DataFrames) can be saved to arbitrary tables using Spark APIs. DQX also offers convenient methods to save the results to tables without using the Spark APIs directly. You can also use end-to-end methods that apply checks and save results in a single call.
You can see the full list of DQEngine methods here.
Below are examples of how to save the quality checking results in tables using the DQEngine methods:
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# apply checks
valid_df, invalid_df = dq_engine.apply_checks_and_split(input_df, checks)
# use native Spark APIs to save the results
valid_df.write.option("mergeSchema", "true").mode("append").saveAsTable("catalog.schema.valid")
invalid_df.write.option("mergeSchema", "true").mode("append").saveAsTable("catalog.schema.quarantine")
# save results to tables using DQX method
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=invalid_df,
output_config=OutputConfig(
location="catalog.schema.valid",
mode="append", # or "overwrite"
options={"mergeSchema": "true"}, # default {}
),
quarantine_config=OutputConfig(
location="catalog.schema.quarantine",
mode="append", # or "overwrite"
options={"mergeSchema": "true"}, # default {}
),
)
# save results to tables with streaming using the DQX method
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=invalid_df, # optional
output_config=OutputConfig(
location="catalog.schema.valid",
trigger={"availableNow": True},
options={"checkpointLocation": "/path/to/checkpoint_output", "mergeSchema": "true"},
),
# required if quarantine_df is defined
quarantine_config=OutputConfig(
location="catalog.schema.quarantine",
trigger={"availableNow": True},
options={"checkpointLocation": "/path/to/checkpoint_quarantine", "mergeSchema": "true"},
),
)
# save results to tables defined in run config using DQX method
dq_engine.save_results_in_table(
output_df=valid_df,
quarantine_df=invalid_df,
run_config_name="default", # use run config to get output and quarantine table details, and writing options
)
# End-to-end method: apply checks and save results in tables using DQX method
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig(
location="catalog.schema.input"
),
output_config=OutputConfig(
location="catalog.schema.valid",
mode="append", # or "overwrite"
),
# optional
quarantine_config=OutputConfig(
location="catalog.schema.quarantine",
mode="append", # or "overwrite"
),
)
# End-to-end method: apply checks and save results in tables with streaming using the DQX method
dq_engine.apply_checks_and_save_in_table(
checks=checks,
input_config=InputConfig(
location="catalog.schema.input",
is_streaming=True,
),
output_config=OutputConfig(
location="catalog.schema.valid",
mode="append", # or "update" or "complete"
trigger={"availableNow": True},
options={"checkpointLocation": "/path/to/checkpoint_output", "mergeSchema": "true"},
),
# optional
quarantine_config=OutputConfig(
location="catalog.schema.quarantine",
mode="append", # or "update" or "complete"
trigger={"availableNow": True},
options={"checkpointLocation": "/path/to/checkpoint_quarantine", "mergeSchema": "true"},
),
)
No-code approach (Workflows)
You can apply quality checks to data at-rest using DQX workflows without writing any code. This requires DQX to be installed in the workspace as a tool (see the installation guide).
The workflows use a configuration file to specify input and output locations and to apply sets of quality checks from storage.
You can open the configuration file (.dqx/config.yml) in the installation folder from the Databricks UI or by executing the following Databricks CLI command:
databricks labs dqx open-remote-config
There are currently two workflows available in DQX: the quality checker workflow and the end-to-end (e2e) workflow.
The workflows can be run manually or scheduled to run periodically. By default, they are not scheduled to run automatically, which minimizes concerns about compute and associated costs.
The workflows can be executed as batch or streaming jobs.
Quality Checker Workflow
The quality checker workflow performs the following:
- apply checks to the input data (files or a table)
- save results to output tables (with or without a quarantine)
To run the workflow manually, you can use the Databricks UI or the Databricks CLI:
databricks labs dqx apply-checks --run-config "default"
You can also provide the --timeout-minutes option.
Execution logs are printed in the console and saved in the installation folder. You can display the logs from the latest run by executing the following command:
databricks labs dqx logs --workflow quality-checker
The following fields from the configuration file are used by the quality checker workflow:
- input_config: configuration for the input data.
- output_config: configuration for the output data.
- quarantine_config: (optional) configuration for the quarantine data.
- checks_location: location of the quality checks in storage.
- serverless_clusters: whether to use serverless clusters for running the workflow (default: true). Using serverless clusters is recommended as it allows for automated cluster management and scaling.
- quality_checker_spark_conf: (optional) Spark configuration to use for the workflow, only applicable if serverless_clusters is set to false.
- quality_checker_override_clusters: (optional) cluster configuration to use for the workflow, only applicable if serverless_clusters is set to false.
- extra_params: (optional) extra parameters to pass to the jobs, such as result column names and user_metadata.
- custom_check_functions: (optional) mapping of custom check function names to Python files (modules) containing the check function definitions.
- reference_tables: (optional) mapping of reference table names to reference table locations.
Example of the configuration file (relevant fields only):
serverless_clusters: true # default is true, set to false to use standard clusters
run_configs:
- name: default
checks_location: catalog.table.checks # can be a table or file
input_config:
format: delta
location: /databricks-datasets/delta-sharing/samples/nyctaxi_2019
is_streaming: false # set to true if wanting to run it using streaming
output_config:
format: delta
location: main.nytaxi.output
mode: append
#checkpointLocation: /Volumes/catalog/schema/volume/checkpoint # only applicable if input_config.is_streaming is enabled
#trigger: # streaming trigger, only applicable if input_config.is_streaming is enabled
# availableNow: true
quarantine_config: # optional
format: delta
location: main.nytaxi.quarantine
mode: append
#checkpointLocation: /Volumes/catalog/schema/volume/checkpoint # only applicable if input_config.is_streaming is enabled
#trigger: # streaming trigger, only applicable if input_config.is_streaming is enabled
# availableNow: true
custom_check_functions: # optional
my_func: custom_checks/my_funcs.py # relative workspace path (installation folder prefix applied)
my_other: /Workspace/Shared/MyApp/my_funcs.py # absolute workspace path
email_mask: /Volumes/main/dqx_utils/custom/email.py # UC volume path
reference_tables: # optional
reference_vendor:
input_config:
format: delta
location: main.nytaxi.ref
End-to-End Workflow
The end-to-end (e2e) workflow is designed to automate the entire data quality checking process, from profiling the input data to applying quality checks and saving results. The workflow executes the profiler and quality checker workflows in sequence, allowing you to generate quality checks based on the input data and then apply those checks to the same data.
It performs the following:
- profile input data (files or a table)
- generate quality checks
- apply the generated checks to the input data
- save results to output tables (with or without a quarantine)
To run the workflow manually, you can use the Databricks UI or the Databricks CLI:
databricks labs dqx e2e --run-config "default"
You can also provide the --timeout-minutes option.
Execution logs are printed in the console and saved in the installation folder. You can display the logs from the latest run by executing the following command:
databricks labs dqx logs --workflow e2e
# see logs of the profiler and quality-checker jobs triggered by the e2e workflow
databricks labs dqx logs --workflow profiler
databricks labs dqx logs --workflow quality-checker
The following fields from the configuration file are used by the e2e workflow:
- input_config: configuration for the input data.
- output_config: configuration for the output data.
- quarantine_config: (optional) configuration for the quarantine data.
- checks_location: location of the quality checks in storage.
- serverless_clusters: whether to use serverless clusters for running the workflow (default: true). Using serverless clusters is recommended as it allows for automated cluster management and scaling.
- e2e_spark_conf: (optional) Spark configuration to use for the e2e workflow, only applicable if serverless_clusters is set to false.
- e2e_override_clusters: (optional) cluster configuration to use for the e2e workflow, only applicable if serverless_clusters is set to false.
- quality_checker_spark_conf: (optional) Spark configuration to use for the quality checker workflow, only applicable if serverless_clusters is set to false.
- quality_checker_override_clusters: (optional) cluster configuration to use for the quality checker workflow, only applicable if serverless_clusters is set to false.
- profiler_spark_conf: (optional) Spark configuration to use for the profiler workflow, only applicable if serverless_clusters is set to false.
- profiler_override_clusters: (optional) cluster configuration to use for the profiler workflow, only applicable if serverless_clusters is set to false.
- profiler_config: configuration for the profiler, containing:
  - summary_stats_file: relative location within the installation folder of the summary statistics file (default: profile_summary.yml).
  - sample_fraction: fraction of data to sample for profiling.
  - sample_seed: seed for reproducible sampling.
  - limit: maximum number of records to analyze.
- extra_params: (optional) extra parameters to pass to the jobs, such as result column names and user_metadata.
- custom_check_functions: (optional) custom check functions defined in Python files that can be used in the quality checks.
- reference_tables: (optional) reference tables that can be used in the quality checks.
Example of the configuration file (relevant fields only):
serverless_clusters: true # default is true, set to false to use standard clusters
run_configs:
- name: default
checks_location: catalog.table.checks # can be a table or file
input_config:
format: delta
location: /databricks-datasets/delta-sharing/samples/nyctaxi_2019
is_streaming: false # set to true if wanting to run it using streaming
output_config:
format: delta
location: main.nytaxi.output
mode: append
#checkpointLocation: /Volumes/catalog/schema/volume/checkpoint # only applicable if input_config.is_streaming is enabled
#trigger: # streaming trigger, only applicable if input_config.is_streaming is enabled
# availableNow: true
quarantine_config: # optional
format: delta
location: main.nytaxi.quarantine
mode: append
#checkpointLocation: /Volumes/catalog/schema/volume/checkpoint # only applicable if input_config.is_streaming is enabled
#trigger: # streaming trigger, only applicable if input_config.is_streaming is enabled
# availableNow: true
profiler_config:
limit: 1000
sample_fraction: 0.3
summary_stats_file: profile_summary_stats.yml
custom_check_functions: # optional
my_func: custom_checks/my_funcs.py # relative workspace path (installation folder prefix applied)
my_other: /Workspace/Shared/MyApp/my_funcs.py # absolute workspace path
email_mask: /Volumes/main/dqx_utils/custom/email.py # UC volume path
reference_tables: # optional
reference_vendor:
input_config:
format: delta
location: main.nytaxi.ref
Quality checking results
Quality check results are added as additional columns to the output or quarantine (if defined) DataFrame or tables (if saved). These columns capture the outcomes of the checks performed on the input data.
The result columns are named _errors and _warnings by default, but you can customize them as described in the additional configuration section.
The result columns can be used to monitor and track data quality issues and for further processing, for example in a dashboard or other downstream applications.
Below is a sample output of a check stored in a result column (error or warning):
[
{
"name": "col_city_is_null",
"message": "Column 'city' is null",
"columns": ["city"],
"filter": "country = 'Poland'",
"function": "is_not_null",
"run_time": "2025-01-01 14:31:21",
"user_metadata": {"key1": "value1", "key2": "value2"},
},
]
The structure of the result columns is an array of structs containing the following fields (see the exact structure here):
- name: name of the check (string type).
- message: message describing the quality issue (string type).
- columns: name(s) of the column(s) where the quality issue was found (array of strings).
- filter: filter applied to the input data, if any (string type).
- function: rule/check function applied (string type).
- run_time: timestamp when the check was executed (timestamp type).
- user_metadata: optional key-value custom metadata provided by the user (dictionary type).
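Put together, the result column corresponds roughly to the following Spark type (an illustrative sketch derived from the fields above; see the linked reference for the exact structure):
from pyspark.sql.types import (
    ArrayType, MapType, StringType, StructField, StructType, TimestampType,
)
# illustrative shape of the _errors and _warnings columns
result_column_type = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("message", StringType()),
        StructField("columns", ArrayType(StringType())),
        StructField("filter", StringType()),
        StructField("function", StringType()),
        StructField("run_time", TimestampType()),
        StructField("user_metadata", MapType(StringType(), StringType())),
    ])
)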
The example below demonstrates how to extract the results from the result columns in PySpark:
- Python
import pyspark.sql.functions as F
# apply quality checks
valid_df, invalid_df = dq_engine.apply_checks_and_split(input_df, checks)
# extract errors
results_df = invalid_df.select(
F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))
# extract warnings
results_df = invalid_df.select(
F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))
The results_df will contain the following columns:
+------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
| name | message | columns | filter | function | run_time | user_metadata |
+------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
| col_city_is_null | Column 'city' is null | ['city'] | country = 'Poland' | is_not_null | 2025-01-01 14:31:21 | {} |
| ... | ... | ... | ... | ... | ... | ... |
+------------------+-----------------------+----------+--------------------+-------------+---------------------+----------------+
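Building on the extraction above, the result columns can also be aggregated for monitoring, for example to count failing rows per check for a dashboard (a sketch assuming the default _errors column and the invalid_df from the previous example):
import pyspark.sql.functions as F
# count failing rows per check name and function
error_summary_df = (
    invalid_df
    .select(F.explode(F.col("_errors")).alias("result"))
    .groupBy("result.name", "result.function")
    .count()
    .orderBy(F.col("count").desc())
)
error_summary_df.show(truncate=False)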
An example of how to provide user metadata can be found in the Additional Configuration section.