
Overview

The NBA Data Preprocessing Pipeline implements adaptive resource management to enable execution in constrained environments. The system dynamically adjusts chunk sizes, monitors memory usage, and provides graceful degradation mechanisms.

Configuration Parameters

Resource constraints are controlled through PipelineConfig:
@dataclass(frozen=True)
class PipelineConfig:
    chunk_size: int = 128              # Base chunk size for streaming
    batch_size: int = 256              # Base batch size for model updates
    max_memory_mb: int = 1024          # Memory limit (MB)
    max_compute_units: float = 1.0     # Compute constraint (0.0-1.0)
    adaptive_chunk_resize: bool = True # Enable dynamic chunk splitting
    max_chunk_retries: int = 3         # Max retry attempts per chunk
    spill_to_disk: bool = False        # Enable disk spilling
    # Additional fields (e.g. output_dir, n_jobs) appear in later examples

Adaptive Chunk Sizing

The pipeline automatically adjusts chunk and batch sizes based on available resources.

Hardware-Adjusted Sizing

The _hardware_adjusted_sizes method scales chunk and batch sizes based on memory and compute constraints:
def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]:
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units

    # Calculate scaling factors
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    
    # Apply scaling with minimum bounds
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)

Scaling Behavior

Memory factor ranges from 0.1 to 1.0 based on max_memory_mb / 1024:
  • 256 MB → factor = 0.25 → chunk size reduced to 25%
  • 512 MB → factor = 0.5 → chunk size reduced to 50%
  • 1024 MB → factor = 1.0 → no reduction
  • 2048 MB → factor = 1.0 → capped at full size
Minimum chunk size is always 16 rows to prevent excessive scheduler overhead.
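The scaling arithmetic above can be verified with a standalone sketch that mirrors the method (it replicates the logic rather than importing the pipeline):

```python
def hardware_adjusted_sizes(rows: int, chunk_base: int = 128, batch_base: int = 256,
                            max_memory_mb: int = 1024, max_compute_units: float = 1.0) -> tuple[int, int]:
    """Standalone replica of the scaling logic in _hardware_adjusted_sizes."""
    memory_factor = max(0.1, min(1.0, max_memory_mb / 1024))
    compute_factor = max(0.1, min(1.0, max_compute_units))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)

# 256 MB at full compute: scale = 0.25, so batch 256 -> 64 and chunk 128 -> 32
print(hardware_adjusted_sizes(10_000, max_memory_mb=256))                         # (64, 32)
# Constraints multiply: 512 MB (0.5) at half compute (0.5) also gives scale 0.25
print(hardware_adjusted_sizes(10_000, max_memory_mb=512, max_compute_units=0.5))  # (64, 32)
# Tiny datasets are clamped to the row count
print(hardware_adjusted_sizes(20))                                                # (20, 20)
```

Note that memory and compute factors multiply, so a 512 MB limit combined with half compute is as restrictive as a 256 MB limit alone.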

Dynamic Chunk Splitting

When a chunk exceeds memory limits, the pipeline automatically splits it and retries.

Memory Monitoring

Each chunk is monitored before and after processing:
mem_before = self.hardware.process_memory_mb()
t0 = time.perf_counter()

# Process chunk
X_chunk, y_chunk, operator_profile = self._profile_stream_chunk(
    chunk, rolling_state
)

elapsed = time.perf_counter() - t0
mem_after = self.hardware.process_memory_mb()
memory_exceeded = mem_after > max_memory

Retry Logic

If memory is exceeded and adaptive_chunk_resize=True, the chunk is split:
if memory_exceeded and self.config.adaptive_chunk_resize and \
   retries < self.config.max_chunk_retries and len(chunk) > 16:
    
    retries += 1
    split = max(16, len(chunk) // 2)  # Split in half
    
    # Queue second half for later processing
    pending_chunks.insert(0, chunk.iloc[split:].copy())
    
    # Retry with first half
    chunk = chunk.iloc[:split].copy()
    current_chunk_size = split
    
    # Backoff delay
    time.sleep(min(0.05 * retries, 0.2))
    continue

Retry Behavior

1. Memory Violation Detected: mem_after > max_memory triggers the adaptive resize check.
2. Split Chunk: the chunk is split in half (minimum 16 rows per split).
3. Queue Second Half: the second half is added to the front of the pending chunks queue.
4. Retry First Half: processing retries with the first half of the chunk.
5. Backoff Delay: linear backoff delay of 50 ms, 100 ms, 150 ms (capped at 200 ms).
6. Terminate After Max Retries: if retries exceed max_chunk_retries, the chunk is processed as-is.
Setting max_chunk_retries too high can cause excessive splitting and slow down processing. The default value of 3 is recommended for most use cases.
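Under these rules a violating chunk is halved on each retry while the backoff grows linearly. The resulting schedule can be sketched with a hypothetical helper:

```python
def retry_schedule(chunk_len: int, max_retries: int = 3, min_rows: int = 16) -> list[tuple[int, float]]:
    """Hypothetical helper: (chunk length, backoff seconds) tried on each retry."""
    schedule = []
    for retry in range(1, max_retries + 1):
        if chunk_len <= min_rows:
            break  # chunks at the 16-row floor are processed as-is
        chunk_len = max(min_rows, chunk_len // 2)                  # split in half
        schedule.append((chunk_len, min(round(0.05 * retry, 2), 0.2)))  # linear backoff, 200 ms cap
    return schedule

print(retry_schedule(1024))  # [(512, 0.05), (256, 0.1), (128, 0.15)]
print(retry_schedule(16))    # [] -- already at the minimum, no splitting
```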

Spill-to-Disk

Optional disk spilling enables processing beyond available RAM by persisting intermediate results.

Configuration

config = PipelineConfig(
    spill_to_disk=True,
    output_dir=Path('artifacts'),
)

Implementation

When enabled, processed chunks are written to disk:
spill_paths: dict[str, str] = {}
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
    
    spill_paths = {'X': str(x_path), 'y': str(y_path)}
Spill paths are tracked in chunk metrics:
chunk_metrics.append({
    'chunk_id': chunk_id,
    'spill_paths': spill_paths,
    # ... other metrics
})
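When a run spills, downstream steps can reassemble the dataset from the recorded paths. A hedged sketch, assuming the metrics structure above and the 'salary' target column:

```python
import pandas as pd

def reload_spilled(chunk_metrics: list[dict]) -> tuple[pd.DataFrame, pd.Series]:
    """Reassemble X and y from the spill paths recorded in chunk metrics."""
    xs, ys = [], []
    for metrics in chunk_metrics:
        paths = metrics.get('spill_paths') or {}
        if paths:  # empty dict when this chunk was not spilled
            xs.append(pd.read_csv(paths['X']))
            ys.append(pd.read_csv(paths['y'])['salary'])
    return pd.concat(xs, ignore_index=True), pd.concat(ys, ignore_index=True)
```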

Trade-offs

Advantages

  • Enables processing of arbitrarily large datasets
  • Prevents out-of-memory failures
  • Intermediate results can be inspected

Disadvantages

  • Significant I/O overhead
  • Increased wall-clock time (2-10x slower)
  • Requires disk space roughly equal to the dataset size

Constraint Experiments

The pipeline can run systematic experiments across different constraint configurations.

Running Experiments

runner = RealTimePipelineRunner(config)
experiment = runner.run_constraint_experiment('data/nba2k-full.csv')

Constraint Sweep

The experiment sweeps across multiple dimensions:
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict:
    rows = len(source) if isinstance(source, pd.DataFrame) else \
           len(self.ingestor.load(source))

    # Define constraint grid
    chunk_sizes = sorted(set([max(16, min(rows, s)) 
                              for s in [64, self.config.chunk_size]]))
    memory_limits = sorted(set([256, self.config.max_memory_mb]))
    compute_limits = sorted(set([0.5, self.config.max_compute_units]))
    
    tasks = [(c, m, cp) for c in chunk_sizes 
                        for m in memory_limits 
                        for cp in compute_limits]
    
    # Run experiments (optionally in parallel)
    if self.config.n_jobs > 1:
        experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
            delayed(self._single_constraint_run)(source, c, m, cp) 
            for c, m, cp in tasks
        )
    else:
        experiment_rows = [
            self._single_constraint_run(source, c, m, cp) 
            for c, m, cp in tasks
        ]
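With the default PipelineConfig (chunk_size=128, max_memory_mb=1024, max_compute_units=1.0), the grid works out to eight runs. Reproducing the grid construction for a hypothetical 10,000-row dataset:

```python
rows = 10_000
chunk_sizes = sorted(set(max(16, min(rows, s)) for s in [64, 128]))  # 64 + config.chunk_size
memory_limits = sorted(set([256, 1024]))                             # 256 + config.max_memory_mb
compute_limits = sorted(set([0.5, 1.0]))                             # 0.5 + config.max_compute_units
tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]

print(len(tasks))  # 8: 2 chunk sizes x 2 memory limits x 2 compute limits
print(tasks[0])    # (64, 256, 0.5) -- the most constrained run
```

Because each dimension merges a fixed probe value with the configured value via set(), a config that already uses the probe value (e.g. chunk_size=64) shrinks the grid accordingly.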

Experiment Results

Results include latency, memory, and accuracy metrics:
def _single_constraint_run(
    self, source: str | Path | pd.DataFrame, 
    chunk: int, memory: int, compute: float
) -> dict:
    run = self.run_streaming(
        source, 
        chunk_size=chunk, 
        max_memory_mb=memory, 
        max_compute_units=compute
    )
    return {
        'chunk_size': int(chunk),
        'memory_limit_mb': int(memory),
        'compute_limit': float(compute),
        'preprocessing_latency_s': float(run['latency_s']),
        'peak_memory_mb': float(run['peak_memory_mb']),
        'training_time_s': float(run['model']['training_time_s']),
        'model_accuracy_r2': float(run['model']['r2']),
        'model_rmse': float(run['model']['rmse']),
    }
Results are saved to benchmarks/constraint_experiment.csv and visualized in PNG plots.

Hardware Telemetry

The pipeline monitors hardware resources using the HardwareMonitor class.

Available Metrics

  • Process memory: Current memory usage via psutil
  • CPU usage: Per-core CPU utilization
  • RAPL energy: Intel RAPL energy counters (if available)
  • Fallback energy: Coarse estimate based on runtime (45W for batch, 30W for streaming)

Energy Estimation

telemetry = self.hardware.compare(start_snapshot, end_snapshot)
telemetry['fallback_energy_estimate_j'] = elapsed * 45.0  # Batch
# or
telemetry['fallback_energy_estimate_j'] = total_elapsed * 30.0  # Streaming

# Use RAPL if available, otherwise fall back
energy = telemetry['rapl_energy_j'] if telemetry['rapl_energy_j'] is not None \
         else telemetry['fallback_energy_estimate_j']
RAPL energy counters are only available on Intel CPUs and may be disabled in containerized or virtualized environments.

Configuration Templates

Two templates are provided for common deployment scenarios:

Edge Template

configs/pipeline.edge.template.json:
{
  "chunk_size": 64,
  "batch_size": 128,
  "max_memory_mb": 512,
  "max_compute_units": 0.5,
  "adaptive_chunk_resize": true,
  "max_chunk_retries": 3,
  "spill_to_disk": true,
  "n_jobs": 1
}

Server Template

configs/pipeline.server.template.json:
{
  "chunk_size": 256,
  "batch_size": 512,
  "max_memory_mb": 4096,
  "max_compute_units": 1.0,
  "adaptive_chunk_resize": false,
  "max_chunk_retries": 1,
  "spill_to_disk": false,
  "n_jobs": 4
}
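A template can be hydrated into a config in a few lines. The sketch below stubs PipelineConfig with the fields shown on this page (plus n_jobs, which the templates set); the real class may carry additional fields:

```python
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class PipelineConfig:  # stub mirroring the documented fields
    chunk_size: int = 128
    batch_size: int = 256
    max_memory_mb: int = 1024
    max_compute_units: float = 1.0
    adaptive_chunk_resize: bool = True
    max_chunk_retries: int = 3
    spill_to_disk: bool = False
    n_jobs: int = 1

def load_template(path: str) -> PipelineConfig:
    """Build a config from a JSON template whose keys match the field names."""
    with open(path) as f:
        return PipelineConfig(**json.load(f))
```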

Failure Modes and Mitigations

Common bottlenecks and how the system handles them:
  • Slow load times on slower storage → use SSD storage or pre-load the DataFrame into memory
  • Memory spikes while encoding high-cardinality categories → enable adaptive_chunk_resize to split chunks dynamically
  • Low throughput despite low memory usage → increase chunk_size if more memory is available
  • Missing RAPL measurements in containers/VMs → fallback estimation is used automatically (45 W batch, 30 W streaming)
  • Poor model quality from aggressive downscaling → increase max_compute_units or use batch mode

Best Practices

1. Profile First: run a full pipeline with telemetry enabled to understand resource usage:
results = runner.run_all('data/nba2k-full.csv')
2. Start Conservative: begin with small chunk sizes and low memory limits, then increase:
config = PipelineConfig(
    chunk_size=64,
    max_memory_mb=512,
    adaptive_chunk_resize=True,
)
3. Monitor Chunk Metrics: examine benchmarks/streaming_chunks.csv to identify bottlenecks:
  • High retries → chunk size too large
  • High memory_exceeded count → reduce max_memory_mb
  • High latency_s → increase chunk_size if memory allows
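These checks can be scripted against the metrics file. The column names (retries, memory_exceeded, latency_s) are assumed from the metrics described above:

```python
import pandas as pd

def summarize_chunks(path: str = 'benchmarks/streaming_chunks.csv') -> dict:
    """Aggregate the per-chunk metrics that signal each bottleneck."""
    chunks = pd.read_csv(path)
    return {
        'total_retries': int(chunks['retries'].sum()),              # high -> chunk size too large
        'memory_violations': int(chunks['memory_exceeded'].sum()),  # high -> reduce max_memory_mb
        'median_latency_s': float(chunks['latency_s'].median()),    # high -> raise chunk_size
    }
```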
4. Run Constraint Experiments: use systematic experiments to find an optimal configuration:
experiment = runner.run_constraint_experiment(source)
# Examine benchmarks/constraint_experiment.csv
5. Use Templates for Deployment: apply the edge or server template based on the environment:
python run_pipeline.py \
  --input data/nba2k-full.csv \
  --config-template configs/pipeline.edge.template.json

Next Steps

  • Reproducibility: learn about deterministic execution
  • Architecture: review the system architecture
