
Feature Engineering

The system transforms raw sensor data into feature vectors suitable for anomaly detection. Two parallel feature pipelines exist: legacy 1Hz features (6 dimensions) and batch 100Hz features (16 dimensions).

Legacy Features (1Hz): 6 Dimensions

Feature Column List

# From detector.py:48-62
# Base features (Phase 4)
BASE_FEATURE_COLUMNS = [
    'voltage_rolling_mean_1h',
    'current_spike_count',
    'power_factor_efficiency_score',
    'vibration_intensity_rms',
]

# Derived features (Phase 1 Enhancement)
DERIVED_FEATURE_COLUMNS = [
    'voltage_stability',        # abs(voltage - 230.0)
    'power_vibration_ratio',    # vibration_rms / (power_factor + 0.01)
]

# All features (6 total)
FEATURE_COLUMNS = BASE_FEATURE_COLUMNS + DERIVED_FEATURE_COLUMNS

1. Voltage Rolling Mean (1 Hour)

What it measures: Average voltage over the past hour (smooths out transient spikes).
# From calculator.py:23-58
def calculate_voltage_rolling_mean(
    df: pd.DataFrame,
    evaluation_idx: int,
    window: str = WINDOW_DURATION  # "1h"
) -> Optional[float]:
    # Get window data (past-only, including current point)
    # Approximate 1 hour = 60 points at 1 point/minute
    window_start = max(0, evaluation_idx - 59)  # -59 to include 60 points total
    window_data = df['voltage_v'].iloc[window_start:evaluation_idx + 1]
    
    if len(window_data) < 2:  # Need at least 2 points for meaningful mean
        return None
    
    # Calculate mean using vectorized Pandas
    mean_value = window_data.mean()
    return float(mean_value) if not pd.isna(mean_value) else None
Formula: \text{voltage\_rolling\_mean\_1h} = \frac{1}{60} \sum_{i=t-59}^{t} V_i
Healthy Range: 230V ± 10V (Indian Grid nominal)
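The past-only windowing above can be exercised on synthetic data. A minimal sketch, assuming a hypothetical 120-minute DataFrame at 1 point/minute (`voltage_v` is the source column name):

```python
import pandas as pd

# Hypothetical flat voltage trace: 120 minutes at 1 point/minute.
df = pd.DataFrame({"voltage_v": [230.0] * 120})

evaluation_idx = 119
window_start = max(0, evaluation_idx - 59)  # -59 to include 60 points total
window = df["voltage_v"].iloc[window_start:evaluation_idx + 1]

print(len(window))           # 60
print(float(window.mean()))  # 230.0
```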

2. Current Spike Count

What it measures: Number of current readings exceeding 2σ above the local window mean (not global baseline).
# From calculator.py:61-103
def calculate_current_spike_count(
    df: pd.DataFrame,
    evaluation_idx: int,
    window: str = WINDOW_DURATION,
    sigma_threshold: float = 2.0
) -> Optional[int]:
    # Get window data (past-only, including the current point)
    window_start = max(0, evaluation_idx - 60)
    window_data = df['current_a'].iloc[window_start:evaluation_idx + 1]
    
    if len(window_data) < 3:  # Need at least 3 points for meaningful σ
        return None
    
    # Calculate LOCAL window statistics
    local_mean = window_data.mean()
    local_std = window_data.std()
    
    if pd.isna(local_std) or local_std == 0:
        return 0  # No variation = no spikes
    
    # Count values above threshold
    threshold = local_mean + (sigma_threshold * local_std)
    spike_count = (window_data > threshold).sum()
    return int(spike_count)
Formula: \text{spike\_count} = \left| \{ I_i : I_i > \mu_{\text{local}} + 2\sigma_{\text{local}} \} \right|
Healthy Range: 0-2 spikes per window (an occasional spike is normal)
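A minimal sketch of the 2σ threshold on a hypothetical 60-point window. Note that pandas' `.std()` defaults to the sample standard deviation (ddof=1), which this NumPy sketch mirrors:

```python
import numpy as np

# Hypothetical window: mostly-flat current with two injected spikes.
window = np.array([12.0] * 58 + [15.0, 15.0])

local_mean = window.mean()
local_std = window.std(ddof=1)  # pandas .std() defaults to ddof=1

threshold = local_mean + 2.0 * local_std
spike_count = int((window > threshold).sum())
print(spike_count)  # 2
```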

3. Power Factor Efficiency Score

What it measures: Normalized power factor (0-1 scale).
# From calculator.py:106-133
def calculate_power_factor_efficiency_score(
    power_factor: float
) -> Optional[float]:
    if power_factor is None or math.isnan(power_factor):
        return None
    
    # Clamp to valid range
    pf = max(0.0, min(1.0, power_factor))
    
    # Direct mapping: PF is already 0-1, monotonic
    # Score = PF (linear, no transformation needed)
    # This preserves interpretability: score of 0.85 means PF was 0.85
    return round(pf, 4)
Formula: \text{efficiency\_score} = \text{clamp}(\text{PF}, 0.0, 1.0)
Healthy Range: 0.85 - 0.95 (industrial motors)
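A self-contained re-sketch of the same clamping logic (the function name `efficiency_score` here is shorthand, not the source name), showing the three cases that matter: in-range, out-of-range, and missing:

```python
import math

def efficiency_score(power_factor):
    # Same clamp-then-round logic as calculate_power_factor_efficiency_score.
    if power_factor is None or math.isnan(power_factor):
        return None
    return round(max(0.0, min(1.0, power_factor)), 4)

print(efficiency_score(0.92))          # 0.92 (direct mapping)
print(efficiency_score(1.37))          # 1.0  (clamped: sensor glitch)
print(efficiency_score(float("nan")))  # None (missing reading)
```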

4. Vibration Intensity RMS

What it measures: Root Mean Square of vibration over the past hour (energy-based metric).
# From calculator.py:136-173
def calculate_vibration_rms(
    df: pd.DataFrame,
    evaluation_idx: int,
    window: str = WINDOW_DURATION
) -> Optional[float]:
    # Get window data (past-only, including the current point)
    window_start = max(0, evaluation_idx - 60)
    window_data = df['vibration_g'].iloc[window_start:evaluation_idx + 1]
    
    if len(window_data) < 2:
        return None
    
    # Calculate RMS using vectorized operations
    squared = window_data ** 2
    mean_squared = squared.mean()
    
    if pd.isna(mean_squared):
        return None
    
    rms = np.sqrt(mean_squared)
    return round(float(rms), 6)
Formula: \text{RMS} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} v_i^2}
Healthy Range: 0.10 - 0.20g (depends on asset type)
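A quick sanity check on the RMS formula, using a synthetic signal with a known answer: for a pure sine of amplitude A sampled over whole periods, RMS = A / √2.

```python
import numpy as np

# 50 Hz sine, amplitude 0.2g, sampled 1000x over exactly 1 second.
t = np.linspace(0, 1, 1000, endpoint=False)
signal = 0.2 * np.sin(2 * np.pi * 50 * t)

rms = float(np.sqrt(np.mean(signal ** 2)))
print(round(rms, 4))  # 0.1414 (= 0.2 / sqrt(2))
```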

5. Voltage Stability

What it measures: Absolute deviation from Indian Grid nominal voltage (230V).
# From detector.py:65
NOMINAL_VOLTAGE = 230.0

# From detector.py:148-152
if 'voltage_rolling_mean_1h' in result.columns:
    result['voltage_stability'] = abs(result['voltage_rolling_mean_1h'] - NOMINAL_VOLTAGE)
else:
    result['voltage_stability'] = 0.0
Formula: \text{voltage\_stability} = |V_{\text{rolling\_mean}} - 230.0|
Healthy Range: < 10V (±4.3%)

6. Power Vibration Ratio

What it measures: Interaction term capturing relationship between vibration and power factor.
# From detector.py:154-161
if 'vibration_intensity_rms' in result.columns and 'power_factor_efficiency_score' in result.columns:
    result['power_vibration_ratio'] = (
        result['vibration_intensity_rms'] / 
        (result['power_factor_efficiency_score'] + 0.01)  # Epsilon prevents division by zero
    )
else:
    result['power_vibration_ratio'] = 0.0
Formula: \text{power\_vibration\_ratio} = \frac{\text{vibration\_rms}}{\text{PF\_score} + 0.01}
Healthy Range: 0.15 - 0.25 (depends on asset)
High vibration combined with low power factor often indicates mechanical misalignment or bearing wear. This interaction term helps the model detect such patterns.
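A numeric sketch of why the interaction term separates the two conditions. The readings below are hypothetical, chosen only to illustrate the healthy range versus a wear signature:

```python
# Hypothetical readings: healthy motor vs. suspected bearing wear.
healthy = {"vibration_rms": 0.15, "pf_score": 0.92}
worn_bearing = {"vibration_rms": 0.30, "pf_score": 0.75}

def ratio(r):
    # Same epsilon as the detector to avoid division by zero.
    return r["vibration_rms"] / (r["pf_score"] + 0.01)

print(round(ratio(healthy), 3))       # 0.161 (inside 0.15-0.25)
print(round(ratio(worn_bearing), 3))  # 0.395 (well outside)
```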

Batch Features (100Hz): 16 Dimensions

Feature Column List

# From batch_features.py:30-48
SIGNAL_COLUMNS = ["voltage_v", "current_a", "power_factor", "vibration_g"]
STAT_NAMES = ["mean", "std", "peak_to_peak", "rms"]

def get_batch_feature_names() -> List[str]:
    names = []
    for signal in SIGNAL_COLUMNS:
        for stat in STAT_NAMES:
            names.append(f"{signal}_{stat}")
    return names

BATCH_FEATURE_NAMES: List[str] = get_batch_feature_names()
# [
#   'voltage_v_mean', 'voltage_v_std', 'voltage_v_peak_to_peak', 'voltage_v_rms',
#   'current_a_mean', 'current_a_std', 'current_a_peak_to_peak', 'current_a_rms',
#   'power_factor_mean', 'power_factor_std', 'power_factor_peak_to_peak', 'power_factor_rms',
#   'vibration_g_mean', 'vibration_g_std', 'vibration_g_peak_to_peak', 'vibration_g_rms'
# ]
BATCH_FEATURE_COUNT: int = 16

Extraction Logic (Vectorized NumPy)

# From batch_features.py:51-95
def extract_batch_features(raw_points: List[Dict[str, Any]]) -> Optional[Dict[str, float]]:
    """
    Extract a 16-dimensional feature vector from a 1-second batch of raw points.
    
    Args:
        raw_points: List of 50-200 raw sensor dicts, each containing
                    voltage_v, current_a, power_factor, vibration_g.
    
    Returns:
        Dict mapping feature name → float value, or None if batch too small.
    """
    if not raw_points or len(raw_points) < 10:
        return None
    
    import numpy as np
    features: Dict[str, float] = {}
    
    for signal in SIGNAL_COLUMNS:
        # Extract signal values as NumPy array (vectorized)
        values = np.array(
            [p.get(signal, 0.0) for p in raw_points],
            dtype=np.float64,
        )
        
        # Mean
        mean_val = float(np.mean(values))
        features[f"{signal}_mean"] = mean_val
        
        # Standard Deviation (ddof=0 for population std, consistent with training)
        std_val = float(np.std(values, ddof=0))
        features[f"{signal}_std"] = std_val
        
        # Peak-to-Peak (Max - Min)
        p2p_val = float(np.max(values) - np.min(values))
        features[f"{signal}_peak_to_peak"] = p2p_val
        
        # RMS (Root Mean Square)
        rms_val = float(np.sqrt(np.mean(values ** 2)))
        features[f"{signal}_rms"] = rms_val
    
    return features

Statistical Features (4 per Signal)

Mean

\mu = \frac{1}{n} \sum_{i=1}^{n} x_i
What it captures: Average value over the 1-second window (same as 1Hz downsampling).

Standard Deviation

\sigma = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (x_i - \mu)^2}
What it captures: Variance/noise within the window. Critical for jitter detection.
Jitter Detection Key: A jitter fault has a normal mean but high standard deviation. This is why the batch model (which includes std as a feature) achieves 99.6% F1, while the legacy model (which only sees the mean) achieves 78.1% F1.
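The mean-vs-std distinction can be demonstrated with two synthetic 1-second vibration windows (hypothetical noise levels, matching the 8.5× ratio from the example below):

```python
import numpy as np

rng = np.random.default_rng(0)

# Two synthetic 100-point windows with the same underlying mean (0.15g):
healthy = 0.15 + rng.normal(0.0, 0.02, size=100)  # low variance
jitter = 0.15 + rng.normal(0.0, 0.17, size=100)   # same mean, ~8.5x the std

# The means are nearly identical; only the std exposes the fault.
for name, w in [("healthy", healthy), ("jitter", jitter)]:
    print(name, round(float(w.mean()), 3), round(float(w.std()), 3))
```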

Peak-to-Peak

\text{P2P} = \max(x) - \min(x)
What it captures: Transient spikes/dips within the window. Sensitive to single-point anomalies.

RMS (Root Mean Square)

\text{RMS} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} x_i^2}
What it captures: Energy content (especially important for vibration analysis).

Complete 16-Feature Example

Healthy 1-second window (100 points):
{
  # Voltage (V)
  'voltage_v_mean': 230.2,
  'voltage_v_std': 0.5,
  'voltage_v_peak_to_peak': 2.1,
  'voltage_v_rms': 230.3,
  
  # Current (A)
  'current_a_mean': 12.3,
  'current_a_std': 0.15,
  'current_a_peak_to_peak': 0.6,
  'current_a_rms': 12.31,
  
  # Power Factor
  'power_factor_mean': 0.92,
  'power_factor_std': 0.01,
  'power_factor_peak_to_peak': 0.04,
  'power_factor_rms': 0.92,
  
  # Vibration (g)
  'vibration_g_mean': 0.15,
  'vibration_g_std': 0.02,        # Low variance = healthy
  'vibration_g_peak_to_peak': 0.08,
  'vibration_g_rms': 0.151
}
Jitter fault window:
{
  # ... (voltage/current unchanged)
  
  # Vibration (g) — Jitter fault
  'vibration_g_mean': 0.15,       # Mean still normal!
  'vibration_g_std': 0.17,        # ← 8.5× higher (ANOMALY)
  'vibration_g_peak_to_peak': 0.55, # ← Large transients
  'vibration_g_rms': 0.22         # ← Higher energy
}
The batch model flags this as anomalous due to the abnormal std and peak_to_peak values.

Feature Scaling

Both pipelines use StandardScaler to normalize features before training:
# From detector.py:226
self._scaler = StandardScaler()
features_scaled = self._scaler.fit_transform(feature_matrix)
Why?
  • Features have different units (V vs A vs g)
  • StandardScaler ensures all features contribute equally to anomaly scoring
  • Formula: z = \frac{x - \mu}{\sigma}
The scaler is fitted on healthy data only during calibration, then frozen. This ensures that “normal” is always defined by the original baseline, not drifting data.
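The fit-once-then-freeze behavior can be sketched without sklearn: StandardScaler with default settings is equivalent to per-column z-scoring using the mean and (population) std learned at fit time. The rows below are hypothetical healthy calibration data:

```python
import numpy as np

# Hypothetical healthy calibration rows: [voltage_v, current_a].
healthy = np.array([
    [230.0, 12.0],
    [231.0, 12.4],
    [229.0, 12.2],
])

mu = healthy.mean(axis=0)    # learned during calibration...
sigma = healthy.std(axis=0)  # ...then frozen (no refitting on live data)

def transform(x):
    # z = (x - mu) / sigma, same as a frozen StandardScaler.transform
    return (x - mu) / sigma

live = np.array([[245.0, 12.2]])  # over-voltage reading, normal current
z = transform(live)
print(z)  # large positive z for voltage, ~0 for current
```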

Cold-Start Handling

Both pipelines return None (NaN) for incomplete windows:
# From calculator.py:53
if len(window_data) < 2:  # Need at least 2 points for meaningful mean
    return None

# From batch_features.py:66
if not raw_points or len(raw_points) < 10:
    return None
This prevents false zeros and allows the system to distinguish between “no data yet” and “zero value”.
From ENGINEERING_LOG.md Phase 4:
NaN for Incomplete Windows: During the first hour of operation, there isn’t enough data for a 1-hour rolling mean. Instead of returning 0 (which would be falsely reassuring), we return None. This propagates as NaN in the feature vector and is explicitly handled downstream.
False zeros would cause the ML model to see abnormally low feature values during startup, triggering false alarms.
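A minimal sketch of the None-to-NaN propagation, using a simplified stand-in for the real feature function (`rolling_mean` below is illustrative, not the source name):

```python
import pandas as pd

# Simplified stand-in for a window-based feature calculator.
def rolling_mean(values, min_points=2):
    if len(values) < min_points:
        return None  # "no data yet" -- never a falsely reassuring 0.0
    return sum(values) / len(values)

rows = [
    {"voltage_rolling_mean_1h": rolling_mean([])},              # cold start
    {"voltage_rolling_mean_1h": rolling_mean([230.0, 231.0])},  # warm
]
df = pd.DataFrame(rows)  # None becomes NaN in the float column
print(df["voltage_rolling_mean_1h"].isna().tolist())  # [True, False]
```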

Feature Computation Workflow

Legacy Pipeline (1Hz)

# From calculator.py:176-199
def compute_all_features(
    df: pd.DataFrame,
    evaluation_idx: int,
    current_power_factor: float
) -> dict:
    """Compute all contract-mandated features for a single evaluation point."""
    return {
        "voltage_rolling_mean_1h": calculate_voltage_rolling_mean(df, evaluation_idx),
        "current_spike_count": calculate_current_spike_count(df, evaluation_idx),
        "power_factor_efficiency_score": calculate_power_factor_efficiency_score(current_power_factor),
        "vibration_intensity_rms": calculate_vibration_rms(df, evaluation_idx),
    }
Derived features (voltage_stability, power_vibration_ratio) are computed inside the detector:
# From detector.py:132-163
def _compute_derived_features(self, data):
    result = data.copy()
    result['voltage_stability'] = abs(result['voltage_rolling_mean_1h'] - NOMINAL_VOLTAGE)
    result['power_vibration_ratio'] = (
        result['vibration_intensity_rms'] / 
        (result['power_factor_efficiency_score'] + 0.01)
    )
    return result

Batch Pipeline (100Hz)

# From batch_features.py:113-139
def extract_multi_window_features(
    raw_points: List[Dict[str, Any]],
    window_size: int = 100,
) -> List[Dict[str, float]]:
    """
    Slice a long stream of raw points into non-overlapping 1-second windows
    and extract batch features for each window.
    """
    results: List[Dict[str, float]] = []
    n = len(raw_points)
    
    for start in range(0, n - window_size + 1, window_size):
        window = raw_points[start : start + window_size]
        feat = extract_batch_features(window)
        if feat is not None:
            results.append(feat)
    
    return results
Used during calibration to convert historical 100Hz data into training feature rows.
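The non-overlapping slicing can be sketched in isolation: a 250-point stream at 100Hz yields two full 100-point windows, and the trailing partial window is dropped.

```python
# Non-overlapping 1-second windows over a synthetic 250-point stream.
stream = list(range(250))
window_size = 100

windows = [
    stream[start : start + window_size]
    for start in range(0, len(stream) - window_size + 1, window_size)
]
print(len(windows))    # 2 (trailing 50 points are dropped)
print(windows[1][0])   # 100 (second window starts right after the first)
```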
