Loading and Storing Quality Checks
DQX provides flexible methods to load and save quality checks (rules) defined as metadata (a list of dictionaries) from different storage backends, making it easier to manage, share, and reuse checks across workflows and environments. For the checks table schema and relationships to other DQX tables, see Table Schemas and Relationships.
Saving quality checks to storage
You can save quality checks defined by metadata (list of dictionaries) or generated by the profiler to various storage locations.
For the save methods to work, checks must be defined declaratively as metadata (list of dictionaries).
If you create checks as a list of DQRule objects, you can convert them to metadata format and back using the methods described here.
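For example, here is a minimal sketch of that conversion, assuming serialize_checks from databricks.labs.dqx.checks_serializer (mentioned later on this page) accepts a list of DQRule objects and returns the equivalent metadata:
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.checks_serializer import serialize_checks

# convert DQRule objects to metadata so they can be saved with the methods below
rules = [DQRowRule(check_func=check_funcs.is_not_null, column="col1", criticality="error")]
checks_metadata: list[dict] = serialize_checks(rules)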
- Python
- Workflows
import yaml

from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import (
FileChecksStorageConfig,
WorkspaceFileChecksStorageConfig,
InstallationChecksStorageConfig,
TableChecksStorageConfig,
VolumeFileChecksStorageConfig,
LakebaseChecksStorageConfig,
)
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# define checks as a list of dictionaries
checks: list[dict] = yaml.safe_load("""
- criticality: warn
check:
function: is_not_null_and_not_empty
arguments:
column: col3
- criticality: error
check:
function: is_not_null
for_each_column:
- col1
- col2
# ...
""")
# save checks as a YAML file in the local filesystem (overwrite the file) using relative or absolute path
# also works with absolute and relative workspace paths if invoked from Databricks notebook or job
dq_engine.save_checks(checks, config=FileChecksStorageConfig(location="checks.yml"))
# save checks as a YAML file in arbitrary workspace location (overwrite the file) using absolute path
dq_engine.save_checks(checks, config=WorkspaceFileChecksStorageConfig(location="/Shared/App1/checks.yml"))
# save checks in a Delta table to "default" run config, append checks
dq_engine.save_checks(checks, config=TableChecksStorageConfig(location="catalog.schema.checks_table", mode="append"))
# save checks in a Delta table with specific run config for filtering (it can be any string, e.g. input table or job name), overwrite checks
dq_engine.save_checks(checks, config=TableChecksStorageConfig(location="catalog.schema.checks_table", run_config_name="main.default.input_table", mode="overwrite"))
# save checks as a YAML in a Unity Catalog Volume location (overwrite the file)
dq_engine.save_checks(checks, config=VolumeFileChecksStorageConfig(location="/Volumes/dq/config/checks_volume/App1/checks.yml"))
# save checks as a Lakebase table using a Databricks service principal
dq_engine.save_checks(checks, config=LakebaseChecksStorageConfig(instance_name="my-instance", user="00000000-0000-0000-0000-000000000000", location="dqx.config.checks"))
# save checks as a YAML file or table defined in 'checks_location' of the run config
# only works if DQX is installed in the workspace
# the run config name can be any string, e.g. input table or job name
dq_engine.save_checks(checks, config=InstallationChecksStorageConfig(assume_user=True, run_config_name="main.default.input_table"))
When using the profiler workflow to generate quality check candidates, the checks are saved to the location specified in the checks_location field of the configuration file.
Saving quality rules in a Delta table without using DQX methods
Quality rules can be stored in a Delta table as showcased above using DQEngine.
You can also store them directly in a Delta table using Spark DataFrame API.
When using DQEngine.save_checks, you can pass checks with for_each_column; DQX stores them in compact format (one row per check). When creating a DataFrame manually (without DQX), you can use either compact format (one row with for_each_column) or expanded format (one row per column).
The minimal schema for manual creation has no versioning columns. When you first use DQEngine.save_checks, DQX adds created_at, rule_fingerprint, and rule_set_fingerprint via ALTER TABLE if missing.
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.config import TableChecksStorageConfig
# Minimal schema (no versioning columns). DQX adds created_at, rule_fingerprint, rule_set_fingerprint when saving.
schema = (
"name STRING, "
"criticality STRING, "
"check STRUCT<function STRING, for_each_column ARRAY<STRING>, arguments MAP<STRING, STRING>>, "
"filter STRING, "
"run_config_name STRING, "
"user_metadata MAP<STRING, STRING>"
)
# argument values are stored as strings to match the MAP<STRING, STRING> schema
checks = [
[
None,
"error",
{"function": "is_not_null", "for_each_column": ["col1", "col2"], "arguments": {}},
"col1 > 0",
"default",
{"check_owner": "someone@email.com"},
],
[
"column_not_less_than",
"warn",
{"function": "is_not_less_than", "arguments": {"column": "col_2", "limit": 1}},
None,
"default",
None,
],
[
"column_in_list",
"warn",
{"function": "is_in_list", "arguments": {"column": "col_2", "allowed": [1, 2]}},
None,
"default",
None,
],
]
# save checks
df = spark.createDataFrame(checks, schema)
df.write.format("delta").mode("overwrite").saveAsTable("main.default.dqx_checks_table")
# load checks as a list of dictionaries from the Delta table
dq_engine = DQEngine(WorkspaceClient())
loaded_checks = dq_engine.load_checks(config=TableChecksStorageConfig(location="main.default.dqx_checks_table"))
# validate loaded checks
assert not dq_engine.validate_checks(loaded_checks).has_errors
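For comparison, the first check above could also be stored in expanded format, i.e. one row per column with for_each_column left unset. A sketch reusing the schema and table from the previous example:
# expanded format: one row per column instead of a single compact row with for_each_column
expanded_checks = [
    [None, "error", {"function": "is_not_null", "for_each_column": None, "arguments": {"column": "col1"}}, "col1 > 0", "default", None],
    [None, "error", {"function": "is_not_null", "for_each_column": None, "arguments": {"column": "col2"}}, "col1 > 0", "default", None],
]
spark.createDataFrame(expanded_checks, schema).write.format("delta").mode("append").saveAsTable("main.default.dqx_checks_table")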
Loading quality checks from storage
You can load quality checks from various storage locations. You can then apply the loaded checks using the methods described here.
The load methods return checks as metadata (a list of dictionaries).
If you have defined checks as a list of DQRule objects instead, you can convert them to metadata using the serialize_checks function, as described here.
- Python
- Workflows
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import (
FileChecksStorageConfig,
WorkspaceFileChecksStorageConfig,
InstallationChecksStorageConfig,
TableChecksStorageConfig,
VolumeFileChecksStorageConfig,
LakebaseChecksStorageConfig,
)
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# load checks from a local file using relative or absolute path
# also works for absolute and relative workspace paths if invoked from Databricks notebook or job
checks: list[dict] = dq_engine.load_checks(config=FileChecksStorageConfig(location="checks.yml"))
# load checks from a local file with variable substitution
checks: list[dict] = dq_engine.load_checks(
FileChecksStorageConfig(location="checks.yml"),
variables={"threshold": 100, "column_name": "total_amount"}
)
# load checks from arbitrary workspace location using absolute path
checks: list[dict] = dq_engine.load_checks(config=WorkspaceFileChecksStorageConfig(location="/Shared/App1/checks.yml"))
# load checks from a Delta table using the default run config name
checks: list[dict] = dq_engine.load_checks(config=TableChecksStorageConfig(location="catalog.schema.checks_table"))
# load checks from a Delta table with specific run config for filtering (it can be any string, e.g. input table or job name); loads the latest rule set
checks: list[dict] = dq_engine.load_checks(config=TableChecksStorageConfig(location="catalog.schema.checks_table", run_config_name="main.default.input_table"))
# load checks from a Delta table with specific rule_set_fingerprint for filtering (it can be any rule_set_fingerprint from checks_table)
checks: list[dict] = dq_engine.load_checks(config=TableChecksStorageConfig(location="catalog.schema.checks_table", rule_set_fingerprint="9664332437da274d921cefac60bd509e0aa383292ba695341e1f3fbc2a716e48"))
# load checks from a Unity Catalog Volume
checks: list[dict] = dq_engine.load_checks(config=VolumeFileChecksStorageConfig(location="/Volumes/dq/config/checks_volume/App1/checks.yml"))
# load checks from a Lakebase table using a Databricks service principal
checks: list[dict] = dq_engine.load_checks(config=LakebaseChecksStorageConfig(instance_name="my-instance", user="00000000-0000-0000-0000-000000000000", location="dqx.config.checks"))
# load checks from a Lakebase table using specific rule_set_fingerprint for filtering (it can be any rule_set_fingerprint from checks_table)
checks: list[dict] = dq_engine.load_checks(config=LakebaseChecksStorageConfig(instance_name="my-instance", user="00000000-0000-0000-0000-000000000000", location="dqx.config.checks", rule_set_fingerprint="9664332437da274d921cefac60bd509e0aa383292ba695341e1f3fbc2a716e48"))
# load checks from a file or table defined in the run config ('checks_location' field)
# only works if DQX is installed in the workspace
# the run config name is a string (e.g. input table or job name)
checks: list[dict] = dq_engine.load_checks(config=InstallationChecksStorageConfig(run_config_name="main.default.input_table"))
# validate loaded checks
assert not dq_engine.validate_checks(checks).has_errors
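Once loaded, the checks can be applied to a DataFrame. A minimal sketch, assuming DQEngine.apply_checks_by_metadata as the metadata-based apply method and an illustrative input table name:
# apply the loaded checks to an input DataFrame (the table name is illustrative)
input_df = spark.read.table("main.default.input_table")
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)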
When using the quality checker or e2e workflows to apply quality checks, the checks are loaded from the location specified in the checks_location field of the configuration file.
Wrong types, unknown arguments, and missing required check function parameters are reported by DQEngine.validate_checks.
The load_checks and save_checks methods (except for Delta storage) do not validate the returned or provided metadata. Call validate_checks after loading or before saving when you want to catch problems before applying the checks (for example, hand-edited YAML/JSON or checks written to a table without going through DQX).
For field semantics and validation details, see Quality checks definition.
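For example, a small sketch of validating hand-edited checks right after loading and before applying them:
# catch wrong types, unknown arguments, and missing required parameters before apply
checks = dq_engine.load_checks(config=FileChecksStorageConfig(location="checks.yml"))
status = dq_engine.validate_checks(checks)
if status.has_errors:
    # the validation status is assumed to render its errors when formatted
    raise ValueError(f"Invalid checks: {status}")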
Supported storage backends
Saving and loading methods accept a storage backend configuration as input.
The following backend configurations are currently supported:
- FileChecksStorageConfig: local files (JSON/YAML), or workspace files if invoked from a Databricks notebook or job. Fields:
  - location: absolute or relative file path in the local filesystem (JSON or YAML); also works with absolute or relative workspace file paths if invoked from a Databricks notebook or job.
- WorkspaceFileChecksStorageConfig: workspace files (JSON/YAML) using absolute paths. Fields:
  - location: absolute workspace file path (JSON or YAML).
- TableChecksStorageConfig: Unity Catalog tables. Fields:
  - location: fully qualified table name.
  - run_config_name: (optional) run configuration name to load (it can be any string, e.g. input table or job name; "default" is used if not provided).
  - mode: (optional) write mode for saving checks (overwrite or append, default is append). overwrite: replaces all rows for this run_config_name when the fingerprint differs; skips when the fingerprint already exists. append: adds new rows when the fingerprint differs (multiple versions accumulate); skips when the fingerprint already exists.
  - rule_set_fingerprint: (optional) SHA-256 fingerprint of the rule set to load. When provided, loads rules matching this specific fingerprint instead of the latest batch.
- VolumeFileChecksStorageConfig: Unity Catalog Volume files (JSON/YAML). Fields:
  - location: Unity Catalog Volume file path (JSON or YAML).
- LakebaseChecksStorageConfig: Lakebase tables. Uses the same structure and compact format as Delta tables (for_each_column preserved). Fields:
  - instance_name: name of the Lakebase instance, e.g. "my-instance".
  - user: user to connect to the Lakebase instance, e.g. "user@domain.com" or a Databricks service principal client ID.
  - location: fully qualified table name in the format "database.schema.table".
  - port: (optional) port on which to connect to the Lakebase instance (5432 is used if not provided).
  - run_config_name: (optional) run configuration name to load ("default" is used if not provided).
  - mode: (optional) write mode for saving checks (overwrite or append, default is append). overwrite: replaces all rows for this run_config_name when the fingerprint differs; skips when the fingerprint already exists. append: adds new rows when the fingerprint differs (multiple versions accumulate); skips when the fingerprint already exists.
  - rule_set_fingerprint: (optional) SHA-256 fingerprint of the rule set to load. When provided, loads rules matching this specific fingerprint instead of the latest batch.
- InstallationChecksStorageConfig: installation-managed location from the run config; ignores location and infers it from checks_location in the run config. Fields:
  - location: (optional) automatically set based on the checks_location field from the run configuration.
  - install_folder: (optional) installation folder where DQX is installed, only required when a custom installation folder is used.
  - run_config_name: (optional) run configuration name to load (it can be any string, e.g. input table or job name; "default" is used if not provided).
  - product_name: (optional) name of the product ("dqx" is used if not provided).
  - assume_user: (optional) if True, assume user installation, otherwise global installation (skipped if install_folder is provided).
  - The config inherits from the specific configs (WorkspaceFileChecksStorageConfig, TableChecksStorageConfig, VolumeFileChecksStorageConfig, and LakebaseChecksStorageConfig), so relevant fields from these specific configs can also be provided (e.g. instance_name and user for Lakebase), as shown in the sketch below.
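For example, a sketch of passing Lakebase connection fields through InstallationChecksStorageConfig when checks_location in the run config points to a Lakebase table (the instance name and client ID are illustrative):
# inherited backend-specific fields can be provided alongside the run config name
config = InstallationChecksStorageConfig(
    run_config_name="main.default.input_table",
    instance_name="my-instance",
    user="00000000-0000-0000-0000-000000000000",
)
checks = dq_engine.load_checks(config=config)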
You can find details on how to define checks here.
Calculating fingerprints manually
When loading from Delta or Lakebase tables, you typically pass a run_config_name for filtering.
Alternatively, you can filter by rule_set_fingerprint to load a specific version of the rule set.
You can also use the fingerprint to verify that the current rule set matches the one used to apply the checks.
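For example, a sketch of verifying that the latest rule set stored in a table still matches the one used at apply time, using compute_rule_set_fingerprint_by_metadata (introduced below) together with the engine and config objects from the earlier examples:
from databricks.labs.dqx.rule_fingerprint import compute_rule_set_fingerprint_by_metadata

# 'checks' is the metadata that was applied; compare it with what is currently stored
applied_fingerprint = compute_rule_set_fingerprint_by_metadata(checks)
stored_checks = dq_engine.load_checks(config=TableChecksStorageConfig(location="catalog.schema.checks_table"))
assert compute_rule_set_fingerprint_by_metadata(stored_checks) == applied_fingerprint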
Single rule fingerprint
Use compute_rule_fingerprint to compute the fingerprint of a single rule from its metadata (dict). This is useful when joining row-level results with the checks table or when you need to identify a specific rule.
from databricks.labs.dqx.rule_fingerprint import compute_rule_fingerprint
check: dict = {
"criticality": "error",
"check": {"function": "is_not_null", "arguments": {"column": "id"}},
}
fingerprint = compute_rule_fingerprint(check)
# e.g. "f921efe3649267b67b9563861e0eee3adcbccd1ae8f38f86ac1467aeffea75eb"
When you have a DQRule instance, use the rule_fingerprint property instead:
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
rule = DQRowRule(check_func=check_funcs.is_not_null, column="id", criticality="error")
fingerprint = rule.rule_fingerprint # same as compute_rule_fingerprint(rule.to_dict())
Rule set fingerprint
The rule_fingerprint module provides two functions to compute the rule set fingerprint (a fingerprint for a set of rules that are applied together):
From metadata
Use compute_rule_set_fingerprint_by_metadata when you have checks as a list of dictionaries (e.g. from YAML or JSON).
from databricks.labs.dqx.rule_fingerprint import compute_rule_set_fingerprint_by_metadata
from databricks.labs.dqx.config import TableChecksStorageConfig
checks: list[dict] = [
{"criticality": "error", "check": {"function": "is_not_null", "arguments": {"column": "id"}}},
{"criticality": "warn", "check": {"function": "is_not_null_and_not_empty", "for_each_column": ["col1", "col2"], "arguments": {}}},
]
fingerprint = compute_rule_set_fingerprint_by_metadata(checks)
# e.g. "9664332437da274d921cefac60bd509e0aa383292ba695341e1f3fbc2a716e48"
# Use when loading
loaded_checks = dq_engine.load_checks(config=TableChecksStorageConfig(location="catalog.schema.checks_table", rule_set_fingerprint=fingerprint))
From DQRule objects
Use compute_rule_set_fingerprint when you already have a list of DQRule objects (e.g. after building rules programmatically or deserializing).
from databricks.labs.dqx.rule import DQRule
from databricks.labs.dqx.rule_fingerprint import compute_rule_set_fingerprint
from databricks.labs.dqx.checks_serializer import deserialize_checks
rules: list[DQRule] = deserialize_checks(checks)  # or any list of DQRule instances built programmatically
fingerprint = compute_rule_set_fingerprint(rules)
The fingerprint is a deterministic SHA-256 hash of the rule set. The order of checks does not matter. for_each_column is included in the fingerprint (sorted for determinism).
user_metadata is intentionally not included in the fingerprint. Two rule sets that differ only in user_metadata values will produce the same fingerprint. As a result, calling save_checks with a checks payload that is identical to an already-saved version — except for user_metadata — will be treated as a no-op and no new version will be written. If you need to record a new version, change the rule logic (function, arguments, filter, name, or criticality).
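Both properties can be verified directly; a small sketch using the checks metadata and compute_rule_set_fingerprint_by_metadata from the "From metadata" example above:
base = compute_rule_set_fingerprint_by_metadata(checks)

# order independence: the same rules in a different order produce the same fingerprint
assert compute_rule_set_fingerprint_by_metadata(list(reversed(checks))) == base

# user_metadata is ignored: adding it does not change the fingerprint, so saving such a
# variant is treated as a no-op rather than a new rule set version
with_metadata = [{**check, "user_metadata": {"check_owner": "someone@email.com"}} for check in checks]
assert compute_rule_set_fingerprint_by_metadata(with_metadata) == base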