## Overview
The NBA Data Preprocessing Pipeline supports two execution modes, each optimized for different scenarios:

- **Batch mode**: full-dataset preprocessing with in-memory processing
- **Streaming mode**: chunked processing with adaptive resource management
Both modes use the same preprocessing logic and produce comparable model quality, but differ in memory footprint, latency, and throughput characteristics.
## Batch Mode
Batch mode processes the entire dataset in memory as a single operation.
### When to Use Batch Mode
- **Server environments**: high-memory systems with 4 GB+ RAM available
- **Small datasets**: datasets that comfortably fit in memory
- **Minimum latency**: when processing speed is the primary concern
- **Offline processing**: non-real-time batch ETL pipelines
### Implementation
Batch mode is implemented in the `run_batch` method:
```python
def run_batch(self, source: str | Path | pd.DataFrame) -> dict:
    df = self.ingestor.load(source)

    start_snapshot = self.hardware.snapshot()
    tracemalloc.start()
    t0 = time.perf_counter()

    # Single-pass processing
    X, y = self._process_df(df)

    elapsed = time.perf_counter() - t0
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    end_snapshot = self.hardware.snapshot()

    throughput = len(df) / max(elapsed, 1e-9)
    model_metrics = self._evaluate_model(X, y)
    telemetry = self.hardware.compare(start_snapshot, end_snapshot)

    return {
        'mode': 'batch',
        'rows': len(df),
        'latency_s': elapsed,
        'throughput_rows_s': throughput,
        'peak_memory_mb': peak / (1024 * 1024),
        'model': model_metrics,
    }
```
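The `tracemalloc` bookkeeping in `run_batch` is a standard stdlib measurement pattern. A self-contained sketch of the same pattern, with a list comprehension standing in for the processing step:

```python
import time
import tracemalloc

tracemalloc.start()
t0 = time.perf_counter()

# Stand-in for the processing step being measured
data = [i * i for i in range(100_000)]

elapsed = time.perf_counter() - t0
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Same throughput formula as run_batch; max(..., 1e-9) guards against
# division by zero on very fast runs
throughput = len(data) / max(elapsed, 1e-9)
print(f"peak={peak / (1024 * 1024):.2f} MB, throughput={throughput:.0f} items/s")
```

Note that `tracemalloc` reports Python-level allocations only, so the peak it measures can undercount memory held by native extensions.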
### Processing Pipeline
Batch mode executes a linear pipeline:
```python
def _process_df(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]:
    cleaned = self.preprocessor.clean(df)
    featured = self.engineer.build_features(cleaned)
    filtered = self.engineer.drop_multicollinearity(featured)
    X, y = self.engineer.encode_and_scale(filtered)
    return X, y
```
### Characteristics
| Metric | Batch mode |
| --- | --- |
| Memory usage | High (entire dataset in RAM) |
| Latency | Lowest |
| Throughput | Highest |
| Model training | Offline `LinearRegression` with train/test split |
| Failure mode | OOM on large datasets |
## Streaming Mode
Streaming mode processes data in configurable chunks with adaptive resource management.
### When to Use Streaming Mode
- **Edge devices**: resource-constrained devices (IoT, embedded systems)
- **Large datasets**: datasets larger than available memory
- **Memory limits**: strict memory constraints (< 1 GB RAM)
- **Online learning**: real-time incremental model updates
### Implementation
Streaming mode is implemented in the `run_streaming` method (shown abridged):
```python
def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> dict:
    # Adjust chunk size based on resource constraints
    # ('rows' is estimated from the source earlier in the method)
    batch_size, adjusted_chunk_size = self._hardware_adjusted_sizes(
        rows,
        chunk_size=chunk_size,
        max_memory_mb=max_memory_mb,
        max_compute_units=max_compute_units,
    )

    # Initialize online learning model
    online_model = SGDRegressor(
        random_state=self.config.random_seed,
        max_iter=1,
        tol=None,
        learning_rate='invscaling',
    )
    rolling_state = self.engineer.init_rolling_state()

    # Process chunks iteratively
    for raw_chunk in self._iter_source_chunks(source, adjusted_chunk_size):
        X_chunk, y_chunk, operator_profile = self._profile_stream_chunk(
            raw_chunk, rolling_state
        )

        # Incremental model update
        if len(X_chunk) > 0:
            online_model.partial_fit(
                X_chunk.to_numpy(dtype=float),
                y_chunk.to_numpy(dtype=float),
            )
        # ... (per-chunk telemetry and final aggregation elided)
```
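`partial_fit` updates the model one chunk at a time without revisiting earlier data. As a rough illustration of why that works, here is a toy, dependency-free single-feature analogue (the `partial_fit_sgd` helper is hypothetical, not scikit-learn's implementation):

```python
def partial_fit_sgd(w, b, xs, ys, lr=0.01):
    """One SGD pass over a chunk of (x, y) pairs for the model y ~ w*x + b."""
    for x, y in zip(xs, ys):
        err = (w * x + b) - y
        w -= lr * err * x
        b -= lr * err
    return w, b

# Stream y = 2x data in chunks of 5, repeatedly: the weights drift toward
# w=2, b=0 even though no chunk is ever held alongside the others.
points = [(float(x), 2.0 * x) for x in range(10)]
w, b = 0.0, 0.0
for _ in range(500):
    for start in range(0, len(points), 5):
        chunk = points[start:start + 5]
        w, b = partial_fit_sgd(w, b, [p[0] for p in chunk], [p[1] for p in chunk])
print(round(w, 3), round(b, 3))
```

The real pipeline gets the same effect from `SGDRegressor(max_iter=1, tol=None)`, which makes each `partial_fit` call a single pass over the current chunk.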
### Chunk Processing Pipeline
Each chunk goes through a profiled pipeline:
```python
def _profile_stream_chunk(
    self, chunk: pd.DataFrame, rolling_state: Any
) -> tuple[pd.DataFrame, pd.Series, dict[str, float]]:
    stage_start = time.perf_counter()
    cleaned = self.preprocessor.clean(chunk)
    preprocess_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    featured = self.engineer.build_features_streaming(cleaned, rolling_state)
    feature_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    filtered = self.engineer.drop_multicollinearity(featured)
    select_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    x_chunk, y_chunk = self.engineer.encode_and_scale(filtered)
    encode_s = time.perf_counter() - stage_start

    return x_chunk, y_chunk, {
        'preprocess_s': float(preprocess_s),
        'feature_engineering_s': float(feature_s),
        'feature_selection_s': float(select_s),
        'encode_scale_s': float(encode_s),
    }
```
### Chunk Iteration
The pipeline supports both DataFrame and CSV streaming:
```python
def _iter_source_chunks(
    self, source: str | Path | pd.DataFrame, chunk_size: int
) -> Iterator[pd.DataFrame]:
    if isinstance(source, pd.DataFrame):
        for start in range(0, len(source), chunk_size):
            yield source.iloc[start : start + chunk_size].copy()
        return
    for chunk in pd.read_csv(source, chunksize=chunk_size):
        yield chunk.copy()
```
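The DataFrame branch is plain start/stop slicing; the same logic on a list makes the chunk boundaries easy to see (the `iter_chunks` helper is illustrative, mirroring the `iloc` slicing above):

```python
def iter_chunks(items, chunk_size):
    """Yield successive chunk_size-sized slices; the last chunk may be short."""
    for start in range(0, len(items), chunk_size):
        yield items[start:start + chunk_size]

rows = list(range(10))
chunks = list(iter_chunks(rows, 3))
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

The CSV branch delegates the same job to `pd.read_csv(..., chunksize=...)`, which returns an iterator of DataFrames instead of loading the whole file.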
### Characteristics
| Metric | Streaming mode |
| --- | --- |
| Memory usage | Low (controlled by `chunk_size`) |
| Latency | Higher (per-chunk overhead) |
| Throughput | Lower (incremental processing) |
| Model training | Online `SGDRegressor` with `partial_fit` |
| Failure mode | Graceful degradation with adaptive chunk resizing |
### Chunk-Level Telemetry
Streaming mode captures detailed per-chunk metrics:
```python
chunk_metrics.append({
    'chunk_id': chunk_id,
    'rows': len(chunk),
    'latency_s': elapsed,
    'throughput_rows_s': len(chunk) / max(elapsed, 1e-9),
    'batch_size': batch_size,
    'chunk_size': current_chunk_size,
    'memory_before_mb': mem_before,
    'memory_after_mb': mem_after,
    'memory_exceeded': memory_exceeded,
    'retries': retries,
    'spill_paths': spill_paths,
    'operator_profile_s': operator_profile,
})
```
These metrics are written to `benchmarks/streaming_chunks.csv` and `reports/streaming_chunks.jsonl`.
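The JSONL report can be consumed with the stdlib alone, one record per line. A minimal parsing sketch using the field names shown above (the sample lines are simulated for illustration):

```python
import io
import json

# Simulated contents of reports/streaming_chunks.jsonl (subset of fields)
jsonl = io.StringIO(
    '{"chunk_id": 0, "rows": 128, "latency_s": 0.12}\n'
    '{"chunk_id": 1, "rows": 128, "latency_s": 0.10}\n'
)

# One json.loads per line is the whole JSONL format
records = [json.loads(line) for line in jsonl]
total_rows = sum(r['rows'] for r in records)
print(total_rows)  # 256
```

In real use, replace the `io.StringIO` with `open('reports/streaming_chunks.jsonl')`.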
## Comparison

### Latency

Batch mode has lower latency because it avoids per-chunk overhead and scheduler costs.

- Typical batch latency: 0.5-2 s for 1,000 rows
- Typical streaming latency: 1-4 s for 1,000 rows (depends on chunk size)

### Memory

Streaming mode uses significantly less memory through controlled chunk sizes.

- Batch peak memory: 200-500 MB for 1,000 rows
- Streaming peak memory: 50-150 MB for 1,000 rows (`chunk_size=128`)

### Throughput

Batch mode achieves higher throughput by processing data in bulk.

- Batch throughput: 500-2,000 rows/s
- Streaming throughput: 200-800 rows/s

### Model Quality

Both modes produce comparable model quality, though streaming uses online learning.

- Batch R²: 0.65-0.75 (train/test split)
- Streaming R²: 0.60-0.70 (incremental learning)
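The latency and throughput figures above imply each other, since throughput = rows / latency. A quick arithmetic check using the batch numbers:

```python
rows = 1000
batch_latency_s = (0.5, 2.0)  # typical range from the comparison above

# Slowest latency gives the lower throughput bound, fastest the upper
batch_throughput = tuple(rows / t for t in reversed(batch_latency_s))
print(batch_throughput)  # (500.0, 2000.0) rows/s, matching the stated range
```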
## Running Each Mode
### Batch Execution
```python
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    max_memory_mb=4096,  # High memory available
)

runner = RealTimePipelineRunner(config)
results = runner.run_batch('data/nba2k-full.csv')

print(f"Latency: {results['latency_s']:.2f}s")
print(f"Peak memory: {results['peak_memory_mb']:.1f} MB")
print(f"R²: {results['model']['r2']:.3f}")
```
### Streaming Execution
```python
config = PipelineConfig(
    random_seed=42,
    chunk_size=64,
    max_memory_mb=512,  # Limited memory
    adaptive_chunk_resize=True,
)

runner = RealTimePipelineRunner(config)
results = runner.run_streaming('data/nba2k-full.csv')

print(f"Latency: {results['latency_s']:.2f}s")
print(f"Peak memory: {results['peak_memory_mb']:.1f} MB")
print(f"Chunks processed: {len(results['chunk_metrics'])}")
print(f"R²: {results['model']['r2']:.3f}")
```
When using streaming mode, ensure `chunk_size` is set appropriately for your memory constraints. Smaller chunks reduce memory usage but increase per-chunk overhead.
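One simple way to turn a memory budget into a starting `chunk_size` is to cap the memory spent on any single chunk. A hypothetical heuristic, not the pipeline's `_hardware_adjusted_sizes` logic (the bytes-per-row estimate and the 10% budget share are assumptions):

```python
def suggest_chunk_size(max_memory_mb: int, bytes_per_row: int = 4096,
                       floor: int = 16, cap: int = 1024) -> int:
    """Rough heuristic: spend at most ~10% of the budget on one raw chunk."""
    budget_bytes = max_memory_mb * 1024 * 1024 * 0.10
    return max(floor, min(cap, int(budget_bytes // bytes_per_row)))

print(suggest_chunk_size(512))  # 1024 (plenty of headroom, hits the cap)
print(suggest_chunk_size(4))    # 102  (tight budget, small chunks)
```

Whatever the starting value, adaptive resizing (`adaptive_chunk_resize=True`) can still shrink chunks at runtime if the memory limit is exceeded.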
## Next Steps
- **Resource Constraints**: learn about adaptive resource management
- **Reproducibility**: understand deterministic execution