## Overview
The unified streaming pipeline provides batch and real-time inference orchestration with async queue-based backpressure handling. It simulates production streaming workloads with configurable jitter, queue sizes, and worker pools.

## Architecture
- Batch Mode: Synchronous inference on full dataset with CSV output
- Stream Mode: Sequential row-by-row inference with JSONL output
- Async Stream Mode: Producer-consumer pattern with asyncio queues
  - Producer: Ingests rows with simulated jitter
  - Consumers: Worker pool processes queued events
  - Backpressure: Queue drops events when full
## Command-Line Interface

### Options
- `--mode {batch,stream,async_stream,both}`: Inference mode (default: `both`)
- `--jitter-ms FLOAT`: Event arrival jitter in milliseconds (default: `5.0`)
- `--queue-max INT`: Maximum queue size for async mode (default: `32`)
- `--workers INT`: Number of consumer workers for async mode (default: `2`)
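A minimal sketch of the argument parser these options imply, using names and defaults from the list above (the actual parser in `unified_pipeline.py` may differ):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """CLI sketch matching the documented options and defaults."""
    p = argparse.ArgumentParser(description="Unified streaming pipeline")
    p.add_argument("--mode", choices=["batch", "stream", "async_stream", "both"],
                   default="both", help="Inference mode")
    p.add_argument("--jitter-ms", type=float, default=5.0,
                   help="Event arrival jitter in milliseconds")
    p.add_argument("--queue-max", type=int, default=32,
                   help="Maximum queue size for async mode")
    p.add_argument("--workers", type=int, default=2,
                   help="Number of consumer workers for async mode")
    return p

args = build_parser().parse_args(["--mode", "async_stream", "--queue-max", "64"])
print(args.mode, args.jitter_ms, args.queue_max, args.workers)  # async_stream 5.0 64 2
```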
## Usage Examples

### Run All Modes
Outputs:
- `artifacts/batch_inference.csv`
- `artifacts/stream_inference.jsonl`
- `artifacts/stream_inference_async.jsonl`
- `artifacts/streaming_metrics.json`
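A sketch of the run-all invocation, assuming the script at `real_time_pipelines/unified_pipeline.py` is executed directly (the exact entry point is an assumption):

```shell
python real_time_pipelines/unified_pipeline.py --mode both
```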
### Async Stream with Custom Configuration
- 8ms average inter-arrival time
- 64-event queue buffer
- 4 parallel consumer workers
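The configuration above maps onto the documented flags; a sketch of the corresponding invocation (script path taken from the source references in this document, exact entry point assumed):

```shell
python real_time_pipelines/unified_pipeline.py --mode async_stream \
  --jitter-ms 8 --queue-max 64 --workers 4
```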
### Batch Only

Writes `artifacts/batch_inference.csv`.
### Stream Only

Writes `artifacts/stream_inference.jsonl`.
## Batch Inference

Synchronous batch prediction on the full dataset (`real_time_pipelines/unified_pipeline.py:22-28`). Output: `artifacts/batch_inference.csv`.
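A schematic of the batch step. The `model` callable, column names, and output path here are illustrative; the real code at the cited lines may differ:

```python
import csv

def batch_inference(rows, model, out_path="batch_demo.csv"):
    """Score the full dataset in one pass and write one CSV row per input."""
    preds = [model(r) for r in rows]
    with open(out_path, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["row_id", "prediction"])
        for i, p in enumerate(preds):
            w.writerow([i, p])
    return preds

# Toy usage: a stand-in "model" thresholding a single feature at 0.5.
preds = batch_inference([{"x": 0.2}, {"x": 0.9}], lambda r: int(r["x"] >= 0.5))
print(preds)  # [0, 1]
```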
## Synchronous Streaming

Row-by-row inference with timestamp tracking (`real_time_pipelines/unified_pipeline.py:31-42`). Output: `artifacts/stream_inference.jsonl`.
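A sketch of the row-by-row JSONL step; field names and the `model` callable are illustrative, not the exact schema the pipeline emits:

```python
import json
import time

def stream_inference(rows, model, out_path="stream_demo.jsonl"):
    """Score rows one at a time, writing one timestamped JSON line per event."""
    with open(out_path, "w") as f:
        for i, row in enumerate(rows):
            record = {"row_id": i, "prediction": model(row), "ts": time.time()}
            f.write(json.dumps(record) + "\n")
    return out_path

# Toy usage with a stand-in threshold model.
path = stream_inference([{"x": 0.1}, {"x": 0.7}], lambda r: int(r["x"] >= 0.5))
print(open(path).read().count("\n"))  # 2
```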
## Async Stream Simulation

Producer-consumer pattern with queue-based backpressure (`real_time_pipelines/unified_pipeline.py:45-108`).

### Producer
Ingests rows with Gaussian jitter:
- Mean: `jitter_ms / 1000` seconds
- Std dev: `jitter_ms / 4000` seconds
- Drops events when the queue is full (tracks the `dropped` count)
### Consumer Pool

Multiple workers process the queue concurrently.

### Orchestration
Output: `artifacts/stream_inference_async.jsonl`
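The producer, consumer pool, and orchestration described above can be sketched as a minimal asyncio simulation (`run_sim` and the stats fields are illustrative names; the jitter parameters match the documented mean and std dev):

```python
import asyncio
import random
import time

async def run_sim(rows, jitter_ms=5.0, queue_max=32, workers=2):
    """Producer-consumer sketch: Gaussian inter-arrival jitter, bounded queue,
    drop-on-full backpressure, and a worker pool draining the queue."""
    queue = asyncio.Queue(maxsize=queue_max)
    stats = {"produced": 0, "consumed": 0, "dropped": 0, "latencies_ms": []}

    async def producer():
        for row in rows:
            # Inter-arrival delay: mean jitter_ms/1000 s, std dev jitter_ms/4000 s.
            await asyncio.sleep(max(0.0, random.gauss(jitter_ms / 1000, jitter_ms / 4000)))
            stats["produced"] += 1
            try:
                queue.put_nowait((time.monotonic(), row))  # non-blocking: full queue => drop
            except asyncio.QueueFull:
                stats["dropped"] += 1

    async def consumer():
        while True:
            enqueued_at, _row = await queue.get()
            stats["latencies_ms"].append((time.monotonic() - enqueued_at) * 1000)
            stats["consumed"] += 1
            queue.task_done()

    pool = [asyncio.create_task(consumer()) for _ in range(workers)]
    await producer()
    await queue.join()  # wait for every enqueued event to be processed
    for task in pool:
        task.cancel()
    await asyncio.gather(*pool, return_exceptions=True)
    return stats

stats = asyncio.run(run_sim(range(50), jitter_ms=1.0, queue_max=8, workers=2))
print(stats["consumed"] + stats["dropped"] == stats["produced"])  # True
```

Every produced event is either consumed or dropped, so `consumed + dropped == produced` holds once the queue drains.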
## Streaming Metrics

Async mode generates a metrics report at `artifacts/streaming_metrics.json`:
- `produced`: Total events produced
- `consumed`: Total events processed
- `dropped`: Events dropped due to queue full
- `drop_rate`: `dropped / produced`
- `queue_latency_ms_p50`: Median queue latency
- `queue_latency_ms_p95`: 95th percentile queue latency
- `workers`: Number of consumer workers
- `queue_max`: Maximum queue size
- `jitter_ms`: Inter-arrival jitter
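The fields above can be assembled from raw counters as follows; the percentile method here (`statistics.quantiles`) is one reasonable choice, not necessarily the one the pipeline uses:

```python
import statistics

def metrics_report(produced, consumed, dropped, queue_latencies_ms,
                   workers, queue_max, jitter_ms):
    """Build the streaming metrics report from raw counters and latency samples."""
    p50 = p95 = None
    if len(queue_latencies_ms) >= 2:
        # quantiles(n=20) returns 19 cut points; index 9 ~ p50, index 18 ~ p95.
        qs = statistics.quantiles(queue_latencies_ms, n=20)
        p50, p95 = qs[9], qs[18]
    return {
        "produced": produced,
        "consumed": consumed,
        "dropped": dropped,
        "drop_rate": dropped / produced if produced else 0.0,
        "queue_latency_ms_p50": p50,
        "queue_latency_ms_p95": p95,
        "workers": workers,
        "queue_max": queue_max,
        "jitter_ms": jitter_ms,
    }

print(metrics_report(100, 90, 10, [1.0] * 100, 2, 32, 5.0)["drop_rate"])  # 0.1
```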
## Data Source

The pipeline loads test data from trained artifacts.

## Operational Considerations
### Queue Size (`--queue-max`)
- Larger queue: Higher backpressure tolerance, more memory usage
- Smaller queue: Lower latency, higher drop rate under load
- Tune based on burst traffic patterns
### Worker Count (`--workers`)
- More workers: Higher throughput until CPU saturation
- Fewer workers: Lower concurrency, higher queue latency
- Monitor CPU usage and latency percentiles
### Jitter (`--jitter-ms`)
- Higher jitter: Simulates bursty traffic, tests backpressure handling
- Lower jitter: Steady traffic, lower drop rate
- Use realistic production inter-arrival distributions
## Test Mode

Test mode limits resources for CI environments:
- `TEST_MODE=1`: Enable test mode
- `TEST_QUEUE_MAX=8`: Max queue size
- `TEST_STREAM_WORKERS=1`: Max workers
- `TEST_STREAM_ROWS=16`: Max rows to process
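A sketch of how these overrides might be resolved (env variable names and defaults from the list above; `resolve_limits` is an illustrative helper, not the pipeline's actual function):

```python
import os

def resolve_limits(queue_max=32, workers=2, max_rows=None):
    """Clamp async-stream resources when TEST_MODE=1, using the documented env vars."""
    if os.environ.get("TEST_MODE") == "1":
        queue_max = min(queue_max, int(os.environ.get("TEST_QUEUE_MAX", "8")))
        workers = min(workers, int(os.environ.get("TEST_STREAM_WORKERS", "1")))
        max_rows = int(os.environ.get("TEST_STREAM_ROWS", "16"))
    return queue_max, workers, max_rows

os.environ["TEST_MODE"] = "1"
print(resolve_limits())  # (8, 1, 16)
```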
## Limitations
- Static replay: Simulation replays static test rows, not diverse production events
- Application-level metrics: Captures Python-level latency, not kernel/network latency
- In-memory queue: Real production systems may use Kafka, RabbitMQ, or SQS
- Fixed threshold: Uses a fixed 0.5 decision threshold; production may require dynamic thresholds
## Performance Tuning

### High Throughput

Increase `--workers` and `--queue-max`; throughput rises until CPU saturation, at the cost of more memory.

### Low Latency

Keep `--queue-max` small so events spend less time queued, accepting a higher drop rate under load.

### Stress Test

Lower `--jitter-ms` so events arrive faster than consumers can drain them, then monitor `drop_rate` and `queue_latency_ms_p95` in metrics.
## Related
- Prediction API - REST API for predictions
- Monitoring - Drift detection and alerting