databricks.labs.dqx.checks_storage
DataFrameConverter Objects
class DataFrameConverter()
Handles conversion between DataFrames and check dictionaries.
from_dataframe
@staticmethod
def from_dataframe(df: DataFrame,
run_config_name: str = "default",
rule_set_fingerprint: str | None = None) -> list[dict]
Converts a list of quality checks defined in a DataFrame to a list of quality checks defined as Python dictionaries.
Arguments:
df- DataFrame with data quality check rules. Each row should define a check. Rows should have the following columns:
- name - Name that will be given to the resulting column. Autogenerated if not provided.
- criticality (optional) - Possible values are error (data goes only into the "bad" dataframe) and warn (data goes into both dataframes).
- check - A StructType column defining the data quality check, including the DQX check function to use.
- filter - Expression for filtering data quality checks.
- run_config_name (optional) - Run configuration name for storing checks across runs.
- user_metadata (optional) - User-defined key-value pairs added to metadata generated by the check.
run_config_name- Run configuration name for filtering quality rules, e.g. input table or job name (use "default" if not provided).
rule_set_fingerprint- Optional SHA-256 fingerprint of the rule set to load. When provided, loads the rules with this specific fingerprint (in addition to applying the run_config_name filter) instead of loading the latest batch.
Returns:
List of data quality check specifications as Python dictionaries.
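The interplay between run_config_name and rule_set_fingerprint can be pictured with plain Python rows. This is a minimal sketch, not the DQX implementation: the row layout, the helper name select_rule_rows, and the reading of "latest batch" as "the rows with the newest created_at" are assumptions made for illustration only.

```python
from datetime import datetime, timezone


def select_rule_rows(rows, run_config_name="default", rule_set_fingerprint=None):
    """Hypothetical helper: pick the rows that belong to one rule set."""
    # Keep only rows stored under the requested run configuration.
    scoped = [r for r in rows if r["run_config_name"] == run_config_name]
    if not scoped:
        return []
    if rule_set_fingerprint is not None:
        # An explicit fingerprint pins the result to that exact rule set.
        return [r for r in scoped if r["rule_set_fingerprint"] == rule_set_fingerprint]
    # Assumption: "latest batch" means the rows with the newest created_at.
    latest = max(r["created_at"] for r in scoped)
    return [r for r in scoped if r["created_at"] == latest]


rows = [
    {"name": "id_not_null", "run_config_name": "default",
     "rule_set_fingerprint": "aaa",
     "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"name": "id_not_null_v2", "run_config_name": "default",
     "rule_set_fingerprint": "bbb",
     "created_at": datetime(2024, 2, 1, tzinfo=timezone.utc)},
    {"name": "amount_positive", "run_config_name": "orders",
     "rule_set_fingerprint": "ccc",
     "created_at": datetime(2024, 3, 1, tzinfo=timezone.utc)},
]

latest_default = select_rule_rows(rows)
pinned_default = select_rule_rows(rows, rule_set_fingerprint="aaa")
```

Without a fingerprint the sketch returns only the newest rule set for the run configuration; with one, it returns exactly the pinned version even if newer rows exist.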
to_dataframe
@staticmethod
def to_dataframe(spark: SparkSession,
checks: list[dict],
run_config_name: str = "default",
rule_set_fingerprint: str | None = None,
created_at: datetime | None = None) -> DataFrame
Converts a list of quality checks defined as Python dictionaries to a DataFrame.
Arguments:
spark- Spark session.
checks- List of check specifications as Python dictionaries. Each check consists of the following fields:
- check - Column expression to evaluate. The expression should return a string value if it evaluates to true (the string is used as the error/warning message) or null if it evaluates to false.
- name - Name that will be given to the resulting column. Autogenerated if not provided.
- criticality (optional) - Possible values are error (data goes only into the "bad" dataframe) and warn (data goes into both dataframes).
- filter (optional) - Expression for filtering data quality checks.
- user_metadata (optional) - User-defined key-value pairs added to metadata generated by the check.
run_config_name- Run configuration name for storing quality checks across runs, e.g. input table or job name (use "default" if not provided).
rule_set_fingerprint- Optional precomputed SHA-256 fingerprint of the rule set. When provided, avoids recomputing it.
created_at- Timestamp to stamp each row with. Defaults to the current UTC time when not provided. Pass an explicit value in tests or whenever a deterministic timestamp is required.
Returns:
DataFrame with data quality check rules
Raises:
InvalidCheckError- If any check is invalid or unsupported.
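To make the field list above concrete, here is a small sketch of what a checks payload might look like, together with a hypothetical pre-flight validator. The nested function/arguments layout of the check field, the is_not_null function name, and the validate_check helper are illustrative assumptions; this section does not spell out the exact shape, and the validator is not the library's own validation logic.

```python
ALLOWED_CRITICALITY = {"error", "warn"}


def validate_check(check: dict) -> dict:
    """Hypothetical pre-flight validation of one check dictionary."""
    # "check" is the only field not marked optional or autogenerated above.
    if "check" not in check:
        raise ValueError("every check needs a 'check' field")
    criticality = check.get("criticality")
    if criticality is not None and criticality not in ALLOWED_CRITICALITY:
        raise ValueError(f"unsupported criticality: {criticality!r}")
    if not isinstance(check.get("user_metadata", {}), dict):
        raise ValueError("'user_metadata' must hold key-value pairs")
    return check


checks = [
    {
        "name": "id_is_not_null",
        "criticality": "error",
        # Assumed layout: a function name plus its arguments.
        "check": {"function": "is_not_null", "arguments": {"column": "id"}},
        "filter": "country = 'DE'",
        "user_metadata": {"owner": "data-team"},
    },
    # Only the required field; name and criticality are left out.
    {"check": {"function": "is_not_null", "arguments": {"column": "email"}}},
]

validated = [validate_check(c) for c in checks]
```

A list shaped like this is the kind of input the checks argument expects; the real method raises InvalidCheckError rather than ValueError when a check is invalid or unsupported.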
ChecksStorageHandler Objects
class ChecksStorageHandler(ABC, Generic[T])
Abstract base class for handling storage of quality rules (checks).
load
@abstractmethod
def load(config: T) -> list[dict]
Load quality rules from the source. The returned checks can be used as input for apply_checks_by_metadata or apply_checks_by_metadata_and_split functions.
Arguments:
config- configuration for loading checks, including the table location and run configuration name.
Returns:
List of dq rules. Raises an error if the checks source is missing or invalid.
save
@abstractmethod
def save(checks: list[dict], config: T) -> None
Save quality rules to the target.
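The load/save contract can be illustrated with a toy subclass. The sketch below redefines a local stand-in for the abstract base class so that it runs without DQX installed; InMemoryConfig and InMemoryChecksStorageHandler are hypothetical names that do not exist in the library.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


class ChecksStorageHandler(ABC, Generic[T]):
    """Local stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def load(self, config: T) -> list[dict]:
        """Load quality rules from the source."""

    @abstractmethod
    def save(self, checks: list[dict], config: T) -> None:
        """Save quality rules to the target."""


@dataclass(frozen=True)
class InMemoryConfig:
    """Hypothetical config type used only for this sketch."""
    key: str = "default"


class InMemoryChecksStorageHandler(ChecksStorageHandler[InMemoryConfig]):
    """Hypothetical handler that keeps checks in a dictionary."""

    def __init__(self) -> None:
        self._store: dict[str, list[dict]] = {}

    def load(self, config: InMemoryConfig) -> list[dict]:
        if config.key not in self._store:
            raise KeyError(f"no checks stored under {config.key!r}")
        # Return copies so callers cannot mutate the stored rules.
        return [dict(check) for check in self._store[config.key]]

    def save(self, checks: list[dict], config: InMemoryConfig) -> None:
        self._store[config.key] = [dict(check) for check in checks]


handler = InMemoryChecksStorageHandler()
handler.save([{"name": "id_is_not_null"}], InMemoryConfig())
loaded = handler.load(InMemoryConfig())
```

Binding the type parameter to one config class, as each concrete handler in this module does, lets a type checker flag a handler that is called with the wrong kind of configuration.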
TableChecksStorageHandler Objects
class TableChecksStorageHandler(ChecksStorageHandler[TableChecksStorageConfig])
Handler for storing quality rules (checks) in a Delta table in the workspace.
load
@telemetry_logger("load_checks", "table")
def load(config: TableChecksStorageConfig) -> list[dict]
Load checks (dq rules) from a Delta table in the workspace.
Arguments:
config- configuration for loading checks, including the table location and run configuration name.
Returns:
List of dq rules. Raises an error if the checks table is missing or invalid.
Raises:
NotFound- if the table does not exist in the workspace
save
@telemetry_logger("save_checks", "table")
def save(checks: list[dict], config: TableChecksStorageConfig) -> None
Save checks to a Delta table in the workspace.
Arguments:
checks- list of dq rules to save
config- configuration for saving checks, including the table location and run configuration name.
Raises:
InvalidCheckError- If any check is invalid or unsupported.
UnsafeSqlQueryError- If run_config_name contains unsafe SQL (e.g. DML/DDL keywords) when mode is overwrite.
Notes:
Idempotency- If the table already contains a rule set with the same rule_set_fingerprint for this run_config_name, the save is skipped and the method returns without writing. This applies regardless of config.mode (including "overwrite"), so a call with mode="overwrite" and a checks payload that hashes to an existing fingerprint will not overwrite anything (e.g. if only non-fingerprinted metadata changed).
Mode behavior:
- overwrite: If the fingerprint differs, replaces all rows for this run_config_name with the new checks (via Delta replaceWhere). If the fingerprint matches, no write is performed.
- append: If the fingerprint differs, appends the new checks as additional rows for this run_config_name (multiple versions accumulate). If the fingerprint matches, no write is performed.
Concurrency- The idempotency check is not atomic for Delta tables. The check-then-write sequence (read fingerprint → skip if found → write) is not wrapped in a transaction, so two concurrent callers saving the same rule set may both pass the check and both write, producing duplicate rows in append mode or redundant overwrites in overwrite mode. This is benign in practice because duplicate rows with the same fingerprint are functionally equivalent and the load path always returns the latest version. For strict once-only guarantees, use a single writer or apply external coordination (e.g. a Databricks job with a single task).
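The idempotency and mode rules can be simulated with an in-memory list standing in for the Delta table. This is a sketch under stated assumptions, not the handler's code: the fingerprint here is a SHA-256 over a canonical JSON dump of the whole payload (the real fingerprint excludes some metadata and its exact inputs are not described in this section), and save_checks_sketch is a hypothetical name.

```python
import hashlib
import json


def rule_set_fingerprint(checks):
    """Assumed fingerprint: SHA-256 over a canonical JSON dump of the checks."""
    canonical = json.dumps(checks, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def save_checks_sketch(table, checks, run_config_name="default", mode="append"):
    """Return True if rows were written, False if the save was skipped."""
    if mode not in ("append", "overwrite"):
        raise ValueError(f"unsupported mode: {mode!r}")
    fingerprint = rule_set_fingerprint(checks)
    # Step 1 (check): skip if this rule set is already stored, in any mode.
    already_stored = any(
        row["run_config_name"] == run_config_name
        and row["rule_set_fingerprint"] == fingerprint
        for row in table
    )
    if already_stored:
        return False
    # Step 2 (write): not atomic with step 1, which is the race described above.
    if mode == "overwrite":
        # Stand-in for replaceWhere: drop only this run configuration's rows.
        table[:] = [r for r in table if r["run_config_name"] != run_config_name]
    table.extend(
        {"run_config_name": run_config_name,
         "rule_set_fingerprint": fingerprint,
         "check": check}
        for check in checks
    )
    return True


table = []
rules_v1 = [{"name": "id_is_not_null"}]
first_write = save_checks_sketch(table, rules_v1)
repeat_write = save_checks_sketch(table, rules_v1, mode="overwrite")
```

Because the check and the write are two separate steps, two callers that both run step 1 before either runs step 2 would both write, which is the duplicate-row scenario the concurrency note describes.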
LakebaseChecksStorageHandler Objects
class LakebaseChecksStorageHandler(
ChecksStorageHandler[LakebaseChecksStorageConfig])
Handler for storing dq rules (checks) in a Lakebase table.
get_engine
def get_engine(config: LakebaseChecksStorageConfig) -> Engine
Create a SQLAlchemy engine for the Lakebase instance.
Arguments:
config- Configuration for saving and loading checks to Lakebase.
Returns:
SQLAlchemy engine for the Lakebase instance.
get_table_definition
@staticmethod
def get_table_definition(schema_name: str,
table_name: str,
versioning: bool = True) -> Table
Create a SQLAlchemy table definition for storing DQ rules (checks) in Lakebase.
Arguments:
schema_name- The schema where the checks table is located.
table_name- The table where the checks are stored.
versioning- If True (default), include versioning columns (created_at, rule_fingerprint, rule_set_fingerprint). Pass False for legacy tables that predate versioning support.
Returns:
SQLAlchemy table definition for the Lakebase instance.
load
@telemetry_logger("load_checks", "lakebase")
def load(config: LakebaseChecksStorageConfig) -> list[dict]
Load dq rules (checks) from a Lakebase table.
Arguments:
config- Configuration for saving and loading checks to Lakebase.
Returns:
List of dq rules. Raises an error if loading checks fails.
Raises:
NotFound- If the table does not exist in the Lakebase instance.
ProgrammingError- If there are SQL syntax errors or missing objects (converted to NotFound for missing tables).
DatabaseError- If other database operations fail (includes OperationalError, IntegrityError, etc.).
save
@telemetry_logger("save_checks", "lakebase")
def save(checks: list[dict], config: LakebaseChecksStorageConfig) -> None
Save dq rules (checks) to a Lakebase table.
Arguments:
checks- List of dq rules (checks) to save.
config- Configuration for saving and loading checks to Lakebase.
Returns:
None
Raises:
InvalidCheckError- If any check is invalid or unsupported.
IntegrityError- If constraint violations occur (e.g., duplicate keys).
ProgrammingError- If there are SQL syntax errors or missing objects.
DatabaseError- If other database operations fail (includes OperationalError, DataError, etc.).
WorkspaceFileChecksStorageHandler Objects
class WorkspaceFileChecksStorageHandler(
ChecksStorageHandler[WorkspaceFileChecksStorageConfig])
Handler for storing quality rules (checks) in a file (json or yaml) in the workspace.
load
@telemetry_logger("load_checks", "workspace_file")
def load(config: WorkspaceFileChecksStorageConfig) -> list[dict]
Load checks (dq rules) from a file (json or yaml) in the workspace. This does not require installation of DQX in the workspace.
Arguments:
config- configuration for loading checks, including the file location and storage type.
Returns:
List of dq rules. Raises an error if the checks file is missing or invalid.
Raises:
NotFound- if the checks file is not found in the workspace.
InvalidCheckError- if the checks file cannot be parsed.
save
@telemetry_logger("save_checks", "workspace_file")
def save(checks: list[dict], config: WorkspaceFileChecksStorageConfig) -> None
Save checks (dq rules) to a yaml file in the workspace. This does not require installation of DQX in the workspace.
Arguments:
checks- list of dq rules to save
config- configuration for saving checks, including the file location and storage type.
FileChecksStorageHandler Objects
class FileChecksStorageHandler(ChecksStorageHandler[FileChecksStorageConfig])
Handler for storing quality rules (checks) in a file (json or yaml) in the local filesystem.
load
def load(config: FileChecksStorageConfig) -> list[dict]
Load checks (dq rules) from a file (json or yaml) in the local filesystem.
Arguments:
config- configuration for loading checks, including the file location.
Returns:
List of dq rules. Raises an error if the checks file is missing or invalid.
Raises:
FileNotFoundError- if the file path does not exist
InvalidCheckError- if the checks file cannot be parsed
save
def save(checks: list[dict], config: FileChecksStorageConfig) -> None
Save checks (dq rules) to a file (json or yaml) in the local filesystem.
Arguments:
checks- list of dq rules to save
config- configuration for saving checks, including the file location.
Raises:
FileNotFoundError- if the file path does not exist
InstallationChecksStorageHandler Objects
class InstallationChecksStorageHandler(
ChecksStorageHandler[InstallationChecksStorageConfig],
InstallationMixin)
Handler for storing quality rules (checks) defined in the installation configuration.
load
@telemetry_logger("load_checks", "installation")
def load(config: InstallationChecksStorageConfig) -> list[dict]
Load checks (dq rules) from the installation configuration.
Arguments:
config- configuration for loading checks, including the run configuration name and method.
Returns:
List of dq rules. Raises an error if the checks file or table is missing or invalid.
Raises:
NotFound- if the checks file or table is not found in the installation.
InvalidCheckError- if the checks file cannot be parsed.
save
@telemetry_logger("save_checks", "installation")
def save(checks: list[dict], config: InstallationChecksStorageConfig) -> None
Save checks (dq rules) to a yaml file or table in the installation folder. This will overwrite any existing checks file or table.
Arguments:
checks- list of dq rules to save
config- configuration for saving checks, including the run configuration name, method, and table location.
VolumeFileChecksStorageHandler Objects
class VolumeFileChecksStorageHandler(
ChecksStorageHandler[VolumeFileChecksStorageConfig])
Handler for storing quality rules (checks) in a file (json or yaml) in a Unity Catalog volume.
load
@telemetry_logger("load_checks", "volume")
def load(config: VolumeFileChecksStorageConfig) -> list[dict]
Load checks (dq rules) from a file (json or yaml) in a Unity Catalog volume.
Arguments:
config- configuration for loading checks, including the file location and storage type.
Returns:
List of dq rules. Raises an error if the checks file is missing or invalid.
Raises:
NotFound- if the checks file is not found in the workspace.
InvalidCheckError- if the checks file cannot be parsed.
CheckDownloadError- if there is an error downloading the file from the volume.
save
@telemetry_logger("save_checks", "volume")
def save(checks: list[dict], config: VolumeFileChecksStorageConfig) -> None
Save checks (dq rules) to a yaml file in a Unity Catalog volume. This does not require installation of DQX.
Arguments:
checks- list of dq rules to save
config- configuration for saving checks, including the file location and storage type.
BaseChecksStorageHandlerFactory Objects
class BaseChecksStorageHandlerFactory(ABC)
Abstract base class for factories that create storage handlers for checks.
create
@abstractmethod
def create(config: BaseChecksStorageConfig) -> ChecksStorageHandler
Abstract method to create a handler based on the type of the provided configuration object.
Arguments:
config- Configuration object for loading or saving checks.
Returns:
An instance of the corresponding ChecksStorageHandler.
create_for_location
@abstractmethod
def create_for_location(
location: str,
run_config_name: str = "default"
) -> tuple[ChecksStorageHandler, BaseChecksStorageConfig]
Abstract method to create a handler and config based on checks location.
Arguments:
location- location of the checks (file path, table name, volume, etc.).
run_config_name- the name of the run configuration to use for checks, e.g. input table or job name (use "default" if not provided).
Returns:
A tuple of (ChecksStorageHandler, BaseChecksStorageConfig).
create_for_run_config
@abstractmethod
def create_for_run_config(
run_config: RunConfig
) -> tuple[ChecksStorageHandler, BaseChecksStorageConfig]
Abstract method to create a handler and config based on a RunConfig.
This method inspects the RunConfig to determine the appropriate storage handler. If Lakebase connection parameters are present (lakebase_instance_name), it creates a LakebaseChecksStorageHandler. Otherwise, it delegates to create_for_location to infer the handler from the checks location string.
Arguments:
run_config- RunConfig containing checks location and optional Lakebase parameters.
Returns:
A tuple of (ChecksStorageHandler, BaseChecksStorageConfig).
ChecksStorageHandlerFactory Objects
class ChecksStorageHandlerFactory(BaseChecksStorageHandlerFactory)
create
def create(config: BaseChecksStorageConfig) -> ChecksStorageHandler
Factory method to create a handler based on the type of the provided configuration object.
Arguments:
config- Configuration object for loading or saving checks.
Returns:
An instance of the corresponding ChecksStorageHandler.
Raises:
InvalidConfigError- If the configuration type is unsupported.
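Dispatching on the type of the configuration object is a common factory pattern, sketched below with stand-in classes. Every name in this block (the StandIn classes, HANDLER_BY_CONFIG_TYPE, create_handler, UnsupportedConfigError) is hypothetical; the real factory works with the config and handler classes documented in this module and raises InvalidConfigError.

```python
from dataclasses import dataclass


@dataclass
class FileConfigStandIn:
    location: str


@dataclass
class TableConfigStandIn:
    location: str


class FileHandlerStandIn:
    """Placeholder for a file-backed handler."""


class TableHandlerStandIn:
    """Placeholder for a table-backed handler."""


class UnsupportedConfigError(ValueError):
    """Hypothetical error for configuration types with no handler."""


# Registry mapping each config type to the handler that understands it.
HANDLER_BY_CONFIG_TYPE = {
    FileConfigStandIn: FileHandlerStandIn,
    TableConfigStandIn: TableHandlerStandIn,
}


def create_handler(config):
    """Return a handler instance matching the config's type."""
    for config_type, handler_type in HANDLER_BY_CONFIG_TYPE.items():
        # isinstance also accepts subclasses of a registered config type.
        if isinstance(config, config_type):
            return handler_type()
    raise UnsupportedConfigError(
        f"unsupported config type: {type(config).__name__}"
    )


table_handler = create_handler(TableConfigStandIn("main.dqx.checks"))
file_handler = create_handler(FileConfigStandIn("checks.yml"))
```

Keeping the mapping in one registry means supporting a new storage backend only requires a new entry rather than another branch in the factory method.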
create_for_run_config
def create_for_run_config(
run_config: RunConfig
) -> tuple[ChecksStorageHandler, BaseChecksStorageConfig]
Factory method to create a handler and config based on a RunConfig.
This method inspects the RunConfig to determine the appropriate storage handler. If Lakebase connection parameters are present (lakebase_instance_name), it creates a LakebaseChecksStorageHandler. Otherwise, it delegates to create_for_location to infer the handler from the checks location string.
Arguments:
run_config- RunConfig containing checks location and optional Lakebase parameters.
Returns:
A tuple of (ChecksStorageHandler, BaseChecksStorageConfig).
Raises:
InvalidConfigError- If the configuration is invalid or unsupported.
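The two-step decision described above (Lakebase parameters first, location inference second) can be sketched as follows. RunConfigStandIn, its field names, and choose_storage are assumptions for illustration, and the sketch returns a plain label where the real method returns a handler and config tuple.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class RunConfigStandIn:
    """Hypothetical stand-in for RunConfig; field names are assumed."""
    checks_location: str
    lakebase_instance_name: Optional[str] = None


def choose_storage(run_config: RunConfigStandIn,
                   infer_from_location: Callable[[str], str]) -> str:
    """Pick a storage kind the way the description above outlines."""
    # Lakebase parameters take priority over the location string.
    if run_config.lakebase_instance_name:
        return "lakebase"
    # Otherwise defer to location-based inference.
    return infer_from_location(run_config.checks_location)


def infer_stub(location: str) -> str:
    """Toy inference rule used only to exercise the sketch."""
    return "file" if location.endswith((".yml", ".yaml", ".json")) else "table"


with_lakebase = choose_storage(
    RunConfigStandIn("dqx.config.checks", "my-instance"), infer_stub
)
without_lakebase = choose_storage(RunConfigStandIn("checks.yml"), infer_stub)
```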
is_table_location
def is_table_location(location: str) -> bool
Returns True if the location points to a Delta table (catalog.schema.table) and is not a file path with a known checks serializer extension.
Arguments:
location- The checks location to validate.
Returns:
bool- True if the location is a valid table name and not a file path, False otherwise.
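One way to read the rule above is "three dot-separated name parts, and not something that ends in a checks file extension". The sketch below encodes that reading; the extension list and the three-part test are assumptions, the function name is hypothetical, and the library's actual check may differ (for example in how it treats backtick-quoted identifiers).

```python
# Assumed set of checks serializer extensions.
KNOWN_FILE_EXTENSIONS = (".json", ".yaml", ".yml")


def looks_like_table_location(location: str) -> bool:
    """Hypothetical approximation of the documented rule."""
    # A path with a known serializer extension is treated as a file,
    # even when it happens to contain exactly two dots.
    if location.lower().endswith(KNOWN_FILE_EXTENSIONS):
        return False
    parts = location.split(".")
    # Require catalog.schema.table: three non-empty parts without slashes.
    return len(parts) == 3 and all(part and "/" not in part for part in parts)


table_result = looks_like_table_location("main.dqx.checks")
file_result = looks_like_table_location("my.checks.yml")
```

Checking the extension first matters because a file name such as my.checks.yml also splits into three dot-separated parts and would otherwise be mistaken for a table name.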