
Quality Rules

This page provides a reference for the quality checks (rule functions) available in DQX.

Row-level checks reference

The following built-in row-level checks are currently available in DQX. These checks are applied to each row of a PySpark DataFrame and report quality issues as additional columns. You can also define your own custom checks (see Creating custom checks).

Some checks, such as is_unique, is_aggr_not_greater_than, or is_aggr_not_less_than, are performed over a group of rows using window functions, while others operate on individual rows. In all cases, the quality check results are reported for individual rows in the reporting columns.

Available row-level checks
Each entry below lists the check's description, its arguments, and whether it operates on a group of rows.

is_not_null
  Description: Checks whether the values in the input column are not null.
  Arguments: column: column to check (can be a string column name or a column expression).
  Operates on a group of rows: No

is_not_empty
  Description: Checks whether the values in the input column are not empty (but may be null).
  Arguments: column: column to check (can be a string column name or a column expression).
  Operates on a group of rows: No

is_not_null_and_not_empty
  Description: Checks whether the values in the input column are not null and not empty.
  Arguments: column: column to check (can be a string column name or a column expression); trim_strings: optional boolean flag to trim spaces from strings.
  Operates on a group of rows: No

is_in_list
  Description: Checks whether the values in the input column are present in the list of allowed values (null values are allowed).
  Arguments: column: column to check (can be a string column name or a column expression); allowed: list of allowed values.
  Operates on a group of rows: No

is_not_null_and_is_in_list
  Description: Checks whether the values in the input column are not null and present in the list of allowed values.
  Arguments: column: column to check (can be a string column name or a column expression); allowed: list of allowed values.
  Operates on a group of rows: No

is_not_null_and_not_empty_array
  Description: Checks whether the values in the array input column are not null and not empty.
  Arguments: column: column to check (can be a string column name or a column expression).
  Operates on a group of rows: No

is_in_range
  Description: Checks whether the values in the input column are in the provided range (inclusive of both boundaries).
  Arguments: column: column to check (can be a string column name or a column expression); min_limit: min limit as number, date, timestamp, column name or sql expression; max_limit: max limit as number, date, timestamp, column name or sql expression.
  Operates on a group of rows: No

is_not_in_range
  Description: Checks whether the values in the input column are outside the provided range (inclusive of both boundaries).
  Arguments: column: column to check (can be a string column name or a column expression); min_limit: min limit as number, date, timestamp, column name or sql expression; max_limit: max limit as number, date, timestamp, column name or sql expression.
  Operates on a group of rows: No

is_not_less_than
  Description: Checks whether the values in the input column are not less than the provided limit.
  Arguments: column: column to check (can be a string column name or a column expression); limit: limit as number, date, timestamp, column name or sql expression.
  Operates on a group of rows: No

is_not_greater_than
  Description: Checks whether the values in the input column are not greater than the provided limit.
  Arguments: column: column to check (can be a string column name or a column expression); limit: limit as number, date, timestamp, column name or sql expression.
  Operates on a group of rows: No

is_valid_date
  Description: Checks whether the values in the input column have valid date formats.
  Arguments: column: column to check (can be a string column name or a column expression); date_format: optional date format (e.g. 'yyyy-mm-dd').
  Operates on a group of rows: No

is_valid_timestamp
  Description: Checks whether the values in the input column have valid timestamp formats.
  Arguments: column: column to check (can be a string column name or a column expression); timestamp_format: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss').
  Operates on a group of rows: No

is_not_in_future
  Description: Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds).
  Arguments: column: column to check (can be a string column name or a column expression); offset: offset to use; curr_timestamp: current timestamp, if not provided the current_timestamp() function is used.
  Operates on a group of rows: No

is_not_in_near_future
  Description: Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than current_timestamp + offset (in seconds).
  Arguments: column: column to check (can be a string column name or a column expression); offset: offset to use; curr_timestamp: current timestamp, if not provided the current_timestamp() function is used.
  Operates on a group of rows: No

is_older_than_n_days
  Description: Checks whether the values in the input column are at least N days older than the current date.
  Arguments: column: column to check (can be a string column name or a column expression); days: number of days; curr_date: current date, if not provided the current_date() function is used; negate: if the condition should be negated.
  Operates on a group of rows: No

is_older_than_col2_for_n_days
  Description: Checks whether the values in one input column are at least N days older than the values in another column.
  Arguments: column1: first column to check (can be a string column name or a column expression); column2: second column to check (can be a string column name or a column expression); days: number of days; negate: if the condition should be negated.
  Operates on a group of rows: No

regex_match
  Description: Checks whether the values in the input column match a given regex.
  Arguments: column: column to check (can be a string column name or a column expression); regex: regex to check; negate: if the condition should be negated (true) or not.
  Operates on a group of rows: No

sql_expression
  Description: Checks whether the values meet the condition provided as an SQL expression, e.g. a = 'str1' and a > b.
  Arguments: expression: sql expression to check on a dataframe; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated.
  Operates on a group of rows: No

is_unique
  Description: Checks whether the values in the input column are unique and reports an issue for each row that contains a duplicate value. It supports uniqueness checks over multiple columns (composite key). Null values are not considered duplicates by default, following the ANSI SQL standard.
  Arguments: columns: columns to check (can be a list of column names or column expressions); window_spec: optional window specification as a string or column object (you must handle NULLs correctly using coalesce() to prevent rows from being excluded); nulls_distinct: controls how null values are treated, default is True, so nulls are not duplicates, e.g. (NULL, NULL) does not equal (NULL, NULL) and (1, NULL) does not equal (1, NULL).
  Operates on a group of rows: Yes

is_aggr_not_greater_than
  Description: Checks whether the aggregated values over a group of rows or all rows are not greater than the provided limit.
  Arguments: column: column to check (can be a string column name or a column expression), optional for 'count' aggregation; limit: limit as number, column name or sql expression; aggr_type: aggregation function to use, such as "count" (default), "sum", "avg", "min", and "max"; group_by: optional list of columns or column expressions to group the rows for aggregation (no grouping by default); row_filter: filter from the check definition, auto-injected when applying the check.
  Operates on a group of rows: Yes

is_aggr_not_less_than
  Description: Checks whether the aggregated values over a group of rows or all rows are not less than the provided limit.
  Arguments: column: column to check (can be a string column name or a column expression), optional for 'count' aggregation; limit: limit as number, column name or sql expression; aggr_type: aggregation function to use, such as "count" (default), "sum", "avg", "min", and "max"; group_by: optional list of columns or column expressions to group the rows for aggregation (no grouping by default); row_filter: filter from the check definition, auto-injected when applying the check.
  Operates on a group of rows: Yes

PII detection
  Description: Checks whether the values in the input column contain Personally Identifiable Information (PII). This check is not included in the DQX built-in rules to avoid introducing third-party dependencies. An example implementation can be found here.
  Operates on a group of rows: No
Applicability

Row-level checks are designed to operate on a single PySpark DataFrame. If your validation logic requires multiple DataFrames/Tables, you can join them beforehand to produce a single DataFrame on which the checks can be applied. See more here.

You can explore the implementation details of the row checks here. If you have a custom check that could be broadly useful, feel free to submit a PR to DQX (see the contribution guide for details).

Usage examples of Row-level checks

Below are fully specified examples of how to define checks declaratively in YAML format and programmatically with DQX classes. Both are equivalent and can be used interchangeably.

The criticality field can be either "error" (the data goes only into the bad/quarantine DataFrame) or "warn" (the data goes into both the good and bad DataFrames). For brevity, the name field is omitted in most of the examples; it will be auto-generated in the results.
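As a minimal illustration of the criticality field, the same is_not_null check can be declared once as an error and once as a warning (the column name col1 is just a placeholder used throughout the examples below):

# quarantines failing rows
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1

# only flags failing rows, which stay in the good DataFrame as well
- criticality: warn
  check:
    function: is_not_null
    arguments:
      column: col1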

Checks defined in YAML
# is_not_null check
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1

# is_not_empty check
- criticality: error
  check:
    function: is_not_empty
    arguments:
      column: col1

# is_not_null_and_not_empty check
- criticality: error
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col1
      trim_strings: true

# is_in_list check
- criticality: error
  check:
    function: is_in_list
    arguments:
      column: col2
      allowed:
        - 1
        - 2
        - 3

# is_not_null_and_is_in_list check
- criticality: error
  check:
    function: is_not_null_and_is_in_list
    arguments:
      column: col2
      allowed:
        - 1
        - 2
        - 3

# is_not_null_and_not_empty_array check
- criticality: error
  check:
    function: is_not_null_and_not_empty_array
    arguments:
      column: col4

# is_in_range check
- criticality: error
  check:
    function: is_in_range
    arguments:
      column: col2
      min_limit: 1
      max_limit: 10
- criticality: error
  check:
    function: is_in_range
    arguments:
      column: col5
      min_limit: 2025-01-01
      max_limit: 2025-02-24
- criticality: error
  check:
    function: is_in_range
    arguments:
      column: col6
      min_limit: 2025-01-01 00:00:00
      max_limit: 2025-02-24 01:00:00
- criticality: error
  check:
    function: is_in_range
    arguments:
      column: col3
      min_limit: col2
      max_limit: col2 * 2

# is_not_in_range check
- criticality: error
  check:
    function: is_not_in_range
    arguments:
      column: col2
      min_limit: 11
      max_limit: 20
- criticality: error
  check:
    function: is_not_in_range
    arguments:
      column: col5
      min_limit: 2025-02-25
      max_limit: 2025-02-26
- criticality: error
  check:
    function: is_not_in_range
    arguments:
      column: col6
      min_limit: 2025-02-25 00:00:00
      max_limit: 2025-02-26 01:00:00
- criticality: error
  check:
    function: is_not_in_range
    arguments:
      column: col3
      min_limit: col2 + 10
      max_limit: col2 * 10

# is_not_less_than check
- criticality: error
  check:
    function: is_not_less_than
    arguments:
      column: col2
      limit: 0
- criticality: error
  check:
    function: is_not_less_than
    arguments:
      column: col5
      limit: 2025-01-01
- criticality: error
  check:
    function: is_not_less_than
    arguments:
      column: col6
      limit: 2025-01-01 01:00:00
- criticality: error
  check:
    function: is_not_less_than
    arguments:
      column: col3
      limit: col2 - 10

# is_not_greater_than check
- criticality: error
  check:
    function: is_not_greater_than
    arguments:
      column: col2
      limit: 10
- criticality: error
  check:
    function: is_not_greater_than
    arguments:
      column: col5
      limit: 2025-03-01
- criticality: error
  check:
    function: is_not_greater_than
    arguments:
      column: col6
      limit: 2025-03-24 01:00:00
- criticality: error
  check:
    function: is_not_greater_than
    arguments:
      column: col3
      limit: col2 + 10

# is_valid_date check
- criticality: error
  check:
    function: is_valid_date
    arguments:
      column: col5
- criticality: error
  name: col5_is_not_valid_date2
  check:
    function: is_valid_date
    arguments:
      column: col5
      date_format: yyyy-MM-dd

# is_valid_timestamp check
- criticality: error
  check:
    function: is_valid_timestamp
    arguments:
      column: col6
      timestamp_format: yyyy-MM-dd HH:mm:ss
- criticality: error
  name: col6_is_not_valid_timestamp2
  check:
    function: is_valid_timestamp
    arguments:
      column: col6

# is_not_in_future check
- criticality: error
  check:
    function: is_not_in_future
    arguments:
      column: col6
      offset: 86400

# is_not_in_near_future check
- criticality: error
  check:
    function: is_not_in_near_future
    arguments:
      column: col6
      offset: 36400

# is_older_than_n_days check
- criticality: error
  check:
    function: is_older_than_n_days
    arguments:
      column: col5
      days: 10000

# is_older_than_col2_for_n_days check
- criticality: error
  check:
    function: is_older_than_col2_for_n_days
    arguments:
      column1: col5
      column2: col6
      days: 2

# is_unique check
- criticality: error
  check:
    function: is_unique
    arguments:
      columns:
        - col1

# is_unique on multiple columns (composite key), nulls are distinct (default behavior)
# e.g. (1, NULL) does not equal (1, NULL) and (NULL, NULL) does not equal (NULL, NULL)
- criticality: error
  name: composite_key_col1_and_col2_is_not_unique
  check:
    function: is_unique
    arguments:
      columns:
        - col1
        - col2

# is_unique on multiple columns (composite key), nulls are not distinct
# e.g. (1, NULL) equals (1, NULL) and (NULL, NULL) equals (NULL, NULL)
- criticality: error
  name: composite_key_col1_and_col2_is_not_unique_not_nulls_distinct
  check:
    function: is_unique
    arguments:
      columns:
        - col1
        - col2
      nulls_distinct: False

# is_unique check with custom window
# provide a default value for NULL in the time column of the window spec using coalesce() to prevent rows from being excluded!
- criticality: error
  name: col1_is_not_unique_custom_window
  check:
    function: is_unique
    arguments:
      columns:
        - col1
      window_spec: window(coalesce(col6, '1970-01-01'), '10 minutes')

# is_aggr_not_greater_than check with count aggregation over all rows
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: '*'
      aggr_type: count
      limit: 10

# is_aggr_not_greater_than check with aggregation over col2 (skip nulls)
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col2
      aggr_type: count # other types: sum, avg, min, max
      limit: 10

# is_aggr_not_greater_than check with aggregation over col2 grouped by col3 (skip nulls)
- criticality: error
  check:
    function: is_aggr_not_greater_than
    arguments:
      column: col2
      aggr_type: count # other types: sum, avg, min, max
      group_by:
        - col3
      limit: 10

# is_aggr_not_less_than check with count aggregation over all rows
- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: '*'
      aggr_type: count
      limit: 1

# is_aggr_not_less_than check with aggregation over col2 (skip nulls)
- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col2
      aggr_type: count # other types: sum, avg, min, max
      limit: 1

# is_aggr_not_less_than check with aggregation over col2 grouped by col3 (skip nulls)
- criticality: error
  check:
    function: is_aggr_not_less_than
    arguments:
      column: col2
      aggr_type: count # other types: sum, avg, min, max
      group_by:
        - col3
      limit: 1

# regex_match check
- criticality: error
  check:
    function: regex_match
    arguments:
      column: col2
      regex: '[0-9]+'
      negate: false

# sql_expression check
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col3 >= col2 and col3 <= 10
      msg: col3 is less than col2 and col3 is greater than 10
      name: custom_output_name
      negate: false

# apply the check to multiple columns
- criticality: error
  check:
    function: is_not_null # 'column' as the first argument
    for_each_column: # apply the check for each column in the list
      - col3
      - col5
- criticality: error
  check:
    function: is_unique # 'columns' as the first argument
    for_each_column: # apply the check for each list of columns
      - [col3, col5]
      - [col1]
Checks defined programmatically using DQX classes
import pyspark.sql.functions as F  # needed for F.col, F.window, F.coalesce, F.lit below
from databricks.labs.dqx.rule import DQRowRule, DQRowRuleForEachCol
from databricks.labs.dqx import check_funcs
from datetime import datetime

checks = [
    # is_not_null check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null,
        column="col1"
    ),
    # is_not_empty check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_empty,
        column="col1"
    ),
    # is_not_null_and_not_empty check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col1",
        check_func_kwargs={"trim_strings": True}
    ),
    # is_in_list check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_in_list,
        column="col2",
        check_func_kwargs={"allowed": [1, 2, 3]}
    ),
    # is_not_null_and_is_in_list check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null_and_is_in_list,
        column="col2",
        check_func_kwargs={"allowed": [1, 2, 3]}
    ),
    # is_not_null_and_not_empty_array check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null_and_not_empty_array,
        column="col4"
    ),
    # is_in_range check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_in_range,
        column="col2",
        check_func_kwargs={"min_limit": 1, "max_limit": 10}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_in_range,
        column="col5",
        check_func_kwargs={
            "min_limit": datetime(2025, 1, 1).date(),
            "max_limit": datetime(2025, 2, 24).date()
        }
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_in_range,
        column="col6",
        check_func_kwargs={
            "min_limit": datetime(2025, 1, 1, 0, 0, 0),
            "max_limit": datetime(2025, 2, 24, 1, 0, 0)
        }
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_in_range,
        column="col3",
        check_func_kwargs={
            "min_limit": "col2",
            "max_limit": "col2 * 2"
        }
    ),
    # is_not_in_range check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_range,
        column="col2",
        check_func_kwargs={
            "min_limit": 11,
            "max_limit": 20
        }
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_range,
        column="col5",
        check_func_kwargs={
            "min_limit": datetime(2025, 2, 25).date(),
            "max_limit": datetime(2025, 2, 26).date()
        }
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_range,
        column="col6",
        check_func_kwargs={
            "min_limit": datetime(2025, 2, 25, 0, 0, 0),
            "max_limit": datetime(2025, 2, 26, 1, 0, 0)
        }
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_range,
        column="col3",
        check_func_kwargs={
            "min_limit": "col2 + 10",
            "max_limit": "col2 * 10"
        }
    ),
    # is_not_less_than check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_less_than,
        column="col2",
        check_func_kwargs={"limit": 0}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_less_than,
        column="col5",
        check_func_kwargs={"limit": datetime(2025, 1, 1).date()}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_less_than,
        column="col6",
        check_func_kwargs={"limit": datetime(2025, 1, 1, 1, 0, 0)}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_less_than,
        column="col3",
        check_func_kwargs={"limit": "col2 - 10"}
    ),
    # is_not_greater_than check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_greater_than,
        column="col2",
        check_func_kwargs={"limit": 10}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_greater_than,
        column="col5",
        check_func_kwargs={"limit": datetime(2025, 3, 1).date()}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_greater_than,
        column="col6",
        check_func_kwargs={"limit": datetime(2025, 3, 24, 1, 0, 0)}
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_greater_than,
        column="col3",
        check_func_kwargs={"limit": "col2 + 10"}
    ),
    # is_valid_date check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_valid_date,
        column="col5"
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_valid_date,
        column="col5",
        check_func_kwargs={"date_format": "yyyy-MM-dd"},
        name="col5_is_not_valid_date2"
    ),
    # is_valid_timestamp check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_valid_timestamp,
        column="col6"
    ),
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_valid_timestamp,
        column="col6",
        check_func_kwargs={"timestamp_format": "yyyy-MM-dd HH:mm:ss"},
        name="col6_is_not_valid_timestamp2"
    ),
    # is_not_in_future check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_future,
        column="col6",
        check_func_kwargs={"offset": 86400}
    ),
    # is_not_in_near_future check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_in_near_future,
        column="col6",
        check_func_kwargs={"offset": 36400}
    ),
    # is_older_than_n_days check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_older_than_n_days,
        column="col5",
        check_func_kwargs={"days": 10000}
    ),
    # is_older_than_col2_for_n_days check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_older_than_col2_for_n_days,
        check_func_kwargs={"column1": "col5", "column2": "col6", "days": 2}
    ),
    # is_unique check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["col1"]  # requires a list of columns
    ),
    # is_unique check using a column expression
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=[F.col("col1")]  # requires a list of columns
    ),
    # is_unique on multiple columns (composite key), nulls are distinct (default behavior)
    # e.g. (1, NULL) does not equal (1, NULL) and (NULL, NULL) does not equal (NULL, NULL)
    DQRowRule(
        criticality="error",
        name="composite_key_col1_and_col2_is_not_unique",
        check_func=check_funcs.is_unique,
        columns=["col1", "col2"]
    ),
    # is_unique on multiple columns (composite key), nulls are not distinct
    # e.g. (1, NULL) equals (1, NULL) and (NULL, NULL) equals (NULL, NULL)
    DQRowRule(
        criticality="error",
        name="composite_key_col1_and_col2_is_not_unique_nulls_not_distinct",
        check_func=check_funcs.is_unique,
        columns=["col1", "col2"],
        check_func_kwargs={
            "nulls_distinct": False
        }
    ),
    # is_unique check with custom window
    DQRowRule(
        criticality="error",
        name="col1_is_not_unique_custom_window",
        # provide a default value for NULL in the time column of the window spec using coalesce()
        # to prevent rows from being excluded!
        check_func=check_funcs.is_unique,
        columns=["col1"],
        check_func_kwargs={
            "window_spec": F.window(F.coalesce(F.col("col6"), F.lit(datetime(1970, 1, 1))), "10 minutes")
        }
    ),
    # is_aggr_not_greater_than check with count aggregation over all rows
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        check_func_kwargs={
            "column": "*",
            "aggr_type": "count",
            "limit": 10
        },
    ),
    # is_aggr_not_greater_than check with aggregation over col2 (skip nulls)
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        check_func_kwargs={
            "column": "col2",
            "aggr_type": "count",
            "limit": 10
        },
    ),
    # is_aggr_not_greater_than check with aggregation over col2 grouped by col3 (skip nulls)
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_greater_than,
        check_func_kwargs={
            "column": "col2",
            "aggr_type": "count",
            "group_by": ["col3"],
            "limit": 10
        },
    ),
    # is_aggr_not_less_than check with count aggregation over all rows
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_less_than,
        column="*",
        check_func_kwargs={
            "aggr_type": "count",
            "limit": 1
        },
    ),
    # is_aggr_not_less_than check with aggregation over col2 (skip nulls)
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_less_than,
        column="col2",
        check_func_kwargs={
            "aggr_type": "count",
            "limit": 1
        },
    ),
    # is_aggr_not_less_than check with aggregation over col2 grouped by col3 (skip nulls)
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_aggr_not_less_than,
        column="col2",
        check_func_kwargs={
            "aggr_type": "count",
            "group_by": ["col3"],
            "limit": 1
        },
    ),
    # regex_match check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.regex_match,
        column="col2",
        check_func_kwargs={
            "regex": "[0-9]+", "negate": False
        }
    ),
    # sql_expression check
    DQRowRule(
        criticality="error",
        check_func=check_funcs.sql_expression,
        check_func_kwargs={
            "expression": "col3 >= col2 and col3 <= 10",
            "msg": "col3 is less than col2 and col3 is greater than 10",
            "name": "custom_output_name",
            "negate": False
        }
    ),
]

# apply the check to multiple columns
checks = checks + DQRowRuleForEachCol(
    check_func=check_funcs.is_not_null,  # 'column' as the first argument
    criticality="error",
    columns=["col3", "col5"]  # apply the check for each column in the list
).get_rules() + DQRowRuleForEachCol(
    check_func=check_funcs.is_unique,  # 'columns' as the first argument
    criticality="error",
    columns=[["col3", "col5"], ["col1"]]  # apply the check for each list of columns
).get_rules()
Details
  • Some checks require a list of columns (columns) as input (e.g. is_unique), while others require a single column (column, e.g. is_not_null) or no columns at all (e.g. sql_expression). For details on each check, please refer to the documentation here (see also the short comparison sketch after the code examples below).

  • Checks defined programmatically with DQX classes can be created using positional or keyword/named arguments:

# using keyword/named arguments
DQRowRule(
    criticality="error",
    check_func=check_funcs.is_in_range,
    column="col2",
    check_func_kwargs={"min_limit": 1, "max_limit": 10}
)

# using positional arguments
DQRowRule(
    criticality="error",
    check_func=check_funcs.is_in_range,
    column="col2",
    check_func_args=[1, 10]
)
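
To make the distinction from the first bullet concrete, here is a minimal comparison sketch that reuses the check functions and column names already shown in the examples above:

# 'columns' as input: a list of columns (composite key)
DQRowRule(criticality="error", check_func=check_funcs.is_unique, columns=["col1", "col2"])

# 'column' as input: a single column
DQRowRule(criticality="error", check_func=check_funcs.is_not_null, column="col1")

# no columns at all: everything is passed via keyword arguments
DQRowRule(
    criticality="error",
    check_func=check_funcs.sql_expression,
    check_func_kwargs={"expression": "col3 >= col2", "msg": "col3 is less than col2"}
)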

Usage examples of Row-level checks for complex types (Struct, Map, Array)

You can apply any of the checks to complex column types (Struct, MapType, ArrayType) by passing a column expression to the check function or by using the sql_expression check function. Below are examples of how to apply checks to complex types declaratively in YAML format and programmatically with DQX classes.

Checks on complex column types defined declaratively in YAML
# is_not_null check applied to a struct column element (dot notation)
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col8.field1

# is_not_null check applied to a map column element
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col7, 'key1')

# is_not_null check applied to an array column element at the specified position
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: try_element_at(col4, 1)

# is_not_greater_than check applied to an array column
- criticality: error
  check:
    function: is_not_greater_than
    arguments:
      column: array_max(col4)
      limit: 10

# is_not_less_than check applied to an array column
- criticality: error
  check:
    function: is_not_less_than
    arguments:
      column: array_min(col4)
      limit: 1

# sql_expression check applied to a map column element
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: try_element_at(col7, 'key1') < 10
      msg: col7 element 'key1' is less than 10
      name: col7_element_key1_less_than_10
      negate: false

# sql_expression check applied to an array of map column elements
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: not exists(col4, x -> x >= 10)
      msg: array col4 has an element greater than 10
      name: col4_all_elements_less_than_10
      negate: false

# apply the check to multiple columns (simple col, struct, map and array)
- criticality: error
  check:
    function: is_not_null
    for_each_column:
      - col1 # col
      - col8.field1 # struct col
      - try_element_at(col7, 'key1') # map col
      - try_element_at(col4, 1) # array col
Checks on complex column types defined programmatically using DQX classes
import pyspark.sql.functions as F
from databricks.labs.dqx.rule import DQRowRule, DQRowRuleForEachCol
from databricks.labs.dqx import check_funcs

checks = [
    # is_not_null check applied to a struct column element (dot notation)
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null,
        column="col8.field1",
    ),
    # is_not_null check applied to a map column element
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col7", F.lit("key1")),
    ),
    # is_not_null check applied to an array column element at the specified position
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_null,
        column=F.try_element_at("col4", F.lit(1)),
    ),
    # is_not_greater_than check applied to an array column
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_greater_than,
        column=F.array_max("col4"),
        check_func_kwargs={"limit": 10},
    ),
    # is_not_less_than check applied to an array column
    DQRowRule(
        criticality="error",
        check_func=check_funcs.is_not_less_than,
        column=F.array_min("col4"),
        check_func_kwargs={"limit": 1},
    ),
    # sql_expression check applied to a map column element
    DQRowRule(
        criticality="error",
        check_func=check_funcs.sql_expression,
        check_func_kwargs={
            "expression": "try_element_at(col7, 'key1') < 10",
            "msg": "col7 element 'key1' is less than 10",
            "name": "col7_element_key1_less_than_10",
            "negate": False,
        },
    ),
    # sql_expression check applied to an array of map column elements
    DQRowRule(
        criticality="error",
        check_func=check_funcs.sql_expression,
        check_func_kwargs={
            "expression": "not exists(col4, x -> x >= 10)",
            "msg": "array col4 has an element greater than 10",
            "name": "col4_all_elements_less_than_10",
            "negate": False,
        },
    ),
]

# apply the check to multiple columns (simple col, struct, map and array)
checks = checks + DQRowRuleForEachCol(
    check_func=check_funcs.is_not_null,
    criticality="error",
    columns=[
        "col1",  # col as string
        F.col("col2"),  # col
        "col8.field1",  # struct col
        F.try_element_at("col7", F.lit("key1")),  # map col
        F.try_element_at("col4", F.lit(1))  # array col
    ]
).get_rules()

Applying checks on Multiple Data Sets

Row-level checks are intended to operate on a single DataFrame as input. If your validation logic requires multiple DataFrames/Tables as input, they should be joined before applying the row-level checks.

For example, if you need to verify that values in column col1 from one DataFrame/Table are greater than values in column col1 from another DataFrame/Table, you can join the two DataFrames/Tables first, and then apply the checks to the resulting DataFrame.

Example
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

# Join and filter two input tables
input_df = spark.sql("""
    SELECT t2.col1, t2.col2, t2.col3
    FROM (
        SELECT DISTINCT col1 FROM table1 WHERE col4 = 'foo'
    ) AS t1
    JOIN table2 AS t2
    ON t1.col1 = t2.col1
    WHERE t2.col1 > 0
""")

# Define and apply checks on the joined data sets
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col3 >= col2 and col3 <= 10
      msg: col3 is less than col2 and col3 is greater than 10
      name: custom_output_name
      negate: false
""")

dq_engine = DQEngine(WorkspaceClient())
good_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)

Applying filters on checks

You can apply checks to a subset of the DataFrame rows by using a filter. For example, to check that column col1 is not null only when column col2 is positive, you can define the check as follows:

- criticality: error
  filter: col2 > 0
  check:
    function: is_not_null
    arguments:
      column: col1
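
The same filter can also be set programmatically. A minimal sketch, assuming DQRowRule accepts the same filter field used in the YAML definition above:

from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs

# check that col1 is not null, but only for rows where col2 > 0
DQRowRule(
    criticality="error",
    filter="col2 > 0",  # assumed to mirror the YAML 'filter' field
    check_func=check_funcs.is_not_null,
    column="col1"
)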

Creating Custom Checks

Using SQL Expression

You can define custom checks using the SQL expression rule (sql_expression), for example:

- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col1 LIKE '%foo'
      msg: col1 ends with 'foo'

SQL expressions are also useful when you need cross-column validation, for example:

- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col1 > col2
      msg: col1 is greater than col2

Using a Python function

If you need a reusable check, or want to implement logic that is too complex to express as a SQL expression, you can define your own custom check function in Python. A check function is a callable that uses make_condition to return a pyspark.sql.Column.

Custom check example

import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.check_funcs import make_condition

def not_ends_with(column: str, suffix: str) -> Column:
    col_expr = F.col(column)
    # flag rows as failed when the column value ends with the given suffix
    return make_condition(col_expr.endswith(suffix), f"Column {column} ends with {suffix}", f"{column}_ends_with_{suffix}")

Execution of the custom check programmatically using DQX classes

from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx.check_funcs import is_not_null

checks = [
    # custom check
    DQRowRule(criticality="error", check_func=not_ends_with, column="col1", check_func_kwargs={"suffix": "foo"}),
    # built-in check
    DQRowRule(criticality="error", check_func=is_not_null, column="col1"),
]

dq_engine = DQEngine(WorkspaceClient())

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)

# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)

Execution of the custom check using declarative YAML definition

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: error
  check:
    function: not_ends_with
    arguments:
      column: col1
      suffix: foo
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1
""")

dq_engine = DQEngine(WorkspaceClient())

custom_check_functions = {"not_ends_with": not_ends_with}  # dictionary of custom check functions
# or include all functions with globals() for simplicity
# custom_check_functions = globals()

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, custom_check_functions)

# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)

Detecting Personally Identifiable Information (PII) using a Python function

You can write a custom check to detect Personally Identifiable Information (PII) in the input data. To use PII-detection libraries (e.g. Presidio), install the required dependencies, define a Spark UDF to detect PII, and use make_condition to call the UDF as a DQX check. See the DQX for PII Detection Notebook for a full example.

import json

import pandas as pd
from pyspark.sql.functions import concat_ws, col, lit, pandas_udf
from pyspark.sql import Column
from databricks.labs.dqx.check_funcs import make_condition
from presidio_analyzer import AnalyzerEngine

# Create the Presidio analyzer:
analyzer = AnalyzerEngine()

# Create a wrapper function to generate the entity mapping results:
def get_entity_mapping(data: str) -> str | None:
    # Run the Presidio analyzer to detect PII in the string:
    results = analyzer.analyze(
        text=data,
        entities=["PERSON", "EMAIL_ADDRESS"],
        language='en',
        score_threshold=0.5,
    )

    # Validate and return the results:
    results = [
        result.to_dict()
        for result in results
        if result.score >= 0.5
    ]
    if results != []:
        return json.dumps(results)
    return None

# Register a pandas UDF to run the analyzer:
@pandas_udf('string')
def contains_pii(batch: pd.Series) -> pd.Series:
    # Apply `get_entity_mapping` to each value:
    return batch.map(get_entity_mapping)

def does_not_contain_pii(col_name: str) -> Column:
    # Define a PII detection expression calling the pandas UDF:
    pii_info = contains_pii(col(col_name))

    # Return the DQX condition that uses the PII detection expression:
    return make_condition(
        pii_info.isNotNull(),
        concat_ws(
            ' ',
            lit(col_name),
            lit('contains pii with the following info:'),
            pii_info
        ),
        f'{col_name}_contains_pii'
    )
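
To run the PII check, you can register it like any other custom check function, mirroring the declarative custom-check execution shown earlier. A minimal sketch, where input_df and col1 are placeholders:

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

checks = yaml.safe_load("""
- criticality: error
  check:
    function: does_not_contain_pii
    arguments:
      col_name: col1
""")

dq_engine = DQEngine(WorkspaceClient())
custom_check_functions = {"does_not_contain_pii": does_not_contain_pii}

valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, custom_check_functions)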