
Overview

The unified streaming pipeline provides batch and real-time inference orchestration with async queue-based backpressure handling. It simulates production streaming workloads with configurable jitter, queue sizes, and worker pools.

Architecture

  • Batch Mode: Synchronous inference on full dataset with CSV output
  • Stream Mode: Sequential row-by-row inference with JSONL output
  • Async Stream Mode: Producer-consumer pattern with asyncio queues
    • Producer: Ingests rows with simulated jitter
    • Consumers: Worker pool processes queued events
    • Backpressure: Queue drops events when full
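The drop-on-full policy can be sketched with a bare asyncio.Queue (a minimal, self-contained illustration; none of these names come from the pipeline):

```python
import asyncio

async def main() -> tuple[int, int]:
    # Bounded queue: put_nowait raises QueueFull once capacity is reached,
    # which is the drop-on-full backpressure policy described above.
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    accepted = dropped = 0
    for event in range(5):
        try:
            queue.put_nowait(event)
            accepted += 1
        except asyncio.QueueFull:
            dropped += 1  # no consumer is draining, so excess events are shed
    return accepted, dropped

accepted, dropped = asyncio.run(main())
print(accepted, dropped)  # → 2 3
```

Dropping instead of blocking keeps the producer's arrival timing intact, at the cost of lost events, which is why the pipeline tracks a dropped count.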

Command-Line Interface

python real_time_pipelines/unified_pipeline.py [OPTIONS]

Options

  • --mode {batch,stream,async_stream,both}: Inference mode (default: both)
  • --jitter-ms FLOAT: Mean event inter-arrival time in milliseconds; also scales the arrival-time variance (default: 5.0)
  • --queue-max INT: Maximum queue size for async mode (default: 32)
  • --workers INT: Number of consumer workers for async mode (default: 2)

Usage Examples

Run All Modes

python real_time_pipelines/unified_pipeline.py --mode both
Generates:
  • artifacts/batch_inference.csv
  • artifacts/stream_inference.jsonl
  • artifacts/stream_inference_async.jsonl
  • artifacts/streaming_metrics.json

Async Stream with Custom Configuration

python real_time_pipelines/unified_pipeline.py \
  --mode async_stream \
  --jitter-ms 8 \
  --queue-max 64 \
  --workers 4
Simulates a traffic profile with:
  • ~8 ms mean inter-arrival time
  • 64-event queue buffer
  • 4 parallel consumer workers

Batch Only

python real_time_pipelines/unified_pipeline.py --mode batch
Generates only artifacts/batch_inference.csv.

Stream Only

python real_time_pipelines/unified_pipeline.py --mode stream
Generates only artifacts/stream_inference.jsonl.

Batch Inference

Synchronous batch prediction on full dataset (real_time_pipelines/unified_pipeline.py:22-28):
def batch_inference(model, X: pd.DataFrame, out_path: Path) -> None:
    probs = model.predict_proba(X)[:, 1]
    out = X.copy()
    out["score"] = probs
    out["prediction"] = (probs >= 0.5).astype(int)
    out.to_csv(out_path, index=False)
Output: artifacts/batch_inference.csv
student_country,days_on_platform,minutes_watched,...,score,prediction
US,45,1200.5,...,0.7234,1
CA,30,800.0,...,0.4512,0
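A quick way to consume the batch output downstream (the columns here are illustrative; the real file carries the full feature set plus score and prediction):

```python
import csv
import io

# Simulated slice of artifacts/batch_inference.csv with a reduced column set.
csv_text = """student_country,days_on_platform,minutes_watched,score,prediction
US,45,1200.5,0.7234,1
CA,30,800.0,0.4512,0
"""

# DictReader yields one dict per row, keyed by the header columns.
rows = list(csv.DictReader(io.StringIO(csv_text)))
positives = [r for r in rows if r["prediction"] == "1"]
print(len(rows), len(positives))  # → 2 1
```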

Synchronous Streaming

Row-by-row inference with timestamp tracking (real_time_pipelines/unified_pipeline.py:31-42):
def stream_inference(model, X: pd.DataFrame, out_path: Path) -> None:
    with out_path.open("w", encoding="utf-8") as f:
        for _, row in X.iterrows():
            x1 = row.to_frame().T
            score = float(model.predict_proba(x1)[:, 1][0])
            record = {
                "timestamp": time.time(),
                "score": score,
                "prediction": int(score >= 0.5),
            }
            f.write(json.dumps(record) + "\n")
Output: artifacts/stream_inference.jsonl
{"timestamp": 1709550930.123, "score": 0.7234, "prediction": 1}
{"timestamp": 1709550930.456, "score": 0.4512, "prediction": 0}
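Each line is an independent JSON object, so the output can be processed line by line (a minimal sketch with the example records inlined):

```python
import json

# Two lines in the shape of artifacts/stream_inference.jsonl.
jsonl_lines = [
    '{"timestamp": 1709550930.123, "score": 0.7234, "prediction": 1}',
    '{"timestamp": 1709550930.456, "score": 0.4512, "prediction": 0}',
]

# Parse each line independently; a corrupt line would only lose one record.
records = [json.loads(line) for line in jsonl_lines]
mean_score = sum(r["score"] for r in records) / len(records)
print(round(mean_score, 4))  # → 0.5873
```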

Async Stream Simulation

Producer-consumer pattern with queue-based backpressure (real_time_pipelines/unified_pipeline.py:45-108).

Producer

Ingests rows with Gaussian jitter:
async def producer():
    nonlocal produced, dropped  # counters shared with the enclosing scope
    for _, row in X.iterrows():
        produced += 1
        evt = {"t_ingest": time.perf_counter(), "row": row.to_dict()}
        try:
            # Non-blocking put: raises QueueFull instead of waiting
            queue.put_nowait(evt)
        except asyncio.QueueFull:
            dropped += 1  # backpressure: drop the event rather than block
        await asyncio.sleep(max(0.0, random.gauss(jitter_ms / 1000.0, jitter_ms / 4000.0)))
Jitter simulation:
  • Mean: jitter_ms / 1000 seconds
  • Std dev: jitter_ms / 4000 seconds
  • Drops events when queue is full (tracks dropped count)
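The effect of these parameters can be checked by sampling the same distribution the producer uses (a standalone sketch; the clamp at zero is what prevents a negative draw from becoming a negative sleep):

```python
import random

random.seed(0)
jitter_ms = 5.0

# Inter-arrival delays as the producer draws them: Gaussian with mean
# jitter_ms/1000 s and std dev jitter_ms/4000 s, clamped at zero.
delays = [
    max(0.0, random.gauss(jitter_ms / 1000.0, jitter_ms / 4000.0))
    for _ in range(10_000)
]

mean_delay = sum(delays) / len(delays)
print(f"mean ~ {mean_delay * 1000:.2f} ms, min = {min(delays):.6f} s")
```

Because the standard deviation is a quarter of the mean, the relative variability of arrivals is the same at every jitter setting.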

Consumer Pool

Multiple workers process queue concurrently:
async def consumer():
    nonlocal consumed  # counter shared with the enclosing scope
    while True:
        evt = await queue.get()
        if evt is None:  # sentinel: shut this worker down
            break
        row_df = pd.DataFrame([evt["row"]])
        score = float(model.predict_proba(row_df)[:, 1][0])
        consumed += 1
        latencies.append((time.perf_counter() - evt["t_ingest"]) * 1000)
        record = {
            "timestamp": time.time(),
            "queue_latency_ms": latencies[-1],
            "score": score,
            "prediction": int(score >= 0.5),
        }
        # Write to file
        queue.task_done()
Worker pool created with:
consumer_tasks = [asyncio.create_task(consumer()) for _ in range(workers)]

Orchestration

producer_task = asyncio.create_task(producer())
consumer_tasks = [asyncio.create_task(consumer()) for _ in range(workers)]

await producer_task
await queue.join()
for _ in range(workers):
    await queue.put(None)  # Sentinel to stop consumers
await asyncio.gather(*consumer_tasks)
Output: artifacts/stream_inference_async.jsonl
{"timestamp": 1709550930.123, "queue_latency_ms": 12.34, "score": 0.7234, "prediction": 1}
{"timestamp": 1709550930.456, "queue_latency_ms": 8.56, "score": 0.4512, "prediction": 0}

Streaming Metrics

Async mode writes a metrics report to artifacts/streaming_metrics.json:
{
  "produced": 64,
  "consumed": 62,
  "dropped": 2,
  "drop_rate": 0.03125,
  "queue_latency_ms_p50": 10.23,
  "queue_latency_ms_p95": 45.67,
  "workers": 2,
  "queue_max": 32,
  "jitter_ms": 5.0
}
Metrics:
  • produced: Total events produced
  • consumed: Total events processed
  • dropped: Events dropped due to queue full
  • drop_rate: dropped / produced
  • queue_latency_ms_p50: Median queue latency
  • queue_latency_ms_p95: 95th percentile queue latency
  • workers: Number of consumer workers
  • queue_max: Maximum queue size
  • jitter_ms: Inter-arrival jitter

Data Source

The pipeline loads test data from trained artifacts:
config = load_config("config.yaml")
model_path = Path(config["artifacts"]["model_dir"]) / config["artifacts"]["model_file"]
model = joblib.load(model_path)

df = load_dataset(config)
_, X_test, _, _ = split_data(df, config)
X_slice = X_test.head(64).copy()  # Default 64 rows
Model must be trained first:
python -m src.train

Operational Considerations

Queue Size (--queue-max)

  • Larger queue: Higher backpressure tolerance, more memory usage
  • Smaller queue: Lower latency, higher drop rate under load
  • Tune based on burst traffic patterns

Worker Count (--workers)

  • More workers: Higher throughput until CPU saturation
  • Fewer workers: Lower concurrency, higher queue latency
  • Monitor CPU usage and latency percentiles

Jitter (--jitter-ms)

  • Lower values: Faster event arrival, more queue pressure, higher drop rate
  • Higher values: Slower arrivals; absolute variance grows in proportion (std dev is fixed at a quarter of the mean)
  • Use inter-arrival distributions that reflect realistic production traffic
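These three knobs interact: as a back-of-envelope stability check, the queue only stays bounded while aggregate worker capacity exceeds the offered arrival rate. The per-event service time below is a hypothetical number, not a measurement:

```python
# Back-of-envelope stability check with illustrative numbers.
jitter_ms = 5.0    # mean inter-arrival time -> 200 events/s offered
service_ms = 8.0   # assumed per-event inference time (hypothetical)
workers = 2

arrival_rate = 1000.0 / jitter_ms           # events/s arriving
capacity = workers * (1000.0 / service_ms)  # events/s the pool can serve

print(arrival_rate, capacity, capacity >= arrival_rate)  # → 200.0 250.0 True
```

When capacity falls below the arrival rate, the queue fills and the drop counter starts climbing regardless of queue size.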

Test Mode

Test mode limits resources for CI environments:
if test_mode_enabled():
    args.jitter_ms = float(min(args.jitter_ms, 1.0))
    args.queue_max = int(min(args.queue_max, test_int("TEST_QUEUE_MAX", 8)))
    args.workers = int(min(args.workers, test_int("TEST_STREAM_WORKERS", 1)))
    max_rows = test_int("TEST_STREAM_ROWS", 16)
Environment variables:
  • TEST_MODE=1: Enable test mode
  • TEST_QUEUE_MAX=8: Max queue size
  • TEST_STREAM_WORKERS=1: Max workers
  • TEST_STREAM_ROWS=16: Max rows to process

Limitations

  • Static replay: Simulation replays static test rows, not diverse production events
  • Application-level metrics: Captures Python-level latency, not kernel/network latency
  • In-memory queue: Real production systems may use Kafka, RabbitMQ, or SQS
  • Fixed threshold: Uses a static 0.5 decision threshold; production systems may need calibrated or dynamic thresholds

Performance Tuning

High Throughput

python real_time_pipelines/unified_pipeline.py \
  --mode async_stream \
  --jitter-ms 2 \
  --queue-max 128 \
  --workers 8

Low Latency

python real_time_pipelines/unified_pipeline.py \
  --mode async_stream \
  --jitter-ms 10 \
  --queue-max 16 \
  --workers 4

Stress Test

python real_time_pipelines/unified_pipeline.py \
  --mode async_stream \
  --jitter-ms 1 \
  --queue-max 8 \
  --workers 1
Monitor drop_rate and queue_latency_ms_p95 in metrics.
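A sketch of that monitoring step, parsing a report in the shape above and checking it against alert thresholds (the 5% and 100 ms limits here are illustrative, not pipeline defaults):

```python
import json

# A metrics report in the shape of artifacts/streaming_metrics.json.
metrics_json = """{
  "produced": 64, "consumed": 62, "dropped": 2, "drop_rate": 0.03125,
  "queue_latency_ms_p50": 10.23, "queue_latency_ms_p95": 45.67,
  "workers": 2, "queue_max": 32, "jitter_ms": 5.0
}"""

m = json.loads(metrics_json)
alerts = []
if m["drop_rate"] > 0.05:
    alerts.append("drop_rate above 5%")
if m["queue_latency_ms_p95"] > 100.0:
    alerts.append("p95 queue latency above 100 ms")
print(alerts)  # → []
```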
