
Overview

The validation stage ensures data quality throughout the pipeline. The DataValidator class provides comprehensive quality reporting, schema validation, and drift detection to catch issues before they affect downstream models.

DataValidator Class

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:20
class DataValidator:
    # No initialization parameters - stateless validation methods

Core Methods

quality_report()

Generates comprehensive data quality metrics. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:21
def quality_report(
    self, 
    df: pd.DataFrame, 
    outlier_mask: pd.Series
) -> ValidationReport
Parameters:
  • df (pd.DataFrame, required): Cleaned DataFrame to assess
  • outlier_mask (pd.Series, required): Boolean Series indicating outlier rows (from Preprocessor.detect_outliers_iqr())
Returns: ValidationReport object containing:
  • missing_ratio (dict): Proportion of missing values per column
  • outlier_rate (float): Proportion of rows flagged as outliers
  • drift_score (float): Reserved for drift analysis
  • schema_ok (bool): Schema validation status
  • schema_issues (list[str]): List of schema problems
  • feature_drift (dict): Feature-wise drift metrics
  • temporal_drift (dict): Time-based drift metrics
Implementation:
missing_ratio = (df.isna().sum() / max(len(df), 1)).to_dict()
outlier_rate = float(outlier_mask.mean()) if len(outlier_mask) else 0.0

return ValidationReport(
    missing_ratio=missing_ratio,
    outlier_rate=outlier_rate,
    drift_score=0.0,
    schema_ok=True,
    schema_issues=[],
    feature_drift={},
    temporal_drift={}
)
Example:
from pipeline.validation import DataValidator
from pipeline.preprocessing import Preprocessor
import pandas as pd

validator = DataValidator()
preprocessor = Preprocessor()

df = pd.DataFrame({
    'salary': [1000000, None, 3000000],
    'age': [25, 27, None],
    'height': [1.98, 2.10, 2.01]
})

# Detect outliers first
outlier_mask = preprocessor.detect_outliers_iqr(df)

# Generate quality report
report = validator.quality_report(df, outlier_mask)

print(f"Missing ratios: {report.missing_ratio}")
# {'salary': 0.333, 'age': 0.333, 'height': 0.0}

print(f"Outlier rate: {report.outlier_rate:.2%}")
# 0.00% (if no outliers detected)
Missing ratio is calculated before imputation. Always generate quality reports on cleaned but not imputed data.
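The effect is easy to demonstrate. A minimal sketch, using a toy column and the same ratio formula as quality_report():

```python
import pandas as pd

# Toy column for illustration; the ratio formula mirrors quality_report()
df = pd.DataFrame({'salary': [1_000_000, None, 3_000_000]})

# Before imputation the gap is visible ...
before = (df.isna().sum() / max(len(df), 1)).to_dict()

# ... after imputation the same formula reports zero missingness
imputed = df.fillna(df['salary'].mean())
after = (imputed.isna().sum() / max(len(imputed), 1)).to_dict()

print(before)  # ~0.33 missing for 'salary'
print(after)   # 0.0 missing for 'salary'
```

Running the report after imputation would silently hide the gap, which is exactly why the ordering above matters.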

schema_validation()

Validates that the DataFrame conforms to expected schema requirements. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:34
def schema_validation(
    self, 
    df: pd.DataFrame, 
    required_columns: list[str]
) -> tuple[bool, list[str]]
Parameters:
  • df (pd.DataFrame, required): DataFrame to validate
  • required_columns (list[str], required): List of column names that must be present
Returns: Tuple of (schema_ok, issues) where:
  • schema_ok (bool): True if no issues found
  • issues (list[str]): List of validation error messages
Validation Checks:

1. Missing Columns

issues = []
missing_cols = [c for c in required_columns if c not in df.columns]
if missing_cols:
    issues.append(f'missing_columns:{missing_cols}')
Example:
required_columns = ['salary', 'age', 'height']
df.columns = ['salary', 'age']  # 'height' missing

issues = ["missing_columns:['height']"]

2. Salary Column Type Check

if 'salary' in df.columns:
    numeric_salary = pd.to_numeric(
        df['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    )
    if numeric_salary.isna().all():
        issues.append('salary_column_not_numeric')
Example:
df['salary'] = ['invalid', 'text', 'values']
→ issues.append('salary_column_not_numeric')
Complete Example:
validator = DataValidator()

df = pd.DataFrame({
    'salary': ['$1000000', '$2000000'],
    'age': [25, 27]
})

schema_ok, issues = validator.schema_validation(
    df, 
    required_columns=['salary', 'age', 'height']
)

print(f"Schema valid: {schema_ok}")
# False

print(f"Issues: {issues}")
# ["missing_columns:['height']"]
Always validate schema before preprocessing to catch data pipeline issues early.

drift_detection()

Detects distribution shifts between baseline and candidate datasets using the Kolmogorov-Smirnov test. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:45
def drift_detection(
    self, 
    baseline: pd.DataFrame, 
    candidate: pd.DataFrame, 
    numeric_col: str = 'salary'
) -> float
Parameters:
  • baseline (pd.DataFrame, required): Reference dataset (e.g., training data)
  • candidate (pd.DataFrame, required): New dataset to compare (e.g., production data)
  • numeric_col (str, default 'salary'): Numeric column to compare distributions
Returns: KS statistic (float in [0, 1])
  • 0.0: Identical distributions
  • ~0.05: Minimal drift
  • ~0.2: Moderate drift
  • >0.5: Severe drift
Algorithm (Kolmogorov-Smirnov Test): Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:46-58
# Extract and clean numeric values
base_series = pd.to_numeric(
    baseline[numeric_col].astype(str).str.replace('$', '', regex=False),
    errors='coerce'
).dropna()
cand_series = pd.to_numeric(
    candidate[numeric_col].astype(str).str.replace('$', '', regex=False),
    errors='coerce'
).dropna()

base = base_series.to_numpy(dtype=float)
cand = cand_series.to_numpy(dtype=float)

if len(base) == 0 or len(cand) == 0:
    return 0.0

# Compute empirical CDFs
base_sorted = np.sort(base)
cand_sorted = np.sort(cand)
values = np.unique(np.concatenate([base_sorted, cand_sorted]))

base_cdf = np.searchsorted(base_sorted, values, side='right') / len(base_sorted)
cand_cdf = np.searchsorted(cand_sorted, values, side='right') / len(cand_sorted)

# Maximum absolute difference between CDFs
return float(np.max(np.abs(base_cdf - cand_cdf)))
Visual Explanation:
Cumulative Distribution Functions (CDFs):

1.0 ┤                  ╭─────────
    │                ╭─┘      candidate
    │              ╭─┘
0.5 ┤         ╭────┘
    │    ╭────┘ baseline
    │ ╭──┘
0.0 ┤─┘
    └─────────────────────────────────
    salary →

KS statistic = max vertical distance between curves
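To get a feel for these magnitudes, the same empirical-CDF comparison can be run standalone on synthetic samples. This is a self-contained sketch, not the pipeline code; the sample sizes and mean shifts are illustrative:

```python
import numpy as np

def ks_stat(a, b):
    # Empirical-CDF comparison, same approach as drift_detection()
    a, b = np.sort(np.asarray(a, float)), np.sort(np.asarray(b, float))
    values = np.unique(np.concatenate([a, b]))
    a_cdf = np.searchsorted(a, values, side='right') / len(a)
    b_cdf = np.searchsorted(b, values, side='right') / len(b)
    return float(np.max(np.abs(a_cdf - b_cdf)))

rng = np.random.default_rng(42)
base = rng.normal(0.0, 1.0, 5000)

print(ks_stat(base, base))                        # 0.0 (identical samples)
print(ks_stat(base, rng.normal(0.1, 1.0, 5000)))  # small mean shift: minimal drift
print(ks_stat(base, rng.normal(3.0, 1.0, 5000)))  # large mean shift: severe drift
```

A small mean shift lands near the bottom of the scale, while a three-sigma shift pushes the statistic toward 1.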
Example:
validator = DataValidator()

baseline_df = pd.DataFrame({
    'salary': ['$1000000', '$2000000', '$3000000']
})

candidate_df = pd.DataFrame({
    'salary': ['$5000000', '$6000000', '$7000000']  # Higher salaries
})

drift_score = validator.drift_detection(baseline_df, candidate_df)

print(f"Drift score: {drift_score:.3f}")
# 1.000 (complete distribution shift)

if drift_score > 0.2:
    print("⚠️ Significant drift detected! Retrain model.")
Use KS drift detection to monitor model inputs over time. Set alerts for drift scores above 0.2.

feature_wise_drift()

Computes drift metrics for all shared columns between datasets. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:60
def feature_wise_drift(
    self, 
    baseline: pd.DataFrame, 
    candidate: pd.DataFrame
) -> dict
Parameters:
  • baseline (pd.DataFrame, required): Reference dataset
  • candidate (pd.DataFrame, required): Comparison dataset
Returns: Dictionary mapping column names to drift metrics.
For Numeric Columns:
if pd.api.types.is_numeric_dtype(baseline[col]):
    result[col] = {
        'type': 'numeric',
        'ks_like': self.drift_detection(baseline, candidate, numeric_col=col),
        'mean_shift': float(candidate[col].mean() - baseline[col].mean())
    }
For Categorical Columns:
else:
    b_dist = baseline[col].value_counts(normalize=True)
    c_dist = candidate[col].value_counts(normalize=True)
    cats = sorted(set(b_dist.index) | set(c_dist.index))
    l1 = sum(abs(b_dist.get(cat, 0.0) - c_dist.get(cat, 0.0)) for cat in cats)
    result[col] = {'type': 'categorical', 'l1_drift': l1}
Example:
validator = DataValidator()

baseline = pd.DataFrame({
    'salary': [1000000, 2000000, 3000000],
    'age': [25, 27, 30],
    'team': ['Lakers', 'Bulls', 'Lakers']
})

candidate = pd.DataFrame({
    'salary': [5000000, 6000000, 7000000],
    'age': [26, 28, 31],
    'team': ['Bulls', 'Bulls', 'Celtics']
})

drift = validator.feature_wise_drift(baseline, candidate)

print("Numeric feature drift:")
print(f"  Salary KS: {drift['salary']['ks_like']:.3f}")
print(f"  Salary mean shift: ${drift['salary']['mean_shift']:,.0f}")
print(f"  Age KS: {drift['age']['ks_like']:.3f}")

print("\nCategorical feature drift:")
print(f"  Team L1 distance: {drift['team']['l1_drift']:.3f}")
Output:
Numeric feature drift:
  Salary KS: 1.000
  Salary mean shift: $4,000,000
  Age KS: 0.333

Categorical feature drift:
  Team L1 distance: 1.333
L1 distance for categorical features ranges from 0 (identical) to 2 (completely different distributions).
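Both extremes are easy to verify with a quick sketch that mirrors the categorical branch above (the team names are illustrative):

```python
import pandas as pd

def l1_drift(baseline, candidate):
    # Same L1 computation as the categorical branch of feature_wise_drift()
    b = baseline.value_counts(normalize=True)
    c = candidate.value_counts(normalize=True)
    cats = sorted(set(b.index) | set(c.index))
    return sum(abs(b.get(cat, 0.0) - c.get(cat, 0.0)) for cat in cats)

same = pd.Series(['Lakers', 'Bulls', 'Lakers'])
disjoint = pd.Series(['Celtics', 'Heat', 'Celtics'])

print(l1_drift(same, same))      # 0.0 (identical distributions)
print(l1_drift(same, disjoint))  # ~2.0 (no categories in common)
```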

temporal_drift()

Analyzes drift over time by comparing early and late time periods. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:83
def temporal_drift(
    self, 
    df: pd.DataFrame, 
    time_col: str = 'version'
) -> dict
Parameters:
  • df (pd.DataFrame, required): DataFrame with temporal data
  • time_col (str, default 'version'): Column containing version or timestamp information
Returns: Dictionary with temporal drift metrics, or {'status': 'insufficient_data'} if required columns are missing or the DataFrame has fewer than 4 rows.
Algorithm: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:84-98
if time_col not in df.columns or 'salary' not in df.columns or len(df) < 4:
    return {'status': 'insufficient_data'}

# Extract year from version (e.g., "NBA2K20" → 2020)
years = pd.to_numeric(
    df[time_col].astype(str).str.extract(r'(\d+)$')[0],
    errors='coerce'
).fillna(0).astype(int)
tmp = df.copy()
tmp['_year'] = np.where(years < 100, years + 2000, years)
tmp = tmp.sort_values('_year')

# Split into early and late periods
mid = len(tmp) // 2
early = tmp.iloc[:mid]
late = tmp.iloc[mid:]

return {
    'early_vs_late_salary_drift': self.drift_detection(early, late, numeric_col='salary'),
    'early_mean_salary': float(pd.to_numeric(
        early['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    ).mean()),
    'late_mean_salary': float(pd.to_numeric(
        late['salary'].astype(str).str.replace('$', '', regex=False),
        errors='coerce'
    ).mean())
}
Example:
validator = DataValidator()

df = pd.DataFrame({
    'version': ['NBA2K18', 'NBA2K19', 'NBA2K20', 'NBA2K21', 'NBA2K22', 'NBA2K23'],
    'salary': [1000000, 1500000, 2000000, 5000000, 6000000, 7000000]
})

temporal = validator.temporal_drift(df)

print(f"Early period (2018-2020) avg salary: ${temporal['early_mean_salary']:,.0f}")
print(f"Late period (2021-2023) avg salary: ${temporal['late_mean_salary']:,.0f}")
print(f"Temporal drift score: {temporal['early_vs_late_salary_drift']:.3f}")
Output:
Early period (2018-2020) avg salary: $1,500,000
Late period (2021-2023) avg salary: $6,000,000
Temporal drift score: 1.000
High temporal drift indicates that models trained on historical data may not perform well on recent data.

ValidationReport

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:9
@dataclass
class ValidationReport:
    missing_ratio: dict
    outlier_rate: float
    drift_score: float
    schema_ok: bool
    schema_issues: list[str]
    feature_drift: dict
    temporal_drift: dict
Comprehensive validation results container.
  • missing_ratio (dict): Column-wise proportion of missing values (0.0 = no missing, 1.0 = all missing)
  • outlier_rate (float): Proportion of rows flagged as outliers
  • drift_score (float): Overall drift score (reserved for future use)
  • schema_ok (bool): Whether schema validation passed
  • schema_issues (list[str]): List of schema violation messages
  • feature_drift (dict): Feature-wise drift metrics from feature_wise_drift()
  • temporal_drift (dict): Temporal drift metrics from temporal_drift()

Integration with Pipeline

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:39
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        self.validator = DataValidator()
Full Pipeline Validation: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:425-431
def run_all(self, source) -> dict:
    df = self.ingestor.load(source)
    
    # Quality checks
    cleaned = self.preprocessor.clean(df)
    outlier_mask = self.preprocessor.detect_outliers_iqr(
        cleaned.select_dtypes(include='number')
    )
    quality = self.validator.quality_report(cleaned, outlier_mask)
    
    # Drift detection
    drift_score = self.validator.drift_detection(
        cleaned, 
        cleaned.sample(frac=1.0, random_state=self.config.random_seed)
    )
    
    # Schema validation
    schema_ok, schema_issues = self.validator.schema_validation(
        df, 
        required_columns=['version', 'salary', 'b_day', 'draft_year']
    )
    
    # Feature-wise and temporal drift
    feature_drift = self.validator.feature_wise_drift(
        cleaned.iloc[:len(cleaned)//2],
        cleaned.iloc[len(cleaned)//2:]
    )
    temporal_drift = self.validator.temporal_drift(df)
    
    return {
        'quality': quality,
        'drift_score': drift_score,
        'schema_ok': schema_ok,
        'schema_issues': schema_issues,
        'feature_drift': feature_drift,
        'temporal_drift': temporal_drift
    }

Data Flow

Raw data is ingested and cleaned, outliers are flagged, and quality_report() summarizes the result; schema_validation() runs against the raw DataFrame, while feature_wise_drift() and temporal_drift() compare cleaned and versioned snapshots, as in run_all() above.
Drift Monitoring Strategy

| Metric | Low | Moderate | High | Action |
|---|---|---|---|---|
| KS statistic | <0.05 | 0.05-0.2 | >0.2 | Retrain model |
| Outlier rate | <5% | 5-10% | >10% | Investigate data quality |
| Missing ratio | <1% | 1-5% | >5% | Check data pipeline |
| L1 distance (categorical) | <0.1 | 0.1-0.5 | >0.5 | Update encodings |
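These tiers can be encoded directly. A minimal sketch of the KS column; the cutoffs come from the table and should be tuned to each model's sensitivity:

```python
def ks_action(ks: float) -> str:
    # Tiers from the monitoring table; tune cutoffs per model
    if ks > 0.2:
        return 'high: retrain model'
    if ks >= 0.05:
        return 'moderate: monitor'
    return 'low: no action'

print(ks_action(0.01))  # low: no action
print(ks_action(0.10))  # moderate: monitor
print(ks_action(0.45))  # high: retrain model
```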

Monitoring Workflow

def alert_team(message: str) -> None:
    # Stub notifier for this example; replace with your alerting integration
    print(f"ALERT: {message}")

def monitor_production_data(baseline_df, production_df):
    validator = DataValidator()
    
    # 1. Schema check
    schema_ok, issues = validator.schema_validation(
        production_df,
        required_columns=['salary', 'age', 'height', 'team']
    )
    if not schema_ok:
        alert_team(f"Schema issues: {issues}")
        return
    
    # 2. Feature drift
    feature_drift = validator.feature_wise_drift(baseline_df, production_df)
    for col, metrics in feature_drift.items():
        if metrics.get('ks_like', 0) > 0.2:
            alert_team(f"High drift in {col}: {metrics['ks_like']:.3f}")
    
    # 3. Temporal drift
    temporal = validator.temporal_drift(production_df)
    if temporal.get('early_vs_late_salary_drift', 0) > 0.2:
        alert_team("Temporal drift detected - consider retraining")
Implement continuous drift monitoring in production to maintain model performance.

Best Practices

Validate Early and Often

Run schema validation immediately after ingestion, before preprocessing.

Track Drift Over Time

Log drift metrics to a time-series database for trend analysis.
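As a minimal stand-in for a time-series database, each run can append one row per metric to a CSV log. The field names and the fixed timestamp below are illustrative:

```python
import csv
import datetime
import io

# Stand-in for a time-series sink: one row per metric per run
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['timestamp', 'metric', 'value'])
writer.writeheader()
writer.writerow({
    'timestamp': datetime.datetime(2024, 1, 1).isoformat(),
    'metric': 'salary_ks',
    'value': 0.18,
})
print(buf.getvalue())
```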

Set Adaptive Thresholds

Adjust drift thresholds based on business impact and model sensitivity.

Automate Alerts

Configure automatic notifications when drift exceeds critical thresholds.

Next Steps

  • Streaming Engine: Execute validated pipelines in real time
  • Preprocessing: Return to data cleaning and transformation
