
Overview

The pipeline provides built-in benchmarking capabilities to measure performance across batch and streaming modes. The benchmark() method automatically runs multiple trials and generates statistical reports with confidence intervals.

Running Benchmarks

Basic Benchmark Execution

from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig

config = PipelineConfig(
    chunk_size=128,
    batch_size=256,
    benchmark_runs=3,  # Number of trials
    random_seed=42
)

runner = RealTimePipelineRunner(config)
results = runner.benchmark("nba2k-full.csv")

Command-Line Benchmarking

cd "NBA Data Preprocessing/task"
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir artifacts_profile \
  --chunk-size 128 \
  --batch-size 256 \
  --max-memory-mb 512 \
  --max-compute-units 0.5 \
  --benchmark-runs 3 \
  --random-seed 42

Benchmark Methodology

Statistical Analysis

The benchmark method performs the following analysis:
  1. Multiple Runs: Executes both batch and streaming modes for the configured number of runs
  2. Bootstrap Confidence Intervals: Computes 95% CI using 400 bootstrap resamples
  3. Permutation Testing: Tests statistical significance between batch and streaming performance
  4. Scalability Analysis: Measures latency and throughput across different data sizes

Implementation Details

From engine.py:324-374:
def benchmark(self, source: str | Path | pd.DataFrame) -> dict:
    runs = []
    for _ in range(self.config.benchmark_runs):
        runs.append({
            'batch': self.run_batch(source),
            'streaming': self.run_streaming(source)
        })

    batch_latencies = np.array([r['batch']['latency_s'] for r in runs])
    stream_latencies = np.array([r['streaming']['latency_s'] for r in runs])
    batch_tp = np.array([r['batch']['throughput_rows_s'] for r in runs])
    stream_tp = np.array([r['streaming']['throughput_rows_s'] for r in runs])

    # Bootstrap confidence intervals
    return {
        'latency_batch': self._bootstrap_ci(batch_latencies),
        'latency_streaming': self._bootstrap_ci(stream_latencies),
        'throughput_batch': self._bootstrap_ci(batch_tp),
        'throughput_streaming': self._bootstrap_ci(stream_tp),
        'significance': {
            'latency_pvalue': self._permutation_pvalue(batch_latencies, stream_latencies),
            'throughput_pvalue': self._permutation_pvalue(batch_tp, stream_tp)
        }
    }

Bootstrap Confidence Intervals

The _bootstrap_ci() method (engine.py:108-132) reports the following statistics for each metric:
  • Sample size: Number of observations
  • Mean: Average value across runs
  • Standard deviation: Measure of variability
  • Median: 50th percentile
  • P95: 95th percentile (tail latency)
  • CI95 low/high: 95% confidence interval bounds
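
A minimal sketch of such a bootstrap routine, using NumPy's random generator. This is an illustration, not the actual `_bootstrap_ci` implementation; only the 400-resample count and the reported statistics are taken from the description above.

```python
import numpy as np

def bootstrap_ci(values, n_resamples=400, seed=42):
    """Summary statistics with a 95% bootstrap CI on the mean (sketch)."""
    rng = np.random.default_rng(seed)
    values = np.asarray(values, dtype=float)
    # Resample with replacement and record the mean of each resample
    means = np.array([
        rng.choice(values, size=values.size, replace=True).mean()
        for _ in range(n_resamples)
    ])
    return {
        'n': int(values.size),
        'mean': float(values.mean()),
        'std': float(values.std(ddof=1)) if values.size > 1 else 0.0,
        'median': float(np.median(values)),
        'p95': float(np.percentile(values, 95)),
        # 95% CI: 2.5th and 97.5th percentiles of the resampled means
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5)),
    }
```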

Permutation Testing

The _permutation_pvalue() method (engine.py:134-147) determines if performance differences are statistically significant:
  • Uses 1000 permutations by default
  • Computes p-value for mean difference
  • A p-value below 0.05 indicates a statistically significant difference
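
A permutation test of this kind can be sketched as follows (an illustration of the technique, not the code from engine.py): pool both samples, repeatedly shuffle the pooled values, and count how often a random split produces a mean difference at least as large as the observed one.

```python
import numpy as np

def permutation_pvalue(a, b, n_permutations=1000, seed=42):
    """Two-sided permutation test on the difference of means (sketch)."""
    rng = np.random.default_rng(seed)
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    observed = abs(a.mean() - b.mean())
    pooled = np.concatenate([a, b])
    count = 0
    for _ in range(n_permutations):
        rng.shuffle(pooled)
        # Split the shuffled pool into two groups of the original sizes
        diff = abs(pooled[:a.size].mean() - pooled[a.size:].mean())
        if diff >= observed:
            count += 1
    return count / n_permutations
```

Identical samples give a p-value of 1.0 (every random split matches the observed zero difference), while clearly separated samples drive the p-value toward 0.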

Generated Artifacts

Benchmark CSV Files

After running run_all(), several benchmark files are created in output_dir/benchmarks/:

streaming_chunks.csv

Per-chunk metrics for streaming mode:
chunk_id,rows,latency_s,throughput_rows_s,batch_size,chunk_size,memory_before_mb,memory_after_mb,memory_exceeded,retries
1,128,0.045,2844.4,256,128,245.3,268.7,false,0
2,128,0.042,3047.6,256,128,268.7,289.1,false,0
Key columns:
  • chunk_id: Sequential chunk identifier
  • latency_s: Time to process chunk
  • throughput_rows_s: Rows processed per second
  • memory_exceeded: Whether chunk exceeded memory limit
  • retries: Number of retry attempts due to memory pressure
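
Because these are plain CSV files, they can be summarized directly with pandas. The snippet below embeds the sample rows shown above; in a real run, read the file from output_dir/benchmarks/ instead.

```python
import io
import pandas as pd

# Sample rows from streaming_chunks.csv (in practice, read the file
# from <output_dir>/benchmarks/streaming_chunks.csv)
csv_text = """chunk_id,rows,latency_s,throughput_rows_s,batch_size,chunk_size,memory_before_mb,memory_after_mb,memory_exceeded,retries
1,128,0.045,2844.4,256,128,245.3,268.7,false,0
2,128,0.042,3047.6,256,128,268.7,289.1,false,0
"""
chunks = pd.read_csv(io.StringIO(csv_text))

summary = {
    "chunks": len(chunks),
    "mean_latency_s": chunks["latency_s"].mean(),
    "peak_throughput": chunks["throughput_rows_s"].max(),
    "total_retries": int(chunks["retries"].sum()),
}
print(summary)
```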

latency_vs_data_size.csv

Scalability analysis across different data sizes:
rows,latency_s
64,0.023
128,0.041
256,0.078
512,0.152

throughput_vs_memory.csv

Memory vs. performance trade-offs:
peak_memory_mb,throughput_rows_s
128.4,2456.7
245.8,3102.3
489.2,3287.1

resource_vs_accuracy.csv

Resource consumption vs. model accuracy:
mode,peak_memory_mb,r2
batch,512.3,0.847
streaming,287.6,0.834

significance_tests.csv

Statistical significance results:
latency_pvalue,throughput_pvalue,latency_mean_delta_s,throughput_mean_delta_rows_s
0.042,0.038,0.123,-145.6
Interpretation:
  • P-values < 0.05 indicate statistically significant differences
  • Negative throughput_mean_delta means streaming is faster
  • Positive latency_mean_delta means streaming has higher latency
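
The interpretation rules above can be applied mechanically. This sketch embeds the sample row inline; in practice, read significance_tests.csv from output_dir/benchmarks/.

```python
import io
import pandas as pd

# Sample row from significance_tests.csv
csv_text = """latency_pvalue,throughput_pvalue,latency_mean_delta_s,throughput_mean_delta_rows_s
0.042,0.038,0.123,-145.6
"""
row = pd.read_csv(io.StringIO(csv_text)).iloc[0]

flags = {
    # p < 0.05: the batch/streaming difference is statistically significant
    "latency_differs": bool(row["latency_pvalue"] < 0.05),
    "throughput_differs": bool(row["throughput_pvalue"] < 0.05),
    # Negative throughput delta: streaming processes more rows per second
    "streaming_faster_throughput": bool(row["throughput_mean_delta_rows_s"] < 0),
    # Positive latency delta: streaming has higher latency
    "streaming_higher_latency": bool(row["latency_mean_delta_s"] > 0),
}
print(flags)
```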

Visualization Artifacts

The benchmark process also generates PNG visualizations (see _plot_experiment_results() in engine.py:511-555):
  • latency_vs_accuracy.png: Trade-off between speed and model quality
  • memory_vs_accuracy.png: Memory consumption impact on accuracy
  • latency_memory_accuracy.png: Three-way relationship visualization

Scalability Analysis

The benchmark method tests scalability by running the pipeline on progressively larger subsets:
# From engine.py:334-342
sizes = [min(size_base, s) for s in (64, 128, 256, size_base)]
latency_vs_size = []
for size in sizes:
    sample = source.iloc[:size] if isinstance(source, pd.DataFrame) else self.ingestor.load(source).iloc[:size]
    b = self.run_batch(sample)
    latency_vs_size.append({'rows': size, 'latency_s': b['latency_s']})
This allows you to:
  • Identify performance bottlenecks at different scales
  • Estimate resource requirements for production data sizes
  • Validate linear vs. non-linear scaling behavior
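
One way to validate scaling behavior is to fit a line to the latency_vs_data_size.csv points; a high R² suggests near-linear scaling, while a poor fit hints at super-linear costs. The snippet below uses the sample values shown earlier.

```python
import numpy as np

# Sample points from latency_vs_data_size.csv
rows = np.array([64, 128, 256, 512], dtype=float)
latency = np.array([0.023, 0.041, 0.078, 0.152])

# Least-squares fit: latency ~ slope * rows + intercept
slope, intercept = np.polyfit(rows, latency, 1)
pred = slope * rows + intercept
r2 = 1 - ((latency - pred) ** 2).sum() / ((latency - latency.mean()) ** 2).sum()
print(f"slope={slope:.6f} s/row, r2={r2:.4f}")
```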

Reproducibility

All benchmarks include a reproducibility manifest (engine.py:149-170):
{
  "random_seed": 42,
  "python_version": "3.11.0",
  "platform": "Linux-5.15.0-x86_64",
  "config": {
    "chunk_size": 128,
    "batch_size": 256,
    "benchmark_runs": 3
  },
  "dependencies": {
    "numpy": "1.24.3",
    "pandas": "2.0.2"
  }
}
This manifest is saved to output_dir/metadata/run_manifest.json and enables:
  • Exact reproduction of benchmark results
  • Environment comparison across systems
  • Version-specific regression testing

Interpreting Results

When Batch Mode is Faster

  • Small datasets that fit comfortably in memory
  • No memory constraints
  • Simpler model training without streaming overhead

When Streaming Mode is Faster

  • Large datasets exceeding available memory
  • Memory-constrained environments
  • Real-time or incremental processing requirements

Key Metrics to Monitor

  1. Latency: Total time to complete processing
  2. Throughput: Rows processed per second
  3. Peak memory: Maximum memory usage during execution
  4. Model accuracy: R² score for regression quality
  5. P-values: Statistical significance of performance differences

Best Practices

  1. Run multiple trials: Set benchmark_runs >= 3 for statistical validity
  2. Control for variability: Use fixed random_seed for reproducibility
  3. Warm up system: Discard first run if system is cold
  4. Isolate workload: Close other applications during benchmarking
  5. Monitor system: Check for background processes affecting results
  6. Document environment: Save reproducibility manifest with results
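
The warm-up advice in step 3 can be wrapped in a small helper (a sketch; `run_fn` stands for any callable that executes one benchmark trial).

```python
def warmed_runs(run_fn, n_runs, discard_first=True):
    """Execute n_runs measured trials, optionally discarding one cold warm-up run."""
    if discard_first:
        run_fn()  # warm-up trial: result thrown away
    return [run_fn() for _ in range(n_runs)]
```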
