Summary Metrics

DQX provides comprehensive functionality to capture and store aggregate statistics about your data quality. This allows you to track data quality trends over time, monitor the health of your data pipelines, and gain insights into the overall quality of your datasets.

Overview

Summary metrics in DQX capture both built-in metrics (automatically calculated) and custom metrics (user-defined SQL expressions) during data quality checking. These metrics are collected using Spark's built-in Observation functionality and can be persisted to tables for historical analysis.
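
For context, here is a minimal standalone sketch of Spark's Observation API in plain PySpark (independent of DQX) showing the underlying mechanism: observed metrics are computed as a side effect of an action on the DataFrame.

from pyspark.sql import Observation, SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# attach aggregate expressions to the DataFrame via an Observation
obs = Observation("example")
observed_df = df.observe(obs, F.count(F.lit(1)).alias("row_count"))

observed_df.count()  # any action triggers metric collection
print(obs.get)       # {'row_count': 10}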

Built-in Metrics

DQX automatically captures the following built-in metrics for every data quality check execution:

| Metric Name | Data Type | Description |
|---|---|---|
| input_row_count | int | Total number of input rows processed |
| error_row_count | int | Number of rows that failed error-level checks |
| warning_row_count | int | Number of rows that triggered warning-level checks |
| valid_row_count | int | Number of rows that passed all checks |

Custom Metrics

Users can define custom metrics with Spark SQL expressions. These metrics will be collected in addition to DQX's built-in metrics.

Avoid Expensive Operations

Summary metrics are calculated on all records processed by DQX. Complex aggregations can degrade performance when processing large datasets. Be cautious with operations like DISTINCT on high-cardinality columns.
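
For example, where an exact distinct count is too costly, Spark's approx_count_distinct is a cheaper substitute; a sketch, where customer_id is a hypothetical column name:

custom_metrics = [
    # exact but potentially expensive: "count(distinct customer_id) as unique_customers"
    "approx_count_distinct(customer_id) as approx_unique_customers",
]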

Example of custom data quality summary metrics:

sum(array_size(_errors)) as total_errors
avg(array_size(_errors)) as errors_avg
count(case when array_size(_errors) > 1 then 1 end) as count_multiple_errors

See the Configuring Custom Metrics section for instructions on setting them up.

Programmatic approach

Accessing Metrics when Applying Checks

Engine methods for applying checks (e.g. apply_checks, apply_checks_by_metadata, apply_checks_and_split, apply_checks_by_metadata_and_split) can optionally return a Spark Observation along with one or more output DataFrames. Data quality metrics can be accessed from the Spark Observation after an action (e.g. a count or a table write) is performed on the output DataFrames.

Streaming metrics

Metrics are not directly accessible from the returned Spark Observation when data is processed with streaming. Use DQX's built-in methods to persist streaming metrics to an output table. See Writing Metrics to a Table with Streaming for more details.

from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.sdk import WorkspaceClient

# Create observer
observer = DQMetricsObserver(name="dq_metrics")

# Create the engine with the optional observer
engine = DQEngine(WorkspaceClient(), observer=observer)

# Apply checks and get metrics
checked_df, observation = engine.apply_checks_by_metadata(df, checks)

# Apply checks, split and get metrics
#valid_df, quarantine_df, observation = engine.apply_checks_by_metadata_and_split(df, checks)

# Trigger an action to populate metrics (e.g. count, save to a table).
# Without an action, the metrics are never computed and accessing them will block.
row_count = checked_df.count()

# Access metrics
metrics = observation.get
print(f"Input row count: {metrics['input_row_count']}")
print(f"Error row count: {metrics['error_row_count']}")
print(f"Warning row count: {metrics['warning_row_count']}")
print(f"Valid row count: {metrics['valid_row_count']}")

Writing Metrics to a Table

Engine methods for applying checks (e.g. apply_checks_and_save_in_table, apply_checks_by_metadata_and_save_in_table, save_results_in_table, apply_checks_and_save_in_tables, apply_checks_and_save_in_tables_for_patterns) can write summary metrics into a table automatically. Metrics can be written to a table in batch or streaming. You can write metrics for different datasets or workloads into a common metrics table to track data quality over time centrally.

The name specified in the DQMetricsObserver is recorded as the run_name column in the metrics table. It is recommended to assign a unique name to the observer for each job or table to enable efficient filtering when centralizing metrics in a single table.
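
For example, a per-job or per-table naming convention (the name below is hypothetical) keeps a shared metrics table easy to filter:

# hypothetical naming convention: one observer name per pipeline and target table
observer = DQMetricsObserver(name="nightly_etl_sales_bronze")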

Compute requirements

To write metrics directly to a table, users must use a classic compute cluster with Dedicated access mode. This is due to a Spark Connect issue, which will be resolved in future versions.

Writing Metrics to a Table in Batch

Summary metrics can be written to a table when calling DQEngine methods to apply checks and write output data. When the input data is read as a batch source, metrics will be collected and written in batch.

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient

# Define the checks
checks = [
    DQRowRule(
        criticality="warn",
        check_func=check_funcs.is_not_null,
        column="col3",
    ),
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["col1", "col2"],
    ),
    DQRowRule(
        name="email_invalid_format",
        criticality="error",
        check_func=check_funcs.regex_match,
        column="email",
        check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
    ),
]

# Create the observer
observer = DQMetricsObserver(name="dq_metrics")

# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)

# Create the input config for a batch data source
input_config = InputConfig("main.demo.input_data")

# Create the output, quarantine, and metrics configs
output_config = OutputConfig("main.demo.valid_data")
quarantine_config = OutputConfig("main.demo.quarantine_data") # optional
metrics_config = OutputConfig("main.demo.metrics_data") # optional

# Option 1: Apply checks and save metrics explicitly
df = spark.read.table(input_config.location)
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
quarantine_df.count()  # trigger an action to populate metrics (e.g. count, save to a table); otherwise accessing them will block
engine.save_summary_metrics(
    observed_metrics=observation.get,
    metrics_config=metrics_config,
    input_config=input_config,  # used as info only
    output_config=output_config,  # used as info only
    quarantine_config=quarantine_config,  # used as info only
    checks_location="checks.yml",  # used as info only
)

# Option 2: Use the end-to-end method: read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table
engine.apply_checks_and_save_in_table(
    checks=checks,
    input_config=input_config,
    output_config=output_config,
    quarantine_config=quarantine_config,
    metrics_config=metrics_config,
)

Writing Metrics to a Table with Streaming

Summary metrics can also be written in streaming. When the input data is read as a streaming source, metrics will be written for each streaming micro-batch:

Supported methods

Metrics are not directly accessible from the returned Spark Observation when data is processed with streaming. You must either use the streaming metrics listener or the end-to-end methods that persist the output to tables after quality checks are applied (e.g. apply_checks_and_save_in_table, apply_checks_by_metadata_and_save_in_table, save_results_in_table, apply_checks_and_save_in_tables, apply_checks_and_save_in_tables_for_patterns).

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import InputConfig, OutputConfig
from databricks.sdk import WorkspaceClient

# Define the checks
checks = [
    DQRowRule(
        criticality="warn",
        check_func=check_funcs.is_not_null,
        column="col3",
    ),
    DQRowRule(
        name="email_invalid_format",
        criticality="error",
        check_func=check_funcs.regex_match,
        column="email",
        check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
    ),
]

# Create the observer
observer = DQMetricsObserver(name="dq_metrics")

# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)

# Create the input config for a streaming data source
input_config = InputConfig("main.demo.input_data", is_streaming=True)

# Create the output, quarantine, and metrics configs
output_config = OutputConfig(
    location="main.demo.valid_data",
    trigger={"availableNow": True},  # stop the stream once all data is processed
    options={"checkpointLocation": "/tmp/checkpoint/valid_data"},  # use a Volume in production for persistence
)
quarantine_config = OutputConfig(
    location="main.demo.quarantine_data",
    trigger={"availableNow": True},  # stop the stream once all data is processed
    options={"checkpointLocation": "/tmp/checkpoint/quarantine_data"},  # use a Volume in production for persistence
)
metrics_config = OutputConfig("main.demo.metrics_data")  # streaming configuration not required for metrics

# Option 1: Apply checks and save metrics
df = spark.readStream.table(input_config.location)
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)
output_query = (
    valid_df.writeStream.format(output_config.format)
    .outputMode(output_config.mode)
    .options(**output_config.options)
    .trigger(**output_config.trigger)
    .toTable(output_config.location)
)
quarantine_query = (
    quarantine_df.writeStream.format(quarantine_config.format)
    .outputMode(quarantine_config.mode)
    .options(**quarantine_config.options)
    .trigger(**quarantine_config.trigger)
    .toTable(quarantine_config.location)
)

listener = engine.get_streaming_metrics_listener(
    input_config=input_config,
    output_config=output_config,
    quarantine_config=quarantine_config,
    metrics_config=metrics_config,
    target_query_id=quarantine_query.id,
)
# In streaming, writing metrics requires a stream listener; the Observation cannot be accessed directly.
# This registers a global listener for the current Spark session, so do not add it again when reusing the same session.
spark.streams.addListener(listener)

output_query.awaitTermination()
quarantine_query.awaitTermination()

# Option 2: Use the end-to-end method: read the data, apply the checks, write data to valid and quarantine tables, and write metrics to the metrics table.
# Output and quarantine data will be written in streaming, and summary metrics will be written for each micro-batch.
engine.apply_checks_and_save_in_table(
    checks=checks,
    input_config=input_config,
    output_config=output_config,
    quarantine_config=quarantine_config,
    metrics_config=metrics_config,
)

Saving Results and Metrics to a Table

Summary metrics can also be written to a table when calling save_results_in_table. After applying checks, pass the Spark Observation and output DataFrame(s) with the appropriate output configuration. This is supported for both batch and streaming.

from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule
from databricks.labs.dqx.config import OutputConfig
from databricks.sdk import WorkspaceClient

# Define the checks
checks = [
DQRowRule(
criticality="warn",
check_func=check_funcs.is_not_null,
column="col3",
),
DQDatasetRule(
criticality="error",
check_func=check_funcs.is_unique,
columns=["col1", "col2"],
),
DQRowRule(
name="email_invalid_format",
criticality="error",
check_func=check_funcs.regex_match,
column="email",
check_func_kwargs={"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"},
),
]

# Create the observer
observer = DQMetricsObserver(name="dq_metrics")

# Create the engine with the metrics observer
engine = DQEngine(WorkspaceClient(), observer=observer)

# Apply checks, split the results, and get metrics (df is your input DataFrame)
valid_df, quarantine_df, observation = engine.apply_checks_and_split(df, checks)

# Apply checks and get metrics
#checked_df, observation = engine.apply_checks(df, checks)

# Create the output, quarantine, and metrics configs
output_config = OutputConfig("main.demo.valid_data")
quarantine_config = OutputConfig("main.demo.quarantine_data") # optional
metrics_config = OutputConfig("main.demo.metrics_data") # optional

# Write the data to valid and quarantine tables, and write metrics to the metrics table
engine.save_results_in_table(
    output_df=valid_df,
    quarantine_df=quarantine_df,
    observation=observation,
    output_config=output_config,
    quarantine_config=quarantine_config,
    metrics_config=metrics_config,
)

Configuring Custom Metrics

Custom metrics are collected in addition to the built-in metrics. Define them as Spark SQL expressions with column aliases and pass them when creating the DQMetricsObserver; each metric is then accessible by its alias.

from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.sdk import WorkspaceClient

# Define custom metrics
custom_metrics = [
    "sum(array_size(_errors)) as total_check_errors",
    "sum(array_size(_warnings)) as total_check_warnings",
]

# Create the observer with custom metrics
observer = DQMetricsObserver(
    name="business_metrics",
    custom_metrics=custom_metrics,
)

# Create the engine with the optional observer
engine = DQEngine(WorkspaceClient(), observer=observer)

# Apply checks and get metrics
checked_df, observation = engine.apply_checks_by_metadata(df, checks)

# Trigger an action to populate metrics (e.g. count, save to a table).
# Without an action, the metrics are never computed and accessing them will block.
checked_df.count()

# Access metrics
metrics = observation.get
print(f"Input row count: {metrics['input_row_count']}")
print(f"Error row count: {metrics['error_row_count']}")
print(f"Warning row count: {metrics['warning_row_count']}")
print(f"Valid row count: {metrics['valid_row_count']}")
print(f"Total check errors: {metrics['total_check_errors']}")
print(f"Total check warnings: {metrics['total_check_warnings']}")

Workflows Integration

No-Code Approach (Workflows)

When using DQX workflows, summary metrics are automatically configured based on your installation configuration file:

  1. Installation Configuration: During installation, specify metrics table and custom metrics.
  2. Automatic Observer Creation: Workflows automatically create DQMetricsObserver when metrics are configured.
  3. Metrics Persistence: Metrics are automatically saved to the configured table after each workflow run. The run_name is set to 'dqx' by default.

Configuration File Example

The metrics table is configured in the metrics_config section of each run config; global custom metrics are configured in the custom_metrics section of your configuration file.

run_configs:
  - name: production
    input_config:
      location: main.raw.sales_data
      format: delta
    output_config:
      location: main.clean.sales_data
      format: delta
      mode: append
    quarantine_config:
      location: main.quarantine.sales_data
      format: delta
      mode: append
    metrics_config: # summary metrics configuration
      location: main.analytics.dq_metrics
      format: delta
      mode: append
    checks_location: main.config.quality_checks

# Global custom metrics (applied to all run configs)
custom_metrics:
  - "avg(amount) as average_transaction_amount"
  - "sum(case when region = 'US' then amount else 0 end) as us_revenue"
  - "count(distinct customer_id) as unique_customers"

Once the config is defined, you can start the workflows as usual using the Databricks CLI, the Databricks UI, the Databricks API, or Workflows scheduling:

# Run quality checker workflow with metrics enabled using Databricks CLI
databricks labs dqx apply-checks --run-config "production"

# Run end-to-end workflow with metrics enabled using Databricks CLI
databricks labs dqx e2e --run-config "production"

Metrics Table Schema

Summary metrics can be written to and centralized in a Delta table. The metrics table contains the following fields:

| Column Name | Column Type | Description |
|---|---|---|
| run_id | STRING | Unique run ID recorded in both the summary metrics and the detailed quality checking results to enable cross-referencing. The run ID stays the same while the same DQEngine and observer instances are reused; applying checks again does not generate a new run ID. It changes only when a new engine and observer (if one is used) are created. |
| run_name | STRING | Name of the metrics observer: the name passed to DQMetricsObserver, or 'dqx' when applying checks using Workflows. |
| input_location | STRING | Location of the input dataset (table name or file path), if known. |
| output_location | STRING | Location of the output dataset (table name or file path). |
| quarantine_location | STRING | Location of the quarantine dataset (table name or file path), if used. |
| checks_location | STRING | Location where checks are stored (table name or file path), if known. |
| metric_name | STRING | Name of the metric (e.g. 'input_row_count'). |
| metric_value | STRING | Value of the metric (stored as a string). |
| run_time | TIMESTAMP | Timestamp when the summary metrics were calculated. |
| error_column_name | STRING | Name of the error column in the output or quarantine table containing per-row quality checking results (default: '_errors'). |
| warning_column_name | STRING | Name of the warning column in the output or quarantine table containing per-row quality checking results (default: '_warnings'). |
| user_metadata | MAP[STRING, STRING] | User-defined, run-level metadata. |
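
For example, to inspect the most recent metrics for a single observer (a sketch assuming the main.analytics.dq_metrics table and the dq_metrics observer name used earlier):

# inspect the most recent metrics for one observer
spark.table("main.analytics.dq_metrics") \
    .filter("run_name = 'dq_metrics'") \
    .orderBy("run_time", ascending=False) \
    .select("metric_name", "metric_value", "run_time") \
    .show()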

Best Practices

Performance Considerations

  1. Batch Metrics Collection: Collect metrics as part of regular data processing, after the output is written, rather than in a separate pass over the data.
  2. Monitor Metrics Overhead: Complex custom metrics can add processing overhead; measure their impact before applying them to large datasets.

Monitoring and Alerting Using the Metrics Table

Use cases:

  1. Track Trends: Monitor metrics over time to identify data quality degradation.
  2. Set Thresholds: Establish acceptable ranges for error rates and warning counts.
  3. Alert on Anomalies: Set up alerts when metrics deviate significantly from historical patterns, e.g. by using Databricks SQL Alerts.

The example below shows how you can analyze metrics persisted to a table:

/* EXAMPLE: Identify quality degradation */
WITH daily_metrics AS (
    SELECT
        date_trunc('day', run_time) AS run_date,
        input_location,
        metric_name,
        CAST(metric_value AS DOUBLE) AS metric_value
    FROM main.analytics.dq_metrics
    WHERE run_time >= current_date - INTERVAL 30 DAYS
        AND metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count')
),
pivoted_metrics AS (
    SELECT
        run_date,
        input_location,
        MAX(CASE WHEN metric_name = 'input_row_count' THEN metric_value END) AS input_count,
        MAX(CASE WHEN metric_name = 'error_row_count' THEN metric_value END) AS error_count,
        MAX(CASE WHEN metric_name = 'warning_row_count' THEN metric_value END) AS warning_count
    FROM daily_metrics
    GROUP BY run_date, input_location
)
SELECT
    run_date,
    input_location,
    avg(error_count * 100.0 / NULLIF(input_count, 0)) AS avg_error_rate,
    avg(warning_count * 100.0 / NULLIF(input_count, 0)) AS avg_warning_rate
FROM pivoted_metrics
WHERE input_count > 0
GROUP BY run_date, input_location
ORDER BY run_date DESC, input_location

When you need to explore detailed row-level quality results based on summary metrics for troubleshooting, the example below illustrates how to do so:

import pyspark.sql.functions as F

# fetch any metrics row as an example
# (assumes the metrics table from the configuration example above)
metrics_table_name = "main.analytics.dq_metrics"
metrics_row = spark.table(metrics_table_name).collect()[0]
run_id = metrics_row["run_id"]
output_table_name = metrics_row["output_location"]

# retrieve detailed results
output_df = spark.table(output_table_name)

# extract errors
errors_df = output_df.select(
    F.explode(F.col("_errors")).alias("result"),
).select(F.expr("result.*"))

# extract warnings
warnings_df = output_df.select(
    F.explode(F.col("_warnings")).alias("result"),
).select(F.expr("result.*"))

# fetch detailed error results using the run_id from the summary metrics
filtered_results_df = errors_df.filter(F.col("run_id") == run_id)  # filter, or join
filtered_results_df.show()