
Overview

The NBA Data Preprocessing Pipeline is designed as a staged, deterministic system that supports both batch and streaming execution modes. The architecture prioritizes reproducibility, resource awareness, and resilience under constrained environments.

Pipeline Stages

The runtime is organized as a five-stage pipeline:
1. Ingestion

Load source data (CSV or DataFrame) and compute a SHA-256 dataset fingerprint for reproducibility tracking.
df = self.ingestor.load(source)
fp = self.ingestor.fingerprint(df)
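The fingerprint can be sketched as a SHA-256 digest over the frame's canonical CSV bytes. This is a minimal illustration, not necessarily how DataIngestor.fingerprint is implemented:

```python
import hashlib

import pandas as pd

def fingerprint(df: pd.DataFrame) -> str:
    # Hash the canonical CSV serialization so identical data yields identical digests
    payload = df.to_csv(index=False).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

df = pd.DataFrame({"player": ["A", "B"], "salary": [1.0, 2.0]})
fp = fingerprint(df)
```

Because the hash is computed over serialized bytes, any change to values, column order, or dtypes that affects the CSV output produces a different fingerprint.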
2. Preprocessing

Parse and normalize fields, handle missing values, and flag outliers using the interquartile-range (IQR) rule.
cleaned = self.preprocessor.clean(df)
outlier_mask = self.preprocessor.detect_outliers_iqr(
    cleaned.select_dtypes(include='number')
)
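The IQR rule flags values outside [Q1 - k·IQR, Q3 + k·IQR]. A minimal sketch of what detect_outliers_iqr might compute (the multiplier k = 1.5 is the conventional default, assumed here):

```python
import pandas as pd

def detect_outliers_iqr(num: pd.DataFrame, k: float = 1.5) -> pd.DataFrame:
    # Flag values outside [Q1 - k*IQR, Q3 + k*IQR], column by column
    q1 = num.quantile(0.25)
    q3 = num.quantile(0.75)
    iqr = q3 - q1
    return (num < q1 - k * iqr) | (num > q3 + k * iqr)

df = pd.DataFrame({"salary": [1, 2, 2, 3, 2, 100]})
mask = detect_outliers_iqr(df)  # only the value 100 is flagged
```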
3. Feature Engineering

Derive temporal and rolling features, then remove high-correlation inputs to reduce multicollinearity.
featured = self.engineer.build_features(cleaned)
filtered = self.engineer.drop_multicollinearity(featured)
X, y = self.engineer.encode_and_scale(filtered)
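A common way to implement drop_multicollinearity is to scan the upper triangle of the absolute correlation matrix and drop one column from each highly correlated pair. This sketch assumes a 0.9 threshold; the pipeline's actual cutoff may differ:

```python
import numpy as np
import pandas as pd

def drop_multicollinearity(df: pd.DataFrame, threshold: float = 0.9) -> pd.DataFrame:
    # Keep only the upper triangle so each pair is inspected once
    corr = df.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape, dtype=bool), k=1))
    # Drop any column correlated above the threshold with an earlier column
    to_drop = [c for c in upper.columns if (upper[c] > threshold).any()]
    return df.drop(columns=to_drop)

df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2, 4, 6, 8], "c": [4, 1, 3, 2]})
filtered = drop_multicollinearity(df)  # b is a perfect multiple of a, so b is dropped
```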
4. Validation

Run schema validation, generate data quality reports, and perform drift detection against a reference dataset.
schema_ok, schema_issues = self.validator.schema_validation(
    df, required_columns=['version', 'salary', 'b_day', 'draft_year']
)
drift_score = self.validator.drift_detection(cleaned, reference_data)
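At its simplest, schema validation checks that every required column is present and reports what is missing. A hedged sketch of the shape of schema_validation's return value:

```python
import pandas as pd

def schema_validation(df: pd.DataFrame, required_columns: list[str]) -> tuple[bool, list[str]]:
    # Collect one issue per required column absent from the frame
    issues = [f"missing column: {c}" for c in required_columns if c not in df.columns]
    return len(issues) == 0, issues

df = pd.DataFrame({"version": ["2.2"], "salary": [1_000_000]})
ok, issues = schema_validation(df, ["version", "salary", "b_day", "draft_year"])
```

A production validator would also check dtypes and value ranges; this sketch covers presence only.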
5. Evaluation and Reporting

Train and evaluate models, run constraint experiments, and write benchmark artifacts.
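As an illustration of the evaluation step, the sketch below fits a least-squares baseline on synthetic data and reports mean absolute error. The model and metric are illustrative assumptions, not the pipeline's actual choices:

```python
import numpy as np

# Synthetic regression problem with a known ground-truth coefficient vector
rng = np.random.default_rng(42)
X = rng.normal(size=(100, 3))
coef_true = np.array([1.0, -2.0, 0.5])
y = X @ coef_true + rng.normal(scale=0.1, size=100)

# Fit an ordinary least-squares baseline (with an intercept column)
design = np.c_[X, np.ones(100)]
coef, *_ = np.linalg.lstsq(design, y, rcond=None)
pred = design @ coef
mae = float(np.mean(np.abs(pred - y)))
```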

System Components

Core Modules

The pipeline is built from modular components defined in NBA Data Preprocessing/task/pipeline/:
  • DataIngestor (ingestion.py): Loads CSV files and computes dataset fingerprints
  • Preprocessor (preprocessing.py): Cleans data and detects outliers
  • FeatureEngineer (feature_engineering.py): Builds features for both batch and streaming modes
  • DataValidator (validation.py): Schema validation, drift detection, and quality reporting
  • HardwareMonitor (hardware.py): CPU, memory, and energy telemetry

Streaming Engine

The RealTimePipelineRunner in streaming/engine.py orchestrates both execution modes:
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        self.config = config
        set_global_seed(config.random_seed)
        self.ingestor = DataIngestor(config.random_seed)
        self.preprocessor = Preprocessor(config.random_seed)
        self.engineer = FeatureEngineer()
        self.validator = DataValidator()
        self.hardware = HardwareMonitor()
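The call to set_global_seed is what makes repeated runs deterministic. A minimal sketch of what such a helper typically does (the real implementation may seed additional libraries):

```python
import random

import numpy as np

def set_global_seed(seed: int) -> None:
    # Seed every RNG the pipeline touches so repeated runs are reproducible
    random.seed(seed)
    np.random.seed(seed)

set_global_seed(42)
a = np.random.rand(3)
set_global_seed(42)
b = np.random.rand(3)  # identical to a
```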

Resource-Aware Runtime

The pipeline includes adaptive mechanisms for resource-constrained execution:

Adaptive Chunk Sizing

Chunk and batch sizes are dynamically adjusted based on memory and compute constraints:
def _hardware_adjusted_sizes(
    self,
    rows: int,
    chunk_size: int | None = None,
    max_memory_mb: int | None = None,
    max_compute_units: float | None = None,
) -> tuple[int, int]:
    # Fall back to configured defaults when no per-call override is given
    chunk_base = chunk_size if chunk_size is not None else self.config.chunk_size
    batch_base = self.config.batch_size
    memory_cap = max_memory_mb if max_memory_mb is not None else self.config.max_memory_mb
    compute_cap = max_compute_units if max_compute_units is not None else self.config.max_compute_units
    # Scale both sizes down in proportion to memory and compute headroom
    memory_factor = max(0.1, min(1.0, memory_cap / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    adjusted_batch = max(16, int(batch_base * scale))
    adjusted_chunk = max(16, int(chunk_base * scale))
    return min(adjusted_batch, rows), min(adjusted_chunk, rows)
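To make the scaling concrete, the same rule can be evaluated as a free function. With a 256 MB memory cap the memory factor is 256/1024 = 0.25, and half a compute unit gives a compute factor of 0.5, so the combined scale is 0.125 (the base sizes of 128 and 256 mirror the defaults above):

```python
def adjusted_sizes(rows: int, chunk_base: int = 128, batch_base: int = 256,
                   memory_cap_mb: int = 1024, compute_cap: float = 1.0) -> tuple[int, int]:
    # Same scaling rule as the method above, written standalone for illustration
    memory_factor = max(0.1, min(1.0, memory_cap_mb / 1024))
    compute_factor = max(0.1, min(1.0, compute_cap))
    scale = memory_factor * compute_factor
    batch = min(max(16, int(batch_base * scale)), rows)
    chunk = min(max(16, int(chunk_base * scale)), rows)
    return batch, chunk

# 256 MB cap and 0.5 compute units -> scale 0.125 -> batch 32, chunk 16
batch, chunk = adjusted_sizes(rows=10_000, memory_cap_mb=256, compute_cap=0.5)
```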

Memory Monitoring

Each chunk is monitored for memory usage, with automatic retry and splitting logic:
mem_before = self.hardware.process_memory_mb()
# Process chunk
mem_after = self.hardware.process_memory_mb()
memory_exceeded = mem_after > max_memory  # max_memory: the effective per-run cap in MB

if memory_exceeded and self.config.adaptive_chunk_resize:
    # Requeue the second half and retry with a smaller first half
    split = max(16, len(chunk) // 2)
    pending_chunks.insert(0, chunk.iloc[split:].copy())
    chunk = chunk.iloc[:split].copy()
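The split-and-requeue behavior can be isolated as a small self-contained loop. Here the memory check is stubbed by a `fits` predicate (an assumption for illustration; the runner uses live telemetry instead):

```python
from collections import deque

import pandas as pd

def split_until_fits(df: pd.DataFrame, fits, min_rows: int = 16) -> list[pd.DataFrame]:
    # Halve any chunk that fails the check, requeue the remainder at the front,
    # and stop splitting once a chunk passes or hits the minimum size
    pending = deque([df])
    done = []
    while pending:
        chunk = pending.popleft()
        while not fits(chunk) and len(chunk) > min_rows:
            split = max(min_rows, len(chunk) // 2)
            pending.appendleft(chunk.iloc[split:].copy())
            chunk = chunk.iloc[:split].copy()
        done.append(chunk)
    return done

df = pd.DataFrame({"x": range(100)})
chunks = split_until_fits(df, fits=lambda c: len(c) <= 32)
```

Requeuing the remainder at the front of the deque preserves the original row order across the emitted chunks.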

Spill-to-Disk

Optional disk spilling enables processing beyond available RAM:
if self.config.spill_to_disk:
    intermediate = self.config.output_dir / 'intermediate'
    intermediate.mkdir(parents=True, exist_ok=True)  # ensure the spill directory exists
    x_path = intermediate / f'stream_chunk_{chunk_id}_X.csv'
    y_path = intermediate / f'stream_chunk_{chunk_id}_y.csv'
    X_chunk.to_csv(x_path, index=False)
    y_chunk.to_frame('salary').to_csv(y_path, index=False)
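A spilled chunk can later be restored with a plain read_csv. This round-trip sketch uses a temporary directory in place of the configured output_dir:

```python
import tempfile
from pathlib import Path

import pandas as pd

# Spill a chunk to the intermediate directory, then restore it from disk
with tempfile.TemporaryDirectory() as tmp:
    intermediate = Path(tmp) / "intermediate"
    intermediate.mkdir(parents=True, exist_ok=True)

    X_chunk = pd.DataFrame({"feature": [0.1, 0.2, 0.3]})
    x_path = intermediate / "stream_chunk_0_X.csv"
    X_chunk.to_csv(x_path, index=False)

    restored = pd.read_csv(x_path)
```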

Architecture Diagram

The following diagram illustrates the complete pipeline flow:

Design Trade-offs

The architecture makes intentional trade-offs to prioritize specific goals:
  • Seeded random number generation and fixed artifact naming improve reproducibility but can reduce peak processing speed.
  • Dynamic chunk adjustment reduces memory failures but adds per-chunk control overhead.
  • Constrained devices can complete runs by spilling intermediate results to disk, but I/O amplification increases wall-clock time.
  • Using n_jobs > 1 accelerates constraint sweeps while introducing timing variance.

Configuration

All pipeline behavior is controlled through PipelineConfig:
@dataclass(frozen=True)
class PipelineConfig:
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256
    n_jobs: int = 1
    max_memory_mb: int = 1024
    max_compute_units: float = 1.0
    benchmark_runs: int = 5
    adaptive_chunk_resize: bool = True
    max_chunk_retries: int = 3
    spill_to_disk: bool = False
    output_dir: Path = field(default_factory=lambda: Path('artifacts'))
Configuration templates are available in configs/pipeline.edge.template.json and configs/pipeline.server.template.json.
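A template can be loaded by reading the JSON and keeping only keys that match known config fields, so unset keys fall back to the dataclass defaults. The loader and its field-filtering behavior are an assumption for illustration, and the dataclass here is an abridged copy of the config above:

```python
import json
import tempfile
from dataclasses import dataclass, field, fields
from pathlib import Path

@dataclass(frozen=True)
class PipelineConfig:  # abridged copy of the full config above
    random_seed: int = 42
    chunk_size: int = 128
    max_memory_mb: int = 1024
    output_dir: Path = field(default_factory=lambda: Path("artifacts"))

def load_config(path: Path) -> PipelineConfig:
    # Keep only keys that match known fields; unset keys use dataclass defaults
    raw = json.loads(path.read_text())
    known = {f.name for f in fields(PipelineConfig)}
    kwargs = {k: v for k, v in raw.items() if k in known}
    if "output_dir" in kwargs:
        kwargs["output_dir"] = Path(kwargs["output_dir"])
    return PipelineConfig(**kwargs)

with tempfile.TemporaryDirectory() as tmp:
    template = Path(tmp) / "pipeline.edge.json"
    template.write_text(json.dumps({"chunk_size": 32, "max_memory_mb": 256}))
    cfg = load_config(template)
```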

Next Steps

  • Execution Modes: learn about batch vs streaming execution
  • Resource Constraints: understand resource management
