# Reference

## Quality rules
This page provides a reference for the quality rule functions (checks) available in DQX.
### Quality rule functions (checks)
The following quality rules / functions are currently available:
Check | Description | Arguments |
---|---|---|
is_not_null | Check if input column is not null | col_name: column name to check |
is_not_empty | Check if input column is not empty | col_name: column name to check |
is_not_null_and_not_empty | Check if input column is not null or empty | col_name: column name to check; trim_strings: boolean flag to trim spaces from strings |
value_is_in_list | Check if the value in the input column is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
value_is_not_null_and_is_in_list | Check if the value in the input column is not null and is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
is_not_null_and_not_empty_array | Check if input array column is not null or empty | col_name: column name to check |
is_in_range | Check if input column is in the provided range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit; max_limit: max limit |
is_not_in_range | Check if input column is not within defined range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value |
not_less_than | Check if input column is not less than the provided limit | col_name: column name to check; limit: limit value |
not_greater_than | Check if input column is not greater than the provided limit | col_name: column name to check; limit: limit value |
is_valid_date | Check if input column is a valid date | col_name: column name to check; date_format: date format (e.g. 'yyyy-mm-dd') |
is_valid_timestamp | Check if input column is a valid timestamp | col_name: column name to check; timestamp_format: timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') |
not_in_future | Check if input column defined as date is not in the future (future defined as current_timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
not_in_near_future | Check if input column defined as date is not in the near future (near future defined as greater than current timestamp but less than current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided current_timestamp() function is used |
is_older_than_n_days | Check if input column is older than n number of days | col_name: column name to check; days: number of days; curr_date: current date, if not provided current_date() function is used |
is_older_than_col2_for_n_days | Check if one column is not older than another column by n number of days | col_name1: first column name to check; col_name2: second column name to check; days: number of days |
regex_match | Check if input column matches a given regex | col_name: column name to check; regex: regex to check; negate: if the condition should be negated (true) or not |
sql_expression | Check if input column matches the provided SQL expression, e.g. a = 'str1', a > b | expression: SQL expression to check; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated |
You can check implementation details of the rules here.
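For example, several of these functions can be combined in a single checks definition. The following YAML is a minimal sketch; the column names, limits, and allowed values are illustrative:
```yaml
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_name: order_id
- criticality: warn
  check:
    function: is_in_range
    arguments:
      col_name: quantity
      min_limit: 1
      max_limit: 100
- criticality: warn
  check:
    function: value_is_in_list
    arguments:
      col_name: status
      allowed:
        - NEW
        - SHIPPED
        - CANCELLED
```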
### Apply filters on checks
You can apply checks to a part of the DataFrame by using a `filter`. For example, to ensure that a column `a` is not null only when a column `b` is positive, you can define the check as follows:
```yaml
- criticality: error
  filter: b > 0
  check:
    function: is_not_null
    arguments:
      col_name: a
```
### Creating your own checks

#### Use sql expression
If a check that you need does not exist in DQX, you can define it using the SQL expression rule (`sql_expression`), for example:
```yaml
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col1 LIKE '%foo'
      msg: col1 ends with 'foo'
```
The SQL expression check is also useful if you want to perform cross-column validation, for example:
```yaml
- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: a > b
      msg: a is greater than b
```
#### Define custom check functions
If you need a reusable check or need to implement more complicated logic, you can define your own check functions. A check is a function available from 'globals' that returns a `pyspark.sql.Column`, for example:
```python
import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition


def ends_with_foo(col_name: str) -> Column:
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")
```
and use the function as a check:
```python
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

checks = yaml.safe_load("""
- criticality: error
  check:
    function: ends_with_foo
    arguments:
      col_name: col1
""")

dq_engine = DQEngine(WorkspaceClient())

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, globals())

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())
```
You can see all existing DQX checks here.
Feel free to submit a PR to DQX with your own check so that others can benefit from it (see the contribution guide).
## DQ engine methods

Performing data quality checks using DQX requires creating a `DQEngine` object.
The following table outlines the available methods and their functionalities:
Method | Description | Arguments |
---|---|---|
apply_checks | Applies quality checks to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check; checks: List of checks to apply to the DataFrame. Each check is an instance of DQRule class. |
apply_checks_and_split | Applies quality checks to the DataFrame and returns valid and invalid (quarantine) DataFrames with reporting columns. | df: DataFrame to check; checks: List of checks to apply to the DataFrame. Each check is an instance of DQRule class. |
apply_checks_by_metadata | Applies quality checks defined as a dictionary to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check; checks: List of dictionaries describing checks; glbs: Optional dictionary with functions mapping (e.g., globals() of the calling module). |
apply_checks_by_metadata_and_split | Applies quality checks defined as a dictionary and returns valid and invalid (quarantine) DataFrames. | df: DataFrame to check; checks: List of dictionaries describing checks; glbs: Optional dictionary with functions mapping (e.g., globals() of the calling module). |
validate_checks | Validates the provided quality checks to ensure they conform to the expected structure and types. | checks: List of checks to validate; glbs: Optional dictionary of global functions that can be used. |
get_invalid | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | df: Input DataFrame. |
get_valid | Retrieves records from the DataFrame that pass all data quality checks. | df: Input DataFrame. |
load_checks_from_local_file | Loads quality rules from a local file (supports YAML and JSON). | path: Path to a file containing the checks. |
save_checks_in_local_file | Saves quality rules to a local file in YAML format. | checks: List of checks to save; path: Path to a file containing the checks. |
load_checks_from_workspace_file | Loads checks from a file (JSON or YAML) stored in the Databricks workspace. | workspace_path: Path to the file in the workspace. |
load_checks_from_installation | Loads checks from the workspace installation configuration file (checks_file field). | run_config_name: Name of the run config to use; product_name: Name of the product/installation directory; assume_user: If True, assume user installation. |
save_checks_in_workspace_file | Saves checks to a file (YAML) in the Databricks workspace. | checks: List of checks to save; workspace_path: Destination path for the checks file in the workspace. |
save_checks_in_installation | Saves checks to the installation folder as a YAML file. | checks: List of checks to save; run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
load_run_config | Loads run configuration from the installation folder. | run_config_name: Name of the run config to use; assume_user: If True, assume user installation. |
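To illustrate how a few of these methods fit together, here is a minimal sketch that loads checks from a local YAML file and applies them to a DataFrame; the file path and the `input_df` DataFrame are assumptions made for the example:
```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

dq_engine = DQEngine(WorkspaceClient())

# Load quality rules from a local YAML file (the path is illustrative)
checks = dq_engine.load_checks_from_local_file("checks.yml")

# Apply the checks and report issues as additional columns on input_df (assumed to exist)
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)

# Split the result into passing rows and rows that raised warnings or errors
valid_df = dq_engine.get_valid(checked_df)
quarantined_df = dq_engine.get_invalid(checked_df)
```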
## Testing Applications Using DQX

### Standard testing with DQEngine
Testing applications that use DQEngine requires proper initialization of the Databricks workspace client. Detailed guidance on authentication for the workspace client is available here.
For testing, we recommend:
- `pytester` fixtures to set up a Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace, you need to use the `debug_env_name` fixture. We recommend using the `~/.databricks/debug-env.json` file to store different sets of environment variables (see more details below).
- `chispa` for asserting Spark DataFrames.
These libraries are also used internally for testing DQX.
Example test:
```python
import pytest
from chispa.dataframe_comparer import assert_df_equality
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule


@pytest.fixture
def debug_env_name():
    return "ws"  # Specify the name of the target environment from ~/.databricks/debug-env.json


def test_dq(ws, spark):  # use the ws and spark pytester fixtures to initialize the workspace client and Spark session
    schema = "a: int, b: int, c: int"
    expected_schema = schema + ", _errors: map<string,string>, _warnings: map<string,string>"
    test_df = spark.createDataFrame([[1, 3, 3]], schema)

    checks = [
        DQRule(name="col_a_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("a")),
        DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
    ]

    dq_engine = DQEngine(ws)
    df = dq_engine.apply_checks(test_df, checks)

    expected_df = spark.createDataFrame([[1, 3, 3, None, None]], expected_schema)
    assert_df_equality(df, expected_df)
```
#### Setting up Databricks workspace client authentication in a terminal
If you want to run the tests from your local machine in the terminal, you need to set up the following environment variables:
```shell
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>

# Authenticate to Databricks using OAuth generated for a service principal (recommended)
export DATABRICKS_CLIENT_ID=<oauth-client-id>
export DATABRICKS_CLIENT_SECRET=<oauth-client-secret>

# Optionally enable serverless compute to be used for the tests
export DATABRICKS_SERVERLESS_COMPUTE_ID=auto
```
We recommend using an OAuth access token generated for a service principal to authenticate with Databricks, as presented above. Alternatively, you can authenticate using a PAT token by setting the `DATABRICKS_TOKEN` environment variable. However, we do not recommend this method, as it is less secure than OAuth.
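For completeness, a PAT-based setup in the terminal would look roughly like this (the placeholder values are illustrative):
```shell
export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>

# Authenticate using a personal access token (less secure than OAuth)
export DATABRICKS_TOKEN=<personal-access-token>
```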
#### Setting up Databricks workspace client authentication in an IDE

If you want to run the tests from your IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file (see instructions). The name of the debug environment that you must define is `ws` (see the `debug_env_name` fixture in the example above).
##### Minimal Configuration

Create the `~/.databricks/debug-env.json` file with the following content, replacing the placeholders:
```json
{
  "ws": {
    "DATABRICKS_CLIENT_ID": "<oauth-client-id>",
    "DATABRICKS_CLIENT_SECRET": "<oauth-client-secret>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>"
  }
}
```
You must provide an existing cluster, which will be auto-started for you as part of the tests.
We recommend using an OAuth access token generated for a service principal to authenticate with Databricks, as presented above. Alternatively, you can authenticate using a PAT token by providing the `DATABRICKS_TOKEN` field. However, we do not recommend this method, as it is less secure than OAuth.
##### Running Tests on Serverless Compute

To run the integration tests on serverless compute, add the `DATABRICKS_SERVERLESS_COMPUTE_ID` field to your debug configuration:
```json
{
  "ws": {
    "DATABRICKS_CLIENT_ID": "<oauth-client-id>",
    "DATABRICKS_CLIENT_SECRET": "<oauth-client-secret>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>",
    "DATABRICKS_SERVERLESS_COMPUTE_ID": "auto"
  }
}
```
When `DATABRICKS_SERVERLESS_COMPUTE_ID` is set, `DATABRICKS_CLUSTER_ID` is ignored and the tests run on serverless compute.
### Local testing with DQEngine

If workspace-level access is unavailable in your testing environment, you can perform local testing by installing the latest `pyspark` package and mocking the workspace client.
Note: This approach should be treated as experimental! It does not offer the same level of testing as the standard approach and it is only applicable to selected methods. We strongly recommend following the standard testing procedure outlined above, which includes proper initialization of the workspace client.
Example test:
```python
from unittest.mock import MagicMock
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule


def test_dq():
    spark = SparkSession.builder.master("local[*]").getOrCreate()  # create a local Spark session
    ws = MagicMock(spec=WorkspaceClient, **{"catalogs.list.return_value": []})  # mock the workspace client

    schema = "a: int, b: int, c: int"
    expected_schema = schema + ", _errors: map<string,string>, _warnings: map<string,string>"
    test_df = spark.createDataFrame([[1, None, 3]], schema)

    checks = [
        DQRule(name="col_a_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("a")),
        DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
    ]

    dq_engine = DQEngine(ws)
    df = dq_engine.apply_checks(test_df, checks)

    expected_df = spark.createDataFrame(
        [[1, None, 3, {"col_b_is_null_or_empty": "Column b is null or empty"}, None]], expected_schema
    )
    assert_df_equality(df, expected_df)
```