impulse_reporting.config.config_parser
is_valid_table_name
def is_valid_table_name(table_name: str) -> str
Validate if a string is a valid Unity Catalog table name.
Arguments:
table_name(str): The table name to validate. Should be in format 'catalog.schema.table'.
Raises:
ValueError: If the table name does not match the required format or contains invalid characters.
Returns:
str: The validated table name if valid.
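The three-part check described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the function name `sketch_is_valid_table_name` is hypothetical, and the allowed character set (letters, digits, underscores) is an assumption, since this reference does not state which characters are rejected.

```python
import re

# Assumption: each name part consists of letters, digits, or underscores.
# The actual character rules of is_valid_table_name are not documented here.
_PART = r"[A-Za-z0-9_]+"
_TABLE_NAME = re.compile(rf"{_PART}\.{_PART}\.{_PART}")


def sketch_is_valid_table_name(table_name: str) -> str:
    """Return table_name unchanged if it looks like 'catalog.schema.table'."""
    if not _TABLE_NAME.fullmatch(table_name):
        raise ValueError(f"Invalid Unity Catalog table name: {table_name!r}")
    return table_name
```

Returning the input on success, rather than a bool, lets a function of this shape be used directly as a field validator that passes the value through.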
is_valid_unity_entity_name
def is_valid_unity_entity_name(entity_name: str) -> str
Validate if a string is a valid Unity Catalog entity name.
Arguments:
entity_name(str): The entity name to validate (catalog, schema, or table prefix).
Raises:
ValueError: If the entity name contains invalid characters.
Returns:
str: The validated entity name if valid.
Solvers
class Solvers(Enum)
Enumeration of available solver types for the query engine.
DEFAULT_SOLVER is the single, unified solver. DELTA_SOLVER and
KEY_VALUE_STORE_SOLVER are deprecated aliases kept so that existing
report configs continue to deserialize; both now resolve to the same
DefaultSolver. They will be removed in a future release.
Arguments:
DEFAULT_SOLVER(str): The single, unified solver.
DELTA_SOLVER(str): Deprecated alias for DEFAULT_SOLVER.
KEY_VALUE_STORE_SOLVER(str): Deprecated alias for DEFAULT_SOLVER.
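One way deprecated members can keep old configs loadable while resolving to a single solver is sketched below. The class `SketchSolvers`, the helper `normalize_solver`, and all three string values are hypothetical; the real enum values are not listed in this reference.

```python
from enum import Enum


class SketchSolvers(Enum):
    # Hypothetical string values, chosen only for illustration.
    DEFAULT_SOLVER = "default"
    DELTA_SOLVER = "delta"                      # deprecated alias
    KEY_VALUE_STORE_SOLVER = "key_value_store"  # deprecated alias


_DEPRECATED = {SketchSolvers.DELTA_SOLVER, SketchSolvers.KEY_VALUE_STORE_SOLVER}


def normalize_solver(raw: str) -> SketchSolvers:
    """Parse a config value and fold deprecated aliases into DEFAULT_SOLVER."""
    solver = SketchSolvers(raw)  # old values still deserialize
    if solver in _DEPRECATED:
        return SketchSolvers.DEFAULT_SOLVER
    return solver
```

Keeping the deprecated members as distinct enum entries means an old report config still parses, while everything downstream only ever sees DEFAULT_SOLVER.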
Source
class Source(BaseModel)
Configuration for data source tables in Unity Catalog.
Arguments:
container_tags_table(str): Full Unity Catalog path to the container tags table (narrow/EAV format). Required when filtering by container tags. Omit for wide-only data models that carry container attributes as columns on container_metrics. project_id scoping is independent of this field: it works in both narrow EAV and wide-only data models because it is applied to container_metrics (and channel_mapping if configured) regardless of whether container_tags_table is set.
container_metrics_table(str): Full Unity Catalog path to the container metrics table.
channel_metrics_table(str): Full Unity Catalog path to the channel metrics table.
channels_uri(str): Full Unity Catalog path to the channels data table.
channel_mapping_table(str): Full Unity Catalog path to the channel mapping table. Required when using channel_with_alias() for logical alias resolution.
unit_conversion_table(str): Full Unity Catalog path to the unit conversion table. When set together with a channel_mapping_table whose rows carry source_unit and target_unit columns, the query engine converts time-series values from the source unit to the target unit during solve().
UnitySink
class UnitySink(BaseModel)
Configuration for data sink location in Unity Catalog.
Arguments:
catalog(str): Target catalog name for output tables.
schema(str): Target schema name for output tables.
table_prefix(str): Prefix to use for generated output table names.
Comparator
class Comparator(str, Enum)
Supported comparison operators for container filters.
CastType
class CastType(str, Enum)
Supported Spark cast types for tag value columns.
TagFilter
class TagFilter(BaseModel)
A single tag-based filter applied on the container_tags_table (EAV).
Arguments:
tag_name(str): The tag key / element_id to filter on.
comparator(Comparator): The comparison operator.
value(str | int | float | datetime): The expected value. Must match the cast_type: str for STRING, int for INT, int | float for DOUBLE, ISO-format string for TIMESTAMP (automatically parsed to datetime).
cast_type(CastType): Spark type to cast the tag value to before comparison.
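The value/cast_type pairing above can be sketched as a small check. This is an illustration only: `coerce_tag_value` is a hypothetical helper, the lowercase cast-type strings are assumed from the "string" value used in the Examples section, and rejecting bool for numeric types is an assumption of this sketch.

```python
from datetime import datetime
from typing import Union

Value = Union[str, int, float, datetime]


def coerce_tag_value(value: Value, cast_type: str) -> Value:
    """Check value against cast_type; parse ISO strings for 'timestamp'."""
    # Assumption: bool is not accepted as an int, although it subclasses int.
    is_int = isinstance(value, int) and not isinstance(value, bool)
    if cast_type == "string" and isinstance(value, str):
        return value
    if cast_type == "int" and is_int:
        return value
    if cast_type == "double" and (is_int or isinstance(value, float)):
        return value
    if cast_type == "timestamp" and isinstance(value, str):
        # datetime.fromisoformat rejects a trailing 'Z' before Python 3.11.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    raise ValueError(f"value {value!r} does not match cast_type {cast_type!r}")
```

For example, `coerce_tag_value("2025-04-27T05:20:54.000Z", "timestamp")` yields a timezone-aware datetime, while `coerce_tag_value("abc", "int")` raises ValueError.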
MetricFilter
class MetricFilter(BaseModel)
A single metric-based filter applied on the container_metrics_table.
Arguments:
column_name(str): The metric column to filter on.
comparator(Comparator): The comparison operator.
value(str | int | float | datetime): The expected value. When value_type is provided, the value must match it.
value_type(CastType): When provided, validates and/or converts the value to the expected type.
ContainerFilters
class ContainerFilters(BaseModel)
Container-level filters in disjunctive normal form (OR of ANDs).
Each outer list element is a group of filters that are AND-combined. The resulting group expressions are then OR-combined.
Arguments:
tag_filters(list[list[TagFilter]]): Tag-based filter groups (applied on container_tags_table).
metric_filters(list[list[MetricFilter]]): Metric-based filter groups (applied on container_metrics_table).
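The OR-of-ANDs semantics can be illustrated on plain dictionaries. This sketch does not use Spark and is not the framework's code: `matches` and `OPS` are hypothetical names, and only the "==" and ">=" comparators are confirmed by this reference; the other operator strings are assumed.

```python
import operator

# Assumption: comparator strings. Only "==" and ">=" appear in this reference.
OPS = {
    "==": operator.eq, "!=": operator.ne,
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
}


def matches(row: dict, groups: list) -> bool:
    """True if any group (OR) has all of its filters (AND) satisfied by row."""
    return any(
        all(OPS[f["comparator"]](row[f["column_name"]], f["value"]) for f in group)
        for group in groups
    )


# Two groups: (uut_id == AA080518 AND duration_s >= 60) OR (uut_id == BB000001).
# duration_s is a made-up column used only for this example.
groups = [
    [
        {"column_name": "uut_id", "comparator": "==", "value": "AA080518"},
        {"column_name": "duration_s", "comparator": ">=", "value": 60},
    ],
    [{"column_name": "uut_id", "comparator": "==", "value": "BB000001"}],
]
```

A row with uut_id "AA080518" and duration_s 10 fails the first group and the second, so it is filtered out; a row with uut_id "BB000001" passes through the second group regardless of duration_s.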
QueryEngine
class QueryEngine(BaseModel)
Configuration for the query engine solver.
Arguments:
solver(Solvers, default=Solvers.DEFAULT_SOLVER): The solver type to use for query execution.
solver_config(SolverConfig): Per-table column name mappings and filter configuration for the solver. Use this when your silver-layer tables use non-default column names or when you need project/toolbox scoping. Key sub-fields:
- project_id(str): Top-level project filter value applied to the container_tags, container_metrics, and channel_mapping tables when the corresponding columns exist after column renaming.
- Per-table sections (container_tags, container_metrics, channel_mapping, channels, etc.), each with column_name_mapping and filters dicts.
When omitted, all default column names are used and no project/toolbox filtering is applied.
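The interplay of column renaming and project scoping can be sketched on a list of dictionaries. This is a rough model under stated assumptions, not the solver's behavior: `apply_table_config` is a hypothetical helper, the mapping direction (physical name to internal name) is inferred from the Examples section, and the internal column is assumed to be called project_id.

```python
from typing import Optional


def apply_table_config(
    rows: list,
    column_name_mapping: dict,
    project_id: Optional[str] = None,
) -> list:
    """Rename physical columns to internal names, then scope by project."""
    renamed = [
        {column_name_mapping.get(name, name): value for name, value in row.items()}
        for row in rows
    ]
    if project_id is None:
        return renamed  # no project scoping requested
    # The filter only applies to rows that carry the column after renaming.
    return [
        row for row in renamed
        if "project_id" not in row or row["project_id"] == project_id
    ]
```

With the mapping `{"entity_id": "container_id", "proj": "project_id"}` and `project_id="my_project"`, rows from other projects are dropped; a table without any project column passes through unfiltered.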
validate_drop_implausible_data_requires_raw
def validate_drop_implausible_data_requires_raw()
drop_implausible_data=True currently only takes effect with RAW data.
The filter is applied inside the RAW -> RLE conversion path in
IntervalEncoder.prepare_channels_df. RLE input short-circuits that
path and the flag is silently ignored, so we reject the combination at
config validation time.
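The rejection rule can be sketched as a standalone check. The function name and the `data_format` parameter are hypothetical; this reference does not say which config field distinguishes RAW from RLE input.

```python
def check_drop_implausible_data(drop_implausible_data: bool, data_format: str) -> None:
    """Reject drop_implausible_data=True for any input that is not RAW."""
    # Assumption: data_format is a string such as "RAW" or "RLE".
    if drop_implausible_data and data_format.upper() != "RAW":
        raise ValueError(
            "drop_implausible_data=True only takes effect with RAW data; "
            f"got data_format={data_format!r}"
        )
```

Failing at validation time surfaces the misconfiguration immediately, instead of letting an RLE run complete with the flag silently ignored.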
IncrementalConfig
class IncrementalConfig(BaseModel)
Configuration for incremental processing behavior.
Arguments:
enabled(bool, default=False): Whether incremental processing is enabled.
silver_last_modified_column(str, default="timestamp"): Column name in the silver layer used for freshness comparison.
gold_last_modified_column(str, default="last_modified"): Column name in the gold layer used for freshness comparison.
ImpulseConfig
class ImpulseConfig(BaseModel)
Main configuration model.
Attributes
source : Source
Configuration for input data sources.
unity_sink : UnitySink
Configuration for output data location.
container_filters : ContainerFilters, optional
Optional container-level filters (tag-based and/or metric-based).
query_engine : QueryEngine, optional
Optional query engine configuration. Defaults to Solvers.DEFAULT_SOLVER.
incremental : IncrementalConfig, optional
Optional incremental processing configuration. Defaults to IncrementalConfig().
measurement_dimensions : list of str, optional
Column names to surface from container_metrics into the
gold-layer measurement_dimension table. Names are matched
after query_engine.solver_config.container_metrics.column_name_mapping
has been applied — i.e. these are the internal (post-mapping)
column names, not the physical silver column names. If a silver
table uses a physical name like my_measurement_id mapped to
container_id, list "container_id" here. Each listed name
lands in the gold table verbatim, so the configured name is also
the gold column name. Defaults to
["container_id", "start_ts", "stop_ts"]. The framework does not
inject any column the user omits — keeping container_id in the
list is recommended because it is the upsert key for incremental
processing and the join key to event-fact tables, but the choice
is the user's.
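The post-mapping name matching for measurement_dimensions can be sketched as follows. `select_measurement_dimensions` is a hypothetical helper operating on a single row dictionary, and raising KeyError for an unknown name is an assumption of this sketch rather than documented behavior.

```python
from typing import Optional

DEFAULT_DIMENSIONS = ["container_id", "start_ts", "stop_ts"]


def select_measurement_dimensions(
    silver_row: dict,
    column_name_mapping: dict,
    measurement_dimensions: Optional[list] = None,
) -> dict:
    """Apply the column mapping, then keep only the listed internal names."""
    names = DEFAULT_DIMENSIONS if measurement_dimensions is None else measurement_dimensions
    internal = {column_name_mapping.get(k, k): v for k, v in silver_row.items()}
    missing = [name for name in names if name not in internal]
    if missing:
        # Assumption: unknown names are an error in this sketch.
        raise KeyError(f"columns not found after mapping: {missing}")
    # Nothing is injected: only the configured names reach the gold row.
    return {name: internal[name] for name in names}
```

Given a silver row keyed by my_measurement_id and the mapping `{"my_measurement_id": "container_id"}`, listing "container_id" succeeds, while listing the physical name "my_measurement_id" fails because it no longer exists after mapping.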
Examples
config_data = {
    "source": {
        "container_metrics_table": "impulse_demo.silver.container_metric",
        "channel_metrics_table": "impulse_demo.silver.channel_metric",
        "channels_uri": "impulse_demo.silver.channel_data",
        "channel_mapping_table": "impulse_demo.data_model.channel_mapping"
    },
    "unity_sink": {
        "catalog": "impulse_demo",
        "schema": "silver_refactored",
        "table_prefix": "evaluation"
    },
    "container_filters": {
        "tag_filters": [
            [
                {"tag_name": "uut_id", "comparator": "==", "value": "AA080518", "cast_type": "string"}
            ]
        ],
        "metric_filters": [
            [
                {"column_name": "uut_id", "comparator": "==", "value": "AA080518"},
                {"column_name": "start_ts", "comparator": ">=", "value": "2025-04-27T05:20:54.000Z"}
            ]
        ]
    },
    "query_engine": {
        "solver": "DefaultSolver",
        "solver_config": {
            "project_id": "my_project",
            "container_tags": {
                "column_name_mapping": {"entity_id": "container_id"},
                "filters": {"parent_id": "my_parent_id"}
            },
            "container_metrics": {
                "column_name_mapping": {}
            },
            "channel_metrics": {
                "column_name_mapping": {}
            },
            "channel_mapping": {
                "column_name_mapping": {},
                "filters": {"toolbox_id": "my_toolbox"}
            },
            "channels": {
                "column_name_mapping": {}
            }
        }
    }
}
config = ImpulseConfig.model_validate(config_data)