Overview
The streaming engine orchestrates the entire NBA preprocessing pipeline in both batch and streaming modes. The RealTimePipelineRunner class provides adaptive resource management, performance profiling, and comprehensive benchmarking capabilities.
RealTimePipelineRunner Class
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:32
Initialization
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig)
Pipeline configuration object containing:
random_seed: Reproducibility seed
chunk_size: Default rows per chunk
batch_size: Batch processing size
max_memory_mb: Memory limit
max_compute_units: CPU limit (0-1 scale)
benchmark_runs: Number of benchmark iterations
n_jobs: Parallel job count
adaptive_chunk_resize: Enable dynamic chunk sizing
max_chunk_retries: Max retry attempts for memory-constrained chunks
spill_to_disk: Enable disk spillover for large chunks
Initialization:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:33-40
def __init__(self, config: PipelineConfig):
    self.config = config
    set_global_seed(config.random_seed)
    self.ingestor = DataIngestor(config.random_seed)
    self.preprocessor = Preprocessor(config.random_seed)
    self.engineer = FeatureEngineer()
    self.validator = DataValidator()
    self.hardware = HardwareMonitor()
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming import RealTimePipelineRunner
config = PipelineConfig(
    random_seed=42,
    chunk_size=1000,
    max_memory_mb=512,
    max_compute_units=0.8
)
runner = RealTimePipelineRunner(config)
Core Execution Methods
run_batch()
Executes the full pipeline on the entire dataset in memory.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:172
def run_batch(self, source: str | Path | pd.DataFrame) -> dict

source (str | Path | pd.DataFrame, required): Data source (file path, Path object, or DataFrame)
Returns: Dictionary with batch execution metrics:
mode: "batch"
rows: Number of rows processed
latency_s: Total execution time
throughput_rows_s: Processing speed
peak_memory_mb: Maximum memory usage
energy_estimate_j: Estimated energy consumption
telemetry: Hardware metrics
model: Model training metrics (RMSE, R²)
Execution Flow:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:173-196
def run_batch(self, source) -> dict:
    # 1. Load full dataset
    df = self.ingestor.load(source)
    # 2. Start profiling
    start_snapshot = self.hardware.snapshot()
    tracemalloc.start()
    t0 = time.perf_counter()
    # 3. Process through pipeline
    X, y = self._process_df(df)
    # 4. Measure performance
    elapsed = time.perf_counter() - t0
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    end_snapshot = self.hardware.snapshot()
    # 5. Evaluate model
    throughput = len(df) / max(elapsed, 1e-9)
    model_metrics = self._evaluate_model(X, y)
    telemetry = self.hardware.compare(start_snapshot, end_snapshot)
    return {
        'mode': 'batch',
        'rows': len(df),
        'latency_s': elapsed,
        'throughput_rows_s': throughput,
        'peak_memory_mb': peak / (1024 * 1024),
        'energy_estimate_j': telemetry.get('rapl_energy_j', elapsed * 45.0),
        'telemetry': telemetry,
        'model': model_metrics
    }
Example:
runner = RealTimePipelineRunner(config)
result = runner.run_batch('data/nba_players.csv')
print(f"Processed {result['rows']:,} rows in {result['latency_s']:.2f}s")
print(f"Throughput: {result['throughput_rows_s']:,.0f} rows/sec")
print(f"Peak memory: {result['peak_memory_mb']:.1f} MB")
print(f"Model R²: {result['model']['r2']:.3f}")
Batch mode loads the entire dataset into memory. Use streaming mode for large datasets.
run_streaming()
Executes the pipeline in streaming mode with adaptive resource management.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:198
def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None
) -> dict

source (str | Path | pd.DataFrame, required): Data source to stream
chunk_size (int | None, default: None): Override default chunk size (uses config if None)
max_memory_mb (int | None, default: None): Override memory limit (uses config if None)
max_compute_units (float | None, default: None): Override compute limit (uses config if None)
Returns: Dictionary with streaming execution metrics:
mode: "streaming"
rows: Total rows processed
latency_s: Total execution time
throughput_rows_s: Average processing speed
peak_memory_mb: Maximum memory usage
energy_estimate_j: Total energy consumption
chunk_metrics: Per-chunk performance metrics
operator_profile_summary_s: Average time per pipeline stage
model: Online learning model metrics
Key Features:
1. Hardware-Adjusted Chunk Sizing
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:205-211
rows = len(source) if isinstance(source, pd.DataFrame) else len(self.ingestor.load(source))
batch_size, adjusted_chunk_size = self._hardware_adjusted_sizes(
    rows,
    chunk_size=chunk_size,
    max_memory_mb=max_memory_mb,
    max_compute_units=max_compute_units
)
Algorithm:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:42-60
def _hardware_adjusted_sizes(self, rows, chunk_size, batch_size, max_memory_mb, max_compute_units):
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units
    # Scale based on available resources
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))  # More memory → larger chunks
    compute_factor = max(0.1, min(1.0, compute_cap))       # More CPU → larger chunks
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)
Example:
Base chunk_size: 1000
Memory: 512 MB → memory_factor = 0.5
Compute: 0.8 → compute_factor = 0.8
Scale = 0.5 × 0.8 = 0.4
Adjusted chunk_size = max(16, 1000 × 0.4) = 400 rows
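The worked numbers above can be verified with a small standalone sketch; the helper name `adjusted_chunk_size` is illustrative and not part of the engine's API:

```python
def adjusted_chunk_size(chunk_base: int, memory_cap_mb: int, compute_cap: float, rows: int) -> int:
    """Standalone sketch of the hardware-adjusted sizing formula described above."""
    memory_factor = max(0.1, min(1.0, memory_cap_mb / 1024))  # more memory -> larger chunks
    compute_factor = max(0.1, min(1.0, compute_cap))          # more CPU -> larger chunks
    scale = memory_factor * compute_factor
    # Floor at 16 rows, never exceed the dataset size.
    return min(max(16, int(chunk_base * scale)), rows)

# Reproduces the worked example: base 1000 rows, 512 MB, 0.8 compute units.
print(adjusted_chunk_size(1000, 512, 0.8, rows=10_000))  # -> 400
```

Note that both factors are clamped to [0.1, 1.0], so the scale can only shrink chunks, never grow them beyond the configured base.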
2. Adaptive Chunk Resizing
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:251-258
if memory_exceeded and self.config.adaptive_chunk_resize and retries < self.config.max_chunk_retries and len(chunk) > 16:
    retries += 1
    split = max(16, len(chunk) // 2)
    pending_chunks.insert(0, chunk.iloc[split:].copy())
    chunk = chunk.iloc[:split].copy()
    current_chunk_size = split
    time.sleep(min(0.05 * retries, 0.2))
    continue  # Retry with smaller chunk
Process:
Detect memory limit exceeded
Split chunk in half
Queue second half for later processing
Retry with first half
Linear backoff between retries (0.05 s × retry count, capped at 0.2 s)
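The steps above can be sketched as a standalone loop; `process_with_splitting` and the `fits_in_memory` predicate are hypothetical stand-ins for the engine's internal memory check and retry logic:

```python
import time
import pandas as pd

def process_with_splitting(chunk, fits_in_memory, max_retries=3):
    """Halve-and-requeue sketch of the adaptive resizing strategy above."""
    pending = [chunk]
    processed = []
    retries = 0
    while pending:
        current = pending.pop(0)
        if not fits_in_memory(current) and retries < max_retries and len(current) > 16:
            retries += 1
            split = max(16, len(current) // 2)
            pending.insert(0, current.iloc[split:].copy())  # queue the second half
            pending.insert(0, current.iloc[:split].copy())  # retry the first half next
            time.sleep(min(0.05 * retries, 0.2))            # backoff, capped at 0.2 s
            continue
        processed.append(current)
    return processed

# A 100-row chunk against a limit of 60 rows splits once into two halves.
df = pd.DataFrame({'x': range(100)})
parts = process_with_splitting(df, fits_in_memory=lambda c: len(c) <= 60)
print([len(p) for p in parts])  # [50, 50]
```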
3. Online Learning with SGDRegressor
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:214-246
online_model = SGDRegressor(
    random_state=self.config.random_seed,
    max_iter=1,
    tol=None,
    learning_rate='invscaling'
)
online_seen = False
online_feature_cols = None
for chunk in self._iter_source_chunks(source, adjusted_chunk_size):
    X_chunk, y_chunk = self._process_stream_chunk(chunk, rolling_state)
    # Align feature columns across chunks
    if online_feature_cols is None:
        online_feature_cols = list(X_chunk.columns)
    else:
        for col in online_feature_cols:
            if col not in X_chunk.columns:
                X_chunk[col] = 0.0
        X_chunk = X_chunk.reindex(columns=online_feature_cols, fill_value=0.0)
    # Incremental learning
    if len(X_chunk) > 0:
        online_model.partial_fit(X_chunk.to_numpy(dtype=float), y_chunk.to_numpy(dtype=float))
        online_seen = True
Feature alignment ensures consistent feature dimensions across chunks, even if new categories appear.
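A minimal standalone illustration of that alignment using pandas `reindex` (toy column names, not the pipeline's actual features): the first chunk fixes the schema, and later chunks are conformed to it, with unseen features zero-filled and extra columns dropped.

```python
import pandas as pd

feature_cols = None
chunks = [
    pd.DataFrame({'pts': [10.0], 'ast': [4.0]}),
    pd.DataFrame({'pts': [22.0], 'reb': [7.0]}),  # new column 'reb', missing 'ast'
]
aligned = []
for chunk in chunks:
    if feature_cols is None:
        feature_cols = list(chunk.columns)  # first chunk defines the schema
    # Conform to the schema: missing columns filled with 0.0, extras dropped.
    aligned.append(chunk.reindex(columns=feature_cols, fill_value=0.0))

print(list(aligned[1].columns))   # ['pts', 'ast']
print(aligned[1]['ast'].iloc[0])  # 0.0
```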
4. Disk Spillover
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:260-266
spill_paths: dict[str, str] = {}
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
    spill_paths = {'X': str(x_path), 'y': str(y_path)}
Example:
config = PipelineConfig(
    chunk_size=500,
    max_memory_mb=256,
    adaptive_chunk_resize=True,
    max_chunk_retries=3,
    spill_to_disk=True
)
runner = RealTimePipelineRunner(config)
result = runner.run_streaming('large_dataset.csv')
print(f"Total latency: {result['latency_s']:.2f}s")
print(f"Chunks processed: {len(result['chunk_metrics'])}")
print(f"Avg preprocess time per chunk: {result['operator_profile_summary_s']['preprocess_s']:.4f}s")
print(f"Model R²: {result['model']['r2']:.3f}")
for chunk in result['chunk_metrics']:
    if chunk['memory_exceeded']:
        print(f"⚠️ Chunk {chunk['chunk_id']} exceeded memory (retries: {chunk['retries']})")
Benchmarking Methods
benchmark()
Runs multiple iterations of batch and streaming modes for statistical comparison.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:324
def benchmark(self, source: str | Path | pd.DataFrame) -> dict

source (str | Path | pd.DataFrame, required): Dataset to benchmark
Returns: Dictionary containing:
runs: List of all run results
latency_batch: Bootstrap confidence intervals for batch latency
latency_streaming: Bootstrap confidence intervals for streaming latency
throughput_batch: Throughput statistics
throughput_streaming: Throughput statistics
significance: Statistical tests comparing modes
latency_vs_data_size: Scalability analysis
throughput_vs_memory: Resource efficiency
resource_vs_accuracy: Trade-off analysis
Statistical Methods:
Bootstrap Confidence Intervals
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:108-132
def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict:
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5))
    }
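The same percentile-bootstrap idea, reduced to a standalone sketch on synthetic latency samples (the `bootstrap_ci` helper and the sample values are illustrative): resample the data with replacement, collect the resample means, and read the CI off their 2.5th and 97.5th percentiles.

```python
import numpy as np

def bootstrap_ci(arr, n_bootstrap=400, seed=42):
    """Percentile bootstrap of the mean, mirroring the method above."""
    rng = np.random.default_rng(seed)
    means = [rng.choice(arr, size=len(arr), replace=True).mean()
             for _ in range(n_bootstrap)]
    return float(np.percentile(means, 2.5)), float(np.percentile(means, 97.5))

# Synthetic batch latencies clustered around ~1.0 s.
latencies = np.array([1.02, 0.98, 1.05, 1.01, 0.97, 1.03])
low, high = bootstrap_ci(latencies)
print(f"95% CI for mean latency: [{low:.3f}, {high:.3f}]")
```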
Permutation Test (p-value)
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:134-147
def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    return float((count + 1) / (n_perm + 1))
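A standalone sketch of the same permutation test on synthetic batch/streaming latencies (helper name and data are illustrative): with two clearly separated groups, almost no shuffled split reproduces the observed mean gap, so the p-value lands well below 0.05.

```python
import numpy as np

def permutation_pvalue(a, b, n_perm=1000, seed=42):
    """Two-sample permutation test on the difference of means."""
    rng = np.random.default_rng(seed)
    observed = abs(a.mean() - b.mean())
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        if abs(shuffled[:len(a)].mean() - shuffled[len(a):].mean()) >= observed:
            count += 1
    return (count + 1) / (n_perm + 1)  # add-one smoothing avoids p == 0

batch = np.array([2.1, 2.0, 2.2, 2.1, 2.0])
streaming = np.array([1.1, 1.0, 1.2, 1.1, 1.0])
p = permutation_pvalue(batch, streaming)
print(f"p-value: {p:.4f}")  # expected well below 0.05
```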
Example:
runner = RealTimePipelineRunner(config)
bench = runner.benchmark('data/nba_players.csv')
print("Batch latency:")
print(f"  Mean: {bench['latency_batch']['mean']:.3f}s")
print(f"  95% CI: [{bench['latency_batch']['ci95_low']:.3f}, {bench['latency_batch']['ci95_high']:.3f}]")
print("\nStreaming latency:")
print(f"  Mean: {bench['latency_streaming']['mean']:.3f}s")
print(f"  95% CI: [{bench['latency_streaming']['ci95_low']:.3f}, {bench['latency_streaming']['ci95_high']:.3f}]")
print("\nStatistical significance:")
print(f"  p-value: {bench['significance']['latency_pvalue']:.4f}")
if bench['significance']['latency_pvalue'] < 0.05:
    print("  ✓ Difference is statistically significant")
run_constraint_experiment()
Tests pipeline performance under various resource constraints.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:389
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict

source (str | Path | pd.DataFrame, required): Dataset for constraint testing
Returns: Dictionary with constraint experiment results:
records: List of all constraint configurations and their results
summary: Best performance metrics across all configurations
Experiment Grid:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:392-395
chunk_sizes = [64, self.config.chunk_size]
memory_limits = [256, self.config.max_memory_mb]
compute_limits = [0.5, self.config.max_compute_units]
tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]
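The grid is a plain Cartesian product, so two values per axis yield 2 × 2 × 2 = 8 configurations. A sketch with illustrative stand-in values for the config defaults:

```python
from itertools import product

# Illustrative values standing in for the config's chunk/memory/compute defaults.
chunk_sizes = [64, 1000]
memory_limits = [256, 512]
compute_limits = [0.5, 0.8]

tasks = list(product(chunk_sizes, memory_limits, compute_limits))
print(len(tasks))  # 8
for chunk, mem, compute in tasks[:2]:
    print(chunk, mem, compute)
```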
Parallel Execution:
if self.config.n_jobs > 1:
    experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
        delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
    )
else:
    experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]
Example:
runner = RealTimePipelineRunner(config)
experiment = runner.run_constraint_experiment('data/nba_players.csv')
print("Constraint experiment summary:")
print(f"  Best accuracy: {experiment['summary']['best_accuracy_r2']:.3f}")
print(f"  Lowest latency: {experiment['summary']['lowest_latency_s']:.3f}s")
print(f"  Max memory used: {experiment['summary']['max_peak_memory_mb']:.1f} MB")
print("\nAll configurations:")
for record in experiment['records']:
    print(f"  Chunk={record['chunk_size']}, Mem={record['memory_limit_mb']}MB, "
          f"Compute={record['compute_limit']:.1f} → R²={record['model_accuracy_r2']:.3f}, "
          f"Latency={record['preprocessing_latency_s']:.2f}s")
run_all()
Executes the complete pipeline with all benchmarks, experiments, and validation.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:415
def run_all(self, source: str | Path | pd.DataFrame) -> dict

source (str | Path | pd.DataFrame, required): Complete dataset to process
Returns: Comprehensive report containing:
dataset_fingerprint: SHA-256 hash and metadata
reproducibility: Environment and configuration details
batch: Batch mode results
streaming: Streaming mode results
benchmark: Statistical comparison
constraint_experiment: Resource constraint analysis
quality: Data quality and validation metrics
scaling: Parallelization info
Execution Flow:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:416-455
def run_all(self, source) -> dict:
    self.config.ensure_output_dirs()
    df = self.ingestor.load(source)
    fp = self.ingestor.fingerprint(df)
    # 1. Run both modes
    batch_report = self.run_batch(df)
    streaming_report = self.run_streaming(df)
    # 2. Benchmarking
    benchmark = self.benchmark(df)
    constraint_experiment = self.run_constraint_experiment(df)
    # 3. Validation
    cleaned = self.preprocessor.clean(df)
    outlier_mask = self.preprocessor.detect_outliers_iqr(
        cleaned.select_dtypes(include='number')
    )
    quality = self.validator.quality_report(cleaned, outlier_mask)
    drift_score = self.validator.drift_detection(
        cleaned,
        cleaned.sample(frac=1.0, random_state=self.config.random_seed)
    )
    schema_ok, schema_issues = self.validator.schema_validation(
        df,
        required_columns=['version', 'salary', 'b_day', 'draft_year']
    )
    feature_drift = self.validator.feature_wise_drift(
        cleaned.iloc[:len(cleaned) // 2],
        cleaned.iloc[len(cleaned) // 2:]
    )
    temporal_drift = self.validator.temporal_drift(df)
    # 4. Compile report
    report = {
        'dataset_fingerprint': asdict(fp),
        'reproducibility': self._reproducibility_manifest(),
        'batch': batch_report,
        'streaming': streaming_report,
        'benchmark': benchmark,
        'constraint_experiment': constraint_experiment,
        'quality': asdict(quality) | {
            'drift_score': drift_score,
            'schema_ok': schema_ok,
            'schema_issues': schema_issues,
            'feature_drift': feature_drift,
            'temporal_drift': temporal_drift
        },
        'scaling': {
            'n_jobs': self.config.n_jobs,
            'parallel_enabled': self.config.n_jobs > 1
        }
    }
    self._write_artifacts(report)
    return report
Example:
config = PipelineConfig(
    random_seed=42,
    chunk_size=1000,
    benchmark_runs=5,
    n_jobs=4,
    output_dir=Path('output')
)
runner = RealTimePipelineRunner(config)
report = runner.run_all('data/nba_players.csv')
print(f"Dataset: {report['dataset_fingerprint']['path']}")
print(f"Rows: {report['dataset_fingerprint']['rows']:,}")
print(f"SHA-256: {report['dataset_fingerprint']['sha256'][:16]}...")
print(f"\nBatch mode: {report['batch']['latency_s']:.2f}s")
print(f"Streaming mode: {report['streaming']['latency_s']:.2f}s")
print("\nQuality:")
print(f"  Outlier rate: {report['quality']['outlier_rate']:.2%}")
print(f"  Schema valid: {report['quality']['schema_ok']}")
print(f"  Drift score: {report['quality']['drift_score']:.3f}")
Output Artifacts
The pipeline generates comprehensive output files:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:473-509
Generated Files
output/
├── reports/
│   ├── pipeline_report.json            # Complete execution report
│   ├── streaming_chunks.jsonl          # Per-chunk metrics
│   └── constraint_experiment_log.jsonl
├── metadata/
│   └── run_manifest.json               # Reproducibility metadata
├── benchmarks/
│   ├── streaming_chunks.csv
│   ├── latency_vs_data_size.csv
│   ├── throughput_vs_memory.csv
│   ├── resource_vs_accuracy.csv
│   ├── significance_tests.csv
│   ├── constraint_experiment.csv
│   ├── latency_vs_accuracy.png
│   ├── memory_vs_accuracy.png
│   └── latency_memory_accuracy.png
├── profiles/
│   └── operator_profile.csv            # Stage-wise timing
└── intermediate/
    ├── stream_chunk_1_X.csv            # Spilled features (if enabled)
    └── stream_chunk_1_y.csv            # Spilled targets
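The JSONL artifacts lend themselves to quick post-run inspection. A sketch of summarizing the per-chunk metrics; `summarize_chunks` is a hypothetical convenience helper, not part of the engine, and the field names follow the chunk_metrics entries shown earlier:

```python
import json
from pathlib import Path

def summarize_chunks(lines) -> dict:
    """Count chunks and memory-limit violations in a streaming_chunks.jsonl stream."""
    chunks = [json.loads(line) for line in lines]
    return {
        'chunks': len(chunks),
        'memory_exceeded': sum(1 for c in chunks if c.get('memory_exceeded')),
    }

# Hypothetical usage after a run with output_dir='output':
# with Path('output/reports/streaming_chunks.jsonl').open() as fh:
#     print(summarize_chunks(fh))
```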
Visualization
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:511-555
Three plots are generated:
Latency vs Accuracy (colored by compute constraint)
Memory vs Accuracy (colored by memory limit)
Latency vs Memory vs Accuracy (3D trade-off)
Data Flow
Batch Mode Best For:
Small to medium datasets (<1GB)
High-memory environments
Maximum throughput required
No real-time constraints
Streaming Mode Best For:
Large datasets (>1GB)
Memory-constrained environments
Real-time processing
Incremental learning scenarios
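The size-based guidance above can be reduced to a simple rule of thumb; `choose_mode` is a hypothetical helper for illustration, not part of the engine's API:

```python
from pathlib import Path

ONE_GB = 1024 ** 3

def choose_mode(path, threshold_bytes=ONE_GB) -> str:
    """Stream anything over ~1 GB on disk; otherwise run batch mode."""
    size = Path(path).stat().st_size
    return 'streaming' if size > threshold_bytes else 'batch'

# e.g. run_streaming if choose_mode('data/nba_players.csv') == 'streaming'
```

In practice, memory headroom and real-time requirements matter as much as file size, so treat this as a starting heuristic rather than a hard rule.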
Resource Management Tips
Right-Size Chunks: Start with chunk_size=1000 and adjust based on memory usage monitoring.
Enable Adaptive Resizing: Set adaptive_chunk_resize=True for automatic memory management.
Use Disk Spillover: Enable spill_to_disk=True for debugging and fault tolerance.
Parallelize Experiments: Set n_jobs > 1 to speed up constraint experiments and benchmarks.
Best Practices
Profile First
Run run_all() on a sample dataset to understand resource requirements.
Configure Constraints
Set max_memory_mb and max_compute_units based on production environment.
Benchmark Regularly
Use benchmark() to track performance regression over time.
Monitor Artifacts
Review generated CSVs and plots to identify bottlenecks.
Validate Reproducibility
Check run_manifest.json to ensure consistent results.
Next Steps
Ingestion: Learn about data loading and fingerprinting
Preprocessing: Explore data cleaning and transformation
Feature Engineering: Build derived features for ML models
Validation: Validate data quality and detect drift