
databricks.labs.dqx.engine

DQEngineCore Objects

class DQEngineCore(DQEngineCoreBase)

Core engine to apply data quality checks to a DataFrame.

Arguments:

  • workspace_client - WorkspaceClient instance used to access the workspace.
  • spark - Optional SparkSession to use. If not provided, the active session is used.
  • extra_params - Optional extra parameters for the engine, such as result column names and run metadata.
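
For example, a minimal construction sketch; WorkspaceClient is the standard Databricks SDK client and resolves credentials from the environment or a configured profile:

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngineCore

# spark is omitted, so the active SparkSession is used.
core_engine = DQEngineCore(workspace_client=WorkspaceClient())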

apply_checks

def apply_checks(df: DataFrame,
                 checks: list[DQRule],
                 ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame

Apply data quality checks to the given DataFrame.

Arguments:

  • df - Input DataFrame to check.
  • checks - List of checks to apply. Each check must be a DQRule instance.
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame with errors and warnings result columns.

Raises:

  • InvalidCheckError - If any of the checks are invalid.
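
Continuing the construction sketch above, a hedged usage example; the DQRowRule class and the check_funcs module are assumptions based on recent DQX releases and may differ across versions:

from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs

checks = [
    # Rows with a null customer_id are flagged as errors.
    DQRowRule(criticality="error", check_func=check_funcs.is_not_null, column="customer_id"),
    # Rows with a null or empty city are flagged as warnings only.
    DQRowRule(criticality="warn", check_func=check_funcs.is_not_null_and_not_empty, column="city"),
]

# df: an existing Spark DataFrame to check.
checked_df = core_engine.apply_checks(df, checks)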

apply_checks_and_split

def apply_checks_and_split(
    df: DataFrame,
    checks: list[DQRule],
    ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]

Apply data quality checks to the given DataFrame and split the results into two DataFrames ("good" and "bad").

Arguments:

  • df - Input DataFrame to check.
  • checks - List of checks to apply. Each check must be a DQRule instance.
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows with errors or warnings and the corresponding result columns).

Raises:

  • InvalidCheckError - If any of the checks are invalid.
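
Continuing the same sketch:

good_df, bad_df = core_engine.apply_checks_and_split(df, checks)
# good_df: rows without errors (warnings allowed), result columns dropped.
# bad_df: rows with errors or warnings, result columns retained.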

apply_checks_by_metadata

def apply_checks_by_metadata(
    df: DataFrame,
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame

Apply data quality checks defined as metadata to the given DataFrame.

Arguments:

  • df - Input DataFrame to check.
  • checks - List of dictionaries describing checks. Each check dictionary must contain the following:
    • check - A check definition including check function and arguments to use.
    • name - Optional name for the resulting column. Auto-generated if not provided.
    • criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame with errors and warnings result columns.
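
The same checks in metadata form, following the dictionary schema described above (the built-in function names and the column argument name are assumptions based on DQX's standard check functions):

checks = [
    {
        "criticality": "error",
        "check": {"function": "is_not_null", "arguments": {"column": "customer_id"}},
    },
    {
        "name": "city_not_empty",  # optional; auto-generated when omitted
        "criticality": "warn",
        "check": {"function": "is_not_null_and_not_empty", "arguments": {"column": "city"}},
    },
]

checked_df = core_engine.apply_checks_by_metadata(df, checks)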

apply_checks_by_metadata_and_split

def apply_checks_by_metadata_and_split(
    df: DataFrame,
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]

Apply data quality checks defined as metadata to the given DataFrame and split the results into two DataFrames ("good" and "bad").

Arguments:

  • df - Input DataFrame to check.
  • checks - List of dictionaries describing checks. Each check dictionary must contain the following:
    • check - A check definition including check function and arguments to use.
    • name - Optional name for the resulting column. Auto-generated if not provided.
    • criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame that includes errors and warnings result columns.

Raises:

  • InvalidCheckError - If any of the checks are invalid.

validate_checks

@staticmethod
def validate_checks(
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    validate_custom_check_functions: bool = True
) -> ChecksValidationStatus

Validate checks defined as metadata to ensure they conform to the expected structure and types.

This method validates the presence of required keys, the existence and callability of functions, and the types of arguments passed to those functions.

Arguments:

  • checks - List of checks to apply to the DataFrame. Each check should be a dictionary.
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • validate_custom_check_functions - If True, validate custom check functions.

Returns:

ChecksValidationStatus indicating the validation result.
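
A minimal validation sketch; the has_errors attribute is an assumption based on the DQX documentation:

status = DQEngineCore.validate_checks(checks)
if status.has_errors:
    # The status carries the validation messages for each failing check.
    raise ValueError(str(status))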

get_invalid

def get_invalid(df: DataFrame) -> DataFrame

Return records that violate data quality checks (rows with warnings or errors).

Arguments:

  • df - Input DataFrame.

Returns:

DataFrame with rows that have errors or warnings and the corresponding result columns.

get_valid

def get_valid(df: DataFrame) -> DataFrame

Return records that do not violate data quality checks (rows may have warnings but no errors).

Arguments:

  • df - Input DataFrame.

Returns:

DataFrame with rows that have no errors (warnings are allowed), without the result columns.
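
For example, both views can be derived from a checked DataFrame:

checked_df = core_engine.apply_checks_by_metadata(df, checks)
valid_df = core_engine.get_valid(checked_df)      # no errors; result columns dropped
invalid_df = core_engine.get_invalid(checked_df)  # errors or warnings; result columns kept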

load_checks_from_local_file

@staticmethod
def load_checks_from_local_file(filepath: str) -> list[dict]

Load DQ rules (checks) from a local JSON or YAML file.

The returned checks can be used as input to apply_checks_by_metadata.

Arguments:

  • filepath - Path to a file containing checks definitions.

Returns:

List of DQ rules.

save_checks_in_local_file

@staticmethod
def save_checks_in_local_file(checks: list[dict], filepath: str)

Save DQ rules (checks) to a local YAML or JSON file.

Arguments:

  • checks - List of DQ rules (checks) to save.
  • filepath - Path to a file where the checks definitions will be saved.
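
A round-trip sketch using a local YAML file:

# Persist the metadata checks locally, then load them back for application.
DQEngineCore.save_checks_in_local_file(checks, "checks.yml")
loaded_checks = DQEngineCore.load_checks_from_local_file("checks.yml")
checked_df = core_engine.apply_checks_by_metadata(df, loaded_checks)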

DQEngine Objects

class DQEngine(DQEngineBase)

High-level engine to apply data quality checks and manage IO.

This class delegates core checking logic to DQEngineCore while providing helpers to read inputs, persist results, and work with different storage backends for checks.
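
A minimal sketch; WorkspaceClient is the standard Databricks SDK client:

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())
# df and checks as in the DQEngineCore examples above.
checked_df = dq_engine.apply_checks_by_metadata(df, checks)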

apply_checks

@telemetry_logger("engine", "apply_checks")
def apply_checks(df: DataFrame,
                 checks: list[DQRule],
                 ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame

Apply data quality checks to the given DataFrame.

Arguments:

  • df - Input DataFrame to check.
  • checks - List of checks to apply. Each check must be a DQRule instance.
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame with errors and warnings result columns.

apply_checks_and_split

@telemetry_logger("engine", "apply_checks_and_split")
def apply_checks_and_split(
    df: DataFrame,
    checks: list[DQRule],
    ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]

Apply data quality checks to the given DataFrame and split the results into two DataFrames ("good" and "bad").

Arguments:

  • df - Input DataFrame to check.
  • checks - List of checks to apply. Each check must be a DQRule instance.
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows with errors or warnings and the corresponding result columns).

Raises:

  • InvalidCheckError - If any of the checks are invalid.

apply_checks_by_metadata

@telemetry_logger("engine", "apply_checks_by_metadata")
def apply_checks_by_metadata(
    df: DataFrame,
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame

Apply data quality checks defined as metadata to the given DataFrame.

Arguments:

  • df - Input DataFrame to check.
  • checks - List of dictionaries describing checks. Each check dictionary must contain the following:
    • check - A check definition including check function and arguments to use.
    • name - Optional name for the resulting column. Auto-generated if not provided.
    • criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame with errors and warnings result columns.

apply_checks_by_metadata_and_split

@telemetry_logger("engine", "apply_checks_by_metadata_and_split")
def apply_checks_by_metadata_and_split(
    df: DataFrame,
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]

Apply data quality checks defined as metadata to the given DataFrame and split the results into two DataFrames ("good" and "bad").

Arguments:

  • df - Input DataFrame to check.
  • checks - List of dictionaries describing checks. Each check dictionary must contain the following:
    • check - A check definition including check function and arguments to use.
    • name - Optional name for the resulting column. Auto-generated if not provided.
    • criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • ref_dfs - Optional reference DataFrames to use in the checks.

Returns:

DataFrame that includes errors and warnings result columns.

apply_checks_and_save_in_table

@telemetry_logger("engine", "apply_checks_and_save_in_table")
def apply_checks_and_save_in_table(
    checks: list[DQRule],
    input_config: InputConfig,
    output_config: OutputConfig,
    quarantine_config: OutputConfig | None = None,
    ref_dfs: dict[str, DataFrame] | None = None) -> None

Apply data quality checks to input data and save results.

If quarantine_config is provided, split the data into valid and invalid records:

  • valid records are written using output_config.
  • invalid records are written using quarantine_config.

If quarantine_config is not provided, write all rows (including result columns) using output_config.

Arguments:

  • checks - List of DQRule checks to apply.
  • input_config - Input configuration (e.g., table/view or file location and read options).
  • output_config - Output configuration (e.g., table name, mode, and write options).
  • quarantine_config - Optional configuration for writing invalid records.
  • ref_dfs - Optional reference DataFrames used by checks.
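
A hedged sketch; InputConfig and OutputConfig are assumed to come from databricks.labs.dqx.config and to accept a location field, and all table names are placeholders:

from databricks.labs.dqx.config import InputConfig, OutputConfig

dq_engine.apply_checks_and_save_in_table(
    checks=checks,  # list[DQRule]
    input_config=InputConfig(location="main.raw.orders"),
    output_config=OutputConfig(location="main.curated.orders"),
    # With a quarantine config, invalid rows go here instead of the output table.
    quarantine_config=OutputConfig(location="main.curated.orders_quarantine"),
)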

apply_checks_by_metadata_and_save_in_table

@telemetry_logger("engine", "apply_checks_by_metadata_and_save_in_table")
def apply_checks_by_metadata_and_save_in_table(
    checks: list[dict],
    input_config: InputConfig,
    output_config: OutputConfig,
    quarantine_config: OutputConfig | None = None,
    custom_check_functions: dict[str, Callable] | None = None,
    ref_dfs: dict[str, DataFrame] | None = None) -> None

Apply metadata-defined data quality checks to input data and save results.

If quarantine_config is provided, split the data into valid and invalid records:

  • valid records are written using output_config;
  • invalid records are written using quarantine_config.

If quarantine_config is not provided, write all rows (including result columns) using output_config.

Arguments:

  • checks - List of dicts describing checks. Each check dictionary must contain the following:
    • check - A check definition including check function and arguments to use.
    • name - Optional name for the resulting column. Auto-generated if not provided.
    • criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
  • input_config - Input configuration (e.g., table/view or file location and read options).
  • output_config - Output configuration (e.g., table name, mode, and write options).
  • quarantine_config - Optional configuration for writing invalid records.
  • custom_check_functions - Optional mapping of custom check function names to callables/modules (e.g., globals()).
  • ref_dfs - Optional reference DataFrames used by checks.

apply_checks_and_save_in_tables

@telemetry_logger("engine", "apply_checks_and_save_in_tables")
def apply_checks_and_save_in_tables(
    run_configs: list[RunConfig], max_parallelism: int | None = os.cpu_count()
) -> None

Apply data quality checks to multiple tables or views and write the results to output table(s).

If quarantine tables are provided in the run configuration, the data will be split into good and bad records, with good records written to the output table and bad records to the quarantine table. If quarantine tables are not provided, all records (with error/warning columns) will be written to the output table.

Arguments:

  • run_configs - List of run configurations containing input configs, output configs, quarantine configs, and a checks file location.
  • max_parallelism - Maximum number of tables to check in parallel. Defaults to the number of CPU cores.

Returns:

None
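
A hedged sketch; the RunConfig field names (name, input_config, output_config, checks_location) are assumptions based on the DQX config module, and all locations are placeholders:

from databricks.labs.dqx.config import InputConfig, OutputConfig, RunConfig

run_configs = [
    RunConfig(
        name="orders",
        input_config=InputConfig(location="main.raw.orders"),
        output_config=OutputConfig(location="main.curated.orders"),
        checks_location="/Workspace/dqx/checks/orders.yml",
    ),
]
dq_engine.apply_checks_and_save_in_tables(run_configs=run_configs, max_parallelism=4)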

apply_checks_and_save_in_tables_for_patterns

@telemetry_logger("engine", "apply_checks_and_save_in_tables_for_patterns")
def apply_checks_and_save_in_tables_for_patterns(
    patterns: list[str],
    checks_location: str,
    exclude_patterns: list[str] | None = None,
    exclude_matched: bool = False,
    run_config_template: RunConfig = RunConfig(),
    max_parallelism: int | None = os.cpu_count(),
    output_table_suffix: str = "_dq_output",
    quarantine_table_suffix: str = "_dq_quarantine") -> None

Apply data quality checks to tables or views matching a pattern and write the results to output table(s).

If the quarantine option is enabled, the data is split into good and bad records: good records are written to the output table (the input table name with output_table_suffix appended) and bad records to the quarantine table (the input table name with quarantine_table_suffix appended). If quarantine is not enabled, all records (with error/warning columns) are written to the output table.

Checks are expected to be available under the same name as the table, with a .yml extension.

Arguments:

  • patterns - List of table names or filesystem-style wildcards (e.g., 'schema.*') to include. If None, all tables are included. Whether matched tables are included or excluded is controlled by exclude_matched.
  • checks_location - Location of the checks files (e.g., an absolute workspace or volume directory, or a Delta table). For file-based locations, checks are expected to be found under checks_location/table_name.yml.
  • exclude_matched - Whether to exclude tables matched by patterns. If True, matched tables are excluded; if False, they are included.
  • exclude_patterns - List of table names or filesystem-style wildcards to exclude. If None, no tables are excluded.
  • run_config_template - Run configuration template applied to all tables. Leave the locations in input_config, output_config, and quarantine_config unset, as they are derived from the matched table names; likewise leave checks_location unset, as it is derived separately. input_config and output_config are auto-generated if not provided.
  • max_parallelism - Maximum number of tables to check in parallel.
  • output_table_suffix - Suffix to append to the original table name for the output table.
  • quarantine_table_suffix - Suffix to append to the original table name for the quarantine table.

Returns:

None
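
A hedged sketch with placeholder names:

dq_engine.apply_checks_and_save_in_tables_for_patterns(
    patterns=["main.raw.*"],
    exclude_patterns=["main.raw.*_tmp"],
    # Expects checks under <checks_location>/<table_name>.yml for each matched table.
    checks_location="/Workspace/dqx/checks",
)
# Results land in main.raw.<table>_dq_output (and, if quarantine is enabled
# via the run_config_template, main.raw.<table>_dq_quarantine).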

validate_checks

@staticmethod
def validate_checks(
    checks: list[dict],
    custom_check_functions: dict[str, Callable] | None = None,
    validate_custom_check_functions: bool = True
) -> ChecksValidationStatus

Validate checks defined as metadata to ensure they conform to the expected structure and types.

This method validates the presence of required keys, the existence and callability of functions, and the types of arguments passed to those functions.

Arguments:

  • checks - List of checks to apply to the DataFrame. Each check should be a dictionary.
  • custom_check_functions - Optional dictionary with custom check functions (e.g., globals() of the calling module).
  • validate_custom_check_functions - If True, validate custom check functions.

Returns:

ChecksValidationStatus indicating the validation result.

get_invalid

def get_invalid(df: DataFrame) -> DataFrame

Return records that violate data quality checks (rows with warnings or errors).

Arguments:

  • df - Input DataFrame.

Returns:

DataFrame with rows that have errors or warnings and the corresponding result columns.

get_valid

def get_valid(df: DataFrame) -> DataFrame

Return records that do not violate data quality checks (rows may have warnings but no errors).

Arguments:

  • df - Input DataFrame.

Returns:

DataFrame with rows that have no errors (warnings are allowed), without the result columns.

save_results_in_table

@telemetry_logger("engine", "save_results_in_table")
def save_results_in_table(output_df: DataFrame | None = None,
                          quarantine_df: DataFrame | None = None,
                          output_config: OutputConfig | None = None,
                          quarantine_config: OutputConfig | None = None,
                          run_config_name: str | None = "default",
                          product_name: str = "dqx",
                          assume_user: bool = True,
                          install_folder: str | None = None)

Persist result DataFrames using explicit configs or the named run configuration.

Behavior:

  • If output_df is provided and output_config is None, load the run config and use its output_config.
  • If quarantine_df is provided and quarantine_config is None, load the run config and use its quarantine_config.
  • A write occurs only when both a DataFrame and its corresponding config are available.

Arguments:

  • output_df - DataFrame with valid rows to be saved (optional).
  • quarantine_df - DataFrame with invalid rows to be saved (optional).
  • output_config - Configuration describing where/how to write the valid rows. If omitted, falls back to the run config.
  • quarantine_config - Configuration describing where/how to write the invalid rows (optional). If omitted, falls back to the run config.
  • run_config_name - Name of the run configuration to load when a config parameter is omitted.
  • product_name - Product/installation identifier used to resolve installation paths for config loading if install_folder is not provided (defaults to "dqx").
  • assume_user - Whether to assume a per-user installation when loading the run configuration (defaults to True; ignored if install_folder is provided).
  • install_folder - Custom workspace installation folder. Required if DQX is installed in a custom folder.

Returns:

None
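
For example, after apply_checks_and_split (table names are placeholders):

from databricks.labs.dqx.config import OutputConfig

good_df, bad_df = dq_engine.apply_checks_and_split(df, checks)

# Both DataFrames are written because each has a corresponding config.
dq_engine.save_results_in_table(
    output_df=good_df,
    quarantine_df=bad_df,
    output_config=OutputConfig(location="main.curated.orders"),
    quarantine_config=OutputConfig(location="main.curated.orders_quarantine"),
)
# Omitting the configs would instead load them from the "default" run config.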

load_checks

def load_checks(config: BaseChecksStorageConfig) -> list[dict]

Load DQ rules (checks) from the storage backend described by config.

This method delegates to a storage handler selected by the factory based on the concrete type of config and returns the parsed list of checks (as dictionaries) ready for apply_checks_by_metadata.

Supported storage configurations include, for example:

  • FileChecksStorageConfig (local file);
  • WorkspaceFileChecksStorageConfig (Databricks workspace file);
  • TableChecksStorageConfig (table-backed storage);
  • InstallationChecksStorageConfig (installation directory);
  • VolumeFileChecksStorageConfig (Unity Catalog volume file).

Arguments:

  • config - Configuration object describing the storage backend.

Returns:

List of DQ rules (checks) represented as dictionaries.

Raises:

  • InvalidConfigError - If the configuration type is unsupported.
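
A hedged sketch; WorkspaceFileChecksStorageConfig is assumed to come from databricks.labs.dqx.config and to accept a location field:

from databricks.labs.dqx.config import WorkspaceFileChecksStorageConfig

checks = dq_engine.load_checks(
    config=WorkspaceFileChecksStorageConfig(location="/Workspace/dqx/checks/orders.yml")
)
checked_df = dq_engine.apply_checks_by_metadata(df, checks)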

save_checks

def save_checks(checks: list[dict], config: BaseChecksStorageConfig) -> None

Persist DQ rules (checks) to the storage backend described by config.

The appropriate storage handler is resolved from the configuration type and used to write the provided checks. Any write semantics (e.g., append/overwrite) are controlled by fields on config such as mode where applicable.

Supported storage configurations include, for example:

  • FileChecksStorageConfig (local file);
  • WorkspaceFileChecksStorageConfig (Databricks workspace file);
  • TableChecksStorageConfig (table-backed storage);
  • InstallationChecksStorageConfig (installation directory);
  • VolumeFileChecksStorageConfig (Unity Catalog volume file).

Arguments:

  • checks - List of DQ rules (checks) to save (as dictionaries).
  • config - Configuration object describing the storage backend and write options.

Returns:

None

Raises:

  • InvalidConfigError - If the configuration type is unsupported.
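
A sketch mirroring load_checks; TableChecksStorageConfig and its mode field are assumptions based on the DQX config module:

from databricks.labs.dqx.config import TableChecksStorageConfig

dq_engine.save_checks(
    checks=checks,
    # mode controls the write semantics where the backend supports it.
    config=TableChecksStorageConfig(location="main.dqx.checks", mode="overwrite"),
)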