## Overview

The NBA Data Preprocessing Pipeline implements adaptive resource management to enable execution in constrained environments. The system dynamically adjusts chunk sizes, monitors memory usage, and provides graceful degradation mechanisms.
## Configuration Parameters

Resource constraints are controlled through `PipelineConfig`:
```python
@dataclass(frozen=True)
class PipelineConfig:
    chunk_size: int = 128               # Base chunk size for streaming
    batch_size: int = 256               # Base batch size for model updates
    max_memory_mb: int = 1024           # Memory limit (MB)
    max_compute_units: float = 1.0      # Compute constraint (0.0-1.0)
    adaptive_chunk_resize: bool = True  # Enable dynamic chunk splitting
    max_chunk_retries: int = 3          # Max retry attempts per chunk
    spill_to_disk: bool = False         # Enable disk spilling
```
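Because the config is a frozen dataclass, instances are immutable; variants are derived with `dataclasses.replace` rather than mutated in place. A minimal standalone sketch, using the field subset shown above:

```python
from dataclasses import dataclass, replace

# Field subset copied from the config above; defaults as documented.
@dataclass(frozen=True)
class PipelineConfig:
    chunk_size: int = 128
    batch_size: int = 256
    max_memory_mb: int = 1024
    max_compute_units: float = 1.0
    adaptive_chunk_resize: bool = True
    max_chunk_retries: int = 3
    spill_to_disk: bool = False

# frozen=True makes instances immutable; derive a constrained variant instead.
base = PipelineConfig()
edge = replace(base, chunk_size=64, max_memory_mb=512)

print(edge.chunk_size, edge.max_memory_mb)  # 64 512
```

Immutability keeps a run's configuration stable even when the pipeline derives per-experiment variants internally.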
## Adaptive Chunk Sizing

The pipeline automatically adjusts chunk and batch sizes based on available resources.
### Hardware-Adjusted Sizing

The `_hardware_adjusted_sizes` method scales chunk sizes based on memory and compute constraints:
```python
def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]:
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units

    # Calculate scaling factors
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor

    # Apply scaling with minimum bounds
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)
```
### Scaling Behavior

#### Memory Scaling

The memory factor ranges from 0.1 to 1.0, computed as `max_memory_mb / 1024`:

- 256 MB → factor = 0.25 → chunk size reduced to 25%
- 512 MB → factor = 0.5 → chunk size reduced to 50%
- 1024 MB → factor = 1.0 → no reduction
- 2048 MB → factor = 1.0 → capped at full size

#### Compute Scaling

The compute factor ranges from 0.1 to 1.0, taken directly from `max_compute_units`:

- 0.25 → 25% of base chunk size
- 0.5 → 50% of base chunk size
- 1.0 → full base chunk size

#### Combined Scaling

The final scale is the product of the memory and compute factors: `scale = memory_factor * compute_factor`.

Example: 512 MB memory + 0.5 compute units → 0.5 × 0.5 = 0.25 scale.

The minimum chunk size is always 16 rows to prevent excessive scheduler overhead.
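The scaling rule can be reproduced as a standalone function; this sketch mirrors `_hardware_adjusted_sizes` with the config lookups stripped out:

```python
def hardware_adjusted_sizes(rows, chunk_base, batch_base, memory_cap_mb, compute_cap):
    """Standalone sketch of the scaling rule; returns (batch, chunk)."""
    memory_factor = max(0.1, min(1.0, memory_cap_mb / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)

# 512 MB and 0.5 compute units -> scale = 0.5 * 0.5 = 0.25
print(hardware_adjusted_sizes(10_000, 128, 256, 512, 0.5))  # (64, 32)
```

Note that both factors are floored at 0.1 and both sizes at 16 rows, so even extreme constraints never shrink a chunk below the scheduler-overhead threshold.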
## Dynamic Chunk Splitting

When a chunk exceeds memory limits, the pipeline automatically splits it and retries.
### Memory Monitoring

Each chunk is monitored before and after processing:
```python
mem_before = self.hardware.process_memory_mb()
t0 = time.perf_counter()

# Process chunk
X_chunk, y_chunk, operator_profile = self._profile_stream_chunk(
    chunk, rolling_state
)

elapsed = time.perf_counter() - t0
mem_after = self.hardware.process_memory_mb()
memory_exceeded = mem_after > max_memory
```
### Retry Logic

If memory is exceeded and `adaptive_chunk_resize=True`, the chunk is split:
```python
if memory_exceeded and self.config.adaptive_chunk_resize and \
        retries < self.config.max_chunk_retries and len(chunk) > 16:
    retries += 1
    split = max(16, len(chunk) // 2)  # Split in half

    # Queue second half for later processing
    pending_chunks.insert(0, chunk.iloc[split:].copy())

    # Retry with first half
    chunk = chunk.iloc[:split].copy()
    current_chunk_size = split

    # Backoff delay
    time.sleep(min(0.05 * retries, 0.2))
    continue
```
### Retry Behavior

1. **Memory violation detected**: `mem_after > max_memory` triggers the adaptive resize check.
2. **Split chunk**: the chunk is split in half (minimum 16 rows per split).
3. **Queue second half**: the second half is added to the front of the pending chunks queue.
4. **Retry first half**: processing retries with the first half of the chunk.
5. **Backoff delay**: linear backoff of 50 ms, 100 ms, 150 ms per retry, capped at 200 ms.
6. **Terminate after max retries**: if retries exceed `max_chunk_retries`, the chunk is processed as-is.

Setting `max_chunk_retries` too high can cause excessive splitting and slow down processing. The default value of 3 is recommended for most use cases.
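The split-and-retry flow can be sketched with plain lists in place of DataFrames; here `fits_in_memory` is a hypothetical callback standing in for the real `mem_after > max_memory` check:

```python
import time
from collections import deque

def process_with_splitting(chunk, fits_in_memory, max_retries=3):
    # `fits_in_memory` is a hypothetical stand-in for the memory check.
    pending = deque([list(chunk)])
    processed, retries = [], 0
    while pending:
        current = pending.popleft()
        if not fits_in_memory(current) and retries < max_retries and len(current) > 16:
            retries += 1
            split = max(16, len(current) // 2)
            pending.appendleft(current[split:])   # second half waits its turn
            pending.appendleft(current[:split])   # first half retries next
            time.sleep(min(0.05 * retries, 0.2))  # capped linear backoff
            continue
        processed.append(current)
    return processed, retries

# A 128-row chunk against a 32-row memory budget splits down to four pieces.
chunks, n_retries = process_with_splitting(range(128), lambda c: len(c) <= 32)
print([len(c) for c in chunks], n_retries)  # [32, 32, 32, 32] 3
```

The deque ordering matters: halves are re-queued at the front so a split chunk is fully processed before the stream moves on, matching the `pending_chunks.insert(0, ...)` call above.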
## Spill-to-Disk

Optional disk spilling enables processing beyond available RAM by persisting intermediate results.
### Configuration

```python
config = PipelineConfig(
    spill_to_disk=True,
    output_dir=Path('artifacts'),
)
```
### Implementation

When enabled, processed chunks are written to disk:

```python
spill_paths: dict[str, str] = {}
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
    spill_paths = {'X': str(x_path), 'y': str(y_path)}
```
Spill paths are tracked in chunk metrics:

```python
chunk_metrics.append({
    'chunk_id': chunk_id,
    'spill_paths': spill_paths,
    # ... other metrics
})
```
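The spill-and-reload round trip looks like the following sketch; the column names are hypothetical, a temporary directory stands in for `output_dir`, and note that the `intermediate` directory must exist before writing:

```python
import tempfile
from pathlib import Path
import pandas as pd

# Hypothetical chunk contents; the real pipeline's feature columns differ.
X_chunk = pd.DataFrame({'age': [22, 31], 'height_cm': [198, 206]})
y_chunk = pd.Series([8.0e6, 2.1e7], name='salary')

spill_dir = Path(tempfile.mkdtemp()) / 'intermediate'
spill_dir.mkdir(parents=True, exist_ok=True)  # must exist before to_csv

chunk_id = 0
x_path = spill_dir / f'stream_chunk_{chunk_id}_X.csv'
y_path = spill_dir / f'stream_chunk_{chunk_id}_y.csv'
X_chunk.to_csv(x_path, index=False)
y_chunk.to_frame('salary').to_csv(y_path, index=False)

# Spilled chunks can be reloaded later for inspection or reassembly.
X_back = pd.read_csv(x_path)
assert X_back.equals(X_chunk)
```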
### Trade-offs

**Advantages**

- Enables processing of arbitrarily large datasets
- Prevents out-of-memory failures
- Intermediate results can be inspected

**Disadvantages**

- Significant I/O overhead
- Increased wall-clock time (2-10× slower)
- Requires disk space (roughly the same as the dataset size)
## Constraint Experiments

The pipeline can run systematic experiments across different constraint configurations.
Running Experiments
runner = RealTimePipelineRunner(config)
experiment = runner.run_constraint_experiment( 'data/nba2k-full.csv' )
### Constraint Sweep

The experiment sweeps across multiple dimensions:

```python
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict:
    rows = len(source) if isinstance(source, pd.DataFrame) else \
        len(self.ingestor.load(source))

    # Define constraint grid
    chunk_sizes = sorted(set([max(16, min(rows, s))
                              for s in [64, self.config.chunk_size]]))
    memory_limits = sorted(set([256, self.config.max_memory_mb]))
    compute_limits = sorted(set([0.5, self.config.max_compute_units]))
    tasks = [(c, m, cp) for c in chunk_sizes
                        for m in memory_limits
                        for cp in compute_limits]

    # Run experiments (optionally in parallel)
    if self.config.n_jobs > 1:
        experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
            delayed(self._single_constraint_run)(source, c, m, cp)
            for c, m, cp in tasks
        )
    else:
        experiment_rows = [
            self._single_constraint_run(source, c, m, cp)
            for c, m, cp in tasks
        ]
```
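With the default configuration (`chunk_size=128`, `max_memory_mb=1024`, `max_compute_units=1.0`), the grid works out to eight runs. A standalone sketch of the same comprehension:

```python
rows = 3000  # hypothetical dataset size

# Each dimension pairs a fixed sweep value with the configured default.
chunk_sizes = sorted({max(16, min(rows, s)) for s in [64, 128]})
memory_limits = sorted({256, 1024})
compute_limits = sorted({0.5, 1.0})

tasks = [(c, m, cp) for c in chunk_sizes
                    for m in memory_limits
                    for cp in compute_limits]
print(len(tasks))  # 2 * 2 * 2 = 8
```

If a configured default coincides with a sweep value (e.g. `chunk_size=64`), the `set` deduplicates it and the grid shrinks accordingly.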
### Experiment Results

Results include latency, memory, and accuracy metrics:

```python
def _single_constraint_run(
    self, source: str | Path | pd.DataFrame,
    chunk: int, memory: int, compute: float
) -> dict:
    run = self.run_streaming(
        source,
        chunk_size=chunk,
        max_memory_mb=memory,
        max_compute_units=compute
    )
    return {
        'chunk_size': int(chunk),
        'memory_limit_mb': int(memory),
        'compute_limit': float(compute),
        'preprocessing_latency_s': float(run['latency_s']),
        'peak_memory_mb': float(run['peak_memory_mb']),
        'training_time_s': float(run['model']['training_time_s']),
        'model_accuracy_r2': float(run['model']['r2']),
        'model_rmse': float(run['model']['rmse']),
    }
```
Results are saved to `benchmarks/constraint_experiment.csv` and visualized in PNG plots.
## Hardware Telemetry

The pipeline monitors hardware resources using the `HardwareMonitor` class.
### Available Metrics

- **Process memory**: current memory usage via `psutil`
- **CPU usage**: per-core CPU utilization
- **RAPL energy**: Intel RAPL energy counters (if available)
- **Fallback energy**: coarse estimate based on runtime (45 W for batch, 30 W for streaming)
### Energy Estimation

```python
telemetry = self.hardware.compare(start_snapshot, end_snapshot)
telemetry['fallback_energy_estimate_j'] = elapsed * 45.0        # Batch
# or
telemetry['fallback_energy_estimate_j'] = total_elapsed * 30.0  # Streaming

# Use RAPL if available, otherwise fall back
energy = telemetry['rapl_energy_j'] if telemetry['rapl_energy_j'] is not None \
    else telemetry['fallback_energy_estimate_j']
```
RAPL energy counters are only available on Intel CPUs and may be disabled in containerized or virtualized environments.
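The selection logic reduces to a small helper; this is a sketch using the 45 W / 30 W fallback constants from above, not the project's actual API:

```python
def estimated_energy_j(elapsed_s, rapl_energy_j=None, streaming=False):
    """Prefer measured RAPL energy; otherwise estimate from a flat power draw."""
    if rapl_energy_j is not None:
        return rapl_energy_j
    watts = 30.0 if streaming else 45.0  # doc's coarse batch/streaming estimates
    return elapsed_s * watts

print(estimated_energy_j(10.0))                       # 450.0 (batch fallback)
print(estimated_energy_j(10.0, streaming=True))       # 300.0
print(estimated_energy_j(10.0, rapl_energy_j=412.5))  # 412.5 (RAPL wins)
```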
## Configuration Templates

Two templates are provided for common deployment scenarios:
### Edge Template

`configs/pipeline.edge.template.json`:

```json
{
  "chunk_size": 64,
  "batch_size": 128,
  "max_memory_mb": 512,
  "max_compute_units": 0.5,
  "adaptive_chunk_resize": true,
  "max_chunk_retries": 3,
  "spill_to_disk": true,
  "n_jobs": 1
}
```
### Server Template

`configs/pipeline.server.template.json`:

```json
{
  "chunk_size": 256,
  "batch_size": 512,
  "max_memory_mb": 4096,
  "max_compute_units": 1.0,
  "adaptive_chunk_resize": false,
  "max_chunk_retries": 1,
  "spill_to_disk": false,
  "n_jobs": 4
}
```
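Since the template keys mirror `PipelineConfig` fields, a loader can pass the parsed JSON straight to the constructor. The following is a sketch; the `load_template` helper and the field subset are assumptions, not the project's actual API:

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class PipelineConfig:  # field subset assumed, as documented above
    chunk_size: int = 128
    batch_size: int = 256
    max_memory_mb: int = 1024
    max_compute_units: float = 1.0
    adaptive_chunk_resize: bool = True
    max_chunk_retries: int = 3
    spill_to_disk: bool = False
    n_jobs: int = 1

def load_template(path) -> PipelineConfig:
    """Hypothetical loader: template keys map 1:1 onto PipelineConfig fields."""
    return PipelineConfig(**json.loads(Path(path).read_text()))

# Round-trip the edge template through a temporary file.
edge_json = ('{"chunk_size": 64, "batch_size": 128, "max_memory_mb": 512, '
             '"max_compute_units": 0.5, "adaptive_chunk_resize": true, '
             '"max_chunk_retries": 3, "spill_to_disk": true, "n_jobs": 1}')
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as fh:
    fh.write(edge_json)
config = load_template(fh.name)
print(config.chunk_size, config.spill_to_disk)  # 64 True
```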
## Failure Modes and Mitigations

Common bottlenecks and how the system handles them:

**CSV ingestion is I/O-bound**
- Symptom: slow load times on slower storage
- Mitigation: use SSD storage or pre-load the DataFrame into memory

**One-hot expansion increases memory**
- Symptom: memory spikes during encoding of high-cardinality categories
- Mitigation: enable `adaptive_chunk_resize` to dynamically split chunks

**Small chunks increase overhead**
- Symptom: low throughput despite low memory usage
- Mitigation: increase `chunk_size` if more memory is available

**RAPL energy counters unavailable**
- Symptom: missing RAPL measurements in containers/VMs
- Mitigation: fallback estimation is used automatically (45 W batch, 30 W streaming)

**Overly strict compute limits**
- Symptom: poor model quality due to aggressive downscaling
- Mitigation: increase `max_compute_units` or use batch mode
## Best Practices

### Profile First

Run the full pipeline with telemetry enabled to understand resource usage:

```python
results = runner.run_all('data/nba2k-full.csv')
```

### Start Conservative

Begin with small chunk sizes and low memory limits, then increase:

```python
config = PipelineConfig(
    chunk_size=64,
    max_memory_mb=512,
    adaptive_chunk_resize=True,
)
```
### Monitor Chunk Metrics

Examine `benchmarks/streaming_chunks.csv` to identify bottlenecks:

- High `retries` → chunk size too large
- High `memory_exceeded` count → reduce `max_memory_mb`
- High `latency_s` → increase `chunk_size` if memory allows
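These triage rules can be applied directly with pandas. A sketch on a hand-made frame, with column names assumed to match `benchmarks/streaming_chunks.csv`:

```python
import pandas as pd

# Hand-made stand-in for pd.read_csv('benchmarks/streaming_chunks.csv').
metrics = pd.DataFrame({
    'chunk_id':        [0, 1, 2, 3],
    'retries':         [0, 3, 0, 2],
    'memory_exceeded': [False, True, False, True],
    'latency_s':       [0.8, 2.4, 0.7, 1.9],
})

# Chunks that needed repeated splitting suggest the base chunk size is too large.
high_retry = metrics[metrics['retries'] >= 2]['chunk_id'].tolist()
# Frequent memory violations suggest lowering max_memory_mb or chunk_size.
exceeded = int(metrics['memory_exceeded'].sum())
print(high_retry, exceeded)  # [1, 3] 2
```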
### Run Constraint Experiments

Use systematic experiments to find the optimal configuration:

```python
experiment = runner.run_constraint_experiment(source)
# Examine benchmarks/constraint_experiment.csv
```

### Use Templates for Deployment

Apply the edge or server template based on the environment:

```bash
python run_pipeline.py \
    --input data/nba2k-full.csv \
    --config-template configs/pipeline.edge.template.json
```
## Next Steps

- **Reproducibility**: learn about deterministic execution
- **Architecture**: review the system architecture