Summary Metrics
DQX provides comprehensive functionality to capture and store aggregate statistics about your data quality. This allows you to track data quality trends over time, monitor the health of your data pipelines, and gain insights into the overall quality of your datasets.
Overview
Summary metrics in DQX capture both built-in metrics (automatically calculated) and custom metrics (user-defined SQL expressions) during data quality checking. These metrics are collected using Spark's built-in Observation functionality and can be persisted to tables for historical analysis.
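For readers unfamiliar with the underlying mechanism, here is a minimal plain-PySpark sketch of the Observation API that DQX builds on (not DQX-specific; assumes an existing DataFrame df and an active Spark session):
from pyspark.sql import Observation
import pyspark.sql.functions as F

# Plain Spark Observation: metrics are computed alongside the query
# and become available only after an action runs on the observed DataFrame.
observation = Observation("example")
observed_df = df.observe(observation, F.count(F.lit(1)).alias("row_count"))
observed_df.write.format("noop").mode("overwrite").save()  # any action works
print(observation.get["row_count"])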
Built-in Metrics
DQX automatically captures the following built-in metrics for every data quality check execution:
| Metric Name | Data Type | Description |
|---|---|---|
| input_row_count | int | Total number of input rows processed |
| error_row_count | int | Number of rows that failed error-level checks |
| warning_row_count | int | Number of rows that triggered warning-level checks |
| valid_row_count | int | Number of rows that passed all checks |
Custom Metrics
Users can define custom metrics with Spark SQL expressions. These metrics will be collected in addition to DQX's built-in metrics.
Summary metrics are calculated on all records processed by DQX. Complex aggregations can degrade performance when processing large datasets. Be cautious with operations like DISTINCT on high-cardinality columns.
Example of custom data quality summary metrics:
sum(array_size(_errors)) as total_errors
avg(array_size(_errors)) as errors_avg
count(case when array_size(_errors) > 1 then 1 end) as count_multiple_errors
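Where an exact DISTINCT over a high-cardinality column is too expensive, an approximate aggregate is typically much cheaper. A sketch, assuming a hypothetical customer_id column and the standard Spark SQL approx_count_distinct function:
# Cheaper approximation of a distinct count on a high-cardinality column (hypothetical column name)
custom_metrics = [
    "approx_count_distinct(customer_id) as approx_unique_customers",
]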
See the Configuring Custom Metrics section for instructions on setting them up.
Programmatic approach
Accessing Metrics when Applying Checks
Engine methods for applying checks (e.g. apply_checks, apply_checks_by_metadata, apply_checks_and_split, apply_checks_by_metadata_and_split) can optionally return a Spark Observation alongside the output DataFrame(s).
Data quality metrics can be accessed from the Spark Observation after any action is performed on the output DataFrames.
Metrics are not directly accessible from the returned Spark Observation when data is processed with streaming. Use DQX's built-in methods to persist streaming metrics to an output table. See Writing Metrics to a Table with Streaming for more details.
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.sdk import WorkspaceClient
# Create observer
observer = DQMetricsObserver(name="dq_metrics")
# Create the engine with the optional observer
engine = DQEngine(WorkspaceClient(), observer=observer)
# Apply checks and get metrics
checked_df, observation = engine.apply_checks_by_metadata(df, checks)
# Apply checks, split and get metrics
#valid_df, quarantine_df, observation = engine.apply_checks_by_metadata_and_split(df, checks)
# Trigger an action to populate metrics (e.g., count, save to a table).
# Without an action, the metrics are not populated and accessing them will block.
row_count = checked_df.count()
# Access metrics
metrics = observation.get
print(f"Input row count: {metrics['input_row_count']}")
print(f"Error row count: {metrics['error_row_count']}")
print(f"Warning row count: {metrics['warning_row_count']}")
print(f"Valid row count: {metrics['valid_row_count']}")
Writing Metrics to a Table
Engine methods for applying checks (e.g. apply_checks_and_save_in_table, apply_checks_by_metadata_and_save_in_table, save_results_in_table, apply_checks_and_save_in_tables, apply_checks_and_save_in_tables_for_patterns) can write summary metrics into a table automatically.
Metrics can be written to a table in batch or streaming. You can write metrics for different datasets or workloads into a common metrics table to track data quality over time centrally.
The name specified in the DQMetricsObserver is recorded as the run_name column in the metrics table. It is recommended to assign a unique name to the observer for each job or table to enable efficient filtering when centralizing metrics in a single table.
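For example, the observer name can be derived from the table being checked (a sketch; the naming scheme is only a suggestion):
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.sdk import WorkspaceClient

# One observer name per checked table makes it easy to filter a shared metrics table later
table_name = "main.demo.input_data"
observer = DQMetricsObserver(name=f"dq_{table_name}")
engine = DQEngine(WorkspaceClient(), observer=observer)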
To write metrics directly to a table, users must use a classic compute cluster with Dedicated access mode. This is due to a Spark Connect issue, which will be resolved in future versions.
Writing Metrics to a Table in Batch
Summary metrics can be written to a table when calling DQEngine methods to apply checks and write output data. When the input data is read as a batch source, metrics will be collected and written in batch.
- Python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient
# Define the checks
checks = [
DQRowRule(
criticality="warn",
check_func=check_funcs.is_not_null,
column="col3",
),
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1", "col2"],
),
DQRowRule(
name="email_invalid_format",
criticality="error",
check_func=check_funcs.regex_match,
column="email",
check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
),
]
# Create the observer
observer = DQMetricsObserver(name="dq_metrics")
# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)
# Create the input config for a batch data source
input_config = InputConfig("main.demo.input_data")
# Create the output, quarantine, and metrics configs
output_config = OutputConfig("main.demo.valid_data")
quarantine_config = OutputConfig("main.demo.quarantine_data") # optional
metrics_config = OutputConfig("main.demo.metrics_data") # optional
# Option 1: Apply checks and save metrics
df = spark.read.table(input_config.location)  # read the batch input
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
quarantine_df.count()  # Trigger an action (e.g. count or save) to populate metrics; accessing them without an action will block
engine.save_summary_metrics(
observed_metrics=observation.get,
metrics_config=metrics_config,
input_config=input_config, # used as info only
output_config=output_config, # used as info only
quarantine_config=quarantine_config, # used as info only
checks_location="checks.yml", # used as info only
)
# Option 2: Use End to End method: read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table
engine.apply_checks_and_save_in_table(
checks=checks,
input_config=input_config,
output_config=output_config,
quarantine_config=quarantine_config,
metrics_config=metrics_config
)
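After the run, the persisted metrics can be inspected directly; a quick sketch using the table and observer name from the example above:
# Inspect the summary metrics written by the run above
metrics_df = spark.table("main.demo.metrics_data")
metrics_df.filter("run_name = 'dq_metrics'").orderBy("run_time", ascending=False).show()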
Writing Metrics to a Table with Streaming
Summary metrics can also be written in streaming. When the input data is read as a streaming source, metrics will be written for each streaming micro-batch:
Metrics are not directly accessible from the returned Spark Observation when data is processed with streaming.
You must use the streaming metrics listener or end-to-end methods that persist the output to tables after quality checks are applied (e.g. apply_checks_and_save_in_table, apply_checks_by_metadata_and_save_in_table, save_results_in_table, apply_checks_and_save_in_tables, apply_checks_and_save_in_tables_for_patterns).
- Python
import time
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient
# Define the checks
checks = [
DQRowRule(
criticality="warn",
check_func=check_funcs.is_not_null,
column="col3",
),
DQRowRule(
name="email_invalid_format",
criticality="error",
check_func=check_funcs.regex_match,
column="email",
check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
),
]
# Create the observer
observer = DQMetricsObserver(name="dq_metrics")
# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)
# Create the input config for a streaming data source
input_config = InputConfig("main.demo.input_data", is_streaming=True)
# Create the output, quarantine, and metrics configs
output_config = OutputConfig(
location="main.demo.valid_data",
trigger={"availableNow": True}, # stop the stream once all data is processed
options={"checkpointLocation": "/tmp/checkpoint/valid_data"} # use a Volume in production for persistence
)
quarantine_config = OutputConfig(
location="main.demo.quarantine_data",
trigger={"availableNow": True}, # stop the stream once all data is processed
options={"checkpointLocation": "/tmp/checkpoint/quarantine_data"} # use a Volume in production for persistence
)
metrics_config = OutputConfig("main.demo.metrics_data") # streaming configuration not required for metrics
# Option 1: Apply checks and save metrics
df = spark.readStream.table(input_config.location)
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
output_query = (
    valid_df.writeStream
    .format(output_config.format)
    .outputMode(output_config.mode)
    .options(**output_config.options)
    .trigger(**output_config.trigger)
    .toTable(output_config.location)
)
quarantine_query = (
    quarantine_df.writeStream
    .format(quarantine_config.format)
    .outputMode(quarantine_config.mode)
    .options(**quarantine_config.options)
    .trigger(**quarantine_config.trigger)
    .toTable(quarantine_config.location)
)
listener = engine.get_streaming_metrics_listener(
input_config=input_config,
output_config=output_config,
quarantine_config=quarantine_config,
metrics_config=metrics_config,
target_query_id=quarantine_query.id,
)
# For streaming, writing metrics requires a stream listener; the observation cannot be accessed directly.
# This registers a global listener for the current Spark session, so do not add it again when reusing the same session.
spark.streams.addListener(listener)
output_query.awaitTermination()
quarantine_query.awaitTermination()
# Option 2: Use End-to-End method: read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table
# Output and quarantine data will be written in streaming and summary metrics will be written for each micro-batch
engine.apply_checks_and_save_in_table(
checks=checks,
input_config=input_config,
output_config=output_config,
quarantine_config=quarantine_config,
metrics_config=metrics_config
)
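Because streaming writes one set of metrics per micro-batch, per-run totals can be derived by aggregating over run_id; a sketch, assuming the metrics table from the example above:
import pyspark.sql.functions as F

# Sum the per-micro-batch error counts into a total per run
(
    spark.table("main.demo.metrics_data")
    .filter("metric_name = 'error_row_count'")
    .groupBy("run_id")
    .agg(F.sum(F.col("metric_value").cast("long")).alias("total_error_rows"))
    .show()
)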
Saving Results and Metrics to a Table
Summary metrics can also be written to a table when calling save_results_in_table. After applying checks, pass the Spark Observation and output DataFrame(s) with the appropriate output configuration.
This is supported for both batch and streaming.
- Python
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import OutputConfig
from databricks.sdk import WorkspaceClient
# Define the checks
checks = [
DQRowRule(
criticality="warn",
check_func=check_funcs.is_not_null,
column="col3",
),
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1", "col2"],
),
DQRowRule(
name="email_invalid_format",
criticality="error",
check_func=check_funcs.regex_match,
column="email",
check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
),
]
# Create the observer
observer = DQMetricsObserver(name="dq_metrics")
# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)
# Apply checks, split and get metrics (df is the input DataFrame to validate)
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
# Apply checks and get metrics
#checked_df, observation = engine.apply_checks(df, checks)
# Create the output, quarantine, and metrics configs
output_config = OutputConfig("main.demo.valid_data")
quarantine_config = OutputConfig("main.demo.quarantine_data") # optional
metrics_config = OutputConfig("main.demo.metrics_data") # optional
# Write the data to valid and quarantine tables, and write metrics to the metrics table
engine.save_results_in_table(
output_df=valid_df,
quarantine_df=quarantine_df,
observation=observation,
output_config=output_config,
quarantine_config=quarantine_config,
metrics_config=metrics_config
)
Configuring Custom Metrics
Custom metrics are collected in addition to the built-in metrics.
Pass custom metrics as Spark SQL expressions with column aliases when creating the DQMetricsObserver; each metric is then accessible by its alias.
- Python
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.sdk import WorkspaceClient
# Define custom metrics
custom_metrics = [
"sum(array_size(_errors)) as total_check_errors",
"sum(array_size(_warnings)) as total_check_warnings",
]
# Create the observer with custom metrics
observer = DQMetricsObserver(
name="business_metrics",
custom_metrics=custom_metrics
)
# Create the engine with the optional observer
engine = DQEngine(WorkspaceClient(), observer=observer)
# Apply checks and get metrics
checked_df, observation = engine.apply_checks_by_metadata(df, checks)
# Trigger an action to populate metrics (e.g., count, save to a table).
# Without an action, the metrics are not populated and accessing them will block.
checked_df.count() # Example action to ensure metrics are computed
# Access metrics
metrics = observation.get
print(f"Input row count: {metrics['input_row_count']}")
print(f"Error row count: {metrics['error_row_count']}")
print(f"Warning row count: {metrics['warning_row_count']}")
print(f"Valid row count: {metrics['valid_row_count']}")
print(f"Total check errors: {metrics['total_check_errors']}")
print(f"Total check warnings: {metrics['total_check_warnings']}")
Workflows Integration
No-Code Approach (Workflows)
When using DQX workflows, summary metrics are automatically configured based on your installation configuration file:
- Installation Configuration: During installation, specify the metrics table and any custom metrics.
- Automatic Observer Creation: Workflows automatically create a DQMetricsObserver when metrics are configured.
- Metrics Persistence: Metrics are automatically saved to the configured table after each workflow run. The run_name is set to 'dqx' by default.
Configuration File Example
Metrics can be defined in the metrics_config section of your configuration file.
run_configs:
- name: production
input_config:
location: main.raw.sales_data
format: delta
output_config:
location: main.clean.sales_data
format: delta
mode: append
quarantine_config:
location: main.quarantine.sales_data
format: delta
mode: append
metrics_config: # Summary metrics configuration
location: main.analytics.dq_metrics
format: delta
mode: append
checks_location: main.config.quality_checks
# Global custom metrics (applied to all run configs)
custom_metrics:
- "avg(amount) as average_transaction_amount"
- "sum(case when region = 'US' then amount else 0 end) as us_revenue"
- "count(distinct customer_id) as unique_customers"
Once the config is defined, you can start the workflows as usual via the Databricks CLI, UI, or API, or through Workflows scheduling:
# Run quality checker workflow with metrics enabled using Databricks CLI
databricks labs dqx apply-checks --run-config "production"
# Run end-to-end workflow with metrics enabled using Databricks CLI
databricks labs dqx e2e --run-config "production"
Metrics Table Schema
Summary metrics can be written to and centralized in a Delta table. The metrics table contains the following columns:
| Column Name | Column Type | Description |
|---|---|---|
| run_id | STRING | Unique run ID recorded in both the summary metrics and the detailed quality checking results to enable cross-referencing. The run ID is tied to the DQEngine/observer instance: it stays the same across apply-checks executions on the same instance and only changes when a new engine (and observer, if used) is created. |
| run_name | STRING | Name of the metrics observer: the name passed to DQMetricsObserver, or 'dqx' when applying checks using Workflows. |
| input_location | STRING | Location of the input dataset (table name or file path), if known. |
| output_location | STRING | Location of the output dataset (table name or file path). |
| quarantine_location | STRING | Location of the quarantine dataset (table name or file path), if used. |
| checks_location | STRING | Location where checks are stored (table name or file path), if known. |
| metric_name | STRING | Name of the metric (e.g. 'input_row_count'). |
| metric_value | STRING | Value of the metric (stored as a string). |
| run_time | TIMESTAMP | Timestamp of the run when the summary metrics were calculated. |
| error_column_name | STRING | Name of the error column in the output or quarantine table containing per-row quality checking results (default: '_errors'). |
| warning_column_name | STRING | Name of the warning column in the output or quarantine table containing per-row quality checking results (default: '_warnings'). |
| user_metadata | MAP<STRING, STRING> | User-defined, run-level metadata. |
Best Practices
Performance Considerations
- Batch Metrics Collection: Collect metrics as part of regular data processing, after the output is written, rather than triggering a separate action just to compute them.
- Monitor Metrics Overhead: Complex custom metrics may impact processing performance.
Monitoring and Alerting Using the Metrics Table
Use cases:
- Track Trends: Monitor metrics over time to identify data quality degradation.
- Set Thresholds: Establish acceptable ranges for error rates and warning counts.
- Alert on Anomalies: Set up alerts when metrics deviate significantly from historical patterns, e.g. by using Databricks SQL Alerts.
The example below shows how you can analyze metrics persisted to a table:
- SQL
/* EXAMPLE: Identify quality degradation */
WITH daily_metrics AS (
SELECT
date_trunc('day', run_time) as run_date,
input_location,
metric_name,
CAST(metric_value AS DOUBLE) as metric_value
FROM
main.analytics.dq_metrics
WHERE
run_time >= current_date - INTERVAL 30 DAYS
AND metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count')
),
pivoted_metrics AS (
SELECT
run_date,
input_location,
MAX(CASE WHEN metric_name = 'input_row_count' THEN metric_value END) as input_count,
MAX(CASE WHEN metric_name = 'error_row_count' THEN metric_value END) as error_count,
MAX(CASE WHEN metric_name = 'warning_row_count' THEN metric_value END) as warning_count
FROM daily_metrics
GROUP BY run_date, input_location
)
SELECT
run_date,
input_location,
avg(error_count * 100.0 / NULLIF(input_count, 0)) as avg_error_rate,
avg(warning_count * 100.0 / NULLIF(input_count, 0)) as avg_warning_rate
FROM
pivoted_metrics
WHERE
input_count > 0
GROUP BY
run_date, input_location
ORDER BY
run_date DESC, input_location
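To back the alerting use case, a simple threshold query can flag recent runs whose error rate exceeds an agreed limit. A Python sketch assuming the same metrics table and a hypothetical 5% threshold:
import pyspark.sql.functions as F

# Flag runs from the last day whose error rate exceeds 5% (threshold is just an example)
metrics = spark.table("main.analytics.dq_metrics").where("run_time >= current_timestamp() - INTERVAL 1 DAY")
pivoted = (
    metrics.groupBy("run_id", "input_location")
    .pivot("metric_name", ["input_row_count", "error_row_count"])
    .agg(F.max(F.col("metric_value").cast("double")))
)
alerts = (
    pivoted.where(F.col("input_row_count") > 0)
    .withColumn("error_rate_pct", F.col("error_row_count") * 100.0 / F.col("input_row_count"))
    .where(F.col("error_rate_pct") > 5.0)
)
alerts.show()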
When troubleshooting, you can drill down from summary metrics to the detailed row-level quality results, as shown below:
- Python
import pyspark.sql.functions as F
# fetch any metrics row as an example (using the metrics table from the configuration example)
metrics_table_name = "main.analytics.dq_metrics"
metrics_row = spark.table(metrics_table_name).collect()[0]
run_id = metrics_row["run_id"]
output_table_name = metrics_row["output_location"]
# retrieve detailed results
output_df = spark.table(output_table_name)
# extract errors into their own DataFrame
errors_df = output_df.select(
    F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))

# extract warnings into their own DataFrame
warnings_df = output_df.select(
    F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))

# Fetch detailed quality results using the run_id from the summary metrics
filtered_results_df = errors_df.filter(F.col("run_id") == run_id)  # filter, or join; the same approach works for warnings_df
filtered_results_df.show()