Overview

The RealTimePipelineRunner class orchestrates the entire preprocessing pipeline, supporting both batch and streaming execution modes with comprehensive benchmarking and telemetry.

Class Definition

class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:32

Constructor

config (PipelineConfig, required)
Pipeline configuration controlling all execution parameters.

Initializes all pipeline components:
  • DataIngestor for data loading
  • Preprocessor for cleaning
  • FeatureEngineer for feature creation
  • DataValidator for quality checks
  • HardwareMonitor for telemetry

Methods

run_batch

def run_batch(self, source: str | Path | pd.DataFrame) -> dict
Executes the full pipeline in batch mode, loading the entire dataset into memory.
source (str | Path | pd.DataFrame, required)
Data source: a CSV file path or a pandas DataFrame.

Returns (dict)
Comprehensive report containing:
  • mode: "batch"
  • rows: Number of rows processed
  • latency_s: Total execution time in seconds
  • throughput_rows_s: Processing throughput
  • peak_memory_mb: Peak memory usage
  • energy_estimate_j: Energy consumption estimate
  • telemetry: Hardware metrics
  • model: Model performance metrics (RMSE, R², training time)
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(random_seed=42)
runner = RealTimePipelineRunner(config)

result = runner.run_batch('nba_data.csv')
print(f"Processed {result['rows']} rows in {result['latency_s']:.2f}s")
print(f"Model R²: {result['model']['r2']:.4f}")
print(f"Peak memory: {result['peak_memory_mb']:.2f} MB")

run_streaming

def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> dict
Executes the pipeline in streaming mode with adaptive chunk processing.
source (str | Path | pd.DataFrame, required)
Data source: a CSV file path or a pandas DataFrame.

chunk_size (int | None, default: None)
Override the configured chunk size for this run.

max_memory_mb (int | None, default: None)
Override the configured memory limit for this run.

max_compute_units (float | None, default: None)
Override the configured compute limit for this run.

Returns (dict)
Streaming execution report containing:
  • mode: "streaming"
  • rows: Total rows processed
  • latency_s: Total execution time
  • throughput_rows_s: Processing throughput
  • peak_memory_mb: Peak memory usage
  • energy_estimate_j: Energy consumption
  • telemetry: Hardware metrics
  • chunk_metrics: Per-chunk performance data
  • operator_profile_summary_s: Timing breakdown by operation
  • model: Online model performance
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    chunk_size=128,
    max_memory_mb=1024,
    adaptive_chunk_resize=True
)
runner = RealTimePipelineRunner(config)

result = runner.run_streaming(
    'large_nba_data.csv',
    chunk_size=256,  # Override config
    max_memory_mb=2048
)

print(f"Processed {len(result['chunk_metrics'])} chunks")
print(f"Average latency per chunk: {result['latency_s'] / len(result['chunk_metrics']):.3f}s")
print(f"Operator profile: {result['operator_profile_summary_s']}")

benchmark

def benchmark(self, source: str | Path | pd.DataFrame) -> dict
Runs multiple iterations of both batch and streaming modes for statistical analysis.
source (str | Path | pd.DataFrame, required)
Data source for benchmarking.

Returns (dict)
Benchmark results containing:
  • runs: List of all individual run results
  • latency_batch: Bootstrap confidence intervals for batch latency
  • latency_streaming: Bootstrap confidence intervals for streaming latency
  • throughput_batch: Bootstrap confidence intervals for batch throughput
  • throughput_streaming: Bootstrap confidence intervals for streaming throughput
  • significance: Permutation test p-values comparing modes
  • latency_vs_data_size: Scaling analysis
  • throughput_vs_memory: Resource efficiency analysis
  • resource_vs_accuracy: Accuracy-resource tradeoffs
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10
)
runner = RealTimePipelineRunner(config)

benchmark_results = runner.benchmark('nba_data.csv')

print("Batch latency:")
print(f"  Mean: {benchmark_results['latency_batch']['mean']:.3f}s")
print(f"  95% CI: [{benchmark_results['latency_batch']['ci95_low']:.3f}, "
      f"{benchmark_results['latency_batch']['ci95_high']:.3f}]")

print("\nStreaming latency:")
print(f"  Mean: {benchmark_results['latency_streaming']['mean']:.3f}s")
print(f"  95% CI: [{benchmark_results['latency_streaming']['ci95_low']:.3f}, "
      f"{benchmark_results['latency_streaming']['ci95_high']:.3f}]")

print(f"\nStatistical significance (p-value): "
      f"{benchmark_results['significance']['latency_pvalue']:.4f}")

run_constraint_experiment

def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict
Tests pipeline performance across different resource constraints (chunk sizes, memory limits, compute limits).
source (str | Path | pd.DataFrame, required)
Data source for experiments.

Returns (dict)
Experiment results containing:
  • records: List of results for each constraint combination
  • summary: Aggregate statistics including best accuracy, lowest latency, etc.
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    n_jobs=4,  # Run experiments in parallel
    chunk_size=128,
    max_memory_mb=1024,
    max_compute_units=1.0
)
runner = RealTimePipelineRunner(config)

experiments = runner.run_constraint_experiment('nba_data.csv')

print(f"Tested {len(experiments['records'])} configurations")
print(f"Best accuracy: {experiments['summary']['best_accuracy_r2']:.4f}")
print(f"Lowest latency: {experiments['summary']['lowest_latency_s']:.3f}s")
print(f"Max memory used: {experiments['summary']['max_peak_memory_mb']:.2f} MB")

# Find optimal configuration
for record in experiments['records']:
    if record['model_accuracy_r2'] == experiments['summary']['best_accuracy_r2']:
        print(f"\nOptimal config:")
        print(f"  Chunk size: {record['chunk_size']}")
        print(f"  Memory limit: {record['memory_limit_mb']} MB")
        print(f"  Compute limit: {record['compute_limit']}")

run_all

def run_all(self, source: str | Path | pd.DataFrame) -> dict
Executes the complete pipeline including batch, streaming, benchmarking, experiments, and validation.
source (str | Path | pd.DataFrame, required)
Data source for comprehensive pipeline execution.

Returns (dict)
Comprehensive report containing:
  • dataset_fingerprint: Dataset SHA256 hash and metadata
  • reproducibility: Environment and configuration manifest
  • batch: Batch execution results
  • streaming: Streaming execution results
  • benchmark: Statistical benchmark results
  • constraint_experiment: Resource constraint experiments
  • quality: Data quality and validation reports
  • scaling: Parallel processing statistics
Also writes all artifacts to disk:
  • reports/pipeline_report.json
  • metadata/run_manifest.json
  • benchmarks/*.csv (multiple files)
  • profiles/operator_profile.csv
  • reports/streaming_chunks.jsonl
  • benchmarks/*.png (visualization plots)
Example:
from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner
import json

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=5,
    n_jobs=4,
    output_dir=Path('full_pipeline_run')
)

runner = RealTimePipelineRunner(config)
report = runner.run_all('nba_salaries.csv')

print(f"Dataset: {report['dataset_fingerprint']['rows']} rows, "
      f"{report['dataset_fingerprint']['columns']} columns")
print(f"SHA256: {report['dataset_fingerprint']['sha256']}")

print(f"\nBatch performance:")
print(f"  Latency: {report['batch']['latency_s']:.3f}s")
print(f"  Model R²: {report['batch']['model']['r2']:.4f}")

print(f"\nStreaming performance:")
print(f"  Latency: {report['streaming']['latency_s']:.3f}s")
print(f"  Chunks: {len(report['streaming']['chunk_metrics'])}")

print(f"\nData quality:")
print(f"  Outlier rate: {report['quality']['outlier_rate']:.2%}")
print(f"  Drift score: {report['quality']['drift_score']:.4f}")
print(f"  Schema valid: {report['quality']['schema_ok']}")

# Artifacts are saved to full_pipeline_run/
with open('full_pipeline_run/reports/pipeline_report.json') as f:
    saved_report = json.load(f)
    print(f"\nReport saved with {len(saved_report)} sections")

Private Methods

_hardware_adjusted_sizes

def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]
Calculates adjusted batch and chunk sizes based on hardware constraints. Returns: (adjusted_batch_size, adjusted_chunk_size)
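
The exact adjustment policy is internal to the engine; the sketch below is only a plausible proportional heuristic (the function name, the bytes_per_row estimate, and the scaling rules are illustrative assumptions, not the actual implementation):

def hardware_adjusted_sizes(
    rows: int,
    batch_size: int,
    chunk_size: int,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
    bytes_per_row: int = 2048,  # assumed average per-row memory footprint
) -> tuple[int, int]:
    # Cap both sizes so a single chunk stays within the memory budget.
    if max_memory_mb is not None:
        rows_in_budget = max(1, (max_memory_mb * 1024 * 1024) // bytes_per_row)
        batch_size = min(batch_size, rows_in_budget)
        chunk_size = min(chunk_size, rows_in_budget)
    # Scale the chunk size down by the fraction of compute available.
    if max_compute_units is not None:
        chunk_size = max(1, int(chunk_size * min(1.0, max_compute_units)))
    # Never exceed the dataset itself.
    return max(1, min(batch_size, rows)), max(1, min(chunk_size, rows))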

_bootstrap_ci

def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]
Computes bootstrap confidence intervals for performance metrics. Returns dict with: mean, std, median, p95, ci95_low, ci95_high, sample_size
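
A minimal sketch of the bootstrap procedure, shaped to produce the keys listed above (it resamples the mean with a fixed seed; the real method may bootstrap a different statistic):

import numpy as np

def bootstrap_ci(arr: np.ndarray, n_bootstrap: int = 400,
                 rng: np.random.Generator | None = None) -> dict[str, float]:
    # Each bootstrap replicate is the mean of a same-size resample of `arr`.
    rng = rng or np.random.default_rng(42)
    means = np.array([
        rng.choice(arr, size=arr.size, replace=True).mean()
        for _ in range(n_bootstrap)
    ])
    return {
        "mean": float(arr.mean()),
        "std": float(arr.std(ddof=1)) if arr.size > 1 else 0.0,
        "median": float(np.median(arr)),
        "p95": float(np.percentile(arr, 95)),
        "ci95_low": float(np.percentile(means, 2.5)),
        "ci95_high": float(np.percentile(means, 97.5)),
        "sample_size": float(arr.size),
    }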

_permutation_pvalue

def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float
Performs permutation test to determine statistical significance between two distributions.
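
A minimal sketch of a two-sided permutation test on the difference of means (the add-one smoothing and the fixed seed are assumptions, not confirmed details of the engine):

import numpy as np

def permutation_pvalue(a: np.ndarray, b: np.ndarray, n_perm: int = 1000,
                       rng: np.random.Generator | None = None) -> float:
    rng = rng or np.random.default_rng(42)
    observed = abs(a.mean() - b.mean())
    pooled = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        rng.shuffle(pooled)  # shuffle group labels by shuffling the pooled values
        diff = abs(pooled[: a.size].mean() - pooled[a.size:].mean())
        if diff >= observed:
            count += 1
    # Add-one smoothing keeps the p-value strictly positive.
    return (count + 1) / (n_perm + 1)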

Backward Compatibility

PipelineRunner = RealTimePipelineRunner
The alias PipelineRunner is provided for backward compatibility with older code.
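
Existing imports keep working unchanged:

from pipeline.config import PipelineConfig
from pipeline.streaming.engine import PipelineRunner  # alias of RealTimePipelineRunner

runner = PipelineRunner(PipelineConfig(random_seed=42))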

Notes

  • All methods respect the random_seed from config for reproducibility
  • Streaming mode uses online learning (SGDRegressor) for incremental model updates
  • Batch mode uses LinearRegression for full-dataset training (the sketch below contrasts the two)
  • Hardware telemetry includes RAPL energy measurements when available
  • Adaptive chunk resizing automatically handles memory pressure
  • All timing uses time.perf_counter() for high-resolution measurements
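
The following sketch contrasts the two modeling strategies from the notes above; the synthetic data, chunk loop, and estimator settings are illustrative, not the engine's internals:

import numpy as np
from sklearn.linear_model import LinearRegression, SGDRegressor

rng = np.random.default_rng(42)
X, y = rng.normal(size=(1000, 5)), rng.normal(size=1000)

# Batch mode: a single fit over the full dataset.
batch_model = LinearRegression().fit(X, y)

# Streaming mode: incremental updates, one chunk at a time.
online_model = SGDRegressor(random_state=42)
for start in range(0, len(X), 128):  # chunk_size=128, as in the examples above
    online_model.partial_fit(X[start:start + 128], y[start:start + 128])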
