
Overview

The streaming engine orchestrates the entire NBA preprocessing pipeline in both batch and streaming modes. The RealTimePipelineRunner class provides adaptive resource management, performance profiling, and comprehensive benchmarking capabilities.

RealTimePipelineRunner Class

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:32

Initialization

class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig)
config (PipelineConfig, required): Pipeline configuration object containing:
  • random_seed: Reproducibility seed
  • chunk_size: Default rows per chunk
  • batch_size: Batch processing size
  • max_memory_mb: Memory limit
  • max_compute_units: CPU limit (0-1 scale)
  • benchmark_runs: Number of benchmark iterations
  • n_jobs: Parallel job count
  • adaptive_chunk_resize: Enable dynamic chunk sizing
  • max_chunk_retries: Max retry attempts for memory-constrained chunks
  • spill_to_disk: Enable disk spillover for large chunks
Implementation: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:33-40
def __init__(self, config: PipelineConfig):
    self.config = config
    set_global_seed(config.random_seed)
    self.ingestor = DataIngestor(config.random_seed)
    self.preprocessor = Preprocessor(config.random_seed)
    self.engineer = FeatureEngineer()
    self.validator = DataValidator()
    self.hardware = HardwareMonitor()
Example:
from pipeline.config import PipelineConfig
from pipeline.streaming import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    chunk_size=1000,
    max_memory_mb=512,
    max_compute_units=0.8
)

runner = RealTimePipelineRunner(config)

Core Execution Methods

run_batch()

Executes the full pipeline on the entire dataset in memory. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:172
def run_batch(self, source: str | Path | pd.DataFrame) -> dict
source (str | Path | pd.DataFrame, required): Data source (file path, Path object, or DataFrame)
Returns: Dictionary with batch execution metrics:
  • mode: "batch"
  • rows: Number of rows processed
  • latency_s: Total execution time
  • throughput_rows_s: Processing speed
  • peak_memory_mb: Maximum memory usage
  • energy_estimate_j: Estimated energy consumption
  • telemetry: Hardware metrics
  • model: Model training metrics (RMSE, R²)
Execution Flow: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:173-196
def run_batch(self, source) -> dict:
    # 1. Load full dataset
    df = self.ingestor.load(source)
    
    # 2. Start profiling
    start_snapshot = self.hardware.snapshot()
    tracemalloc.start()
    t0 = time.perf_counter()
    
    # 3. Process through pipeline
    X, y = self._process_df(df)
    
    # 4. Measure performance
    elapsed = time.perf_counter() - t0
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    end_snapshot = self.hardware.snapshot()
    
    # 5. Evaluate model
    throughput = len(df) / max(elapsed, 1e-9)
    model_metrics = self._evaluate_model(X, y)
    telemetry = self.hardware.compare(start_snapshot, end_snapshot)
    
    return {
        'mode': 'batch',
        'rows': len(df),
        'latency_s': elapsed,
        'throughput_rows_s': throughput,
        'peak_memory_mb': peak / (1024 * 1024),
        'energy_estimate_j': telemetry.get('rapl_energy_j', elapsed * 45.0),
        'telemetry': telemetry,
        'model': model_metrics
    }
Example:
runner = RealTimePipelineRunner(config)

result = runner.run_batch('data/nba_players.csv')

print(f"Processed {result['rows']:,} rows in {result['latency_s']:.2f}s")
print(f"Throughput: {result['throughput_rows_s']:,.0f} rows/sec")
print(f"Peak memory: {result['peak_memory_mb']:.1f} MB")
print(f"Model R²: {result['model']['r2']:.3f}")
Batch mode loads the entire dataset into memory. Use streaming mode for large datasets.

run_streaming()

Executes the pipeline in streaming mode with adaptive resource management. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:198
def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None
) -> dict
source (str | Path | pd.DataFrame, required): Data source to stream
chunk_size (int | None, default None): Override default chunk size (uses config if None)
max_memory_mb (int | None, default None): Override memory limit (uses config if None)
max_compute_units (float | None, default None): Override compute limit (uses config if None)
Returns: Dictionary with streaming execution metrics:
  • mode: "streaming"
  • rows: Total rows processed
  • latency_s: Total execution time
  • throughput_rows_s: Average processing speed
  • peak_memory_mb: Maximum memory usage
  • energy_estimate_j: Total energy consumption
  • chunk_metrics: Per-chunk performance metrics
  • operator_profile_summary_s: Average time per pipeline stage
  • model: Online learning model metrics
Key Features:

1. Hardware-Adjusted Chunk Sizing

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:205-211
rows = len(source) if isinstance(source, pd.DataFrame) else len(self.ingestor.load(source))
batch_size, adjusted_chunk_size = self._hardware_adjusted_sizes(
    rows,
    chunk_size=chunk_size,
    max_memory_mb=max_memory_mb,
    max_compute_units=max_compute_units
)
Algorithm: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:42-60
def _hardware_adjusted_sizes(self, rows, chunk_size, batch_size, max_memory_mb, max_compute_units):
    chunk_base = self.config.chunk_size if chunk_size is None else chunk_size
    batch_base = self.config.batch_size if batch_size is None else batch_size
    memory_cap = self.config.max_memory_mb if max_memory_mb is None else max_memory_mb
    compute_cap = self.config.max_compute_units if max_compute_units is None else max_compute_units
    
    # Scale based on available resources
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))    # More memory → larger chunks
    compute_factor = max(0.1, min(1.0, compute_cap))         # More CPU → larger chunks
    scale = memory_factor * compute_factor
    
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)
Example:
Base chunk_size: 1000
Memory: 512 MB → memory_factor = 0.5
Compute: 0.8 → compute_factor = 0.8
Scale = 0.5 × 0.8 = 0.4
Adjusted chunk_size = max(16, 1000 × 0.4) = 400 rows
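The worked example above can be checked with a short standalone sketch of the same formula (illustrative only, not the engine's actual `_hardware_adjusted_sizes` method, which also adjusts the batch size):

```python
# Standalone sketch of the sizing formula above.
def adjusted_chunk_size(chunk_base: int, memory_cap_mb: int, compute_cap: float) -> int:
    memory_factor = max(0.1, min(1.0, memory_cap_mb / 1024))  # more memory -> larger chunks
    compute_factor = max(0.1, min(1.0, compute_cap))          # more CPU -> larger chunks
    return max(16, int(chunk_base * memory_factor * compute_factor))

print(adjusted_chunk_size(1000, 512, 0.8))  # 400, matching the worked example
```

Note the floor of 16 rows: even under severe constraints, chunks never shrink below a minimum useful size.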

2. Adaptive Chunk Resizing

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:251-258
if memory_exceeded and self.config.adaptive_chunk_resize and retries < self.config.max_chunk_retries and len(chunk) > 16:
    retries += 1
    split = max(16, len(chunk) // 2)
    pending_chunks.insert(0, chunk.iloc[split:].copy())
    chunk = chunk.iloc[:split].copy()
    current_chunk_size = split
    time.sleep(min(0.05 * retries, 0.2))
    continue  # Retry with smaller chunk
Process:
  1. Detect memory limit exceeded
  2. Split chunk in half
  3. Queue second half for later processing
  4. Retry with first half
  5. Sleep between retries, with a delay that grows with the retry count (capped at 0.2 s)
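A minimal sketch of this halve-and-retry behavior, with a hypothetical `memory_exceeded` callback standing in for the engine's real memory monitoring:

```python
import time
from collections import deque

# Minimal sketch of the halve-and-retry loop above.
def process_with_retries(chunk, max_retries=3, memory_exceeded=lambda c: False):
    pending = deque([chunk])
    processed = []
    retries = 0
    while pending:
        current = pending.popleft()
        if memory_exceeded(current) and retries < max_retries and len(current) > 16:
            retries += 1
            split = max(16, len(current) // 2)
            pending.appendleft(current[split:])   # queue second half for later
            pending.appendleft(current[:split])   # retry the first half next
            time.sleep(min(0.05 * retries, 0.2))  # delay grows with retries, capped at 0.2 s
            continue
        processed.append(current)                 # chunk fits: accept it
    return processed
```

A chunk of 100 rows with a 50-row "memory limit" would come out as two 50-row chunks, processed in their original order.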

3. Online Learning with SGDRegressor

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:214-246
online_model = SGDRegressor(
    random_state=self.config.random_seed,
    max_iter=1,
    tol=None,
    learning_rate='invscaling'
)
online_seen = False
online_feature_cols = None

for chunk in self._iter_source_chunks(source, adjusted_chunk_size):
    X_chunk, y_chunk = self._process_stream_chunk(chunk, rolling_state)
    
    # Align feature columns across chunks
    if online_feature_cols is None:
        online_feature_cols = list(X_chunk.columns)
    else:
        for col in online_feature_cols:
            if col not in X_chunk.columns:
                X_chunk[col] = 0.0
        X_chunk = X_chunk.reindex(columns=online_feature_cols, fill_value=0.0)
    
    # Incremental learning
    if len(X_chunk) > 0:
        online_model.partial_fit(X_chunk.to_numpy(dtype=float), y_chunk.to_numpy(dtype=float))
        online_seen = True
Feature alignment ensures consistent feature dimensions across chunks, even if new categories appear.
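The alignment step can be exercised in isolation with plain pandas (the feature columns here are synthetic, not the engine's actual schema):

```python
import pandas as pd

# The first chunk fixes the feature schema; later chunks are reindexed to match.
feature_cols = ['pts', 'ast', 'team_LAL']            # learned from chunk 1
chunk2 = pd.DataFrame({'pts': [10.0], 'ast': [4.0], 'team_BOS': [1.0]})

aligned = chunk2.reindex(columns=feature_cols, fill_value=0.0)
print(list(aligned.columns))      # ['pts', 'ast', 'team_LAL']
print(aligned.iloc[0].tolist())   # [10.0, 4.0, 0.0]
```

Columns unseen in the first chunk (`team_BOS`) are dropped, and missing ones (`team_LAL`) are zero-filled, so `partial_fit` always sees the same feature dimension.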

4. Disk Spillover

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:260-266
spill_paths: dict[str, str] = {}
if self.config.spill_to_disk:
    x_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_X.csv'
    y_path = self.config.output_dir / 'intermediate' / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
    spill_paths = {'X': str(x_path), 'y': str(y_path)}
Example:
config = PipelineConfig(
    chunk_size=500,
    max_memory_mb=256,
    adaptive_chunk_resize=True,
    max_chunk_retries=3,
    spill_to_disk=True
)

runner = RealTimePipelineRunner(config)

result = runner.run_streaming('large_dataset.csv')

print(f"Total latency: {result['latency_s']:.2f}s")
print(f"Chunks processed: {len(result['chunk_metrics'])}")
print(f"Avg preprocess time per chunk: {result['operator_profile_summary_s']['preprocess_s']:.4f}s")
print(f"Model R²: {result['model']['r2']:.3f}")

for chunk in result['chunk_metrics']:
    if chunk['memory_exceeded']:
        print(f"⚠️ Chunk {chunk['chunk_id']} exceeded memory (retries: {chunk['retries']})")

Benchmarking Methods

benchmark()

Runs multiple iterations of batch and streaming modes for statistical comparison. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:324
def benchmark(self, source: str | Path | pd.DataFrame) -> dict
source (str | Path | pd.DataFrame, required): Dataset to benchmark
Returns: Dictionary containing:
  • runs: List of all run results
  • latency_batch: Bootstrap confidence intervals for batch latency
  • latency_streaming: Bootstrap confidence intervals for streaming latency
  • throughput_batch: Throughput statistics
  • throughput_streaming: Throughput statistics
  • significance: Statistical tests comparing modes
  • latency_vs_data_size: Scalability analysis
  • throughput_vs_memory: Resource efficiency
  • resource_vs_accuracy: Trade-off analysis
Statistical Methods:

Bootstrap Confidence Intervals

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:108-132
def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict:
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5))
    }
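The same resampling scheme can be run standalone on synthetic per-run latencies to see what the resulting interval looks like (the data here is made up for illustration):

```python
import numpy as np

# Bootstrap a 95% CI for the mean of 20 synthetic latency measurements.
rng = np.random.default_rng(42)
latencies = rng.normal(loc=1.5, scale=0.1, size=20)

means = [rng.choice(latencies, size=len(latencies), replace=True).mean()
         for _ in range(400)]
ci_low, ci_high = np.percentile(means, [2.5, 97.5])
print(f"mean={latencies.mean():.3f}s  95% CI=[{ci_low:.3f}, {ci_high:.3f}]")
```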

Permutation Test (p-value)

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:134-147
def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    return float((count + 1) / (n_perm + 1))
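A standalone demonstration of the same permutation scheme on synthetic batch/streaming latencies (the distributions are invented; two clearly different samples should yield a small p-value):

```python
import numpy as np

# Permutation test on two synthetic latency samples with different means.
rng = np.random.default_rng(0)
batch = rng.normal(2.0, 0.1, size=10)
streaming = rng.normal(1.5, 0.1, size=10)

observed = abs(batch.mean() - streaming.mean())
combined = np.concatenate([batch, streaming])
count = 0
for _ in range(1000):
    shuffled = rng.permutation(combined)
    if abs(shuffled[:10].mean() - shuffled[10:].mean()) >= observed:
        count += 1
p_value = (count + 1) / (1000 + 1)
print(f"p-value: {p_value:.4f}")
```

The `+ 1` in numerator and denominator keeps the p-value strictly positive even when no permutation matches the observed difference.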
Example:
runner = RealTimePipelineRunner(config)

bench = runner.benchmark('data/nba_players.csv')

print("Batch latency:")
print(f"  Mean: {bench['latency_batch']['mean']:.3f}s")
print(f"  95% CI: [{bench['latency_batch']['ci95_low']:.3f}, {bench['latency_batch']['ci95_high']:.3f}]")

print("\nStreaming latency:")
print(f"  Mean: {bench['latency_streaming']['mean']:.3f}s")
print(f"  95% CI: [{bench['latency_streaming']['ci95_low']:.3f}, {bench['latency_streaming']['ci95_high']:.3f}]")

print("\nStatistical significance:")
print(f"  p-value: {bench['significance']['latency_pvalue']:.4f}")
if bench['significance']['latency_pvalue'] < 0.05:
    print("  ✓ Difference is statistically significant")

run_constraint_experiment()

Tests pipeline performance under various resource constraints. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:389
def run_constraint_experiment(self, source: str | Path | pd.DataFrame) -> dict
source (str | Path | pd.DataFrame, required): Dataset for constraint testing
Returns: Dictionary with constraint experiment results:
  • records: List of all constraint configurations and their results
  • summary: Best performance metrics across all configurations
Experiment Grid: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:392-395
chunk_sizes = [64, self.config.chunk_size]
memory_limits = [256, self.config.max_memory_mb]
compute_limits = [0.5, self.config.max_compute_units]
tasks = [(c, m, cp) for c in chunk_sizes for m in memory_limits for cp in compute_limits]
Parallel Execution:
if self.config.n_jobs > 1:
    experiment_rows = Parallel(n_jobs=self.config.n_jobs)(
        delayed(self._single_constraint_run)(source, c, m, cp) for c, m, cp in tasks
    )
else:
    experiment_rows = [self._single_constraint_run(source, c, m, cp) for c, m, cp in tasks]
Example:
runner = RealTimePipelineRunner(config)

experiment = runner.run_constraint_experiment('data/nba_players.csv')

print("Constraint experiment summary:")
print(f"  Best accuracy: {experiment['summary']['best_accuracy_r2']:.3f}")
print(f"  Lowest latency: {experiment['summary']['lowest_latency_s']:.3f}s")
print(f"  Max memory used: {experiment['summary']['max_peak_memory_mb']:.1f} MB")

print("\nAll configurations:")
for record in experiment['records']:
    print(f"  Chunk={record['chunk_size']}, Mem={record['memory_limit_mb']}MB, ")
    print(f"  Compute={record['compute_limit']:.1f} → R²={record['model_accuracy_r2']:.3f}, ")
    print(f"  Latency={record['preprocessing_latency_s']:.2f}s")

run_all()

Executes the complete pipeline with all benchmarks, experiments, and validation. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:415
def run_all(self, source: str | Path | pd.DataFrame) -> dict
source (str | Path | pd.DataFrame, required): Complete dataset to process
Returns: Comprehensive report containing:
  • dataset_fingerprint: SHA-256 hash and metadata
  • reproducibility: Environment and configuration details
  • batch: Batch mode results
  • streaming: Streaming mode results
  • benchmark: Statistical comparison
  • constraint_experiment: Resource constraint analysis
  • quality: Data quality and validation metrics
  • scaling: Parallelization info
Execution Flow: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:416-455
def run_all(self, source) -> dict:
    self.config.ensure_output_dirs()
    df = self.ingestor.load(source)
    fp = self.ingestor.fingerprint(df)
    
    # 1. Run both modes
    batch_report = self.run_batch(df)
    streaming_report = self.run_streaming(df)
    
    # 2. Benchmarking
    benchmark = self.benchmark(df)
    constraint_experiment = self.run_constraint_experiment(df)
    
    # 3. Validation
    cleaned = self.preprocessor.clean(df)
    outlier_mask = self.preprocessor.detect_outliers_iqr(
        cleaned.select_dtypes(include='number')
    )
    quality = self.validator.quality_report(cleaned, outlier_mask)
    drift_score = self.validator.drift_detection(
        cleaned, 
        cleaned.sample(frac=1.0, random_state=self.config.random_seed)
    )
    schema_ok, schema_issues = self.validator.schema_validation(
        df, 
        required_columns=['version', 'salary', 'b_day', 'draft_year']
    )
    feature_drift = self.validator.feature_wise_drift(
        cleaned.iloc[:len(cleaned)//2],
        cleaned.iloc[len(cleaned)//2:]
    )
    temporal_drift = self.validator.temporal_drift(df)
    
    # 4. Compile report
    report = {
        'dataset_fingerprint': asdict(fp),
        'reproducibility': self._reproducibility_manifest(),
        'batch': batch_report,
        'streaming': streaming_report,
        'benchmark': benchmark,
        'constraint_experiment': constraint_experiment,
        'quality': asdict(quality) | {
            'drift_score': drift_score,
            'schema_ok': schema_ok,
            'schema_issues': schema_issues,
            'feature_drift': feature_drift,
            'temporal_drift': temporal_drift
        },
        'scaling': {
            'n_jobs': self.config.n_jobs,
            'parallel_enabled': self.config.n_jobs > 1
        }
    }
    
    self._write_artifacts(report)
    return report
Example:
config = PipelineConfig(
    random_seed=42,
    chunk_size=1000,
    benchmark_runs=5,
    n_jobs=4,
    output_dir=Path('output')
)

runner = RealTimePipelineRunner(config)

report = runner.run_all('data/nba_players.csv')

print(f"Dataset: {report['dataset_fingerprint']['path']}")
print(f"Rows: {report['dataset_fingerprint']['rows']:,}")
print(f"SHA-256: {report['dataset_fingerprint']['sha256'][:16]}...")
print(f"\nBatch mode: {report['batch']['latency_s']:.2f}s")
print(f"Streaming mode: {report['streaming']['latency_s']:.2f}s")
print(f"\nQuality:")
print(f"  Outlier rate: {report['quality']['outlier_rate']:.2%}")
print(f"  Schema valid: {report['quality']['schema_ok']}")
print(f"  Drift score: {report['quality']['drift_score']:.3f}")

Output Artifacts

The pipeline generates comprehensive output files: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:473-509

Generated Files

output/
├── reports/
│   ├── pipeline_report.json         # Complete execution report
│   ├── streaming_chunks.jsonl       # Per-chunk metrics
│   └── constraint_experiment_log.jsonl
├── metadata/
│   └── run_manifest.json            # Reproducibility metadata
├── benchmarks/
│   ├── streaming_chunks.csv
│   ├── latency_vs_data_size.csv
│   ├── throughput_vs_memory.csv
│   ├── resource_vs_accuracy.csv
│   ├── significance_tests.csv
│   ├── constraint_experiment.csv
│   ├── latency_vs_accuracy.png
│   ├── memory_vs_accuracy.png
│   └── latency_memory_accuracy.png
├── profiles/
│   └── operator_profile.csv         # Stage-wise timing
└── intermediate/
    ├── stream_chunk_1_X.csv         # Spilled features (if enabled)
    └── stream_chunk_1_y.csv         # Spilled targets
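One way to inspect the main report programmatically; `summarize_report` is a hypothetical helper, but the field names follow the return structure of run_all() documented above:

```python
import json
from pathlib import Path

# Hypothetical helper for inspecting output/reports/pipeline_report.json.
def summarize_report(report_path: Path) -> str:
    report = json.loads(report_path.read_text())
    return (
        f"batch={report['batch']['latency_s']:.2f}s "
        f"streaming={report['streaming']['latency_s']:.2f}s "
        f"schema_ok={report['quality']['schema_ok']}"
    )
```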

Visualization

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:511-555 Three plots are generated:
  1. Latency vs Accuracy (colored by compute constraint)
  2. Memory vs Accuracy (colored by memory limit)
  3. Latency vs Memory vs Accuracy (3D trade-off)


Performance Optimization

Batch Mode Best For:

  • Small to medium datasets (<1GB)
  • High-memory environments
  • Maximum throughput required
  • No real-time constraints

Streaming Mode Best For:

  • Large datasets (>1GB)
  • Memory-constrained environments
  • Real-time processing
  • Incremental learning scenarios

Resource Management Tips

Right-Size Chunks

Start with chunk_size=1000 and adjust based on memory usage monitoring.

Enable Adaptive Resizing

Set adaptive_chunk_resize=True for automatic memory management.

Use Disk Spillover

Enable spill_to_disk=True for debugging and fault tolerance.

Parallelize Experiments

Set n_jobs > 1 to speed up constraint experiments and benchmarks.
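Taken together, these tips translate into a configuration like the following (field names as documented under Initialization; the values are illustrative starting points, not recommendations):

```python
from pipeline.config import PipelineConfig
from pipeline.streaming import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    chunk_size=1000,             # right-sized starting point; tune from memory profiles
    max_memory_mb=512,
    max_compute_units=0.8,
    adaptive_chunk_resize=True,  # halve chunks automatically under memory pressure
    max_chunk_retries=3,
    spill_to_disk=True,          # persist intermediate chunks for debugging/fault tolerance
    n_jobs=4,                    # parallelize constraint experiments and benchmarks
)

runner = RealTimePipelineRunner(config)
```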

Best Practices

1. Profile First: Run run_all() on a sample dataset to understand resource requirements.
2. Configure Constraints: Set max_memory_mb and max_compute_units based on the production environment.
3. Benchmark Regularly: Use benchmark() to track performance regressions over time.
4. Monitor Artifacts: Review the generated CSVs and plots to identify bottlenecks.
5. Validate Reproducibility: Check run_manifest.json to confirm consistent results across runs.

Next Steps

Ingestion

Learn about data loading and fingerprinting

Preprocessing

Explore data cleaning and transformation

Feature Engineering

Build derived features for ML models

Validation

Validate data quality and detect drift
