Overview
The PredictionEngine class is the top-level orchestrator that combines:
EWMA Volatility — per-second volatility estimation
Black-Scholes — base probability from geometric Brownian motion
Momentum Analyzer — ROC signals across multiple windows
Mean Reversion — deviation from 2-minute SMA
Platt Calibration — post-hoc probability recalibration
It produces a final probability estimate or abstains when conditions are unfavorable.
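The momentum and mean-reversion inputs listed above can be illustrated with a small sketch. This is not the engine's MomentumAnalyzer: the function names `rateOfChange` and `smaDeviation`, the tick shape, and the window lengths are assumptions made for illustration. Only the ideas (rate of change over a time window, deviation from a 2-minute SMA) come from this page.

```javascript
// Hypothetical helpers; names and window sizes are illustrative assumptions.
// ticks: array of { timestamp (ms), price }, oldest first.

// Rate of change over the last `windowMs` milliseconds: (latest - past) / past.
function rateOfChange(ticks, windowMs) {
  if (ticks.length < 2) return 0
  const latest = ticks[ticks.length - 1]
  const cutoff = latest.timestamp - windowMs
  // The first tick at or after the cutoff is the reference point for the window.
  const past = ticks.find(t => t.timestamp >= cutoff)
  if (!past || past.price === 0) return 0
  return (latest.price - past.price) / past.price
}

// Relative deviation of the latest price from the simple moving average
// over the last `windowMs` milliseconds (2 minutes by default).
function smaDeviation(ticks, windowMs = 120_000) {
  if (ticks.length === 0) return 0
  const latest = ticks[ticks.length - 1]
  const recent = ticks.filter(t => t.timestamp >= latest.timestamp - windowMs)
  const sma = recent.reduce((sum, t) => sum + t.price, 0) / recent.length
  return sma === 0 ? 0 : (latest.price - sma) / sma
}

const sampleTicks = [
  { timestamp: 0, price: 0.50 },
  { timestamp: 30_000, price: 0.52 },
  { timestamp: 60_000, price: 0.55 },
]
const roc30s = rateOfChange(sampleTicks, 30_000) // (0.55 - 0.52) / 0.52
const deviation = smaDeviation(sampleTicks)      // latest price vs. mean of all three ticks
```

A positive `roc30s` indicates recent upward movement, while a positive `deviation` means the price sits above its recent average, which a mean-reversion signal would typically treat as pressure in the opposite direction.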
API Reference
Constructor
import { PredictionEngine } from './engine/predictor.js'
import config from './config.js'
const engine = new PredictionEngine()
Internal state initialization:
Creates EWMAVolatility instance with λ from config
Creates MomentumAnalyzer with buffer size from config
Initializes PlattScaler (starts unfitted)
Allocates rolling outcome buffer for cold-streak detection
feedTick()
Ingest a price tick into both the volatility estimator and momentum analyzer.
engine.feedTick({ timestamp, price })
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| timestamp | number | Tick timestamp in milliseconds (Unix epoch) |
| price | number | Current market price (e.g., 0.52 for 52%) |
Returns: void
Example:
const tick = {
  timestamp: Date.now(),
  price: 0.5234
}
engine.feedTick(tick)
Call this method for every price tick received from the market. Volatility and momentum estimates improve as ticks accumulate, and predict() abstains with insufficient_data until at least minTicks ticks have been fed.
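To make the per-second EWMA volatility update concrete, here is a minimal sketch. It is an assumption-laden illustration rather than the engine's EWMAVolatility class: the class name `EwmaVolatilitySketch`, the use of log returns, the scaling by elapsed seconds, and the seeding rule are all choices made for this example. Only the decay factor λ = 0.94 comes from the configuration on this page.

```javascript
// Illustrative EWMA volatility estimator; NOT the engine's EWMAVolatility class.
class EwmaVolatilitySketch {
  constructor(lambda = 0.94) {
    this.lambda = lambda
    this.variance = 0   // per-second variance of log returns
    this.seeded = false // becomes true after the first usable return
    this.last = null    // previous tick
  }

  update({ timestamp, price }) {
    if (this.last !== null && price > 0 && this.last.price > 0) {
      const dtSec = (timestamp - this.last.timestamp) / 1000
      if (dtSec > 0) {
        const logReturn = Math.log(price / this.last.price)
        // Scale the squared return to a per-second quantity.
        const perSecondSq = (logReturn * logReturn) / dtSec
        // Seed with the first observation, then blend with decay factor lambda.
        this.variance = this.seeded
          ? this.lambda * this.variance + (1 - this.lambda) * perSecondSq
          : perSecondSq
        this.seeded = true
      }
    }
    this.last = { timestamp, price }
  }

  getVolatility() {
    return Math.sqrt(this.variance)
  }
}

const vol = new EwmaVolatilitySketch(0.94)
vol.update({ timestamp: 0, price: 0.50 })
vol.update({ timestamp: 1000, price: 0.51 }) // seeds the variance
vol.update({ timestamp: 2000, price: 0.51 }) // zero return decays it by lambda
const sigma = vol.getVolatility()
```

With λ close to 1 the estimate reacts slowly to new ticks; a smaller λ makes it more responsive but noisier.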
predict()
Produce a probability prediction for a binary market outcome.
const result = engine.predict({
  currentPrice,
  strikePrice,
  timeRemainingSeconds
})
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| currentPrice | number | Latest market price |
| strikePrice | number | Target threshold (“Will price exceed this?”) |
| timeRemainingSeconds | number | Seconds until market close |
Returns:
type PredictionResult =
  | {
      probability: number        // Final P(price exceeds strike), clamped to [0.01, 0.99]
      direction: 'UP' | 'DOWN'   // 'UP' when probability >= 0.5, otherwise 'DOWN'
      volatility: number         // Current per-second volatility
      momentum: number           // Combined ROC signal (raw)
      reversion: number          // Mean-reversion signal (raw)
      calibrated: boolean        // Whether Platt calibration was applied
      abstained?: false
    }
  | {
      abstained: true
      reason: string             // Abstention reason
      volatility?: number
      probability?: number       // Partial result (uncalibrated base probability, if available)
      direction?: 'UP' | 'DOWN'
    }
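Abstention Reasons:
| Reason | Trigger |
| --- | --- |
| insufficient_data | σ = 0 or ticks < minTicks |
| dead_zone | abs(p - 0.5) < deadZone, where p is the base probability |
| anomalous_regime | σ > sigmaMultiplier × mean(σ) |
| cold_streak | Recent accuracy < minAccuracy, once minAccuracyWindow outcomes are recorded |
Example: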
const result = engine.predict({
  currentPrice: 0.52,
  strikePrice: 0.55,
  timeRemainingSeconds: 300 // 5 minutes
})
if (result.abstained) {
  console.log(`Abstained: ${result.reason}`)
} else {
  console.log(`Prediction: ${result.direction} at ${(result.probability * 100).toFixed(2)}%`)
  console.log(`Volatility: ${result.volatility.toFixed(6)}, Calibrated: ${result.calibrated}`)
}
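Because predict() returns one of two shapes, callers usually branch on `abstained` once and keep the rest of their code free of optional fields. The helper below is a hypothetical sketch of that pattern; `formatResult` is not part of the engine.

```javascript
// Hypothetical helper: turns either PredictionResult shape into one log line.
function formatResult(result) {
  if (result.abstained) {
    // Abstained branch: probability may be missing (e.g., insufficient_data).
    const partial = typeof result.probability === 'number'
      ? `, base p=${result.probability.toFixed(2)}`
      : ''
    return `ABSTAIN (${result.reason}${partial})`
  }
  // Full result: probability and direction are always present.
  return `${result.direction} ${(result.probability * 100).toFixed(2)}%`
}

const lineA = formatResult({ abstained: true, reason: 'insufficient_data', volatility: 0 })
const lineB = formatResult({ abstained: true, reason: 'dead_zone', probability: 0.52 })
const lineC = formatResult({ probability: 0.62, direction: 'UP', calibrated: false })
```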
recordOutcome()
Record the outcome of a prediction for cold-streak tracking and Platt calibration.
engine.recordOutcome(correct, predictedProb)
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| correct | boolean | Was the prediction correct? |
| predictedProb | number? | The probability that was predicted (optional) |
Returns: void
Example:
// Before the market closes: make a prediction
const prediction = engine.predict({ ... })
if (!prediction.abstained) {
  // ... execute trade ...
  // Later, once the market has resolved, record the outcome
  const actualResult = 'UP' // from market API
  const correct = (prediction.direction === actualResult)
  engine.recordOutcome(correct, prediction.probability)
}
Critical for Calibration: If you never call recordOutcome(), the Platt scaler never accumulates samples, so calibration never activates (it requires 200+ samples).
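For intuition about what the accumulated samples are used for, the sketch below fits a Platt-style recalibration by gradient descent on log loss. It is an illustration under stated assumptions, not the engine's PlattScaler: the function name `fitPlatt`, the sample shape `{ prob, label }`, the use of logit(prob) as the input score, and the learning rate are all assumptions. The iteration count of 1000 mirrors the figure quoted in the complexity table on this page.

```javascript
// Illustrative Platt scaling: fits p' = sigmoid(a * logit(p) + b).
// A sketch only; NOT the engine's PlattScaler.
const sigmoid = x => 1 / (1 + Math.exp(-x))
const logit = p => Math.log(p / (1 - p))

// samples: array of { prob: predicted P(UP), label: 1 if UP occurred, else 0 }
function fitPlatt(samples, { iterations = 1000, learningRate = 0.1 } = {}) {
  let a = 1 // start from the identity mapping
  let b = 0
  for (let i = 0; i < iterations; i++) {
    let gradA = 0
    let gradB = 0
    for (const { prob, label } of samples) {
      const x = logit(prob)
      const err = sigmoid(a * x + b) - label // gradient of log loss w.r.t. the logit
      gradA += err * x
      gradB += err
    }
    a -= (learningRate * gradA) / samples.length
    b -= (learningRate * gradB) / samples.length
  }
  return { a, b, calibrate: p => sigmoid(a * logit(p) + b) }
}

// Synthetic, overconfident model: says 0.9 but UP happens 60% of the time,
// and says 0.1 but UP happens 40% of the time.
const samples = []
for (let i = 0; i < 10; i++) samples.push({ prob: 0.9, label: i < 6 ? 1 : 0 })
for (let i = 0; i < 10; i++) samples.push({ prob: 0.1, label: i < 4 ? 1 : 0 })

const scaler = fitPlatt(samples)
const recalibrated = scaler.calibrate(0.9) // pulled toward the observed 0.6 rate
```

The fitted slope `a` ends up well below 1, which is how the scaler shrinks overconfident probabilities toward 0.5.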
getRecentAccuracy()
Get accuracy over the most recent outcomes window.
const accuracy = engine.getRecentAccuracy()
Returns: number (accuracy in [0, 1]; 1 if no outcomes have been recorded).
Example:
if (engine.getRecentAccuracy() < 0.5) {
  console.warn('Model is in a cold streak; predictions may be unreliable')
}
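A rolling accuracy window of this kind can be sketched in a few lines. The class below is a hypothetical illustration, not the engine's internal buffer: the name `RollingAccuracy` and the array-based storage are assumptions. The default window of 20 and the return value of 1 when empty follow the behavior described on this page.

```javascript
// Illustrative rolling accuracy tracker; NOT the engine's internal buffer.
class RollingAccuracy {
  constructor(windowSize = 20) {
    this.windowSize = windowSize
    this.outcomes = [] // most recent outcomes, oldest first
  }

  record(correct) {
    this.outcomes.push(Boolean(correct))
    // Drop the oldest outcome once the window is full.
    if (this.outcomes.length > this.windowSize) this.outcomes.shift()
  }

  get() {
    // No outcomes yet: report 1 so a fresh tracker never looks like a cold streak.
    if (this.outcomes.length === 0) return 1
    const hits = this.outcomes.filter(Boolean).length
    return hits / this.outcomes.length
  }
}

const tracker = new RollingAccuracy(4)
const emptyAccuracy = tracker.get()
for (const correct of [true, true, false, false, false]) tracker.record(correct)
// Window now holds the last four outcomes: true, false, false, false.
const windowAccuracy = tracker.get()
```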
reset()
Reset all internal state (volatility + momentum + calibration).
Returns: void
Use cases:
Market close / start of new trading session
Context switch (different market or asset)
Emergency flush (e.g., detected data corruption)
Destructive Operation: This clears all accumulated data, including EWMA volatility history and Platt calibration. The engine must warm up again before it can produce predictions.
resetMomentum()
Reset only the momentum analyzer, preserving volatility and calibration state.
Returns: void
Use cases:
Start of a new prediction interval (volatility should persist)
Clear momentum signals without disrupting long-term volatility estimate
Example:
// At the start of each new market interval
engine.resetMomentum()
Typical Pattern: Call resetMomentum() at interval boundaries; call reset() only at market close or when switching contexts.
Prediction Pipeline
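Step-by-Step Flow
1. Read the current volatility σ; abstain with insufficient_data if σ = 0 or too few ticks have been fed.
2. Compute the base probability from Black-Scholes (risk-free rate 0).
3. Abstain if the base probability falls in the dead zone, the volatility regime is anomalous, or the engine is in a cold streak.
4. Unless the near-expiry guard applies, adjust the base probability in logit space using the momentum and mean-reversion signals.
5. Clamp to [0.01, 0.99], then apply Platt calibration if the scaler has enough samples.
6. Return the probability, direction, and diagnostic signals.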
Code Walkthrough
predict({ currentPrice, strikePrice, timeRemainingSeconds }) {
  const sigma = this._volatility.getVolatility()
  const abstentionCfg = config.engine.abstention
  // ── Abstention condition 1: insufficient data ──
  if (sigma === 0 || this._volatility.getTickCount() < abstentionCfg.minTicks) {
    return { abstained: true, reason: 'insufficient_data', volatility: sigma }
  }
  // ── Base probability from Black-Scholes ──
  const baseProb = binaryCallProbability({
    currentPrice,
    strikePrice,
    volatility: sigma,
    timeRemainingSeconds,
    riskFreeRate: 0,
  })
  // ── Abstention condition 2: dead zone ──
  if (Math.abs(baseProb - 0.5) < abstentionCfg.deadZone) {
    return { abstained: true, reason: 'dead_zone', volatility: sigma, probability: baseProb }
  }
  // ── Abstention condition 3: anomalous regime ──
  const meanSigma = this._volatility.getMeanSigma()
  if (meanSigma > 0 && sigma > abstentionCfg.sigmaMultiplier * meanSigma) {
    return { abstained: true, reason: 'anomalous_regime', volatility: sigma, probability: baseProb }
  }
  // ── Abstention condition 4: cold streak ──
  const window = abstentionCfg.minAccuracyWindow
  if (this._recentOutcomes.length >= window && this.getRecentAccuracy() < abstentionCfg.minAccuracy) {
    return { abstained: true, reason: 'cold_streak', volatility: sigma, probability: baseProb }
  }
  // ── Momentum and reversion factors ──
  const { combined: momentumFactor } = this._momentum.getMomentum()
  const { signal: reversionFactor } = this._momentum.getMeanReversion()
  // ── Near-expiry guard: skip adjustments when <= 5 seconds remain ──
  let finalProb
  if (timeRemainingSeconds <= config.engine.prediction.nearExpiryGuardSec) {
    finalProb = baseProb
  } else {
    // Combine in logit space
    const logitBase = logit(baseProb)
    const logitAdj = logitBase
      + config.engine.prediction.logitMomentumWeight * momentumFactor
      + config.engine.prediction.logitReversionWeight * reversionFactor
    finalProb = sigmoid(logitAdj)
  }
  // ── Safety clamp to [0.01, 0.99] ──
  finalProb = clamp(finalProb, 0.01, 0.99)
  // ── Platt calibration (auto-activates at 200+ samples) ──
  let calibrated = false
  if (this._scaler.canFit()) {
    if (!this._scaler.getStats().fitted) {
      this._scaler.fit()
    }
    finalProb = this._scaler.calibrate(finalProb)
    finalProb = clamp(finalProb, 0.01, 0.99)
    calibrated = true
  }
  const direction = finalProb >= 0.5 ? 'UP' : 'DOWN'
  this._lastPrediction = finalProb
  return {
    probability: finalProb,
    direction,
    volatility: sigma,
    momentum: momentumFactor,
    reversion: reversionFactor,
    calibrated,
  }
}
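The walkthrough calls binaryCallProbability() without showing it. A common way to obtain a binary-call probability under geometric Brownian motion is N(d2) from the Black-Scholes model, and the sketch below implements that textbook formula. It is an assumption that the engine's function works this way: the name `binaryCallProbabilitySketch`, the fallback of 0.5 for unusable inputs, and the erf approximation used for the normal CDF are choices made for this example.

```javascript
// Standard normal CDF via the Abramowitz-Stegun 7.1.26 erf approximation
// (absolute error of roughly 1.5e-7).
function normalCdf(x) {
  const z = Math.abs(x) / Math.SQRT2
  const t = 1 / (1 + 0.3275911 * z)
  const poly = t * (0.254829592 + t * (-0.284496736 + t * (1.421413741 +
    t * (-1.453152027 + t * 1.061405429))))
  const erf = 1 - poly * Math.exp(-z * z)
  return x >= 0 ? 0.5 * (1 + erf) : 0.5 * (1 - erf)
}

// Textbook P(S_T > K) under GBM: N(d2), with volatility expressed per second
// and time in seconds. Illustrative only; NOT the engine's implementation.
function binaryCallProbabilitySketch({
  currentPrice, strikePrice, volatility, timeRemainingSeconds, riskFreeRate = 0,
}) {
  // Assumed fallback: unusable inputs yield an uninformative 0.5.
  if (!(currentPrice > 0) || !(strikePrice > 0) ||
      !(volatility > 0) || !(timeRemainingSeconds > 0)) {
    return 0.5
  }
  const sigmaSqrtT = volatility * Math.sqrt(timeRemainingSeconds)
  const drift = (riskFreeRate - 0.5 * volatility * volatility) * timeRemainingSeconds
  const d2 = (Math.log(currentPrice / strikePrice) + drift) / sigmaSqrtT
  return normalCdf(d2)
}

// Price 0.52 with a strike of 0.55, 5 minutes left, sigma = 0.002 per second.
const p = binaryCallProbabilitySketch({
  currentPrice: 0.52,
  strikePrice: 0.55,
  volatility: 0.002,
  timeRemainingSeconds: 300,
})
```

In this example d2 is about -1.64, so `p` comes out near 0.05: with that volatility, the price is unlikely to climb from 0.52 past 0.55 in five minutes.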
Configuration
Key engine parameters from config.js:
engine: {
  ewma: {
    lambda: 0.94 // EWMA decay factor
  },
  momentum: {
    bufferSize: 300 // Max ticks retained
  },
  prediction: {
    logitMomentumWeight: 2.0, // Momentum signal weight (logit space)
    logitReversionWeight: 1.5, // Reversion signal weight (logit space)
    nearExpiryGuardSec: 5 // Disable adjustments when t ≤ 5s
  },
  abstention: {
    minTicks: 5, // Minimum ticks before predicting
    deadZone: 0.05, // ±5% around 0.5
    sigmaMultiplier: 3.0, // Anomaly threshold (3× mean volatility)
    minAccuracyWindow: 20, // Rolling accuracy window size
    minAccuracy: 0.50 // Cold-streak threshold
  }
}
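To see what the prediction weights do in practice, the sketch below reproduces the logit-space adjustment from the code walkthrough as a standalone function. The weights and guard value are copied from the configuration above; the helper name `adjustProbability`, the clamping bounds inside `logit`, and the example signal values (0.05 and -0.02) are assumptions chosen for illustration.

```javascript
const clamp = (x, lo, hi) => Math.min(hi, Math.max(lo, x))
// Clamp before taking the log so p = 0 or p = 1 cannot produce an infinite logit.
const logit = p => {
  const q = clamp(p, 1e-9, 1 - 1e-9)
  return Math.log(q / (1 - q))
}
const sigmoid = x => 1 / (1 + Math.exp(-x))

// Values copied from the configuration on this page.
const predictionCfg = {
  logitMomentumWeight: 2.0,
  logitReversionWeight: 1.5,
  nearExpiryGuardSec: 5,
}

// Hypothetical standalone version of the adjustment step.
function adjustProbability(baseProb, momentum, reversion, timeRemainingSeconds, cfg = predictionCfg) {
  // Near expiry, the signals are ignored and the base probability is used as-is.
  if (timeRemainingSeconds <= cfg.nearExpiryGuardSec) {
    return clamp(baseProb, 0.01, 0.99)
  }
  const z = logit(baseProb)
    + cfg.logitMomentumWeight * momentum
    + cfg.logitReversionWeight * reversion
  return clamp(sigmoid(z), 0.01, 0.99)
}

// logit(0.60) is about 0.405; adding 2.0 * 0.05 and 1.5 * (-0.02) gives about 0.475.
const adjusted = adjustProbability(0.60, 0.05, -0.02, 300) // about 0.617
const nearExpiry = adjustProbability(0.60, 0.05, -0.02, 3) // guard applies: 0.60
```

Working in logit space means a fixed signal moves a probability near 0.5 more than one near 0 or 1, which keeps adjusted values inside (0, 1) without ad hoc truncation.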
Usage Example
Complete Workflow
import { PredictionEngine } from './engine/predictor.js'
// marketStream, evaluateTrade, and executeTrade are application-specific
// and are not provided by the engine.
const engine = new PredictionEngine()
// Most recent non-abstained prediction, kept so the close handler can score it
let lastResult = null
// ── Phase 1: Data Collection ──
// Stream price ticks (e.g., from WebSocket)
marketStream.on('tick', (tick) => {
  engine.feedTick({ timestamp: tick.time, price: tick.price })
})
// ── Phase 2: Prediction ──
setInterval(() => {
  const result = engine.predict({
    currentPrice: marketStream.getLatestPrice(),
    strikePrice: 0.55,
    timeRemainingSeconds: marketStream.getTimeRemaining()
  })
  if (result.abstained) {
    console.log(`[ABSTAIN] ${result.reason}`)
    return
  }
  lastResult = result
  console.log(`[PREDICT] ${result.direction} @ ${(result.probability * 100).toFixed(2)}%`)
  console.log(`  Volatility: ${result.volatility.toFixed(6)}`)
  console.log(`  Momentum: ${result.momentum.toFixed(4)}, Reversion: ${result.reversion.toFixed(4)}`)
  console.log(`  Calibrated: ${result.calibrated}`)
  // Decide whether to trade based on EV, Kelly, etc.
  const shouldTrade = evaluateTrade(result)
  if (shouldTrade) {
    executeTrade(result.direction, result.probability)
  }
}, 10_000) // Every 10 seconds
// ── Phase 3: Outcome Recording ──
marketStream.on('close', (outcome) => {
  if (lastResult) {
    const correct = (lastResult.direction === outcome)
    engine.recordOutcome(correct, lastResult.probability)
    lastResult = null // Avoid scoring the same prediction twice
  }
  // Reset momentum for next interval
  engine.resetMomentum()
})
// ── Phase 4: Session End ──
process.on('SIGINT', () => {
  engine.reset() // Full reset on shutdown
  process.exit(0)
})
Advanced Topics
Custom Abstention Filters
You can implement additional abstention logic after calling predict():
const result = engine.predict({ ... })
if (!result.abstained) {
  // Custom filter: refuse to predict if market is too illiquid
  if (marketDepth < MIN_DEPTH) {
    result.abstained = true
    result.reason = 'insufficient_liquidity'
  }
  // Custom filter: refuse if spread is too wide
  if (marketSpread > MAX_SPREAD) {
    result.abstained = true
    result.reason = 'wide_spread'
  }
}
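If you prefer not to mutate the object returned by predict(), the same idea can be expressed as a list of filters applied in order. The sketch below is hypothetical: `applyCustomFilters`, the filter shape `{ reason, shouldAbstain }`, and the market values are assumptions for illustration and not part of the engine.

```javascript
// Hypothetical helper: returns a new abstained result for the first filter
// that fires, leaving the original result object untouched.
function applyCustomFilters(result, filters) {
  if (result.abstained) return result // engine already abstained
  for (const { reason, shouldAbstain } of filters) {
    if (shouldAbstain(result)) {
      return { ...result, abstained: true, reason }
    }
  }
  return result
}

// Illustrative market state and thresholds (assumed values).
const market = { depth: 120, spread: 0.04 }
const MIN_DEPTH = 100
const MAX_SPREAD = 0.03

const filters = [
  { reason: 'insufficient_liquidity', shouldAbstain: () => market.depth < MIN_DEPTH },
  { reason: 'wide_spread', shouldAbstain: () => market.spread > MAX_SPREAD },
]

const original = { probability: 0.62, direction: 'UP', calibrated: false }
const filtered = applyCustomFilters(original, filters)
```

Here the depth check passes but the spread check fires, so `filtered.reason` is 'wide_spread' while `original` is unchanged. Listing filters in priority order also makes it explicit which reason is reported when several apply.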
Multi-Model Ensemble
You can run multiple prediction engines and ensemble their outputs. Note that the constructor documented above takes no arguments and reads λ from config.js, so giving each engine its own λ (as the comments below suggest) requires a per-instance configuration mechanism that this page does not show:
// logit and sigmoid are defined inline so the snippet is self-contained
const logit = p => Math.log(p / (1 - p))
const sigmoid = x => 1 / (1 + Math.exp(-x))
const engineA = new PredictionEngine() // Conservative (λ=0.97)
const engineB = new PredictionEngine() // Aggressive (λ=0.90)
const resultA = engineA.predict({ ... })
const resultB = engineB.predict({ ... })
if (!resultA.abstained && !resultB.abstained) {
  // Weighted average in logit space
  const logitA = logit(resultA.probability)
  const logitB = logit(resultB.probability)
  const ensembleLogit = 0.6 * logitA + 0.4 * logitB
  const ensembleProb = sigmoid(ensembleLogit)
  console.log(`Ensemble: ${(ensembleProb * 100).toFixed(2)}%`)
}
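The two-engine case generalizes to any number of engines. The sketch below is one hypothetical way to do it: `ensembleResults` and the `{ result, weight }` entry shape are assumptions, and the choice to skip abstaining engines and renormalize the remaining weights is a design decision made for this example rather than something the engine prescribes.

```javascript
const logit = p => Math.log(p / (1 - p))
const sigmoid = x => 1 / (1 + Math.exp(-x))

// Hypothetical helper: weighted average in logit space over the engines that
// did not abstain. Returns null when every engine abstained.
function ensembleResults(entries) {
  const active = entries.filter(({ result }) => !result.abstained)
  const totalWeight = active.reduce((sum, { weight }) => sum + weight, 0)
  if (active.length === 0 || totalWeight <= 0) return null
  const z = active.reduce(
    (sum, { result, weight }) => sum + (weight / totalWeight) * logit(result.probability),
    0,
  )
  return sigmoid(z)
}

const symmetric = ensembleResults([
  { result: { probability: 0.7 }, weight: 1 },
  { result: { probability: 0.3 }, weight: 1 },
]) // logits cancel, so the ensemble sits at 0.5

const withAbstention = ensembleResults([
  { result: { probability: 0.64 }, weight: 0.6 },
  { result: { abstained: true, reason: 'dead_zone' }, weight: 0.4 },
]) // only the first engine contributes

const allAbstained = ensembleResults([
  { result: { abstained: true, reason: 'cold_streak' }, weight: 1 },
])
```

Whether to skip abstaining engines or to abstain as a whole when any engine abstains (as the two-engine example above does) depends on how conservative the ensemble should be.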
Backtesting Integration
For backtesting, disable real-time dependencies and feed historical ticks:
const engine = new PredictionEngine()
const backtestLog = []
for (const interval of historicalIntervals) {
  engine.resetMomentum()
  // Note: interval.ticks should contain only ticks observed before the
  // prediction time; feeding later ticks leaks future information.
  for (const tick of interval.ticks) {
    engine.feedTick(tick)
  }
  const result = engine.predict({
    currentPrice: interval.closePrice,
    strikePrice: interval.strikePrice,
    timeRemainingSeconds: 60
  })
  if (!result.abstained) {
    const correct = (result.direction === interval.outcome)
    engine.recordOutcome(correct, result.probability)
    // Log for analysis
    backtestLog.push({ interval: interval.id, result, correct })
  }
}
// Analyze backtest results (NaN if every interval was abstained)
const accuracy = backtestLog.length > 0
  ? backtestLog.filter(x => x.correct).length / backtestLog.length
  : NaN
console.log(`Backtest accuracy: ${(accuracy * 100).toFixed(2)}%`)
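Accuracy alone ignores how confident each prediction was. A proper scoring rule such as the Brier score (mean squared difference between the predicted probability and the 0/1 outcome) rewards well-calibrated probabilities. The sketch below is a minimal illustration: `brierScore` and the entry shape `{ probability, outcome }` are assumptions, with `probability` taken to mean P(UP) as in the code walkthrough.

```javascript
// Brier score: mean of (p - y)^2, where y = 1 if the outcome was 'UP', else 0.
// Lower is better; always predicting 0.5 scores 0.25.
function brierScore(entries) {
  if (entries.length === 0) return NaN
  const total = entries.reduce((sum, { probability, outcome }) => {
    const y = outcome === 'UP' ? 1 : 0
    return sum + (probability - y) ** 2
  }, 0)
  return total / entries.length
}

const score = brierScore([
  { probability: 0.8, outcome: 'UP' },   // error 0.2, squared 0.04
  { probability: 0.3, outcome: 'DOWN' }, // error 0.3, squared 0.09
]) // (0.04 + 0.09) / 2 = 0.065
```

In a backtest, each logged entry could contribute its `result.probability` together with the interval's actual outcome.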
Time Complexity
| Operation | Complexity | Notes |
| --- | --- | --- |
| feedTick() | O(1) | EWMA update + buffer append |
| predict() | O(N) | N = momentum buffer size (~300) |
| recordOutcome() | O(1) | Array push + Platt collect |
| Platt fit() | O(M × I) | M = samples (~200-2000), I = iterations (1000) |
Typical prediction latency: ~1-2ms on modern hardware.
Memory Usage
| Component | Memory |
| --- | --- |
| EWMA state | O(1) + 100 × 8 bytes (sigma history) = ~1 KB |
| Momentum buffer | 300 ticks × 16 bytes = ~5 KB |
| Platt data | 2000 samples × 16 bytes = ~32 KB |
| Total | ~40 KB per engine instance |
Scalability: At ~40 KB per instance, you can run hundreds of engine instances in parallel (e.g., one per market) without memory concerns.
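One way to manage an engine per market is a small registry that creates engines lazily. The sketch below is hypothetical: `EngineRegistry` is not part of the engine, and a stub factory stands in for `() => new PredictionEngine()` so the example runs on its own.

```javascript
// Hypothetical registry that keeps one engine per market id.
class EngineRegistry {
  constructor(createEngine) {
    this.createEngine = createEngine // factory, e.g. () => new PredictionEngine()
    this.engines = new Map()
  }

  // Returns the engine for a market, creating it on first use.
  get(marketId) {
    let engine = this.engines.get(marketId)
    if (!engine) {
      engine = this.createEngine(marketId)
      this.engines.set(marketId, engine)
    }
    return engine
  }

  // Drops a market's engine so its memory can be reclaimed.
  remove(marketId) {
    return this.engines.delete(marketId)
  }

  get size() {
    return this.engines.size
  }
}

// Stub factory for illustration only.
let created = 0
const registry = new EngineRegistry(() => ({ id: ++created }))
const engineForA = registry.get('market-a')
const sameEngineForA = registry.get('market-a') // reused, not recreated
registry.get('market-b')
```

Removing engines for markets that have closed keeps the total footprint proportional to the number of active markets.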
Error Handling
The engine is designed to be robust:
Invalid inputs: Clamped or defaulted (e.g., negative prices → return 0.5)
Edge cases: Handled gracefully (e.g., zero volatility → abstain)
Numerical stability: Logit/sigmoid use clamping to avoid log(0) or exp(∞)
Exception-Free Design: The engine does not throw exceptions under normal circumstances. Abstention is the primary error-signaling mechanism.
Testing
Unit tests for the prediction engine:
import { PredictionEngine } from './engine/predictor.js'
import assert from 'assert'
// Test: Abstain on insufficient data
const engine = new PredictionEngine()
const result = engine.predict({ currentPrice: 0.5, strikePrice: 0.5, timeRemainingSeconds: 60 })
assert(result.abstained === true)
assert(result.reason === 'insufficient_data')
// Test: Prediction after warm-up
for (let i = 0; i < 10; i++) {
  engine.feedTick({ timestamp: i * 1000, price: 0.5 + Math.random() * 0.01 })
}
const result2 = engine.predict({ currentPrice: 0.52, strikePrice: 0.50, timeRemainingSeconds: 60 })
// Other abstention reasons are acceptable here; only insufficient_data is not
assert(result2.reason !== 'insufficient_data')
// Test: Outcome recording
engine.recordOutcome(true, 0.65)
assert(engine.getRecentAccuracy() === 1.0)
// getRecentAccuracy() also returns 1 with no outcomes, so record a miss too
engine.recordOutcome(false, 0.65)
assert(engine.getRecentAccuracy() === 0.5)
Next Steps
Architecture Overview: High-level system design and data flow
Configuration: Tuning engine parameters for your market
Tuning Parameters: Historical evaluation and parameter optimization
Metrics: Performance evaluation using proper scoring rules