
Overview

Streaming inference processes patient data in real time by breaking large datasets into manageable chunks. Compared to batch processing, this lowers per-row latency and peak memory usage while producing the same predictions, since the same model scores every row.

Batch vs Streaming Processing

The platform supports two processing modes:
  • Batch processing: processes the entire dataset in one call; simpler, but peak memory grows with dataset size
  • Streaming processing: processes data in fixed-size chunks; lower per-row latency and bounded memory

Comparing Performance

from real_time.streaming import compare_batch_vs_streaming

def process_fn(df):
    # Your processing logic
    return model.predict_proba(df)[:, 1]

metrics = compare_batch_vs_streaming(
    df=patient_data,
    process_fn=process_fn,
    chunk_size=100
)

print(f"Batch time: {metrics['batch_time_s']:.2f}s")
print(f"Stream latency per row: {metrics['stream_latency_ms_per_row']:.2f}ms")
print(f"Throughput: {metrics['stream_throughput_rows_per_s']:.0f} rows/s")

Chunk-Based Processing

The streaming system processes data using a generator pattern that yields chunks of configurable size:
from real_time.streaming import stream_dataframe

# Stream data in chunks of 50 rows
for chunk in stream_dataframe(df, chunk_size=50):
    # Process each chunk independently
    results = model.predict(chunk)
Source: real_time/streaming.py:14-16
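
The generator itself is only a few lines. Here is a minimal sketch consistent with the behavior described above (the actual implementation lives in real_time/streaming.py; this version is an illustration, not the verbatim source):

```python
import pandas as pd

def stream_dataframe(df: pd.DataFrame, chunk_size: int = 100):
    """Yield consecutive row slices of df, each at most chunk_size rows."""
    for start in range(0, len(df), chunk_size):
        yield df.iloc[start:start + chunk_size]

# 120 rows streamed in chunks of 50 yields chunks of 50, 50, and 20 rows
df = pd.DataFrame({"x": range(120)})
sizes = [len(chunk) for chunk in stream_dataframe(df, chunk_size=50)]
```

Because it is a generator, only one chunk is materialized as a separate slice at a time, which is what keeps memory bounded regardless of dataset size.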

Processing with Metrics

Track performance metrics during streaming:
from real_time.streaming import process_stream, StreamMetrics

def risk_assessment(chunk):
    return model.predict_proba(chunk)[:, 1]

results, metrics = process_stream(
    df=patient_data,
    chunk_size=100,
    process_fn=risk_assessment
)

print(f"Latency: {metrics.latency_ms:.2f}ms per row")
print(f"Throughput: {metrics.throughput_rows_per_s:.0f} rows/s")
The StreamMetrics dataclass tracks:
  • latency_ms: Average milliseconds per row processed
  • throughput_rows_per_s: Rows processed per second
Source: real_time/streaming.py:8-11
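
Based on the two fields listed, the dataclass is likely shaped roughly as follows (a sketch under that assumption; the hypothetical measurement values below are for illustration only):

```python
from dataclasses import dataclass

@dataclass
class StreamMetrics:
    latency_ms: float              # average milliseconds per processed row
    throughput_rows_per_s: float   # rows processed per second

# Both fields derive from one timed run: n_rows rows in elapsed_s seconds
n_rows, elapsed_s = 1000, 0.25     # hypothetical measurement
metrics = StreamMetrics(
    latency_ms=elapsed_s / n_rows * 1000.0,
    throughput_rows_per_s=n_rows / elapsed_s,
)
```

Note that the two fields are reciprocals up to a unit conversion: latency_ms == 1000 / throughput_rows_per_s.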

Real-Time Inference Pipeline

Run streaming inference on patient features:
from real_time.inference import run_streaming_inference
import joblib
import pandas as pd

# Load your trained model (e.g. a scikit-learn estimator saved with joblib.dump)
model = joblib.load('risk_predictor.pkl')

# Patient features
X = pd.DataFrame({
    'age': [45, 62, 38],
    'blood_pressure': [120, 145, 110],
    'heart_rate': [72, 88, 65]
})

# Run streaming inference
results = run_streaming_inference(
    X=X,
    model=model,
    chunk_size=100
)

print(results)
Output:
   risk_probability  risk_label
0          0.234567           0
1          0.678901           1
2          0.123456           0
Source: real_time/inference.py:8-21
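
A plausible shape for run_streaming_inference is a chunked scoring loop that assembles the output frame shown above. The sketch below is self-contained and assumes a 0.5 decision threshold and the column names from the sample output; ThresholdModel is a stand-in for a real trained model:

```python
import numpy as np
import pandas as pd

def run_streaming_inference(X, model, chunk_size=100, threshold=0.5):
    """Score X chunk by chunk; return probabilities plus binary labels."""
    parts = []
    for start in range(0, len(X), chunk_size):
        chunk = X.iloc[start:start + chunk_size]
        proba = model.predict_proba(chunk)[:, 1]
        parts.append(pd.DataFrame(
            {"risk_probability": proba,
             "risk_label": (proba >= threshold).astype(int)},
            index=chunk.index,
        ))
    return pd.concat(parts)

# Stand-in model: scores blood pressure above 130 as elevated risk
class ThresholdModel:
    def predict_proba(self, df):
        p = 0.2 + 0.5 * (df["blood_pressure"].to_numpy() > 130)
        return np.column_stack([1 - p, p])

X = pd.DataFrame({"blood_pressure": [120, 145, 110]})
results = run_streaming_inference(X, ThresholdModel(), chunk_size=2)
```

Reusing the input chunk's index for each output frame is what keeps rows aligned with the original DataFrame after concatenation.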

Throughput Optimization

Choosing Chunk Size

Chunk size affects throughput and latency:
  • Small chunks (10-50): results arrive sooner, but per-chunk overhead reduces throughput
  • Medium chunks (100-500): balanced latency and throughput; a good default
  • Large chunks (1000+): highest throughput, but each result waits longer on its chunk

Example Optimization

import pandas as pd
from real_time.streaming import process_stream

def benchmark_chunk_size(df, model, chunk_sizes):
    """Run process_stream once per chunk size and collect its metrics."""
    results = []
    for chunk_size in chunk_sizes:
        _, metrics = process_stream(
            df=df,
            chunk_size=chunk_size,
            process_fn=lambda x: model.predict_proba(x)[:, 1]
        )
        results.append({
            'chunk_size': chunk_size,
            'latency_ms': metrics.latency_ms,
            'throughput': metrics.throughput_rows_per_s
        })
    return pd.DataFrame(results)

# Test different chunk sizes
optimization = benchmark_chunk_size(
    df=patient_data,
    model=risk_model,
    chunk_sizes=[10, 50, 100, 500, 1000]
)

print(optimization)

Implementation Details

The streaming processor:
  1. Splits input DataFrame into chunks using iloc slicing
  2. Processes each chunk through the provided function
  3. Measures elapsed time and calculates per-row latency
  4. Computes throughput as rows per second
  5. Concatenates results maintaining index order
Source: real_time/streaming.py:19-29
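
The five steps above can be sketched as a single function. This is an illustration consistent with the described behavior (the StreamMetrics shape is inferred from the fields documented earlier), not the verbatim source:

```python
import time
import pandas as pd
from dataclasses import dataclass

@dataclass
class StreamMetrics:
    latency_ms: float
    throughput_rows_per_s: float

def process_stream(df, chunk_size, process_fn):
    results, start = [], time.perf_counter()
    for i in range(0, len(df), chunk_size):          # 1. iloc-sliced chunks
        chunk = df.iloc[i:i + chunk_size]
        out = process_fn(chunk)                      # 2. apply the function
        results.append(pd.Series(out, index=chunk.index))
    elapsed = time.perf_counter() - start            # 3. elapsed wall time
    metrics = StreamMetrics(
        latency_ms=elapsed / len(df) * 1000.0,       # 3. per-row latency
        throughput_rows_per_s=len(df) / elapsed,     # 4. rows per second
    )
    return pd.concat(results).sort_index(), metrics  # 5. ordered output

df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0, 5.0]})
out, metrics = process_stream(df, chunk_size=2, process_fn=lambda c: c["x"] * 2)
```

Tagging each chunk's results with the chunk's own index (step 1 plus step 5) is what guarantees output rows line up with input rows even though processing happens piecewise.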

Best Practices

  • Start with chunk_size=100 and adjust based on metrics
  • Monitor latency_ms for real-time requirements
  • Use compare_batch_vs_streaming() to validate performance gains
  • Process chunks independently to enable parallel processing
  • Maintain sorted index in output for result consistency
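
Because chunks are independent, they can also be scored concurrently. A sketch using the standard-library ThreadPoolExecutor (an assumption for illustration; the platform itself may parallelize differently):

```python
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def process_stream_parallel(df, chunk_size, process_fn, max_workers=4):
    """Apply process_fn to each chunk in parallel, preserving row order."""
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in submission order, keeping rows aligned
        parts = list(pool.map(process_fn, chunks))
    return pd.concat(
        pd.Series(p, index=c.index) for p, c in zip(parts, chunks)
    )

df = pd.DataFrame({"x": range(10)})
out = process_stream_parallel(df, chunk_size=3, process_fn=lambda c: c["x"] + 1)
```

Threads only help if process_fn releases the GIL (NumPy and most scikit-learn predict calls do); for pure-Python processing, a process pool is the usual alternative.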
