User guide
Data Profiling and Quality Rules Generation
Data profiling can be run to profile the input data and generate quality rule candidates with summary statistics. The generated quality rule (check) candidates can be used as input for quality checking (see Adding quality checks to the application). In addition, the DLT generator can generate native Delta Live Tables (DLT) expectations.
Data profiling is typically performed as a one-time action for the input dataset to discover the initial set of quality rule candidates. The check candidates should be manually reviewed before being applied to the data. This is not intended to be a continuously repeated or scheduled process, thereby also minimizing concerns regarding compute intensity and associated costs.
- Python
- CLI
Profiling and generating DQX rules/checks:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
input_df = spark.read.table("catalog1.schema1.table1")
# profile input data
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
dq_engine = DQEngine(ws)
# save checks in arbitrary workspace location
dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")
# save checks in the installation folder specified in the default run config (only works if DQX is installed in the workspace)
dq_engine.save_checks_in_installation(checks, run_config_name="default")
# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)
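Since the generated rules are candidates that should be reviewed before use, it can help to render them as YAML for inspection. A small sketch, assuming the candidates generated above are plain dictionaries (as they are when saved to the checks file):
import yaml
# render the generated candidates for manual review before saving or applying them
print(yaml.safe_dump(checks))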
The profiler samples 30% of the data (sample ratio = 0.3) and limits the input to 1000 records by default. These and other configuration options can be customized as detailed in the Profiling Options section.
You can optionally install DQX in the workspace (see the Installation Guide). A config, dashboard, and profiler workflow are installed as part of the installation. The workflow can be run manually in the workspace UI or using the CLI as below.
DQX operates exclusively at the PySpark dataframe level and does not interact directly with databases or storage systems. DQX does not persist data after performing quality checks, meaning users must handle data storage themselves. Since DQX does not manage the input location, output table, or quarantine table, it is the user's responsibility to store or persist the processed data as needed.
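For example, after splitting the data you would persist the outputs yourself. A minimal sketch using the split method described in Adding quality checks to the application; the target table names are placeholders (matching the example run config below):
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
input_df = spark.read.table("catalog1.schema1.table1")
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# persist the results yourself; the target tables are placeholders, e.g. the output and quarantine tables from the run config
valid_df.write.mode("append").saveAsTable("main.iot.silver")
quarantined_df.write.mode("append").saveAsTable("main.iot.quarantine")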
Open the config to check available run configs and adjust the settings if needed:
databricks labs dqx open-remote-config
See the example config below:
log_level: INFO
version: 1
run_configs:
- name: default  # <- unique name of the run config (default used during installation)
  input_location: s3://iot-ingest/raw  # <- input location for profiling (UC table or cloud path)
  input_format: delta  # <- format, required if cloud path provided
  output_table: main.iot.silver  # <- output UC table used in quality dashboard
  quarantine_table: main.iot.quarantine  # <- quarantine UC table used in quality dashboard
  checks_file: iot_checks.yml  # <- relative location of the quality rules (checks) defined as json or yaml
  profile_summary_stats_file: iot_profile_summary_stats.yml  # <- relative location of profiling summary stats
  warehouse_id: your-warehouse-id  # <- warehouse id for refreshing dashboard
  profiler_sample_fraction: 0.3  # <- fraction of data to sample in the profiler (30%)
  profiler_limit: 1000  # <- limit the number of records to profile
- name: another_run_config  # <- unique name of the run config
  ...
Run profiler workflow:
databricks labs dqx profile --run-config "default"
The generated quality rule candidates and summary statistics are saved in the installation folder, in the locations defined in the run config. If no run config is provided, the "default" run config is used. The run config selects a specific run configuration from 'config.yml'.
The following DQX configuration from 'config.yml' is used by the profiler workflow:
- 'input_location': input data as a path or a table.
- 'input_format': input data format. Required if input data is a path.
- 'checks_file': relative location of the generated quality rule candidates as a yaml or json file inside the installation folder (default: checks.yml).
- 'profile_summary_stats_file': relative location of the summary statistics (default: profile_summary.yml) inside the installation folder.
Logs are printed in the console and saved in the installation folder. You can display the logs from the latest profiler workflow run by executing:
databricks labs dqx logs --workflow profiler
Adding quality checks to the application
DQX offers a set of predefined quality rules (checks) to leverage. See details and list of all check functions here. The quality checking can be done on simple column types and complex types like structs, maps and arrays. Additionally, you can define custom checks to meet specific requirements. Learn more here.
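For instance, a custom check is a function that returns a column condition flagging failing rows. A hedged sketch, assuming a make_condition helper is available alongside the built-in check functions (the module path follows this guide; see the custom checks documentation for the exact API):
import pyspark.sql.functions as F
from databricks.labs.dqx.col_check_functions import make_condition  # assumed helper, module name as used elsewhere in this guide

def not_ends_with(col_name: str, suffix: str):
    # flag rows where the column value ends with the given suffix
    return make_condition(
        F.col(col_name).endswith(suffix),
        f"Column {col_name} ends with {suffix}",
        f"{col_name}_ends_with_{suffix}",
    )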
Quality rules configured in a file
Quality rules can be stored in a yaml or json file. Below is an example yaml file ('checks.yml') defining several checks:
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_names:
        - col1
        - col2
- name: col_col3_is_null_or_empty
  # criticality not provided, default "error" criticality will be used
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: col3
- criticality: warn
  check:
    function: is_in_list
    arguments:
      col_name: col4
      allowed:
        - 1
        - 2
Fields:
- criticality: either "error" (data going only into the "bad/quarantine" dataframe) or "warn" (data going into both the "good" and "bad" dataframes). If not provided, the default is "error".
- check: column expression containing "function" (check function to apply), "arguments" (check function arguments), and "col_name" (column name as a str or sql expression the check will be applied to) or "col_names" (column names as an array the check will be applied to).
- name (optional): name of the check; autogenerated if not provided.
Loading and execution methods
Checks can be loaded from a file in the installation folder, the workspace, or the local file system. The engine raises an error if the checks file contains an invalid JSON or YAML definition.
Checks loaded from a file can be applied using one of the following methods:
- apply_checks_by_metadata_and_split: splits the input data into valid and invalid (quarantined) dataframes.
- apply_checks_by_metadata: reports issues as additional columns.
The syntax of the loaded checks is validated automatically as part of these methods. You can also perform a standalone syntax validation of the checks, as described here.
Method 1: Loading checks from a workspace file in the installation folder
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# load check file specified in the run configuration (if DQX installed in the workspace)
checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_name="default")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_errors` and `_warnings`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Method 2: Loading checks from an arbitrary workspace file
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/checks.yml")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_errors` and `_warnings`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Method 3: Loading checks from a local file
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = DQEngine.load_checks_from_local_file("checks.yml")
dq_engine = DQEngine(WorkspaceClient())
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_errors` and `_warnings`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Quality rules defined in code
Method 1: Using DQX classes
Checks defined using DQX classes can be applied using one of the following methods:
- apply_checks_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
- apply_checks: if you want to report issues as additional columns.
Example:
- Python
from databricks.labs.dqx.col_check_functions import is_not_null, is_not_null_and_not_empty, is_in_list, is_in_range
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQColSetRule, DQColRule
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = [
DQColRule( # define rule for a single column
name="col3_is_null_or_empty",
criticality="warn",
check_func=is_not_null_and_not_empty,
col_name="col3"
),
DQColRule( # define rule with a filter
name="col_4_is_null_or_empty",
criticality="warn",
filter="col1 < 3",
check_func=is_not_null_and_not_empty,
col_name="col4"
),
DQColRule( # provide check func arguments using positional arguments
# if no name is provided, it is auto-generated
criticality="warn",
check_func=is_in_list,
col_name="col1",
check_func_args=[[1, 2]]
),
DQColRule( # provide check func arguments using keyword arguments
# criticality not provided, default "error" criticality will be used
check_func=is_in_list,
col_name="col2",
check_func_kwargs={"allowed": [1, 2]}
),
] + DQColSetRule( # define rule for multiple columns at once, name auto-generated if not provided
columns=["col1", "col2"],
criticality="error",
check_func=is_not_null
).get_rules()
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_errors` and `_warnings`)
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
The validation of arguments and keyword arguments for the check function is automatically performed upon creating a DQColRule.
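For illustration, a hedged sketch (reusing the imports from the example above) of how an argument mismatch surfaces at rule-creation time; the misspelled keyword and the exact exception type are assumptions:
try:
    DQColRule(
        criticality="warn",
        check_func=is_in_list,
        col_name="col1",
        check_func_kwargs={"allowd": [1, 2]},  # misspelled keyword argument (should be "allowed")
    )
except Exception as e:
    # the rule definition fails here, before any data is checked
    print(f"Invalid rule definition: {e}")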
Method 2: Using metadata config (yaml/json)
Checks defined as metadata in yaml or json can be applied using one of the following methods:
- apply_checks_by_metadata_and_split: if you want to split the checked data into valid and invalid (quarantined) dataframes.
- apply_checks_by_metadata: if you want to report issues as additional columns.
Example:
- Python
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_names:
        - col1
        - col2
- criticality: error
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: col3
- criticality: error
  filter: col1 < 3
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: col4
- criticality: warn
  check:
    function: is_in_list
    arguments:
      col_name: col4
      allowed:
        - 1
        - 2
""")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_errors` and `_warnings`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Integration with DLT (Delta Live Tables)
DLT provides expectations to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail. The example below demonstrates how to integrate DQX with DLT to provide comprehensive quality information. The DQX integration with DLT does not use DLT expectations; it relies on DQX's own methods.
Option 1: Apply quality rules and quarantine bad records
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
# get rows without errors or warnings, and drop auxiliary columns
return dq_engine.get_valid(df)
@dlt.table
def quarantine():
df = dlt.read_stream("bronze_dq_check")
# get only rows with errors or warnings
return dq_engine.get_invalid(df)
Option 2: Apply quality rules and report issues as additional columns
- Python
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
return df
Validating syntax of quality checks defined in yaml/json
You can validate the syntax of checks defined as metadata in yaml or json format before applying them. This validation ensures that the checks are correctly defined and can be interpreted by the DQX engine.
The validation cannot be used for checks defined using DQX classes; it is also unnecessary there, because incorrectly constructed DQX objects fail as soon as they are created.
- Python
- CLI
import yaml
from databricks.labs.dqx.engine import DQEngine
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_names:
        - col1
        - col2
""")
status = DQEngine.validate_checks(checks)
print(status)
Validate checks stored in the installation folder:
databricks labs dqx validate-checks --run-config "default"
The following DQX configuration from 'config.yml' will be used by default:
- 'checks_file': relative location of the quality rules defined as yaml or json inside the installation folder (default: checks.yml).
Validating quality rules is typically done as part of the CI/CD process to ensure the checks are ready to use in the application.
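For example, a minimal CI test could load the checks file and fail the build on validation errors. A sketch assuming the returned status exposes has_errors and errors fields (check the API reference for the exact structure):
import yaml
from databricks.labs.dqx.engine import DQEngine

def test_checks_are_valid():
    with open("checks.yml") as f:
        checks = yaml.safe_load(f)
    status = DQEngine.validate_checks(checks)
    # has_errors / errors are assumed fields of the validation status
    assert not status.has_errors, f"Invalid checks: {status.errors}"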
Quality Check Results
The quality check results are reported as additional columns in the output DataFrame.
The reporting columns are named _errors and _warnings by default, but you can customize them as described in the Additional Configuration section.
The reporting columns can be used to monitor and track data quality issues and for further processing, such as in a dashboard or other downstream applications.
Below is a sample output of a check as stored in a reporting column:
[
{
"name": "col_city_is_null",
"message": "Column city is null",
"col_name": "city",
"filter": "country = 'Poland'",
"function": "is_not_null",
"run_time": "2025-01-01 14:31:21",
"user_metadata": {"key1": "value1", "key2": "value2"},
},
]
The structure of the reporting columns is an array of struct containing the following fields (see the exact structure here):
- name: name of the check (string type).
- message: message describing the quality issue (string type).
- col_name: name of the column where the quality issue was found (string type).
- filter: filter applied to the column, if any (string type).
- function: rule/check function applied (string type).
- run_time: timestamp when the check was executed (timestamp type).
- user_metadata: optional key-value custom metadata provided by the user (dictionary type).
The example below demonstrates how to extract the results from a reporting column in PySpark:
- Python
import pyspark.sql.functions as F
# apply quality checks
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
# extract errors
results_df = quarantined_df.select(
F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))
# extract warnings
results_df = quarantined_df.select(
F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))
# The results_df will contain the following columns:
# +------------------+---------------------+----------+--------------------+-------------+---------------------+----------------+
# | name | message | col_name | filter | function | run_time | user_metadata |
# +------------------+---------------------+----------+--------------------+-------------+---------------------+----------------+
# | col_city_is_null | Column city is null | city | country = 'Poland' | is_not_null | 2025-01-01 14:31:21 | {} |
# | ... | ... | ... | ... | ... | ... | ... |
# +------------------+---------------------+----------+--------------------+-------------+---------------------+----------------+
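Building on the extracted results, you can aggregate issues for monitoring or dashboards. A small follow-on sketch, reusing quarantined_df and the default _errors reporting column from the example above:
# count quality issues per check name and column
error_counts_df = (
    quarantined_df
    .select(F.explode(F.col("_errors")).alias("result"))
    .groupBy("result.name", "result.col_name")
    .count()
)
error_counts_df.show()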
An example of how to provide user metadata can be found in the Additional Configuration section.
Data Quality Dashboard
The data quality dashboard is automatically installed in the dashboards folder of the workspace installation directory when you install DQX in the Databricks workspace. For more details on the installation process, see the Installation Guide.
The dashboard lets you monitor and track data quality issues easily. You can customize it to align with your specific requirements.
The dashboard is not scheduled to refresh automatically by default, which avoids unnecessary cluster costs. When you open a dashboard, refresh it manually to view the latest data. If needed, you can configure the dashboard to refresh periodically.
- CLI
- Python
You can locate the dashboard directly in the Databricks workspace UI or use the following command:
databricks labs dqx open-dashboards
After executing the command:
- Locate and click on a dashboard file in the workspace UI.
- Open the dashboard and click Refresh to load the latest data.
You can locate the dashboard directly in the Databricks workspace UI or use the following code:
from databricks.labs.dqx.contexts.workspace import WorkspaceContext
from databricks.sdk import WorkspaceClient
ctx = WorkspaceContext(WorkspaceClient())
dashboards_folder_link = f"{ctx.installation.workspace_link('')}dashboards/"
print("Open a dashboard from the following folder and refresh it:")
print(dashboards_folder_link)
DQX dashboard(s) only use the quarantine table for queries, as defined in config.yml during installation. If you change the quarantine table in the run config after deployment (the quarantine_table field), you must update the dashboard queries accordingly.
Additional Configuration
Profiling options
By default, the profiler samples the input data with a fraction of 0.3 (30%) and limits the input to 1000 records. You can adjust these and other parameters as follows:
- Python
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
input_df = spark.read.table("catalog1.schema1.table1")
profile_options = {
"round": True, # round the min/max values
"max_in_count": 10, # generate is_in if the number of distinct values is less than this count
"distinct_ratio": 0.05, # generate is_distinct if we have less than 5 percent of distinct values
"max_null_ratio": 0.01, # generate is_null if we have less than 1 percent of nulls
"remove_outliers": True, # remove outliers
"outlier_columns": [], # remove outliers in the columns
"num_sigmas": 3, # number of sigmas to use when remove_outliers is True
"trim_strings": True, # trim whitespace from strings
"max_empty_ratio": 0.01, # generate is_empty if we have less than 1 percent of empty strings
"sample_fraction": 0.3, # fraction of data to sample (30%)
"sample_seed": None, # seed for sampling
"limit": 1000, # limit the number of samples
}
columns_to_profile = ["col1", "col2"]
# profile input data
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df, cols=columns_to_profile, opts=profile_options)
# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
Adding User Metadata to the Results
You can provide user metadata to the results by specifying extra parameters when creating the engine.
The custom key-value metadata will be included in every quality check result inside the user_metadata field.
- Python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import ExtraParams
user_metadata = {"key1": "value1", "key2": "value2"}
# use ExtraParams to configure one or more optional parameters
extra_parameters = ExtraParams(user_metadata=user_metadata)
ws = WorkspaceClient()
dq_engine = DQEngine(ws, extra_params=extra_parameters)
Customizing Reporting Columns
By default, DQX appends _errors and _warnings reporting columns to the output DataFrame to flag quality issues.
You can customize the names of these reporting columns by specifying extra parameters when creating the engine.
- Python
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import ExtraParams
custom_column_names = {"errors": "dq_errors", "warnings": "dq_warnings"}
# use ExtraParams to configure one or more optional parameters
extra_parameters = ExtraParams(column_names=custom_column_names)
ws = WorkspaceClient()
dq_engine = DQEngine(ws, extra_params=extra_parameters)