User guide
Data Profiling and Quality Rules Generation
Data profiling can be run to profile the input data and generate quality rule candidates with summary statistics. The generated rules/checks are input for the quality checking (see Adding quality checks to the application). In addition, the DLT generator can be used to generate native Delta Live Tables (DLT) expectations.
Data profiling is typically performed as a one-time action for a table to discover the initial set of quality rule candidates. This is not intended to be a continuously repeated or scheduled process, thereby minimizing concerns regarding compute intensity and associated costs.
In Python
Profiling and generating DQX rules/checks:
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
input_df = spark.read.table("catalog1.schema1.table1")
# profile input data
ws = WorkspaceClient()
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)
# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles) # with default level "error"
dq_engine = DQEngine(ws)
# save checks in arbitrary workspace location
dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")
# save checks in the installation folder specified in the default run config (only works if DQX is installed in the workspace)
dq_engine.save_checks_in_installation(checks, run_config_name="default")
# generate DLT expectations
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)
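If you want to keep the profiling summary statistics for later review, you can dump them to a file yourself. This is a minimal sketch, assuming summary_stats contains only plain Python types; the local path is illustrative:
import yaml
# persist the profiling summary statistics locally (illustrative path; assumes plain Python types)
with open("profile_summary_stats.yml", "w") as f:
    yaml.safe_dump(summary_stats, f)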
Using CLI
You can optionally install DQX in the workspace; see the Installation Guide. As part of the installation, a config, dashboards, and a profiler workflow are installed. The workflow can be run manually in the workspace UI or using the CLI as shown below.
DQX currently operates exclusively at the PySpark dataframe level and does not interact directly with databases or storage systems. DQX does not persist data after performing quality checks, so users must handle data storage themselves. Since DQX does not manage the input location, output table, or quarantine table, it is the user's responsibility to store or persist the processed data as needed.
Open the config to check available run configs and adjust the settings if needed:
databricks labs dqx open-remote-config
See example config below:
log_level: INFO
version: 1
run_configs:
- name: default # <- unique name of the run config (default used during installation)
input_location: s3://iot-ingest/raw # <- Input location for profiling (UC table or cloud path)
input_format: delta # <- format, required if cloud path provided
output_table: main.iot.silver # <- output UC table used in quality dashboard
quarantine_table: main.iot.quarantine # <- quarantine UC table used in quality dashboard
checks_file: iot_checks.yml # <- location of the quality rules (checks)
profile_summary_stats_file: iot_profile_summary_stats.yml # <- location of profiling summary stats
warehouse_id: your-warehouse-id # <- warehouse id for refreshing dashboards
- name: another_run_config # <- unique name of the run config
...
Run profiler workflow:
databricks labs dqx profile --run-config "default"
You will find the generated quality rule candidates and summary statistics in the installation folder, as defined in the run config. If no run config is provided, the "default" run config is used. The run config selects a specific run configuration from 'config.yml'.
The following DQX configuration options from 'config.yml' are used by the profiler workflow:
- 'input_location': input data as a path or a table.
- 'input_format': input data format. Required if the input data is a path.
- 'checks_file': relative location of the generated quality rule candidates inside the installation folder (default: checks.yml).
- 'profile_summary_stats_file': relative location of the summary statistics inside the installation folder (default: profile_summary.yml).
Logs are printed to the console and saved in the installation folder. You can display the logs from the latest profiler workflow run by executing:
databricks labs dqx logs --workflow profiler
Validating quality rules (checks)
If you manually adjust the generated rules or create your own checks, you can validate them before applying them.
In Python
from databricks.labs.dqx.engine import DQEngine
status = DQEngine.validate_checks(checks)
print(status)
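You can also fail fast on invalid checks before applying them. This is a minimal sketch, assuming the returned status object exposes a has_errors flag:
# stop early if any checks are invalid (has_errors is assumed to be available on the status object)
assert not status.has_errors, f"Invalid checks: {status}"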
Note that checks are validated automatically when applied as part of the apply_checks_by_metadata_and_split and apply_checks_by_metadata methods (see Quality rules defined as config).
Using CLI
Validate checks stored in the installation folder:
databricks labs dqx validate-checks --run-config "default"
The following DQX configuration from 'config.yml' will be used by default:
- 'checks_file': relative location of the quality rules inside the installation folder (default: checks.yml).
Adding quality checks to the application
Quality rules defined as config
Quality rules can be stored in a yaml or json file. Below is an example yaml file defining checks ('checks.yml'):
- criticality: error
check:
function: is_not_null
arguments:
col_names:
- col1
- col2
- name: col_col3_is_null_or_empty
criticality: error
check:
function: is_not_null_and_not_empty
arguments:
col_name: col3
- criticality: warn
check:
function: value_is_in_list
arguments:
col_name: col4
allowed:
- 1
- 2
Fields:
- criticality: either "error" (data going only into the "bad"/quarantine dataframe) or "warn" (data going into both dataframes).
- check: column expression containing "function" (check function to apply), "arguments" (check function arguments), and "col_name" (column name as str the check will be applied for) or "col_names" (column names as array the check will be applied for).
- (optional) name for the check: autogenerated if not provided.
Loading and execution methods
Checks can be loaded from a file in the installation folder, workspace, or local file system. If the checks file contains invalid json or yaml syntax, the engine will raise an error.
The checks can be applied using the apply_checks_by_metadata_and_split or apply_checks_by_metadata methods. The checks are validated automatically as part of these methods.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use apply_checks_by_metadata_and_split.
If you want to report issues as additional columns, use apply_checks_by_metadata.
Method 1: Loading checks from a workspace file in the installation folder
If DQX is installed in the workspace, you can load checks based on the run configuration:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
# load check file specified in the run configuration
checks = dq_engine.load_checks_from_installation(assume_user=True, run_config_name="default")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Method 2: Loading checks from a workspace file
The checks can also be loaded from any file in the Databricks workspace:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks_from_workspace_file(workspace_path="/Shared/App1/checks.yml")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
Checks are validated automatically as part of the apply_checks_by_metadata_and_split and apply_checks_by_metadata methods.
Method 3: Loading checks from a local file
Checks can also be loaded from a file in the local file system:
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = DQEngine.load_checks_from_local_file("checks.yml")
dq_engine = DQEngine(WorkspaceClient())
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
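Since DQX does not persist the results for you, write the split dataframes to storage yourself. This is a minimal sketch using the output and quarantine table names from the example run config; adjust them to your own tables:
# persist the results, e.g. to the output and quarantine tables defined in your run config (illustrative names)
valid_df.write.mode("overwrite").saveAsTable("main.iot.silver")
quarantined_df.write.mode("overwrite").saveAsTable("main.iot.quarantine")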
Quality rules defined as code
Checks can be defined in code and applied using the apply_checks_and_split or apply_checks methods.
If you want to split the checked data into valid and invalid (quarantined) dataframes, use apply_checks_and_split.
If you want to report issues as additional columns, use apply_checks.
Method 1: Using DQX classes
from databricks.labs.dqx.col_functions import is_not_null, is_not_null_and_not_empty, value_is_in_list
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRuleColSet, DQRule
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = DQRuleColSet( # define rule for multiple columns at once
columns=["col1", "col2"],
criticality="error",
check_func=is_not_null).get_rules() + [
DQRule( # define rule for a single column
name="col3_is_null_or_empty",
criticality="error",
check=is_not_null_and_not_empty("col3")),
DQRule( # define rule with a filter
name="col_4_is_null_or_empty",
criticality="error",
filter="col1 < 3",
check=is_not_null_and_not_empty("col4")),
DQRule( # name auto-generated if not provided
criticality="warn",
check=value_is_in_list("col4", ["1", "2"]))
]
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks(input_df, checks)
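The single annotated dataframe produced by apply_checks can also be split afterwards using the engine's get_valid and get_invalid helpers (also used in the DLT integration below):
# split the annotated dataframe afterwards
good_df = dq_engine.get_valid(valid_and_quarantined_df)  # rows without errors or warnings, reporting columns dropped
bad_df = dq_engine.get_invalid(valid_and_quarantined_df)  # only rows with errors or warnings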
See details of the check functions here.
Method 2: Using yaml config
import yaml
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = yaml.safe_load("""
- criticality: error
check:
function: is_not_null
arguments:
col_names:
- col1
- col2
- criticality: error
check:
function: is_not_null_and_not_empty
arguments:
col_name: col3
- criticality: error
filter: col1 < 3
check:
function: is_not_null_and_not_empty
arguments:
col_name: col4
- criticality: warn
check:
function: value_is_in_list
arguments:
col_name: col4
allowed:
- 1
- 2
""")
input_df = spark.read.table("catalog1.schema1.table1")
# Option 1: apply quality rules on the dataframe and provide valid and invalid (quarantined) dataframes
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks)
# Option 2: apply quality rules on the dataframe and report issues as additional columns (`_warning` and `_error`)
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)
See details of the check functions here.
Integration with DLT (Delta Live Tables)
DLT provides expectations to enforce data quality constraints. However, expectations don't offer detailed insights into why certain checks fail. The example below demonstrates how to integrate DQX with DLT to provide comprehensive quality information. The DQX integration with DLT does not use DLT expectations but DQX's own methods.
Option 1: Apply quality rules and quarantine bad records
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = ... # quality rules / checks
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
# get rows without errors or warnings, and drop auxiliary columns
return dq_engine.get_valid(df)
@dlt.table
def quarantine():
df = dlt.read_stream("bronze_dq_check")
# get only rows with errors or warnings
return dq_engine.get_invalid(df)
Option 2: Apply quality rules and report issues as additional columns
import dlt
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
checks = ... # quality rules / checks
dq_engine = DQEngine(WorkspaceClient())
@dlt.view
def bronze_dq_check():
df = dlt.read_stream("bronze")
return dq_engine.apply_checks_by_metadata(df, checks)
@dlt.table
def silver():
df = dlt.read_stream("bronze_dq_check")
return df
Data Quality Dashboards
Data quality dashboards are automatically installed in the dashboards folder of the workspace installation directory when you install DQX in a Databricks workspace. For more details on the installation process, see the Installation Guide.
The dashboards let you monitor and track data quality issues easily. You can customize them to align with your specific requirements.
By default, dashboards are not scheduled to refresh automatically, thereby minimizing concerns regarding associated cluster costs. When you open a dashboard, you need to refresh it manually to view the latest data. However, you can configure the dashboard to refresh periodically as needed.
To navigate to the dashboards directory in the workspace UI, use the following command:
databricks labs dqx open-dashboards
After executing the command:
- Locate and click on a dashboard file in the workspace UI.
- Open the dashboard and click Refresh to load the latest data.
Note: the dashboards only use the quarantined data as input, as defined during the installation process. If you change the quarantine table in the run config after deployment (the quarantine_table field), you need to update the dashboard queries accordingly.
Quality Rules and Creation of Custom Checks
Discover the full list of available data quality rules and learn how to define your own custom checks in our Reference section.
Details on DQX Engine and Workspace Client
To perform data quality checking with DQX, you need to create a DQEngine object.
The engine requires a Databricks workspace client for authentication and interaction with the Databricks workspace.
When running the code on a Databricks workspace, the workspace client is automatically authenticated, whether DQX is used in a notebook, script, or as part of a job/workflow. You only need the following code to create the workspace client if you run DQX in a Databricks workspace:
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
ws = WorkspaceClient()
dq_engine = DQEngine(ws)
For external environments, such as CI servers or local machines, you can authenticate to Databricks using any method supported by the Databricks SDK. For detailed instructions, refer to the default authentication flow. If you're using Databricks configuration profiles or Databricks-specific environment variables for authentication, you can easily create the workspace client without needing to provide additional arguments:
ws = WorkspaceClient()
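For example, you can also point the client at a specific Databricks configuration profile; the profile name below is illustrative:
# authenticate using a named Databricks configuration profile (profile name is illustrative)
ws = WorkspaceClient(profile="my-profile")
dq_engine = DQEngine(ws)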
For details on the specific methods available in the engine, refer to the reference section.
Information on testing applications that use DQEngine can be found here.
Additional Configuration
Customizing Reporting Error and Warning Columns
By default, DQX appends _error and _warning reporting columns to the output DataFrame to flag quality issues.
You can customize the names of these reporting columns by specifying additional configurations in the engine.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import (
DQEngine,
ExtraParams,
)
# customize reporting column names
extra_parameters = ExtraParams(column_names={"errors": "dq_errors", "warnings": "dq_warnings"})
ws = WorkspaceClient()
dq_engine = DQEngine(ws, extra_params=extra_parameters)
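With this configuration, the reporting columns produced by the apply methods are named dq_errors and dq_warnings instead of the defaults. For example, assuming input_df and checks are defined as in the earlier examples:
# issues are now reported in the "dq_errors" and "dq_warnings" columns
valid_and_quarantined_df = dq_engine.apply_checks_by_metadata(input_df, checks)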