databricks.labs.dqx.engine
DQEngineCore Objects
class DQEngineCore(DQEngineCoreBase)
Core engine to apply data quality checks to a DataFrame.
Arguments:
workspace_client
- WorkspaceClient instance used to access the workspace.
spark
- Optional SparkSession to use. If not provided, the active session is used.
extra_params
- Optional extra parameters for the engine, such as result column names and run metadata.
apply_checks
def apply_checks(df: DataFrame,
checks: list[DQRule],
ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame
Apply data quality checks to the given DataFrame.
Arguments:
df
- Input DataFrame to check.
checks
- List of checks to apply. Each check must be a DQRule instance.
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame with errors and warnings result columns.
apply_checks_and_split
def apply_checks_and_split(
df: DataFrame,
checks: list[DQRule],
ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]
Apply data quality checks to the given DataFrame and split the results into two DataFrames ("good" and "bad").
Arguments:
df
- Input DataFrame to check.
checks
- List of checks to apply. Each check must be a DQRule instance.
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows with errors or warnings and the corresponding result columns).
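The split semantics can be sketched in plain Python. This is an illustration only, not the library's implementation: dicts stand in for Spark rows, and the `_errors`/`_warnings` keys stand in for DQX's configurable result columns.

```python
def split_rows(rows):
    """Partition rows the way apply_checks_and_split does:
    'bad' keeps rows with any errors or warnings (result keys retained),
    'good' keeps rows without errors (result keys dropped)."""
    good, bad = [], []
    for row in rows:
        errors = row.get("_errors") or []
        warnings = row.get("_warnings") or []
        if errors or warnings:
            bad.append(row)  # keep result columns for inspection
        if not errors:
            # warning-only rows also land in the good output, minus result columns
            good.append({k: v for k, v in row.items()
                         if k not in ("_errors", "_warnings")})
    return good, bad

rows = [
    {"id": 1, "_errors": [], "_warnings": []},
    {"id": 2, "_errors": [], "_warnings": ["late arrival"]},
    {"id": 3, "_errors": ["null id"], "_warnings": []},
]
good, bad = split_rows(rows)
```

Note that a warning-only row (id 2 above) appears in both outputs, which matches the warn criticality described below.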
apply_checks_by_metadata
def apply_checks_by_metadata(
df: DataFrame,
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame
Apply data quality checks defined as metadata to the given DataFrame.
Arguments:
df
- Input DataFrame to check.
checks
- List of dictionaries describing checks. Each check dictionary must contain the following:
- check - A check definition including the check function and its arguments.
- name - Optional name for the resulting column. Auto-generated if not provided.
- criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame with errors and warnings result columns.
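A minimal metadata checks list matching the dictionary shape described above might look like the following. The function names (`is_not_null`, `is_in_list`) mirror DQX's built-in check functions, but treat the exact names and argument spellings here as illustrative rather than authoritative.

```python
# Each entry follows the check / name / criticality shape described above.
checks = [
    {
        "criticality": "error",
        "check": {"function": "is_not_null",
                  "arguments": {"column": "customer_id"}},
    },
    {
        "name": "country_code_check",  # optional; auto-generated if omitted
        "criticality": "warn",
        "check": {"function": "is_in_list",
                  "arguments": {"column": "country", "allowed": ["US", "CA"]}},
    },
]
```

With a live SparkSession, such a list is what you would pass as the `checks` argument: `checked_df = engine.apply_checks_by_metadata(df, checks)`.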
apply_checks_by_metadata_and_split
def apply_checks_by_metadata_and_split(
df: DataFrame,
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]
Apply data quality checks defined as metadata to the given DataFrame and split the results into two DataFrames ("good" and "bad").
Arguments:
df
- Input DataFrame to check.
checks
- List of dictionaries describing checks. Each check dictionary must contain the following:
- check - A check definition including the check function and its arguments.
- name - Optional name for the resulting column. Auto-generated if not provided.
- criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame that includes errors and warnings result columns.
validate_checks
@staticmethod
def validate_checks(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
validate_custom_check_functions: bool = True
) -> ChecksValidationStatus
Validate checks defined as metadata to ensure they conform to the expected structure and types.
This method validates the presence of required keys, the existence and callability of functions, and the types of arguments passed to those functions.
Arguments:
checks
- List of checks to apply to the DataFrame. Each check should be a dictionary.
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
validate_custom_check_functions
- If True, validate custom check functions.
Returns:
ChecksValidationStatus indicating the validation result.
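The kind of structural validation described above can be sketched in a few lines. This is a simplified stand-in for what validate_checks does, not the library's implementation; it only verifies the required `check` key, the presence of a function name, and that a referenced custom function is callable.

```python
def validate_checks_sketch(checks, custom_check_functions=None):
    """Return a list of validation error messages (empty means valid)."""
    custom_check_functions = custom_check_functions or {}
    errors = []
    for i, check in enumerate(checks):
        if "check" not in check:
            errors.append(f"check #{i}: missing required 'check' key")
            continue
        func_name = check["check"].get("function")
        if not func_name:
            errors.append(f"check #{i}: missing 'function' in check definition")
        elif func_name in custom_check_functions \
                and not callable(custom_check_functions[func_name]):
            errors.append(f"check #{i}: '{func_name}' is not callable")
    return errors

ok = validate_checks_sketch([{"check": {"function": "is_not_null"}}])
bad = validate_checks_sketch([{"criticality": "error"}])  # no 'check' key
```

The real method returns a ChecksValidationStatus rather than a plain list, and also type-checks the arguments passed to each function.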
get_invalid
def get_invalid(df: DataFrame) -> DataFrame
Return records that violate data quality checks (rows with warnings or errors).
Arguments:
df
- Input DataFrame.
Returns:
DataFrame with rows that have errors or warnings and the corresponding result columns.
get_valid
def get_valid(df: DataFrame) -> DataFrame
Return records that do not violate data quality checks (rows may carry warnings but have no errors).
Arguments:
df
- Input DataFrame.
Returns:
DataFrame with valid rows (which may carry warnings), without the result columns.
load_checks_from_local_file
@staticmethod
def load_checks_from_local_file(filepath: str) -> list[dict]
Load DQ rules (checks) from a local JSON or YAML file.
The returned checks can be used as input to apply_checks_by_metadata.
Arguments:
filepath
- Path to a file containing checks definitions.
Returns:
List of DQ rules.
save_checks_in_local_file
@staticmethod
def save_checks_in_local_file(checks: list[dict], filepath: str)
Save DQ rules (checks) to a local YAML or JSON file.
Arguments:
checks
- List of DQ rules (checks) to save.
filepath
- Path to a file where the checks definitions will be saved.
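The round trip between these two methods can be illustrated with the standard library. The library also supports YAML; JSON is used here only to stay self-contained, and the check contents are the illustrative metadata shape described earlier.

```python
import json
import os
import tempfile

checks = [
    {"criticality": "error",
     "check": {"function": "is_not_null", "arguments": {"column": "id"}}},
]

# save_checks_in_local_file serializes a list[dict] like this to disk ...
path = os.path.join(tempfile.mkdtemp(), "checks.json")
with open(path, "w") as f:
    json.dump(checks, f, indent=2)

# ... and load_checks_from_local_file parses it back into a list[dict]
with open(path) as f:
    loaded = json.load(f)
```

The loaded list is ready to pass to apply_checks_by_metadata.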
DQEngine Objects
class DQEngine(DQEngineBase)
High-level engine to apply data quality checks and manage IO.
This class delegates core checking logic to DQEngineCore while providing helpers to read inputs, persist results, and work with different storage backends for checks.
apply_checks
def apply_checks(df: DataFrame,
checks: list[DQRule],
ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame
Apply data quality checks to the given DataFrame.
Arguments:
df
- Input DataFrame to check.
checks
- List of checks to apply. Each check must be a DQRule instance.
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame with errors and warnings result columns.
apply_checks_and_split
def apply_checks_and_split(
df: DataFrame,
checks: list[DQRule],
ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]
Apply data quality checks to the given DataFrame and split the results into two DataFrames ("good" and "bad").
Arguments:
df
- Input DataFrame to check.
checks
- List of checks to apply. Each check must be a DQRule instance.
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
A tuple of two DataFrames: "good" (may include rows with warnings but no result columns) and "bad" (rows with errors or warnings and the corresponding result columns).
apply_checks_by_metadata
def apply_checks_by_metadata(
df: DataFrame,
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None) -> DataFrame
Apply data quality checks defined as metadata to the given DataFrame.
Arguments:
df
- Input DataFrame to check.
checks
- List of dictionaries describing checks. Each check dictionary must contain the following:
- check - A check definition including the check function and its arguments.
- name - Optional name for the resulting column. Auto-generated if not provided.
- criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame with errors and warnings result columns.
apply_checks_by_metadata_and_split
def apply_checks_by_metadata_and_split(
df: DataFrame,
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None
) -> tuple[DataFrame, DataFrame]
Apply data quality checks defined as metadata to the given DataFrame and split the results into two DataFrames ("good" and "bad").
Arguments:
df
- Input DataFrame to check.
checks
- List of dictionaries describing checks. Each check dictionary must contain the following:
- check - A check definition including the check function and its arguments.
- name - Optional name for the resulting column. Auto-generated if not provided.
- criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
ref_dfs
- Optional reference DataFrames to use in the checks.
Returns:
DataFrame that includes errors and warnings result columns.
apply_checks_and_save_in_table
def apply_checks_and_save_in_table(
checks: list[DQRule],
input_config: InputConfig,
output_config: OutputConfig,
quarantine_config: OutputConfig | None = None,
ref_dfs: dict[str, DataFrame] | None = None) -> None
Apply data quality checks to input data and save results.
If quarantine_config is provided, split the data into valid and invalid records:
- valid records are written using output_config.
- invalid records are written using quarantine_config.
If quarantine_config is not provided, write all rows (including result columns) using output_config.
Arguments:
checks
- List of DQRule checks to apply.
input_config
- Input configuration (e.g., table/view or file location and read options).
output_config
- Output configuration (e.g., table name, mode, and write options).
quarantine_config
- Optional configuration for writing invalid records.
ref_dfs
- Optional reference DataFrames used by checks.
apply_checks_by_metadata_and_save_in_table
def apply_checks_by_metadata_and_save_in_table(
checks: list[dict],
input_config: InputConfig,
output_config: OutputConfig,
quarantine_config: OutputConfig | None = None,
custom_check_functions: dict[str, Callable] | None = None,
ref_dfs: dict[str, DataFrame] | None = None) -> None
Apply metadata-defined data quality checks to input data and save results.
If quarantine_config is provided, split the data into valid and invalid records:
- valid records are written using output_config;
- invalid records are written using quarantine_config.
If quarantine_config is not provided, write all rows (including result columns) using output_config.
Arguments:
checks
- List of dicts describing checks. Each check dictionary must contain the following:
- check - A check definition including the check function and its arguments.
- name - Optional name for the resulting column. Auto-generated if not provided.
- criticality - Optional; either error (rows go only to the "bad" DataFrame) or warn (rows appear in both DataFrames).
input_config
- Input configuration (e.g., table/view or file location and read options).
output_config
- Output configuration (e.g., table name, mode, and write options).
quarantine_config
- Optional configuration for writing invalid records.
custom_check_functions
- Optional mapping of custom check function names to callables/modules (e.g., globals()).
ref_dfs
- Optional reference DataFrames used by checks.
validate_checks
@staticmethod
def validate_checks(
checks: list[dict],
custom_check_functions: dict[str, Callable] | None = None,
validate_custom_check_functions: bool = True
) -> ChecksValidationStatus
Validate checks defined as metadata to ensure they conform to the expected structure and types.
This method validates the presence of required keys, the existence and callability of functions, and the types of arguments passed to those functions.
Arguments:
checks
- List of checks to apply to the DataFrame. Each check should be a dictionary.
custom_check_functions
- Optional dictionary with custom check functions (e.g., globals() of the calling module).
validate_custom_check_functions
- If True, validate custom check functions.
Returns:
ChecksValidationStatus indicating the validation result.
get_invalid
def get_invalid(df: DataFrame) -> DataFrame
Return records that violate data quality checks (rows with warnings or errors).
Arguments:
df
- Input DataFrame.
Returns:
DataFrame with rows that have errors or warnings and the corresponding result columns.
get_valid
def get_valid(df: DataFrame) -> DataFrame
Return records that do not violate data quality checks (rows may carry warnings but have no errors).
Arguments:
df
- Input DataFrame.
Returns:
DataFrame with valid rows (which may carry warnings), without the result columns.
save_results_in_table
def save_results_in_table(output_df: DataFrame | None = None,
quarantine_df: DataFrame | None = None,
output_config: OutputConfig | None = None,
quarantine_config: OutputConfig | None = None,
run_config_name: str | None = "default",
product_name: str = "dqx",
assume_user: bool = True)
Persist result DataFrames using explicit configs or the named run configuration.
Behavior:
- If output_df is provided and output_config is None, load the run config and use its output_config.
- If quarantine_df is provided and quarantine_config is None, load the run config and use its quarantine_config.
- A write occurs only when both a DataFrame and its corresponding config are available.
Arguments:
output_df
- DataFrame with valid rows to be saved (optional).
quarantine_df
- DataFrame with invalid rows to be saved (optional).
output_config
- Configuration describing where/how to write the valid rows. If omitted, falls back to the run config.
quarantine_config
- Configuration describing where/how to write the invalid rows. If omitted, falls back to the run config.
run_config_name
- Name of the run configuration to load when a config parameter is omitted.
product_name
- Product/installation identifier used to resolve installation paths for config loading.
assume_user
- Whether to assume a per-user installation when loading the run configuration.
Returns:
None
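The fallback behavior described above can be captured as a small decision function. This is a sketch of the rules only, not the library's implementation; plain strings stand in for OutputConfig objects and a dict stands in for the loaded run configuration.

```python
def resolve_writes(output_df, quarantine_df,
                   output_config, quarantine_config, run_config):
    """Return the (label, config) pairs that would actually be written."""
    writes = []
    if output_df is not None:
        # explicit config wins; otherwise fall back to the run config
        cfg = output_config or run_config.get("output_config")
        if cfg is not None:  # write only when both a df and a config exist
            writes.append(("output", cfg))
    if quarantine_df is not None:
        cfg = quarantine_config or run_config.get("quarantine_config")
        if cfg is not None:
            writes.append(("quarantine", cfg))
    return writes

run_config = {"output_config": "main.dq.output", "quarantine_config": None}
# output_df given with no explicit config -> falls back to the run config;
# quarantine_df given, but neither an explicit nor a run-config entry exists,
# so no quarantine write occurs.
writes = resolve_writes(output_df=object(), quarantine_df=object(),
                        output_config=None, quarantine_config=None,
                        run_config=run_config)
```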
load_checks
def load_checks(config: BaseChecksStorageConfig) -> list[dict]
Load DQ rules (checks) from the storage backend described by config.
This method delegates to a storage handler selected by the factory based on the concrete type of config and returns the parsed list of checks (as dictionaries) ready for apply_checks_by_metadata.
Supported storage configurations include, for example:
- FileChecksStorageConfig (local file)
- WorkspaceFileChecksStorageConfig (Databricks workspace file)
- TableChecksStorageConfig (table-backed storage)
- InstallationChecksStorageConfig (installation directory)
- VolumeFileChecksStorageConfig (Unity Catalog volume file)
Arguments:
config
- Configuration object describing the storage backend.
Returns:
List of DQ rules (checks) represented as dictionaries.
Raises:
ValueError
- If the configuration type is unsupported.
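The type-based dispatch described above can be sketched as a small factory. The config classes below are local stand-ins, not the DQX classes, and the handlers return placeholder strings instead of performing I/O.

```python
class FileChecksStorageConfig:
    """Stand-in for a local-file storage config."""
    def __init__(self, location):
        self.location = location

class TableChecksStorageConfig:
    """Stand-in for a table-backed storage config."""
    def __init__(self, location):
        self.location = location

def load_checks_sketch(config):
    # The concrete config type selects the storage handler ...
    handlers = {
        FileChecksStorageConfig: lambda c: f"load from file {c.location}",
        TableChecksStorageConfig: lambda c: f"load from table {c.location}",
    }
    handler = handlers.get(type(config))
    # ... and unsupported configuration types raise ValueError
    if handler is None:
        raise ValueError(f"Unsupported storage config: {type(config).__name__}")
    return handler(config)
```

Keeping the handler lookup keyed on the config type lets new storage backends be added without touching the engine's call sites.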
save_checks
def save_checks(checks: list[dict], config: BaseChecksStorageConfig) -> None
Persist DQ rules (checks) to the storage backend described by config.
The appropriate storage handler is resolved from the configuration type and used to write the provided checks. Any write semantics (e.g., append/overwrite) are controlled by fields on config such as mode where applicable.
Supported storage configurations include, for example:
- FileChecksStorageConfig (local file)
- WorkspaceFileChecksStorageConfig (Databricks workspace file)
- TableChecksStorageConfig (table-backed storage)
- InstallationChecksStorageConfig (installation directory)
- VolumeFileChecksStorageConfig (Unity Catalog volume file)
Arguments:
checks
- List of DQ rules (checks) to save (as dictionaries).config
- Configuration object describing the storage backend and write options.
Returns:
None
Raises:
ValueError
- If the configuration type is unsupported.
load_checks_from_local_file
@staticmethod
def load_checks_from_local_file(filepath: str) -> list[dict]
Deprecated: Use load_checks with FileChecksStorageConfig instead.
Load DQ rules (checks) from a local JSON or YAML file.
Arguments:
filepath
- Path to a file containing checks definitions.
Returns:
List of DQ rules (checks) represented as dictionaries.
save_checks_in_local_file
@staticmethod
def save_checks_in_local_file(checks: list[dict], path: str)
Deprecated: Use save_checks with FileChecksStorageConfig instead.
Save DQ rules (checks) to a local YAML or JSON file.
Arguments:
checks
- List of DQ rules (checks) to save.
path
- File path where the checks definitions will be saved.
Returns:
None
load_checks_from_workspace_file
def load_checks_from_workspace_file(workspace_path: str) -> list[dict]
Deprecated: Use load_checks with WorkspaceFileChecksStorageConfig instead.
Load checks stored in a Databricks workspace file.
Arguments:
workspace_path
- Path to the workspace file containing checks definitions.
Returns:
List of DQ rules (checks) represented as dictionaries.
save_checks_in_workspace_file
def save_checks_in_workspace_file(checks: list[dict], workspace_path: str)
Deprecated: Use save_checks with WorkspaceFileChecksStorageConfig instead.
Save checks to a Databricks workspace file.
Arguments:
checks
- List of DQ rules (checks) to save.
workspace_path
- Path to the workspace file where checks will be saved.
Returns:
None
load_checks_from_table
def load_checks_from_table(table_name: str,
run_config_name: str = "default") -> list[dict]
Deprecated: Use load_checks with TableChecksStorageConfig instead.
Load checks from a table.
Arguments:
table_name
- Fully qualified table name where checks are stored.
run_config_name
- Name of the run configuration (used by the storage handler if needed).
Returns:
List of DQ rules (checks) represented as dictionaries.
save_checks_in_table
def save_checks_in_table(checks: list[dict],
table_name: str,
run_config_name: str = "default",
mode: str = "append")
Deprecated: Use save_checks with TableChecksStorageConfig instead.
Save checks to a table.
Arguments:
checks
- List of DQ rules (checks) to save.
table_name
- Fully qualified table name where checks will be written.
run_config_name
- Name of the run configuration (used by the storage handler if needed).
mode
- Write mode, e.g., "append" or "overwrite".
Returns:
None
load_checks_from_installation
def load_checks_from_installation(run_config_name: str = "default",
method: str = "file",
product_name: str = "dqx",
assume_user: bool = True) -> list[dict]
Deprecated: Use load_checks with InstallationChecksStorageConfig instead.
Load checks from the installation directory.
Arguments:
run_config_name
- Named run configuration to resolve installation paths and defaults.
method
- Deprecated parameter; ignored.
product_name
- Product/installation identifier (e.g., "dqx").
assume_user
- Whether to assume a per-user installation layout.
Returns:
List of DQ rules (checks) represented as dictionaries.
save_checks_in_installation
def save_checks_in_installation(checks: list[dict],
run_config_name: str = "default",
method: str = "file",
product_name: str = "dqx",
assume_user: bool = True)
Deprecated: Use save_checks with InstallationChecksStorageConfig instead.
Save checks to the installation directory.
Arguments:
checks
- List of DQ rules (checks) to save.
run_config_name
- Named run configuration to resolve installation paths and defaults.
method
- Deprecated parameter; ignored.
product_name
- Product/installation identifier (e.g., "dqx").
assume_user
- Whether to assume a per-user installation layout.
Returns:
None
load_run_config
def load_run_config(run_config_name: str = "default",
assume_user: bool = True,
product_name: str = "dqx") -> RunConfig
Deprecated: Use RunConfigLoader.load_run_config directly.
Load a run configuration by name. This wrapper will be removed in a future version.
Arguments:
run_config_name
- Name of the run configuration to load.
assume_user
- Whether to assume a per-user installation when resolving paths.
product_name
- Product/installation identifier (e.g., "dqx").
Returns:
Loaded RunConfig instance.