Overview

This guide provides optimization strategies based on profiling data and hardware constraints. Use the hardware profiling outputs to identify bottlenecks, then apply these strategies to improve performance.

Hardware-Adjusted Sizing

The pipeline automatically adjusts chunk and batch sizes based on available resources.

Implementation

From engine.py:42-60:
def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    batch_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]:
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units

    memory_factor = max(0.1, min(1.0, memory_cap / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)

Sizing Formulas

Memory factor: max(0.1, min(1.0, max_memory_mb / 1024))
  • 512 MB → factor = 0.5
  • 1024 MB → factor = 1.0
  • 2048 MB → factor = 1.0 (capped)
Compute factor: max(0.1, min(1.0, max_compute_units)) — clamped to the 0.1–1.0 range
  • 0.5 → factor = 0.5 (half available cores)
  • 1.0 → factor = 1.0 (all cores)
Combined scale: memory_factor × compute_factor
Adjusted sizes:
  • chunk_size = max(16, base_chunk_size × scale)
  • batch_size = max(16, base_batch_size × scale)

Example

With chunk_size=128, max_memory_mb=512, max_compute_units=0.5:
memory_factor = 512 / 1024 = 0.5
compute_factor = 0.5
scale = 0.5 × 0.5 = 0.25
adjusted_chunk = max(16, 128 × 0.25) = max(16, 32) = 32
The chunk size is reduced from 128 to 32 rows to respect the combined memory and compute constraints.
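The arithmetic above can be reproduced with a small standalone helper. This is a sketch that mirrors the formula from engine.py, not the pipeline's own method:

```python
def adjusted_sizes(rows, chunk_base, batch_base, max_memory_mb, max_compute_units):
    """Mirror the hardware-adjusted sizing formula described above."""
    memory_factor = max(0.1, min(1.0, max_memory_mb / 1024))
    compute_factor = max(0.1, min(1.0, max_compute_units))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    # Never exceed the number of rows actually available
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)

print(adjusted_sizes(rows=1000, chunk_base=128, batch_base=256,
                     max_memory_mb=512, max_compute_units=0.5))
# → (64, 32): batch 256 × 0.25 = 64, chunk 128 × 0.25 = 32
```

Note that the 16-row floor guarantees progress even under the tightest constraints: with `max_memory_mb=64` and `max_compute_units=0.05`, both factors clamp to 0.1 and the sizes bottom out at 16.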

Optimization by Scenario

Low-Memory Systems

Indicators:
  • Frequent memory_exceeded=true in streaming_chunks.csv
  • High retries count
  • Process killed by OOM (Out of Memory)
Strategies:
  1. Enable disk spilling:
    config = PipelineConfig(spill_to_disk=True)
    
    From engine.py:260-266, this saves intermediate results to disk:
    if self.config.spill_to_disk:
        x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
        y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
        X_chunk.to_csv(x_path, index=False)
        y_chunk.to_frame('salary').to_csv(y_path, index=False)
    
  2. Reduce chunk size:
    config = PipelineConfig(
        chunk_size=64,      # Smaller chunks
        max_memory_mb=256   # Realistic limit
    )
    
  3. Enable adaptive chunk resizing:
    config = PipelineConfig(
        adaptive_chunk_resize=True,
        max_chunk_retries=3
    )
    
    From engine.py:251-258, chunks are automatically split when memory is exceeded:
    if memory_exceeded and self.config.adaptive_chunk_resize and retries < self.config.max_chunk_retries:
        retries += 1
        split = max(16, len(chunk) // 2)
        pending_chunks.insert(0, chunk.iloc[split:].copy())
        chunk = chunk.iloc[:split].copy()
        current_chunk_size = split
        continue
    
  4. Use streaming mode exclusively:
    runner = RealTimePipelineRunner(config)
    result = runner.run_streaming(data, max_memory_mb=256)
    
Example configuration:
python run_pipeline.py \
  --chunk-size 64 \
  --max-memory-mb 256 \
  --spill-to-disk \
  --adaptive-chunk-resize \
  --max-chunk-retries 3

CPU-Constrained Systems

Indicators:
  • High cpu_percent in telemetry
  • Low throughput despite adequate memory
  • Long feature_engineering_s or encode_scale_s times
Strategies:
  1. Reduce compute allocation:
    config = PipelineConfig(
        max_compute_units=0.5,  # Use 50% of cores
        n_jobs=1                 # Disable parallelism
    )
    
  2. Use smaller batch sizes:
    config = PipelineConfig(
        batch_size=128  # Reduce from default 256
    )
    
  3. Disable parallel processing:
    config = PipelineConfig(n_jobs=1)
    
    From engine.py:397-402, the constraint experiment uses this setting:
    if self.config.n_jobs > 1:
        experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
            delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
        )
    else:
        experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]
    
Example configuration:
python run_pipeline.py \
  --max-compute-units 0.5 \
  --batch-size 128 \
  --n-jobs 1

Encode Stage Bottleneck

Indicators:
  • encode_scale_s dominates in operator_profile.csv
  • Time increases non-linearly with chunk size
Strategies:
  1. Reduce chunk size to improve cache locality:
    config = PipelineConfig(chunk_size=64)
    
  2. Profile chunk size impact:
    import pandas as pd
    profile = pd.read_csv('artifacts/profiles/operator_profile.csv')
    
    # Find optimal chunk size
    for size in [32, 64, 128, 256]:
        runner = RealTimePipelineRunner(PipelineConfig(chunk_size=size))
        result = runner.run_streaming(data, chunk_size=size)
        print(f"Chunk size {size}: {result['latency_s']:.3f}s")
    
  3. Monitor cache pressure with OS-level tools (for example, `perf stat` on Linux) to confirm the cache-locality hypothesis before committing to a smaller chunk size.

Feature Engineering Bottleneck

Indicators:
  • feature_engineering_s dominates in operator_profile.csv
  • High CPU usage during this stage
Strategies:
  1. Use simpler rolling aggregations:
    • The build_features_streaming() method computes rolling statistics
    • Consider reducing the window size or number of features
  2. Pre-compute features offline:
    • For batch processing, compute features once and cache
  3. Parallelize feature computation:
    • Increase n_jobs if memory allows
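Strategy 2 can be sketched as a compute-once cache. Here `build_features` is a placeholder for whatever feature function the pipeline uses; the cache path is illustrative:

```python
from pathlib import Path

import pandas as pd


def cached_features(data: pd.DataFrame, cache_path: Path, build_features) -> pd.DataFrame:
    """Compute features once, then reuse the cached copy on later runs."""
    if cache_path.exists():
        return pd.read_csv(cache_path)
    features = build_features(data)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    features.to_csv(cache_path, index=False)
    return features
```

For repeated batch runs on the same input, this turns an expensive per-run stage into a one-time cost; invalidate the cache file whenever the raw data or the feature logic changes.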

I/O Bottleneck

Indicators:
  • Low estimated_input_bandwidth_mb_s (< 100 MB/s)
  • High latency despite low CPU and memory usage
  • Divergence between bandwidth estimate and throughput
Strategies:
  1. Increase chunk size to amortize I/O:
    config = PipelineConfig(chunk_size=256)
    
  2. Use faster storage:
    • SSD instead of HDD
    • Local storage instead of network drives
  3. Pre-load data into memory:
    import pandas as pd
    data = pd.read_csv('nba2k-full.csv')  # Load once
    runner.run_streaming(data)  # Pass DataFrame
    

Multi-Objective Optimization

Latency vs. Accuracy

From the benchmark visualizations (engine.py:512-525):
plt.scatter(
    experiment_df['preprocessing_latency_s'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['compute_limit'],
    cmap='viridis',
)
plt.xlabel('Preprocessing latency (s)')
plt.ylabel('Model accuracy (R²)')
plt.title('Latency vs Accuracy')
Trade-offs:
  • Smaller chunks → faster iteration but potential accuracy loss
  • Larger chunks → better feature context but higher latency
Finding optimal point:
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

results = []
for chunk_size in [32, 64, 128, 256]:
    config = PipelineConfig(chunk_size=chunk_size)
    runner = RealTimePipelineRunner(config)
    r = runner.run_streaming(data, chunk_size=chunk_size)
    results.append({
        'chunk_size': chunk_size,
        'latency': r['latency_s'],
        'r2': r['model']['r2']
    })

import pandas as pd
df = pd.DataFrame(results)
print(df)

Memory vs. Accuracy

From the benchmark visualizations (engine.py:527-540):
plt.scatter(
    experiment_df['peak_memory_mb'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['memory_limit_mb'],
    cmap='plasma',
)
plt.xlabel('Peak memory (MB)')
plt.ylabel('Model accuracy (R²)')
Trade-offs:
  • Lower memory limits require smaller chunks
  • May reduce model quality for streaming models
  • Batch models maintain accuracy but can’t run at all under tight memory limits
Strategy: Use the constraint experiment to find the Pareto frontier.
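Extracting the Pareto frontier from the experiment results only takes a few lines. A sketch, with a toy DataFrame standing in for `experiment_df` (the real column names are the ones plotted above):

```python
import pandas as pd


def pareto_frontier(df: pd.DataFrame, cost: str, score: str) -> pd.DataFrame:
    """Keep rows that no other row dominates (lower cost AND higher score)."""
    ordered = df.sort_values(cost).reset_index(drop=True)
    best_score = -float('inf')
    keep = []
    for _, row in ordered.iterrows():
        if row[score] > best_score:   # strictly improves on all cheaper rows
            keep.append(row)
            best_score = row[score]
    return pd.DataFrame(keep)


example = pd.DataFrame({
    'peak_memory_mb': [120, 200, 260, 300],
    'model_accuracy_r2': [0.55, 0.70, 0.68, 0.74],
})
print(pareto_frontier(example, 'peak_memory_mb', 'model_accuracy_r2'))
# The 260 MB / 0.68 row is dominated by the 200 MB / 0.70 row and is dropped.
```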

Tuning Workflow

Step 1: Establish Baseline

python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir baseline \
  --benchmark-runs 3

Step 2: Identify Bottleneck

import pandas as pd

profile = pd.read_csv('baseline/profiles/operator_profile.csv')
means = profile[['preprocess_s', 'feature_engineering_s', 
                  'feature_selection_s', 'encode_scale_s']].mean()
print("Bottleneck:", means.idxmax())

Step 3: Apply Targeted Optimization

Based on bottleneck:
  • preprocess_s: Reduce chunk size or optimize cleaning logic
  • feature_engineering_s: Simplify features or increase parallelism
  • feature_selection_s: Reduce correlation threshold
  • encode_scale_s: Reduce chunk size (cache pressure)
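The mapping above can be encoded so a script selects the recommended action automatically. A sketch: the stage names match the profile columns, and the suggestions are the ones listed above:

```python
import pandas as pd

ACTIONS = {
    'preprocess_s': 'reduce chunk size or optimize cleaning logic',
    'feature_engineering_s': 'simplify features or increase parallelism',
    'feature_selection_s': 'reduce correlation threshold',
    'encode_scale_s': 'reduce chunk size (cache pressure)',
}


def suggest_action(profile: pd.DataFrame) -> str:
    """Name the dominant stage and the matching optimization."""
    means = profile[list(ACTIONS)].mean()
    bottleneck = means.idxmax()
    return f"{bottleneck}: {ACTIONS[bottleneck]}"
```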

Step 4: Validate Improvement

python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir optimized \
  --benchmark-runs 3 \
  --chunk-size 64  # example optimization

Step 5: Compare Results

import pandas as pd
import json

with open('baseline/reports/pipeline_report.json') as f:
    baseline = json.load(f)
    
with open('optimized/reports/pipeline_report.json') as f:
    optimized = json.load(f)

print(f"Baseline latency: {baseline['streaming']['latency_s']:.3f}s")
print(f"Optimized latency: {optimized['streaming']['latency_s']:.3f}s")
print(f"Speedup: {baseline['streaming']['latency_s'] / optimized['streaming']['latency_s']:.2f}x")

Advanced Techniques

Dynamic Chunk Sizing

The adaptive chunk resize feature (engine.py:251-258) automatically reduces chunk size when memory is exceeded:
config = PipelineConfig(
    adaptive_chunk_resize=True,
    max_chunk_retries=3,
    max_memory_mb=512
)
How it works:
  1. Chunk exceeds memory limit
  2. Split chunk in half
  3. Process first half
  4. Add second half back to queue
  5. Retry up to max_chunk_retries times
Backoff strategy (engine.py:257):
time.sleep(min(0.05 * retries, 0.2))  # 50ms, 100ms, 150ms, max 200ms
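The split-and-requeue behaviour (steps 1–4 above) can be simulated on a plain list to see how an oversized chunk is halved and its tail pushed back to the head of the queue. A standalone sketch, not the pipeline code:

```python
def split_oversized(pending, chunk, fits):
    """If `chunk` fails the `fits` check, keep its first half and
    requeue the second half at the front of `pending`."""
    if fits(chunk):
        return pending, chunk
    split = max(16, len(chunk) // 2)       # never split below the 16-row floor
    pending.insert(0, chunk[split:])       # tail goes back to the queue head
    return pending, chunk[:split]          # retry with the smaller head


pending = []
chunk = list(range(100))
pending, chunk = split_oversized(pending, chunk, fits=lambda c: len(c) <= 60)
# chunk now holds rows 0-49; rows 50-99 are back at the head of the queue
```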

Online Learning for Streaming

The streaming mode uses SGDRegressor for incremental learning (engine.py:214):
online_model = SGDRegressor(
    random_state=self.config.random_seed,
    max_iter=1,
    tol=None,
    learning_rate='invscaling'
)
Each chunk updates the model via partial_fit() (engine.py:245):
if len(X_chunk) > 0:
    online_model.partial_fit(X_chunk.to_numpy(dtype=float), y_chunk.to_numpy(dtype=float))
Optimization: Adjust batch size for model updates based on convergence requirements.
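The partial-fit pattern amounts to one gradient pass per chunk. A dependency-free conceptual sketch on a 1-D linear model, illustrating the idea rather than substituting for SGDRegressor:

```python
import random


def partial_fit_1d(w, b, xs, ys, lr=0.01):
    """One SGD pass over a chunk for the model y ≈ w*x + b."""
    for x, y in zip(xs, ys):
        err = (w * x + b) - y
        w -= lr * err * x
        b -= lr * err
    return w, b


random.seed(0)
w, b = 0.0, 0.0
for _ in range(10):                          # ten streamed chunks
    xs = [random.random() for _ in range(20)]
    ys = [2 * x + 1 for x in xs]             # true relationship: y = 2x + 1
    for _ in range(30):                      # a few epochs per chunk for the demo
        w, b = partial_fit_1d(w, b, xs, ys, lr=0.1)
```

The model converges toward w ≈ 2, b ≈ 1 without ever holding the full dataset in memory, which is the property the streaming mode relies on.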

Monitoring in Production

Key Metrics

  1. Per-chunk latency: Should remain stable
    chunks = pd.read_csv('artifacts/benchmarks/streaming_chunks.csv')
    print(f"Mean latency: {chunks['latency_s'].mean():.3f}s")
    print(f"Std latency: {chunks['latency_s'].std():.3f}s")
    print(f"P95 latency: {chunks['latency_s'].quantile(0.95):.3f}s")
    
  2. Memory stability: No increasing trend
    chunks['memory_delta'] = chunks['memory_after_mb'] - chunks['memory_before_mb']
    if chunks['memory_delta'].mean() > 10:
        print("WARNING: Potential memory leak")
    
  3. Retry rate: Should be near zero
    retry_rate = (chunks['retries'] > 0).mean()
    print(f"Retry rate: {retry_rate*100:.1f}%")
    

Alerting Thresholds

  • Latency P95 > 2× median: Performance degradation
  • Memory exceeded > 10%: Undersized configuration
  • Retry rate > 5%: Frequent memory pressure
  • Bandwidth < 50 MB/s: I/O bottleneck
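These thresholds translate directly into a check function. A sketch; the metric names follow the ones used in this guide:

```python
def check_alerts(p95_latency_s, median_latency_s, memory_exceeded_rate,
                 retry_rate, bandwidth_mb_s):
    """Return a message for every alerting threshold that is breached."""
    alerts = []
    if p95_latency_s > 2 * median_latency_s:
        alerts.append('performance degradation: P95 latency > 2x median')
    if memory_exceeded_rate > 0.10:
        alerts.append('undersized configuration: memory exceeded > 10%')
    if retry_rate > 0.05:
        alerts.append('frequent memory pressure: retry rate > 5%')
    if bandwidth_mb_s < 50:
        alerts.append('I/O bottleneck: bandwidth < 50 MB/s')
    return alerts
```

Running this against the per-chunk statistics from `streaming_chunks.csv` after each production run gives a cheap, deterministic health check.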

Best Practices

  1. Always profile first: Use operator profiling to guide optimization
  2. Optimize the bottleneck: Focus on the dominant stage
  3. Test on representative data: Use production-scale samples
  4. Validate accuracy: Ensure optimizations don’t harm model quality
  5. Document baselines: Save reports before and after optimization
  6. Monitor continuously: Track metrics over time in production
