impulse_query_engine.analyze.query.solvers.query_solver
QuerySolver
class QuerySolver(ABC)
Abstract base class for query solvers.
Defines a 6-stage filter pipeline that all solvers must implement: filter_container_tags -> filter_container_metrics -> filter_channel_tags -> filter_channel_metrics -> filter_candidates -> solve.
filter_container_metrics must return a DataFrame that includes all
columns needed for container dimensions and event bounds (e.g.
container_id, start_ts/stop_ts or start_dt/stop_dt),
not only container_id.
filter_container_tags
def filter_container_tags(spark, query) -> DataFrame
Abstract method to filter containers by tags.
Arguments:
spark(SparkSession): Spark session used for query execution.query(QueryBuilder): Query object containing filters and db info.
Raises:
NotImplementedError: If not implemented by subclass.
Returns:
pyspark.sql.DataFrame: DataFrame filtered by container tags.
filter_container_metrics
def filter_container_metrics(spark,
query,
container_df,
pre_filtered_containers_df=None) -> DataFrame
Abstract method to filter containers by metrics.
Arguments:
spark(SparkSession): Spark session used for query execution.query(QueryBuilder): Query object containing filters and db info.container_df(pyspark.sql.DataFrame): DataFrame from filter_container_tags stage.pre_filtered_containers_df(pyspark.sql.DataFrame): Pre-filtered containers for incremental processing. When provided, restricts processing to only these containers.
Raises:
NotImplementedError: If not implemented by subclass.
Returns:
pyspark.sql.DataFrame: DataFrame containing filtered container metrics.
filter_channel_tags
def filter_channel_tags(spark, db: MeasurementDB, container_df,
selectors) -> DataFrame
Stage 3: Filter channels by measurements and tags.
Arguments:
spark(SparkSession): Spark session used for query execution.db(MeasurementDB): Measurement database for table access.container_df(pyspark.sql.DataFrame): DataFrame containing container information.selectors(list[TimeSeriesSelector]): Non-aliased (direct) selectors extracted from the query.
Raises:
NotImplementedError: If not implemented by subclass.
Returns:
pyspark.sql.DataFrame: DataFrame containing filtered channel tags.
filter_channel_metrics
def filter_channel_metrics(spark, db: MeasurementDB, channel_df,
selectors) -> DataFrame
Stage 4: Filter channels by metrics.
Arguments:
spark(SparkSession): Spark session used for query execution.db(MeasurementDB): Measurement database for table access.channel_df(pyspark.sql.DataFrame): DataFrame containing channel information.selectors(list[TimeSeriesSelector]): Non-aliased (direct) selectors extracted from the query.
Raises:
NotImplementedError: If not implemented by subclass.
Returns:
pyspark.sql.DataFrame: DataFrame with (container_id, channel_id, selector_ids)
where selector_ids is an array column.
filter_aliased_channel_metrics
def filter_aliased_channel_metrics(spark, db: MeasurementDB, container_df,
selectors) -> DataFrame
Resolve aliased channel selections via the channel_mapping table.
Arguments:
spark(SparkSession): Spark session used for query execution.db(MeasurementDB): Measurement database for table access.container_df(pyspark.sql.DataFrame): DataFrame containing filtered container IDs.selectors(list[TimeSeriesSelector]): Aliased selectors extracted from the query.
Returns:
pyspark.sql.DataFrame: DataFrame with (container_id, channel_id, selector_ids)
where selector_ids is an array column.
resolve_channel_selections
def resolve_channel_selections(spark, channel_metrics_df,
aliased_channel_metrics_df) -> DataFrame
Union direct and aliased channel metrics, combining selector_ids.
Only called when aliased selectors are present. The default
implementation raises NotImplementedError; solvers that support
aliasing (e.g. KeyValueStoreSolver) must override this.
Arguments:
spark(SparkSession): Spark session used for query execution.channel_metrics_df(pyspark.sql.DataFrame): Direct channel metrics withselector_idsarray column.aliased_channel_metrics_df(pyspark.sql.DataFrame): Aliased channel metrics withselector_idsarray column.
Returns:
pyspark.sql.DataFrame: Merged DataFrame with (container_id, channel_id, selector_ids).
filter_candidates
def filter_candidates(query, channel_df) -> DataFrame
Stage 5: Select best channel candidate.
Arguments:
query(QueryBuilder): Query object containing filters and db info.channel_df(pyspark.sql.DataFrame): DataFrame containing channel information.
Returns:
pyspark.sql.DataFrame: DataFrame containing selected channel candidates.
solve
def solve(query, channels_df, selections, dtypes)
Stage 6: Solve query.
Arguments:
query(QueryBuilder): Query object containing database and filter information.channels_df(pyspark.sql.DataFrame): DataFrame containing channel information.selections(list): List of selection expressions to apply.dtypes(list): List of data types for each selection.
Returns:
pyspark.sql.DataFrame: DataFrame containing results for each container.