
Overview

The NBA Data Preprocessing Pipeline supports two execution modes, each optimized for different scenarios:
  • Batch mode: Full-dataset preprocessing with in-memory processing
  • Streaming mode: Chunked processing with adaptive resource management
Both modes use the same preprocessing logic and produce comparable model quality, but differ in memory footprint, latency, and throughput characteristics.

Batch Mode

Batch mode processes the entire dataset in memory as a single operation.

When to Use Batch Mode

  • Server Environments: High-memory systems with 4 GB+ RAM available
  • Small Datasets: Datasets that comfortably fit in memory
  • Minimum Latency: When processing speed is the primary concern
  • Offline Processing: Non-real-time batch ETL pipelines
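Whether a dataset "comfortably fits in memory" can be estimated before committing to batch mode. The helper below is a hypothetical sketch (not part of the pipeline's API): it samples a few rows, extrapolates per-row memory cost, and compares against a budget.

```python
import pandas as pd

def fits_in_memory(csv_path: str, budget_mb: int = 4096,
                   sample_rows: int = 1000) -> bool:
    """Rough check: extrapolate per-row memory cost from a small sample."""
    sample = pd.read_csv(csv_path, nrows=sample_rows)
    bytes_per_row = sample.memory_usage(deep=True).sum() / max(len(sample), 1)
    total_rows = sum(1 for _ in open(csv_path)) - 1  # exclude header line
    estimated_mb = total_rows * bytes_per_row / (1024 * 1024)
    return bool(estimated_mb < budget_mb)
```

If the estimate exceeds the budget, streaming mode (below) is the safer choice.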

Implementation

Batch mode is implemented in the run_batch method:
def run_batch(self, source: str | Path | pd.DataFrame) -> dict:
    df = self.ingestor.load(source)
    start_snapshot = self.hardware.snapshot()
    tracemalloc.start()
    t0 = time.perf_counter()
    
    # Single-pass processing
    X, y = self._process_df(df)
    
    elapsed = time.perf_counter() - t0
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    end_snapshot = self.hardware.snapshot()
    
    throughput = len(df) / max(elapsed, 1e-9)
    model_metrics = self._evaluate_model(X, y)
    telemetry = self.hardware.compare(start_snapshot, end_snapshot)
    
    return {
        'mode': 'batch',
        'rows': len(df),
        'latency_s': elapsed,
        'throughput_rows_s': throughput,
        'peak_memory_mb': peak / (1024 * 1024),
        'model': model_metrics,
        'telemetry': telemetry,
    }

Processing Pipeline

Batch mode executes a linear pipeline:
def _process_df(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]:
    cleaned = self.preprocessor.clean(df)
    featured = self.engineer.build_features(cleaned)
    filtered = self.engineer.drop_multicollinearity(featured)
    X, y = self.engineer.encode_and_scale(filtered)
    return X, y

Characteristics

| Metric | Batch Mode |
| --- | --- |
| Memory Usage | High (entire dataset in RAM) |
| Latency | Lowest |
| Throughput | Highest |
| Model Training | Offline LinearRegression with train/test split |
| Failure Mode | OOM on large datasets |
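Since OOM is batch mode's characteristic failure mode, one pragmatic pattern is to attempt batch first and degrade to streaming on failure. This is a hypothetical wrapper, not part of the runner's API:

```python
# Hypothetical fallback: try batch first, degrade to streaming on MemoryError.
def run_with_fallback(runner, source):
    try:
        return runner.run_batch(source)
    except MemoryError:
        # Batch blew the RAM budget; reprocess the same source in chunks.
        return runner.run_streaming(source, chunk_size=64)
```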

Streaming Mode

Streaming mode processes data in configurable chunks with adaptive resource management.

When to Use Streaming Mode

  • Edge Devices: Resource-constrained devices (IoT, embedded systems)
  • Large Datasets: Datasets larger than available memory
  • Memory Limits: Strict memory constraints (< 1 GB RAM)
  • Online Learning: Real-time incremental model updates
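The online-learning pattern used by streaming mode, an SGDRegressor updated incrementally via partial_fit, can be sketched in isolation on synthetic data (the data itself is made up for illustration):

```python
import numpy as np
from sklearn.linear_model import SGDRegressor

rng = np.random.default_rng(42)
model = SGDRegressor(random_state=42, max_iter=1, tol=None,
                     learning_rate='invscaling')

# Feed data in chunks; each partial_fit call refines the same model in place,
# so only one chunk ever needs to be resident in memory.
for _ in range(50):
    X = rng.normal(size=(64, 3))
    y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=64)
    model.partial_fit(X, y)

# model.coef_ now holds the incrementally learned weights
```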

Implementation

Streaming mode is implemented in the run_streaming method:
def run_streaming(
    self,
    source: str | Path | pd.DataFrame,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> dict:
    # Adjust chunk size based on resource constraints
    batch_size, adjusted_chunk_size = self._hardware_adjusted_sizes(
        rows,
        chunk_size=chunk_size,
        max_memory_mb=max_memory_mb,
        max_compute_units=max_compute_units,
    )
    
    # Initialize online learning model
    online_model = SGDRegressor(
        random_state=self.config.random_seed,
        max_iter=1,
        tol=None,
        learning_rate='invscaling'
    )
    
    rolling_state = self.engineer.init_rolling_state()
    
    # Process chunks iteratively
    for raw_chunk in self._iter_source_chunks(source, adjusted_chunk_size):
        X_chunk, y_chunk, operator_profile = self._profile_stream_chunk(
            raw_chunk, rolling_state
        )
        
        # Incremental model update
        if len(X_chunk) > 0:
            online_model.partial_fit(
                X_chunk.to_numpy(dtype=float),
                y_chunk.to_numpy(dtype=float)
            )

Chunk Processing Pipeline

Each chunk goes through a profiled pipeline:
def _profile_stream_chunk(
    self, chunk: pd.DataFrame, rolling_state: Any
) -> tuple[pd.DataFrame, pd.Series, dict[str, float]]:
    stage_start = time.perf_counter()
    cleaned = self.preprocessor.clean(chunk)
    preprocess_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    featured = self.engineer.build_features_streaming(cleaned, rolling_state)
    feature_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    filtered = self.engineer.drop_multicollinearity(featured)
    select_s = time.perf_counter() - stage_start

    stage_start = time.perf_counter()
    x_chunk, y_chunk = self.engineer.encode_and_scale(filtered)
    encode_s = time.perf_counter() - stage_start

    return x_chunk, y_chunk, {
        'preprocess_s': float(preprocess_s),
        'feature_engineering_s': float(feature_s),
        'feature_selection_s': float(select_s),
        'encode_scale_s': float(encode_s),
    }

Chunk Iteration

The pipeline supports both DataFrame and CSV streaming:
def _iter_source_chunks(
    self, source: str | Path | pd.DataFrame, chunk_size: int
) -> Iterator[pd.DataFrame]:
    if isinstance(source, pd.DataFrame):
        for start in range(0, len(source), chunk_size):
            yield source.iloc[start : start + chunk_size].copy()
        return

    for chunk in pd.read_csv(source, chunksize=chunk_size):
        yield chunk.copy()

Characteristics

| Metric | Streaming Mode |
| --- | --- |
| Memory Usage | Low (controlled by chunk_size) |
| Latency | Higher (chunk overhead) |
| Throughput | Lower (incremental processing) |
| Model Training | Online SGDRegressor with partial_fit |
| Failure Mode | Graceful degradation with adaptive resizing |
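The "adaptive resizing" in the table could follow a policy like the sketch below: halve the chunk size whenever a chunk breaches the memory budget, with a floor so chunks never become degenerate. This is an illustrative heuristic, not the pipeline's actual resizing rule:

```python
# Hypothetical resizing policy: shrink on memory pressure, clamp to a floor.
def next_chunk_size(current: int, memory_exceeded: bool, floor: int = 16) -> int:
    """Halve the chunk size after a budget breach; otherwise keep it."""
    if memory_exceeded:
        return max(current // 2, floor)
    return current
```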

Chunk-Level Telemetry

Streaming mode captures detailed per-chunk metrics:
chunk_metrics.append({
    'chunk_id': chunk_id,
    'rows': len(chunk),
    'latency_s': elapsed,
    'throughput_rows_s': len(chunk) / max(elapsed, 1e-9),
    'batch_size': batch_size,
    'chunk_size': current_chunk_size,
    'memory_before_mb': mem_before,
    'memory_after_mb': mem_after,
    'memory_exceeded': memory_exceeded,
    'retries': retries,
    'spill_paths': spill_paths,
    'operator_profile_s': operator_profile,
})
These metrics are written to benchmarks/streaming_chunks.csv and reports/streaming_chunks.jsonl.
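Those files can be analyzed after a run. A minimal sketch, assuming only the column names shown in the chunk_metrics dict above (summarize_chunks itself is a hypothetical helper):

```python
import pandas as pd

def summarize_chunks(path: str) -> dict:
    """Aggregate the per-chunk telemetry emitted by streaming mode."""
    chunks = pd.read_csv(path)
    return {
        'chunks': len(chunks),
        'mean_throughput_rows_s': float(chunks['throughput_rows_s'].mean()),
        'memory_exceeded_chunks': int(chunks['memory_exceeded'].sum()),
    }
```

For example, `summarize_chunks('benchmarks/streaming_chunks.csv')` reports how many chunks ran over the memory budget.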

Comparison

Performance Trade-offs

Batch mode has lower latency because it avoids per-chunk overhead and scheduler costs.
  • Typical batch latency: 0.5–2 s for 1,000 rows
  • Typical streaming latency: 1–4 s for 1,000 rows (depends on chunk size)

Running Each Mode

Batch Execution

from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

config = PipelineConfig(
    random_seed=42,
    max_memory_mb=4096,  # High memory available
)

runner = RealTimePipelineRunner(config)
results = runner.run_batch('data/nba2k-full.csv')

print(f"Latency: {results['latency_s']:.2f}s")
print(f"Peak memory: {results['peak_memory_mb']:.1f}MB")
print(f"R²: {results['model']['r2']:.3f}")

Streaming Execution

config = PipelineConfig(
    random_seed=42,
    chunk_size=64,
    max_memory_mb=512,  # Limited memory
    adaptive_chunk_resize=True,
)

runner = RealTimePipelineRunner(config)
results = runner.run_streaming('data/nba2k-full.csv')

print(f"Latency: {results['latency_s']:.2f}s")
print(f"Peak memory: {results['peak_memory_mb']:.1f}MB")
print(f"Chunks processed: {len(results['chunk_metrics'])}")
print(f"R²: {results['model']['r2']:.3f}")
When using streaming mode, ensure chunk_size is set appropriately for your memory constraints. Smaller chunks reduce memory usage but increase overhead.
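One way to pick a chunk_size that respects a memory budget is to let a single chunk occupy only a fraction of it. The heuristic and its parameter names are illustrative assumptions, not the pipeline's built-in sizing logic:

```python
# Hypothetical sizing heuristic: one chunk occupies `headroom` of the budget.
def chunk_size_for_budget(budget_mb: int, bytes_per_row: int,
                          headroom: float = 0.25, floor: int = 16) -> int:
    """Rows per chunk so one chunk uses at most `headroom` of the RAM budget."""
    budget_bytes = budget_mb * 1024 * 1024 * headroom
    return max(int(budget_bytes // bytes_per_row), floor)
```

For instance, with a 512 MB budget and roughly 4 KB per row, this yields 32,768 rows per chunk; a tighter budget falls back to the floor.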

Next Steps

  • Resource Constraints: Learn about adaptive resource management
  • Reproducibility: Understand deterministic execution
