databricks.labs.dqx.anomaly.core
Core ML operations for row anomaly detection.
Contains the fundamental building blocks for training and scoring:
- Feature engineering (prepare_training_features, prepare_engineered_pandas)
- Model training (fit_sklearn_model, fit_isolation_forest)
- Scoring (score_with_model, score_with_ensemble_models)
- Metrics computation (compute_validation_metrics, compute_score_quantiles)
- Baseline statistics (compute_baseline_statistics)
All functions work with distributed Spark DataFrames and sklearn models. MLflow model registration is handled by mlflow_registry.py.
sample_df
def sample_df(df: DataFrame, columns: list[str],
params: AnomalyParams) -> tuple[DataFrame, int, bool]
Sample DataFrame for training.
Arguments:
df - Input DataFrame
columns - Columns to include in sample
params - Training parameters with sample_fraction and max_rows
Returns:
Tuple of (sampled DataFrame, row count, truncated flag)
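The sample-then-cap behavior can be sketched with pandas standing in for Spark; `sample_df_sketch` and its fixed fraction/cap arguments are illustrative, not the library's actual API:

```python
import pandas as pd

def sample_df_sketch(df: pd.DataFrame, columns: list[str],
                     sample_fraction: float, max_rows: int):
    """Illustrative stand-in: sample a fraction of rows, then cap at max_rows."""
    sampled = df[columns].sample(frac=sample_fraction, random_state=42)
    truncated = len(sampled) > max_rows
    if truncated:
        sampled = sampled.head(max_rows)
    return sampled, len(sampled), truncated

demo = pd.DataFrame({"x": range(100), "y": range(100)})
subset, n, truncated = sample_df_sketch(demo, ["x"], sample_fraction=0.5, max_rows=30)
```

The truncated flag lets callers know the cap, not the fraction, determined the final sample size.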
train_validation_split
def train_validation_split(
df: DataFrame, params: AnomalyParams) -> tuple[DataFrame, DataFrame]
Split DataFrame into training and validation sets.
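A minimal shuffle-and-slice sketch of the split, again with pandas in place of Spark's `randomSplit`; the helper name and `val_fraction` parameter are assumptions for illustration:

```python
import pandas as pd

def train_validation_split_sketch(df: pd.DataFrame, val_fraction: float = 0.2,
                                  seed: int = 42):
    """Illustrative: shuffle once, then slice off a validation set."""
    shuffled = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    n_val = int(len(shuffled) * val_fraction)
    return shuffled.iloc[n_val:], shuffled.iloc[:n_val]

demo = pd.DataFrame({"x": range(10)})
train, val = train_validation_split_sketch(demo)
```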
prepare_training_features
def prepare_training_features(
train_df: DataFrame, feature_columns: list[str],
params: AnomalyParams) -> tuple[pd.DataFrame, SparkFeatureMetadata]
Prepare training features using Spark-based feature engineering.
Analyzes column types, applies transformations (one-hot encoding, frequency maps, etc.), and collects the result to the driver as a pandas DataFrame.
Returns:
- train_pandas: pandas DataFrame with engineered numeric features
- feature_metadata: Transformation metadata for distributed scoring
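The transformations named above (one-hot encoding, frequency maps) can be sketched in pure pandas; the function, threshold, and metadata layout here are hypothetical stand-ins for the library's Spark-side engineering:

```python
import pandas as pd

def engineer_features_sketch(df: pd.DataFrame, onehot_threshold: int = 5):
    """Illustrative: one-hot low-cardinality strings, frequency-map the rest,
    and record metadata so the same transforms can be replayed at scoring time."""
    out = pd.DataFrame(index=df.index)
    metadata = {}
    for col in df.columns:
        if df[col].dtype == object:
            if df[col].nunique() <= onehot_threshold:
                dummies = pd.get_dummies(df[col], prefix=col).astype(float)
                out = pd.concat([out, dummies], axis=1)
                metadata[col] = {"kind": "one_hot",
                                 "categories": sorted(df[col].unique())}
            else:
                freq = df[col].value_counts(normalize=True).to_dict()
                out[f"{col}_freq"] = df[col].map(freq)
                metadata[col] = {"kind": "frequency_map", "map": freq}
        else:
            out[col] = df[col].astype(float)
            metadata[col] = {"kind": "numeric"}
    return out, metadata

demo = pd.DataFrame({"amount": [1.0, 2.0, 3.0], "country": ["US", "DE", "US"]})
features, meta = engineer_features_sketch(demo)
```

Keeping the metadata alongside the features is what lets scoring apply identical transforms on executors without refitting anything.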
fit_sklearn_model
def fit_sklearn_model(
train_pandas: pd.DataFrame,
params: AnomalyParams) -> tuple[Pipeline, dict[str, Any]]
Train sklearn IsolationForest pipeline on pre-engineered pandas DataFrame.
Returns:
- pipeline: sklearn Pipeline (RobustScaler + IsolationForest)
- hyperparams: Model configuration for MLflow tracking
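The RobustScaler + IsolationForest pipeline described above can be sketched directly in sklearn; the wrapper function and its defaults are illustrative, not the library's signature:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler

def fit_sketch(train_pandas: pd.DataFrame, n_estimators: int = 100, seed: int = 42):
    """Illustrative: scale robustly (resistant to the very outliers we hunt),
    then fit an IsolationForest; return the pipeline plus trackable hyperparams."""
    hyperparams = {"n_estimators": n_estimators, "contamination": "auto"}
    pipeline = Pipeline([
        ("scaler", RobustScaler()),
        ("forest", IsolationForest(n_estimators=n_estimators,
                                   contamination="auto", random_state=seed)),
    ])
    pipeline.fit(train_pandas)
    return pipeline, hyperparams

rng = np.random.default_rng(0)
train = pd.DataFrame({"a": rng.normal(size=200), "b": rng.normal(size=200)})
model, hp = fit_sketch(train)
scores = model.decision_function(train)  # higher = more normal
```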
fit_isolation_forest
def fit_isolation_forest(
train_df: DataFrame, feature_columns: list[str], params: AnomalyParams
) -> tuple[Pipeline, dict[str, Any], SparkFeatureMetadata]
Train IsolationForest model with distributed feature engineering.
Feature engineering runs on Spark, then the model trains on the driver.
Returns:
- pipeline: sklearn Pipeline (RobustScaler + IsolationForest)
- hyperparams: Model configuration for MLflow tracking
- feature_metadata: Transformation metadata for distributed scoring
score_with_model
def score_with_model(model: Pipeline, df: DataFrame, feature_cols: list[str],
feature_metadata: SparkFeatureMetadata) -> DataFrame
Score DataFrame using scikit-learn model with distributed pandas UDF.
Feature engineering is applied in Spark before the pandas UDF. This enables distributed inference across the Spark cluster.
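The pandas-UDF pattern can be approximated locally: each Spark partition arrives as a pandas batch and is scored independently with the broadcast model. The batch splitting below is a local stand-in for partitioning, not the library's implementation:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(1)
train = pd.DataFrame({"a": rng.normal(size=300)})
model = IsolationForest(random_state=0).fit(train)

def score_batch(batch: pd.DataFrame) -> pd.Series:
    # Body of a hypothetical pandas UDF: runs independently per partition,
    # since IsolationForest scores each row without cross-row state.
    return pd.Series(model.decision_function(batch), index=batch.index)

batches = [train.iloc[i:i + 100] for i in range(0, len(train), 100)]
scores = pd.concat(score_batch(b) for b in batches)
```

Because scoring is row-independent, batching partition by partition yields exactly the same scores as scoring the whole frame at once.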
score_with_ensemble_models
def score_with_ensemble_models(
models: list[Pipeline], df: DataFrame, feature_cols: list[str],
feature_metadata: SparkFeatureMetadata) -> DataFrame
Score DataFrame using an ensemble of models and return mean scores.
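The ensemble reduction is a per-row mean over the member models' scores, sketched here with differently seeded forests (the seeding scheme is an assumption for illustration):

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(2)
data = pd.DataFrame({"a": rng.normal(size=200)})
models = [IsolationForest(random_state=s).fit(data) for s in range(3)]

# One column of scores per model, then average row-wise for the ensemble score
per_model = np.column_stack([m.decision_function(data) for m in models])
mean_scores = per_model.mean(axis=1)
```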
compute_validation_metrics
def compute_validation_metrics(
model: Pipeline, val_df: DataFrame, feature_cols: list[str],
feature_metadata: SparkFeatureMetadata) -> dict[str, float]
Compute validation metrics and distribution statistics.
compute_score_quantiles
def compute_score_quantiles(
model: Pipeline, df: DataFrame, feature_cols: list[str],
feature_metadata: SparkFeatureMetadata) -> dict[str, float]
Compute score quantiles from the training score distribution.
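Conceptually, quantiles of the training score distribution later serve as anomaly thresholds (e.g. flag rows scoring below the training 1st percentile). A sketch with numpy; the key names are illustrative:

```python
import numpy as np

scores = np.random.default_rng(3).normal(size=1000)  # stand-in for training scores
quantiles = {f"q{int(q * 100):02d}": float(np.quantile(scores, q))
             for q in (0.01, 0.05, 0.50, 0.95, 0.99)}
```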
compute_score_quantiles_ensemble
def compute_score_quantiles_ensemble(
models: list[Pipeline], df: DataFrame, feature_cols: list[str],
feature_metadata: SparkFeatureMetadata) -> dict[str, float]
Compute score quantiles using ensemble mean scores.
compute_baseline_statistics
def compute_baseline_statistics(
train_df: DataFrame,
columns: list[str]) -> dict[str, dict[str, float]]
Compute baseline distribution statistics for drift detection.
Arguments:
train_df - Training DataFrame
columns - Feature columns to compute statistics for
Returns:
Dictionary mapping column names to their baseline statistics
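The per-column nested dictionary shape can be sketched in pandas; the specific statistics kept (mean, stddev, min, max) are an assumption about what drift detection needs:

```python
import pandas as pd

def compute_baseline_statistics_sketch(df: pd.DataFrame, columns: list[str]):
    """Illustrative: per-column summary stats, keyed by column name,
    stored at training time and compared against fresh data later."""
    return {
        col: {
            "mean": float(df[col].mean()),
            "stddev": float(df[col].std()),
            "min": float(df[col].min()),
            "max": float(df[col].max()),
        }
        for col in columns
    }

demo = pd.DataFrame({"amount": [1.0, 2.0, 3.0, 4.0]})
baseline = compute_baseline_statistics_sketch(demo, ["amount"])
```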
aggregate_ensemble_metrics
def aggregate_ensemble_metrics(
all_metrics: list[dict[str, float]]) -> dict[str, float]
Aggregate metrics across ensemble members (mean and std).
Arguments:
all_metrics- List of metric dictionaries, one per ensemble member
Returns:
Dictionary with aggregated metrics (mean and std for each metric)
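The mean-and-std aggregation is a straightforward fold over the metric dictionaries; the `_mean`/`_std` key suffixes here are illustrative:

```python
import statistics

def aggregate_sketch(all_metrics: list[dict[str, float]]) -> dict[str, float]:
    """Illustrative: collapse per-member metrics into mean and std per key."""
    agg = {}
    for key in all_metrics[0]:
        vals = [m[key] for m in all_metrics]
        agg[f"{key}_mean"] = statistics.fmean(vals)
        agg[f"{key}_std"] = statistics.pstdev(vals)
    return agg

agg = aggregate_sketch([{"score_mean": 0.8}, {"score_mean": 0.9}])
```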
prepare_engineered_pandas
def prepare_engineered_pandas(
train_df: DataFrame,
feature_metadata: SparkFeatureMetadata) -> pd.DataFrame
Prepare engineered pandas DataFrame from Spark DataFrame.
Applies feature engineering transformations and collects to pandas. Used for MLflow signature inference.
Arguments:
train_df - Training Spark DataFrame
feature_metadata - Feature engineering metadata from training
Returns:
Pandas DataFrame with engineered features