Skip to main content

Overview

This guide covers optimization strategies to maximize prediction accuracy and system performance in CryptoView Pro.

Data Optimization

Optimal Data Quantity

Minimum Requirements

# Recommended data points by model
MIN_DATA_REQUIREMENTS = {
    'xgboost': 500,   # Minimum for stable training
    'prophet': 100,   # Minimum for trend detection
    'hybrid': 500     # Uses XGBoost minimum
}

RECOMMENDED_DATA = {
    'xgboost': 2000,  # Optimal performance
    'prophet': 1000,  # Better seasonality detection
    'hybrid': 2000    # Best results
}

def check_data_quality(df, model_type='xgboost'):
    """
    Check whether the dataset is large enough to train the given model.

    Prints a status line and returns False only when the hard minimum for
    ``model_type`` is not met; a size between the minimum and the
    recommendation still returns True, with a warning about reduced accuracy.
    """
    needed = MIN_DATA_REQUIREMENTS[model_type]
    ideal = RECOMMENDED_DATA[model_type]
    n = len(df)

    # Guard clause: below the hard floor, training is not worth attempting.
    if n < needed:
        print(f"❌ ERROR: Need {needed} points, have {n}")
        return False

    if n < ideal:
        print(f"⚠️ WARNING: Have {n} points, recommended {ideal}")
        print(f"   Accuracy may be reduced by ~{(1 - n/ideal)*20:.1f}%")
        return True

    print(f"✅ Data quality good: {n} points")
    return True

# Usage
if check_data_quality(df, 'xgboost'):
    predictor.train(df)

Data Preprocessing

Handle Missing Values

def preprocess_data(df):
    """
    Clean and prepare OHLCV data for optimal model performance.

    Steps: drop duplicate index entries, fill missing values (forward-fill
    prices, median-fill volume), interpolate outlier closes (>20% single-period
    moves), sort chronologically, and repair rows where close/high/low are
    mutually inconsistent. Returns a cleaned copy; the input is not mutated.
    """
    df = df.copy()

    # 1. Remove duplicate index entries (keep the first occurrence)
    df = df[~df.index.duplicated(keep='first')]

    # 2. Handle missing values
    if df.isnull().sum().sum() > 0:
        print(f"⚠️ Found {df.isnull().sum().sum()} missing values")

        # Forward fill for price data (conservative).
        # FIX: fillna(method='ffill') is deprecated and removed in pandas 3.x;
        # DataFrame.ffill() is the supported equivalent.
        price_cols = ['open', 'high', 'low', 'close']
        df[price_cols] = df[price_cols].ffill()

        # Volume: use median (robust to spikes)
        if 'volume' in df.columns:
            df['volume'] = df['volume'].fillna(df['volume'].median())

    # 3. Remove outliers (> 20% change in 1 period)
    returns = df['close'].pct_change()
    outliers = abs(returns) > 0.20

    if outliers.sum() > 0:
        print(f"⚠️ Found {outliers.sum()} outliers (>20% moves)")
        # Replace outliers with interpolated values
        df.loc[outliers, 'close'] = np.nan
        df['close'] = df['close'].interpolate(method='linear')

    # 4. Ensure chronological order
    df = df.sort_index()

    # 5. Validate OHLC relationships
    invalid = (df['high'] < df['low']) | (df['close'] > df['high']) | (df['close'] < df['low'])
    if invalid.sum() > 0:
        print(f"⚠️ Fixed {invalid.sum()} invalid OHLC relationships")
        # FIX: the original had unbalanced brackets ([['open', 'close']]) —
        # a SyntaxError. Select the two columns, then take row-wise max/min.
        df.loc[invalid, 'high'] = df.loc[invalid, ['open', 'close']].max(axis=1)
        df.loc[invalid, 'low'] = df.loc[invalid, ['open', 'close']].min(axis=1)

    print("✅ Data preprocessing complete")
    return df

# Usage
df = preprocess_data(df)

Optimal Timeframe Selection

def select_timeframe(forecast_hours: int):
    """
    Choose an optimal data timeframe for a given prediction horizon.

    Rule of thumb: pick a candle size that yields roughly 50-200x as many
    historical points as forecast steps. Prints the recommendation and
    returns ``(preferred_timeframe, suggested_data_points)``.
    """
    recommendations = {
        'short': {  # 1-24 hours
            'timeframes': ['5m', '15m', '1h'],
            'preferred': '15m',
            'data_points': 2000
        },
        'medium': {  # 24-168 hours (1 week)
            'timeframes': ['1h', '4h'],
            'preferred': '1h',
            'data_points': 2000
        },
        'long': {  # 1+ weeks
            'timeframes': ['4h', '1d'],
            'preferred': '1d',
            'data_points': 1000
        }
    }

    # Bucket the horizon: <=1 day is short, <=1 week is medium, else long.
    if forecast_hours <= 24:
        bucket = 'short'
    elif forecast_hours <= 168:
        bucket = 'medium'
    else:
        bucket = 'long'
    choice = recommendations[bucket]

    print(f"For {forecast_hours}h forecast:")
    print(f"  Recommended timeframe: {choice['preferred']}")
    print(f"  Alternative timeframes: {', '.join(choice['timeframes'])}")
    print(f"  Suggested data points: {choice['data_points']}")

    return choice['preferred'], choice['data_points']

# Usage
timeframe, data_limit = select_timeframe(48)

Model-Specific Optimization

XGBoost Hyperparameter Tuning

Quick Optimization

def optimize_xgboost_quick(df):
    """
    Fast hyperparameter optimization for XGBoost.

    Backtests a small set of preset configurations and returns a fresh
    (untrained) predictor built with the lowest-MAPE configuration.
    """
    from models.xgboost_model import XGBoostCryptoPredictor, backtest_model

    configs = [
        # Conservative (stable)
        {'n_estimators': 150, 'learning_rate': 0.05, 'max_depth': 5},
        # Balanced (default)
        {'n_estimators': 200, 'learning_rate': 0.07, 'max_depth': 6},
        # Aggressive (captures complexity)
        {'n_estimators': 250, 'learning_rate': 0.08, 'max_depth': 7},
    ]

    best_config = None
    best_mape = float('inf')

    print("Testing XGBoost configurations...\n")

    for i, config in enumerate(configs, 1):
        # FIX: derive the total from the list instead of the hard-coded "3",
        # so the progress line stays correct when configs are added/removed
        # (matches the style used in optimize_prophet).
        print(f"Config {i}/{len(configs)}: {config}")

        predictor = XGBoostCryptoPredictor(**config)
        results = backtest_model(df, predictor, train_size=0.8)

        mape = results['metrics']['test_mape']
        print(f"  MAPE: {mape:.3f}%")
        print(f"  Direction Accuracy: {results['metrics']['test_direction_accuracy']:.1f}%\n")

        # Lower MAPE wins; ties keep the earlier (more conservative) config.
        if mape < best_mape:
            best_mape = mape
            best_config = config

    print(f"✅ Best config: {best_config}")
    print(f"   MAPE: {best_mape:.3f}%")

    return XGBoostCryptoPredictor(**best_config)

# Usage
optimized_predictor = optimize_xgboost_quick(df)

Grid Search (Comprehensive)

import itertools

def optimize_xgboost_grid(df, verbose=True):
    """
    Comprehensive grid search for XGBoost.

    Warning: can take 5-10 minutes. Scores each configuration by MAPE
    inflated by a direction-accuracy penalty, and returns
    ``(predictor built from the best config, results sorted by score)``.

    Raises:
        RuntimeError: if every configuration failed to train.
    """
    from models.xgboost_model import XGBoostCryptoPredictor

    param_grid = {
        'n_estimators': [150, 200, 250],
        'learning_rate': [0.05, 0.07, 0.09],
        'max_depth': [5, 6, 7],
        'subsample': [0.8, 0.9],
        'colsample_bytree': [0.8, 0.9]
    }

    keys = param_grid.keys()
    values = param_grid.values()
    combinations = list(itertools.product(*values))

    print(f"Testing {len(combinations)} configurations...\n")

    best_config = None
    best_score = float('inf')
    results = []

    for i, combo in enumerate(combinations, 1):
        config = dict(zip(keys, combo))

        try:
            predictor = XGBoostCryptoPredictor(**config)
            metrics = predictor.train(df, train_size=0.8)

            # Composite score: MAPE scaled up by how far direction accuracy
            # falls below 70% (accuracy above 70% shrinks the score).
            score = metrics['test_mape'] * (1 + (70 - metrics['test_direction_accuracy'])/100)

            results.append({
                'config': config,
                'mape': metrics['test_mape'],
                'direction_acc': metrics['test_direction_accuracy'],
                'score': score
            })

            if score < best_score:
                best_score = score
                best_config = config

            if verbose and i % 10 == 0:
                print(f"Progress: {i}/{len(combinations)} ({i/len(combinations)*100:.1f}%)")

        except Exception as e:
            if verbose:
                print(f"Config failed: {config} - {e}")

    # FIX: if every config failed, the original fell through to
    # XGBoostCryptoPredictor(**None) and died with an opaque TypeError.
    if best_config is None:
        raise RuntimeError("All configurations failed to train; check input data")

    # Sort results best-first by composite score
    results = sorted(results, key=lambda x: x['score'])

    print(f"\n✅ Optimization complete!")
    print(f"\nTop 3 configurations:")
    for i, r in enumerate(results[:3], 1):
        print(f"\n{i}. MAPE: {r['mape']:.3f}% | Dir Acc: {r['direction_acc']:.1f}%")
        print(f"   {r['config']}")

    return XGBoostCryptoPredictor(**best_config), results

# Usage
predictor, all_results = optimize_xgboost_grid(df)

Feature Selection

def optimize_features(df, predictor):
    """
    Prune the predictor's feature set to the smallest importance-ordered
    prefix covering 95% of total feature importance.

    Trains the predictor, inspects its importance ranking, updates
    ``predictor.feature_columns`` in place, and returns the predictor.
    """
    # Fit first so importances reflect the current data.
    predictor.train(df, train_size=0.8)

    importance_df = predictor.get_feature_importance()

    # Running total of importance, used to locate the 95% coverage cutoff.
    importance_df['cumsum'] = importance_df['importance'].cumsum()
    cutoff = importance_df['importance'].sum() * 0.95

    keep = importance_df.loc[importance_df['cumsum'] <= cutoff, 'feature'].tolist()

    print(f"Reduced features from {len(predictor.feature_columns)} to {len(keep)}")
    print(f"Top 10 features:")
    print(importance_df.head(10))

    # Narrow the predictor to the retained features for future training runs.
    predictor.feature_columns = keep

    return predictor

# Usage
predictor = optimize_features(df, predictor)

Prophet Optimization

Parameter Tuning

def optimize_prophet(df):
    """
    Backtest a small grid of Prophet configurations and return a fresh
    (untrained) predictor built with the lowest-MAPE parameters.
    """
    from models.prophet_model import ProphetCryptoPredictor, backtest_prophet

    param_combinations = [
        # Conservative
        {'changepoint_prior_scale': 0.2, 'seasonality_prior_scale': 5},
        # Balanced
        {'changepoint_prior_scale': 0.3, 'seasonality_prior_scale': 10},
        # Moderate
        {'changepoint_prior_scale': 0.4, 'seasonality_prior_scale': 12},
        # Aggressive (best for crypto)
        {'changepoint_prior_scale': 0.5, 'seasonality_prior_scale': 15},
    ]

    best_config, best_mape = None, float('inf')

    print("Testing Prophet configurations...\n")

    for idx, params in enumerate(param_combinations, 1):
        print(f"Config {idx}/{len(param_combinations)}: {params}")

        candidate = ProphetCryptoPredictor(**params)
        outcome = backtest_prophet(df, candidate, test_periods=168)

        candidate_mape = outcome['test_metrics']['mape']
        print(f"  MAPE: {candidate_mape:.3f}%")
        print(f"  Direction Accuracy: {outcome['test_metrics']['direction_accuracy']:.1f}%\n")

        # Track the lowest backtest MAPE seen so far.
        if candidate_mape < best_mape:
            best_mape, best_config = candidate_mape, params

    print(f"✅ Best config: {best_config}")
    print(f"   MAPE: {best_mape:.3f}%")

    return ProphetCryptoPredictor(**best_config)

# Usage
optimized_prophet = optimize_prophet(df)

Adding Custom Seasonality

def add_crypto_seasonality(predictor):
    """
    Register crypto-specific seasonal components on the wrapped Prophet
    model and return the predictor for chaining.
    """
    extra_cycles = [
        # (name, period in days, fourier order)
        ('four_hourly', 4/24, 5),   # 4-hour cycle (common in crypto)
        ('monthly', 30.5, 10),      # monthly cycle (alt season, etc)
    ]

    for cycle_name, cycle_period, order in extra_cycles:
        predictor.model.add_seasonality(
            name=cycle_name,
            period=cycle_period,
            fourier_order=order
        )

    return predictor

Training Optimization

Cross-Validation

from sklearn.model_selection import TimeSeriesSplit

def cross_validate_model(df, predictor, n_splits=5):
    """
    Walk-forward time series cross-validation.

    For each expanding-window split, trains the predictor on the earlier
    segment, forecasts over the later one, and scores MAPE against the
    actual closes. Returns the list of per-fold MAPE values.
    """
    splitter = TimeSeriesSplit(n_splits=n_splits)
    fold_mapes = []

    print(f"Running {n_splits}-fold cross-validation...\n")

    for fold, (train_idx, test_idx) in enumerate(splitter.split(df), 1):
        train_part = df.iloc[train_idx]
        test_part = df.iloc[test_idx]

        # Fit on the earlier window only (no look-ahead).
        predictor.train(train_part, train_size=0.9)

        # Forecast across the held-out window.
        forecast = predictor.predict_future(train_part, periods=len(test_part))
        predicted = forecast['predicted_price'].values

        # Truncate actuals in case the forecast is shorter than the fold.
        actual = test_part['close'].values[:len(forecast)]
        fold_mape = np.mean(np.abs((actual - predicted) / actual)) * 100
        fold_mapes.append(fold_mape)

        print(f"Fold {fold}: MAPE = {fold_mape:.3f}%")

    print(f"\nAverage MAPE: {np.mean(fold_mapes):.3f}% ± {np.std(fold_mapes):.3f}%")

    return fold_mapes

# Usage
cv_scores = cross_validate_model(df, predictor, n_splits=5)

Ensemble Methods

def create_ensemble(df, periods=24):
    """
    Average the forecasts of several XGBoost models seeded differently.

    Returns the last member's forecast frame with the ensemble mean as
    'predicted_price', plus 'ensemble_std' and ±2σ bounds for uncertainty.
    """
    from models.xgboost_model import XGBoostCryptoPredictor
    import numpy as np

    n_models = 5
    member_paths = []
    last_forecast = None

    print(f"Training {n_models} models for ensemble...\n")

    for seed_offset in range(n_models):
        # Vary only the random seed between ensemble members.
        member = XGBoostCryptoPredictor()
        member.model.random_state = 42 + seed_offset

        member.train(df, train_size=0.8)
        last_forecast = member.predict_future(df, periods=periods)
        member_paths.append(last_forecast['predicted_price'].values)

        print(f"Model {seed_offset+1}/{n_models} trained")

    # Ensemble mean and per-step spread (uncertainty proxy).
    mean_path = np.mean(member_paths, axis=0)
    spread = np.std(member_paths, axis=0)

    # Reuse the last member's frame as the template for the result.
    result = last_forecast.copy()
    result['predicted_price'] = mean_path
    result['ensemble_std'] = spread
    result['lower_bound'] = mean_path - 2 * spread
    result['upper_bound'] = mean_path + 2 * spread

    print(f"\n✅ Ensemble complete")
    print(f"Average std: {spread.mean():.2f}")

    return result

# Usage
ensemble_predictions = create_ensemble(df, periods=48)

Prediction Optimization

Adaptive Retraining

class AdaptivePredictor:
    """
    Wrapper that retrains the underlying predictor automatically whenever
    its rolling one-step error drifts above a threshold.
    """
    def __init__(self, predictor, performance_threshold=2.0):
        self.predictor = predictor
        self.threshold = performance_threshold   # avg error (%) that triggers retraining
        self.last_train_time = None              # when the model was last fitted
        self.recent_errors = []                  # rolling window of one-step errors (%)

    def predict(self, df, periods=24):
        """Return a forecast, (re)training the wrapped predictor as needed."""
        # Never-trained model: fit once and forecast immediately.
        if not self.predictor.trained:
            print("Initial training...")
            self._refit(df)
            return self.predictor.predict_future(df, periods)

        predictions = self.predictor.predict_future(df, periods)

        # Error proxy: last observed close vs the first forecast step.
        last_close = df['close'].iloc[-1]
        first_step = predictions['predicted_price'].iloc[0]
        self.recent_errors.append(abs((last_close - first_step) / last_close) * 100)
        # Keep only the 10 most recent errors.
        del self.recent_errors[:-10]

        avg_error = np.mean(self.recent_errors)

        if avg_error > self.threshold:
            print(f"⚠️ Average error {avg_error:.2f}% > threshold {self.threshold}%")
            print("Retraining model...")
            self._refit(df)
            predictions = self.predictor.predict_future(df, periods)
            self.recent_errors = []

        return predictions

    def _refit(self, df):
        # Fit the wrapped predictor and stamp the training time.
        self.predictor.train(df)
        self.last_train_time = datetime.now()

# Usage
from models.xgboost_model import XGBoostCryptoPredictor

base_predictor = XGBoostCryptoPredictor()
adaptive = AdaptivePredictor(base_predictor, performance_threshold=1.5)

predictions = adaptive.predict(df, periods=24)

Prediction Caching

import hashlib
import pickle
from pathlib import Path

def cached_prediction(df, predictor, periods, cache_dir='./cache'):
    """
    Return a forecast, reusing a recent on-disk cache when available.

    The cache key combines the predictor class, the horizon, and a hash of
    the last 100 rows, so any change to recent data invalidates the entry.
    Entries older than 5 minutes are regenerated.
    """
    # Create cache key from data and parameters
    data_hash = hashlib.md5(
        df.tail(100).to_json().encode()
    ).hexdigest()

    cache_key = f"{predictor.__class__.__name__}_{periods}_{data_hash}.pkl"
    cache_path = Path(cache_dir) / cache_key

    # Check cache
    if cache_path.exists():
        cache_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
        # FIX: timedelta.seconds is only the within-day component, so a
        # 24h05m-old cache looked "fresh"; total_seconds() is the true age.
        if cache_age.total_seconds() < 300:  # 5 minutes
            print("✅ Using cached prediction")
            with open(cache_path, 'rb') as f:
                return pickle.load(f)

    # Generate new prediction
    print("Generating new prediction...")
    if not predictor.trained:
        predictor.train(df)
    predictions = predictor.predict_future(df, periods)

    # Cache result. FIX: parents=True so a nested cache_dir (e.g.
    # 'var/cache/preds') is created instead of raising FileNotFoundError.
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_path, 'wb') as f:
        pickle.dump(predictions, f)

    return predictions

System Performance

Memory Optimization

import gc

def optimize_memory(df):
    """
    Shrink a DataFrame by downcasting 64-bit numeric columns to 32-bit.

    Mutates the passed frame's columns in place, prints the before/after
    footprint, and returns the frame. Note: float32 loses precision beyond
    ~7 significant digits.
    """
    mb = 1024**2
    before_mb = df.memory_usage(deep=True).sum() / mb

    # Halve the width of every 64-bit numeric column.
    for wide, narrow in (('float64', 'float32'), ('int64', 'int32')):
        cols = df.select_dtypes(include=[wide]).columns
        df[cols] = df[cols].astype(narrow)

    after_mb = df.memory_usage(deep=True).sum() / mb

    print(f"Memory usage: {before_mb:.2f} MB → {after_mb:.2f} MB")
    print(f"Saved: {before_mb - after_mb:.2f} MB ({(1 - after_mb/before_mb)*100:.1f}%)")

    # Encourage prompt release of the discarded 64-bit blocks.
    gc.collect()

    return df

Parallel Processing

from concurrent.futures import ThreadPoolExecutor

def parallel_multi_symbol_predictions(symbols, df_dict, periods=24):
    """
    Train and forecast several symbols concurrently on a thread pool.

    Each worker builds its own hybrid predictor for one symbol; returns a
    dict mapping symbol -> forecast frame.
    """
    from models.hybrid_model import HybridCryptoPredictor

    def _forecast_one(ticker):
        # Per-worker predictor: no shared mutable state between threads.
        model = HybridCryptoPredictor()
        history = df_dict[ticker]
        model.train(history)
        return ticker, model.predict_future(history, periods)

    results = {}

    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = [pool.submit(_forecast_one, ticker) for ticker in symbols]

        # Collect in submission order so output is deterministic.
        for task in pending:
            ticker, forecast = task.result()
            results[ticker] = forecast
            print(f"✅ {ticker} complete")

    return results

# Usage
symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']
results = parallel_multi_symbol_predictions(symbols, df_dict, periods=24)

Monitoring & Maintenance

Performance Tracking

class PerformanceMonitor:
    """
    Rolling accuracy tracker for live predictions.

    FIX: the original stored predictions and actuals in two parallel lists,
    but appended to ``actuals`` only when an actual was supplied — so any
    prediction logged without an actual desynchronized the lists and later
    MAE/MAPE paired the wrong values. Predictions are now paired with their
    actual at log time.
    """
    def __init__(self):
        self.predictions = []   # every prediction logged, in order
        self.actuals = []       # actual values, in the order they arrived
        self.timestamps = []    # timestamp of every logged prediction
        self._paired_predictions = []  # only predictions logged with an actual

    def log_prediction(self, timestamp, predicted, actual=None):
        """Record one prediction; pass ``actual`` when the truth is known."""
        self.timestamps.append(timestamp)
        self.predictions.append(predicted)
        if actual is not None:
            # Pair at log time so actual-less predictions can't shift the
            # alignment of later (prediction, actual) pairs.
            self.actuals.append(actual)
            self._paired_predictions.append(predicted)

    def get_recent_performance(self, periods=10):
        """Return MAE/MAPE over the last *periods* paired points, or None."""
        if len(self.actuals) < periods:
            return None

        predicted = np.asarray(self._paired_predictions[-periods:], dtype=float)
        actual = np.asarray(self.actuals[-periods:], dtype=float)

        mae = float(np.mean(np.abs(predicted - actual)))
        mape = float(np.mean(np.abs((actual - predicted) / actual))) * 100

        return {'mae': mae, 'mape': mape, 'periods': periods}

    def should_retrain(self, mape_threshold=2.0):
        """True when recent MAPE exceeds *mape_threshold* percent."""
        perf = self.get_recent_performance()
        if perf is None:
            return False
        return perf['mape'] > mape_threshold

# Usage
monitor = PerformanceMonitor()

# Log predictions over time
for timestamp, pred, actual in prediction_history:
    monitor.log_prediction(timestamp, pred, actual)

if monitor.should_retrain():
    print("⚠️ Retraining recommended")
    predictor.train(df)

Best Practices Summary

  1. Use 2000 data points for XGBoost, 1000 for Prophet
  2. Preprocess data - handle missing values and outliers
  3. Optimize hyperparameters - even quick tuning helps significantly
  4. Use cross-validation - don’t trust single train/test split
  5. Create ensembles - average 3-5 models for stability
  6. Implement adaptive retraining - retrain when errors increase
  7. Cache predictions - avoid redundant computations
  8. Monitor performance - track accuracy over time
  9. Optimize memory - use float32 instead of float64
  10. Test multiple timeframes - 1h usually best for most cases

Next Steps

Build docs developers (and LLMs) love