
Overview

Latency optimization ensures the early warning system responds quickly to critical patient conditions. The platform automatically adjusts batch sizes based on hardware constraints to minimize detection latency while maximizing throughput.

Measuring Latency

Per-Row Latency

Track processing time per data row (`model` is a fitted classifier and `patient_data` a DataFrame, both assumed to be defined elsewhere):
from real_time.streaming import process_stream
import pandas as pd

def process_patient_data(chunk):
    # Probability of the positive (critical) class for each row
    return model.predict_proba(chunk)[:, 1]

results, metrics = process_stream(
    df=patient_data,
    chunk_size=100,
    process_fn=process_patient_data
)

print(f"Per-row latency: {metrics.latency_ms:.3f}ms")
print(f"Total throughput: {metrics.throughput_rows_per_s:.1f} rows/s")
The latency calculation:
latency_ms = (elapsed_seconds / total_rows) * 1000
Source: real_time/streaming.py:27
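The per-row figure can be reproduced with a self-contained sketch that applies the formula above (a timing loop only; `process_stream`'s real implementation may differ):

```python
import time
import pandas as pd

def per_row_latency_ms(df: pd.DataFrame, chunk_size: int, process_fn) -> float:
    """Process df in chunks and return mean per-row latency in milliseconds."""
    start = time.perf_counter()
    for i in range(0, len(df), chunk_size):
        process_fn(df.iloc[i:i + chunk_size])
    elapsed_seconds = time.perf_counter() - start
    # latency_ms = (elapsed_seconds / total_rows) * 1000
    return (elapsed_seconds / len(df)) * 1000

df = pd.DataFrame({"heart_rate": range(1000)})
latency = per_row_latency_ms(df, chunk_size=100, process_fn=lambda c: c * 2)
print(f"Per-row latency: {latency:.4f}ms")
```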

Detection Latency

Measure time from event occurrence to alert:
from anomaly_detection.early_warning import evaluate_detection_latency
import pandas as pd

# Real patient events (1 = critical condition)
events = pd.Series([0, 0, 0, 1, 1, 0])

# Model anomaly scores
scores = pd.Series([0.1, 0.3, 0.5, 0.8, 0.9, 0.4])

# Data timestamps
timestamps = pd.date_range('2025-01-01 08:00', periods=6, freq='1min')

# Calculate detection latency
latency_s = evaluate_detection_latency(
    scores=scores,
    ground_truth_events=events,
    timestamps=timestamps
)

print(f"Detection latency: {latency_s}s")
Returns:
  • Time in seconds from first event to first alert
  • inf if no alerts after event
  • nan if no events in ground truth
Source: anomaly_detection/early_warning.py:17-26
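The documented return values can be illustrated with a standalone sketch (the 0.7 alert threshold and the scan from the first event onward are assumptions about the internals):

```python
import pandas as pd

def detection_latency_sketch(scores: pd.Series, events: pd.Series,
                             timestamps: pd.DatetimeIndex,
                             threshold: float = 0.7) -> float:
    """Seconds from the first event to the first alert (illustrative)."""
    event_positions = events.to_numpy().nonzero()[0]
    if len(event_positions) == 0:
        return float("nan")            # no events in ground truth
    first_event = event_positions[0]
    for i in range(first_event, len(scores)):
        if scores.iloc[i] >= threshold:
            return (timestamps[i] - timestamps[first_event]).total_seconds()
    return float("inf")                # no alerts after the event

events = pd.Series([0, 0, 0, 1, 1, 0])
scores = pd.Series([0.1, 0.3, 0.5, 0.8, 0.9, 0.4])
timestamps = pd.date_range("2025-01-01 08:00", periods=6, freq="1min")
print(detection_latency_sketch(scores, events, timestamps))  # 0.0 with these values
```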

Hardware-Aware Optimization

Auto-Adjusting Batch Size

The system automatically adjusts batch size based on hardware constraints:
from utils.hardware import HardwareProfile, auto_adjust_batch_size

# Define hardware constraints
profile = HardwareProfile(
    memory_limit_mb=512,
    compute_budget=2000
)

# Automatically determine optimal batch size
batch_size = auto_adjust_batch_size(
    initial_batch=64,
    feature_count=20,
    profile=profile
)

print(f"Optimized batch size: {batch_size}")
Source: evaluation/early_warning_experiment.py:30
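The adjustment logic itself is not shown on this page; a plausible sketch halves the batch until both the memory and compute constraints are satisfied (the halving strategy and `bytes_per_value` are assumptions, not the library's actual algorithm):

```python
from dataclasses import dataclass

@dataclass
class HardwareProfile:
    memory_limit_mb: int
    compute_budget: int

def auto_adjust_batch_size_sketch(initial_batch: int, feature_count: int,
                                  profile: HardwareProfile,
                                  bytes_per_value: int = 8) -> int:
    """Halve the batch until a float64 batch fits both constraints (illustrative)."""
    batch = initial_batch
    while batch > 1:
        batch_mb = batch * feature_count * bytes_per_value / (1024 ** 2)
        within_memory = batch_mb <= profile.memory_limit_mb
        within_compute = batch * feature_count <= profile.compute_budget
        if within_memory and within_compute:
            break
        batch //= 2
    return batch

print(auto_adjust_batch_size_sketch(64, 20, HardwareProfile(512, 2000)))  # fits as-is
print(auto_adjust_batch_size_sketch(64, 20, HardwareProfile(512, 500)))   # shrinks
```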

Hardware Profiles

Define resource constraints:
from dataclasses import dataclass

@dataclass
class HardwareProfile:
    memory_limit_mb: int     # Available RAM in MB
    compute_budget: int      # Computation budget (ops)
The system uses these constraints to:
  • Adjust batch sizes dynamically
  • Prevent memory overflow
  • Balance latency vs throughput
  • Maximize resource utilization

Latency Under Constraints

Running Constrained Experiments

from evaluation.early_warning_experiment import (
    ConstraintScenario,
    run_hardware_constrained_early_warning_experiment
)
from pathlib import Path

# Test multiple constraint scenarios
scenarios = [
    # Low resources, high latency
    ConstraintScenario(
        memory_limit_mb=256,
        compute_budget=500,
        stream_interval_ms=200
    ),
    # Medium resources
    ConstraintScenario(
        memory_limit_mb=512,
        compute_budget=2000,
        stream_interval_ms=100
    ),
    # High resources, low latency
    ConstraintScenario(
        memory_limit_mb=2048,
        compute_budget=10000,
        stream_interval_ms=50
    ),
]

results, artifacts = run_hardware_constrained_early_warning_experiment(
    df=patient_data,
    feature_cols=['heart_rate', 'blood_pressure', 'temperature'],
    target_col='diagnosis',
    scenarios=scenarios,
    output_dir=Path('output/latency_optimization')
)

# Analyze latency results
print(results[[
    'stream_interval_ms',
    'effective_batch_size',
    'detection_latency_s',
    'compute_utilization'
]].sort_values('detection_latency_s'))
Source: evaluation/early_warning_experiment.py:23-61

Understanding Results

Key metrics from constraint experiments:
# Mean detection latency across scenarios (lower with more resources)
print(f"Detection latency: {results['detection_latency_s'].mean():.2f}s")

# Mean batch size after hardware-aware adjustment
print(f"Effective batch size: {results['effective_batch_size'].mean():.1f}")

# Resource efficiency
print(f"Compute utilization: {results['compute_utilization'].mean():.2%}")

# Quality vs speed tradeoff
print(f"Detection quality: {results['detection_quality'].mean():.3f}")

Latency vs Accuracy Tradeoff

The experiment generates visualizations:
import matplotlib.pyplot as plt

# Latency vs Accuracy scatter plot
plt.scatter(
    results['detection_latency_s'],
    results['prediction_accuracy'],
    c=results['compute_budget'],
    cmap='viridis'
)
plt.xlabel('Detection latency (s)')
plt.ylabel('Prediction accuracy')
plt.colorbar(label='Compute budget')
plt.title('Latency vs Accuracy under Hardware Constraints')
plt.show()
Source: evaluation/early_warning_experiment.py:79-88

Resource Score Analysis

Calculate resource score to understand performance:
# Resource score formula
resource_score = (
    results['memory_limit_mb'] * 
    results['compute_budget'] / 
    results['stream_interval_ms']
)

# Plot resource vs quality
plt.scatter(
    resource_score,
    results['detection_quality'],
    c=results['false_positive_rate'],
    cmap='magma_r'
)
plt.xlabel('Resource score (memory*compute/stream_interval)')
plt.ylabel('Detection quality (accuracy - 0.5*FPR)')
plt.colorbar(label='False positive rate')
plt.show()
Source: evaluation/early_warning_experiment.py:91-99

First Alert Latency

Track time to first critical alert:
from anomaly_detection.early_warning import simulate_early_warning
import pandas as pd

scores = pd.Series([0.2, 0.4, 0.5, 0.85, 0.9])
timestamps = pd.date_range('2025-01-01 00:00', periods=5, freq='30s')

alert_info = simulate_early_warning(
    scores=scores,
    timestamps=timestamps,
    threshold=0.8
)

if alert_info['first_alert_latency_s'] != float('inf'):
    print(f"First alert after {alert_info['first_alert_latency_s']}s")
else:
    print("No alerts triggered")
Source: anomaly_detection/early_warning.py:7-14
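Based on the documented return value, `simulate_early_warning` can be approximated as the delay from the stream start to the first score at or above the threshold (an assumption about its internals):

```python
import pandas as pd

def simulate_early_warning_sketch(scores: pd.Series,
                                  timestamps: pd.DatetimeIndex,
                                  threshold: float) -> dict:
    """Seconds from stream start to the first above-threshold score (illustrative)."""
    above = scores.to_numpy() >= threshold
    if not above.any():
        return {"first_alert_latency_s": float("inf")}
    first = int(above.nonzero()[0][0])
    latency = (timestamps[first] - timestamps[0]).total_seconds()
    return {"first_alert_latency_s": latency}

scores = pd.Series([0.2, 0.4, 0.5, 0.85, 0.9])
timestamps = pd.date_range("2025-01-01 00:00", periods=5, freq="30s")
print(simulate_early_warning_sketch(scores, timestamps, threshold=0.8))
```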

Optimization Strategies

1. Tune Chunk Size

# Small chunks = lower latency, more overhead
quick_results, quick_metrics = process_stream(
    df=patient_data,
    chunk_size=10,  # Low latency
    process_fn=process_fn
)

# Large chunks = higher throughput, more latency
fast_results, fast_metrics = process_stream(
    df=patient_data,
    chunk_size=500,  # High throughput
    process_fn=process_fn
)

print(f"Quick latency: {quick_metrics.latency_ms:.2f}ms")
print(f"Fast throughput: {fast_metrics.throughput_rows_per_s:.0f} rows/s")
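To find a good operating point between these extremes, a small sweep helper (hypothetical; not part of the library) can compare both metrics at once:

```python
import time
import pandas as pd

def sweep_chunk_sizes(df: pd.DataFrame, process_fn,
                      sizes=(10, 50, 100, 500)) -> pd.DataFrame:
    """Time each candidate chunk size and report latency and throughput."""
    rows = []
    for size in sizes:
        start = time.perf_counter()
        for i in range(0, len(df), size):
            process_fn(df.iloc[i:i + size])
        elapsed = time.perf_counter() - start
        rows.append({
            "chunk_size": size,
            "latency_ms": elapsed / len(df) * 1000,
            "throughput_rows_per_s": len(df) / elapsed,
        })
    return pd.DataFrame(rows)

df = pd.DataFrame({"heart_rate": range(2000)})
print(sweep_chunk_sizes(df, lambda c: c.mean()))
```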

2. Hardware-Aware Processing

from utils.hardware import compute_utilization, HardwareProfile

# Calculate resource utilization (patient_data as defined earlier)
profile = HardwareProfile(memory_limit_mb=1024, compute_budget=5000)
feature_cols = ['heart_rate', 'blood_pressure', 'temperature']
operations = len(patient_data) * len(feature_cols)
utilization = compute_utilization(operations=operations, profile=profile)

print(f"Resource utilization: {utilization:.1%}")
Source: evaluation/early_warning_experiment.py:46
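The internals of `compute_utilization` are not shown here; the printed percentage is consistent with a simple ratio of operations to budget (a sketch; the cap at 100% is an assumption):

```python
from dataclasses import dataclass

@dataclass
class HardwareProfile:
    memory_limit_mb: int
    compute_budget: int

def compute_utilization_sketch(operations: int, profile: HardwareProfile) -> float:
    """Fraction of the compute budget consumed, capped at 1.0 (illustrative)."""
    return min(operations / profile.compute_budget, 1.0)

profile = HardwareProfile(memory_limit_mb=1024, compute_budget=5000)
print(f"{compute_utilization_sketch(4000, profile):.1%}")  # 80.0%
```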

3. Stream Interval Tuning

Adjust how frequently new data arrives:
  • Low interval (10-50ms): Real-time monitoring, high load
  • Medium interval (100-200ms): Balanced performance
  • High interval (500ms+): Reduced load, acceptable for non-critical monitoring
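Assuming one row arrives per tick (an assumption about the stream model), the interval directly bounds the arrival rate:

```python
# Implied arrival rate for each stream interval, assuming one row per tick
for interval_ms in (10, 50, 100, 200, 500):
    print(f"{interval_ms}ms interval -> {1000 / interval_ms:.0f} rows/s")
```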

Best Practices

  • Start with chunk_size=100 and adjust based on latency_ms metrics
  • Use auto_adjust_batch_size() for hardware-constrained environments
  • Monitor detection_latency_s for critical conditions (target <5s)
  • Balance detection_quality with compute_utilization
  • Test under realistic constraints before deployment
  • Use streaming for latency-critical applications
  • Plot latency vs accuracy to find optimal operating point
  • Track false positive rate to avoid alert fatigue
  • Set stream_interval based on data arrival rate
