Report

The Report class is the central entry point for defining and executing an analysis. It loads configuration, sets up the data source and sink, and orchestrates computation and persistence.

Creating a Report

from databricks.sdk import WorkspaceClient
from mda_reporting.core.report import Report

ws = WorkspaceClient()

# From a JSON config file
my_report = Report(name="my_report", spark=spark, workspace_client=ws, config_path="./config/config.json")

# From a dictionary
my_report = Report(name="my_report", spark=spark, workspace_client=ws, config=config_dict)

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Name of the report. Used to generate a unique report_id. |
| spark | SparkSession | Active Spark session for data processing. |
| workspace_client | WorkspaceClient | Authenticated Databricks workspace client. |
| config | dict, optional | Configuration as a Python dictionary. |
| config_path | str, optional | Path to a JSON configuration file. |

Either config or config_path must be provided.

Report methods

| Method | Description | Arguments |
| --- | --- | --- |
| get_db() | Returns the MeasurementDB instance for signal definition. | -- |
| get_solver() | Returns the active QuerySolver. | -- |
| get_sink_config() | Returns the active SinkConfig (e.g. UnitySinkConfig with catalog_name, schema_name, table_prefix). | -- |
| add_page(page) | Adds a Page to the report. | page: Page instance. |
| add_event(event) | Registers an Event with the report. All events used by aggregations must be registered. | event: Event instance. |
| determine_report(is_incremental) | Computes all events, aggregations, and container dimensions. Results are stored on the report object. | is_incremental: bool or None. Mode hint; overridden by config.incremental when present. See Incremental. |
| persist_results() | Writes all computed results (fact and dimension tables) to the configured Gold layer sink. | -- |

Execution workflow

# 1. Define signals, events, aggregations (see sections below)
# 2. Add events and pages
my_report.add_event(my_event)
my_report.add_page(page)

# 3. Compute
my_report.determine_report()

# 4. Persist
my_report.persist_results()

determine_report() validates that every event referenced by an aggregation has been registered with add_event() before computation begins.


Configuration

Configuration is defined as JSON (or an equivalent Python dictionary) and validated using Pydantic models.

Configuration schema

{
  "source": {
    "container_metrics_table": "catalog.schema.container_metrics",
    "channel_metrics_table": "catalog.schema.channel_metrics",
    "channels_uri": "catalog.schema.channels",
    "container_tags_table": "catalog.schema.container_tags",
    "channel_tags_table": "catalog.schema.channel_tags"
  },
  "unity_sink": {
    "catalog": "my_catalog",
    "schema": "gold",
    "table_prefix": "my_report"
  },
  "query_engine": {
    "solver": "DeltaSolver"
  },
  "container_filters": {
    "tag_filters": [
      [
        { "tag_name": "uut_id", "comparator": "==", "value": "ABC123", "cast_type": "string" }
      ]
    ],
    "metric_filters": [
      [
        { "column_name": "start_dt", "comparator": ">=", "value": "2025-04-27T05:20:54.000Z", "value_type": "timestamp" },
        { "column_name": "stop_dt", "comparator": "<=", "value": "2025-04-27T06:00:00.000Z", "value_type": "timestamp" }
      ]
    ]
  },
  "measurement_dimensions": ["container_id", "vehicle_key", "start_ts", "stop_ts"]
}

Source

Defines the Silver layer input tables. All table names must follow Unity Catalog naming: catalog.schema.table.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| container_metrics_table | str | Yes | Container metadata (start/stop times, duration). |
| channel_metrics_table | str | Yes | Channel-level statistics (min, max, mean, sample count). |
| channels_uri | str | Yes | Raw time-series data (timestamps + values). |
| container_tags_table | str | No | Key-value tags for containers. |
| channel_tags_table | str | No | Key-value tags for channels. |

Unity Sink

Defines where Gold layer output tables are written.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| catalog | str | Yes | Target Unity Catalog name. |
| schema | str | Yes | Target schema name. |
| table_prefix | str | Yes | Prefix for all generated table names. |

Output tables are named {table_prefix}_{entity}, for example my_report_histogram_fact.
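The naming rule can be sketched in a line of Python (the helper name is hypothetical, not part of the library):

```python
# Hypothetical helper illustrating the {table_prefix}_{entity} naming rule
# combined with Unity Catalog's catalog.schema.table qualification.
def gold_table_name(catalog: str, schema: str, table_prefix: str, entity: str) -> str:
    return f"{catalog}.{schema}.{table_prefix}_{entity}"

name = gold_table_name("my_catalog", "gold", "my_report", "histogram_fact")
# my_catalog.gold.my_report_histogram_fact
```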

Query Engine

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| solver | str | "BasicNarrowSolver" | "BasicNarrowSolver", "DeltaSolver", or "KeyValueStoreSolver". |
| data_type | str | "RLE" | "RLE" (intervals [tstart, tend)) or "RAW" (raw timestamps; converted to RLE before aggregation). |
| drop_implausible_data | bool | false | When true, drops rows where is_plausible = false. Requires data_type = "RAW"; combining with "RLE" raises a validation error. |
| project_id | str | null | Required when solver = "KeyValueStoreSolver". |
| parent_id | str | null | Optional parent-entity filter on the concept-entities table (KVS solver only), e.g. "uut_concept". |
| entity_maps_to | str | "uut_id" | How entity_id in concept-entities maps to container_metrics: "uut_id" (1-to-many) or "container_id" (1-to-1). |
| solver_config | SolverConfig | null | Column-name overrides for custom silver schemas. See SolverConfig below. |

SolverConfig

Maps the solver's internal column names to the actual silver-layer column names. Only needed when the silver schema diverges from the defaults.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| container_id_col | str | "container_id" | Column identifying a container. |
| channel_id_cols | list[str] | ["container_id", "channel_id"] | Columns that together uniquely identify a channel. |
| channel_data_mapping | dict[str, str] | {"tstart": "tstart", "tend": "tend", "value": "value"} | Maps internal keys (tstart, tend, value) to silver column names on channels_uri. |
| container_meta_data_mapping | dict[str, str] | {"project_id": "project_id"} | Maps internal keys (project_id) to silver column names on container_metrics_table. |
| entity_id_col | str | "entity_id" | Entity id column in concept-entities / KVS tables. |
| parent_id_col | str | "parent_id" | Parent-entity id column in concept-entities / KVS tables. |
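
For example, a silver schema with nonstandard column names might be mapped like this (the column names below are illustrative, not defaults):

```json
{
  "query_engine": {
    "solver": "DeltaSolver",
    "solver_config": {
      "container_id_col": "meas_container_id",
      "channel_id_cols": ["meas_container_id", "signal_name"],
      "channel_data_mapping": { "tstart": "t_begin", "tend": "t_end", "value": "signal_value" }
    }
  }
}
```

Fields left out of solver_config keep their defaults, so only the diverging columns need to be listed.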

Incremental (optional)

Incremental processing lets determine_report() skip containers and definitions it already handled on a prior run. Turn it on via incremental.enabled in config, or pass is_incremental=True at call time.

On each run

  1. Compare every event and aggregation against its stored definition_hash in the gold dimension table. Classify each as changed (hash differs, or it's brand new) or unchanged (hash matches).
  2. For unchanged definitions, process only the containers that are new or have newer silver data than gold. Skip the rest.
  3. For changed definitions, reprocess all containers that match the report's filters.
  4. Persist via Delta MERGE on natural keys for unchanged definitions; replace atomically via replaceWhere on visual_id or event_id for changed ones.
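
The classification in step 1 can be sketched with plain dictionaries (the names and hash values below are illustrative, not the library's internal representation):

```python
def classify_definitions(current: dict[str, str], stored: dict[str, str]) -> dict[str, list[str]]:
    """Split definitions into changed vs unchanged by comparing definition hashes.

    current: name -> definition_hash computed for the report being run
    stored:  name -> definition_hash read from the gold dimension table
    """
    changed, unchanged = [], []
    for name, h in current.items():
        # Brand new (no stored hash) or differing hash => changed => full reprocess.
        if stored.get(name) != h:
            changed.append(name)
        else:
            unchanged.append(name)
    return {"changed": changed, "unchanged": unchanged}

result = classify_definitions(
    current={"hist_speed": "a1", "evt_ignition": "b2", "hist_new": "c3"},
    stored={"hist_speed": "a1", "evt_ignition": "OLD"},
)
# hist_speed is unchanged; evt_ignition's hash differs; hist_new is brand new.
```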

Mode resolution

Report._resolve_is_incremental picks the mode in this order:

  1. No gold layer yet? Run full. Nothing to compare against.
  2. config.incremental set? config.incremental.enabled wins.
  3. Otherwise the is_incremental argument to determine_report() wins.
  4. Neither set? Run full.

The first run of a new report is always full. Subsequent runs pick up where the last one left off.

What counts as a definition change

Only the hashed attributes matter. Anything else is cosmetic and won't trigger reprocessing.

| Type | Hashed |
| --- | --- |
| BasicEvent | expr string |
| ContainerEvent | name |
| Histogram | base_expr, bins, event |
| Histogram2D | x_expr, y_expr, x_bins, y_bins, event |
| StatsAggregator | input_expressions, statistics, event |

Renaming an aggregation, tweaking the description, or swapping channel_name or units keeps the hash stable. No reprocessing.
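
To see why a rename keeps the hash stable: only the hashed attributes feed the digest. A sketch, assuming a canonical-JSON-over-hashed-fields scheme (the library's actual hashing may differ):

```python
import hashlib
import json

# Per-type list of attributes that participate in the hash (from the table above).
HASHED_FIELDS = {"Histogram": ("base_expr", "bins", "event")}

def definition_hash(def_type: str, attrs: dict) -> str:
    # Only hashed attributes are included; name, description, units are ignored.
    payload = {k: attrs[k] for k in HASHED_FIELDS[def_type]}
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

a = definition_hash("Histogram", {"name": "speed_hist", "base_expr": "speed",
                                  "bins": [0, 50, 100], "event": "driving"})
b = definition_hash("Histogram", {"name": "renamed", "base_expr": "speed",
                                  "bins": [0, 50, 100], "event": "driving"})
c = definition_hash("Histogram", {"name": "speed_hist", "base_expr": "speed",
                                  "bins": [0, 60, 120], "event": "driving"})
assert a == b  # rename only: hash stable, no reprocessing
assert a != c  # bins changed: hash differs, full reprocess
```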

Container-update detection

ContainerUpsertDetector.detect_upserted_containers finds two things and unions them:

  • New containers: silver rows that don't exist in gold (left anti-join on container_id).
  • Updated containers: silver rows where silver_last_modified_column is newer than the matching gold gold_last_modified_column. If either column is missing from its side, update detection is silently skipped and only new containers get picked up.
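
The union of the two cases can be illustrated with container_id -> last_modified maps (a pure-Python sketch, not the Spark implementation):

```python
def detect_upserted_containers(silver: dict[str, str], gold: dict[str, str]) -> set[str]:
    """Sketch of new + updated container detection.

    Keys are container_ids; values are ISO last-modified timestamps, so
    lexicographic comparison matches chronological order.
    """
    # New containers: present in silver, absent from gold (left anti-join on container_id).
    new = {cid for cid in silver if cid not in gold}
    # Updated containers: silver last-modified strictly newer than gold's.
    updated = {cid for cid in silver if cid in gold and silver[cid] > gold[cid]}
    return new | updated

changed = detect_upserted_containers(
    silver={"c1": "2025-05-01", "c2": "2025-05-02", "c3": "2025-05-03"},
    gold={"c1": "2025-05-01", "c2": "2025-05-01"},
)
# c2 was updated, c3 is new, c1 is untouched.
```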

Config fields

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | false | Turns incremental processing on. |
| silver_last_modified_column | str | "timestamp" | Silver-side column used to detect container updates. |
| gold_last_modified_column | str | "_created_at" | Gold-side column used to detect prior-run freshness. |

Operational notes

  • A single run can be partly incremental: one event is changed (full reprocess), another is unchanged (upserted containers only), a newly added aggregation is brand new (also full reprocess). Each entity walks its own path.
  • replaceWhere is atomic per fact table. When a definition changes, all rows for that visual_id or event_id get deleted and rewritten in one transaction. No intermediate inconsistent state, but there is a brief rewrite window.
  • MERGE keeps existing rows that don't conflict, so unchanged definitions accumulate rows for new containers without rewriting the old ones.

Container Filters (optional)

Restricts the set of containers that are processed. Filters are expressed in disjunctive normal form — each inner list is AND-combined, and the outer list is OR-combined. Two independent filter families:

  • tag_filters — applied on container_tags_table (EAV key/value model).
  • metric_filters — applied on container_metrics_table (columnar model).

{
  "container_filters": {
    "tag_filters": [
      [
        { "tag_name": "uut_id", "comparator": "==", "value": "ABC123", "cast_type": "string" }
      ]
    ],
    "metric_filters": [
      [
        { "column_name": "start_dt", "comparator": ">=", "value": "2025-04-27T05:20:54.000Z", "value_type": "timestamp" },
        { "column_name": "stop_dt", "comparator": "<=", "value": "2025-04-27T06:00:00.000Z", "value_type": "timestamp" }
      ]
    ]
  }
}

TagFilter:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| tag_name | str | Yes | Tag key to filter on. |
| comparator | str | Yes | One of ==, !=, >, >=, <, <=. |
| value | any | Yes | Expected value; must match cast_type. |
| cast_type | str | No | string (default), int, double, or timestamp (ISO-format string). |

MetricFilter:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| column_name | str | Yes | Column on container_metrics_table to filter on (e.g. start_dt, stop_dt, uut_id). |
| comparator | str | Yes | One of ==, !=, >, >=, <, <=. |
| value | any | Yes | Expected value. |
| value_type | str | No | When provided, validates/converts the value (string, int, double, timestamp). |
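
The disjunctive-normal-form semantics (inner lists AND-combined, outer list OR-combined) can be illustrated with a small evaluator over plain dicts. This is not the library's implementation; it only demonstrates how a filter list is interpreted:

```python
import operator

# Comparator strings from the tables above, mapped to Python operators.
OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
       ">=": operator.ge, "<": operator.lt, "<=": operator.le}

def matches(row: dict, dnf: list[list[dict]]) -> bool:
    # Outer list: OR across clauses; inner list: AND across conditions.
    return any(
        all(OPS[c["comparator"]](row[c["column_name"]], c["value"]) for c in clause)
        for clause in dnf
    )

metric_filters = [[
    {"column_name": "start_dt", "comparator": ">=", "value": "2025-04-27T05:20:54.000Z"},
    {"column_name": "stop_dt", "comparator": "<=", "value": "2025-04-27T06:00:00.000Z"},
]]
# ISO timestamps compare correctly as strings (lexicographic == chronological).
row = {"start_dt": "2025-04-27T05:30:00.000Z", "stop_dt": "2025-04-27T05:45:00.000Z"}
# row falls inside the window, so both AND-ed conditions hold and the filter matches
```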

Measurement Dimensions

A list of dimension columns to include in the measurement_dimension table. Available values:

| Dimension | Description |
| --- | --- |
| container_id | Container identifier. |
| uut_id | Unit under test identifier. |
| vehicle_key | Vehicle identifier. |
| file_name | Source measurement file name. |
| source_file_path | Full path to the source file. |
| start_ts | Measurement start timestamp. |
| stop_ts | Measurement stop timestamp. |
| environment | Recording environment (e.g. PUMA, datalogger). |
| project_id | Project identifier. |

Default: ["container_id", "vehicle_key", "start_ts", "stop_ts"]