User guide
Data Profiling and Quality Rules Generation
Data profiling profiles the input data and generates quality rule candidates along with summary statistics. The generated rules/checks serve as input for quality checking (see Adding quality checks to the application). In addition, the DLT generator can be used to generate native Delta Live Tables (DLT) expectations.
In Python
Profiling and generating DQX rules/checks:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.sdk import WorkspaceClient
df = spark.read.table("catalog1.schema1.table1")
ws = WorkspaceClient()
profiler = DQProfiler(ws)
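# profile the input data and compute summary statistics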
summary_stats, profiles = profiler.profile(df)
# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)
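The generated checks and summary statistics are plain Python structures, so they can be persisted for later review and reuse. A minimal sketch, assuming both are YAML-serializable; the output file names are illustrative:
import yaml

# save the generated quality rule candidates for review (file name is illustrative)
with open("checks.yml", "w") as f:
    yaml.safe_dump(checks, f)

# save the summary statistics alongside the checks (file name is illustrative)
with open("profile_summary.yml", "w") as f:
    yaml.safe_dump(summary_stats, f)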
Using CLI
You must install DQX in the workspace beforehand (see installation). The profiler workflow is installed as part of the installation. It can be run manually in the workspace UI or using the CLI as shown below.
Run profiler workflow:
databricks labs dqx profile --run-config "default"
You will find the generated quality rule candidates and summary statistics in the installation folder, as defined in the run config. If no run config is provided, the "default" run config is used. The run config selects a specific run configuration from the 'config.yml'.
The following DQX configuration options from 'config.yml' are used:
- 'input_location': input data as a path or a table.
- 'input_format': input data format. Required if input data is a path.
- 'checks_file': relative location of the generated quality rule candidates (default: checks.yml).
- 'profile_summary_stats_file': relative location of the summary statistics (default: profile_summary.yml).
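For illustration only, a run configuration in 'config.yml' might look like the sketch below; the exact layout depends on your installation, and all values are placeholders:
# illustrative sketch of a run configuration ('config.yml'); layout and values are placeholders
run_configs:
  - name: default
    input_location: catalog1.schema1.table1
    input_format: delta
    checks_file: checks.yml
    profile_summary_stats_file: profile_summary.yml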
Logs are printed to the console and saved in the installation folder. To show the saved logs from the latest profiler workflow run, visit the Databricks workspace UI or execute the following command:
databricks labs dqx logs --workflow profiler
Validating quality rules (checks)
If you manually adjust the generated rules or create your own checks configuration, you can validate them before use:
In Python
from databricks.labs.dqx.engine import DQEngine
status = DQEngine.validate_checks(checks)
print(status)
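For example, checks defined inline as YAML can be validated the same way before being applied (a minimal sketch using the same validate_checks API):
import yaml
from databricks.labs.dqx.engine import DQEngine

# load a checks definition from YAML and validate it before applying
checks = yaml.safe_load("""
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_names:
        - col1
""")

status = DQEngine.validate_checks(checks)
print(status)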
The checks are validated automatically when applied as part of the apply_checks_by_metadata_and_split and apply_checks_by_metadata methods (see Quality rules defined as config).
Using CLI
Validate checks stored in the installation folder:
databricks labs dqx validate-checks --run-config "default"
The following DQX configuration from 'config.yml' will be used by default:
- 'checks_file': relative location of the quality rules file (default: checks.yml).
Adding quality checks to the application
Quality rules defined as config
Quality rules can be stored in a YAML or JSON file. Below is an example YAML file defining checks ('checks.yml'):
- criticality: error
  check:
    function: is_not_null
    arguments:
      col_names:
        - col1
        - col2

- name: col_col3_is_null_or_empty
  criticality: error
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: col3

- criticality: warn
  check:
    function: value_is_in_list
    arguments:
      col_name: col4
      allowed:
        - 1
        - 2
Fields:
- criticality: either "error" (data goes only into the "bad/quarantine" dataframe) or "warn" (data goes into both dataframes).
- check: column expression containing "function" (check function to apply), "arguments" (check function arguments), and "col_name" (column name as a string to apply the check to) or "col_names" (column names as an array to apply the check to).
- name (optional): name of the check; autogenerated if not provided.
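Since checks can also be stored as JSON, the first check from the YAML example above could equivalently be written as follows (same structure, JSON syntax):
[
  {
    "criticality": "error",
    "check": {
      "function": "is_not_null",
      "arguments": {
        "col_names": ["col1", "col2"]
      }
    }
  }
]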
Loading and execution methods
Method 1: load checks from a workspace file in the installation folder
If the tool is installed in the workspace, the configuration contains the path to the checks file:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# use the checks file specified in the default run configuration in the global installation config ('config.yml')
# can optionally specify the run config and whether to use user installation
checks = dq_engine.load_checks_from_installation(assume_user=True)
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Checks are validated automatically as part of the apply_checks_by_metadata_and_split and apply_checks_by_metadata methods.
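After splitting, the two dataframes can be handled like any other Spark dataframes, for example written to separate tables. A minimal sketch; the table names are illustrative:
# write valid rows and quarantined rows to separate tables (table names are illustrative)
valid_df.write.mode("overwrite").saveAsTable("catalog1.schema1.table1_valid")
quarantined_df.write.mode("overwrite").saveAsTable("catalog1.schema1.table1_quarantine")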
Method 2: load checks from a workspace file
The checks can also be loaded from any file in the Databricks workspace:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks_from_workspace_file("/Shared/App1/checks.yml")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
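If the checks file does not exist in the workspace yet, it can be uploaded with the Databricks SDK before loading. A minimal sketch; the workspace path is illustrative, and the upload call is standard SDK usage rather than part of DQX:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import ImportFormat

ws = WorkspaceClient()

# upload a local checks definition to the workspace (path is illustrative)
with open("checks.yml", "rb") as f:
    ws.workspace.upload("/Shared/App1/checks.yml", f, format=ImportFormat.AUTO, overwrite=True)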
Method 3: load checks from a local file
The checks can also be loaded from a file in the local file system:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = DQEngine.load_checks_from_local_file("checks.yml")
dq_engine = DQEngine(WorkspaceClient())
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Quality rules defined as code
Method 1: using DQX classes
from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine, DQRuleColSet, DQRule
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = DQRuleColSet(  # define a rule for multiple columns at once
        columns=["col1", "col2"],
        criticality="error",
        check_func=is_not_null).get_rules() + [
    DQRule(  # define a rule for a single column
        name='col3_is_null_or_empty',
        criticality='error',
        check=is_not_null_and_not_empty('col3')),
    DQRule(  # name auto-generated if not provided
        criticality='warn',
        check=value_is_in_list('col4', ['1', '2']))
]
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
See details of the check functions here.
Method 2: using yaml config
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = yaml.safe_load("""
- criticality: "error"
  check:
    function: "is_not_null"
    arguments:
      col_names:
        - "col1"
        - "col2"

- criticality: "error"
  check:
    function: "is_not_null_and_not_empty"
    arguments:
      col_name: "col3"

- criticality: "warn"
  check:
    function: "value_is_in_list"
    arguments:
      col_name: "col4"
      allowed:
        - 1
        - 2
""")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
See details of the check functions here.
Integration with DLT (Delta Live Tables)
DLT provides expectations to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail. The example below demonstrates how to integrate DQX checks with DLT to provide comprehensive information on why quality checks failed. The integration does not use DLT expectations but applies the DQX checks directly.
Option 1: apply quality rules and quarantine bad records
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
@dlt.view
def bronze_dq_check():
    df = dlt.read_stream("bronze")
    return dq_engine.apply_checks_by_metadata(df, checks)

@dlt.table
def silver():
    df = dlt.read_stream("bronze_dq_check")
    # get rows without errors or warnings, and drop auxiliary columns
    return dq_engine.get_valid(df)

@dlt.table
def quarantine():
    df = dlt.read_stream("bronze_dq_check")
    # get only rows with errors or warnings
    return dq_engine.get_invalid(df)
Option 2: apply quality rules and report issues as additional columns (_warning and _error)
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())
@dlt.view
def bronze_dq_check():
    df = dlt.read_stream("bronze")
    return dq_engine.apply_checks_by_metadata(df, checks)

@dlt.table
def silver():
    df = dlt.read_stream("bronze_dq_check")
    return df
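Downstream tables can then filter on the reporting columns themselves, for example to build a dedicated quarantine table. A minimal sketch, assuming the reporting columns are named `_error` and `_warning` as described above and are null for clean rows:
import dlt
from pyspark.sql import functions as F

@dlt.table
def silver_quarantine():
    df = dlt.read_stream("silver")
    # keep only rows that have at least one reported error or warning
    return df.where(F.col("_error").isNotNull() | F.col("_warning").isNotNull())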
Explore Quality Rules and Create Custom Checks
Discover the full list of available data quality rules and learn how to define your own custom checks in our Reference section.