Skip to main content

Overview

The DataValidator class performs comprehensive data quality checks including missing value analysis, outlier detection, distribution drift detection, schema validation, and temporal drift analysis.

Class Definition

class DataValidator:
    def __init__(self)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:20

No configuration parameters required.

Methods

quality_report

def quality_report(
    self, 
    df: pd.DataFrame, 
    outlier_mask: pd.Series
) -> ValidationReport
Generates a comprehensive data quality report.
df
pd.DataFrame
required
DataFrame to validate (typically cleaned data)
outlier_mask
pd.Series
required
Boolean Series indicating outlier rows (from Preprocessor.detect_outliers_iqr())
return
ValidationReport
Dataclass containing:
  • missing_ratio (dict): Proportion of missing values per column
  • outlier_rate (float): Proportion of rows flagged as outliers
  • drift_score (float): Set to 0.0 (computed separately via drift_detection())
  • schema_ok (bool): Set to True (computed separately via schema_validation())
  • schema_issues (list): Empty list (populated by schema_validation())
  • feature_drift (dict): Empty dict (computed separately via feature_wise_drift())
  • temporal_drift (dict): Empty dict (computed separately via temporal_drift())
Example:
from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd

preprocessor = Preprocessor()
validator = DataValidator()

df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))

quality = validator.quality_report(cleaned, outlier_mask)

print(f"Outlier rate: {quality.outlier_rate:.2%}")
print(f"\nMissing values per column:")
for col, ratio in quality.missing_ratio.items():
    if ratio > 0:
        print(f"  {col}: {ratio:.2%}")

schema_validation

def schema_validation(
    self, 
    df: pd.DataFrame, 
    required_columns: list[str]
) -> tuple[bool, list[str]]
Validates that DataFrame contains required columns and expected data types.
df
pd.DataFrame
required
DataFrame to validate
required_columns
list[str]
required
List of column names that must be present
return
tuple[bool, list[str]]
Tuple of (is_valid, issues) where:
  • is_valid (bool): True if all checks pass
  • issues (list[str]): List of issue descriptions (empty if valid)
Checks performed:
  • Missing required columns → "missing_columns:[col1, col2, ...]"
  • Non-numeric salary column → "salary_column_not_numeric"
Example:
from pipeline.validation import DataValidator
import pandas as pd

validator = DataValidator()

df = pd.read_csv('nba_data.csv')
schema_ok, issues = validator.schema_validation(
    df, 
    required_columns=['version', 'salary', 'b_day', 'draft_year']
)

if schema_ok:
    print("✓ Schema validation passed")
else:
    print("✗ Schema validation failed:")
    for issue in issues:
        print(f"  - {issue}")

# Example with missing columns
incomplete_df = df.drop(columns=['salary'])
schema_ok, issues = validator.schema_validation(
    incomplete_df,
    required_columns=['version', 'salary', 'b_day']
)
print(issues)  # ['missing_columns:[salary]']

drift_detection

def drift_detection(
    self, 
    baseline: pd.DataFrame, 
    candidate: pd.DataFrame, 
    numeric_col: str = 'salary'
) -> float
Detects distribution drift using Kolmogorov-Smirnov statistic.
baseline
pd.DataFrame
required
Baseline (reference) DataFrame
candidate
pd.DataFrame
required
Candidate (current) DataFrame to compare against baseline
numeric_col
str
default: 'salary'
Name of numeric column to compare
return
float
Kolmogorov-Smirnov statistic (0.0-1.0):
  • 0.0: Identical distributions
  • 0.0-0.1: Minimal drift
  • 0.1-0.3: Moderate drift
  • >0.3: Significant drift
Example:
from pipeline.validation import DataValidator
import pandas as pd

validator = DataValidator()

# Load baseline and new data
baseline_df = pd.read_csv('nba_2020.csv')
candidate_df = pd.read_csv('nba_2023.csv')

# Check salary drift
drift_score = validator.drift_detection(baseline_df, candidate_df, numeric_col='salary')
print(f"Salary drift score: {drift_score:.4f}")

if drift_score < 0.1:
    print("✓ No significant drift detected")
elif drift_score < 0.3:
    print("⚠ Moderate drift detected - review data")
else:
    print("✗ Significant drift detected - data may be incompatible")

# Check drift in other columns
for col in ['age', 'height', 'weight']:
    if col in baseline_df.columns and col in candidate_df.columns:
        drift = validator.drift_detection(baseline_df, candidate_df, numeric_col=col)
        print(f"{col} drift: {drift:.4f}")

feature_wise_drift

def feature_wise_drift(
    self, 
    baseline: pd.DataFrame, 
    candidate: pd.DataFrame
) -> dict
Computes drift metrics for each feature individually.
baseline
pd.DataFrame
required
Baseline DataFrame
candidate
pd.DataFrame
required
Candidate DataFrame
return
dict
Dictionary with one entry per common column. For numeric columns:
{
  'column_name': {
    'type': 'numeric',
    'ks_like': float,  # KS statistic
    'mean_shift': float  # candidate_mean - baseline_mean
  }
}
For categorical columns:
{
  'column_name': {
    'type': 'categorical',
    'l1_drift': float  # L1 distance between distributions
  }
}
Example:
from pipeline.validation import DataValidator
import pandas as pd
import json

validator = DataValidator()

baseline = pd.read_csv('nba_2020.csv')
candidate = pd.read_csv('nba_2023.csv')

drift_report = validator.feature_wise_drift(baseline, candidate)

print("Feature-wise drift report:")
print(json.dumps(drift_report, indent=2))

# Identify high-drift features
for feature, metrics in drift_report.items():
    if metrics['type'] == 'numeric':
        if metrics['ks_like'] > 0.3:
            print(f"⚠ High drift in {feature}:")
            print(f"  KS statistic: {metrics['ks_like']:.4f}")
            print(f"  Mean shift: {metrics['mean_shift']:+.2f}")
    else:  # categorical
        if metrics['l1_drift'] > 0.5:
            print(f"⚠ High drift in {feature} (categorical):")
            print(f"  L1 distance: {metrics['l1_drift']:.4f}")

temporal_drift

def temporal_drift(
    self, 
    df: pd.DataFrame, 
    time_col: str = 'version'
) -> dict
Analyzes temporal drift by comparing early vs late time periods.
df
pd.DataFrame
required
DataFrame with temporal data
time_col
str
default: 'version'
Column name containing version/year information (extracts year with regex r'(\d+)$')
return
dict
If insufficient data:
{'status': 'insufficient_data'}
Otherwise:
{
  'early_vs_late_salary_drift': float,  # KS statistic
  'early_mean_salary': float,
  'late_mean_salary': float
}
Example:
from pipeline.validation import DataValidator
import pandas as pd

validator = DataValidator()

df = pd.read_csv('nba_historical_data.csv')
temporal_report = validator.temporal_drift(df, time_col='version')

if 'status' in temporal_report:
    print(f"Cannot analyze: {temporal_report['status']}")
else:
    print("Temporal drift analysis:")
    print(f"  Early period mean salary: ${temporal_report['early_mean_salary']:,.2f}")
    print(f"  Late period mean salary: ${temporal_report['late_mean_salary']:,.2f}")
    print(f"  Salary drift: {temporal_report['early_vs_late_salary_drift']:.4f}")
    
    # Calculate change
    change_pct = (
        (temporal_report['late_mean_salary'] - temporal_report['early_mean_salary']) 
        / temporal_report['early_mean_salary'] * 100
    )
    print(f"  Salary change: {change_pct:+.1f}%")

ValidationReport

@dataclass
class ValidationReport:
    missing_ratio: dict
    outlier_rate: float
    drift_score: float
    schema_ok: bool
    schema_issues: list[str]
    feature_drift: dict
    temporal_drift: dict
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:9

Dataclass encapsulating comprehensive validation results.

Usage Patterns

Complete Validation Pipeline

from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd
from dataclasses import asdict
import json

preprocessor = Preprocessor()
validator = DataValidator()

df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)

# Quality report
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))
quality = validator.quality_report(cleaned, outlier_mask)

# Schema validation
schema_ok, schema_issues = validator.schema_validation(
    df, 
    required_columns=['version', 'salary', 'b_day', 'draft_year']
)

# Drift detection (compare to shuffled data as test)
drift_score = validator.drift_detection(
    cleaned, 
    cleaned.sample(frac=1.0, random_state=42)
)

# Complete report
full_report = asdict(quality)
full_report['drift_score'] = drift_score
full_report['schema_ok'] = schema_ok
full_report['schema_issues'] = schema_issues

print(json.dumps(full_report, indent=2))

Monitoring Production Data

from pipeline.validation import DataValidator
import pandas as pd

validator = DataValidator()

# Load baseline (training data)
baseline = pd.read_csv('training_data.csv')

# Load production data
production = pd.read_csv('production_data_latest.csv')

# Comprehensive drift analysis
overall_drift = validator.drift_detection(baseline, production, 'salary')
feature_drift = validator.feature_wise_drift(baseline, production)
temporal_drift = validator.temporal_drift(production)

# Alert on significant drift
if overall_drift > 0.3:
    print(f"🚨 ALERT: Significant drift detected ({overall_drift:.4f})")
    print("\nPer-feature analysis:")
    for feature, metrics in feature_drift.items():
        if metrics.get('ks_like', 0) > 0.3 or metrics.get('l1_drift', 0) > 0.5:
            print(f"  High drift in {feature}: {metrics}")

Schema Validation in ETL

from pipeline.validation import DataValidator
import pandas as pd
import sys

validator = DataValidator()

REQUIRED_COLUMNS = [
    'version', 'name', 'team', 'position',
    'height', 'weight', 'b_day', 'salary',
    'country', 'draft_year', 'draft_round', 'draft_peak'
]

def validate_input_file(file_path: str) -> bool:
    try:
        df = pd.read_csv(file_path)
        schema_ok, issues = validator.schema_validation(df, REQUIRED_COLUMNS)
        
        if not schema_ok:
            print(f"❌ Validation failed for {file_path}:")
            for issue in issues:
                print(f"   - {issue}")
            return False
        
        print(f"✅ Validation passed for {file_path}")
        return True
    except Exception as e:
        print(f"❌ Error loading {file_path}: {e}")
        return False

if __name__ == '__main__':
    files_to_validate = ['nba_2021.csv', 'nba_2022.csv', 'nba_2023.csv']
    results = [validate_input_file(f) for f in files_to_validate]
    
    if not all(results):
        sys.exit(1)  # Fail ETL pipeline

Notes

  • KS statistic ranges from 0.0 (identical) to 1.0 (completely different)
  • L1 drift for categoricals is the sum of absolute differences in category proportions
  • Temporal drift splits data at the midpoint after sorting by extracted year
  • Schema validation automatically handles $ prefix in salary column
  • Year extraction expects format "NBA2K{YY}" and handles both 2-digit and 4-digit years
  • All drift methods return 0.0 for empty or invalid data rather than raising errors
  • Missing values are handled gracefully with .dropna() before drift computation

Build docs developers (and LLMs) love