Overview
The RealTimePipelineRunner class orchestrates the entire preprocessing pipeline, supporting both batch and streaming execution modes with comprehensive benchmarking and telemetry.
Class Definition
~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:32
Constructor
Pipeline configuration controlling all execution parameters
- DataIngestor for data loading
- Preprocessor for cleaning
- FeatureEngineer for feature creation
- DataValidator for quality checks
- HardwareMonitor for telemetry
Methods
run_batch
Data source: file path (CSV) or pandas DataFrame
Comprehensive report containing:
- mode: "batch"
- rows: Number of rows processed
- latency_s: Total execution time in seconds
- throughput_rows_s: Processing throughput
- peak_memory_mb: Peak memory usage
- energy_estimate_j: Energy consumption estimate
- telemetry: Hardware metrics
- model: Model performance metrics (RMSE, R², training time)
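A minimal sketch of how a run_batch report with these fields might be assembled. The processing step is a placeholder, and run_batch_sketch is a hypothetical name; only the report field names come from the documentation above.

```python
import time

def run_batch_sketch(rows):
    """Illustrative sketch of assembling a batch-mode report.

    The list comprehension stands in for the real ingest/clean/
    feature-engineering stages.
    """
    start = time.perf_counter()
    processed = [r * 2 for r in rows]  # placeholder for pipeline work
    latency_s = time.perf_counter() - start
    return {
        "mode": "batch",
        "rows": len(processed),
        "latency_s": latency_s,
        "throughput_rows_s": len(processed) / latency_s if latency_s > 0 else float("inf"),
    }

report = run_batch_sketch(list(range(1000)))
```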
run_streaming
Data source: file path (CSV) or pandas DataFrame
Override config chunk size for this run
Override config memory limit for this run
Override config compute limit for this run
Streaming execution report containing:
- mode: "streaming"
- rows: Total rows processed
- latency_s: Total execution time
- throughput_rows_s: Processing throughput
- peak_memory_mb: Peak memory usage
- energy_estimate_j: Energy consumption
- telemetry: Hardware metrics
- chunk_metrics: Per-chunk performance data
- operator_profile_summary_s: Timing breakdown by operation
- model: Online model performance
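The chunked execution behind this report can be sketched as follows. This is an assumption-laden illustration: run_streaming_sketch and its per-chunk "work" are stand-ins, while the report keys (rows, latency_s, chunk_metrics) follow the fields documented above.

```python
import time

def run_streaming_sketch(rows, chunk_size=256):
    """Sketch of streaming-style chunked processing with per-chunk metrics."""
    chunk_metrics = []
    total = 0
    t0 = time.perf_counter()
    for i in range(0, len(rows), chunk_size):
        chunk = rows[i:i + chunk_size]
        c0 = time.perf_counter()
        _ = sum(chunk)  # placeholder for clean/feature/incremental-fit work
        chunk_metrics.append({"rows": len(chunk),
                              "latency_s": time.perf_counter() - c0})
        total += len(chunk)
    latency_s = time.perf_counter() - t0
    return {"mode": "streaming", "rows": total,
            "latency_s": latency_s, "chunk_metrics": chunk_metrics}

report = run_streaming_sketch(list(range(1000)), chunk_size=300)
```

With 1000 rows and a chunk size of 300, this yields four chunks (300, 300, 300, 100), each carrying its own latency entry.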
benchmark
Data source for benchmarking
Benchmark results containing:
- runs: List of all individual run results
- latency_batch: Bootstrap confidence intervals for batch latency
- latency_streaming: Bootstrap confidence intervals for streaming latency
- throughput_batch: Bootstrap confidence intervals for batch throughput
- throughput_streaming: Bootstrap confidence intervals for streaming throughput
- significance: Permutation test p-values comparing modes
- latency_vs_data_size: Scaling analysis
- throughput_vs_memory: Resource efficiency analysis
- resource_vs_accuracy: Accuracy-resource tradeoffs
run_constraint_experiment
Data source for experiments
Experiment results containing:
- records: List of results for each constraint combination
- summary: Aggregate statistics including best accuracy, lowest latency, etc.
run_all
Data source for comprehensive pipeline execution
Comprehensive report containing:
- dataset_fingerprint: Dataset SHA256 hash and metadata
- reproducibility: Environment and configuration manifest
- batch: Batch execution results
- streaming: Streaming execution results
- benchmark: Statistical benchmark results
- constraint_experiment: Resource constraint experiments
- quality: Data quality and validation reports
- scaling: Parallel processing statistics
Output files:
- reports/pipeline_report.json
- metadata/run_manifest.json
- benchmarks/*.csv (multiple files)
- profiles/operator_profile.csv
- reports/streaming_chunks.jsonl
- benchmarks/*.png (visualization plots)
Private Methods
_hardware_adjusted_sizes
Returns a tuple: (adjusted_batch_size, adjusted_chunk_size)
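A hypothetical sketch of how such sizes could be derived from a memory budget. The heuristic, the row_mb parameter, and the function name are illustrative assumptions; only the returned tuple shape matches the documentation.

```python
def hardware_adjusted_sizes(batch_size, chunk_size, available_mb, row_mb=0.001):
    """Illustrative heuristic: cap both sizes at the number of rows
    that fit in the available memory budget."""
    max_rows = int(available_mb / row_mb)
    return (min(batch_size, max_rows), min(chunk_size, max_rows))

sizes = hardware_adjusted_sizes(50_000, 10_000, available_mb=8.0)  # -> (8000, 8000)
```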
_bootstrap_ci
Returns summary statistics: mean, std, median, p95, ci95_low, ci95_high, sample_size
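A self-contained sketch of a percentile bootstrap producing these fields, assuming a resample-the-mean scheme; the real method's resampling details are not documented here.

```python
import random
import statistics

def bootstrap_ci(samples, n_boot=2000, seed=42):
    """Percentile bootstrap 95% CI for the mean, plus summary stats."""
    rng = random.Random(seed)
    n = len(samples)
    # Resample with replacement and collect the mean of each resample.
    boot_means = sorted(
        statistics.fmean(rng.choices(samples, k=n)) for _ in range(n_boot)
    )
    s = sorted(samples)
    return {
        "mean": statistics.fmean(samples),
        "std": statistics.stdev(samples) if n > 1 else 0.0,
        "median": statistics.median(samples),
        "p95": s[min(n - 1, int(0.95 * n))],
        "ci95_low": boot_means[int(0.025 * n_boot)],
        "ci95_high": boot_means[int(0.975 * n_boot)],
        "sample_size": n,
    }

ci = bootstrap_ci([1.0, 1.2, 0.9, 1.1, 1.3, 1.0, 1.15])
```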
_permutation_pvalue
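A sketch of a two-sided permutation test on the difference of means, as used to compare batch vs. streaming latencies; the actual statistic and tail handling in the real method may differ.

```python
import random
import statistics

def permutation_pvalue(a, b, n_perm=5000, seed=0):
    """Two-sided permutation test: how often does a random relabeling
    produce a mean difference at least as extreme as the observed one?"""
    rng = random.Random(seed)
    observed = abs(statistics.fmean(a) - statistics.fmean(b))
    pooled = list(a) + list(b)
    hits = 0
    for _ in range(n_perm):
        rng.shuffle(pooled)
        diff = abs(statistics.fmean(pooled[:len(a)])
                   - statistics.fmean(pooled[len(a):]))
        if diff >= observed:
            hits += 1
    return (hits + 1) / (n_perm + 1)  # add-one smoothing avoids p == 0

p_same = permutation_pvalue([1, 2, 3, 4], [1, 2, 3, 4])
```

With identical samples the observed difference is zero, so every permutation is at least as extreme and the p-value is 1.0.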
Backward Compatibility
PipelineRunner is provided for backward compatibility with older code.
Notes
- All methods respect the random_seed from config for reproducibility
- Streaming mode uses online learning (SGDRegressor) for incremental model updates
- Batch mode uses LinearRegression for full dataset training
- Hardware telemetry includes RAPL energy measurements when available
- Adaptive chunk resizing automatically handles memory pressure
- All timing uses time.perf_counter() for high-resolution measurements
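The adaptive chunk resizing mentioned above can be sketched as a simple controller. The thresholds, growth factor, and function name are illustrative assumptions, not the pipeline's actual policy.

```python
def adapt_chunk_size(chunk_size, used_mb, limit_mb, min_chunk=64):
    """Illustrative policy: halve the chunk when memory use nears the
    limit, grow it back when there is ample headroom."""
    if used_mb > 0.9 * limit_mb:          # under pressure: shrink
        return max(min_chunk, chunk_size // 2)
    if used_mb < 0.5 * limit_mb:          # plenty of headroom: grow
        return chunk_size * 2
    return chunk_size                     # otherwise hold steady

shrunk = adapt_chunk_size(4096, used_mb=950, limit_mb=1000)  # -> 2048
grown = adapt_chunk_size(4096, used_mb=200, limit_mb=1000)   # -> 8192
```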