Overview
The DataValidator class performs comprehensive data quality checks including missing value analysis, outlier detection, distribution drift detection, schema validation, and temporal drift analysis.
Class Definition
class DataValidator:
def __init__(self)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:20
No configuration parameters required.
Methods
quality_report
def quality_report(
self,
df: pd.DataFrame,
outlier_mask: pd.Series
) -> ValidationReport
Generates a comprehensive data quality report.
df: DataFrame to validate (typically cleaned data)
outlier_mask: Boolean Series flagging outlier rows (from Preprocessor.detect_outliers_iqr())
Returns a ValidationReport dataclass containing:
missing_ratio (dict): Proportion of missing values per column
outlier_rate (float): Proportion of rows flagged as outliers
drift_score (float): Set to 0.0 (computed separately via drift_detection())
schema_ok (bool): Set to True (computed separately via schema_validation())
schema_issues (list): Empty list (populated by schema_validation())
feature_drift (dict): Empty dict (computed separately via feature_wise_drift())
temporal_drift (dict): Empty dict (computed separately via temporal_drift())
Example:
from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd
preprocessor = Preprocessor()
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))
quality = validator.quality_report(cleaned, outlier_mask)
print(f"Outlier rate: {quality.outlier_rate:.2%}")
print(f"\nMissing values per column:")
for col, ratio in quality.missing_ratio.items():
if ratio > 0:
print(f" {col}: {ratio:.2%}")
schema_validation
def schema_validation(
self,
df: pd.DataFrame,
required_columns: list[str]
) -> tuple[bool, list[str]]
Validates that DataFrame contains required columns and expected data types.
required_columns: List of column names that must be present
Tuple of (is_valid, issues) where:
is_valid (bool): True if all checks pass
issues (list[str]): List of issue descriptions (empty if valid)
Checks performed:
- Missing required columns → "missing_columns:[col1, col2, ...]"
- Non-numeric salary column → "salary_column_not_numeric"
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
schema_ok, issues = validator.schema_validation(
df,
required_columns=['version', 'salary', 'b_day', 'draft_year']
)
if schema_ok:
print("✓ Schema validation passed")
else:
print("✗ Schema validation failed:")
for issue in issues:
print(f" - {issue}")
# Example with missing columns
incomplete_df = df.drop(columns=['salary'])
schema_ok, issues = validator.schema_validation(
incomplete_df,
required_columns=['version', 'salary', 'b_day']
)
print(issues) # ['missing_columns:[salary]']
drift_detection
def drift_detection(
self,
baseline: pd.DataFrame,
candidate: pd.DataFrame,
numeric_col: str = 'salary'
) -> float
Detects distribution drift using Kolmogorov-Smirnov statistic.
baseline: Baseline (reference) DataFrame
candidate: Candidate (current) DataFrame to compare against the baseline
numeric_col: Name of the numeric column to compare
Kolmogorov-Smirnov statistic (0.0-1.0):
- 0.0: Identical distributions
- 0.0-0.1: Minimal drift
- 0.1-0.3: Moderate drift
- > 0.3: Significant drift
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
# Load baseline and new data
baseline_df = pd.read_csv('nba_2020.csv')
candidate_df = pd.read_csv('nba_2023.csv')
# Check salary drift
drift_score = validator.drift_detection(baseline_df, candidate_df, numeric_col='salary')
print(f"Salary drift score: {drift_score:.4f}")
if drift_score < 0.1:
print("✓ No significant drift detected")
elif drift_score < 0.3:
print("⚠ Moderate drift detected - review data")
else:
print("✗ Significant drift detected - data may be incompatible")
# Check drift in other columns
for col in ['age', 'height', 'weight']:
if col in baseline_df.columns and col in candidate_df.columns:
drift = validator.drift_detection(baseline_df, candidate_df, numeric_col=col)
print(f"{col} drift: {drift:.4f}")
feature_wise_drift
def feature_wise_drift(
self,
baseline: pd.DataFrame,
candidate: pd.DataFrame
) -> dict
Computes drift metrics for each feature individually.
Dictionary with one entry per common column. For numeric columns:
{
'column_name': {
'type': 'numeric',
'ks_like': float, # KS statistic
'mean_shift': float # candidate_mean - baseline_mean
}
}
For categorical columns:
{
'column_name': {
'type': 'categorical',
'l1_drift': float # L1 distance between distributions
}
}
Example:
from pipeline.validation import DataValidator
import pandas as pd
import json
validator = DataValidator()
baseline = pd.read_csv('nba_2020.csv')
candidate = pd.read_csv('nba_2023.csv')
drift_report = validator.feature_wise_drift(baseline, candidate)
print("Feature-wise drift report:")
print(json.dumps(drift_report, indent=2))
# Identify high-drift features
for feature, metrics in drift_report.items():
if metrics['type'] == 'numeric':
if metrics['ks_like'] > 0.3:
print(f"⚠ High drift in {feature}:")
print(f" KS statistic: {metrics['ks_like']:.4f}")
print(f" Mean shift: {metrics['mean_shift']:+.2f}")
else: # categorical
if metrics['l1_drift'] > 0.5:
print(f"⚠ High drift in {feature} (categorical):")
print(f" L1 distance: {metrics['l1_drift']:.4f}")
temporal_drift
def temporal_drift(
self,
df: pd.DataFrame,
time_col: str = 'version'
) -> dict
Analyzes temporal drift by comparing early vs late time periods.
df: DataFrame with temporal data
time_col: Column name containing version/year information (the year is extracted with the regex r'(\d+)$')
If there is insufficient data, returns: {'status': 'insufficient_data'}
Otherwise, returns:
{
'early_vs_late_salary_drift': float, # KS statistic
'early_mean_salary': float,
'late_mean_salary': float
}
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
df = pd.read_csv('nba_historical_data.csv')
temporal_report = validator.temporal_drift(df, time_col='version')
if 'status' in temporal_report:
print(f"Cannot analyze: {temporal_report['status']}")
else:
print("Temporal drift analysis:")
print(f" Early period mean salary: ${temporal_report['early_mean_salary']:,.2f}")
print(f" Late period mean salary: ${temporal_report['late_mean_salary']:,.2f}")
print(f" Salary drift: {temporal_report['early_vs_late_salary_drift']:.4f}")
# Calculate change
change_pct = (
(temporal_report['late_mean_salary'] - temporal_report['early_mean_salary'])
/ temporal_report['early_mean_salary'] * 100
)
print(f" Salary change: {change_pct:+.1f}%")
ValidationReport
@dataclass
class ValidationReport:
missing_ratio: dict
outlier_rate: float
drift_score: float
schema_ok: bool
schema_issues: list[str]
feature_drift: dict
temporal_drift: dict
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:9
Dataclass encapsulating comprehensive validation results.
Usage Patterns
Complete Validation Pipeline
from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd
from dataclasses import asdict
import json
preprocessor = Preprocessor()
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)
# Quality report
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))
quality = validator.quality_report(cleaned, outlier_mask)
# Schema validation
schema_ok, schema_issues = validator.schema_validation(
df,
required_columns=['version', 'salary', 'b_day', 'draft_year']
)
# Drift detection (compare to shuffled data as test)
drift_score = validator.drift_detection(
cleaned,
cleaned.sample(frac=1.0, random_state=42)
)
# Complete report
full_report = asdict(quality)
full_report['drift_score'] = drift_score
full_report['schema_ok'] = schema_ok
full_report['schema_issues'] = schema_issues
print(json.dumps(full_report, indent=2))
Monitoring Production Data
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
# Load baseline (training data)
baseline = pd.read_csv('training_data.csv')
# Load production data
production = pd.read_csv('production_data_latest.csv')
# Comprehensive drift analysis
overall_drift = validator.drift_detection(baseline, production, 'salary')
feature_drift = validator.feature_wise_drift(baseline, production)
temporal_drift = validator.temporal_drift(production)
# Alert on significant drift
if overall_drift > 0.3:
print(f"🚨 ALERT: Significant drift detected ({overall_drift:.4f})")
print("\nPer-feature analysis:")
for feature, metrics in feature_drift.items():
if metrics.get('ks_like', 0) > 0.3 or metrics.get('l1_drift', 0) > 0.5:
print(f" High drift in {feature}: {metrics}")
Schema Validation in ETL
from pipeline.validation import DataValidator
import pandas as pd
import sys
validator = DataValidator()
REQUIRED_COLUMNS = [
'version', 'name', 'team', 'position',
'height', 'weight', 'b_day', 'salary',
'country', 'draft_year', 'draft_round', 'draft_peak'
]
def validate_input_file(file_path: str) -> bool:
try:
df = pd.read_csv(file_path)
schema_ok, issues = validator.schema_validation(df, REQUIRED_COLUMNS)
if not schema_ok:
print(f"❌ Validation failed for {file_path}:")
for issue in issues:
print(f" - {issue}")
return False
print(f"✅ Validation passed for {file_path}")
return True
except Exception as e:
print(f"❌ Error loading {file_path}: {e}")
return False
if __name__ == '__main__':
files_to_validate = ['nba_2021.csv', 'nba_2022.csv', 'nba_2023.csv']
results = [validate_input_file(f) for f in files_to_validate]
if not all(results):
sys.exit(1) # Fail ETL pipeline
Notes
- KS statistic ranges from 0.0 (identical) to 1.0 (completely different)
- L1 drift for categoricals is the sum of absolute differences in category proportions
- Temporal drift splits data at the midpoint after sorting by extracted year
- Schema validation automatically handles a leading "$" prefix in the salary column
- Year extraction expects the format "NBA2K{YY}" and handles both 2-digit and 4-digit years
- All drift methods return 0.0 for empty or invalid data rather than raising errors
- Missing values are handled gracefully with .dropna() before drift computation