databricks.labs.dqx.check_funcs

DQPattern Objects

class DQPattern(Enum)

Enum class to represent DQ patterns used to match data in columns.

make_condition

def make_condition(condition: Column, message: Column | str,
alias: str) -> Column

Helper function to create a condition column.

Arguments:

  • condition - condition expression.
    • Pass the check if the condition evaluates to False
    • Fail the check if the condition evaluates to True
  • message - message to output; either a Column object or a string constant
  • alias - name for the resulting column

Returns:

a Column that contains the message string when the condition evaluates to true (check fails) and null when it evaluates to false (check passes)
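
For illustration, a minimal sketch of a custom row check built on top of make_condition; the column name, message, and helper name are illustrative, not part of the library:

import pyspark.sql.functions as F
from databricks.labs.dqx.check_funcs import make_condition

def is_positive(column: str):
    # The condition must evaluate to True for rows that should FAIL the check.
    failed = F.col(column) <= 0
    return make_condition(failed, f"Column {column} is not positive", f"{column}_is_not_positive")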

matches_pattern

def matches_pattern(column: str | Column, pattern: DQPattern) -> Column

Checks whether the values in the input column match a given pattern.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • pattern - pattern to match against

Returns:

Column object for condition

is_not_null_and_not_empty

@register_rule("row")
def is_not_null_and_not_empty(column: str | Column,
trim_strings: bool | None = False) -> Column

Checks whether the values in the input column are not null and not empty.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • trim_strings - boolean flag to trim spaces from strings

Returns:

Column object for condition
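
Because row-level checks return a condition Column (a message for failing rows, null otherwise, as described under make_condition), they can be inspected directly on a DataFrame. A minimal sketch with illustrative sample data:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from databricks.labs.dqx.check_funcs import is_not_null_and_not_empty

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Alice",), ("  ",), (None,)], ["name"])

# Failing rows carry a message in the condition column; passing rows hold null.
checked = df.withColumn("name_check", is_not_null_and_not_empty("name", trim_strings=True))
checked.filter(F.col("name_check").isNotNull()).show(truncate=False)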

is_not_empty

@register_rule("row")
def is_not_empty(column: str | Column) -> Column

Checks whether the values in the input column are not empty (but may be null).

Arguments:

  • column - column to check; can be a string column name or a column expression

Returns:

Column object for condition

is_not_null

@register_rule("row")
def is_not_null(column: str | Column) -> Column

Checks whether the values in the input column are not null.

Arguments:

  • column - column to check; can be a string column name or a column expression

Returns:

Column object for condition

is_not_null_and_is_in_list

@register_rule("row")
def is_not_null_and_is_in_list(column: str | Column, allowed: list) -> Column

Checks whether the values in the input column are not null and present in the list of allowed values.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • allowed - list of allowed values (actual values or Column objects)

Returns:

Column object for condition

is_in_list

@register_rule("row")
def is_in_list(column: str | Column, allowed: list) -> Column

Checks whether the values in the input column are present in the list of allowed values (null values are allowed).

Arguments:

  • column - column to check; can be a string column name or a column expression
  • allowed - list of allowed values (actual values or Column objects)

Returns:

Column object for condition
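
A short sketch contrasting the two list checks; the SparkSession, sample data, and column name are illustrative:

from pyspark.sql import SparkSession
from databricks.labs.dqx.check_funcs import is_in_list, is_not_null_and_is_in_list

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("EUR",), ("XYZ",), (None,)], ["currency"])

# A null currency passes is_in_list but fails is_not_null_and_is_in_list.
df.select(
    is_in_list("currency", ["EUR", "USD"]).alias("in_list"),
    is_not_null_and_is_in_list("currency", ["EUR", "USD"]).alias("not_null_and_in_list"),
).show(truncate=False)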

sql_expression

@register_rule("row")
def sql_expression(expression: str,
msg: str | None = None,
name: str | None = None,
negate: bool = False,
columns: list[str | Column] | None = None) -> Column

Checks whether the condition provided as an SQL expression is met.

Arguments:

  • expression - SQL expression. The check fails if the expression evaluates to True and passes if it evaluates to False.
  • msg - optional custom message; automatically generated if None
  • name - optional name of the resulting column; automatically generated if None
  • negate - whether the condition should be negated (True) or not. For example, "col is not null" will mark null values as "bad"; sometimes it is easier to express the check the other way around, e.g. "col is null" with negate set to False.
  • columns - optional list of columns used for reporting; not used in the check logic itself.

Returns:

new Column
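
A hedged sketch of a free-form expression check; the expression and column names are illustrative:

from databricks.labs.dqx.check_funcs import sql_expression

# Per the semantics above, the expression should evaluate to True for rows that must fail.
cond = sql_expression(
    "discount > price",                     # flag rows where the discount exceeds the price
    msg="discount is greater than price",
    name="discount_not_greater_than_price",
    columns=["discount", "price"],          # reporting only
)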

is_older_than_col2_for_n_days

@register_rule("row")
def is_older_than_col2_for_n_days(column1: str | Column,
column2: str | Column,
days: int = 0,
negate: bool = False) -> Column

Checks whether the values in one input column are at least N days older than the values in another column.

Arguments:

  • column1 - first column to check; can be a string column name or a column expression
  • column2 - second column to check; can be a string column name or a column expression
  • days - number of days
  • negate - if the condition should be negated (true) or not; if negated, the check will fail when values in the first column are at least N days older than values in the second column

Returns:

new Column

is_older_than_n_days

@register_rule("row")
def is_older_than_n_days(column: str | Column,
days: int,
curr_date: Column | None = None,
negate: bool = False) -> Column

Checks whether the values in the input column are at least N days older than the current date.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • days - number of days
  • curr_date - (optional) set current date
  • negate - if the condition should be negated (true) or not; if negated, the check will fail when values in the column are at least N days older than the current date

Returns:

new Column
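
A brief sketch; the column name is illustrative, and curr_date is pinned only to make the example deterministic:

import pyspark.sql.functions as F
from databricks.labs.dqx.check_funcs import is_older_than_n_days

# Condition column for rows whose last_update is at least 30 days before the given date.
cond = is_older_than_n_days("last_update", days=30, curr_date=F.lit("2024-01-31").cast("date"))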

is_not_in_future

@register_rule("row")
def is_not_in_future(column: str | Column,
offset: int = 0,
curr_timestamp: Column | None = None) -> Column

Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds).

Arguments:

  • column - column to check; can be a string column name or a column expression
  • offset - offset (in seconds) to add to the current timestamp at time of execution
  • curr_timestamp - (optional) set current timestamp

Returns:

new Column

is_not_in_near_future

@register_rule("row")
def is_not_in_near_future(column: str | Column,
offset: int = 0,
curr_timestamp: Column | None = None) -> Column

Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds).

Arguments:

  • column - column to check; can be a string column name or a column expression
  • offset - offset (in seconds) to add to the current timestamp at time of execution
  • curr_timestamp - (optional) set current timestamp

Returns:

new Column
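
A quick sketch covering both timestamp checks; the column name and offsets are illustrative:

from databricks.labs.dqx.check_funcs import is_not_in_future, is_not_in_near_future

not_in_future = is_not_in_future("event_ts", offset=60)               # allow up to 60 s of clock skew
not_in_near_future = is_not_in_near_future("event_ts", offset=3600)   # flag timestamps within the next hour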

is_equal_to

@register_rule("row")
def is_equal_to(
    column: str | Column,
    value: int | float | str | datetime.date | datetime.datetime | Column | None = None
) -> Column

Check whether the values in the input column are equal to the given value.

Arguments:

  • column str | Column - Column to check. Can be a string column name or a column expression.
  • value int | float | str | datetime.date | datetime.datetime | Column | None, optional - The value to compare with. Can be a literal or a Spark Column. Defaults to None.

Returns:

  • Column - A Spark Column condition that fails if the column value is not equal to the given value.

is_not_equal_to

@register_rule("row")
def is_not_equal_to(
    column: str | Column,
    value: int | float | str | datetime.date | datetime.datetime | Column | None = None
) -> Column

Check whether the values in the input column are not equal to the given value.

Arguments:

  • column str | Column - Column to check. Can be a string column name or a column expression.
  • value int | float | str | datetime.date | datetime.datetime | Column | None, optional - The value to compare with. Can be a literal or a Spark Column. Defaults to None.

Returns:

  • Column - A Spark Column condition that fails if the column value is equal to the given value.
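
A tiny sketch of the equality checks; the column names and literals are illustrative:

from databricks.labs.dqx.check_funcs import is_equal_to, is_not_equal_to

status_is_active = is_equal_to("status", value="active")   # fail when status != 'active'
amount_not_zero = is_not_equal_to("amount", value=0)        # fail when amount == 0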

is_not_less_than

@register_rule("row")
def is_not_less_than(
column: str | Column,
limit: int | datetime.date | datetime.datetime | str | Column | None = None
) -> Column

Checks whether the values in the input column are not less than the provided limit.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • limit - limit to use in the condition as number, date, timestamp, column name or sql expression

Returns:

new Column

is_not_greater_than

@register_rule("row")
def is_not_greater_than(
column: str | Column,
limit: int | datetime.date | datetime.datetime | str | Column | None = None
) -> Column

Checks whether the values in the input column are not greater than the provided limit.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • limit - limit to use in the condition as number, date, timestamp, column name or sql expression

Returns:

new Column

is_in_range

@register_rule("row")
def is_in_range(
    column: str | Column,
    min_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
    max_limit: int | datetime.date | datetime.datetime | str | Column | None = None
) -> Column

Checks whether the values in the input column are in the provided limits (inclusive of both boundaries).

Arguments:

  • column - column to check; can be a string column name or a column expression
  • min_limit - min limit to use in the condition as number, date, timestamp, column name or sql expression
  • max_limit - max limit to use in the condition as number, date, timestamp, column name or sql expression

Returns:

new Column
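
A short sketch of the limit-based checks; limits may be literals, column names, or SQL expressions, and the column names here are illustrative:

from databricks.labs.dqx.check_funcs import is_in_range, is_not_greater_than

age_in_range = is_in_range("age", min_limit=0, max_limit=120)
qty_not_over_stock = is_not_greater_than("quantity", limit="stock_level")  # compare against another column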

is_not_in_range

@register_rule("row")
def is_not_in_range(
    column: str | Column,
    min_limit: int | datetime.date | datetime.datetime | str | Column | None = None,
    max_limit: int | datetime.date | datetime.datetime | str | Column | None = None
) -> Column

Checks whether the values in the input column are outside the provided limits (inclusive of both boundaries).

Arguments:

  • column - column to check; can be a string column name or a column expression
  • min_limit - min limit to use in the condition as number, date, timestamp, column name or sql expression
  • max_limit - max limit to use in the condition as number, date, timestamp, column name or sql expression

Returns:

new Column

regex_match

@register_rule("row")
def regex_match(column: str | Column,
regex: str,
negate: bool = False) -> Column

Checks whether the values in the input column match a given regex.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • regex - regex to check
  • negate - if the condition should be negated (true) or not

Returns:

Column object for condition
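
A minimal sketch; the pattern and column name are illustrative:

from databricks.labs.dqx.check_funcs import regex_match

# Fails rows whose product_code does not match the pattern; negate=True inverts that.
cond = regex_match("product_code", regex=r"^[A-Z]{3}-\d{4}$")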

is_not_null_and_not_empty_array

@register_rule("row")
def is_not_null_and_not_empty_array(column: str | Column) -> Column

Checks whether the values in the array input column are not null and not empty.

Arguments:

  • column - column to check; can be a string column name or a column expression

Returns:

Column object for condition

is_valid_date

@register_rule("row")
def is_valid_date(column: str | Column,
date_format: str | None = None) -> Column

Checks whether the values in the input column have valid date formats.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • date_format - date format (e.g. 'yyyy-MM-dd')

Returns:

Column object for condition

is_valid_timestamp

@register_rule("row")
def is_valid_timestamp(column: str | Column,
timestamp_format: str | None = None) -> Column

Checks whether the values in the input column have valid timestamp formats.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • timestamp_format - timestamp format (e.g. 'yyyy-MM-dd HH:mm:ss')

Returns:

Column object for condition
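
A combined sketch of the two format checks, assuming Spark datetime patterns (MM = month, mm = minutes); column names are illustrative:

from databricks.labs.dqx.check_funcs import is_valid_date, is_valid_timestamp

date_ok = is_valid_date("order_date", date_format="yyyy-MM-dd")
ts_ok = is_valid_timestamp("created_at", timestamp_format="yyyy-MM-dd HH:mm:ss")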

is_valid_ipv4_address

@register_rule("row")
def is_valid_ipv4_address(column: str | Column) -> Column

Checks whether the values in the input column have valid IPv4 address formats.

Arguments:

  • column - column to check; can be a string column name or a column expression

Returns:

Column object for condition

is_ipv4_address_in_cidr

@register_rule("row")
def is_ipv4_address_in_cidr(column: str | Column, cidr_block: str) -> Column

Checks if an IP column value falls within the given CIDR block.

Arguments:

  • column - column to check; can be a string column name or a column expression
  • cidr_block - CIDR block string (e.g., '192.168.1.0/24')

Raises:

  • ValueError - If cidr_block is not a valid string in CIDR notation.

Returns:

Column object for condition
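
A minimal sketch combining the two IP checks; the column name and CIDR block are illustrative:

from databricks.labs.dqx.check_funcs import is_valid_ipv4_address, is_ipv4_address_in_cidr

ip_format_ok = is_valid_ipv4_address("client_ip")
ip_in_subnet = is_ipv4_address_in_cidr("client_ip", cidr_block="10.0.0.0/8")  # raises ValueError for an invalid CIDR string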

is_data_fresh

@register_rule("row")
def is_data_fresh(
    column: str | Column,
    max_age_minutes: int,
    base_timestamp: str | datetime.date | datetime.datetime | Column | None = None
) -> Column

Checks whether the values in the timestamp column are not older than the specified number of minutes from the base timestamp column.

This is useful for identifying stale data due to delayed pipelines and helps catch upstream issues early.

Arguments:

  • column - column to check; can be a string column name or a column expression containing timestamp values
  • max_age_minutes - maximum age in minutes before data is considered stale
  • base_timestamp - (optional) base timestamp from which staleness is calculated; if not provided, current_timestamp() is used

Returns:

Column object for condition
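
A brief sketch; the column names and threshold are illustrative:

from databricks.labs.dqx.check_funcs import is_data_fresh

# Flag rows whose event timestamp lags the ingestion timestamp by more than two hours.
cond = is_data_fresh("event_ts", max_age_minutes=120, base_timestamp="ingestion_ts")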

is_unique

@register_rule("dataset")
def is_unique(columns: list[str | Column],
nulls_distinct: bool = True,
row_filter: str | None = None) -> tuple[Column, Callable]

Build a uniqueness check condition and closure for dataset-level validation.

This function checks whether the specified columns contain unique values within the dataset and reports rows with duplicate combinations. When nulls_distinct is True (default), rows with NULLs are treated as distinct (SQL ANSI behavior); otherwise, NULLs are treated as equal when checking for duplicates.

In streaming, uniqueness is validated within individual micro-batches only.

Arguments:

  • columns - List of column names (str) or Spark Column expressions to validate for uniqueness.
  • nulls_distinct - Whether NULLs are treated as distinct (default: True).
  • row_filter - Optional SQL expression for filtering rows before checking uniqueness. Auto-injected from the check filter.

Returns:

A tuple of:

  • A Spark Column representing the condition for uniqueness violations.
  • A closure that applies the uniqueness check and adds the necessary condition/count columns.
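
Dataset-level checks return a (condition, closure) pair rather than a single Column; the closure is normally invoked by the DQX engine when the rule is applied. A minimal construction sketch with illustrative column names:

from databricks.labs.dqx.check_funcs import is_unique

# Condition column plus the closure that adds the per-row condition/count columns.
condition, apply_uniqueness = is_unique(["order_id", "line_no"], nulls_distinct=True)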

foreign_key

@register_rule("dataset")
def foreign_key(columns: list[str | Column],
ref_columns: list[str | Column],
ref_df_name: str | None = None,
ref_table: str | None = None,
negate: bool = False,
row_filter: str | None = None) -> tuple[Column, Callable]

Build a foreign key check condition and closure for dataset-level validation.

This function verifies that values in the specified foreign key columns exist (or don't exist, if negate=True) in the corresponding reference columns of another DataFrame or table. Rows where foreign key values do not match the reference are reported as violations.

NULL values in the foreign key columns are ignored (SQL ANSI behavior).

Arguments:

  • columns - List of column names (str) or Column expressions in the dataset (foreign key).
  • ref_columns - List of column names (str) or Column expressions in the reference dataset.
  • ref_df_name - Name of the reference DataFrame (used when passing DataFrames directly).
  • ref_table - Name of the reference table (used when reading from catalog).
  • row_filter - Optional SQL expression for filtering rows before checking the foreign key. Auto-injected from the check filter.
  • negate - If True, the condition is negated (i.e., the check fails when the foreign key values exist in the reference DataFrame/Table). If False, the check fails when the foreign key values do not exist in the reference.

Returns:

A tuple of:

  • A Spark Column representing the condition for foreign key violations.
  • A closure that applies the foreign key validation by joining against the reference.
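
A construction sketch; the table and column names are illustrative:

from databricks.labs.dqx.check_funcs import foreign_key

# Fail rows whose customer_id is not found in the reference table's id column.
condition, apply_fk = foreign_key(
    columns=["customer_id"],
    ref_columns=["id"],
    ref_table="main.sales.customers",
)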

sql_query

@register_rule("dataset")
def sql_query(query: str,
merge_columns: list[str],
msg: str | None = None,
name: str | None = None,
negate: bool = False,
condition_column: str = "condition",
input_placeholder: str = "input_view",
row_filter: str | None = None) -> tuple[Column, Callable]

Checks whether the condition column generated by SQL query is met.

Arguments:

  • query - SQL query that must return, at a minimum, the condition column and all merge columns. The resulting DataFrame is automatically joined back to the input DataFrame using merge_columns. Reference DataFrames, when provided in the ref_dfs parameter, are registered as temp views.
  • condition_column - Column name indicating a violation (boolean). The check fails if True and passes if False.
  • merge_columns - List of columns used to join back to the input DataFrame. They must provide a unique key for the join, otherwise duplicate records may be produced.
  • msg - Optional custom message or Column expression.
  • name - Optional name for the result.
  • negate - If True, the condition is negated (i.e., the check fails when the condition is False).
  • input_placeholder - Name used in the SQL query as {{ input_placeholder }} to refer to the input DataFrame on which the checks are applied.
  • row_filter - Optional SQL expression for filtering rows before applying the check. Auto-injected from the check filter.

Returns:

Tuple (condition column, apply function).
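
A construction sketch; the query and column names are illustrative, and {{ input_view }} is the default input_placeholder:

from databricks.labs.dqx.check_funcs import sql_query

# The query must return the merge columns plus a boolean condition column;
# {{ input_view }} refers to the input DataFrame the checks are applied to.
query = """
    SELECT order_id, SUM(amount) < 0 AS condition
    FROM {{ input_view }}
    GROUP BY order_id
"""
condition, apply_query = sql_query(query, merge_columns=["order_id"], name="order_total_is_negative")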

is_aggr_not_greater_than

@register_rule("dataset")
def is_aggr_not_greater_than(
column: str | Column,
limit: int | float | str | Column,
aggr_type: str = "count",
group_by: list[str | Column] | None = None,
row_filter: str | None = None) -> tuple[Column, Callable]

Build an aggregation check condition and closure for dataset-level validation.

This function verifies that an aggregation (count, sum, avg, min, max) on a column or group of columns does not exceed a specified limit. Rows where the aggregation result exceeds the limit are flagged.

Arguments:

  • column - Column name (str) or Column expression to aggregate.
  • limit - Numeric value, column name, or SQL expression for the limit.
  • aggr_type - Aggregation type: 'count', 'sum', 'avg', 'min', or 'max' (default: 'count').
  • group_by - Optional list of column names or Column expressions to group by.
  • row_filter - Optional SQL expression to filter rows before aggregation. Auto-injected from the check filter.

Returns:

A tuple of:

  • A Spark Column representing the condition for aggregation limit violations.
  • A closure that applies the aggregation check and adds the necessary condition/metric columns.
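
A construction sketch that applies equally to all four is_aggr_* variants; the column names and limit are illustrative:

from databricks.labs.dqx.check_funcs import is_aggr_not_greater_than

# Fail when the count of order_id per region exceeds 1000.
condition, apply_aggr = is_aggr_not_greater_than(
    "order_id",
    limit=1000,
    aggr_type="count",
    group_by=["region"],
)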

is_aggr_not_less_than

@register_rule("dataset")
def is_aggr_not_less_than(
column: str | Column,
limit: int | float | str | Column,
aggr_type: str = "count",
group_by: list[str | Column] | None = None,
row_filter: str | None = None) -> tuple[Column, Callable]

Build an aggregation check condition and closure for dataset-level validation.

This function verifies that an aggregation (count, sum, avg, min, max) on a column or group of columns is not below a specified limit. Rows where the aggregation result is below the limit are flagged.

Arguments:

  • column - Column name (str) or Column expression to aggregate.
  • limit - Numeric value, column name, or SQL expression for the limit.
  • aggr_type - Aggregation type: 'count', 'sum', 'avg', 'min', or 'max' (default: 'count').
  • group_by - Optional list of column names or Column expressions to group by.
  • row_filter - Optional SQL expression to filter rows before aggregation. Auto-injected from the check filter.

Returns:

A tuple of:

  • A Spark Column representing the condition for aggregation limit violations.
  • A closure that applies the aggregation check and adds the necessary condition/metric columns.

is_aggr_equal

@register_rule("dataset")
def is_aggr_equal(column: str | Column,
limit: int | float | str | Column,
aggr_type: str = "count",
group_by: list[str | Column] | None = None,
row_filter: str | None = None) -> tuple[Column, Callable]

Build an aggregation check condition and closure for dataset-level validation.

This function verifies that an aggregation (count, sum, avg, min, max) on a column or group of columns is equal to a specified limit. Rows where the aggregation result is not equal to the limit are flagged.

Arguments:

  • column - Column name (str) or Column expression to aggregate.
  • limit - Numeric value, column name, or SQL expression for the limit.
  • aggr_type - Aggregation type: 'count', 'sum', 'avg', 'min', or 'max' (default: 'count').
  • group_by - Optional list of column names or Column expressions to group by.
  • row_filter - Optional SQL expression to filter rows before aggregation. Auto-injected from the check filter.

Returns:

A tuple of:

  • A Spark Column representing the condition for aggregation limit violations.
  • A closure that applies the aggregation check and adds the necessary condition/metric columns.

is_aggr_not_equal

@register_rule("dataset")
def is_aggr_not_equal(
column: str | Column,
limit: int | float | str | Column,
aggr_type: str = "count",
group_by: list[str | Column] | None = None,
row_filter: str | None = None) -> tuple[Column, Callable]

Build an aggregation check condition and closure for dataset-level validation.

This function verifies that an aggregation (count, sum, avg, min, max) on a column or group of columns is not equal to a specified limit. Rows where the aggregation result is equal to the limit are flagged.

Arguments:

  • column - Column name (str) or Column expression to aggregate.
  • limit - Numeric value, column name, or SQL expression for the limit.
  • aggr_type - Aggregation type: 'count', 'sum', 'avg', 'min', or 'max' (default: 'count').
  • group_by - Optional list of column names or Column expressions to group by.
  • row_filter - Optional SQL expression to filter rows before aggregation. Auto-injected from the check filter.

Returns:

A tuple of:

  • A Spark Column representing the condition for aggregation limit violations.
  • A closure that applies the aggregation check and adds the necessary condition/metric columns.

compare_datasets

@register_rule("dataset")
def compare_datasets(columns: list[str | Column],
ref_columns: list[str | Column],
ref_df_name: str | None = None,
ref_table: str | None = None,
check_missing_records: bool | None = False,
exclude_columns: list[str | Column] | None = None,
null_safe_row_matching: bool | None = True,
null_safe_column_value_matching: bool | None = True,
row_filter: str | None = None) -> tuple[Column, Callable]

Dataset-level check that compares two datasets and returns a condition for changed rows, with details on row and column-level differences.

Only columns that are common across both datasets will be compared. Mismatched columns are ignored. Detailed information about the differences is provided in the condition column. The comparison does not support Map types (any column comparison on map type is skipped automatically).

The log containing detailed differences is written to the message field of the check result as a JSON string.

Examples:

{
  "row_missing": false,
  "row_extra": true,
  "changed": {
    "val": {
      "df": "val1"
    }
  }
}

Arguments:

  • columns - List of columns to use for row matching with the reference DataFrame (can be a list of string column names or column expressions). Only simple column expressions are supported, e.g. F.col("col_name").
  • ref_columns - List of columns in the reference DataFrame or Table to row match against the source DataFrame (can be a list of string column names or column expressions). The columns parameter is matched with ref_columns by position, so the order of the provided columns in both lists must be exactly aligned. Only simple column expressions are supported, e.g. F.col("col_name").
  • ref_df_name - Name of the reference DataFrame (used when passing DataFrames directly).
  • ref_table - Name of the reference table (used when reading from catalog).
  • check_missing_records - Perform a FULL OUTER JOIN between the DataFrames to also detect records missing from the input DataFrame. Use with caution, as it may produce more rows than the original DataFrame.
  • exclude_columns - List of columns to exclude from the value comparison but not from row matching (can be a list of string column names or column expressions). Only simple column expressions are supported, e.g. F.col("col_name"). This parameter does not alter the list of columns used to determine row matches; it only controls which columns are skipped during the column value comparison.
  • null_safe_row_matching - If True, treats nulls as equal when matching rows.
  • null_safe_column_value_matching - If True, treats nulls as equal when matching column values. If enabled, (NULL, NULL) column values are equal and matching.
  • row_filter - Optional SQL expression to filter rows in the input DataFrame. Auto-injected from the check filter.

Returns:

Tuple[Column, Callable]:

  • A Spark Column representing the condition for comparison violations.
  • A closure that applies the comparison validation.
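
A construction sketch; the reference name and column names are illustrative, and ref_df_name is resolved by the engine from the reference DataFrames it is given:

from databricks.labs.dqx.check_funcs import compare_datasets

# Match rows on order_id and compare all common columns except load_ts.
condition, apply_compare = compare_datasets(
    columns=["order_id"],
    ref_columns=["order_id"],
    ref_df_name="orders_snapshot",
    exclude_columns=["load_ts"],
)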

is_data_fresh_per_time_window

@register_rule("dataset")
def is_data_fresh_per_time_window(
column: str | Column,
window_minutes: int,
min_records_per_window: int,
lookback_windows: int | None = None,
row_filter: str | None = None,
curr_timestamp: Column | None = None) -> tuple[Column, Callable]

Build a completeness freshness check that validates records arrive at least every X minutes with a threshold for the expected number of rows per time window.

If lookback_windows is provided, only data within that lookback period will be validated. If omitted, the entire dataset will be checked.

Arguments:

  • column - Column name (str) or Column expression containing timestamps to check.
  • window_minutes - Time window in minutes to check for data arrival.
  • min_records_per_window - Minimum number of records expected per time window.
  • lookback_windows - Optional number of time windows to look back from curr_timestamp. This filters records to include only those within the specified number of time windows from curr_timestamp. If no lookback is provided, the check is applied to the entire dataset.
  • row_filter - Optional SQL expression to filter rows before checking.
  • curr_timestamp - Optional current timestamp column. If not provided, current_timestamp() function is used.

Returns:

A tuple of:

  • A Spark Column representing the condition for missing data within a time window.
  • A closure that applies the completeness check and adds the necessary condition columns.
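
A construction sketch; the column name and thresholds are illustrative:

from databricks.labs.dqx.check_funcs import is_data_fresh_per_time_window

# Expect at least 10 records in every 30-minute window over the last 48 windows.
condition, apply_freshness = is_data_fresh_per_time_window(
    "event_ts",
    window_minutes=30,
    min_records_per_window=10,
    lookback_windows=48,
)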