Overview
The validation stage ensures data quality throughout the pipeline. The DataValidator class provides comprehensive quality reporting, schema validation, and drift detection to catch issues before they affect downstream models.
DataValidator Class
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:20
```python
class DataValidator:
    # No initialization parameters - stateless validation methods
```
Core Methods
quality_report()
Generates comprehensive data quality metrics.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:21
```python
def quality_report(
    self,
    df: pd.DataFrame,
    outlier_mask: pd.Series
) -> ValidationReport
```
Parameters:
df (pd.DataFrame): Cleaned DataFrame to assess
outlier_mask (pd.Series): Boolean Series indicating outlier rows (from Preprocessor.detect_outliers_iqr())
Returns: ValidationReport object containing:
missing_ratio (dict): Proportion of missing values per column
outlier_rate (float): Proportion of rows flagged as outliers
drift_score (float): Reserved for drift analysis
schema_ok (bool): Schema validation status
schema_issues (list[str]): List of schema problems
feature_drift (dict): Feature-wise drift metrics
temporal_drift (dict): Time-based drift metrics
Implementation:
```python
missing_ratio = (df.isna().sum() / max(len(df), 1)).to_dict()
outlier_rate = float(outlier_mask.mean()) if len(outlier_mask) else 0.0
return ValidationReport(
    missing_ratio=missing_ratio,
    outlier_rate=outlier_rate,
    drift_score=0.0,
    schema_ok=True,
    schema_issues=[],
    feature_drift={},
    temporal_drift={}
)
```
Example:
```python
import pandas as pd

from pipeline.validation import DataValidator
from pipeline.preprocessing import Preprocessor

validator = DataValidator()
preprocessor = Preprocessor()

df = pd.DataFrame({
    'salary': [1000000, None, 3000000],
    'age': [25, 27, None],
    'height': [1.98, 2.10, 2.01]
})

# Detect outliers first
outlier_mask = preprocessor.detect_outliers_iqr(df)

# Generate quality report
report = validator.quality_report(df, outlier_mask)

print(f"Missing ratios: {report.missing_ratio}")
# {'salary': 0.333, 'age': 0.333, 'height': 0.0}
print(f"Outlier rate: {report.outlier_rate:.2%}")
# 0.00% (if no outliers detected)
```
Missing ratio is calculated before imputation. Always generate quality reports on cleaned but not imputed data.
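This caveat can be demonstrated directly: once values are imputed, every missing ratio collapses to zero and the underlying gaps become invisible. A minimal sketch using plain pandas, independent of the pipeline classes:

```python
import pandas as pd

df = pd.DataFrame({
    'salary': [1_000_000, None, 3_000_000],
    'age': [25, 27, None]
})

# Same computation quality_report() uses, on the un-imputed frame
before = (df.isna().sum() / max(len(df), 1)).to_dict()
print(before)  # {'salary': 0.333..., 'age': 0.333...}

# After mean imputation every ratio reads 0.0, hiding the gaps
imputed = df.fillna(df.mean(numeric_only=True))
after = (imputed.isna().sum() / max(len(imputed), 1)).to_dict()
print(after)   # {'salary': 0.0, 'age': 0.0}
```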
schema_validation()
Validates that the DataFrame conforms to expected schema requirements.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:34
```python
def schema_validation(
    self,
    df: pd.DataFrame,
    required_columns: list[str]
) -> tuple[bool, list[str]]
```
Parameters:
df (pd.DataFrame): DataFrame to validate
required_columns (list[str]): List of column names that must be present
Returns: Tuple of (schema_ok, issues) where:
schema_ok (bool): True if no issues found
issues (list[str]): List of validation error messages
Validation Checks:
1. Missing Columns
```python
issues = []
missing_cols = [c for c in required_columns if c not in df.columns]
if missing_cols:
    issues.append(f'missing_columns: {missing_cols}')
```
Example:
```python
required_columns = ['salary', 'age', 'height']
# df has only ['salary', 'age'] -> 'height' missing, so:
# issues == ["missing_columns: ['height']"]
```
2. Salary Column Type Check
```python
if 'salary' in df.columns:
    numeric_salary = pd.to_numeric(
        df['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    )
    if numeric_salary.isna().all():
        issues.append('salary_column_not_numeric')
```
Example:
```python
df['salary'] = ['invalid', 'text', 'values']
# -> issues.append('salary_column_not_numeric')
```
Complete Example:
```python
validator = DataValidator()

df = pd.DataFrame({
    'salary': ['$1000000', '$2000000'],
    'age': [25, 27]
})

schema_ok, issues = validator.schema_validation(
    df,
    required_columns=['salary', 'age', 'height']
)

print(f"Schema valid: {schema_ok}")
# False
print(f"Issues: {issues}")
# ["missing_columns: ['height']"]
```
Always validate schema before preprocessing to catch data pipeline issues early.
drift_detection()
Detects distribution shifts between baseline and candidate datasets using the Kolmogorov-Smirnov test.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:45
```python
def drift_detection(
    self,
    baseline: pd.DataFrame,
    candidate: pd.DataFrame,
    numeric_col: str = 'salary'
) -> float
```
Parameters:
baseline (pd.DataFrame): Reference dataset (e.g., training data)
candidate (pd.DataFrame): New dataset to compare (e.g., production data)
numeric_col (str): Numeric column to compare distributions (default: 'salary')
Returns: KS statistic (float in [0, 1])
0.0 : Identical distributions
~0.05 : Minimal drift
~0.2 : Moderate drift
>0.5 : Severe drift
Algorithm (Kolmogorov-Smirnov Test):
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:46-58
```python
# Extract and clean numeric values
base_series = pd.to_numeric(
    baseline[numeric_col].astype(str).str.replace('$', '', regex=False),
    errors='coerce'
).dropna()
cand_series = pd.to_numeric(
    candidate[numeric_col].astype(str).str.replace('$', '', regex=False),
    errors='coerce'
).dropna()

base = base_series.to_numpy(dtype=float)
cand = cand_series.to_numpy(dtype=float)
if len(base) == 0 or len(cand) == 0:
    return 0.0

# Compute empirical CDFs
base_sorted = np.sort(base)
cand_sorted = np.sort(cand)
values = np.unique(np.concatenate([base_sorted, cand_sorted]))
base_cdf = np.searchsorted(base_sorted, values, side='right') / len(base_sorted)
cand_cdf = np.searchsorted(cand_sorted, values, side='right') / len(cand_sorted)

# Maximum absolute difference between CDFs
return float(np.max(np.abs(base_cdf - cand_cdf)))
```
Visual Explanation:
Cumulative Distribution Functions (CDFs):
```
1.0 ┤              ╭─────────
    │          ╭─┘   candidate
    │        ╭─┘
0.5 ┤   ╭────┘
    │        ╭────┘  baseline
    │    ╭──┘
0.0 ┤─┘
    └─────────────────────────────────
                 salary →
```
KS statistic = max vertical distance between curves
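The construction above can be sanity-checked in isolation. The sketch below re-implements the same statistic with plain NumPy on hand-checkable samples; where SciPy is available, `scipy.stats.ks_2samp` should report the same statistic:

```python
import numpy as np

def ks_statistic(base, cand):
    """Max vertical distance between the two empirical CDFs."""
    base_sorted = np.sort(np.asarray(base, dtype=float))
    cand_sorted = np.sort(np.asarray(cand, dtype=float))
    values = np.unique(np.concatenate([base_sorted, cand_sorted]))
    base_cdf = np.searchsorted(base_sorted, values, side='right') / len(base_sorted)
    cand_cdf = np.searchsorted(cand_sorted, values, side='right') / len(cand_sorted)
    return float(np.max(np.abs(base_cdf - cand_cdf)))

print(ks_statistic([1, 2, 3], [2, 3, 4]))     # 0.333... (CDFs differ by 1/3)
print(ks_statistic([1, 2, 3], [1, 2, 3]))     # 0.0 (identical samples)
print(ks_statistic([1, 2, 3], [10, 20, 30]))  # 1.0 (disjoint ranges)
```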
Example:
```python
validator = DataValidator()

baseline_df = pd.DataFrame({
    'salary': ['$1000000', '$2000000', '$3000000']
})
candidate_df = pd.DataFrame({
    'salary': ['$5000000', '$6000000', '$7000000']  # Higher salaries
})

drift_score = validator.drift_detection(baseline_df, candidate_df)
print(f"Drift score: {drift_score:.3f}")
# 1.000 (complete distribution shift)

if drift_score > 0.2:
    print("⚠️ Significant drift detected! Retrain model.")
```
Use KS drift detection to monitor model inputs over time. Set alerts for drift scores above 0.2.
feature_wise_drift()
Computes drift metrics for all shared columns between datasets.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:60
```python
def feature_wise_drift(
    self,
    baseline: pd.DataFrame,
    candidate: pd.DataFrame
) -> dict
```
Parameters:
baseline (pd.DataFrame): Reference dataset
candidate (pd.DataFrame): New dataset to compare
Returns: Dictionary mapping column names to drift metrics
For Numeric Columns:
```python
if pd.api.types.is_numeric_dtype(baseline[col]):
    result[col] = {
        'type': 'numeric',
        'ks_like': self.drift_detection(baseline, candidate, numeric_col=col),
        'mean_shift': float(candidate[col].mean() - baseline[col].mean())
    }
```
For Categorical Columns:
```python
else:
    b_dist = baseline[col].value_counts(normalize=True)
    c_dist = candidate[col].value_counts(normalize=True)
    cats = sorted(set(b_dist.index) | set(c_dist.index))
    l1 = sum(abs(b_dist.get(cat, 0.0) - c_dist.get(cat, 0.0)) for cat in cats)
    result[col] = {'type': 'categorical', 'l1_drift': l1}
```
Example:
```python
validator = DataValidator()

baseline = pd.DataFrame({
    'salary': [1000000, 2000000, 3000000],
    'age': [25, 27, 30],
    'team': ['Lakers', 'Bulls', 'Lakers']
})
candidate = pd.DataFrame({
    'salary': [5000000, 6000000, 7000000],
    'age': [26, 28, 31],
    'team': ['Bulls', 'Bulls', 'Celtics']
})

drift = validator.feature_wise_drift(baseline, candidate)

print("Numeric feature drift:")
print(f"  Salary KS: {drift['salary']['ks_like']:.3f}")
print(f"  Salary mean shift: ${drift['salary']['mean_shift']:,.0f}")
print(f"  Age KS: {drift['age']['ks_like']:.3f}")
print("\nCategorical feature drift:")
print(f"  Team L1 distance: {drift['team']['l1_drift']:.3f}")
```
Output:
```
Numeric feature drift:
  Salary KS: 1.000
  Salary mean shift: $4,000,000
  Age KS: 0.333

Categorical feature drift:
  Team L1 distance: 1.333
```
L1 distance for categorical features ranges from 0 (identical) to 2 (completely different distributions).
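Those bounds are easy to verify by extracting the same L1 computation into a standalone helper (the function name here is ours, not part of DataValidator) and feeding it the two extreme cases:

```python
import pandas as pd

def l1_drift(baseline_col, candidate_col):
    """L1 distance between two categorical frequency distributions."""
    b_dist = pd.Series(baseline_col).value_counts(normalize=True)
    c_dist = pd.Series(candidate_col).value_counts(normalize=True)
    cats = sorted(set(b_dist.index) | set(c_dist.index))
    return sum(abs(b_dist.get(cat, 0.0) - c_dist.get(cat, 0.0)) for cat in cats)

print(l1_drift(['Lakers', 'Bulls'], ['Lakers', 'Bulls']))      # 0.0 (identical)
print(l1_drift(['Lakers', 'Lakers'], ['Celtics', 'Celtics']))  # 2.0 (disjoint)
```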
temporal_drift()
Analyzes drift over time by comparing early and late time periods.
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:83
```python
def temporal_drift(
    self,
    df: pd.DataFrame,
    time_col: str = 'version'
) -> dict
```
Parameters:
df (pd.DataFrame): DataFrame with temporal data
time_col (str): Column containing version or timestamp information (default: 'version')
Returns: Dictionary with temporal drift metrics or {'status': 'insufficient_data'}
Algorithm:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:84-98
```python
if time_col not in df.columns or 'salary' not in df.columns or len(df) < 4:
    return {'status': 'insufficient_data'}

# Extract year from version (e.g., "NBA2K20" → 2020)
years = pd.to_numeric(
    df[time_col].astype(str).str.extract(r'(\d+)$')[0],
    errors='coerce'
).fillna(0).astype(int)

tmp = df.copy()
tmp['_year'] = np.where(years < 100, years + 2000, years)
tmp = tmp.sort_values('_year')

# Split into early and late periods
mid = len(tmp) // 2
early = tmp.iloc[:mid]
late = tmp.iloc[mid:]

return {
    'early_vs_late_salary_drift': self.drift_detection(early, late, numeric_col='salary'),
    'early_mean_salary': float(pd.to_numeric(
        early['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    ).mean()),
    'late_mean_salary': float(pd.to_numeric(
        late['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    ).mean())
}
```
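The year-extraction step is the subtle part of this algorithm, and it can be exercised on its own. A standalone sketch with hypothetical version strings: trailing digits are pulled out by the regex, and two-digit tokens are promoted to 20xx.

```python
import pandas as pd
import numpy as np

versions = pd.Series(['NBA2K18', 'NBA2K20', '2021'])

# Trailing digits become the raw year token ('18', '20', '2021')
years = pd.to_numeric(
    versions.str.extract(r'(\d+)$')[0],
    errors='coerce'
).fillna(0).astype(int)

# Tokens below 100 are treated as two-digit years in the 2000s
full_years = np.where(years < 100, years + 2000, years)
print(list(full_years))  # [2018, 2020, 2021]
```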
Example:
```python
validator = DataValidator()

df = pd.DataFrame({
    'version': ['NBA2K18', 'NBA2K19', 'NBA2K20', 'NBA2K21', 'NBA2K22', 'NBA2K23'],
    'salary': [1000000, 1500000, 2000000, 5000000, 6000000, 7000000]
})

temporal = validator.temporal_drift(df)
print(f"Early period (2018-2020) avg salary: ${temporal['early_mean_salary']:,.0f}")
print(f"Late period (2021-2023) avg salary: ${temporal['late_mean_salary']:,.0f}")
print(f"Temporal drift score: {temporal['early_vs_late_salary_drift']:.3f}")
```
Output:
```
Early period (2018-2020) avg salary: $1,500,000
Late period (2021-2023) avg salary: $6,000,000
Temporal drift score: 1.000
```
High temporal drift indicates that models trained on historical data may not perform well on recent data.
ValidationReport
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:9
```python
@dataclass
class ValidationReport:
    missing_ratio: dict
    outlier_rate: float
    drift_score: float
    schema_ok: bool
    schema_issues: list[str]
    feature_drift: dict
    temporal_drift: dict
```
Comprehensive validation results container.
missing_ratio: Column-wise proportion of missing values (0.0 = no missing, 1.0 = all missing)
outlier_rate: Proportion of rows flagged as outliers
drift_score: Overall drift score (reserved for future use)
schema_ok: Whether schema validation passed
schema_issues: List of schema violation messages
feature_drift: Feature-wise drift metrics from feature_wise_drift()
temporal_drift: Temporal drift metrics from temporal_drift()
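Because ValidationReport is a plain dataclass, it serializes cleanly for logging or metric stores via `dataclasses.asdict`. A minimal sketch (the dataclass is redefined locally here so the snippet runs standalone):

```python
from dataclasses import dataclass, asdict

@dataclass
class ValidationReport:
    missing_ratio: dict
    outlier_rate: float
    drift_score: float
    schema_ok: bool
    schema_issues: list
    feature_drift: dict
    temporal_drift: dict

report = ValidationReport(
    missing_ratio={'salary': 0.0}, outlier_rate=0.0, drift_score=0.0,
    schema_ok=True, schema_issues=[], feature_drift={}, temporal_drift={}
)

# asdict() yields a JSON-ready dict for structured logging
print(asdict(report)['schema_ok'])  # True
```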
Integration with Pipeline
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:39
```python
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        self.validator = DataValidator()
```
Full Pipeline Validation:
Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:425-431
```python
def run_all(self, source) -> dict:
    df = self.ingestor.load(source)

    # Quality checks
    cleaned = self.preprocessor.clean(df)
    outlier_mask = self.preprocessor.detect_outliers_iqr(
        cleaned.select_dtypes(include='number')
    )
    quality = self.validator.quality_report(cleaned, outlier_mask)

    # Drift detection
    drift_score = self.validator.drift_detection(
        cleaned,
        cleaned.sample(frac=1.0, random_state=self.config.random_seed)
    )

    # Schema validation
    schema_ok, schema_issues = self.validator.schema_validation(
        df,
        required_columns=['version', 'salary', 'b_day', 'draft_year']
    )

    # Feature-wise and temporal drift
    feature_drift = self.validator.feature_wise_drift(
        cleaned.iloc[:len(cleaned) // 2],
        cleaned.iloc[len(cleaned) // 2:]
    )
    temporal_drift = self.validator.temporal_drift(df)

    return {
        'quality': quality,
        'drift_score': drift_score,
        'schema_ok': schema_ok,
        'schema_issues': schema_issues,
        'feature_drift': feature_drift,
        'temporal_drift': temporal_drift
    }
```
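One detail worth noting in run_all(): the drift check compares the cleaned frame against a shuffled copy of itself. A permutation leaves the empirical distribution unchanged, so this particular call always yields 0.0 and acts as a sanity baseline rather than a real drift signal. A standalone demonstration (the KS statistic is re-implemented here so the snippet runs without the pipeline):

```python
import numpy as np
import pandas as pd

def ks_statistic(base, cand):
    # Same empirical-CDF comparison as DataValidator.drift_detection()
    base_sorted, cand_sorted = np.sort(base), np.sort(cand)
    values = np.unique(np.concatenate([base_sorted, cand_sorted]))
    base_cdf = np.searchsorted(base_sorted, values, side='right') / len(base_sorted)
    cand_cdf = np.searchsorted(cand_sorted, values, side='right') / len(cand_sorted)
    return float(np.max(np.abs(base_cdf - cand_cdf)))

salaries = pd.Series([1e6, 2e6, 3e6, 4e6, 5e6])
shuffled = salaries.sample(frac=1.0, random_state=42)  # permutation, same values

print(ks_statistic(salaries.to_numpy(), shuffled.to_numpy()))  # 0.0
```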
Drift Monitoring Strategy
Recommended Thresholds
| Metric | Low | Moderate | High | Action |
|---|---|---|---|---|
| KS statistic | <0.05 | 0.05-0.2 | >0.2 | Retrain model |
| Outlier rate | <5% | 5-10% | >10% | Investigate data quality |
| Missing ratio | <1% | 1-5% | >5% | Check data pipeline |
| L1 distance (categorical) | <0.1 | 0.1-0.5 | >0.5 | Update encodings |
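The KS row of the table could be encoded as a small lookup; `ks_severity` below is a hypothetical convenience helper, not part of the pipeline:

```python
def ks_severity(ks: float) -> str:
    """Map a KS statistic to the severity bands from the thresholds table."""
    if ks < 0.05:
        return 'low'
    if ks <= 0.2:
        return 'moderate'
    return 'high'  # action: retrain model

print(ks_severity(0.03))  # 'low'
print(ks_severity(0.15))  # 'moderate'
print(ks_severity(0.45))  # 'high'
```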
Monitoring Workflow
```python
def monitor_production_data(baseline_df, production_df):
    # alert_team: your notification hook (e.g., Slack or PagerDuty integration)
    validator = DataValidator()

    # 1. Schema check
    schema_ok, issues = validator.schema_validation(
        production_df,
        required_columns=['salary', 'age', 'height', 'team']
    )
    if not schema_ok:
        alert_team(f"Schema issues: {issues}")
        return

    # 2. Feature drift
    feature_drift = validator.feature_wise_drift(baseline_df, production_df)
    for col, metrics in feature_drift.items():
        if metrics.get('ks_like', 0) > 0.2:
            alert_team(f"High drift in {col}: {metrics['ks_like']:.3f}")

    # 3. Temporal drift
    temporal = validator.temporal_drift(production_df)
    if temporal.get('early_vs_late_salary_drift', 0) > 0.2:
        alert_team("Temporal drift detected - consider retraining")
```
Implement continuous drift monitoring in production to maintain model performance.
Best Practices
1. Validate Early and Often: Run schema validation immediately after ingestion, before preprocessing.
2. Track Drift Over Time: Log drift metrics to a time-series database for trend analysis.
3. Set Adaptive Thresholds: Adjust drift thresholds based on business impact and model sensitivity.
4. Automate Alerts: Configure automatic notifications when drift exceeds critical thresholds.
Next Steps
Streaming Engine: Execute validated pipelines in real time.
Preprocessing: Return to data cleaning and transformation.