
Overview

The feature engineering stage transforms cleaned data into ML-ready features. The FeatureEngineer class creates derived features, computes rolling statistics, handles multicollinearity, and prepares final encoded datasets for training.

FeatureEngineer Class

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:16
class FeatureEngineer:
    # No initialization parameters - stateless transformations
The FeatureEngineer class is stateless by design. State for streaming operations is managed separately via RollingState.

Core Methods

build_features()

Creates comprehensive feature set for batch processing. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:31
def build_features(
    self, 
    df: pd.DataFrame, 
    rolling_window: int = 5
) -> pd.DataFrame
Parameters:
  • df (pd.DataFrame, required): Cleaned DataFrame from preprocessing stage
  • rolling_window (int, default 5): Window size for rolling statistics (number of historical observations)

Returns: DataFrame with original and engineered features

Features Created:

1. Version Year Extraction

featured = df.copy()
year = featured['version'].astype(str).str.extract(r'(\d+)$')[0].astype(int)
year = np.where(year < 100, year + 2000, year)
featured['version_year'] = year
Example:
Input:  "NBA2K20" → Extract "20" → 2020
Input:  "NBA2K21" → Extract "21" → 2021
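As a quick standalone check, the extraction logic can be run on a toy frame (hypothetical data, not from the pipeline):

```python
import numpy as np
import pandas as pd

# Toy check of the trailing-digit extraction and century normalization.
df = pd.DataFrame({'version': ['NBA2K20', 'NBA2K21']})
year = df['version'].astype(str).str.extract(r'(\d+)$')[0].astype(int)
year = np.where(year < 100, year + 2000, year)
print(list(year))  # [2020, 2021]
```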

2. Age Calculation

featured['age'] = featured['version_year'] - featured['b_day'].dt.year
Example:
version_year: 2020
b_day: 1995-03-15
age = 2020 - 1995 = 25

3. Experience Calculation

featured['experience'] = featured['version_year'] - featured['draft_year'].dt.year
Example:
version_year: 2020
draft_year: 2015
experience = 2020 - 2015 = 5 years

4. Body Mass Index (BMI)

featured['bmi'] = featured['weight'] / (featured['height'] ** 2)
Formula: BMI = weight (kg) / height² (m²)

Example:
weight: 100 kg
height: 2.0 m
BMI = 100 / (2.0)² = 25.0

5. Rolling Salary Statistics

featured = featured.sort_values('version_year').reset_index(drop=True)
featured['salary_roll_mean'] = featured['salary'].rolling(
    window=rolling_window, 
    min_periods=1
).mean()
featured['salary_roll_std'] = featured['salary'].rolling(
    window=rolling_window, 
    min_periods=1
).std().fillna(0.0)
Purpose: Capture salary trends over time

Example with window=3:
Salaries: [1M, 2M, 3M, 4M, 5M]

roll_mean: 
  [1M,      # mean of [1M]
   1.5M,    # mean of [1M, 2M]
   2M,      # mean of [1M, 2M, 3M]
   3M,      # mean of [2M, 3M, 4M]
   4M]      # mean of [3M, 4M, 5M]
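The same numbers can be reproduced directly with pandas (salaries here are in millions):

```python
import pandas as pd

# Reproduce the window=3 example above; salaries in millions.
salaries = pd.Series([1.0, 2.0, 3.0, 4.0, 5.0])
roll_mean = salaries.rolling(window=3, min_periods=1).mean()
# With a single observation, std() is NaN (ddof=1), so fill with 0.0.
roll_std = salaries.rolling(window=3, min_periods=1).std().fillna(0.0)
print(roll_mean.tolist())  # [1.0, 1.5, 2.0, 3.0, 4.0]
```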

6. Temporal Features

featured['birth_month'] = featured['b_day'].dt.month
featured['draft_decade'] = (featured['draft_year'].dt.year // 10) * 10
Examples:
b_day: 1995-03-15 → birth_month: 3
draft_year: 2015 → draft_decade: 2010

7. Salary Anomaly Detection

z = (featured['salary'] - featured['salary'].mean()) / featured['salary'].std(ddof=0)
featured['salary_anomaly'] = (z.abs() > 2.5).astype(int)
Logic: Flag salaries more than 2.5 standard deviations from the mean

Example:
Mean salary: $5M
Std dev: $2M

Salary: $12M
Z-score = (12M - 5M) / 2M = 3.5
|3.5| > 2.5 → salary_anomaly = 1 (True)
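The flagging logic can be verified end-to-end on hypothetical data (nine typical salaries plus one extreme outlier):

```python
import pandas as pd

# Hypothetical salaries chosen so the outlier's z-score is exactly 3.0.
salary = pd.Series([5e6] * 9 + [20e6])
z = (salary - salary.mean()) / salary.std(ddof=0)  # population std
flags = (z.abs() > 2.5).astype(int)
print(round(z.iloc[-1], 2))  # 3.0
print(flags.tolist())        # [0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
```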
Complete Example:
from pipeline.feature_engineering import FeatureEngineer
import pandas as pd

engineer = FeatureEngineer()

cleaned_df = pd.DataFrame({
    'version': ['NBA2K20', 'NBA2K21'],
    'b_day': pd.to_datetime(['1995-03-15', '1996-07-22']),
    'draft_year': pd.to_datetime(['2015', '2016']),
    'height': [2.0, 1.98],
    'weight': [100, 95],
    'salary': [5000000, 6000000]
})

featured_df = engineer.build_features(cleaned_df, rolling_window=3)

print("New features created:")
print(featured_df[[
    'version_year', 'age', 'experience', 'bmi',
    'salary_roll_mean', 'birth_month', 'salary_anomaly'
]].head())

build_features_streaming()

Stream-compatible feature engineering with stateful rolling statistics. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:54
def build_features_streaming(
    self, 
    df: pd.DataFrame, 
    state: RollingState
) -> pd.DataFrame
Parameters:
  • df (pd.DataFrame, required): Cleaned DataFrame chunk from streaming preprocessing
  • state (RollingState, required): Mutable state object maintaining rolling window history across chunks

Returns: Featured DataFrame chunk

Key Difference from Batch Mode: Instead of pandas .rolling(), this method uses stateful computation. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:20-29
def _add_streaming_rolling_features(
    self, 
    featured: pd.DataFrame, 
    state: RollingState
) -> pd.DataFrame:
    means, stds = [], []
    for salary in featured['salary'].astype(float):
        state.salary_window.append(float(salary))  # Update window
        vals = np.array(state.salary_window, dtype=float)
        means.append(float(vals.mean()))
        stds.append(float(vals.std(ddof=0)) if len(vals) > 1 else 0.0)
    featured['salary_roll_mean'] = means
    featured['salary_roll_std'] = stds
    return featured
Streaming Example:
engineer = FeatureEngineer()
state = engineer.init_rolling_state(rolling_window=5)

# Process chunks sequentially
for chunk in stream_chunks:
    featured_chunk = engineer.build_features_streaming(chunk, state)
    # State persists across chunks!
The state object must be passed to maintain rolling window continuity across chunks.
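The continuity requirement can be illustrated with a minimal standalone sketch of the deque-based logic (no pipeline classes involved; window size and data are illustrative):

```python
from collections import deque

import numpy as np
import pandas as pd

# A shared deque plays the role of RollingState.salary_window.
window = deque(maxlen=3)
chunks = [pd.DataFrame({'salary': [1.0, 2.0]}),
          pd.DataFrame({'salary': [3.0, 4.0]})]

means = []
for chunk in chunks:
    for s in chunk['salary'].astype(float):
        window.append(s)                    # state persists across chunks
        means.append(float(np.mean(window)))
print(means)  # [1.0, 1.5, 2.0, 3.0]
```

Because the deque outlives each chunk, the fourth mean (3.0) correctly averages values from both chunks.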

init_rolling_state()

Initializes state for streaming operations. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:17
def init_rolling_state(self, rolling_window: int = 5) -> RollingState
Parameters:
  • rolling_window (int, default 5): Maximum number of historical salary values to retain

Returns: RollingState object with an initialized deque
from collections import deque
from dataclasses import dataclass

@dataclass
class RollingState:
    rolling_window: int
    salary_window: deque[float]  # Fixed-size FIFO queue
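The fixed-size FIFO behavior comes from `deque(maxlen=N)`, which silently evicts the oldest value once full:

```python
from collections import deque

# A maxlen-bounded deque keeps only the N most recent values.
w = deque(maxlen=3)
for v in [1.0, 2.0, 3.0, 4.0]:
    w.append(v)  # appending a 4th value evicts 1.0
print(list(w))  # [2.0, 3.0, 4.0]
```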

drop_multicollinearity()

Removes highly correlated features to reduce redundancy. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:75
def drop_multicollinearity(
    self, 
    df: pd.DataFrame, 
    threshold: float = 0.5
) -> pd.DataFrame
Parameters:
  • df (pd.DataFrame, required): Featured DataFrame
  • threshold (float, default 0.5): Correlation threshold; pairs with |correlation| > threshold are reduced. Range: [0, 1]

Returns: DataFrame with redundant features removed

Algorithm: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:79-93
while len(variable_columns) > 1:
    # Compute correlation matrix over the remaining numeric features
    corr_matrix = transformed[variable_columns].corr().fillna(0.0)
    upper_triangle = corr_matrix.where(
        np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
    )
    pairs = upper_triangle.stack()

    # Stop when no remaining pair exceeds the threshold
    if pairs.empty or pairs.abs().max() <= threshold:
        break

    first, second = pairs.abs().idxmax()

    # Drop the feature less correlated with the target (salary)
    target_corr = transformed[variable_columns].corrwith(
        transformed['salary']
    ).abs().fillna(0.0)
    drop_col = first if target_corr[first] < target_corr[second] else second

    transformed = transformed.drop(columns=drop_col)
    variable_columns.remove(drop_col)
Example:
Correlation matrix:
           age  experience  years_pro
age        1.0  0.95        0.92
experience 0.95 1.0         0.98
years_pro  0.92 0.98        1.0

Correlation with salary:
age:        0.45
experience: 0.50
years_pro:  0.48

Step 1: experience-years_pro correlation = 0.98 > 0.5
  → Keep experience (0.50 > 0.48), drop years_pro

Step 2: age-experience correlation = 0.95 > 0.5
  → Keep experience (0.50 > 0.45), drop age

Final: Only experience remains
Lower thresholds (e.g., 0.3) are more aggressive; higher thresholds (e.g., 0.8) are more conservative.
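A self-contained sketch of the greedy loop, with an illustrative function name and toy data (not taken from the pipeline):

```python
import numpy as np
import pandas as pd

def drop_multicollinear(df, target='salary', threshold=0.5):
    # Greedy variant of the algorithm above, assuming numeric columns only.
    out = df.copy()
    cols = [c for c in out.columns if c != target]
    while len(cols) > 1:
        corr = out[cols].corr().fillna(0.0)
        upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
        pairs = upper.stack()
        if pairs.empty or pairs.abs().max() <= threshold:
            break
        first, second = pairs.abs().idxmax()
        # Keep whichever of the pair correlates more strongly with the target.
        target_corr = out[cols].corrwith(out[target]).abs().fillna(0.0)
        drop_col = first if target_corr[first] < target_corr[second] else second
        out = out.drop(columns=drop_col)
        cols.remove(drop_col)
    return out

toy = pd.DataFrame({
    'salary':     [1.0, 4.0, 3.0, 10.0, 9.0],
    'age':        [22.0, 25.0, 23.0, 31.0, 30.0],  # nearly collinear with experience
    'experience': [1.0, 4.0, 3.0, 10.0, 9.0],      # perfectly tracks salary
})
result = drop_multicollinear(toy)
print(result.columns.tolist())  # ['salary', 'experience']
```

Here age and experience correlate above the threshold, and age is dropped because its correlation with salary is the weaker of the two.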

encode_and_scale()

Prepares final ML-ready dataset with encoding and normalization. Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:97
def encode_and_scale(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]
Parameters:
  • df (pd.DataFrame, required): Featured DataFrame (after multicollinearity removal)
Returns: Tuple of (X, y) where:
  • X (DataFrame): Encoded and scaled features
  • y (Series): Target variable (salary)
Processing Steps:

1. Target Separation

y = df['salary'].copy()
x = df.drop(columns=[
    'salary',      # Target
    'version',     # Identifier
    'b_day',       # Raw temporal (replaced by birth_month)
    'draft_year',  # Raw temporal (replaced by draft_decade)
    'weight',      # Raw measurement (replaced by bmi)
    'height'       # Raw measurement (replaced by bmi)
], errors='ignore')

2. High-Cardinality Filtering

for col in list(x.select_dtypes(include='object').columns):
    if x[col].nunique(dropna=False) >= 50:
        x = x.drop(columns=col)
Purpose: Remove columns with too many categories (e.g., player names)

3. Numeric Feature Scaling (Z-score Normalization)

num_cols = list(x.select_dtypes(include='number').columns)
num = x[num_cols].astype(float)
num = num.fillna(num.median())
num = (num - num.mean()) / num.std(ddof=0).replace(0.0, 1.0)
Formula: z = (x - μ) / σ (population std, ddof=0)

Example:
Original salary_roll_mean: [1M, 1M, 5M, 5M]
Mean: 3M
Population std (ddof=0): 2M

Scaled: [(1-3)/2, (1-3)/2, (5-3)/2, (5-3)/2] = [-1.0, -1.0, 1.0, 1.0]

4. One-Hot Encoding (Categorical Features)

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/feature_engineering/features.py:110-116
cat_cols = list(x.select_dtypes(include='object').columns)
cat_frames = []
for col in cat_cols:
    filled = x[col].fillna(f'Unknown_{col}')
    cats = pd.Index(pd.unique(filled))
    one_hot = pd.DataFrame(
        (filled.to_numpy()[:, None] == cats.to_numpy()).astype(int),
        columns=[f'{col}__{c}' for c in cats.astype(str)],
        index=x.index
    )
    cat_frames.append(one_hot)

cat = pd.concat(cat_frames, axis=1) if cat_frames else pd.DataFrame(index=x.index)
Example:
Original 'team' column: ['Lakers', 'Bulls', 'Lakers']

One-hot encoded:
team__Lakers  team__Bulls
1             0
0             1
1             0
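The manual loop keeps categories in first-appearance order; for comparison, `pd.get_dummies` produces equivalent indicators but sorts category columns alphabetically:

```python
import pandas as pd

team = pd.Series(['Lakers', 'Bulls', 'Lakers'], name='team')
# Same '__' separator convention as the manual encoding above.
one_hot = pd.get_dummies(team, prefix='team', prefix_sep='__').astype(int)
print(one_hot.columns.tolist())          # ['team__Bulls', 'team__Lakers']
print(one_hot['team__Lakers'].tolist())  # [1, 0, 1]
```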

5. Final Assembly

return pd.concat([num, cat], axis=1), y
Complete Example:
engineer = FeatureEngineer()

featured_df = pd.DataFrame({
    'salary': [5000000, 6000000, 7000000],
    'age': [25, 27, 30],
    'bmi': [24.5, 25.0, 26.2],
    'team': ['Lakers', 'Bulls', 'Lakers'],
    'version': ['NBA2K20', 'NBA2K21', 'NBA2K22']  # Dropped
})

X, y = engineer.encode_and_scale(featured_df)

print("Features (X):")
print(X.columns.tolist())
# ['age', 'bmi', 'team__Lakers', 'team__Bulls']

print("\nTarget (y):")
print(y.values)
# [5000000, 6000000, 7000000]

Data Flow

Integration with Pipeline

Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:38
class RealTimePipelineRunner:
    def __init__(self, config: PipelineConfig):
        self.engineer = FeatureEngineer()
Batch Processing:
def _process_df(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]:
    cleaned = self.preprocessor.clean(df)
    featured = self.engineer.build_features(cleaned)
    filtered = self.engineer.drop_multicollinearity(featured)
    X, y = self.engineer.encode_and_scale(filtered)
    return X, y
Streaming Processing: Location: ~/workspace/source/NBA Data Preprocessing/task/pipeline/streaming/engine.py:78-82
def _process_stream_chunk(self, chunk, rolling_state):
    cleaned = self.preprocessor.clean(chunk)
    featured = self.engineer.build_features_streaming(cleaned, rolling_state)
    filtered = self.engineer.drop_multicollinearity(featured)
    return self.engineer.encode_and_scale(filtered)

Performance Considerations

Batch vs Streaming

Aspect           | Batch Mode           | Streaming Mode
-----------------|----------------------|-----------------------------
Rolling stats    | pandas .rolling()    | Stateful deque
Memory           | Full dataset in RAM  | One chunk at a time
State management | None                 | RollingState required
Speed            | Faster (vectorized)  | Slightly slower (iterative)

Time Complexity

  • build_features(): O(n × m) where n = rows, m = numeric columns
  • drop_multicollinearity(): O(n × k³) worst case, where k = numeric features (up to k iterations, each recomputing a k × k correlation matrix over n rows)
  • encode_and_scale(): O(n × c) where c = categorical cardinality
All methods create new DataFrames, preserving original data immutability.

Best Practices

Choose Rolling Window Wisely

Smaller windows (3-5) capture recent trends, larger windows (10-20) smooth long-term patterns.

Monitor Multicollinearity

Adjust the threshold to the model type: linear models benefit from stricter thresholds (e.g., 0.5); tree-based models are largely robust to correlated features.

Validate Encodings

Check one-hot encoded column counts before training to avoid dimension mismatches.
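One simple safeguard (column names here are illustrative): reindex inference features to the training columns, so categories absent from new data become zero columns instead of a shape mismatch:

```python
import pandas as pd

# Training produced two one-hot columns; new data only saw 'Lakers'.
X_train = pd.DataFrame({'team__Lakers': [1, 0], 'team__Bulls': [0, 1]})
X_new = pd.DataFrame({'team__Lakers': [1]})

# Align to the training schema; absent columns are filled with 0.
X_new = X_new.reindex(columns=X_train.columns, fill_value=0)
print(X_new.columns.tolist())  # ['team__Lakers', 'team__Bulls']
```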

Stream State Management

Always initialize RollingState before streaming and pass it consistently to all chunks.

Next Steps

Validation

Validate feature quality and detect drift

Streaming Engine

Execute the full pipeline in real-time
