Quality rules

This page provides a reference for the quality rule functions (checks) available in DQX.

Quality rule functions (checks)

The following quality rules / functions are currently available:

| Check | Description | Arguments |
|-------|-------------|-----------|
| is_not_null | Check if input column is not null | col_name: column name to check |
| is_not_empty | Check if input column is not empty | col_name: column name to check |
| is_not_null_and_not_empty | Check if input column is not null or empty | col_name: column name to check; trim_strings: boolean flag to trim spaces from strings |
| value_is_in_list | Check if the value in the input column is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
| value_is_not_null_and_is_in_list | Check if the value in the input column is not null and is present in the list of allowed values | col_name: column name to check; allowed: list of allowed values |
| is_not_null_and_not_empty_array | Check if input array column is not null or empty | col_name: column name to check |
| is_in_range | Check if input column is in the provided range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value |
| is_not_in_range | Check if input column is not within the defined range (inclusive of both boundaries) | col_name: column name to check; min_limit: min limit value; max_limit: max limit value |
| not_less_than | Check if input column is not less than the provided limit | col_name: column name to check; limit: limit value |
| not_greater_than | Check if input column is not greater than the provided limit | col_name: column name to check; limit: limit value |
| is_valid_date | Check if input column is a valid date | col_name: column name to check; date_format: date format (e.g. 'yyyy-mm-dd') |
| is_valid_timestamp | Check if input column is a valid timestamp | col_name: column name to check; timestamp_format: timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') |
| not_in_future | Check if input column defined as date is not in the future (future defined as current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided the current_timestamp() function is used |
| not_in_near_future | Check if input column defined as date is not in the near future (near future defined as greater than current timestamp but less than current timestamp + offset) | col_name: column name to check; offset: offset to use; curr_timestamp: current timestamp, if not provided the current_timestamp() function is used |
| is_older_than_n_days | Check if input column is older than n number of days | col_name: column name to check; days: number of days; curr_date: current date, if not provided the current_date() function is used |
| is_older_than_col2_for_n_days | Check if one column is not older than another column by n number of days | col_name1: first column name to check; col_name2: second column name to check; days: number of days |
| regex_match | Check if input column matches a given regex | col_name: column name to check; regex: regex to check; negate: if the condition should be negated (true) or not |
| sql_expression | Check if input column matches the provided SQL expression, e.g. a = 'str1', a > b | expression: SQL expression to check; msg: optional message to output; name: optional name of the resulting column; negate: if the condition should be negated |
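
For example, several of these functions can be combined in a single checks definition using YAML metadata. The snippet below is an illustrative sketch only; the column names (col1, col2, col3) and the allowed values are placeholders:

- criticality: error
  check:
    function: is_not_null
    arguments:
      col_name: col1

- criticality: warn
  check:
    function: value_is_in_list
    arguments:
      col_name: col2
      allowed:
        - active
        - inactive

- criticality: warn
  check:
    function: is_in_range
    arguments:
      col_name: col3
      min_limit: 0
      max_limit: 100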

You can check implementation details of the rules here.

Apply filters on checks

You can apply checks to a part of the DataFrame by using a filter. For example, to check that column a is not null only for rows where column b is positive, you can define the check as follows:

- criticality: error
  filter: b > 0
  check:
    function: is_not_null
    arguments:
      col_name: a
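
As an illustrative sketch (assuming input_df is your input DataFrame), such metadata-defined checks, including the filter, can be applied with apply_checks_by_metadata; rows where b is not positive are simply not evaluated by this check:

import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

checks = yaml.safe_load("""
- criticality: error
  filter: b > 0
  check:
    function: is_not_null
    arguments:
      col_name: a
""")

dq_engine = DQEngine(WorkspaceClient())

# issues found by this check are reported in the _errors column because criticality is set to error
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)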

Creating your own checks

Use sql expression

If a check that you need does not exist in DQX, you can define it using the SQL expression rule (sql_expression), for example:

- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: col1 LIKE '%foo'
      msg: col1 ends with 'foo'

SQL expressions are also useful for cross-column validation, for example:

- criticality: error
  check:
    function: sql_expression
    arguments:
      expression: a > b
      msg: a is greater than b

Define custom check functions

If you need a reusable check or more complicated logic, you can define your own check functions. A check is a function available from 'globals' that returns a pyspark.sql.Column, for example:

import pyspark.sql.functions as F
from pyspark.sql import Column
from databricks.labs.dqx.col_functions import make_condition

def ends_with_foo(col_name: str) -> Column:
    column = F.col(col_name)
    return make_condition(column.endswith("foo"), f"Column {col_name} ends with foo", f"{col_name}_ends_with_foo")

and use the function as a check:

import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

checks = yaml.safe_load("""
- criticality: error
  check:
    function: ends_with_foo
    arguments:
      col_name: col1
""")

dq_engine = DQEngine(WorkspaceClient())

# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks, globals())

# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warnings` and `_errors`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks, globals())

You can see all existing DQX checks here.

Feel free to submit a PR to DQX with your own check so that others can benefit from it (see the contribution guide).

DQ engine methods

Performing data quality checks using DQX requires creating a DQEngine object.

The following table outlines the available methods and their functionalities:

| Method | Description | Arguments |
|--------|-------------|-----------|
| apply_checks | Applies quality checks to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check; checks: list of checks to apply to the DataFrame. Each check is an instance of the DQRule class. |
| apply_checks_and_split | Applies quality checks to the DataFrame and returns valid and invalid (quarantine) DataFrames with reporting columns. | df: DataFrame to check; checks: list of checks to apply to the DataFrame. Each check is an instance of the DQRule class. |
| apply_checks_by_metadata | Applies quality checks defined as a dictionary to the DataFrame and returns a DataFrame with reporting columns. | df: DataFrame to check; checks: list of dictionaries describing checks; glbs: optional dictionary with functions mapping (e.g. globals() of the calling module). |
| apply_checks_by_metadata_and_split | Applies quality checks defined as a dictionary and returns valid and invalid (quarantine) DataFrames. | df: DataFrame to check; checks: list of dictionaries describing checks; glbs: optional dictionary with functions mapping (e.g. globals() of the calling module). |
| validate_checks | Validates the provided quality checks to ensure they conform to the expected structure and types. | checks: list of checks to validate; glbs: optional dictionary of global functions that can be used. |
| get_invalid | Retrieves records from the DataFrame that violate data quality checks (records with warnings and errors). | df: input DataFrame. |
| get_valid | Retrieves records from the DataFrame that pass all data quality checks. | df: input DataFrame. |
| load_checks_from_local_file | Loads quality rules from a local file (supports YAML and JSON). | path: path to the file containing the checks. |
| save_checks_in_local_file | Saves quality rules to a local file in YAML format. | checks: list of checks to save; path: path of the file to save the checks to. |
| load_checks_from_workspace_file | Loads checks from a file (JSON or YAML) stored in the Databricks workspace. | workspace_path: path to the file in the workspace. |
| load_checks_from_installation | Loads checks from the workspace installation configuration file (checks_file field). | run_config_name: name of the run config to use; product_name: name of the product/installation directory; assume_user: if True, assume user installation. |
| save_checks_in_workspace_file | Saves checks to a file (YAML) in the Databricks workspace. | checks: list of checks to save; workspace_path: destination path for the checks file in the workspace. |
| save_checks_in_installation | Saves checks to the installation folder as a YAML file. | checks: list of checks to save; run_config_name: name of the run config to use; assume_user: if True, assume user installation. |
| load_run_config | Loads run configuration from the installation folder. | run_config_name: name of the run config to use; assume_user: if True, assume user installation. |
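
As an illustrative sketch of a few of these methods (the file path, the column name col1, and the input_df DataFrame are placeholders), checks defined as metadata can be validated, saved to and re-loaded from a local file, and the checked records split into passing and failing sets:

import yaml
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

dq_engine = DQEngine(WorkspaceClient())

checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_name: col1
""")

# validate the structure of the checks before using them
status = dq_engine.validate_checks(checks)

# save the checks to a local YAML file and load them back
dq_engine.save_checks_in_local_file(checks, "checks.yml")
checks = dq_engine.load_checks_from_local_file("checks.yml")

# apply the checks and retrieve the passing and failing records
checked_df = dq_engine.apply_checks_by_metadata(input_df, checks)
valid_df = dq_engine.get_valid(checked_df)
invalid_df = dq_engine.get_invalid(checked_df)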

Testing Applications Using DQX

Standard testing with DQEngine

Testing applications that use DQEngine requires proper initialization of the Databricks workspace client. Detailed guidance on authentication for the workspace client is available here.

For testing, we recommend:

  • pytester fixtures to set up a Databricks remote Spark session and workspace client. For pytester to be able to authenticate to a workspace, you need to use the debug_env_name fixture. We recommend using the ~/.databricks/debug-env.json file to store different sets of environment variables (see more details below).
  • chispa for asserting Spark DataFrames.

These libraries are also used internally for testing DQX.

Example test:

import pytest
from chispa.dataframe_comparer import assert_df_equality
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule


@pytest.fixture
def debug_env_name():
    return "ws"  # Specify the name of the target environment from ~/.databricks/debug-env.json


def test_dq(ws, spark):  # use the ws and spark pytester fixtures to initialize the workspace client and spark session
    schema = "a: int, b: int, c: int"
    expected_schema = schema + ", _errors: map<string,string>, _warnings: map<string,string>"
    test_df = spark.createDataFrame([[1, 3, 3]], schema)

    checks = [
        DQRule(name="col_a_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("a")),
        DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
    ]

    dq_engine = DQEngine(ws)
    df = dq_engine.apply_checks(test_df, checks)

    expected_df = spark.createDataFrame([[1, 3, 3, None, None]], expected_schema)
    assert_df_equality(df, expected_df)

Setting up Databricks workspace client authentication in a terminal

If you want to run the tests from your local machine in the terminal, you need to set up the following environment variables:

export DATABRICKS_HOST=https://<workspace-url>
export DATABRICKS_CLUSTER_ID=<cluster-id>

# Authenticate to Databricks using OAuth generated for a service principal (recommended)
export DATABRICKS_CLIENT_ID=<oauth-client-id>
export DATABRICKS_CLIENT_SECRET=<oauth-client-secret>

# Optionally enable serverless compute to be used for the tests
export DATABRICKS_SERVERLESS_COMPUTE_ID=auto

We recommend authenticating with Databricks using an OAuth access token generated for a service principal, as presented above. Alternatively, you can authenticate using a personal access token (PAT) by setting the DATABRICKS_TOKEN environment variable. However, we do not recommend this method, as it is less secure than OAuth.
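
For example, the PAT-based alternative would be configured as follows (the token value is a placeholder):

export DATABRICKS_TOKEN=<personal-access-token>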

Setting up Databricks workspace client authentication in an IDE

If you want to run the tests from your IDE, you must set up a .env or ~/.databricks/debug-env.json file (see instructions). The name of the debug environment that you must define is ws (see the debug_env_name fixture in the example above).

Minimal Configuration

Create the ~/.databricks/debug-env.json file with the following content, replacing the placeholders:

{
  "ws": {
    "DATABRICKS_CLIENT_ID": "<oauth-client-id>",
    "DATABRICKS_CLIENT_SECRET": "<oauth-client-secret>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>"
  }
}

You must provide an existing cluster, which will be auto-started for you as part of the tests.

We recommend authenticating with Databricks using an OAuth access token generated for a service principal, as presented above. Alternatively, you can authenticate using a personal access token (PAT) by providing the DATABRICKS_TOKEN field. However, we do not recommend this method, as it is less secure than OAuth.
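
For example, a PAT-based configuration would replace the OAuth fields with a DATABRICKS_TOKEN field (all values are placeholders):

{
  "ws": {
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_TOKEN": "<personal-access-token>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>"
  }
}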

Running Tests on Serverless Compute

To run the integration tests on serverless compute, add the DATABRICKS_SERVERLESS_COMPUTE_ID field to your debug configuration:

{
  "ws": {
    "DATABRICKS_CLIENT_ID": "<oauth-client-id>",
    "DATABRICKS_CLIENT_SECRET": "<oauth-client-secret>",
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_CLUSTER_ID": "<databricks-cluster-id>",
    "DATABRICKS_SERVERLESS_COMPUTE_ID": "auto"
  }
}

When DATABRICKS_SERVERLESS_COMPUTE_ID is set, DATABRICKS_CLUSTER_ID is ignored and the tests run on serverless compute.

Local testing with DQEngine

If workspace-level access is unavailable in your testing environment, you can perform local testing by installing the latest pyspark package and mocking the workspace client.

Note: This approach should be treated as experimental! It does not offer the same level of testing as the standard approach and it is only applicable to selected methods. We strongly recommend following the standard testing procedure outlined above, which includes proper initialization of the workspace client.

Example test:

from unittest.mock import MagicMock
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession
from chispa.dataframe_comparer import assert_df_equality
from databricks.labs.dqx.col_functions import is_not_null_and_not_empty
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRule


def test_dq():
    spark = SparkSession.builder.master("local[*]").getOrCreate()  # create spark local session
    ws = MagicMock(spec=WorkspaceClient, **{"catalogs.list.return_value": []})  # mock the workspace client

    schema = "a: int, b: int, c: int"
    expected_schema = schema + ", _errors: map<string,string>, _warnings: map<string,string>"
    test_df = spark.createDataFrame([[1, None, 3]], schema)

    checks = [
        DQRule(name="col_a_is_null_or_empty", criticality="warn", check=is_not_null_and_not_empty("a")),
        DQRule(name="col_b_is_null_or_empty", criticality="error", check=is_not_null_and_not_empty("b")),
    ]

    dq_engine = DQEngine(ws)
    df = dq_engine.apply_checks(test_df, checks)

    expected_df = spark.createDataFrame(
        [[1, None, 3, {"col_b_is_null_or_empty": "Column b is null or empty"}, None]], expected_schema
    )
    assert_df_equality(df, expected_df)