Quality Rules
This page provides a reference for the quality checks (rule functions) available in DQX.
You can explore the implementation details of the check functions here.
Row-level checks reference
Row-level checks are applied to each row in a PySpark DataFrame. The quality check results are reported for individual rows in the result columns. You can also define your own custom checks (see Creating custom checks).
Available row-level checks
Check | Description | Arguments |
---|---|---|
is_not_null | Checks whether the values in the input column are not null. | column : column to check (can be a string column name or a column expression) |
is_not_empty | Checks whether the values in the input column are not empty (but may be null). | column : column to check (can be a string column name or a column expression) |
is_not_null_and_not_empty | Checks whether the values in the input column are not null and not empty. | column : column to check (can be a string column name or a column expression); trim_strings : optional boolean flag to trim spaces from strings |
is_in_list | Checks whether the values in the input column are present in the list of allowed values (null values are allowed). | column : column to check (can be a string column name or a column expression); allowed : list of allowed values |
is_not_null_and_is_in_list | Checks whether the values in the input column are not null and present in the list of allowed values. | column : column to check (can be a string column name or a column expression); allowed : list of allowed values |
is_not_null_and_not_empty_array | Checks whether the values in the array input column are not null and not empty. | column : column to check (can be a string column name or a column expression) |
is_in_range | Checks whether the values in the input column are in the provided range (inclusive of both boundaries). | column : column to check (can be a string column name or a column expression); min_limit : min limit as number, date, timestamp, column name or sql expression; max_limit : max limit as number, date, timestamp, column name or sql expression |
is_not_in_range | Checks whether the values in the input column are outside the provided range (inclusive of both boundaries). | column : column to check (can be a string column name or a column expression); min_limit : min limit as number, date, timestamp, column name or sql expression; max_limit : max limit as number, date, timestamp, column name or sql expression |
is_not_less_than | Checks whether the values in the input column are not less than the provided limit. | column : column to check (can be a string column name or a column expression); limit : limit as number, date, timestamp, column name or sql expression |
is_not_greater_than | Checks whether the values in the input column are not greater than the provided limit. | column : column to check (can be a string column name or a column expression); limit : limit as number, date, timestamp, column name or sql expression |
is_valid_date | Checks whether the values in the input column have valid date formats. | column : column to check (can be a string column name or a column expression); date_format : optional date format (e.g. 'yyyy-MM-dd') |
is_valid_timestamp | Checks whether the values in the input column have valid timestamp formats. | column : column to check (can be a string column name or a column expression); timestamp_format : optional timestamp format (e.g. 'yyyy-MM-dd HH:mm:ss') |
is_not_in_future | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | column : column to check (can be a string column name or a column expression); offset : offset to use; curr_timestamp : current timestamp, if not provided current_timestamp() function is used |
is_not_in_near_future | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | column : column to check (can be a string column name or a column expression); offset : offset to use; curr_timestamp : current timestamp, if not provided current_timestamp() function is used |
is_older_than_n_days | Checks whether the values in the input column are at least N days older than the current date. | column : column to check (can be a string column name or a column expression); days : number of days; curr_date : current date, if not provided current_date() function is used; negate : if the condition should be negated |
is_older_than_col2_for_n_days | Checks whether the values in one input column are at least N days older than the values in another column. | column1 : first column to check (can be a string column name or a column expression); column2 : second column to check (can be a string column name or a column expression); days : number of days; negate : if the condition should be negated |
regex_match | Checks whether the values in the input column match a given regex. | column : column to check (can be a string column name or a column expression); regex: regex to check; negate : if the condition should be negated (true) or not |
sql_expression | Checks whether the values meet the condition provided as an SQL expression, e.g. a = 'str1' and a > b . SQL expressions are evaluated at runtime, so ensure that the expression is safe and that functions used within it (e.g. h3_ischildof, division) do not throw exceptions. You can achieve this by validating input arguments or columns beforehand using guards such as CASE WHEN, IS NOT NULL, RLIKE, or type try casts. | expression : sql expression to check on a DataFrame (fail the check if expression evaluates to True, pass if it evaluates to False); msg : optional message to output; name : optional name of the resulting column (it can be overwritten by name specified at the check level); negate : if the condition should be negated |
PII detection | Checks whether the values in the input column contain Personally Identifiable Information (PII). This check is not included in DQX built-in rules to avoid introducing 3rd-party dependencies. An example implementation can be found here. | |
Row-level checks are designed to operate on a single PySpark DataFrame. If your validation logic requires multiple DataFrames/Tables, you can join them beforehand to produce a single DataFrame on which the checks can be applied. See more here.
Usage examples of Row-level checks
Below are fully specified examples of how to define row-level checks declaratively in YAML format and programmatically with DQX classes. Both are equivalent and can be used interchangeably.
The `criticality` field can be either "error" (data goes only into the bad/quarantine DataFrame) or "warn" (data goes into both the good and bad DataFrames).
For brevity, the `name` field is omitted in the examples and will be auto-generated in the results.
Checks defined in YAML
# is_not_null check
- criticality: error
check:
function: is_not_null
arguments:
column: col1
# is_not_empty check
- criticality: error
check:
function: is_not_empty
arguments:
column: col1
# is_not_null_and_not_empty check
- criticality: error
check:
function: is_not_null_and_not_empty
arguments:
column: col1
trim_strings: true
# is_in_list check
- criticality: error
check:
function: is_in_list
arguments:
column: col2
allowed:
- 1
- 2
- 3
# is_not_null_and_is_in_list check
- criticality: error
check:
function: is_not_null_and_is_in_list
arguments:
column: col2
allowed:
- 1
- 2
- 3
# is_not_null_and_not_empty_array check
- criticality: error
check:
function: is_not_null_and_not_empty_array
arguments:
column: col4
# is_in_range check
- criticality: error
check:
function: is_in_range
arguments:
column: col2
min_limit: 1
max_limit: 10
- criticality: error
check:
function: is_in_range
arguments:
column: col5
min_limit: 2025-01-01
max_limit: 2025-02-24
- criticality: error
check:
function: is_in_range
arguments:
column: col6
min_limit: 2025-01-01 00:00:00
max_limit: 2025-02-24 01:00:00
- criticality: error
check:
function: is_in_range
arguments:
column: col3
min_limit: col2
max_limit: col2 * 2
# is_not_in_range check
- criticality: error
check:
function: is_not_in_range
arguments:
column: col2
min_limit: 11
max_limit: 20
- criticality: error
check:
function: is_not_in_range
arguments:
column: col5
min_limit: 2025-02-25
max_limit: 2025-02-26
- criticality: error
check:
function: is_not_in_range
arguments:
column: col6
min_limit: 2025-02-25 00:00:00
max_limit: 2025-02-26 01:00:00
- criticality: error
check:
function: is_not_in_range
arguments:
column: col3
min_limit: col2 + 10
max_limit: col2 * 10
# is_not_less_than check
- criticality: error
check:
function: is_not_less_than
arguments:
column: col2
limit: 0
- criticality: error
check:
function: is_not_less_than
arguments:
column: col5
limit: 2025-01-01
- criticality: error
check:
function: is_not_less_than
arguments:
column: col6
limit: 2025-01-01 01:00:00
- criticality: error
check:
function: is_not_less_than
arguments:
column: col3
limit: col2 - 10
# is_not_greater_than check
- criticality: error
check:
function: is_not_greater_than
arguments:
column: col2
limit: 10
- criticality: error
check:
function: is_not_greater_than
arguments:
column: col5
limit: 2025-03-01
- criticality: error
check:
function: is_not_greater_than
arguments:
column: col6
limit: 2025-03-24 01:00:00
- criticality: error
check:
function: is_not_greater_than
arguments:
column: col3
limit: col2 + 10
# is_valid_date check
- criticality: error
check:
function: is_valid_date
arguments:
column: col5
- criticality: error
name: col5_is_not_valid_date2
check:
function: is_valid_date
arguments:
column: col5
date_format: yyyy-MM-dd
# is_valid_timestamp check
- criticality: error
check:
function: is_valid_timestamp
arguments:
column: col6
timestamp_format: yyyy-MM-dd HH:mm:ss
- criticality: error
name: col6_is_not_valid_timestamp2
check:
function: is_valid_timestamp
arguments:
column: col6
# is_not_in_future check
- criticality: error
check:
function: is_not_in_future
arguments:
column: col6
offset: 86400
# is_not_in_near_future check
- criticality: error
check:
function: is_not_in_near_future
arguments:
column: col6
offset: 36400
# is_older_than_n_days check
- criticality: error
check:
function: is_older_than_n_days
arguments:
column: col5
days: 10000
# is_older_than_col2_for_n_days check
- criticality: error
check:
function: is_older_than_col2_for_n_days
arguments:
column1: col5
column2: col6
days: 2
# regex_match check
- criticality: error
check:
function: regex_match
arguments:
column: col2
regex: '[0-9]+'
negate: false
# sql_expression check
- criticality: error
check:
function: sql_expression
arguments:
expression: col3 >= col2 and col3 <= 10
msg: col3 is less than col2 and col3 is greater than 10
name: custom_output_name
negate: false
# apply check to multiple columns
- criticality: error
check:
function: is_not_null # 'column' as first argument
for_each_column: # apply the check for each column in the list
- col3
- col5
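Once defined, the YAML checks can be loaded and applied with the metadata-based methods of the engine. A minimal sketch (assuming an existing input_df DataFrame):
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      column: col1
""")
dq_engine = DQEngine(WorkspaceClient())
# Option 1: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
# Option 2: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)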
Checks defined programmatically using DQX classes
from databricks.labs.dqx.rule import DQRowRule, DQForEachColRule
from databricks.labs.dqx import check_funcs
from datetime import datetime
checks = [
# is_not_null check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null,
column="col1"
),
# is_not_empty check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_empty,
column="col1"
),
# is_not_null_and_not_empty check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null_and_not_empty,
column="col1",
check_func_kwargs={"trim_strings": True}
),
# is_in_list check
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_list,
column="col2",
check_func_kwargs={"allowed": [1, 2, 3]}
),
# is_not_null_and_is_in_list check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null_and_is_in_list,
column="col2",
check_func_kwargs={"allowed": [1, 2, 3]}
),
# is_not_null_and_not_empty_array check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null_and_not_empty_array,
column="col4"
),
# is_in_range check
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col2",
check_func_kwargs={"min_limit": 1, "max_limit": 10}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col5",
check_func_kwargs={
"min_limit": datetime(2025, 1, 1).date(),
"max_limit": datetime(2025, 2, 24).date()
}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col6",
check_func_kwargs={
"min_limit": datetime(2025, 1, 1, 0, 0, 0),
"max_limit": datetime(2025, 2, 24, 1, 0, 0)
}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col3",
check_func_kwargs={
"min_limit": "col2",
"max_limit": "col2 * 2"
}
),
# is_not_in_range check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_range,
column="col2",
check_func_kwargs={
"min_limit": 11,
"max_limit": 20
}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_range,
column="col5",
check_func_kwargs={
"min_limit": datetime(2025, 2, 25).date(),
"max_limit": datetime(2025, 2, 26).date()
}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_range,
column="col6",
check_func_kwargs={
"min_limit": datetime(2025, 2, 25, 0, 0, 0),
"max_limit": datetime(2025, 2, 26, 1, 0, 0)
}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_range,
column="col3",
check_func_kwargs={
"min_limit": "col2 + 10",
"max_limit": "col2 * 10"
}
),
# is_not_less_than check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_less_than,
column="col2",
check_func_kwargs={"limit": 0}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_less_than,
column="col5",
check_func_kwargs={"limit": datetime(2025, 1, 1).date()}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_less_than,
column="col6",
check_func_kwargs={"limit": datetime(2025, 1, 1, 1, 0, 0)}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_less_than,
column="col3",
check_func_kwargs={"limit": "col2 - 10"}
),
# is_not_greater_than check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_greater_than,
column="col2",
check_func_kwargs={"limit": 10}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_greater_than,
column="col5",
check_func_kwargs={"limit": datetime(2025, 3, 1).date()}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_greater_than,
column="col6",
check_func_kwargs={"limit": datetime(2025, 3, 24, 1, 0, 0)}
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_greater_than,
column="col3",
check_func_kwargs={"limit": "col2 + 10"}
),
# is_valid_date check
DQRowRule(
criticality="error",
check_func=check_funcs.is_valid_date,
column="col5"
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_valid_date,
column="col5",
check_func_kwargs={"date_format": "yyyy-MM-dd"},
name="col5_is_not_valid_date2"
),
# is_valid_timestamp check
DQRowRule(
criticality="error",
check_func=check_funcs.is_valid_timestamp,
column="col6"
),
DQRowRule(
criticality="error",
check_func=check_funcs.is_valid_timestamp,
column="col6",
check_func_kwargs={"timestamp_format": "yyyy-MM-dd HH:mm:ss"},
name="col6_is_not_valid_timestamp2"
),
# is_not_in_future check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_future,
column="col6",
check_func_kwargs={"offset": 86400}
),
# is_not_in_near_future check
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_in_near_future,
column="col6",
check_func_kwargs={"offset": 36400}
),
# is_older_than_n_days check
DQRowRule(
criticality="error",
check_func=check_funcs.is_older_than_n_days,
column="col5",
check_func_kwargs={"days": 10000}
),
# is_older_than_col2_for_n_days check
DQRowRule(
criticality="error",
check_func=check_funcs.is_older_than_col2_for_n_days,
check_func_kwargs={"column1": "col5", "column2": "col6", "days": 2}
),
# regex_match check
DQRowRule(
criticality="error",
check_func=check_funcs.regex_match,
column="col2",
check_func_kwargs={
"regex": "[0-9]+", "negate": False
}
),
# sql_expression check
DQRowRule(
criticality="error",
check_func=check_funcs.sql_expression,
check_func_kwargs={
"expression": "col3 >= col2 and col3 <= 10",
"msg": "col3 is less than col2 and col3 is greater than 10",
"name": "custom_output_name",
"negate": False
}
),
]
# apply check to multiple columns
checks = checks + DQForEachColRule(
check_func=check_funcs.is_not_null, # 'column' as first argument
criticality="error",
columns=["col3", "col5"] # apply the check for each column in the list
).get_rules()
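Once defined, checks created with DQX classes are applied with the engine's apply_checks and apply_checks_and_split methods. A minimal sketch (assuming an existing input_df DataFrame):
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)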
Checks defined programmatically with DQX classes can be created using positional or keyword/named arguments:
# using keyword/named arguments
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col2",
check_func_kwargs={"min_limit": 1, "max_limit": 10}
)
# using positional arguments
DQRowRule(
criticality="error",
check_func=check_funcs.is_in_range,
column="col2",
check_func_args=[1, 10]
)
Usage examples of Row-level checks for complex types (Struct, Map, Array)
You can apply any of the checks to complex column types (`StructType`, `MapType`, `ArrayType`) by passing a column expression to the check function or by using the `sql_expression` check function.
Below are examples of how to apply checks for complex types declaratively in YAML format and programmatically with DQX classes.
Checks on complex column types defined declaratively in YAML
# is_not_null check applied to a struct column element (dot notation)
- criticality: error
check:
function: is_not_null
arguments:
column: col8.field1
# is_not_null check applied to a map column element
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col7, 'key1')
# is_not_null check applied to an array column element at the specified position
- criticality: error
check:
function: is_not_null
arguments:
column: try_element_at(col4, 1)
# is_not_greater_than check applied to an array column
- criticality: error
check:
function: is_not_greater_than
arguments:
column: array_max(col4)
limit: 10
# is_not_less_than check applied to an array column
- criticality: error
check:
function: is_not_less_than
arguments:
column: array_min(col4)
limit: 1
# sql_expression check applied to a map column element
- criticality: error
check:
function: sql_expression
arguments:
expression: try_element_at(col7, 'key1') >= 10
msg: col7 element 'key1' is less than 10
name: col7_element_key1_less_than_10
negate: false
# sql_expression check applied to an array of map column elements
- criticality: error
check:
function: sql_expression
arguments:
expression: not exists(col4, x -> x > 10)
msg: array col4 has an element greater than 10
name: col4_all_elements_less_than_10
negate: false
# apply check to multiple columns (simple col, struct, map and array)
- criticality: error
check:
function: is_not_null
for_each_column:
- col1 # col
- col8.field1 # struct col
- try_element_at(col7, 'key1') # map col
- try_element_at(col4, 1) # array col
Checks on complex column types defined programmatically using DQX classes
import pyspark.sql.functions as F
from databricks.labs.dqx.rule import DQRowRule, DQForEachColRule
from databricks.labs.dqx import check_funcs
checks = [
# is_not_null check applied to a struct column element (dot notation)
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null,
column="col8.field1",
),
# is_not_null check applied to a map column element
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null,
column=F.try_element_at("col7", F.lit("key1")),
),
# is_not_null check applied to an array column element at the specified position
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_null,
column=F.try_element_at("col4", F.lit(1)),
),
# is_not_greater_than check applied to an array column
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_greater_than,
column=F.array_max("col4"),
check_func_kwargs={"limit": 10},
),
# is_not_less_than check applied to an array column
DQRowRule(
criticality="error",
check_func=check_funcs.is_not_less_than,
column=F.array_min("col4"),
check_func_kwargs={"limit": 1},
),
# sql_expression check applied to a map column element
DQRowRule(
criticality="error",
check_func=check_funcs.sql_expression,
check_func_kwargs={
"expression": "try_element_at(col7, 'key1') >= 10",
"msg": "col7 element 'key1' is less than 10",
"name": "col7_element_key1_less_than_10",
"negate": False,
},
),
# sql_expression check applied to an array of map column elements
DQRowRule(
criticality="error",
check_func=check_funcs.sql_expression,
check_func_kwargs={
"expression": "not exists(col4, x -> x > 10)",
"msg": "array col4 has an element greater than 10",
"name": "col4_all_elements_less_than_10",
"negate": False,
},
),
]
# apply check to multiple columns (simple col, struct, map and array)
checks = checks + DQForEachColRule(
check_func=check_funcs.is_not_null,
criticality="error",
columns=[
"col1", # col as string
F.col("col2"), # col
"col8.field1", # struct col
F.try_element_at("col7", F.lit("key1")), # map col
F.try_element_at("col4", F.lit(1)) # array col
]
).get_rules()
Dataset-level checks reference
Dataset-level checks are applied to groups of rows in a PySpark DataFrame. Similar to row-level checks, the results of these quality checks are reported for individual rows in the result columns. That means you can define and apply row-level and dataset-level checks at once.
Dataset-level checks are useful for validating aggregated values of the entire DataFrame or groups of rows within the DataFrame, such as ensuring that the number of rows does not exceed a certain limit or that the sum of a column is within a specified range.
You can also define your own custom dataset-level checks (see Creating custom checks).
Available dataset-level checks
Check | Description | Arguments |
---|---|---|
is_unique | Checks whether the values in the input column are unique and reports an issue for each row that contains a duplicate value. It supports uniqueness checks on multiple columns (composite key). Null values are not considered duplicates by default, following the ANSI SQL standard. | columns : columns to check (can be a list of column names or column expressions); nulls_distinct : controls how null values are treated; the default is True, meaning nulls are not considered duplicates, e.g. (NULL, NULL) does not equal (NULL, NULL) and (1, NULL) does not equal (1, NULL) |
is_aggr_not_greater_than | Checks whether the aggregated values over a group of rows or all rows are not greater than the provided limit. | column : column to check (can be a string column name or a column expression), optional for 'count' aggregation; limit : limit as number, column name or sql expression; aggr_type : aggregation function to use, such as "count" (default), "sum", "avg", "min", and "max"; group_by : optional list of columns or column expressions to group the rows for aggregation (no grouping by default) |
is_aggr_not_less_than | Checks whether the aggregated values over a group of rows or all rows are not less than the provided limit. | column : column to check (can be a string column name or a column expression), optional for 'count' aggregation; limit : limit as number, column name or sql expression; aggr_type : aggregation function to use, such as "count" (default), "sum", "avg", "min", and "max"; group_by : optional list of columns or column expressions to group the rows for aggregation (no grouping by default) |
foreign_key | Checks whether the input column or columns can be found in the reference DataFrame or Table (foreign key check). It supports foreign key checks on single and composite keys. | columns : columns to check (can be a list of string column names or column expressions); ref_columns : reference columns to check against in the reference DataFrame or Table (can be a list of string column names or column expressions); ref_df_name : (optional) name of the reference DataFrame (a dictionary of DataFrames can be passed when applying checks); ref_table : (optional) fully qualified reference table name; either ref_df_name or ref_table must be provided, but never both. The number of columns and ref_columns must match, and keys are checked in the given order. |
sql_query | Checks whether the condition column produced by a SQL query is satisfied. The query must return both the condition column and the necessary merge columns so that results from the query can be matched with the input DataFrame. If the merge columns are not unique, this can lead to false positives. | query : query string, must return all merge columns and the condition column; input_placeholder : name to be used in the sql query as {{ input_placeholder }} to refer to the input DataFrame; optional reference DataFrames are referred to by the name provided in the dictionary of reference DataFrames (e.g. {{ ref_view }} ; a dictionary of DataFrames can be passed when applying checks); merge_columns : list of columns used for merging with the input DataFrame (must exist in the input DataFrame and be present in the output of the sql query); condition_column : name of the column indicating a violation - the check fails if this column evaluates to True, and passes if it evaluates to False; msg : optional message to output; name : optional name of the resulting check (it can be overwritten by the name specified at the check level); negate : if the condition should be negated |
Usage examples of Dataset-level checks
Below are fully specified examples of how to define dataset-level checks declaratively in YAML format and programmatically with DQX classes. Both are equivalent and can be used interchangeably.
Similar to row-level checks, the results of the dataset-level quality checks are reported for each individual row in the result columns. Complex data types are supported as well.
Checks defined in YAML
# is_unique check
- criticality: error
check:
function: is_unique
arguments:
columns:
- col1
# is_unique on multiple columns (composite key), nulls are distinct (default behavior)
# eg. (1, NULL) not equals (1, NULL) and (NULL, NULL) not equals (NULL, NULL)
- criticality: error
name: composite_key_col1_and_col2_is_not_unique
check:
function: is_unique
arguments:
columns:
- col1
- col2
# is_unique on multiple columns (composite key), nulls are not distinct
# eg. (1, NULL) equals (1, NULL) and (NULL, NULL) equals (NULL, NULL)
- criticality: error
name: composite_key_col1_and_col2_is_not_unique_not_nulls_distinct
check:
function: is_unique
arguments:
columns:
- col1
- col2
nulls_distinct: False
# is_aggr_not_greater_than check with count aggregation over all rows
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: '*'
aggr_type: count
limit: 10
# is_aggr_not_greater_than check with aggregation over col2 (skip nulls)
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: col2
aggr_type: count # other types: sum, avg, min, max
limit: 10
# is_aggr_not_greater_than check with aggregation over col2 grouped by col3 (skip nulls)
- criticality: error
check:
function: is_aggr_not_greater_than
arguments:
column: col2
aggr_type: count # other types: sum, avg, min, max
group_by:
- col3
limit: 10
# is_aggr_not_less_than check with count aggregation over all rows
- criticality: error
check:
function: is_aggr_not_less_than
arguments:
column: '*'
aggr_type: count
limit: 1
# is_aggr_not_less_than check with aggregation over col2 (skip nulls)
- criticality: error
check:
function: is_aggr_not_less_than
arguments:
column: col2
aggr_type: count # other types: sum, avg, min, max
limit: 1
# is_aggr_not_less_than check with aggregation over col2 grouped by col3 (skip nulls)
- criticality: error
check:
function: is_aggr_not_less_than
arguments:
column: col2
aggr_type: count # other types: sum, avg, min, max
group_by:
- col3
limit: 1
# foreign_key check using reference DataFrame
- criticality: error
check:
function: foreign_key
arguments:
columns:
- col1
ref_columns:
- ref_col1
ref_df_name: ref_df_key
# foreign_key check using reference table
- criticality: error
check:
function: foreign_key
arguments:
columns:
- col1
ref_columns:
- ref_col1
ref_table: catalog1.schema1.ref_table
# foreign_key check on composite key
- criticality: error
check:
function: foreign_key
arguments:
columns:
- col1
- col2
ref_columns:
- ref_col1
- ref_col2
ref_df_name: ref_df_key
# sql_query check
- criticality: error
check:
function: sql_query
arguments:
# sql query must return all merge_columns and condition column
query: SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} GROUP BY col1, col2
input_placeholder: input_view # name to be used in the sql query as `{{ input_view }}` to refer to the input DataFrame
merge_columns: # columns used for merging with the input DataFrame
- col1
- col2
condition_column: condition # the check fails if this column evaluates to True
msg: sql query check failed # optional
name: sql_query_violation # optional
negate: false # optional, default False
# apply check to multiple columns
- criticality: error
check:
function: is_unique # 'columns' as first argument
for_each_column: # apply the check for each column in the list
- [col3, col5]
- [col1]
Providing Reference DataFrames
When applying dataset-level checks, you can optionally provide reference DataFrames.
These reference DataFrames are passed as a dictionary to the `apply_checks_by_metadata` and `apply_checks_by_metadata_and_split` methods:
ref_df = spark.table("catalog1.schema1.ref_table")
ref_dfs = {"ref_view": ref_df}
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(df, checks, ref_dfs=ref_dfs)
good_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(df, checks, ref_dfs=ref_dfs)
The reference DataFrames are used in selected dataset-level checks:
- `foreign_key`: required for this check if the `ref_df_name` argument is specified, e.g. `ref_df_name="ref_view"`. The value of `ref_df_name` must match a key in the `ref_dfs` dictionary.
- `sql_query`: the reference DataFrames are registered as temporary views and can be used in the SQL query. For example, if you have a reference DataFrame named `ref_view`, you can use it in the SQL query as `{{ ref_view }}`:
SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} input JOIN {{ ref_view }} ref ON input.col1 = ref.ref_col1 GROUP BY col1, col2
You can also use a reference table directly in the SQL query without supplying it as a DataFrame:
SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} input JOIN catalog1.schema1.ref_table ref ON input.col1 = ref.ref_col1 GROUP BY col1, col2
Checks defined programmatically using DQX classes
import pyspark.sql.functions as F
from databricks.labs.dqx.rule import DQDatasetRule, DQForEachColRule
from databricks.labs.dqx import check_funcs
checks = [
# is_unique check
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1"] # require list of columns
),
# is_unique check
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=[F.col("col1")] # require list of columns
),
# is_unique on multiple columns (composite key), nulls are distinct (default behavior)
# eg. (1, NULL) not equals (1, NULL) and (NULL, NULL) not equals (NULL, NULL)
DQDatasetRule(
criticality="error",
name="composite_key_col1_and_col2_is_not_unique",
check_func=check_funcs.is_unique,
columns=["col1", "col2"]
),
# is_unique on multiple columns (composite key), nulls are not distinct
# eg. (1, NULL) equals (1, NULL) and (NULL, NULL) equals (NULL, NULL)
DQDatasetRule(
criticality="error",
name="composite_key_col1_and_col2_is_not_unique_nulls_not_distinct",
check_func=check_funcs.is_unique,
columns=["col1", "col2"],
check_func_kwargs={
"nulls_distinct": False
}
),
# is_aggr_not_greater_than check with count aggregation over all rows
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="*",
check_func_kwargs={
"aggr_type": "count",
"limit": 10
},
),
# is_aggr_not_greater_than check with aggregation over col2 (skip nulls)
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="col2",
check_func_kwargs={
"aggr_type": "count",
"limit": 10
},
),
# is_aggr_not_greater_than check with aggregation over col2 grouped by col3 (skip nulls)
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_greater_than,
column="col2",
check_func_kwargs={
"aggr_type": "count",
"group_by": ["col3"],
"limit": 10
},
),
# is_aggr_not_less_than check with count aggregation over all rows
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_less_than,
column="*",
check_func_kwargs={
"aggr_type": "count",
"limit": 1
},
),
# is_aggr_not_less_than check with aggregation over col2 (skip nulls)
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_less_than,
column="col2",
check_func_kwargs={
"aggr_type": "count",
"limit": 1
},
),
# is_aggr_not_less_than check with aggregation over col2 grouped by col3 (skip nulls)
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_aggr_not_less_than,
column="col2",
check_func_kwargs={
"aggr_type": "count",
"group_by": ["col3"],
"limit": 1
},
),
# foreign_key check using reference DataFrame
DQDatasetRule(
criticality="warn",
check_func=check_funcs.foreign_key,
columns=["col1"],
check_func_kwargs={
"ref_columns": ["ref_col1"],
"ref_df_name": "ref_df_key",
},
),
# foreign_key check using reference table
DQDatasetRule(
criticality="warn",
check_func=check_funcs.foreign_key,
columns=["col1"],
check_func_kwargs={
"ref_columns": ["ref_col1"],
"ref_table": "catalog1.schema1.ref_table",
},
),
# foreign_key check on composite key
DQDatasetRule(
criticality="warn",
check_func=check_funcs.foreign_key,
columns=["col1", "col2"],
check_func_kwargs={
"ref_columns": ["ref_col1", "ref_col2"],
"ref_df_name": "ref_df_key",
},
),
# sql_query check
DQDatasetRule(
criticality="error",
check_func=check_funcs.sql_query,
check_func_kwargs={
# the sql query must return all merge_columns and condition column
"query": "SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} GROUP BY col1, col2",
"input_placeholder": "input_view", # name to be used in the sql query as `{{ input_placeholder }}` to refer to the input DataFrame
"merge_columns": ["col1", "col2"], # columns used for merging with the input DataFrame
"condition_column": "condition", # the check fails if this column evaluates to True
"msg": "sql query check failed", # optional
"name": "sql_query_violation", # optional
"negate": False # optional, default False
},
),
]
# apply check to multiple columns
checks = checks + DQForEachColRule(
check_func=check_funcs.is_unique, # 'columns' as first argument
criticality="error",
columns=[["col3", "col5"], ["col1"]] # apply the check for each list of columns
).get_rules()
Providing Reference DataFrames
When applying dataset-level checks, you can optionally provide reference DataFrames.
These reference DataFrames are passed as a dictionary to the `apply_checks` and `apply_checks_and_split` methods:
ref_df = spark.table("catalog1.schema1.ref_table")
ref_dfs = {"ref_view": ref_df}
valid_and_quarantine_df = dq_engine.apply_checks(df, checks, ref_dfs=ref_dfs)
good_df, quarantine_df = dq_engine.apply_checks_and_split(df, checks, ref_dfs=ref_dfs)
The reference DataFrames are used in selected dataset-level checks:
- `foreign_key`: required for this check if the `ref_df_name` argument is specified, e.g. `ref_df_name="ref_view"`. The value of `ref_df_name` must match a key in the `ref_dfs` dictionary.
- `sql_query`: the reference DataFrames are registered as temporary views and can be used in the SQL query. For example, if you have a reference DataFrame named `ref_view`, you can use it in the SQL query as `{{ ref_view }}`:
SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} input JOIN {{ ref_view }} ref ON input.col1 = ref.ref_col1 GROUP BY col1, col2
You can also use a reference table directly in the SQL query without supplying it as a DataFrame:
SELECT col1, col2, SUM(col3) = 0 AS condition FROM {{ input_view }} input JOIN catalog1.schema1.ref_table ref ON input.col1 = ref.ref_col1 GROUP BY col1, col2
Creating Custom Row-level Checks
Using SQL Expression
You can define custom checks using the SQL expression rule (`sql_expression`).
Define the check in YAML
- criticality: error
check:
function: sql_expression
arguments:
expression: col1 NOT LIKE '%foo'
msg: col1 ends with 'foo'
SQL expressions are also useful if you need to perform cross-column validation, for example:
- criticality: error
check:
function: sql_expression
arguments:
expression: col1 <= col2
msg: col1 is greater than col2
Define the check using DQX classes
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx.check_funcs import sql_expression
checks = [
DQRowRule(
criticality="error",
check_func=sql_expression,
check_func_kwargs={
"expression": "col1 NOT LIKE '%foo'",
"msg": "col1 ends with 'foo'"
},
)
]
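The cross-column validation example can be expressed the same way with DQX classes, reusing the sql_expression check function (a sketch with the same expression and message as the YAML example above):
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx.check_funcs import sql_expression
checks = [
    DQRowRule(
        criticality="error",
        check_func=sql_expression,
        check_func_kwargs={
            "expression": "col1 <= col2",
            "msg": "col1 is greater than col2"
        },
    )
]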
Using Python function
If you need a reusable check or want to implement more complex logic which is challenging to implement using SQL expression, you can define your own custom check function in Python.
A check function is a callable that uses `make_condition` to return a `pyspark.sql.Column`.
Custom check example
import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.check_funcs import make_condition
from databricks.labs.dqx.rule import register_rule
@register_rule("row")
def not_ends_with(column: str, suffix: str) -> Column:
col_expr = F.col(column)
return make_condition(col_expr.endswith(suffix), f"Column {column} ends with {suffix}", f"{column}_ends_with_{suffix}")
Execution of the custom python check programmatically using DQX classes
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx.check_funcs import is_not_null
checks = [
# custom check
DQRowRule(criticality="error", check_func=not_ends_with, column="col1", check_func_kwargs={"suffix": "foo"}),
# built-in check
DQRowRule(criticality="error", check_func=is_not_null, column="col1"),
]
dq_engine = DQEngine(WorkspaceClient())
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)
Execution of custom python check using declarative YAML definition
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = yaml.safe_load("""
- criticality: error
check:
function: not_ends_with
arguments:
column: col1
suffix: foo
- criticality: error
check:
function: is_not_null
arguments:
column: col1
""")
dq_engine = DQEngine(WorkspaceClient())
custom_check_functions = {"not_ends_with": not_ends_with} # list of custom check functions
# or include all functions with globals() for simplicity
#custom_check_functions=globals()
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, custom_check_functions)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
Custom row-level checks using Window functions (for group of rows)
You can create custom checks that operate on groups of rows within the same DataFrame using window functions. Alternatively, you can implement it using dataset-level checks (see Creating Custom Dataset-level Checks for details) where you can also work across multiple DataFrames.
Let’s look at an example involving sensor readings:
measurement_id | sensor_id | reading_value |
---|---|---|
1 | 1 | 4 |
1 | 2 | 1 |
2 | 2 | 110 |
Requirement: We want to fail all readings from a sensor if any reading for that sensor exceeds a specified threshold (e.g. 100). In the example above, sensor 2 has a reading of 110, which exceeds the threshold. Therefore, we want to report an error for all readings from sensor 2 - for both measurement_id 1 and measurement_id 2.
You can implement this check using a SQL expression or a custom Python function.
Using SQL expression
Define in YAML:
- criticality: error
name: sensor_reading_exceeded
check:
function: sql_expression
arguments:
expression: "MAX(reading_value) OVER (PARTITION BY sensor_id) > 100"
msg: "one of the sensor reading is greater than 100"
negate: true
Use DQX classes:
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
checks = [
DQRowRule(criticality="error", check_func=sql_expression, name="sensor_reading_exceeded",
check_func_kwargs={
"expression": "MAX(reading_value) OVER (PARTITION BY sensor_id) > 100",
"msg": "one of the sensor reading is greater than 100",
"negate": True
}
)
]
Using a custom python check function
import yaml
import pyspark.sql.functions as F
from pyspark.sql import Column
from pyspark.sql.window import Window
from databricks.labs.dqx.check_funcs import make_condition
from databricks.labs.dqx.rule import register_rule, DQRowRule
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
@register_rule("row")
def sensor_reading_less_than(limit: int) -> Column:
condition = (F.max(F.col("reading_value")).over(Window.partitionBy("sensor_id")) > limit)
return make_condition(condition, f"one of the sensor readings is greater than {limit}", "sensor_reading_exceeded")
# -------------------------------------------
# Define checks using DQX classes
checks = [
# custom check
DQRowRule(criticality="error", name="sensor_reading_exceeded", check_func=sensor_reading_less_than, check_func_kwargs={"limit": 100}),
# any other check ...
]
dq_engine = DQEngine(WorkspaceClient())
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)
# -------------------------------------------
# Define checks using YAML declarative approach
checks = yaml.safe_load("""
- criticality: error
name: sensor_reading_exceeded
check:
function: sensor_reading_less_than
arguments:
limit: 100
# any other check ...
""")
custom_check_functions = {"sensor_reading_less_than": sensor_reading_less_than} # list of custom check functions
# or include all functions with globals() for simplicity
#custom_check_functions=globals()
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, custom_check_functions)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks, custom_check_functions)
Creating Custom Dataset-level Checks
You can perform certain checks for a group of rows using window functions in custom row-level checks as described in the previous section. For more complex scenarios you should use dataset-level checks. They are useful if you need to make aggregation level checks within the DataFrame or across multiple DataFrames. Similar to row-level checks, the results of the dataset-level quality checks are reported for each individual row in the result columns.
Custom dataset-level checks can be created using the `sql_query` check function or by implementing a Python function that returns a condition and a closure function.
The closure function must return a DataFrame with an additional condition column. This is typically a column containing boolean values indicating whether the condition is met for each row in the DataFrame.
You can perform arbitrary aggregation logic in the dataset-level functions using the input DataFrame and reference DataFrames or Tables.
When defining a custom dataset-level check in Python, you should join the reference DataFrames back to the input DataFrame to retain the original records in the output. Any additional columns added to the DataFrame in the closure function will automatically be removed from the results.
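In outline, a custom dataset-level check function returns a tuple of the condition (built with make_condition) and a closure that adds the condition column to the input DataFrame. A minimal sketch of that shape (the group/value column names and the aggregation logic are illustrative only):
import uuid
from collections.abc import Callable
import pyspark.sql.functions as F
from pyspark.sql import Column, DataFrame
from databricks.labs.dqx.check_funcs import make_condition
from databricks.labs.dqx.rule import register_rule
@register_rule("dataset")  # must be registered as a dataset-level check
def my_dataset_check(limit: int) -> tuple[Column, Callable]:
    # make sure any column added to the dataframe is unique
    condition_col = "condition_" + uuid.uuid4().hex
    def apply(df: DataFrame) -> DataFrame:
        # compute an aggregate per group and join it back to retain the original records
        aggr_df = df.groupBy("group_col").agg((F.sum("value_col") > limit).alias(condition_col))
        return df.join(aggr_df, on="group_col", how="left")
    return (
        make_condition(
            condition=F.col(condition_col),  # the check fails where the condition column is True
            message=f"sum of value_col per group_col is greater than {limit}",
            alias="my_dataset_check",
        ),
        apply,
    )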
Let’s look at an example involving sensor readings:
measurement_id | sensor_id | reading_value |
---|---|---|
1 | 1 | 4 |
1 | 2 | 1 |
2 | 2 | 110 |
and a sensor specification:
sensor_id | min_threshold |
---|---|
1 | 5 |
2 | 100 |
Requirement: We want to fail all readings from a sensor if any reading for that sensor exceeds a specified threshold from the sensor specification table. In the example above, sensor 2 has a reading of 110, which exceeds the threshold. Therefore, we want to report an error for all readings from sensor 2 - for both measurement_id 1 and measurement_id 2.
This can be implemented using the `sql_query` dataset-level check.
However, if you need a reusable check, it may be better to implement a custom Python dataset-level check function.
Using SQL Query
Define checks with DQX classes:
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQDatasetRule
from databricks.labs.dqx.check_funcs import sql_query
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
query = """
WITH joined AS (
SELECT
sensor.*,
COALESCE(specs.min_threshold, 100) AS effective_threshold
FROM {{ sensor }} sensor
LEFT JOIN {{ sensor_specs }} specs
ON sensor.sensor_id = specs.sensor_id
)
SELECT
sensor_id,
MAX(CASE WHEN reading_value > effective_threshold THEN 1 ELSE 0 END) = 1 AS condition
FROM joined
GROUP BY sensor_id
"""
checks = [
DQDatasetRule(
criticality="error",
check_func=sql_query,
check_func_kwargs={
"query": query,
"merge_columns": ["sensor_id"],
"condition_column": "condition", # the check fails if this column evaluates to True
"msg": "one of the sensor reading is greater than limit",
"name": "sensor_reading_check",
"input_placeholder": "sensor", # view name to access the input DataFrame on which the check is applied
},
),
# any other checks ...
]
sensor_df = spark.table("catalog1.schema1.sensor_readings") # input DataFrame with sensor readings
sensor_specs_df = spark.table("catalog1.schema1.sensor_specs") # reference DataFrame with sensor specifications
ref_dfs = {"sensor_specs": sensor_specs_df}
valid_and_quarantine_df = dq_engine.apply_checks(sensor_df, checks, ref_dfs=ref_dfs)
Define checks declaratively in YAML:
import yaml
checks = yaml.safe_load(
"""
- criticality: error
check:
function: sql_query
arguments:
merge_columns:
- sensor_id
condition_column: condition
msg: one of the sensor reading is greater than limit
name: sensor_reading_check
negate: false
input_placeholder: sensor
query: |
WITH joined AS (
SELECT
sensor.*,
COALESCE(specs.min_threshold, 100) AS effective_threshold
FROM {{ sensor }} sensor
LEFT JOIN {{ sensor_specs }} specs
ON sensor.sensor_id = specs.sensor_id
)
SELECT
sensor_id,
MAX(CASE WHEN reading_value > effective_threshold THEN 1 ELSE 0 END) = 1 AS condition
FROM joined
GROUP BY sensor_id
"""
)
ref_dfs = {"sensor_specs": sensor_specs_df}
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(sensor_df, checks, ref_dfs=ref_dfs)
Using Python function
import uuid
from databricks.labs.dqx.rule import register_rule
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as F
from collections.abc import Callable
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQDatasetRule
from databricks.labs.dqx.check_funcs import make_condition
@register_rule("dataset") # must be registered as dataset-level check
def sensor_reading_less_than(default_limit: int) -> tuple[Column, Callable]:
# make sure any column added to the dataframe is unique
condition_col = "condition" + uuid.uuid4().hex
def apply(df: DataFrame, ref_dfs: dict[str, DataFrame]) -> DataFrame:
"""
Validates that all readings per sensor are above sensor-specific threshold.
If any are not, flags all readings of that sensor as failed.
The closure function must take as arguments:
* df: DataFrame
* (Optional) spark: SparkSession
* (Optional) ref_dfs: dict[str, DataFrame]
"""
sensor_specs_df = ref_dfs["sensor_specs"] # Should contain: sensor_id, min_threshold
# you can also read from a Table directly:
# sensor_specs_df = spark.table("catalog.schema.sensor_specs")
# Join sensor readings with specs
sensor_df = df.join(sensor_specs_df, on="sensor_id", how="left")
sensor_df = sensor_df.withColumn("effective_threshold", F.coalesce(F.col("min_threshold"), F.lit(default_limit)))
# Check if any sensor reading is below the spec-defined min_threshold per sensor
aggr_df = (
sensor_df
.groupBy("sensor_id")
.agg(
(F.max(F.when(F.col("reading_value") > F.col("effective_threshold"), 1).otherwise(0)) == 1)
.alias(condition_col)
)
)
# Join back to input DataFrame
return df.join(aggr_df, on="sensor_id", how="left")
return (
make_condition(
condition=F.col(condition_col), # check if condition column evaluates to True
message=f"one of the sensor reading is greater than limit",
alias="sensor_reading_check",
),
apply
)
Implementation using `spark.sql`:
import uuid
from databricks.labs.dqx.rule import register_rule
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as F
from collections.abc import Callable
from databricks.labs.dqx.check_funcs import make_condition
@register_rule("dataset") # must be registered as dataset-level check
def sensor_reading_less_than(default_limit: int) -> tuple[Column, Callable]:
# make sure any column added to the dataframe and registered temp views are unique
# in case the check / function is applied multiple times
unique_str = uuid.uuid4().hex
condition_col = "condition_" + unique_str
def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) -> DataFrame:
# Register the main and reference DataFrames as temporary views
sensor_view_unique = "sensor_" + unique_str
df.createOrReplaceTempView(sensor_view_unique)
sensor_specs_view_unique = "sensor_specs_" + unique_str
ref_dfs["sensor_specs"].createOrReplaceTempView(sensor_specs_view_unique)
# Perform the check
query = f"""
WITH joined AS (
SELECT
sensor.*,
COALESCE(specs.min_threshold, {default_limit}) AS effective_threshold
FROM {sensor_view_unique} sensor
-- we could also access Table directly: catalog.schema.sensor_specs
LEFT JOIN {sensor_specs_view_unique} specs
ON sensor.sensor_id = specs.sensor_id
),
aggr AS (
SELECT
sensor_id,
MAX(CASE WHEN reading_value > effective_threshold THEN 1 ELSE 0 END) = 1 AS {condition_col}
FROM joined
GROUP BY sensor_id
)
-- join back to the input DataFrame to retain original records
SELECT
sensor.*,
aggr.{condition_col}
FROM {sensor_view_unique} sensor
LEFT JOIN aggr
ON sensor.sensor_id = aggr.sensor_id
"""
return spark.sql(query)
return (
make_condition(
condition=F.col(condition_col), # check if condition column is True
message=f"one of the sensor reading is greater than limit",
alias="sensor_reading_check",
),
apply
)
Execution of the custom python dataset check programmatically using DQX classes
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQDatasetRule
checks = [
DQDatasetRule(criticality="error", check_func=sensor_reading_less_than, name="sensor_reading_exceeded",
check_func_kwargs={
"default_limit": 100
}
),
# any other checks ...
]
dq_engine = DQEngine(WorkspaceClient())
# Pass reference DataFrame with sensor specifications
ref_dfs = {"sensor_specs": sensor_specs_df}
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantine_df = dq_engine.apply_checks_and_split(sensor_df, checks, ref_dfs=ref_dfs)
# Option 2: apply quality rules on the dataframe and report issues as additional columns
valid_and_quarantine_df = dq_engine.apply_checks(sensor_df, checks, ref_dfs=ref_dfs)
Execution of custom python check using declarative YAML definition
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = yaml.safe_load("""
- criticality: error
check:
function: sensor_reading_less_than
arguments:
default_limit: 100
# any other checks ...
""")
dq_engine = DQEngine(WorkspaceClient())
custom_check_functions = {"sensor_reading_less_than": sensor_reading_less_than} # list of custom check functions
# or include all functions with globals() for simplicity
#custom_check_functions=globals()
# Pass reference DataFrame with sensor specifications
ref_dfs = {"sensor_specs": sensor_specs_df}
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(sensor_df, checks, custom_check_functions, ref_dfs=ref_dfs)
display(valid_and_quarantine_df)
Applying checks on Multiple Data Sets
Using Row-level checks
Row-level checks are intended to operate on a single DataFrame as input. If your validation logic requires multiple DataFrames or Tables as input, you can join them before applying the row-level checks. However, this may not be efficient for large datasets or complex validation logic.
Examples
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
sensor_df.createOrReplaceTempView("sensor")
sensor_specs_df.createOrReplaceTempView("sensor_specs")
# Combine sensor readings with sensor specifications
input_df = spark.sql(f"""
SELECT
sensor.*,
COALESCE(min_threshold, 100) AS effective_threshold
FROM sensor
LEFT JOIN sensor_specs
ON sensor.sensor_id = sensor_specs.sensor_id
""")
# Define and apply row-level check
checks = yaml.safe_load("""
- criticality: error
name: sensor_reading_exceeded
check:
function: sql_expression
arguments:
expression: MAX(reading_value) OVER (PARTITION BY sensor_id) > 100
msg: one of the sensor reading is greater than 100
negate: true
""")
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantine_df = dq_engine.apply_checks_by_metadata(input_df, checks)
display(valid_and_quarantine_df)
Using Dataset-level checks
If you need to validate values across multiple DataFrames/Tables as part of the check without combining them beforehand, you can define custom dataset-level checks.
See Creating Custom Dataset-level Checks for details on how to create custom dataset-level checks.
Applying filters on checks
You can apply checks to a part of the DataFrame by using a `filter`.
For example, to check that column `col1` is not null only when column `col2` is positive, you can define the check as follows:
Using YAML:
# row-level check
- criticality: error
filter: col2 > 0
check:
function: is_not_null
arguments:
column: col1
# dataset-level check
- criticality: error
filter: col2 > 0
check:
function: is_unique
arguments:
columns:
- col1
Using DQX classes:
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.check_funcs import is_not_null, is_unique
checks = [
DQRowRule(criticality="error", check_func=is_not_null, column="col1", filter="col2 > 0"),
DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1"], filter="col2 > 0")
]
When using dataset-level checks, the filter condition is applied before aggregation, ensuring that the check operates only on the relevant subset of rows rather than on the aggregated results.
Detecting Personally Identifiable Information (PII) using a Python function
You can write a custom check to detect Personally Identifiable Information (PII) in the input data. To use PII-detection libraries (e.g. Presidio), install any required dependencies, define a Spark UDF to detect PII, and use `make_condition` to call the UDF as a DQX check. See the DQX for PII Detection notebook for a full example.
import json
import pandas as pd
from pyspark.sql.functions import concat_ws, col, lit, pandas_udf
from pyspark.sql import Column
from databricks.labs.dqx.check_funcs import make_condition
from presidio_analyzer import AnalyzerEngine
# Create the Presidio analyzer:
analyzer = AnalyzerEngine()
# Create a wrapper function to generate the entity mapping results:
def get_entity_mapping(data: str) -> str | None:
# Run the Presidio analyzer to detect PII in the string:
results = analyzer.analyze(
text=data,
entities=["PERSON", "EMAIL_ADDRESS"],
language='en',
score_threshold=0.5,
)
# Validate and return the results:
results = [
result.to_dict()
for result in results
if result.score >= 0.5
]
if results != []:
return json.dumps(results)
return None
# Register a pandas UDF to run the analyzer:
@pandas_udf('string')
def contains_pii(batch: pd.Series) -> pd.Series:
# Apply `get_entity_mapping` to each value:
return batch.map(get_entity_mapping)
def does_not_contain_pii(column: str) -> Column:
# Define a PII detection expression calling the pandas UDF:
pii_info = contains_pii(col(column))
# Return the DQX condition that uses the PII detection expression:
return make_condition(
pii_info.isNotNull(),
concat_ws(
' ',
lit(column),
lit('contains pii with the following info:'),
pii_info
),
f'{column}_contains_pii'
)
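The custom does_not_contain_pii function can then be used like any other custom row-level check. A minimal sketch (assuming an existing input_df DataFrame with a description column to scan):
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule
from databricks.sdk import WorkspaceClient
checks = [
    # flag rows containing PII instead of quarantining them
    DQRowRule(criticality="warn", check_func=does_not_contain_pii, column="description"),
]
dq_engine = DQEngine(WorkspaceClient())
valid_and_quarantine_df = dq_engine.apply_checks(input_df, checks)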