Overview

The monitoring module creates comprehensive summaries of model prediction streams, tracking alert rates, risk distributions, performance metrics, and potential data drift. It’s designed for real-time monitoring of production ML systems.

Function Signature

def build_monitoring_summary(
    alert_flags: pd.Series,
    risk_probabilities: pd.Series,
    stream_latency_ms_per_row: float,
) -> dict[str, float | int | str | None]:
    """
    Build a monitoring summary from prediction stream data.
    
    Args:
        alert_flags: Boolean series indicating which samples triggered alerts
        risk_probabilities: Predicted risk probabilities for each sample
        stream_latency_ms_per_row: Average inference latency per sample
    
    Returns:
        Dictionary with monitoring metrics including timestamp, counts,
        rates, drift estimates, and performance data
    """
Source: deployment/monitoring.py:7

Usage Example

import pandas as pd
from deployment.monitoring import build_monitoring_summary

# Generate predictions on streaming data
risk_probs = pd.Series([0.23, 0.89, 0.45, 0.92, 0.15, 0.78])

# Define alert threshold (e.g., 75% risk)
alert_threshold = 0.75
alert_flags = risk_probs > alert_threshold

# Calculate per-row latency
total_latency_ms = 42.5
stream_latency_ms_per_row = total_latency_ms / len(risk_probs)

# Build monitoring summary
summary = build_monitoring_summary(
    alert_flags=alert_flags,
    risk_probabilities=risk_probs,
    stream_latency_ms_per_row=stream_latency_ms_per_row
)

print(summary)

Output Structure

The function returns a dictionary with the following keys:

timestamp_utc

Type: str

ISO 8601 formatted UTC timestamp indicating when the summary was generated.

Example: "2026-03-04T14:32:15.123456+00:00"

samples_observed

Type: int

Total number of samples in the prediction batch.

alert_count

Type: int

Number of samples that triggered alerts (where alert_flags == True).

alert_rate

Type: float

Proportion of samples triggering alerts: alert_count / samples_observed. Useful for:
  • Detecting unusual spikes in high-risk predictions
  • Triggering operational alerts when the rate exceeds a threshold (see the sketch below)
  • Capacity planning for clinical review workflows
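
A minimal sketch of thresholding on alert_rate; the 0.3 threshold and the notification step are illustrative, not part of the module:

# Flag the batch when the alert rate exceeds an operational threshold.
# 0.3 is an illustrative value; tune it to your expected base rate.
ALERT_RATE_THRESHOLD = 0.3

if summary['alert_rate'] > ALERT_RATE_THRESHOLD:
    print(f"⚠ Elevated alert rate: {summary['alert_rate']:.1%} of samples flagged")
    # e.g., notify the on-call reviewer or expand review capacity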

mean_risk_probability

Type: float

Average predicted risk probability across all samples.

risk_probability_std

Type: float

Standard deviation of risk probabilities (population std, ddof=0). High variance indicates diverse risk profiles in the batch.
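
Note that pandas defaults to the sample standard deviation (ddof=1); a quick check on the example data shows the difference:

import pandas as pd

probs = pd.Series([0.23, 0.89, 0.45, 0.92, 0.15, 0.78])

# pandas defaults to the sample std (ddof=1); the module explicitly
# uses the population std (ddof=0), which is slightly smaller.
print(round(probs.std(), 3))        # 0.339 (sample, ddof=1)
print(round(probs.std(ddof=0), 3))  # 0.31 (population, ddof=0)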

estimated_data_drift

Type: float

Simple drift indicator computed as abs(mean_risk_probability - 0.5).
  • Near 0.0: Predictions centered around 50% (balanced)
  • Near 0.5: Predictions skewed toward 0% or 100% (potential drift)

Note: This is a heuristic. For production systems, implement dedicated drift detection (e.g., KL divergence, PSI); see Drift Detection below for a sketch.

stream_latency_ms_per_row

Type: float

Average inference latency per sample in milliseconds.

first_alert_index

Type: int | None

Index of the first sample that triggered an alert. None if no alerts occurred. Useful for:
  • Identifying when in a batch alerts began
  • Time-series analysis of alert patterns
  • Quick access to the first high-risk case

Example Output

{
    'timestamp_utc': '2026-03-04T14:32:15.123456+00:00',
    'samples_observed': 6,
    'alert_count': 3,
    'alert_rate': 0.5,
    'mean_risk_probability': 0.57,
    'risk_probability_std': 0.31,
    'estimated_data_drift': 0.07,
    'stream_latency_ms_per_row': 7.083,
    'first_alert_index': 1
}

Implementation Details

Source code from deployment/monitoring.py:7-26:
def build_monitoring_summary(
    alert_flags: pd.Series,
    risk_probabilities: pd.Series,
    stream_latency_ms_per_row: float,
) -> dict[str, float | int | str | None]:
    alert_count = int(alert_flags.sum())
    total = len(alert_flags)
    first_alert_index = int(alert_flags.idxmax()) if alert_count > 0 else None

    return {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "samples_observed": total,
        "alert_count": alert_count,
        "alert_rate": float(alert_count / max(total, 1)),
        "mean_risk_probability": float(risk_probabilities.mean()),
        "risk_probability_std": float(risk_probabilities.std(ddof=0)),
        "estimated_data_drift": float(abs(risk_probabilities.mean() - 0.5)),
        "stream_latency_ms_per_row": float(stream_latency_ms_per_row),
        "first_alert_index": first_alert_index,
    }
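
One caveat worth noting: alert_flags.idxmax() returns the index label of the first True value, so the int() cast assumes a default integer index. A small sketch of the pitfall and a workaround:

import pandas as pd

# idxmax returns the index *label* of the first True, not the position.
flags = pd.Series([False, True, True], index=['a', 'b', 'c'])
print(flags.idxmax())  # 'b' -- int('b') here would raise ValueError

# Resetting to a default RangeIndex keeps first_alert_index positional,
# matching the int() cast in the source above.
flags = flags.reset_index(drop=True)
print(int(flags.idxmax()))  # 1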

Integration with CPU Inference

Combine with CPU inference metrics:
import pandas as pd

from deployment.cpu_inference import run_cpu_inference
from deployment.monitoring import build_monitoring_summary

# Run inference
inference_results = run_cpu_inference(model, X_batch)

# Extract probabilities from model
risk_probs = pd.Series(model.predict_proba(X_batch)[:, 1])
alert_flags = risk_probs > 0.75

# Calculate per-row latency
latency_per_row = inference_results['inference_latency_ms'] / len(X_batch)

# Generate monitoring summary
summary = build_monitoring_summary(
    alert_flags=alert_flags,
    risk_probabilities=risk_probs,
    stream_latency_ms_per_row=latency_per_row
)

Logging and Persistence

Store monitoring summaries for time-series analysis:
import json
from pathlib import Path

# Generate summary
summary = build_monitoring_summary(alert_flags, risk_probs, latency)

# Append to JSONL log (one summary per line)
log_path = Path('logs/monitoring.jsonl')
log_path.parent.mkdir(parents=True, exist_ok=True)  # ensure logs/ exists
with log_path.open('a') as f:
    f.write(json.dumps(summary) + '\n')
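
To analyze trends, the JSONL log can be read back into a DataFrame; a minimal sketch, assuming the log file above exists:

import json
from pathlib import Path

import pandas as pd

# Load every logged summary into a DataFrame for trend analysis
log_path = Path('logs/monitoring.jsonl')
records = [json.loads(line) for line in log_path.read_text().splitlines() if line]
history = pd.DataFrame(records)
history['timestamp_utc'] = pd.to_datetime(history['timestamp_utc'])

# e.g., mean alert rate across the 20 most recent batches
print(history['alert_rate'].tail(20).mean())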

Alert Thresholds

Define alert logic based on your clinical workflow:
# Simple threshold
alert_flags = risk_probs > 0.75

# Composite rule; other_condition is a placeholder for another boolean
# Series (e.g., a comorbidity flag aligned to the same index)
high_risk = risk_probs > 0.80
moderate_risk = (risk_probs > 0.60) & (risk_probs <= 0.80)
alert_flags = high_risk | (moderate_risk & other_condition)

Drift Detection

The estimated_data_drift metric is a simple heuristic. Track it over time and alert on sustained shifts:
# Track drift over time
if summary['estimated_data_drift'] > 0.15:
    print("⚠ Potential distribution shift detected")
    # Trigger model retraining pipeline
    # Log for manual review
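
For a dedicated drift metric, the PSI mentioned above can be computed over binned probabilities. A sketch of one common formulation; baseline_probs is an assumed reference sample (e.g., validation-set probabilities), not something the module provides:

import numpy as np
import pandas as pd

def population_stability_index(reference: pd.Series,
                               current: pd.Series,
                               bins: int = 10) -> float:
    """PSI over fixed-width bins: sum((cur - ref) * ln(cur / ref))."""
    edges = np.linspace(0.0, 1.0, bins + 1)  # probabilities live in [0, 1]
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_frac = np.histogram(current, bins=edges)[0] / len(current)
    eps = 1e-6  # avoid log(0) and division by zero in empty bins
    ref_frac = np.clip(ref_frac, eps, None)
    cur_frac = np.clip(cur_frac, eps, None)
    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

# baseline_probs: probabilities from a trusted reference window (assumption)
# psi = population_stability_index(baseline_probs, risk_probs)
# Common rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 major shift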

Performance Monitoring

Track latency trends:
if summary['stream_latency_ms_per_row'] > 10.0:
    print("⚠ Latency exceeds SLA (10ms/row)")
    # Scale infrastructure
    # Investigate bottlenecks
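
For trends rather than single-batch checks, a rolling window over recent summaries works; a minimal sketch with an illustrative window size:

from collections import deque

# Keep the last 50 per-batch latencies (window size is illustrative)
latency_window = deque(maxlen=50)

latency_window.append(summary['stream_latency_ms_per_row'])
rolling_avg = sum(latency_window) / len(latency_window)

if rolling_avg > 10.0:
    print(f"⚠ Rolling latency {rolling_avg:.2f} ms/row exceeds SLA (10ms/row)")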

First Alert Analysis

if summary['first_alert_index'] is not None:
    first_alert_case = X_batch.iloc[summary['first_alert_index']]
    print(f"First alert at index {summary['first_alert_index']}")
    print(f"Case features: {first_alert_case.to_dict()}")
