## Overview

This guide covers optimization strategies to maximize prediction accuracy and system performance in CryptoView Pro.

## Data Optimization

### Optimal Data Quantity

#### Minimum Requirements
```python
# Recommended data points by model
MIN_DATA_REQUIREMENTS = {
    'xgboost': 500,   # Minimum for stable training
    'prophet': 100,   # Minimum for trend detection
    'hybrid': 500     # Uses XGBoost minimum
}

RECOMMENDED_DATA = {
    'xgboost': 2000,  # Optimal performance
    'prophet': 1000,  # Better seasonality detection
    'hybrid': 2000    # Best results
}

def check_data_quality(df, model_type='xgboost'):
    """
    Validate data quality for model training
    """
    min_required = MIN_DATA_REQUIREMENTS[model_type]
    recommended = RECOMMENDED_DATA[model_type]
    actual_points = len(df)

    if actual_points < min_required:
        print(f"❌ ERROR: Need {min_required} points, have {actual_points}")
        return False
    elif actual_points < recommended:
        print(f"⚠️ WARNING: Have {actual_points} points, recommended {recommended}")
        print(f"   Accuracy may be reduced by ~{(1 - actual_points/recommended)*20:.1f}%")
        return True
    else:
        print(f"✅ Data quality good: {actual_points} points")
        return True

# Usage
if check_data_quality(df, 'xgboost'):
    predictor.train(df)
```
### Data Preprocessing

#### Handle Missing Values
```python
import numpy as np

def preprocess_data(df):
    """
    Clean and prepare data for optimal model performance
    """
    df = df.copy()

    # 1. Remove duplicates
    df = df[~df.index.duplicated(keep='first')]

    # 2. Handle missing values
    if df.isnull().sum().sum() > 0:
        print(f"⚠️ Found {df.isnull().sum().sum()} missing values")
        # Forward fill for price data (conservative)
        price_cols = ['open', 'high', 'low', 'close']
        df[price_cols] = df[price_cols].ffill()
        # Volume: use median
        if 'volume' in df.columns:
            df['volume'] = df['volume'].fillna(df['volume'].median())

    # 3. Remove outliers (> 20% change in 1 period)
    returns = df['close'].pct_change()
    outliers = abs(returns) > 0.20
    if outliers.sum() > 0:
        print(f"⚠️ Found {outliers.sum()} outliers (>20% moves)")
        # Replace outliers with interpolated values
        df.loc[outliers, 'close'] = np.nan
        df['close'] = df['close'].interpolate(method='linear')

    # 4. Ensure chronological order
    df = df.sort_index()

    # 5. Validate OHLC relationships
    invalid = (df['high'] < df['low']) | (df['close'] > df['high']) | (df['close'] < df['low'])
    if invalid.sum() > 0:
        print(f"⚠️ Fixed {invalid.sum()} invalid OHLC relationships")
        df.loc[invalid, 'high'] = df.loc[invalid, ['open', 'close']].max(axis=1)
        df.loc[invalid, 'low'] = df.loc[invalid, ['open', 'close']].min(axis=1)

    print("✅ Data preprocessing complete")
    return df

# Usage
df = preprocess_data(df)
```
### Optimal Timeframe Selection
```python
def select_timeframe(forecast_hours: int):
    """
    Choose optimal data timeframe for prediction horizon
    Rule: Use a timeframe that gives 50-200x data points vs forecast periods
    """
    recommendations = {
        'short': {   # 1-24 hours
            'timeframes': ['5m', '15m', '1h'],
            'preferred': '15m',
            'data_points': 2000
        },
        'medium': {  # 24-168 hours (1 week)
            'timeframes': ['1h', '4h'],
            'preferred': '1h',
            'data_points': 2000
        },
        'long': {    # 1+ weeks
            'timeframes': ['4h', '1d'],
            'preferred': '1d',
            'data_points': 1000
        }
    }

    if forecast_hours <= 24:
        category = 'short'
    elif forecast_hours <= 168:
        category = 'medium'
    else:
        category = 'long'

    rec = recommendations[category]
    print(f"For {forecast_hours}h forecast:")
    print(f"  Recommended timeframe: {rec['preferred']}")
    print(f"  Alternative timeframes: {', '.join(rec['timeframes'])}")
    print(f"  Suggested data points: {rec['data_points']}")
    return rec['preferred'], rec['data_points']

# Usage
timeframe, data_limit = select_timeframe(48)
```
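The 50-200x rule in the docstring can be sanity-checked once data is loaded. A minimal sketch (the minutes-per-bar mapping and `check_history_ratio` helper are illustrative, not part of the documented API):

```python
# Minutes per bar for the supported timeframes (illustrative mapping).
TIMEFRAME_MINUTES = {'5m': 5, '15m': 15, '1h': 60, '4h': 240, '1d': 1440}

def check_history_ratio(n_points, timeframe, forecast_hours):
    """Compare available history to the forecast horizon in bars."""
    forecast_periods = forecast_hours * 60 / TIMEFRAME_MINUTES[timeframe]
    ratio = n_points / forecast_periods
    if ratio < 50:
        print(f"⚠️ Only {ratio:.0f}x the forecast horizon - consider more history")
    elif ratio > 200:
        print(f"⚠️ {ratio:.0f}x the forecast horizon - older data may add noise")
    else:
        print(f"✅ {ratio:.0f}x the forecast horizon - within the 50-200x guideline")
    return ratio

check_history_ratio(len(df), timeframe, forecast_hours=48)
```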
## Model-Specific Optimization

### XGBoost Hyperparameter Tuning

#### Quick Optimization
```python
def optimize_xgboost_quick(df):
    """
    Fast hyperparameter optimization for XGBoost
    Tests 3-5 configurations
    """
    from models.xgboost_model import XGBoostCryptoPredictor, backtest_model

    configs = [
        # Conservative (stable)
        {'n_estimators': 150, 'learning_rate': 0.05, 'max_depth': 5},
        # Balanced (default)
        {'n_estimators': 200, 'learning_rate': 0.07, 'max_depth': 6},
        # Aggressive (captures complexity)
        {'n_estimators': 250, 'learning_rate': 0.08, 'max_depth': 7},
    ]

    best_config = None
    best_mape = float('inf')

    print("Testing XGBoost configurations...\n")
    for i, config in enumerate(configs, 1):
        print(f"Config {i}/{len(configs)}: {config}")
        predictor = XGBoostCryptoPredictor(**config)
        results = backtest_model(df, predictor, train_size=0.8)
        mape = results['metrics']['test_mape']
        print(f"  MAPE: {mape:.3f}%")
        print(f"  Direction Accuracy: {results['metrics']['test_direction_accuracy']:.1f}%\n")
        if mape < best_mape:
            best_mape = mape
            best_config = config

    print(f"✅ Best config: {best_config}")
    print(f"   MAPE: {best_mape:.3f}%")
    return XGBoostCryptoPredictor(**best_config)

# Usage
optimized_predictor = optimize_xgboost_quick(df)
```
#### Grid Search (Comprehensive)
```python
import itertools

def optimize_xgboost_grid(df, verbose=True):
    """
    Comprehensive grid search for XGBoost
    Warning: testing all 108 combinations can take 5-10 minutes
    """
    from models.xgboost_model import XGBoostCryptoPredictor

    param_grid = {
        'n_estimators': [150, 200, 250],
        'learning_rate': [0.05, 0.07, 0.09],
        'max_depth': [5, 6, 7],
        'subsample': [0.8, 0.9],
        'colsample_bytree': [0.8, 0.9]
    }

    keys = param_grid.keys()
    values = param_grid.values()
    combinations = list(itertools.product(*values))
    print(f"Testing {len(combinations)} configurations...\n")

    best_config = None
    best_score = float('inf')
    results = []

    for i, combo in enumerate(combinations, 1):
        config = dict(zip(keys, combo))
        try:
            predictor = XGBoostCryptoPredictor(**config)
            metrics = predictor.train(df, train_size=0.8)
            # Composite score: MAPE plus a penalty for direction accuracy below 70%
            score = metrics['test_mape'] * (1 + (70 - metrics['test_direction_accuracy'])/100)
            results.append({
                'config': config,
                'mape': metrics['test_mape'],
                'direction_acc': metrics['test_direction_accuracy'],
                'score': score
            })
            if score < best_score:
                best_score = score
                best_config = config
            if verbose and i % 10 == 0:
                print(f"Progress: {i}/{len(combinations)} ({i/len(combinations)*100:.1f}%)")
        except Exception as e:
            if verbose:
                print(f"Config failed: {config} - {e}")

    # Sort results by composite score
    results = sorted(results, key=lambda x: x['score'])

    print("\n✅ Optimization complete!")
    print("\nTop 3 configurations:")
    for i, r in enumerate(results[:3], 1):
        print(f"\n{i}. MAPE: {r['mape']:.3f}% | Dir Acc: {r['direction_acc']:.1f}%")
        print(f"   {r['config']}")

    return XGBoostCryptoPredictor(**best_config), results

# Usage
predictor, all_results = optimize_xgboost_grid(df)
```
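If the full 108-combination grid is too slow, randomly sampling the same grid usually lands near the optimum in a fraction of the time. A minimal sketch under the same `XGBoostCryptoPredictor` interface (the sample count of 20 is an arbitrary choice, and duplicate samples are possible but harmless for a quick pass):

```python
import random

def optimize_xgboost_random(df, n_samples=20, seed=42):
    """Randomly sample configurations from the grid-search space."""
    from models.xgboost_model import XGBoostCryptoPredictor

    param_grid = {
        'n_estimators': [150, 200, 250],
        'learning_rate': [0.05, 0.07, 0.09],
        'max_depth': [5, 6, 7],
        'subsample': [0.8, 0.9],
        'colsample_bytree': [0.8, 0.9]
    }

    rng = random.Random(seed)
    best_config, best_mape = None, float('inf')
    for _ in range(n_samples):
        config = {k: rng.choice(v) for k, v in param_grid.items()}
        predictor = XGBoostCryptoPredictor(**config)
        metrics = predictor.train(df, train_size=0.8)
        if metrics['test_mape'] < best_mape:
            best_mape, best_config = metrics['test_mape'], config

    print(f"Best of {n_samples} samples: {best_config} (MAPE {best_mape:.3f}%)")
    return XGBoostCryptoPredictor(**best_config)
```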
### Feature Selection
```python
def optimize_features(df, predictor):
    """
    Identify and keep only important features
    """
    # Train model
    predictor.train(df, train_size=0.8)

    # Get feature importance
    importance_df = predictor.get_feature_importance()

    # Keep top features that account for 95% of importance
    importance_df['cumsum'] = importance_df['importance'].cumsum()
    total_importance = importance_df['importance'].sum()
    important_features = importance_df[
        importance_df['cumsum'] <= total_importance * 0.95
    ]['feature'].tolist()

    print(f"Reduced features from {len(predictor.feature_columns)} to {len(important_features)}")
    print("Top 10 features:")
    print(importance_df.head(10))

    # Update predictor to use only important features
    predictor.feature_columns = important_features
    return predictor

# Usage
predictor = optimize_features(df, predictor)
```
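Note that pruning `feature_columns` changes the model's input schema, so the model should be refitted on the reduced set before its next prediction. A minimal follow-up, assuming `train()` rebuilds features from the current `feature_columns` (an assumption about the predictor's internals):

```python
# Refit on the reduced feature set so the fitted booster matches the
# new input schema (assumes train() reads predictor.feature_columns).
predictor = optimize_features(df, predictor)
predictor.train(df, train_size=0.8)
```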
### Prophet Optimization

#### Parameter Tuning
```python
def optimize_prophet(df):
    """
    Find optimal Prophet parameters
    """
    from models.prophet_model import ProphetCryptoPredictor, backtest_prophet

    param_combinations = [
        # Conservative
        {'changepoint_prior_scale': 0.2, 'seasonality_prior_scale': 5},
        # Balanced
        {'changepoint_prior_scale': 0.3, 'seasonality_prior_scale': 10},
        # Moderate
        {'changepoint_prior_scale': 0.4, 'seasonality_prior_scale': 12},
        # Aggressive (best for crypto)
        {'changepoint_prior_scale': 0.5, 'seasonality_prior_scale': 15},
    ]

    best_config = None
    best_mape = float('inf')

    print("Testing Prophet configurations...\n")
    for i, config in enumerate(param_combinations, 1):
        print(f"Config {i}/{len(param_combinations)}: {config}")
        predictor = ProphetCryptoPredictor(**config)
        results = backtest_prophet(df, predictor, test_periods=168)
        mape = results['test_metrics']['mape']
        print(f"  MAPE: {mape:.3f}%")
        print(f"  Direction Accuracy: {results['test_metrics']['direction_accuracy']:.1f}%\n")
        if mape < best_mape:
            best_mape = mape
            best_config = config

    print(f"✅ Best config: {best_config}")
    print(f"   MAPE: {best_mape:.3f}%")
    return ProphetCryptoPredictor(**best_config)

# Usage
optimized_prophet = optimize_prophet(df)
```
#### Adding Custom Seasonality
```python
def add_crypto_seasonality(predictor):
    """
    Add crypto-specific seasonal patterns
    """
    # 4-hour cycle (common in crypto)
    predictor.model.add_seasonality(
        name='four_hourly',
        period=4/24,  # 4 hours, expressed in days
        fourier_order=5
    )
    # Monthly cycle (alt season, etc.)
    predictor.model.add_seasonality(
        name='monthly',
        period=30.5,
        fourier_order=10
    )
    return predictor
```
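Prophet only accepts `add_seasonality` calls on an unfitted model, so this helper must run after constructing the predictor but before training. A minimal usage sketch, assuming `ProphetCryptoPredictor` exposes the unfitted Prophet instance as `.model` and fits it inside `.train()` (assumptions about the wrapper's internals):

```python
from models.prophet_model import ProphetCryptoPredictor

# Register custom seasonalities before the underlying Prophet model is fitted.
predictor = ProphetCryptoPredictor(changepoint_prior_scale=0.5,
                                   seasonality_prior_scale=15)
predictor = add_crypto_seasonality(predictor)
predictor.train(df)  # fitting happens here, with the extra seasonalities
```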
## Training Optimization

### Cross-Validation
```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

def cross_validate_model(df, predictor, n_splits=5):
    """
    Perform time series cross-validation
    """
    tscv = TimeSeriesSplit(n_splits=n_splits)
    scores = []

    print(f"Running {n_splits}-fold cross-validation...\n")
    for i, (train_idx, test_idx) in enumerate(tscv.split(df), 1):
        train_df = df.iloc[train_idx]
        test_df = df.iloc[test_idx]

        # Train
        metrics = predictor.train(train_df, train_size=0.9)

        # Test
        predictions = predictor.predict_future(train_df, periods=len(test_df))

        # Evaluate
        actual = test_df['close'].values[:len(predictions)]
        predicted = predictions['predicted_price'].values
        mape = np.mean(np.abs((actual - predicted) / actual)) * 100
        scores.append(mape)
        print(f"Fold {i}: MAPE = {mape:.3f}%")

    print(f"\nAverage MAPE: {np.mean(scores):.3f}% ± {np.std(scores):.3f}%")
    return scores

# Usage
cv_scores = cross_validate_model(df, predictor, n_splits=5)
```
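Adjacent candles are strongly autocorrelated, so a test fold that starts immediately after the training window can leak information across the boundary. scikit-learn's `TimeSeriesSplit` accepts a `gap` parameter that leaves an embargo between train and test; a minimal variant of the loop above (the 24-period gap is an illustrative choice):

```python
from sklearn.model_selection import TimeSeriesSplit

# Leave a 24-period embargo between train and test folds to reduce
# leakage from autocorrelated candles (gap size is illustrative).
tscv = TimeSeriesSplit(n_splits=5, gap=24)
for train_idx, test_idx in tscv.split(df):
    ...  # same train/evaluate steps as in cross_validate_model
```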
### Ensemble Methods
```python
import numpy as np

def create_ensemble(df, periods=24):
    """
    Create ensemble of multiple XGBoost models with different seeds
    """
    from models.xgboost_model import XGBoostCryptoPredictor

    n_models = 5
    predictions_list = []

    print(f"Training {n_models} models for ensemble...\n")
    for i in range(n_models):
        # Different random seed for each model
        predictor = XGBoostCryptoPredictor()
        predictor.model.random_state = 42 + i
        predictor.train(df, train_size=0.8)
        pred = predictor.predict_future(df, periods=periods)
        predictions_list.append(pred['predicted_price'].values)
        print(f"Model {i+1}/{n_models} trained")

    # Average predictions
    ensemble_pred = np.mean(predictions_list, axis=0)
    # Prediction std across models (uncertainty)
    ensemble_std = np.std(predictions_list, axis=0)

    # Build result DataFrame (reuse the last model's output as a template)
    result = pred.copy()
    result['predicted_price'] = ensemble_pred
    result['ensemble_std'] = ensemble_std
    result['lower_bound'] = ensemble_pred - 2 * ensemble_std
    result['upper_bound'] = ensemble_pred + 2 * ensemble_std

    print("\n✅ Ensemble complete")
    print(f"Average std: {ensemble_std.mean():.2f}")
    return result

# Usage
ensemble_predictions = create_ensemble(df, periods=48)
```
## Prediction Optimization

### Adaptive Retraining
```python
import numpy as np
from datetime import datetime

class AdaptivePredictor:
    """
    Automatically retrains when performance degrades
    """
    def __init__(self, predictor, performance_threshold=2.0):
        self.predictor = predictor
        self.threshold = performance_threshold
        self.last_train_time = None
        self.recent_errors = []

    def predict(self, df, periods=24):
        # Train if never trained
        if not self.predictor.trained:
            print("Initial training...")
            self.predictor.train(df)
            self.last_train_time = datetime.now()
            return self.predictor.predict_future(df, periods)

        # Make prediction
        predictions = self.predictor.predict_future(df, periods)

        # Check if we should retrain (last close vs. first predicted price
        # serves as a rolling proxy for one-step error)
        actual_next = df['close'].iloc[-1]
        predicted_next = predictions['predicted_price'].iloc[0]
        error_pct = abs((actual_next - predicted_next) / actual_next) * 100

        self.recent_errors.append(error_pct)
        if len(self.recent_errors) > 10:
            self.recent_errors.pop(0)

        avg_error = np.mean(self.recent_errors)
        if avg_error > self.threshold:
            print(f"⚠️ Average error {avg_error:.2f}% > threshold {self.threshold}%")
            print("Retraining model...")
            self.predictor.train(df)
            self.last_train_time = datetime.now()
            predictions = self.predictor.predict_future(df, periods)
            self.recent_errors = []

        return predictions

# Usage
from models.xgboost_model import XGBoostCryptoPredictor

base_predictor = XGBoostCryptoPredictor()
adaptive = AdaptivePredictor(base_predictor, performance_threshold=1.5)
predictions = adaptive.predict(df, periods=24)
```
### Prediction Caching
```python
import hashlib
import pickle
from datetime import datetime
from pathlib import Path

def cached_prediction(df, predictor, periods, cache_dir='./cache'):
    """
    Cache predictions to avoid recomputation
    """
    # Create cache key from data and parameters
    data_hash = hashlib.md5(
        df.tail(100).to_json().encode()
    ).hexdigest()
    cache_key = f"{predictor.__class__.__name__}_{periods}_{data_hash}.pkl"
    cache_path = Path(cache_dir) / cache_key

    # Check cache
    if cache_path.exists():
        cache_age = datetime.now() - datetime.fromtimestamp(cache_path.stat().st_mtime)
        if cache_age.total_seconds() < 300:  # 5 minutes
            print("✅ Using cached prediction")
            with open(cache_path, 'rb') as f:
                return pickle.load(f)

    # Generate new prediction
    print("Generating new prediction...")
    if not predictor.trained:
        predictor.train(df)
    predictions = predictor.predict_future(df, periods)

    # Cache result
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_path, 'wb') as f:
        pickle.dump(predictions, f)
    return predictions
```
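Because the cache key embeds a data hash, stale files accumulate as new candles arrive. A minimal cleanup sketch that removes entries older than an hour (the age limit and `prune_cache` helper are illustrative, not part of the documented API):

```python
import time
from pathlib import Path

def prune_cache(cache_dir='./cache', max_age_seconds=3600):
    """Delete cached prediction files older than max_age_seconds."""
    now = time.time()
    for path in Path(cache_dir).glob('*.pkl'):
        if now - path.stat().st_mtime > max_age_seconds:
            path.unlink()
```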
## System Performance

### Memory Optimization
```python
import gc

def optimize_memory(df):
    """
    Reduce memory usage of DataFrame
    """
    memory_before = df.memory_usage(deep=True).sum() / 1024**2

    # Convert float64 to float32
    float_cols = df.select_dtypes(include=['float64']).columns
    df[float_cols] = df[float_cols].astype('float32')

    # Convert int64 to int32
    int_cols = df.select_dtypes(include=['int64']).columns
    df[int_cols] = df[int_cols].astype('int32')

    memory_after = df.memory_usage(deep=True).sum() / 1024**2
    print(f"Memory usage: {memory_before:.2f} MB → {memory_after:.2f} MB")
    print(f"Saved: {memory_before - memory_after:.2f} MB ({(1 - memory_after/memory_before)*100:.1f}%)")

    # Force garbage collection
    gc.collect()
    return df
```
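One caveat: float32 carries roughly 7 significant digits, which can visibly round prices for assets quoted with many decimals. A quick check of the rounding error before committing to the downcast (the 1e-6 tolerance is an illustrative threshold):

```python
import numpy as np

# Measure the relative error introduced by a float32 round-trip.
close64 = df['close'].astype('float64')
close32 = close64.astype('float32').astype('float64')
max_rel_err = (np.abs(close64 - close32) / close64).max()
print(f"Max relative rounding error: {max_rel_err:.2e}")
if max_rel_err > 1e-6:
    print("⚠️ Keep 'close' in float64 to preserve price precision")
```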
### Parallel Processing
```python
from concurrent.futures import ThreadPoolExecutor

def parallel_multi_symbol_predictions(symbols, df_dict, periods=24):
    """
    Generate predictions for multiple symbols in parallel
    """
    from models.hybrid_model import HybridCryptoPredictor

    def predict_symbol(symbol):
        predictor = HybridCryptoPredictor()
        df = df_dict[symbol]
        predictor.train(df)
        predictions = predictor.predict_future(df, periods)
        return symbol, predictions

    results = {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(predict_symbol, sym) for sym in symbols]
        for future in futures:
            symbol, predictions = future.result()
            results[symbol] = predictions
            print(f"✅ {symbol} complete")
    return results

# Usage
symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']
results = parallel_multi_symbol_predictions(symbols, df_dict, periods=24)
```
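Threads only overlap work while the underlying libraries release the GIL (NumPy and XGBoost largely do). If training proves GIL-bound, `ProcessPoolExecutor` runs each symbol in its own process at the cost of pickling the DataFrames; a minimal sketch under that assumption (note the worker must be a module-level function so it can be pickled):

```python
from concurrent.futures import ProcessPoolExecutor

def predict_symbol_proc(args):
    """Module-level worker so ProcessPoolExecutor can pickle it."""
    from models.hybrid_model import HybridCryptoPredictor
    symbol, df, periods = args
    predictor = HybridCryptoPredictor()
    predictor.train(df)
    return symbol, predictor.predict_future(df, periods)

def parallel_predictions_processes(symbols, df_dict, periods=24):
    jobs = [(sym, df_dict[sym], periods) for sym in symbols]
    with ProcessPoolExecutor(max_workers=4) as executor:
        return dict(executor.map(predict_symbol_proc, jobs))
```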
## Monitoring & Maintenance

### Performance Tracking
```python
import numpy as np

class PerformanceMonitor:
    def __init__(self):
        self.predictions = []
        self.actuals = []
        self.timestamps = []

    def log_prediction(self, timestamp, predicted, actual=None):
        # Note: the prediction and actual lists stay aligned only if an
        # actual is eventually logged for every prediction.
        self.timestamps.append(timestamp)
        self.predictions.append(predicted)
        if actual is not None:
            self.actuals.append(actual)

    def get_recent_performance(self, periods=10):
        if len(self.actuals) < periods:
            return None
        recent_predictions = np.array(self.predictions[-periods:])
        recent_actuals = np.array(self.actuals[-periods:])
        mae = np.mean(np.abs(recent_predictions - recent_actuals))
        mape = np.mean(np.abs((recent_actuals - recent_predictions) / recent_actuals)) * 100
        return {'mae': mae, 'mape': mape, 'periods': periods}

    def should_retrain(self, mape_threshold=2.0):
        perf = self.get_recent_performance()
        if perf is None:
            return False
        return perf['mape'] > mape_threshold

# Usage
monitor = PerformanceMonitor()

# Log predictions over time
for timestamp, pred, actual in prediction_history:
    monitor.log_prediction(timestamp, pred, actual)

if monitor.should_retrain():
    print("⚠️ Retraining recommended")
    predictor.train(df)
```
## Best Practices Summary

- Use 2000 data points for XGBoost, 1000 for Prophet
- Preprocess data - handle missing values and outliers
- Optimize hyperparameters - even quick tuning helps significantly
- Use cross-validation - don't trust a single train/test split
- Create ensembles - average 3-5 models for stability
- Implement adaptive retraining - retrain when errors increase
- Cache predictions - avoid redundant computations
- Monitor performance - track accuracy over time
- Optimize memory - use float32 instead of float64
- Test multiple timeframes - 1h is usually best for most cases; see the sketch below
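A quick way to act on the last point is to backtest the same model on each candidate timeframe and compare MAPE. A minimal sketch, reusing `backtest_model` from earlier and assuming a hypothetical `fetch_ohlcv(symbol, timeframe, limit)` data loader (not part of the documented API):

```python
from models.xgboost_model import XGBoostCryptoPredictor, backtest_model

def compare_timeframes(symbol, timeframes=('15m', '1h', '4h'), limit=2000):
    """Backtest one model per timeframe and report test MAPE."""
    scores = {}
    for tf in timeframes:
        df_tf = fetch_ohlcv(symbol, timeframe=tf, limit=limit)  # hypothetical loader
        results = backtest_model(df_tf, XGBoostCryptoPredictor(), train_size=0.8)
        scores[tf] = results['metrics']['test_mape']
        print(f"{tf}: MAPE {scores[tf]:.3f}%")
    return min(scores, key=scores.get)

best_tf = compare_timeframes('BTC/USDT')
```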
## Next Steps

- Review the making predictions workflow
- Learn result analysis techniques
- Understand model selection criteria