Overview

The constraint experiment functionality systematically tests pipeline performance across combinations of chunk sizes, memory limits, and compute constraints. This enables:
  • Finding optimal configurations for constrained environments
  • Understanding performance trade-offs
  • Validating edge and low-resource scenarios
  • Identifying Pareto-optimal points

Running Constraint Experiments

Programmatic Usage

from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig

config = PipelineConfig(
    chunk_size=128,
    max_memory_mb=512,
    max_compute_units=1.0,
    n_jobs=1  # Disable parallelism for isolated tests
)

runner = RealTimePipelineRunner(config)
results = runner.run_constraint_experiment("nba2k-full.csv")

Command-Line Usage

The constraint experiment runs automatically as part of run_all():
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir artifacts \
  --chunk-size 128 \
  --max-memory-mb 512 \
  --max-compute-units 1.0

Experiment Methodology

Implementation

From engine.py:389-413:
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict:
    rows = len(source) if isinstance(source, pd.DataFrame) else len(self.ingestor.load(source))

    # Define parameter grid
    chunk_sizes = sorted(set([max(16, min(rows, s)) for s in [64, self.config.chunk_size]]))
    memory_limits = sorted(set([256, self.config.max_memory_mb]))
    compute_limits = sorted(set([0.5, self.config.max_compute_units]))
    tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]

    # Run experiments (parallel or sequential)
    if self.config.n_jobs > 1:
        experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
            delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
        )
    else:
        experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]

    results_df = pd.DataFrame(experiment_rows).sort_values(['chunk_size', 'memory_limit_mb', 'compute_limit'])
    return {
        'records': results_df.to_dict(orient='records'),
        'summary': {
            'best_accuracy_r2': float(results_df['model_accuracy_r2'].max()),
            'lowest_latency_s': float(results_df['preprocessing_latency_s'].min()),
            'lowest_training_time_s': float(results_df['training_time_s'].min()),
            'max_peak_memory_mb': float(results_df['peak_memory_mb'].max()),
        },
    }

Parameter Grid

The experiment tests all combinations of:
  1. Chunk sizes: [64, config.chunk_size]
    • Minimum: 64 rows
    • Maximum: Configured chunk size
    • Duplicates removed
  2. Memory limits: [256, config.max_memory_mb]
    • Low-memory scenario: 256 MB
    • Configured limit: User-specified
  3. Compute limits: [0.5, config.max_compute_units]
    • CPU-constrained: 50% utilization
    • Full utilization: User-specified
Total runs: up to 2 × 2 × 2 = 8 configurations (fewer when a configured value coincides with a default, since duplicates are removed)
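The clamping and deduplication logic can be sketched standalone. This mirrors the grid construction shown in run_constraint_experiment, using a hypothetical dataset size of 430 rows and the configured values from the CLI example:

```python
# Hypothetical inputs: dataset size and configured limits
rows = 430
config_chunk_size, config_max_memory_mb, config_max_compute_units = 128, 512, 1.0

# Mirror of the grid construction in run_constraint_experiment:
# chunk sizes are clamped to [16, rows], duplicates collapse via set()
chunk_sizes = sorted(set(max(16, min(rows, s)) for s in [64, config_chunk_size]))
memory_limits = sorted(set([256, config_max_memory_mb]))
compute_limits = sorted(set([0.5, config_max_compute_units]))
tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]

print(chunk_sizes, memory_limits, compute_limits)  # [64, 128] [256, 512] [0.5, 1.0]
print(len(tasks))  # 8
```

With `chunk_size=64` configured instead, `chunk_sizes` collapses to a single value and only 4 tasks remain.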

Single Run Implementation

From engine.py:376-387:
def _single_constraint_run(self, source: str | Path | pd.DataFrame, chunk: int, memory: int, compute: float) -> dict:
    run = self.run_streaming(source, chunk_size=chunk, max_memory_mb=memory, max_compute_units=compute)
    return {
        'chunk_size': int(chunk),
        'memory_limit_mb': int(memory),
        'compute_limit': float(compute),
        'preprocessing_latency_s': float(run['latency_s']),
        'peak_memory_mb': float(run['peak_memory_mb']),
        'training_time_s': float(run['model']['training_time_s']),
        'model_accuracy_r2': float(run['model']['r2']),
        'model_rmse': float(run['model']['rmse']),
    }

Generated Artifacts

constraint_experiment.csv

Complete results matrix in output_dir/benchmarks/:
chunk_size,memory_limit_mb,compute_limit,preprocessing_latency_s,peak_memory_mb,training_time_s,model_accuracy_r2,model_rmse
64,256,0.5,2.145,248.3,2.134,0.821,1245.67
64,256,1.0,1.987,251.7,1.976,0.824,1238.92
64,512,0.5,1.856,342.1,1.845,0.829,1229.45
64,512,1.0,1.678,347.8,1.667,0.831,1223.18
128,256,0.5,1.923,254.9,1.912,0.835,1215.34
128,256,1.0,1.734,258.2,1.723,0.837,1209.76
128,512,0.5,1.567,389.4,1.556,0.842,1198.23
128,512,1.0,1.421,394.6,1.410,0.845,1192.45
Column Descriptions:
  • chunk_size: Streaming chunk size (rows)
  • memory_limit_mb: Maximum memory constraint
  • compute_limit: CPU constraint factor (0.0-1.0)
  • preprocessing_latency_s: Total preprocessing time
  • peak_memory_mb: Maximum memory usage observed
  • training_time_s: Model training time (includes preprocessing)
  • model_accuracy_r2: Regression R² score
  • model_rmse: Root mean squared error

constraint_experiment_log.jsonl

JSON Lines format for programmatic analysis in output_dir/reports/:
{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 0.5, "preprocessing_latency_s": 2.145, "peak_memory_mb": 248.3, "training_time_s": 2.134, "model_accuracy_r2": 0.821, "model_rmse": 1245.67}
{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 1.0, "preprocessing_latency_s": 1.987, "peak_memory_mb": 251.7, "training_time_s": 1.976, "model_accuracy_r2": 0.824, "model_rmse": 1238.92}
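The log can be parsed one record per line with the standard library. A minimal sketch using the two sample records above (swap the StringIO for a real file handle opened on the JSONL path):

```python
import json
from io import StringIO

# Two sample records from constraint_experiment_log.jsonl (shown above)
log = StringIO(
    '{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 0.5, '
    '"preprocessing_latency_s": 2.145, "peak_memory_mb": 248.3, "training_time_s": 2.134, '
    '"model_accuracy_r2": 0.821, "model_rmse": 1245.67}\n'
    '{"chunk_size": 64, "memory_limit_mb": 256, "compute_limit": 1.0, '
    '"preprocessing_latency_s": 1.987, "peak_memory_mb": 251.7, "training_time_s": 1.976, '
    '"model_accuracy_r2": 0.824, "model_rmse": 1238.92}\n'
)

# One JSON object per line; skip any trailing blank line
records = [json.loads(line) for line in log if line.strip()]
print(len(records), records[0]['model_accuracy_r2'])  # 2 0.821
```

`pd.read_json(path, lines=True)` achieves the same in one call if a DataFrame is preferred.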

Visualization Plots

Generated in output_dir/benchmarks/ (see engine.py:509-555):

latency_vs_accuracy.png

Scatter plot showing trade-off between preprocessing speed and model quality:
  • X-axis: Preprocessing latency (seconds)
  • Y-axis: Model accuracy (R²)
  • Color: Compute constraint level
plt.scatter(
    experiment_df['preprocessing_latency_s'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['compute_limit'],
    cmap='viridis',
)

memory_vs_accuracy.png

Memory consumption vs. model quality:
  • X-axis: Peak memory (MB)
  • Y-axis: Model accuracy (R²)
  • Color: Memory limit setting
plt.scatter(
    experiment_df['peak_memory_mb'],
    experiment_df['model_accuracy_r2'],
    c=experiment_df['memory_limit_mb'],
    cmap='plasma',
)

latency_memory_accuracy.png

Three-way relationship visualization:
  • X-axis: Peak memory (MB)
  • Y-axis: Preprocessing latency (seconds)
  • Color: Model accuracy (R²)
plt.scatter(
    experiment_df['peak_memory_mb'],
    experiment_df['preprocessing_latency_s'],
    c=experiment_df['model_accuracy_r2'],
    cmap='coolwarm',
)

Analyzing Results

Finding Optimal Configuration

import pandas as pd

# Load experiment results
df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Find best accuracy
best_accuracy = df.loc[df['model_accuracy_r2'].idxmax()]
print("Best accuracy configuration:")
print(f"  Chunk size: {best_accuracy['chunk_size']}")
print(f"  Memory limit: {best_accuracy['memory_limit_mb']} MB")
print(f"  Compute limit: {best_accuracy['compute_limit']}")
print(f"  R²: {best_accuracy['model_accuracy_r2']:.3f}")
print(f"  Latency: {best_accuracy['preprocessing_latency_s']:.3f}s")

# Find lowest latency
lowest_latency = df.loc[df['preprocessing_latency_s'].idxmin()]
print("\nLowest latency configuration:")
print(f"  Chunk size: {lowest_latency['chunk_size']}")
print(f"  Memory limit: {lowest_latency['memory_limit_mb']} MB")
print(f"  Compute limit: {lowest_latency['compute_limit']}")
print(f"  Latency: {lowest_latency['preprocessing_latency_s']:.3f}s")
print(f"  R²: {lowest_latency['model_accuracy_r2']:.3f}")

Pareto Frontier Analysis

Identify configurations that aren’t strictly dominated:
import pandas as pd
import numpy as np

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Multi-objective: minimize latency, maximize accuracy
df['neg_latency'] = -df['preprocessing_latency_s']  # Convert to maximization

def is_pareto_efficient(costs):
    """Return a boolean mask of points not dominated when maximizing every column."""
    is_efficient = np.ones(costs.shape[0], dtype=bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            # Keep only points that beat c on at least one objective...
            is_efficient[is_efficient] = np.any(costs[is_efficient] > c, axis=1)
            is_efficient[i] = True  # ...and keep c itself
    return is_efficient

costs = df[['neg_latency', 'model_accuracy_r2']].values
pareto_mask = is_pareto_efficient(costs)
pareto_df = df[pareto_mask]

print("Pareto-optimal configurations:")
print(pareto_df[['chunk_size', 'memory_limit_mb', 'compute_limit', 
                  'preprocessing_latency_s', 'model_accuracy_r2']])
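A quick synthetic sanity check of the dominance logic, with the function repeated so the sketch is self-contained (four hypothetical points, maximizing both axes):

```python
import numpy as np

def is_pareto_efficient(costs):
    """Return a boolean mask of points not dominated when maximizing every column."""
    is_efficient = np.ones(costs.shape[0], dtype=bool)
    for i, c in enumerate(costs):
        if is_efficient[i]:
            is_efficient[is_efficient] = np.any(costs[is_efficient] > c, axis=1)
            is_efficient[i] = True
    return is_efficient

# (1,1), (2,0.5), and (0.5,2) are mutually non-dominated;
# (0.5,0.5) is dominated by (1,1) on both axes
pts = np.array([[1.0, 1.0], [2.0, 0.5], [0.5, 2.0], [0.5, 0.5]])
print(is_pareto_efficient(pts))  # [ True  True  True False]
```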

Memory-Constrained Scenarios

Filter results by memory availability:
import pandas as pd

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Available memory: 256 MB
low_mem = df[df['memory_limit_mb'] == 256]

# Find best configuration within constraint
best_low_mem = low_mem.loc[low_mem['model_accuracy_r2'].idxmax()]
print("Best configuration with 256 MB:")
print(f"  Chunk size: {best_low_mem['chunk_size']}")
print(f"  Compute limit: {best_low_mem['compute_limit']}")
print(f"  R²: {best_low_mem['model_accuracy_r2']:.3f}")
print(f"  Peak usage: {best_low_mem['peak_memory_mb']:.1f} MB")

# Verify memory usage is within limit
if best_low_mem['peak_memory_mb'] > 256:
    print("WARNING: Configuration exceeded memory limit!")

CPU-Constrained Scenarios

import pandas as pd

df = pd.read_csv('artifacts/benchmarks/constraint_experiment.csv')

# Available compute: 50% (e.g., shared environment)
low_cpu = df[df['compute_limit'] == 0.5]

# Compare latencies
print("Low-CPU configurations:")
print(low_cpu.sort_values('preprocessing_latency_s')[[
    'chunk_size', 'memory_limit_mb', 'preprocessing_latency_s', 'model_accuracy_r2'
]])
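Beyond filtering on a single constraint, per-parameter marginal effects are easy to read off with a groupby. A sketch using the eight sample rows of constraint_experiment.csv shown earlier (substitute the real CSV path in practice):

```python
import pandas as pd
from io import StringIO

# Sample rows from constraint_experiment.csv (shown earlier)
sample = """chunk_size,memory_limit_mb,compute_limit,preprocessing_latency_s,peak_memory_mb,training_time_s,model_accuracy_r2,model_rmse
64,256,0.5,2.145,248.3,2.134,0.821,1245.67
64,256,1.0,1.987,251.7,1.976,0.824,1238.92
64,512,0.5,1.856,342.1,1.845,0.829,1229.45
64,512,1.0,1.678,347.8,1.667,0.831,1223.18
128,256,0.5,1.923,254.9,1.912,0.835,1215.34
128,256,1.0,1.734,258.2,1.723,0.837,1209.76
128,512,0.5,1.567,389.4,1.556,0.842,1198.23
128,512,1.0,1.421,394.6,1.410,0.845,1192.45"""
df = pd.read_csv(StringIO(sample))

# Average accuracy by each constraint in isolation (marginal effect)
for col in ['chunk_size', 'memory_limit_mb', 'compute_limit']:
    print(df.groupby(col)['model_accuracy_r2'].mean())
```

On this sample, larger chunks, more memory, and more compute each raise mean R² slightly; actual data may behave differently.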

Edge Scenarios

Low-Memory Systems

Recommendations (from the hardware profiling documentation):
  • Enable --spill-to-disk
  • Reduce --chunk-size
  • Keep --max-memory-mb realistic for resident process limits
Example:
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_low_mem \
  --chunk-size 32 \
  --max-memory-mb 256 \
  --spill-to-disk \
  --adaptive-chunk-resize
Validation:
import pandas as pd

chunks = pd.read_csv('edge_low_mem/benchmarks/streaming_chunks.csv')
memory_violations = chunks[chunks['memory_after_mb'] > 256]

if len(memory_violations) > 0:
    print(f"Memory exceeded in {len(memory_violations)} chunks")
    print(f"Max usage: {chunks['memory_after_mb'].max():.1f} MB")
else:
    print("All chunks within memory limit")

CPU-Constrained Systems

Recommendations:
  • Lower --max-compute-units
  • Use smaller --batch-size
  • Keep --n-jobs 1 to avoid contention
Example:
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_low_cpu \
  --max-compute-units 0.25 \
  --batch-size 64 \
  --n-jobs 1
Validation:
import json

with open('edge_low_cpu/reports/pipeline_report.json') as f:
    report = json.load(f)

latency = report['streaming']['latency_s']
throughput = report['streaming']['throughput_rows_s']

print(f"Latency: {latency:.3f}s")
print(f"Throughput: {throughput:.1f} rows/s")

Minimal Resource Scenario

Combined memory and CPU constraints:
python run_pipeline.py \
  --input ../data/nba2k-full.csv \
  --output-dir edge_minimal \
  --chunk-size 16 \
  --batch-size 32 \
  --max-memory-mb 128 \
  --max-compute-units 0.25 \
  --spill-to-disk \
  --adaptive-chunk-resize \
  --n-jobs 1

Experiment Summary

The experiment results include a summary dictionary (engine.py:407-412):
'summary': {
    'best_accuracy_r2': float(results_df['model_accuracy_r2'].max()),
    'lowest_latency_s': float(results_df['preprocessing_latency_s'].min()),
    'lowest_training_time_s': float(results_df['training_time_s'].min()),
    'max_peak_memory_mb': float(results_df['peak_memory_mb'].max()),
}
Access via JSON report:
import json

with open('artifacts/reports/pipeline_report.json') as f:
    report = json.load(f)

summary = report['constraint_experiment']['summary']
print(f"Best accuracy achieved: {summary['best_accuracy_r2']:.3f}")
print(f"Lowest latency achieved: {summary['lowest_latency_s']:.3f}s")
print(f"Max memory used: {summary['max_peak_memory_mb']:.1f} MB")

Parallel Execution

Constraint experiments can run in parallel (engine.py:397-402):
config = PipelineConfig(
    n_jobs=4,  # Use 4 parallel workers
    chunk_size=128,
    max_memory_mb=512
)

runner = RealTimePipelineRunner(config)
results = runner.run_constraint_experiment(data)
Note: Parallel execution may interfere with accurate resource measurements. For precise profiling, use n_jobs=1.
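The Parallel/delayed fan-out used by run_constraint_experiment is plain joblib. A minimal sketch of the same pattern with a trivial stand-in task (the thread backend avoids process-spawn overhead for this toy example):

```python
from joblib import Parallel, delayed

def square(x):
    """Stand-in for _single_constraint_run: one task per parameter combination."""
    return x * x

# Same fan-out pattern as run_constraint_experiment; order of results
# matches the order of the input iterable
out = Parallel(n_jobs=2, prefer='threads')(delayed(square)(i) for i in range(4))
print(out)  # [0, 1, 4, 9]
```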

Best Practices

  1. Run experiments on representative data: Use production-scale samples
  2. Test edge cases separately: Minimal resource scenarios may need custom grids
  3. Validate constraints: Verify peak usage doesn’t exceed limits
  4. Document findings: Save experiment reports for comparison
  5. Use Pareto analysis: Identify optimal trade-offs, not just best single metric
  6. Consider deployment environment: Match constraints to target hardware

Limitations

  • Fixed parameter grid: Only tests predefined combinations
  • No hyperparameter tuning: Model parameters are fixed
  • No cross-run effects: Each run is independent, so warm-up and caching effects between runs are not captured
  • Coarse granularity: Limited to 2 values per parameter
