Overview
The NBA Data Preprocessing Pipeline prioritizes reproducibility through seeded random number generation, dataset fingerprinting, and comprehensive run manifests. All random operations are deterministic when a random_seed is provided.
Global Seed Management
The pipeline uses a global seed to ensure deterministic behavior across all random operations.
Setting the Global Seed
The set_global_seed function from pipeline/reproducibility.py initializes all random number generators:
import os
import random
import numpy as np
def set_global_seed(seed: int) -> None:
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
This function is automatically called during RealTimePipelineRunner initialization.
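As a standalone illustration (not part of the pipeline code), seeding the standard-library generator is what makes repeated draws identical from run to run; the same holds for numpy once np.random.seed is called:

```python
import random

def seeded_draws(seed: int, n: int = 5) -> list[float]:
    # Re-seeding before each sequence makes the draws repeatable.
    random.seed(seed)
    return [random.random() for _ in range(n)]

# Two runs with the same seed produce identical sequences.
assert seeded_draws(42) == seeded_draws(42)
# A different seed produces a different sequence.
assert seeded_draws(42) != seeded_draws(123)
```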
Seeded Components
The following components use the configured random_seed:
Pipeline Components
Models
Validation
Statistical Tests
All major components receive the seed during initialization:

class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        set_global_seed(config.random_seed)
        self.ingestor = DataIngestor(config.random_seed)
        self.preprocessor = Preprocessor(config.random_seed)
        self.engineer = FeatureEngineer()
        self.validator = DataValidator()
Both batch and streaming models are seeded.

Batch mode (LinearRegression):

model = LinearRegression()  # Deterministic

Streaming mode (SGDRegressor):

online_model = SGDRegressor(
    random_state=self.config.random_seed,
    max_iter=1,
    tol=None,
    learning_rate='invscaling'
)
Data validation operations such as sampling and drift detection are seeded as well:

drift_score = self.validator.drift_detection(
    cleaned,
    cleaned.sample(frac=1.0, random_state=self.config.random_seed)
)
Bootstrap and permutation tests use a seeded RNG:

def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400):
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
Dataset Fingerprinting
Every pipeline run computes a SHA-256 fingerprint of the input dataset to track data versions.
Fingerprint Generation
The DataIngestor computes fingerprints during ingestion:
df = self.ingestor.load(source)
fp = self.ingestor.fingerprint(df)
The fingerprint includes:
Dataset hash (SHA-256)
Row count
Column names
Data shape
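A file-based fingerprint along these lines can be built with hashlib alone. The sketch below is illustrative only; the actual DataIngestor.fingerprint implementation may differ, and the Fingerprint dataclass and fingerprint_csv helper shown here are hypothetical names:

```python
import hashlib
from dataclasses import dataclass

@dataclass(frozen=True)
class Fingerprint:
    sha256: str
    n_rows: int
    columns: tuple[str, ...]

def fingerprint_csv(text: str) -> Fingerprint:
    # Hash the raw bytes so any change to the data changes the digest.
    digest = hashlib.sha256(text.encode('utf-8')).hexdigest()
    lines = text.strip().splitlines()
    header = tuple(lines[0].split(','))
    return Fingerprint(sha256=digest, n_rows=len(lines) - 1, columns=header)

fp = fingerprint_csv('player,rating\nLeBron,97\nCurry,96\n')
assert fp.n_rows == 2 and fp.columns == ('player', 'rating')
```

Because the digest covers the raw bytes, two runs over the same file always report the same fingerprint, which is what makes the fingerprint comparison in the validation section meaningful.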
Fingerprint in Reports
The fingerprint is included in all pipeline reports:
report = {
    'dataset_fingerprint': asdict(fp),
    'reproducibility': self._reproducibility_manifest(),
    # ... other results
}
Dataset fingerprinting is file-based, not registry-backed. For production use, consider implementing a centralized dataset registry.
Reproducibility Manifest
Each pipeline run generates a comprehensive reproducibility manifest.
Manifest Contents
The manifest captures all information needed to reproduce a run:
def _reproducibility_manifest(self) -> dict[str, Any]:
    return {
        'random_seed': self.config.random_seed,
        'python_version': sys.version.split()[0],
        'platform': platform.platform(),
        'config': {
            'chunk_size': self.config.chunk_size,
            'batch_size': self.config.batch_size,
            'max_memory_mb': self.config.max_memory_mb,
            'max_compute_units': self.config.max_compute_units,
            'benchmark_runs': self.config.benchmark_runs,
            'n_jobs': self.config.n_jobs,
            'adaptive_chunk_resize': self.config.adaptive_chunk_resize,
            'max_chunk_retries': self.config.max_chunk_retries,
            'spill_to_disk': self.config.spill_to_disk,
        },
        'dependencies': {
            'numpy': np.__version__,
            'pandas': pd.__version__,
            'matplotlib': matplotlib.__version__,
        },
    }
Manifest Storage
The manifest is written to metadata/run_manifest.json:
def _write_artifacts(self, report: dict) -> None:
    with (out / 'metadata' / 'run_manifest.json').open('w', encoding='utf-8') as f:
        json.dump(report['reproducibility'], f, indent=2)
Configuration Management
The PipelineConfig dataclass is frozen to prevent accidental modification:
@dataclass(frozen=True)
class PipelineConfig:
    """Centralized configuration for deterministic pipeline runs."""
    random_seed: int = 42
    chunk_size: int = 128
    batch_size: int = 256
    # ... other parameters
Using frozen=True ensures the configuration is immutable after initialization, preventing drift during execution.
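The immutability guarantee can be seen directly: any attribute assignment on a frozen dataclass raises dataclasses.FrozenInstanceError, and variations are created with dataclasses.replace instead. A minimal sketch, using a trimmed-down stand-in for PipelineConfig:

```python
import dataclasses
from dataclasses import dataclass

@dataclass(frozen=True)
class PipelineConfig:
    random_seed: int = 42
    chunk_size: int = 128

config = PipelineConfig()
try:
    config.random_seed = 7  # any assignment is rejected at runtime
except dataclasses.FrozenInstanceError:
    print('config is immutable')

# To vary a parameter, create a new instance instead of mutating:
tuned = dataclasses.replace(config, chunk_size=256)
assert config.chunk_size == 128 and tuned.chunk_size == 256
```

Using dataclasses.replace keeps the original configuration intact, so the manifest written at the start of a run always matches the configuration the run actually used.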
Creating Configurations
from pipeline.config import PipelineConfig

# Default configuration
config = PipelineConfig()

# Custom configuration
config = PipelineConfig(
    random_seed=123,
    chunk_size=256,
    max_memory_mb=2048,
)
Bootstrap Confidence Intervals
The pipeline uses bootstrap resampling to compute confidence intervals for benchmark metrics.
Implementation
def _bootstrap_ci(self, arr: np.ndarray, n_bootstrap: int = 400) -> dict[str, float]:
    if len(arr) == 0:
        return {
            'sample_size': 0,
            'mean': 0.0,
            'std': 0.0,
            'median': 0.0,
            'p95': 0.0,
            'ci95_low': 0.0,
            'ci95_high': 0.0,
        }
    rng = np.random.default_rng(self.config.random_seed)
    means = []
    for _ in range(n_bootstrap):
        sample = rng.choice(arr, size=len(arr), replace=True)
        means.append(float(sample.mean()))
    return {
        'sample_size': int(len(arr)),
        'mean': float(arr.mean()),
        'std': float(arr.std(ddof=0)),
        'median': float(np.median(arr)),
        'p95': float(np.percentile(arr, 95)),
        'ci95_low': float(np.percentile(means, 2.5)),
        'ci95_high': float(np.percentile(means, 97.5)),
    }
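The seeded-resampling idea can be demonstrated with the standard library alone. This is a simplified stand-in for _bootstrap_ci (random.Random in place of numpy's Generator, percentile-by-index instead of np.percentile), not the pipeline's implementation:

```python
import random
import statistics

def bootstrap_ci(values: list[float], seed: int,
                 n_bootstrap: int = 400) -> tuple[float, float]:
    # A dedicated Random instance keeps the resampling deterministic
    # without touching the global generator state.
    rng = random.Random(seed)
    means = sorted(
        statistics.fmean(rng.choices(values, k=len(values)))
        for _ in range(n_bootstrap)
    )
    # 2.5th and 97.5th percentiles of the bootstrap means -> 95% CI.
    lo = means[int(0.025 * n_bootstrap)]
    hi = means[int(0.975 * n_bootstrap)]
    return lo, hi

data = [1.2, 0.9, 1.1, 1.4, 1.0, 1.3, 0.8, 1.2]
# Identical seed -> identical interval, run after run.
assert bootstrap_ci(data, seed=42) == bootstrap_ci(data, seed=42)
```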
Usage in Benchmarks
def benchmark(self, source: str | Path | pd.DataFrame) -> dict:
    runs = []
    for _ in range(self.config.benchmark_runs):
        runs.append({
            'batch': self.run_batch(source),
            'streaming': self.run_streaming(source)
        })
    batch_latencies = np.array([r['batch']['latency_s'] for r in runs], dtype=float)
    stream_latencies = np.array([r['streaming']['latency_s'] for r in runs], dtype=float)
    return {
        'latency_batch': self._bootstrap_ci(batch_latencies),
        'latency_streaming': self._bootstrap_ci(stream_latencies),
        # ...
    }
Permutation Tests
The pipeline uses permutation testing to assess statistical significance of performance differences.
Implementation
def _permutation_pvalue(self, a: np.ndarray, b: np.ndarray, n_perm: int = 1000) -> float:
    if len(a) == 0 or len(b) == 0:
        return 1.0
    rng = np.random.default_rng(self.config.random_seed)
    observed = abs(float(a.mean() - b.mean()))
    combined = np.concatenate([a, b])
    count = 0
    for _ in range(n_perm):
        shuffled = rng.permutation(combined)
        a_perm = shuffled[:len(a)]
        b_perm = shuffled[len(a):]
        if abs(float(a_perm.mean() - b_perm.mean())) >= observed:
            count += 1
    return float((count + 1) / (n_perm + 1))
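A stdlib-only sketch of the same seeded permutation scheme (illustrative, not the pipeline's numpy implementation) shows both the determinism and the +1 smoothing that keeps the p-value strictly positive:

```python
import random
import statistics

def permutation_pvalue(a: list[float], b: list[float], seed: int,
                       n_perm: int = 1000) -> float:
    rng = random.Random(seed)
    observed = abs(statistics.fmean(a) - statistics.fmean(b))
    combined = a + b
    count = 0
    for _ in range(n_perm):
        # Sampling the full pool without replacement is a seeded shuffle.
        shuffled = rng.sample(combined, len(combined))
        a_perm, b_perm = shuffled[:len(a)], shuffled[len(a):]
        if abs(statistics.fmean(a_perm) - statistics.fmean(b_perm)) >= observed:
            count += 1
    # +1 correction keeps the p-value strictly positive.
    return (count + 1) / (n_perm + 1)

fast = [0.10, 0.11, 0.09, 0.10, 0.12]
slow = [0.30, 0.29, 0.31, 0.32, 0.30]
p = permutation_pvalue(fast, slow, seed=42)
assert p == permutation_pvalue(fast, slow, seed=42)  # deterministic
assert p < 0.05  # clearly separated groups
```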
Significance Testing
Permutation tests are used to compare batch vs streaming modes:
return {
    'significance': {
        'latency_pvalue': self._permutation_pvalue(batch_latencies, stream_latencies),
        'throughput_pvalue': self._permutation_pvalue(batch_tp, stream_tp),
        'latency_mean_delta_s': float(stream_latencies.mean() - batch_latencies.mean()),
        'throughput_mean_delta_rows_s': float(stream_tp.mean() - batch_tp.mean()),
    },
}
Results are saved to benchmarks/significance_tests.csv.
Reproducible Benchmarks
The benchmark_runs parameter controls the number of repeated runs for statistical stability.
Running Benchmarks
config = PipelineConfig(
    random_seed=42,
    benchmark_runs=10,  # Run each mode 10 times
)
runner = RealTimePipelineRunner(config)
benchmark_results = runner.benchmark('data/nba2k-full.csv')
Benchmark Artifacts
Each benchmark run produces:
benchmarks/latency_vs_data_size.csv
benchmarks/throughput_vs_memory.csv
benchmarks/resource_vs_accuracy.csv
benchmarks/significance_tests.csv
benchmarks/latency_vs_accuracy.png
benchmarks/memory_vs_accuracy.png
benchmarks/latency_memory_accuracy.png
Latency vs Data Size
Measures preprocessing latency across different dataset sizes:

sizes = [min(size_base, s) for s in (64, 128, 256, size_base)]
latency_vs_size = []
for size in sizes:
    sample = source.iloc[:size] if isinstance(source, pd.DataFrame) \
        else self.ingestor.load(source).iloc[:size]
    b = self.run_batch(sample)
    latency_vs_size.append({'rows': size, 'latency_s': b['latency_s']})

Throughput vs Memory
Correlates memory usage with throughput:

throughput_vs_memory.append({
    'peak_memory_mb': b['peak_memory_mb'],
    'throughput_rows_s': b['throughput_rows_s']
})

Resource vs Accuracy
Tracks the relationship between resource usage and model quality:

'resource_vs_accuracy': [
    {
        'mode': r['batch']['mode'],
        'peak_memory_mb': r['batch']['peak_memory_mb'],
        'r2': r['batch']['model']['r2'],
    }
    for r in runs
]
Validating Reproducibility
To verify reproducibility, run the same configuration multiple times and compare artifacts.
Validation Command
cd "NBA Data Preprocessing/task"
python -m unittest discover -s test -p 'test_*.py'
Manual Validation
Run Pipeline Twice
config = PipelineConfig(random_seed=42, output_dir=Path('run1'))
runner1 = RealTimePipelineRunner(config)
report1 = runner1.run_all('data/nba2k-full.csv')

config = PipelineConfig(random_seed=42, output_dir=Path('run2'))
runner2 = RealTimePipelineRunner(config)
report2 = runner2.run_all('data/nba2k-full.csv')
Compare Dataset Fingerprints
assert report1['dataset_fingerprint'] == report2['dataset_fingerprint']
Compare Model Metrics
assert abs(report1['batch']['model']['r2'] -
           report2['batch']['model']['r2']) < 1e-6
Compare Chunk Metrics
chunks1 = report1['streaming']['chunk_metrics']
chunks2 = report2['streaming']['chunk_metrics']
assert len(chunks1) == len(chunks2)
for c1, c2 in zip(chunks1, chunks2):
    assert c1['rows'] == c2['rows']
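The same comparison extends to the stored manifests. A sketch (assuming both runs wrote metadata/run_manifest.json as shown earlier; manifests_match is a hypothetical helper, demonstrated here against two synthetic run directories):

```python
import json
import tempfile
from pathlib import Path

def manifests_match(run1: Path, run2: Path) -> bool:
    # Two runs are reproducible only if their manifests agree field by field.
    m1 = json.loads((run1 / 'metadata' / 'run_manifest.json').read_text())
    m2 = json.loads((run2 / 'metadata' / 'run_manifest.json').read_text())
    return m1 == m2

# Demo with two synthetic run directories.
manifest = {'random_seed': 42, 'python_version': '3.11.0'}
with tempfile.TemporaryDirectory() as tmp:
    for name in ('run1', 'run2'):
        meta = Path(tmp) / name / 'metadata'
        meta.mkdir(parents=True)
        (meta / 'run_manifest.json').write_text(json.dumps(manifest))
    assert manifests_match(Path(tmp) / 'run1', Path(tmp) / 'run2')
```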
Best Practices
Always Set random_seed
Explicitly set random_seed in all configurations, even for development.
Use a Distinct output_dir
Store each run’s artifacts in a separate directory to prevent overwriting.
Keep benchmark_runs Fixed
Use the same benchmark_runs value when comparing different configurations.
Document Dependencies
The manifest captures dependency versions, but maintain a requirements.txt for installation.
Example Reproducible Run
from pathlib import Path
from pipeline.config import PipelineConfig
from pipeline.streaming.engine import RealTimePipelineRunner

# Fixed seed and configuration
config = PipelineConfig(
    random_seed=42,
    chunk_size=128,
    benchmark_runs=5,
    output_dir=Path('artifacts/run_20260304_143000'),
)

# Run pipeline
runner = RealTimePipelineRunner(config)
report = runner.run_all('data/nba2k-full.csv')

# Artifacts written to:
# - artifacts/run_20260304_143000/metadata/run_manifest.json
# - artifacts/run_20260304_143000/reports/pipeline_report.json
# - artifacts/run_20260304_143000/benchmarks/*.csv
Include a timestamp or run ID in output_dir to automatically organize multiple experimental runs.
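One way to generate such a directory name (a convenience sketch; timestamped_run_dir is not part of the pipeline API):

```python
from datetime import datetime
from pathlib import Path

def timestamped_run_dir(base: str = 'artifacts') -> Path:
    # Produces names like artifacts/run_20260304_143000
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    return Path(base) / f'run_{stamp}'

run_dir = timestamped_run_dir()
assert run_dir.name.startswith('run_')
```

The resulting path can be passed directly as output_dir when constructing PipelineConfig.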
Limitations
Dataset Versioning is File-Based
Current fingerprinting is based on file hashes, not a centralized registry. Consider implementing a dataset registry for production use.
Profiling Granularity Is Limited
Profiling is stage-level and chunk-level. Fine-grained kernel-level tracing is not included.
Energy Telemetry May Be Coarse
RAPL energy counters may be unavailable in containers or on non-Intel platforms. Fallback estimates are used but less accurate.
Timing Variance with n_jobs > 1
Parallel execution (n_jobs > 1) introduces timing variance. For strict reproducibility, use n_jobs=1.
Next Steps
Architecture: review the overall system architecture.
Execution Modes: learn about batch vs streaming modes.