Query Results Cookbook
This cookbook is a practical guide for querying DQX result tables - summary metrics, output, quarantine, and checks - to trace errors and warnings across runs, rows, and check definitions. It will help you go from a failing run to the rows that failed, link those rows back to the rules that flagged them, and monitor quality trends across runs. It assumes you've already run DQX at least once.
Before you start, take a minute with the Table Relationships diagram in the schema reference: every query in this cookbook relies on at least one of the relationships shown there.
The examples below use these placeholder table names. Please replace them with your actual catalog, schema, and table names:
- catalog.schema.output_table - output table
- catalog.schema.quarantine_table - quarantine table (when configured)
- catalog.schema.checks_table - checks storage table
- catalog.schema.summary_metrics - summary metrics table
- catalog.schema.my_table - the input table being checked
The default error and warning column names are _errors and _warnings.
If you customized them, adjust the queries accordingly.
Where to start
Find what you already have in the table below, then jump to the recipe that uses it as a starting point:
| What you have | What you typically need | Go to |
|---|---|---|
| run_id | The rows that failed in that run | 2. Summary to Row-Level Errors |
| run_id | The rule set (the collection of rules applied together in that run) | 4. Summary to Checks Table |
| run_id | All checks that failed in the run | 5. Which Checks Failed for a Run |
| run_id | Every rule that was active (passed or failed) | 6. Which Rules Were Applied in a Run |
| Failing row | The run it came from | 3. Row Details |
| Failing row | The check definition that caused the failure | 3. Row Details |
| Time window | Quality trends across runs | 7. Run Tracking Over Time |
| Table name (multi-table run) | Runs for that specific table only | 8. Scoping a Multi-Table Run to One Table |
| Shared checks table | Rules grouped by run_config_name | 9. Checks by Run Config |
| Failing check | The first run it appeared in | 10. First Failure per Check |
| Check name | All quarantine rows that failed that check | 11. All Failures per Check |
| Run or time window | Wide summary view with metrics as columns | 12. Summary Metrics Pivot |
New to the tables? Start with Recipe 1 - A quick peek at each table for orientation.
Use this table to find the right place to look depending on your setup:
| What I'm looking for | With quarantine configured | Without quarantine |
|---|---|---|
| Rows with errors | quarantine_table | output_table where _errors IS NOT NULL |
| Rows with warnings | quarantine_table | output_table where _warnings IS NOT NULL |
| Clean rows (no errors and no warnings) | output_table | output_table where both are null |
The recipes below assume quarantine is configured. If it isn't, substitute output_table for quarantine_table and add the filter from the table above.
1. A quick peek at each table
Goal: Get familiar with the shape and grain of each table before writing any joins.
When to use: You are new to the DQX result tables, or unsure what a single row in each one represents.
Returns: Five sample rows from each of the four result tables.
Before writing any joins, look at what is actually in each table. A few sample rows from each one reveal its grain - what a single row represents - which is the foundation for every aggregation you will write later.
- SQL
- Python
-- Summary metrics: one row per metric per run
SELECT * FROM catalog.schema.summary_metrics LIMIT 5;
-- Output: rows without any errors (no _errors or _warnings columns)
SELECT * FROM catalog.schema.output_table LIMIT 5;
-- Quarantine: rows with any errors or warnings, with _errors and _warnings populated
SELECT * FROM catalog.schema.quarantine_table LIMIT 5;
-- Checks: one row per check definition
SELECT * FROM catalog.schema.checks_table LIMIT 5;
display(spark.table("catalog.schema.summary_metrics").limit(5))
display(spark.table("catalog.schema.output_table").limit(5))
display(spark.table("catalog.schema.quarantine_table").limit(5))
display(spark.table("catalog.schema.checks_table").limit(5))
What to notice:
- summary_metrics is in long format, one row per metric per run. Summary metrics are not collected by default - you must enable them explicitly. To get a wide view, pivot on metric_name (Recipe 12).
- Quarantine rows contain your original data plus two array columns, _errors and _warnings (output rows carry them too when quarantine is not configured). Each array element is a struct describing one failed DQX check.
- The checks table has a rule_fingerprint (a hash of one individual rule's definition - its function, arguments, and criticality) and a rule_set_fingerprint (a hash of the entire rule set - the collection of rules applied together in a single run). Both fingerprints appear inside each error and warning struct, which is how Recipes 3 and 5 join row-level results back to the checks table. The summary table also carries rule_set_fingerprint, which Recipes 4 and 6 use.
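To make the fingerprint linkage concrete, the sketch below models one quarantine row and two checks-table rows as plain Python dictionaries, then resolves each _errors/_warnings entry to its check definition using the same composite key the recipes join on. It is an illustration only: the sample values, the fingerprint strings, and the helper name `resolve` are hypothetical, and real fingerprints are hashes generated by DQX.

```python
# Hypothetical sample data shaped like the DQX result tables; every value is a placeholder.
checks_table = [
    {"name": "id_is_not_null", "criticality": "error",
     "check": {"function": "is_not_null", "arguments": {"column": "id"}},
     "rule_fingerprint": "rf-1", "rule_set_fingerprint": "rsf-A"},
    {"name": "amount_in_range", "criticality": "warn",
     "check": {"function": "is_in_range",
               "arguments": {"column": "amount", "min_limit": 0, "max_limit": 100}},
     "rule_fingerprint": "rf-2", "rule_set_fingerprint": "rsf-A"},
]

# One quarantined row: original columns plus the _errors and _warnings arrays.
quarantine_row = {
    "id": None,
    "amount": 250,
    "_errors": [{"name": "id_is_not_null", "run_id": "run-1",
                 "rule_fingerprint": "rf-1", "rule_set_fingerprint": "rsf-A"}],
    "_warnings": [{"name": "amount_in_range", "run_id": "run-1",
                   "rule_fingerprint": "rf-2", "rule_set_fingerprint": "rsf-A"}],
}

# Index check definitions by (rule_fingerprint, rule_set_fingerprint).
checks_by_key = {
    (c["rule_fingerprint"], c["rule_set_fingerprint"]): c for c in checks_table
}


def resolve(result):
    """Return the checks-table row behind one _errors/_warnings entry, or None."""
    return checks_by_key.get((result["rule_fingerprint"], result["rule_set_fingerprint"]))


for result in quarantine_row["_errors"] + quarantine_row["_warnings"]:
    definition = resolve(result)
    print(result["name"], "->", definition["check"]["function"] if definition else None)
```

An entry whose fingerprints have no match (such as a per-column fingerprint from a for_each_column rule) resolves to None, which is the in-memory equivalent of an inner join dropping it.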
2. Summary to Row-Level Errors
Goal: Given a run in the summary metrics table, use its run_id to find the rows that failed.
When to use: A run looks bad in the summary table and you want the individual rows behind those counts.
Returns: One row per failed check occurrence - the original row joined to the exploded _errors (or _warnings) struct.
Start by getting a run_id from the summary table. Every recipe that takes a run_id as input begins here.
- SQL
- Python
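-- Step 1: list recent runs (newest first) and copy a run_id from the output.
-- summary_metrics holds one row per metric per run; DISTINCT collapses them to one row per run.
SELECT DISTINCT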
run_id,
run_time,
output_location,
quarantine_location,
rule_set_fingerprint
FROM catalog.schema.summary_metrics
ORDER BY run_time DESC;
-- Step 2: replace <run_id> with a value from the query above.
SELECT t.*, e.*
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS e
WHERE e.run_id = '<run_id>';
-- Use the same pattern with _warnings to inspect warning-level issues.
SELECT t.*, w.*
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._warnings) AS w
WHERE w.run_id = '<run_id>';
import pyspark.sql.functions as F
# Step 1: list recent runs (newest first); the example takes the most recent, but you can pick any run_id.
# summary_metrics holds one row per metric per run, so distinct() collapses them to one row per run.
runs = (
spark.table("catalog.schema.summary_metrics")
.select("run_id", "run_time", "output_location", "quarantine_location", "rule_set_fingerprint")
.distinct()
.orderBy(F.col("run_time").desc())
)
display(runs)
latest = runs.first()
run_id = latest["run_id"]
quarantine_table = latest["quarantine_location"]
# spark.table() works when quarantine_location is a table name (e.g. catalog.schema.table).
# If it is a storage path (/Volumes/..., s3://..., abfss://...), use spark.read.format(...).load(quarantine_table) instead.
# Step 2: explode errors and warnings for that run, keeping the original row columns next to each result struct.
quarantine_df = spark.table(quarantine_table)
errors_df = (
quarantine_df
.select("*", F.explode(F.col("_errors")).alias("result"))
.filter(F.col("result.run_id") == run_id)
)
warnings_df = (
quarantine_df
.select("*", F.explode(F.col("_warnings")).alias("result"))
.filter(F.col("result.run_id") == run_id)
)
display(errors_df)
display(warnings_df)
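Conceptually, explode turns a quarantine row whose _errors array holds N entries into N output rows, which is why this recipe returns one row per failed check occurrence rather than one per input row. The plain-Python sketch below mimics the explode-and-filter step on hypothetical dictionaries. The helper `explode_results` and the sample rows are illustrative and not part of DQX or Spark; the sketch shows the change in grain, not how Spark executes the query.

```python
def explode_results(rows, column, run_id):
    """Mimic `LATERAL VIEW explode(t.<column>) AS e ... WHERE e.run_id = <run_id>`.

    Yields one (original_row, result_struct) pair per matching array element.
    A null or empty array yields nothing, just as Spark's explode drops such rows.
    """
    for row in rows:
        for result in row.get(column) or []:
            if result["run_id"] == run_id:
                yield row, result


# Hypothetical quarantine rows: row 1 failed two checks in run-1,
# row 2 only has a warning, row 3 failed in an older run.
quarantine_rows = [
    {"id": 1, "_errors": [{"name": "c1", "run_id": "run-1"},
                          {"name": "c2", "run_id": "run-1"}], "_warnings": None},
    {"id": 2, "_errors": None, "_warnings": [{"name": "c3", "run_id": "run-1"}]},
    {"id": 3, "_errors": [{"name": "c1", "run_id": "run-0"}], "_warnings": None},
]

error_pairs = list(explode_results(quarantine_rows, "_errors", "run-1"))
warning_pairs = list(explode_results(quarantine_rows, "_warnings", "run-1"))

for row, result in error_pairs:
    print(row["id"], result["name"])
```

Row 1 appears twice in the error output, once per failed check. Row 3 is excluded because its only error belongs to a different run.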
3. Row Details
To Run Summary
Goal: From a failing row, look up the run metadata in the summary table.
When to use: You are looking at an output or quarantined row and need its run context - timing, input/output locations, and fingerprints.
Returns: The summary-metrics rows for every run that produced at least one error or warning.
- SQL
- Python
SELECT sm.*
FROM catalog.schema.summary_metrics sm
INNER JOIN (
SELECT DISTINCT e.run_id
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS e
UNION
SELECT DISTINCT w.run_id
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._warnings) AS w
) runs ON sm.run_id = runs.run_id
ORDER BY sm.run_time DESC;
import pyspark.sql.functions as F
quarantine_df = spark.table("catalog.schema.quarantine_table")
error_run_ids = (
quarantine_df
.select(F.explode(F.col("_errors")).alias("result"))
.select(F.expr("result.*"))
.select("run_id")
)
warning_run_ids = (
quarantine_df
.select(F.explode(F.col("_warnings")).alias("result"))
.select(F.expr("result.*"))
.select("run_id")
)
run_ids_df = error_run_ids.union(warning_run_ids).distinct()
summary_df = spark.table("catalog.schema.summary_metrics")
result_df = summary_df.join(run_ids_df, on="run_id").orderBy(F.col("run_time").desc())
display(result_df)
To Checks Table
Goal: From a failing row, get the full check definition (function, arguments, filter) that produced the failure.
When to use: You have a failing row and want the exact rule that flagged it, not just the error message.
Returns: One row per error, each with its check function, arguments, filter, and criticality from the checks table.
Rules defined with for_each_column are stored as a single row in the checks table but expand into one error per column at runtime, each with its own rule_fingerprint. These per-column fingerprints do not match the parent row in the checks table, so the joins below will not return those rules. To find the originating rule, list all active rules for the run with Recipe 6, look for the ones where check.for_each_column IS NOT NULL, and match them by check.function and column membership.
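Here is a minimal sketch of that manual match in plain Python. It assumes each error struct exposes function and columns fields, and that check.for_each_column is a flat list of column names; multi-column entries are not handled. The helper name and sample rules are hypothetical, so verify the field layout against your own tables before relying on this approach.

```python
def match_for_each_column(error, active_rules):
    """Return for_each_column rules whose function matches the error and whose
    column list contains every column named in the error (assumed field layout)."""
    matches = []
    for rule in active_rules:
        check = rule["check"]
        columns = check.get("for_each_column")
        if not columns:
            continue  # only for_each_column rules need this fallback
        same_function = check["function"] == error["function"]
        if same_function and set(error["columns"]) <= set(columns):
            matches.append(rule)
    return matches


# Hypothetical active rules for one run (as listed by Recipe 6).
active_rules = [
    {"name": "not_null_group",
     "check": {"function": "is_not_null", "for_each_column": ["id", "email"]}},
    {"name": "range_check",
     "check": {"function": "is_in_range", "for_each_column": None}},
]

# Hypothetical error struct produced by the expanded rule for the email column.
error = {"name": "email_is_null", "function": "is_not_null", "columns": ["email"]}

print([rule["name"] for rule in match_for_each_column(error, active_rules)])
```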
- SQL
- Python
SELECT
e.name AS error_name,
e.message,
e.columns,
c.check.function,
c.check.arguments,
c.filter,
c.criticality
FROM (
SELECT errors.*
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS errors
) e
INNER JOIN catalog.schema.checks_table c
ON e.rule_fingerprint = c.rule_fingerprint
AND e.rule_set_fingerprint = c.rule_set_fingerprint;
import pyspark.sql.functions as F
errors_df = (
spark.table("catalog.schema.quarantine_table")
.select(F.explode(F.col("_errors")).alias("result"))
.select(F.expr("result.*"))
)
checks_df = spark.table("catalog.schema.checks_table")
result_df = (
errors_df.alias("e")
.join(checks_df.alias("c"), on=["rule_fingerprint", "rule_set_fingerprint"])
.select(
F.col("e.name").alias("error_name"),
F.col("e.message"),
F.col("e.columns"),
F.col("c.check.function"),
F.col("c.check.arguments"),
F.col("c.filter"),
F.col("c.criticality"),
)
)
display(result_df)
4. Summary to Checks Table
Goal: From a summary metrics run, find the exact rule set that was applied.
When to use: You know a run_id and want the complete set of rules that ran, regardless of whether they failed.
Returns: All checks-table rows belonging to that run's rule_set_fingerprint.
checks_location in the summary metrics table is null when checks were passed inline rather than loaded from a storage table. The queries below join on rule_set_fingerprint only, so they work either way - but you will need to know your checks table name upfront if checks_location is null.
- SQL
- Python
-- Get the rule set used in a given run
SELECT c.*
FROM catalog.schema.checks_table c
INNER JOIN (
SELECT DISTINCT checks_location, rule_set_fingerprint
FROM catalog.schema.summary_metrics
WHERE run_id = '<run_id>'
) sm
ON c.rule_set_fingerprint = sm.rule_set_fingerprint;
import pyspark.sql.functions as F
# Get rule_set_fingerprint from a specific run
summary_df = spark.table("catalog.schema.summary_metrics")
run_info = (
summary_df
.filter(F.col("run_id") == "<run_id>")
.select("rule_set_fingerprint")
.first()
)
# Load the matching checks
checks_df = (
spark.table("catalog.schema.checks_table")
.filter(F.col("rule_set_fingerprint") == run_info["rule_set_fingerprint"])
)
display(checks_df)
You can also load checks by fingerprint using DQX:
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import TableChecksStorageConfig
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks(
config=TableChecksStorageConfig(
location="catalog.schema.checks_table",
rule_set_fingerprint="<fingerprint>",
)
)
5. Which Checks Failed for a Run
Goal: For a given run, list all checks that had errors or warnings.
When to use: You want a quick per-check error/warning tally for one run without touching the row-level tables.
Returns: One row per failing check with its error and warning counts.
The check_metrics metric in the summary table already contains a per-check breakdown, stored as a JSON array in metric_value - no joins needed for a quick summary.
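Both tabs parse metric_value with the schema array<struct<check_name, error_count, warning_count>> and keep the entries that have a non-zero count. The stdlib-only sketch below applies the same parse-and-filter step to a hypothetical JSON string. Missing or null counts are treated as zero, which matches how SQL drops a row when comparing NULL > 0.

```python
import json


def failing_checks(metric_value):
    """Parse a check_metrics JSON array; keep checks with any errors or warnings."""
    failing = []
    for entry in json.loads(metric_value):
        errors = entry.get("error_count") or 0  # a null count never passes "> 0"
        warnings = entry.get("warning_count") or 0
        if errors > 0 or warnings > 0:
            failing.append((entry["check_name"], errors, warnings))
    return failing


# Hypothetical metric_value for one run; real values come from summary_metrics.
sample = (
    '[{"check_name": "id_is_not_null", "error_count": 3, "warning_count": 0},'
    ' {"check_name": "email_format", "error_count": 0, "warning_count": 0},'
    ' {"check_name": "amount_in_range", "error_count": 0, "warning_count": 7}]'
)
print(failing_checks(sample))
```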
- SQL
- Python
SELECT
cm.check_name,
cm.error_count,
cm.warning_count
FROM catalog.schema.summary_metrics
LATERAL VIEW explode(from_json(metric_value, 'array<struct<check_name:string,error_count:bigint,warning_count:bigint>>')) AS cm
WHERE run_id = '<run_id>'
AND metric_name = 'check_metrics'
AND (cm.error_count > 0 OR cm.warning_count > 0);
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType
import pyspark.sql.functions as F
check_metrics_schema = ArrayType(StructType([
StructField("check_name", StringType()),
StructField("error_count", LongType()),
StructField("warning_count", LongType()),
]))
failed_checks_df = (
spark.table("catalog.schema.summary_metrics")
.filter((F.col("run_id") == "<run_id>") & (F.col("metric_name") == "check_metrics"))
.select(F.explode(F.from_json(F.col("metric_value"), check_metrics_schema)).alias("cm"))
.select("cm.*")
.filter((F.col("error_count") > 0) | (F.col("warning_count") > 0))
)
display(failed_checks_df)
If you also need the full check definition (function, arguments, filter), join to the checks table.
With Check Definitions
Returns: One row per distinct failing check, with its function, arguments, and filter (null for for_each_column rules - see the note in Recipe 3).
- SQL
- Python
SELECT
e.name,
e.rule_fingerprint,
c.check.function,
c.check.arguments,
c.filter
FROM (
SELECT DISTINCT errors.name, errors.rule_fingerprint, errors.rule_set_fingerprint
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS errors
WHERE errors.run_id = '<run_id>'
) e
LEFT JOIN catalog.schema.checks_table c
ON e.rule_fingerprint = c.rule_fingerprint
AND e.rule_set_fingerprint = c.rule_set_fingerprint;
import pyspark.sql.functions as F
errors_df = (
spark.table("catalog.schema.quarantine_table")
.select(F.explode(F.col("_errors")).alias("result"))
.select(F.expr("result.*"))
.filter(F.col("run_id") == "<run_id>")
)
checks_df = spark.table("catalog.schema.checks_table")
failed_checks_df = (
errors_df
.select(
F.col("name").alias("error_name"),
"rule_fingerprint",
"rule_set_fingerprint",
)
.distinct()
.join(checks_df, on=["rule_fingerprint", "rule_set_fingerprint"], how="left")
)
display(failed_checks_df)
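The LEFT JOIN matters here. Errors from for_each_column rules carry per-column fingerprints that have no matching checks-table row (see the note in Recipe 3), so an inner join would silently drop them. The plain-Python sketch below mirrors the distinct-then-left-join logic on hypothetical data: unmatched errors keep their name and get None (SQL NULL) for the definition.

```python
def failed_checks_with_definitions(errors, checks):
    """Distinct (name, rule_fingerprint, rule_set_fingerprint) triples, left-joined to checks."""
    by_key = {(c["rule_fingerprint"], c["rule_set_fingerprint"]): c for c in checks}
    distinct = sorted(
        {(e["name"], e["rule_fingerprint"], e["rule_set_fingerprint"]) for e in errors}
    )
    result = []
    for name, rf, rsf in distinct:
        check = by_key.get((rf, rsf))  # None plays the role of SQL NULL
        result.append({
            "name": name,
            "rule_fingerprint": rf,
            "function": check["check"]["function"] if check else None,
        })
    return result


# Hypothetical exploded errors for one run: a duplicate, plus one per-column fingerprint.
errors = [
    {"name": "id_is_not_null", "rule_fingerprint": "rf-1", "rule_set_fingerprint": "rsf-A"},
    {"name": "id_is_not_null", "rule_fingerprint": "rf-1", "rule_set_fingerprint": "rsf-A"},
    {"name": "email_is_null", "rule_fingerprint": "rf-col-email", "rule_set_fingerprint": "rsf-A"},
]
checks = [
    {"check": {"function": "is_not_null"}, "rule_fingerprint": "rf-1", "rule_set_fingerprint": "rsf-A"},
]

for row in failed_checks_with_definitions(errors, checks):
    print(row)
```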
6. Which Rules Were Applied in a Run
Goal: List all rules that were active for a given run, whether or not they produced failures.
When to use: Auditing what actually ran for a run, including the checks that passed.
Returns: One row per rule in that run's rule set, with name, criticality, function, arguments, and filter.
- SQL
- Python
SELECT c.name, c.criticality, c.check.function, c.check.arguments, c.filter
FROM catalog.schema.checks_table c
WHERE c.rule_set_fingerprint = (
SELECT DISTINCT rule_set_fingerprint
FROM catalog.schema.summary_metrics
WHERE run_id = '<run_id>'
)
ORDER BY c.name;
import pyspark.sql.functions as F
rsf = (
spark.table("catalog.schema.summary_metrics")
.filter(F.col("run_id") == "<run_id>")
.select("rule_set_fingerprint")
.first()["rule_set_fingerprint"]
)
rules_df = (
spark.table("catalog.schema.checks_table")
.filter(F.col("rule_set_fingerprint") == rsf)
.select("name", "criticality", "check.function", "check.arguments", "filter")
.orderBy("name")
)
display(rules_df)
7. Run Tracking Over Time
Goal: Monitor data quality trends across runs with error rates, warning rates, and row counts.
When to use: Tracking quality over time, or building a monitoring dashboard or alert.
Returns: One row per run with input/valid/error/warning row counts and the error rate as a percentage.
- SQL
- Python
SELECT
run_id,
run_time,
input_location,
MAX(CASE WHEN metric_name = 'input_row_count' THEN CAST(metric_value AS BIGINT) END) AS input_rows,
MAX(CASE WHEN metric_name = 'valid_row_count' THEN CAST(metric_value AS BIGINT) END) AS valid_rows,
MAX(CASE WHEN metric_name = 'error_row_count' THEN CAST(metric_value AS BIGINT) END) AS error_rows,
MAX(CASE WHEN metric_name = 'warning_row_count' THEN CAST(metric_value AS BIGINT) END) AS warning_rows,
ROUND(
MAX(CASE WHEN metric_name = 'error_row_count' THEN CAST(metric_value AS DOUBLE) END) /
NULLIF(MAX(CASE WHEN metric_name = 'input_row_count' THEN CAST(metric_value AS DOUBLE) END), 0) * 100,
2
) AS error_rate_pct
FROM catalog.schema.summary_metrics
GROUP BY run_id, run_time, input_location
ORDER BY run_time DESC;
import pyspark.sql.functions as F
summary_df = spark.table("catalog.schema.summary_metrics")
metrics = ["input_row_count", "valid_row_count", "error_row_count", "warning_row_count"]
trend_df = (
summary_df.filter(F.col("metric_name").isin(metrics))
.groupBy("run_id", "run_time", "input_location")
# Listing the pivot values keeps all four columns even if a metric is missing from every run.
.pivot("metric_name", metrics)
.agg(F.first(F.col("metric_value").cast("double")))
.withColumn(
"error_rate_pct",
# Like NULLIF in the SQL tab: return null instead of dividing by zero.
F.when(
F.col("input_row_count") > 0,
F.round(F.col("error_row_count") / F.col("input_row_count") * 100, 2),
),
)
.orderBy(F.col("run_time").desc())
)
display(trend_df)
Use Databricks SQL Alerts on the run tracking query to get notified when error rates exceed a threshold. See Summary Metrics for more on monitoring best practices.
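The SQL tab divides by NULLIF(input_rows, 0), so a run with zero input rows gets a NULL rate instead of a division error. The stdlib sketch below reproduces that per-run calculation from long-format rows and then applies the kind of threshold test you might configure in an alert. The 5% threshold, the helper names, and the sample rows are hypothetical.

```python
from collections import defaultdict

THRESHOLD_PCT = 5.0  # hypothetical alert threshold


def error_rates(summary_rows):
    """Pivot long-format rows per run and compute error_rate_pct like Recipe 7."""
    per_run = defaultdict(dict)
    for row in summary_rows:
        per_run[row["run_id"]][row["metric_name"]] = float(row["metric_value"])
    rates = {}
    for run_id, metrics in per_run.items():
        inputs = metrics.get("input_row_count")
        errors = metrics.get("error_row_count")
        # Mirror NULLIF(input, 0): no rate when the input count is missing or zero.
        if not inputs or errors is None:
            rates[run_id] = None
        else:
            rates[run_id] = round(errors / inputs * 100, 2)
    return rates


# Hypothetical long-format rows for three runs; r3 processed no input rows.
summary_rows = [
    {"run_id": "r1", "metric_name": "input_row_count", "metric_value": "200"},
    {"run_id": "r1", "metric_name": "error_row_count", "metric_value": "3"},
    {"run_id": "r2", "metric_name": "input_row_count", "metric_value": "40"},
    {"run_id": "r2", "metric_name": "error_row_count", "metric_value": "4"},
    {"run_id": "r3", "metric_name": "input_row_count", "metric_value": "0"},
    {"run_id": "r3", "metric_name": "error_row_count", "metric_value": "0"},
]

rates = error_rates(summary_rows)
alerts = [run for run, rate in rates.items() if rate is not None and rate > THRESHOLD_PCT]
print(rates, alerts)
```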
8. Scoping a Multi-Table Run to One Table
Goal: Summary metrics centralize data quality results in a single table, so that table typically holds results for many input tables at once. Filter by input_location to scope any query down to a single target table's runs.
When to use: Your metrics table covers many input tables and you want to focus on just one.
Returns: The distinct input tables tracked, and the runs for a chosen table (newest first).
This is the entry point for Recipes 2, 5, 6, and 7 in a multi-table setup. Without this filter, trend queries aggregate across all tables and produce meaningless mixed rates.
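To see why pooled numbers mislead, compare per-table error rates with a single rate computed over the combined counts of two tables. The table names and counts below are made up for illustration.

```python
def error_rate_pct(error_rows, input_rows):
    """Error rate as a percentage, or None when there is no input (like NULLIF)."""
    return round(error_rows / input_rows * 100, 2) if input_rows else None


# Hypothetical counts from one run on each of two input tables.
runs = [
    {"input_location": "catalog.schema.big_table", "input_rows": 1_000_000, "error_rows": 1_000},
    {"input_location": "catalog.schema.small_table", "input_rows": 1_000, "error_rows": 500},
]

per_table = {
    r["input_location"]: error_rate_pct(r["error_rows"], r["input_rows"]) for r in runs
}
pooled = error_rate_pct(
    sum(r["error_rows"] for r in runs), sum(r["input_rows"] for r in runs)
)
print(per_table, pooled)
```

The pooled rate rounds to 0.15%, which hides the fact that the small table is failing half of its rows.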
- SQL
- Python
-- List all distinct input tables tracked in the metrics table
SELECT DISTINCT input_location
FROM catalog.schema.summary_metrics
ORDER BY input_location;
-- List runs for one specific input table, newest first; copy a run_id from the output.
-- DISTINCT collapses the one-row-per-metric layout to one row per run.
SELECT DISTINCT
run_id,
run_time,
input_location,
output_location,
quarantine_location,
checks_location,
rule_set_fingerprint
FROM catalog.schema.summary_metrics
WHERE input_location = 'catalog.schema.my_table'
ORDER BY run_time DESC;
import pyspark.sql.functions as F
summary_df = spark.table("catalog.schema.summary_metrics")
# List all tracked input tables
summary_df.select("input_location").distinct().orderBy("input_location").display()
# List runs for a specific table, newest first; the example takes the most recent, but you can pick any run_id.
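table_runs = (
summary_df
.filter(F.col("input_location") == "catalog.schema.my_table")
.select("run_id", "run_time", "input_location", "output_location", "quarantine_location", "checks_location", "rule_set_fingerprint")
.distinct()
.orderBy(F.col("run_time").desc())
)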
display(table_runs)
latest = table_runs.first()
run_id = latest["run_id"]
9. Checks by Run Config
Goal: When multiple input tables share a single checks table (grouped by run_config_name), list the available run configs and load the rules for a specific one.
When to use: Rules for several tables are stored in the same checks table keyed by run_config_name, and you want the rules for one of them.
Returns: The distinct run_config_name values, and the rules stored under a chosen one.
run_config_name is the logical group identifier written by save_checks and used as a filter by load_checks - it is a regular column, not a partition. The default value is "default". When checks for different tables are saved to the same checks table with distinct run_config_name values (e.g. the table name), you must use the same value at load time or load_checks returns empty.
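The filter semantics can be sketched in plain Python with a hypothetical shared checks table and a hypothetical helper `rules_for`. This is not the DQX API; use load_checks for real loads. It only shows that the match is an exact comparison on a regular column, so falling back to "default" finds nothing when every rule was saved under a table name.

```python
DEFAULT_RUN_CONFIG = "default"

# Hypothetical shared checks table: rules for two input tables, keyed by run_config_name.
shared_checks = [
    {"name": "id_is_not_null", "run_config_name": "catalog.schema.orders"},
    {"name": "email_is_valid", "run_config_name": "catalog.schema.customers"},
    {"name": "amount_in_range", "run_config_name": "catalog.schema.orders"},
]


def rules_for(run_config_name=DEFAULT_RUN_CONFIG):
    """Return rule names whose run_config_name matches exactly (a plain column filter)."""
    return [r["name"] for r in shared_checks if r["run_config_name"] == run_config_name]


print(rules_for("catalog.schema.orders"))
print(rules_for())  # no rule was saved under "default", so this is empty
```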
- SQL
- Python
-- List all run_config_name values in a shared checks table
SELECT DISTINCT run_config_name
FROM catalog.schema.checks_table
ORDER BY run_config_name;
-- Load checks for a specific run config
SELECT *
FROM catalog.schema.checks_table
WHERE run_config_name = 'catalog.schema.my_table'
ORDER BY name;
import pyspark.sql.functions as F
checks_df = spark.table("catalog.schema.checks_table")
# List all run configs
checks_df.select("run_config_name").distinct().orderBy("run_config_name").display()
# Load checks for a specific run config
checks_df.filter(F.col("run_config_name") == "catalog.schema.my_table").display()
Use load_checks with the matching run_config_name to get the rules as DQRule objects:
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.config import TableChecksStorageConfig
from databricks.sdk import WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())
checks = dq_engine.load_checks(
config=TableChecksStorageConfig(
location="catalog.schema.checks_table",
run_config_name="catalog.schema.my_table",
)
)
10. First Failure per Check
Goal: For each check, find the earliest run in which it produced an error. Use this to identify when a data quality regression was introduced.
When to use: A check is failing now and you want to know when it first started, to correlate with an upstream change.
Returns: One row per check with its earliest failing run_time and the run_id of that first failure.
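This combines Recipe 5's per-check metrics with Recipe 7's run-over-time view: instead of the current error rate, it returns the first run_time at which each check failed. In a multi-table metrics table, add input_location to the grouping (see Recipe 8) so identically named checks on different tables are not merged.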
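MIN(run_time) finds the earliest failing timestamp, and MIN_BY(run_id, run_time) returns the run_id from the row holding that minimum. The stdlib sketch below spells out the same grouping on hypothetical rows already exploded from check_metrics. The run IDs and timestamps are made up.

```python
from datetime import datetime


def first_failures(check_metric_rows):
    """Return {check_name: (first_failure_time, first_failure_run_id)}.

    Mirrors MIN(run_time) and MIN_BY(run_id, run_time) grouped by check_name,
    considering only rows where the check produced at least one error.
    """
    first = {}
    for row in check_metric_rows:
        if (row["error_count"] or 0) <= 0:
            continue  # same as the cm.error_count > 0 filter
        current = first.get(row["check_name"])
        if current is None or row["run_time"] < current[0]:
            first[row["check_name"]] = (row["run_time"], row["run_id"])
    return first


# Hypothetical exploded check_metrics rows across three runs.
rows = [
    {"run_id": "r3", "run_time": datetime(2024, 5, 3), "check_name": "id_is_not_null", "error_count": 2},
    {"run_id": "r1", "run_time": datetime(2024, 5, 1), "check_name": "id_is_not_null", "error_count": 0},
    {"run_id": "r2", "run_time": datetime(2024, 5, 2), "check_name": "id_is_not_null", "error_count": 5},
    {"run_id": "r2", "run_time": datetime(2024, 5, 2), "check_name": "email_format", "error_count": 1},
]
print(first_failures(rows))
```

Run r1 is ignored for id_is_not_null because its error_count is 0, so the first failure is r2 even though r1 ran earlier.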
- SQL
- Python
SELECT
cm.check_name,
MIN(sm.run_time) AS first_failure_time,
MIN_BY(sm.run_id, sm.run_time) AS first_failure_run_id
FROM catalog.schema.summary_metrics sm
LATERAL VIEW explode(from_json(metric_value, 'array<struct<check_name:string,error_count:bigint,warning_count:bigint>>')) AS cm
WHERE metric_name = 'check_metrics'
AND cm.error_count > 0
GROUP BY cm.check_name
ORDER BY first_failure_time DESC;
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType
import pyspark.sql.functions as F
check_metrics_schema = ArrayType(StructType([
StructField("check_name", StringType()),
StructField("error_count", LongType()),
StructField("warning_count", LongType()),
]))
first_failures_df = (
spark.table("catalog.schema.summary_metrics")
.filter(F.col("metric_name") == "check_metrics")
.select(
"run_id",
"run_time",
F.explode(F.from_json(F.col("metric_value"), check_metrics_schema)).alias("cm"),
)
.select("run_id", "run_time", F.col("cm.check_name"), F.col("cm.error_count"))
.filter(F.col("error_count") > 0)
.groupBy("check_name")
.agg(
F.min("run_time").alias("first_failure_time"),
F.min_by("run_id", "run_time").alias("first_failure_run_id"),
)
.orderBy(F.col("first_failure_time").desc())
)
display(first_failures_df)
11. All Failures per Check
Goal: Starting from the per-check counts in the summary metrics table, drill into the quarantine table to get every row that failed each check.
When to use: You need the actual offending rows for a check, not just the counts - for triage or to share examples.
Returns: First, failure counts per check per run; then every quarantine row that failed one chosen check.
Recipe 5 reads the check_metrics field in the summary table to tell you how many errors and warnings each check produced per run. This recipe takes the next step: it reconciles those counts against the actual rows in the quarantine table and lets you pull the failing rows.
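First, count the failures per check directly from the quarantine table. Grouping the exploded _errors by e.name and e.run_id gives the row-level equivalent of the summary error_count, so the totals should match Recipe 5 for the same run, provided the quarantine table still holds every row that run wrote (for example, it was appended to rather than overwritten).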
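The reconciliation can be sketched in plain Python. Count the exploded errors per (run_id, check name) and compare them with the error_count values from check_metrics. The helper name and the sample data are hypothetical; in practice, both inputs would come from the queries in this recipe and in Recipe 5. Any key returned is a discrepancy to investigate.

```python
from collections import Counter


def reconcile(quarantine_rows, summary_error_counts):
    """Compare error counts derived from quarantine rows with check_metrics counts.

    summary_error_counts maps (run_id, check_name) -> error_count from summary_metrics.
    Returns {(run_id, check_name): (quarantine_count, summary_count)} for mismatches only.
    """
    observed = Counter(
        (e["run_id"], e["name"])
        for row in quarantine_rows
        for e in (row.get("_errors") or [])
    )
    mismatches = {}
    for key in set(observed) | set(summary_error_counts):
        from_quarantine = observed.get(key, 0)
        from_summary = summary_error_counts.get(key, 0)
        if from_quarantine != from_summary:
            mismatches[key] = (from_quarantine, from_summary)
    return mismatches


# Hypothetical inputs: the summary reports 3 email_format errors, but only 1 row survives.
quarantine_rows = [
    {"_errors": [{"run_id": "r1", "name": "id_is_not_null"}]},
    {"_errors": [{"run_id": "r1", "name": "id_is_not_null"},
                 {"run_id": "r1", "name": "email_format"}]},
    {"_errors": None},
]
summary_error_counts = {("r1", "id_is_not_null"): 2, ("r1", "email_format"): 3}

print(reconcile(quarantine_rows, summary_error_counts))
```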
- SQL
- Python
SELECT
e.run_id,
e.name AS check_name,
COUNT(*) AS failed_rows
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS e
GROUP BY e.run_id, e.name
ORDER BY e.run_id, failed_rows DESC;
import pyspark.sql.functions as F
failures_per_check_df = (
spark.table("catalog.schema.quarantine_table")
.select(F.explode(F.col("_errors")).alias("e"))
.groupBy(F.col("e.run_id").alias("run_id"), F.col("e.name").alias("check_name"))
.agg(F.count("*").alias("failed_rows"))
.orderBy("run_id", F.col("failed_rows").desc())
)
display(failures_per_check_df)
Then drill into a single check to get the failing rows. Replace <your_check_name> with the value from the check_name column above (the same value Recipe 5 surfaces under e.name). Both tabs return the same shape: every original column (including _errors and _warnings) plus the run and check identifiers from the matching error.
- SQL
- Python
SELECT
t.*,
e.run_id,
e.run_time,
e.name AS check_name,
e.rule_fingerprint
FROM catalog.schema.quarantine_table t
LATERAL VIEW explode(t._errors) AS e
WHERE e.name = '<your_check_name>'
ORDER BY e.run_time DESC;
import pyspark.sql.functions as F
# Replace with the check name you want to inspect, same value as e.name in Recipe 5.
check_name = "<your_check_name>"
failures_df = (
spark.table("catalog.schema.quarantine_table")
.withColumn("e", F.explode(F.col("_errors")))
.filter(F.col("e.name") == check_name)
.select(
"*",
F.col("e.run_id"),
F.col("e.run_time"),
F.col("e.name").alias("check_name"),
F.col("e.rule_fingerprint"),
)
.drop("e")
.orderBy(F.col("run_time").desc())
)
display(failures_df)
12. Summary Metrics Pivot
Goal: Transform the long-format summary metrics table into a wide view with one row per run and metric values as columns.
When to use: Building a wide dashboard table, or comparing the row-count metrics side by side instead of in long format.
Returns: One row per run (or per day) with each metric as its own column.
summary_metrics stores one row per metric per run. Pivoting turns those rows into columns, making it straightforward to compare input_row_count, error_row_count, warning_row_count, and valid_row_count side by side. The second query groups by day instead of individual run - useful for dashboards and trend charts that extend Recipe 7.
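The long-to-wide transformation and the MAX/SUM choice for daily rollups can be sketched with the standard library. The helper names and the sample rows below are hypothetical; the sketch mirrors the pivot queries, it does not replace them.

```python
from collections import defaultdict
from datetime import datetime

METRICS = ("input_row_count", "error_row_count", "warning_row_count", "valid_row_count")


def pivot_per_run(rows):
    """One dict of metrics per (run_id, run_time), like PIVOT (MAX(metric_value) ...)."""
    wide = defaultdict(dict)
    for r in rows:
        if r["metric_name"] not in METRICS:
            continue  # e.g. check_metrics is excluded, as in the WHERE clause
        key = (r["run_id"], r["run_time"])
        value = int(r["metric_value"])
        wide[key][r["metric_name"]] = max(value, wide[key].get(r["metric_name"], value))
    return dict(wide)


def rollup_per_day(rows, agg=max):
    """Group by calendar day; agg=max keeps the peak per metric, agg=sum the daily total."""
    values = defaultdict(lambda: defaultdict(list))
    for r in rows:
        if r["metric_name"] in METRICS:
            values[r["run_time"].date()][r["metric_name"]].append(int(r["metric_value"]))
    return {day: {m: agg(v) for m, v in by_metric.items()} for day, by_metric in values.items()}


# Hypothetical long-format rows: two runs on the same day.
rows = [
    {"run_id": "r1", "run_time": datetime(2024, 5, 1, 8), "metric_name": "input_row_count", "metric_value": "100"},
    {"run_id": "r1", "run_time": datetime(2024, 5, 1, 8), "metric_name": "error_row_count", "metric_value": "4"},
    {"run_id": "r2", "run_time": datetime(2024, 5, 1, 20), "metric_name": "input_row_count", "metric_value": "300"},
    {"run_id": "r2", "run_time": datetime(2024, 5, 1, 20), "metric_name": "error_row_count", "metric_value": "1"},
    {"run_id": "r2", "run_time": datetime(2024, 5, 1, 20), "metric_name": "check_metrics", "metric_value": "[]"},
]

print(pivot_per_run(rows))
print(rollup_per_day(rows))
print(rollup_per_day(rows, agg=sum))
```

With agg=max, the daily input_row_count (300) and error_row_count (4) come from different runs because MAX is applied to each metric column independently. The daily MAX queries behave the same way.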
- SQL
- Python
-- One row per run, metrics as columns.
SELECT *
FROM (
SELECT run_id, run_time, metric_name, CAST(metric_value AS BIGINT) AS metric_value
FROM catalog.schema.summary_metrics
WHERE metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count', 'valid_row_count')
)
PIVOT (
MAX(metric_value)
FOR metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count', 'valid_row_count')
)
ORDER BY run_time DESC;
-- Same shape but grouped by day. Each metric uses MAX, i.e. the largest single run that day
-- (useful for peak/capacity views). Swap MAX for SUM if you want the total volume across all
-- runs in the day instead. Change the DATE_TRUNC granularity as needed.
SELECT
DATE_TRUNC('day', run_time) AS run_day,
MAX(CASE WHEN metric_name = 'input_row_count' THEN CAST(metric_value AS BIGINT) END) AS input_row_count,
MAX(CASE WHEN metric_name = 'error_row_count' THEN CAST(metric_value AS BIGINT) END) AS error_row_count,
MAX(CASE WHEN metric_name = 'warning_row_count' THEN CAST(metric_value AS BIGINT) END) AS warning_row_count,
MAX(CASE WHEN metric_name = 'valid_row_count' THEN CAST(metric_value AS BIGINT) END) AS valid_row_count
FROM catalog.schema.summary_metrics
WHERE metric_name IN ('input_row_count', 'error_row_count', 'warning_row_count', 'valid_row_count')
GROUP BY run_day
ORDER BY run_day DESC;
import pyspark.sql.functions as F
metrics = ["input_row_count", "error_row_count", "warning_row_count", "valid_row_count"]
# Row counts per run: one row per run_id and run_time, metrics spread as columns.
pivot_df = (
spark.table("catalog.schema.summary_metrics")
.filter(F.col("metric_name").isin(metrics))
.withColumn("metric_value", F.col("metric_value").cast("long"))
.groupBy("run_id", "run_time")
.pivot("metric_name", metrics)
.agg(F.max("metric_value"))
.orderBy(F.col("run_time").desc())
)
display(pivot_df)
# Grouped by day. F.max gives the largest single run that day (peak/capacity view).
# Swap F.max for F.sum to get the total volume across all runs in the day instead.
# Change "day" to "week" or "month" as needed.
daily_df = (
spark.table("catalog.schema.summary_metrics")
.filter(F.col("metric_name").isin(metrics))
.withColumn("metric_value", F.col("metric_value").cast("long"))
.withColumn("run_day", F.date_trunc("day", F.col("run_time")))
.groupBy("run_day")
.pivot("metric_name", metrics)
.agg(F.max("metric_value"))
.orderBy(F.col("run_day").desc())
)
display(daily_df)