databricks.labs.dqx.anomaly.transformers
Feature engineering for row anomaly detection.
Provides column type analysis and Spark-native feature transformations. All transformations run as distributed Spark operations, which keeps them scalable and compatible with Spark Connect (no custom Python class serialization is required).
ColumnTypeInfo Objects
@dataclass
class ColumnTypeInfo()
Information about a column's type and encoding strategy.
category
One of 'numeric', 'categorical', 'datetime', 'boolean', or 'unsupported'.
encoding_strategy
One of 'onehot', 'frequency', 'cyclical', 'binary', or 'none'.
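For illustration, two hypothetical instances showing how category and encoding_strategy pair up. Only these two fields are documented here (plus spark_type, mentioned under reconstruct_column_infos below), so any further fields the dataclass requires are omitted:

```python
from pyspark.sql.types import StringType, TimestampType

from databricks.labs.dqx.anomaly.transformers import ColumnTypeInfo

# Hypothetical instances; the real dataclass may require additional
# fields (e.g. the column name) that are not documented in this section.
country = ColumnTypeInfo(
    category="categorical",      # string column
    encoding_strategy="onehot",  # low cardinality -> 0/1 indicator columns
    spark_type=StringType(),
)
event_ts = ColumnTypeInfo(
    category="datetime",          # timestamp column
    encoding_strategy="cyclical", # sin/cos features for hour, day-of-week, month
    spark_type=TimestampType(),
)
```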
SparkFeatureMetadata Objects
@dataclass
class SparkFeatureMetadata()
Metadata for reconstructing Spark transformations during scoring.
Stores everything needed to reapply the same transformations at scoring time:
- Column types and encoding strategies
- Frequency maps for categorical encoding (high cardinality)
- OneHot distinct values (low cardinality)
- Engineered feature names (in order)
column_infos
Serializable representations of the ColumnTypeInfo objects
categorical_frequency_maps
col_name -> (value -> frequency)
onehot_categories
col_name -> [distinct_values] for OneHot encoding
engineered_feature_names
Final feature names after engineering
categorical_cardinality_threshold
Threshold used for categorical encoding
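For intuition, hypothetical contents of these fields for a small schema; the values and feature-name patterns below are made up for illustration, not the module's guaranteed naming:

```python
# Hypothetical examples only; actual names and values depend on your data
# and on the module's internal naming scheme.
categorical_frequency_maps = {
    "country": {"US": 0.62, "DE": 0.21, "MISSING": 0.17},  # high cardinality
}
onehot_categories = {
    "status": ["active", "inactive", "suspended"],  # low cardinality
}
engineered_feature_names = [
    "amount",
    "country_freq",
    "status_active",
    "status_inactive",
    "status_suspended",
]
```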
to_json
def to_json() -> str
Serialize to JSON for storage.
from_json
@classmethod
def from_json(cls, json_str: str) -> "SparkFeatureMetadata"
Deserialize from JSON.
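A minimal round-trip sketch, assuming feature_metadata came from apply_feature_engineering (documented below):

```python
from databricks.labs.dqx.anomaly.transformers import SparkFeatureMetadata

# Persist the JSON next to the trained model; where it is stored is up to you.
metadata_json = feature_metadata.to_json()

# Later, in the scoring job, rebuild the identical transformation spec.
restored = SparkFeatureMetadata.from_json(metadata_json)
assert restored.engineered_feature_names == feature_metadata.engineered_feature_names
```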
reconstruct_column_infos
def reconstruct_column_infos(
feature_metadata: SparkFeatureMetadata) -> list[ColumnTypeInfo]
Reconstruct ColumnTypeInfo objects from SparkFeatureMetadata.
Uses the stored category to set spark_type, so that code running on reconstructed infos during scoring (e.g. _classify_column or isinstance checks) behaves correctly.
Arguments:
feature_metadata - SparkFeatureMetadata with column information
Returns:
List of ColumnTypeInfo objects
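A sketch of the scoring-side flow this function supports, reusing metadata_json from the round-trip example above and assuming reconstruct_column_infos is importable at module level:

```python
from databricks.labs.dqx.anomaly.transformers import (
    SparkFeatureMetadata,
    reconstruct_column_infos,
)

restored = SparkFeatureMetadata.from_json(metadata_json)
column_infos = reconstruct_column_infos(restored)
# The synthesized spark_type on each info lets type-dispatching code
# (e.g. isinstance checks) behave as it did on the training side.
```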
ColumnTypeClassifier Objects
class ColumnTypeClassifier()
Analyzes DataFrame schema and categorizes columns for feature engineering.
Categories:
- numeric: int, long, float, double, decimal
- categorical: string (with reasonable cardinality)
- datetime: date, timestamp, timestampNTZ
- boolean: boolean
- unsupported: array, map, struct, binary, etc.
analyze_columns
def analyze_columns(
df: DataFrame,
columns: list[str]) -> tuple[list[ColumnTypeInfo], list[str]]
Analyze columns and return type information and warnings.
Returns:
Tuple of (column_type_infos, warnings)
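A usage sketch; constructor arguments for ColumnTypeClassifier are not documented here, so a no-argument constructor is assumed:

```python
from databricks.labs.dqx.anomaly.transformers import ColumnTypeClassifier

classifier = ColumnTypeClassifier()  # assumed no-argument constructor
column_infos, warnings = classifier.analyze_columns(
    df, ["amount", "country", "event_ts"]
)
for warning in warnings:
    print(warning)  # e.g. notes about unsupported column types
usable = [info for info in column_infos if info.category != "unsupported"]
```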
apply_feature_engineering
def apply_feature_engineering(
df: DataFrame,
column_infos: list[ColumnTypeInfo],
categorical_cardinality_threshold: int = 20,
frequency_maps: dict[str, dict[str, float]] | None = None,
onehot_categories: dict[str, list[str]] | None = None
) -> tuple[DataFrame, SparkFeatureMetadata]
Apply feature engineering transformations in Spark (distributed).
Returns:
- DataFrame with engineered numeric features
- Metadata for reconstructing transformations during scoring
Transformations applied:
- Categorical: OneHot encoding (low cardinality) or frequency encoding (high cardinality)
- Datetime: Extract hour_sin/cos, dow_sin/cos, month_sin/cos, is_weekend (see the cyclical-encoding sketch after this list)
- Boolean: Map to 0/1
- Numeric: Keep as-is
- Null indicators: Add column_is_null for columns with nulls
- Imputation: Fill nulls with 0 (numeric), "MISSING" (categorical), epoch (datetime), 0 (boolean)
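The cyclical encodings map a periodic value onto the unit circle so that wrap-around neighbors (hour 23 and hour 0, December and January) stay close in feature space. A standalone sketch of the idea in plain PySpark; the module's exact output column names may differ:

```python
import math

import pyspark.sql.functions as F

# Hour 23 and hour 0 land next to each other on the unit circle.
df_enc = (
    df.withColumn("hour_sin", F.sin(2 * math.pi * F.hour("event_ts") / 24))
      .withColumn("hour_cos", F.cos(2 * math.pi * F.hour("event_ts") / 24))
)
```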
Arguments:
df - Input DataFrame with original columns
column_infos - Column type information from ColumnTypeClassifier
categorical_cardinality_threshold - Threshold for OneHot vs Frequency encoding
frequency_maps - Pre-computed frequency maps (for scoring). If None, compute from df (for training).
onehot_categories - Pre-computed OneHot distinct values (for scoring). If None, compute from df (for training).
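A train-then-score sketch tying the pieces together. train_df, new_df, and column_infos (from ColumnTypeClassifier.analyze_columns) are assumed, as is module-level importability of the functions; persistence of metadata_json is schematic:

```python
from databricks.labs.dqx.anomaly.transformers import (
    SparkFeatureMetadata,
    apply_feature_engineering,
    reconstruct_column_infos,
)

# Training: encodings are computed from train_df and captured in metadata.
features_df, feature_metadata = apply_feature_engineering(
    train_df,
    column_infos,
    categorical_cardinality_threshold=20,
)
metadata_json = feature_metadata.to_json()

# Scoring: pass the frozen maps back in so new rows are encoded identically.
restored = SparkFeatureMetadata.from_json(metadata_json)
scored_features_df, _ = apply_feature_engineering(
    new_df,
    reconstruct_column_infos(restored),
    categorical_cardinality_threshold=restored.categorical_cardinality_threshold,
    frequency_maps=restored.categorical_frequency_maps,
    onehot_categories=restored.onehot_categories,
)
```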