Overview

The NBA Data Preprocessing Pipeline prioritizes reproducibility through seeded random number generation, dataset fingerprinting, and comprehensive run manifests. All random operations are deterministic when a random_seed is provided.

Global Seed Management

The pipeline uses a global seed to ensure deterministic behavior across all random operations.

Setting the Global Seed

The set_global_seed function from pipeline/reproducibility.py initializes all random number generators:
import os
import random
import numpy as np

def set_global_seed(seed: int) -> None:
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
This function is automatically called during RealTimePipelineRunner initialization.
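
Reseeding with the same value makes subsequent draws repeat exactly. A minimal standalone sketch of the function shown above, demonstrating that two seeded runs produce identical values:

```python
import os
import random

import numpy as np


def set_global_seed(seed: int) -> None:
    """Seed Python's hash salt, the stdlib RNG, and NumPy's legacy global RNG."""
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)


# Two runs with the same seed yield identical random draws.
set_global_seed(42)
first = (random.random(), float(np.random.rand()))

set_global_seed(42)
second = (random.random(), float(np.random.rand()))

assert first == second
```

Note that `PYTHONHASHSEED` only affects hash randomization for interpreter processes started after it is set; the `random.seed` and `np.random.seed` calls take effect immediately.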

Seeded Components

All major components receive the configured random_seed during initialization:
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        set_global_seed(config.random_seed)
        self.ingestor = DataIngestor(config.random_seed)
        self.preprocessor = Preprocessor(config.random_seed)
        self.engineer = FeatureEngineer()
        self.validator = DataValidator()

Dataset Fingerprinting

Every pipeline run computes a SHA-256 fingerprint of the input dataset to track data versions.

Fingerprint Generation

The DataIngestor computes fingerprints during ingestion:
df = self.ingestor.load(source)
fp = self.ingestor.fingerprint(df)
The fingerprint includes:
  • Dataset hash (SHA-256)
  • Row count
  • Column names
  • Data shape
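
The exact fingerprint fields and serialization are defined inside the pipeline; as a hedged sketch, a content-based SHA-256 fingerprint over a DataFrame could be computed like this (the `DatasetFingerprint` field names and the CSV serialization are illustrative assumptions, not the pipeline's actual implementation):

```python
import hashlib
from dataclasses import dataclass

import pandas as pd


@dataclass(frozen=True)
class DatasetFingerprint:
    """Illustrative fingerprint record; field names are assumptions."""
    sha256: str
    rows: int
    columns: tuple[str, ...]
    shape: tuple[int, int]


def fingerprint(df: pd.DataFrame) -> DatasetFingerprint:
    # Hash a canonical CSV serialization so the digest depends only on content.
    digest = hashlib.sha256(df.to_csv(index=False).encode('utf-8')).hexdigest()
    return DatasetFingerprint(
        sha256=digest,
        rows=len(df),
        columns=tuple(df.columns),
        shape=df.shape,
    )


df = pd.DataFrame({'player': ['LeBron James', 'Stephen Curry'], 'rating': [97, 96]})
fp = fingerprint(df)
assert fp == fingerprint(df.copy())  # identical content -> identical fingerprint
```

Because the digest is computed from the serialized content rather than the file path, copying or renaming the input file does not change the fingerprint.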

Fingerprint in Reports

The fingerprint is included in all pipeline reports:
report = {
    'dataset_fingerprint': asdict(fp),
    'reproducibility': self._reproducibility_manifest(),
    # ... other results
}
Dataset fingerprinting is file-based, not registry-backed. For production use, consider implementing a centralized dataset registry.

Reproducibility Manifest

Each pipeline run generates a comprehensive reproducibility manifest.

Manifest Contents

The manifest captures all information needed to reproduce a run:
def _reproducibility_manifest(self) -> dict[str, Any]:
    return {
        'random_seed': self.config.random_seed,
        'python_version': sys.version.split()[0],
        'platform': platform.platform(),
        'config': {
            'chunk_size': self.config.chunk_size,
            'batch_size': self.config.batch_size,
            'max_memory_mb': self.config.max_memory_mb,
            'max_compute_units': self.config.max_compute_units,
            'benchmark_runs': self.config.benchmark_runs,
            'n_jobs': self.config.n_jobs,
            'adaptive_chunk_resize': self.config.adaptive_chunk_resize,
            'max_chunk_retries': self.config.max_chunk_retries,
            'spill_to_disk': self.config.spill_to_disk,
        },
        'dependencies': {
            'numpy': np.__version__,
            'pandas': pd.__version__,
            'matplotlib': matplotlib.__version__,
        },
    }

Manifest Storage

The manifest is written to metadata/run_manifest.json:
def _write_artifacts(self, report: dict) -> None:
    with (out / 'metadata' / 'run_manifest.json').open('w', encoding='utf-8') as f:
        json.dump(report['reproducibility'], f, indent=2)
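
Writing the manifest with `json.dump` and reading it back yields an identical mapping, so manifests from two runs can be compared directly. A self-contained sketch of that round trip (paths and field values here are illustrative):

```python
import json
import platform
import sys
import tempfile
from pathlib import Path

# Illustrative manifest; the real one carries the full config and dependencies.
manifest = {
    'random_seed': 42,
    'python_version': sys.version.split()[0],
    'platform': platform.platform(),
}

out = Path(tempfile.mkdtemp())
(out / 'metadata').mkdir()
path = out / 'metadata' / 'run_manifest.json'
with path.open('w', encoding='utf-8') as f:
    json.dump(manifest, f, indent=2)

# Reading it back recovers the same mapping, so runs can be diffed field by field.
assert json.loads(path.read_text(encoding='utf-8')) == manifest
```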

Configuration Management

The PipelineConfig dataclass is frozen to prevent accidental modification:
@dataclass(frozen=True)
class PipelineConfig:
    """Centralized configuration for deterministic pipeline runs."""
    
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256
    # ... other parameters
Using frozen=True ensures the configuration is immutable after initialization, preventing drift during execution.
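
With `frozen=True`, any attempt to assign to a field raises `dataclasses.FrozenInstanceError`; the idiomatic way to vary a setting is to derive a new instance with `dataclasses.replace`. A trimmed-down sketch:

```python
import dataclasses
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    """Trimmed-down sketch of the frozen config (two fields only)."""
    random_seed: int = 42
    chunk_size: int = 128


config = PipelineConfig()
try:
    config.random_seed = 7  # any assignment on a frozen instance is rejected
except dataclasses.FrozenInstanceError:
    pass

# To change a value, derive a new instance instead of mutating the old one.
tuned = dataclasses.replace(config, chunk_size=256)
assert config.chunk_size == 128 and tuned.chunk_size == 256
```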

Creating Configurations

from pipeline.config import PipelineConfig

# Default configuration
config = PipelineConfig()

# Custom configuration
config = PipelineConfig(
    random_seed=123,
    chunk_size=256,
    max_memory_mb=2048,
)

Bootstrap Confidence Intervals

The pipeline uses bootstrap resampling to compute confidence intervals for benchmark metrics.

Implementation

def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]:
    if len(arr) == 0:
        return {
            'sample_size': 0,
            'mean': 0.0,
            'std': 0.0,
            'median': 0.0,
            'p95': 0.0,
            'ci95_low': 0.0,
            'ci95_high': 0.0,
        }
    
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5)),
    }
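
Because the bootstrap resampler is seeded from the config, the same seed reproduces the same interval bit-for-bit. A standalone sketch mirroring the resampling loop above (the `bootstrap_ci95` helper name is an assumption, not the pipeline's API):

```python
import numpy as np


def bootstrap_ci95(arr: np.ndarray, seed: int, n_bootstrap: int = 400) -> tuple[float, float]:
    """Seeded bootstrap of the mean, returning the (2.5%, 97.5%) percentile interval."""
    rng = np.random.default_rng(seed)
    means = [
        float(rng.choice(arr, size=len(arr), replace=True).mean())
        for _ in range(n_bootstrap)
    ]
    return float(np.percentile(means, 2.5)), float(np.percentile(means, 97.5))


data = np.array([1.2, 1.4, 1.1, 1.6, 1.3])
# The same seed reproduces the interval exactly across runs.
assert bootstrap_ci95(data, seed=42) == bootstrap_ci95(data, seed=42)
```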

Usage in Benchmarks

def benchmark(self, source: str | Path | pd.DataFrame) -> dict:
    runs = []
    for _ in range(self.config.benchmark_runs):
        runs.append({
            'batch': self.run_batch(source),
            'streaming': self.run_streaming(source)
        })

    batch_latencies = np.array([r['batch']['latency_s'] for r in runs], dtype=float)
    stream_latencies = np.array([r['streaming']['latency_s'] for r in runs], dtype=float)
    
    return {
        'latency_batch': self._bootstrap_ci(batch_latencies),
        'latency_streaming': self._bootstrap_ci(stream_latencies),
        # ...
    }

Permutation Tests

The pipeline uses permutation testing to assess statistical significance of performance differences.

Implementation

def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    if len(a) == 0 or len(b) == 0:
        return 1.0
    
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    
    return float((count + 1) / (n_perm + 1))
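
The test shuffles the pooled samples and counts how often a random split produces a mean difference at least as large as the observed one; the `+1` terms keep the p-value strictly positive. A standalone sketch of the same seeded test on synthetic data (function and variable names here are illustrative):

```python
import numpy as np


def permutation_pvalue(a: np.ndarray, b: np.ndarray, seed: int, n_perm: int = 1000) -> float:
    """Seeded two-sample permutation test on the absolute difference of means."""
    rng = np.random.default_rng(seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        if abs(float(shuffled[:len(a)].mean() - shuffled[len(a):].mean())) >= observed:
            count += 1
    return (count + 1) / (n_perm + 1)


data_rng = np.random.default_rng(0)
# Samples from the same distribution vs. samples with a large mean shift.
same = permutation_pvalue(data_rng.normal(0, 1, 30), data_rng.normal(0, 1, 30), seed=42)
shifted = permutation_pvalue(data_rng.normal(0, 1, 30), data_rng.normal(5, 1, 30), seed=42)
assert shifted < 0.05  # a 5-sigma mean shift is flagged as significant
```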

Significance Testing

Permutation tests are used to compare batch vs streaming modes:
return {
    'significance': {
        'latency_pvalue': self._permutation_pvalue(batch_latencies, stream_latencies),
        'throughput_pvalue': self._permutation_pvalue(batch_tp, stream_tp),
        'latency_mean_delta_s': float(stream_latencies.mean() - batch_latencies.mean()),
        'throughput_mean_delta_rows_s': float(stream_tp.mean() - batch_tp.mean()),
    },
}
Results are saved to benchmarks/significance_tests.csv.

Reproducible Benchmarks

The benchmark_runs parameter controls the number of repeated runs for statistical stability.

Running Benchmarks

config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10,  # Run each mode 10 times
)

runner = RealTimePipelineRunner(config)
benchmark_results = runner.benchmark('data/nba2k-full.csv')

Benchmark Artifacts

Each benchmark run produces:
  • benchmarks/latency_vs_data_size.csv
  • benchmarks/throughput_vs_memory.csv
  • benchmarks/resource_vs_accuracy.csv
  • benchmarks/significance_tests.csv
  • benchmarks/latency_vs_accuracy.png
  • benchmarks/memory_vs_accuracy.png
  • benchmarks/latency_memory_accuracy.png
The latency_vs_data_size.csv artifact measures preprocessing latency across different dataset sizes:
sizes = [min(size_base, s) for s in (64, 128, 256, size_base)]
latency_vs_size = []
for size in sizes:
    sample = source.iloc[:size] if isinstance(source, pd.DataFrame) \
             else self.ingestor.load(source).iloc[:size]
    b = self.run_batch(sample)
    latency_vs_size.append({'rows': size, 'latency_s': b['latency_s']})

Validating Reproducibility

To verify reproducibility, run the same configuration multiple times and compare artifacts.

Validation Command

cd "NBA Data Preprocessing/task"
python -m unittest discover -s test -p 'test_*.py'

Manual Validation

1. Run Pipeline Twice

config = PipelineConfig(random_seed=42, output_dir=Path('run1'))
runner1 = RealTimePipelineRunner(config)
report1 = runner1.run_all('data/nba2k-full.csv')

config = PipelineConfig(random_seed=42, output_dir=Path('run2'))
runner2 = RealTimePipelineRunner(config)
report2 = runner2.run_all('data/nba2k-full.csv')
2. Compare Dataset Fingerprints

assert report1['dataset_fingerprint'] == report2['dataset_fingerprint']
3. Compare Model Metrics

assert abs(report1['batch']['model']['r2'] - 
           report2['batch']['model']['r2']) < 1e-6
4. Compare Chunk Metrics

chunks1 = report1['streaming']['chunk_metrics']
chunks2 = report2['streaming']['chunk_metrics']

assert len(chunks1) == len(chunks2)
for c1, c2 in zip(chunks1, chunks2):
    assert c1['rows'] == c2['rows']

Best Practices

Always Set random_seed

Explicitly set random_seed in all configurations, even for development.

Use Distinct output_dir

Store each run’s artifacts in a separate directory to prevent overwriting.

Keep benchmark_runs Fixed

Use the same benchmark_runs value when comparing different configurations.

Document Dependencies

The manifest captures dependency versions, but maintain a requirements.txt for installation.

Example Reproducible Run

from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

# Fixed seed and configuration
config = PipelineConfig(
    random_seed=42,
    chunk_size=128,
    benchmark_runs=5,
    output_dir=Path('artifacts/run_20260304_143000'),
)

# Run pipeline
runner = RealTimePipelineRunner(config)
report = runner.run_all('data/nba2k-full.csv')

# Artifacts written to:
# - artifacts/run_20260304_143000/metadata/run_manifest.json
# - artifacts/run_20260304_143000/reports/pipeline_report.json
# - artifacts/run_20260304_143000/benchmarks/*.csv
Include a timestamp or run ID in output_dir to automatically organize multiple experimental runs.

Limitations

Current fingerprinting is based on file hashes, not a centralized registry. Consider implementing a dataset registry for production use.
Profiling is stage-level and chunk-level. Fine-grained kernel-level tracing is not included.
RAPL energy counters may be unavailable in containers or on non-Intel platforms. Fallback estimates are used but less accurate.
Parallel execution (n_jobs > 1) introduces timing variance. For strict reproducibility, use n_jobs=1.

Next Steps

Architecture

Review the overall system architecture

Execution Modes

Learn about batch vs streaming modes