The QuestEngine is NeuraTrade’s autonomous orchestration system that schedules and executes trading quests with distributed coordination, priority management, and fault tolerance.
Overview
Quests are schedulable tasks representing autonomous trading activities:
- Routine Quests: Time-triggered (cron-based)
- Triggered Quests: Event-driven (arbitrage opportunities, alerts)
- Goal Quests: Milestone-driven (profit targets, position goals)
- Arbitrage Quests: Opportunity-execution workflows
```go
// services/backend-api/internal/services/quest_engine.go:85-104
type Quest struct {
	ID             string
	Name           string
	Type           QuestType
	Cadence        QuestCadence // micro, hourly, daily, weekly, onetime
	Status         QuestStatus  // pending, active, completed, failed, paused
	Priority       Priority     // CRITICAL > HIGH > NORMAL > LOW
	Prompt         string       // AI agent prompt
	Handler        QuestHandler // Execution function
	LastExecutedAt *time.Time
	Checkpoint     map[string]interface{}
}
```
Architecture
Core Components
Cron Scheduling
Routine quests run on predefined schedules using cron expressions.
Cadence Types
| Cadence | Interval | Use Case |
| --- | --- | --- |
| Micro | 1-5 minutes | Real-time arbitrage scanning |
| Hourly | Every hour | Market analysis, funding rate checks |
| Daily | Once per day | Portfolio rebalancing, performance reports |
| Weekly | Once per week | Deep analysis, strategy optimization |
| Onetime | Single execution | One-off tasks, migrations |
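As a rough sketch, the cadences above might map to default cron expressions like this. Both `cronForCadence` and the specific defaults are illustrative assumptions, not the engine's actual values:

```go
package main

import "fmt"

// QuestCadence mirrors the cadence values listed above.
type QuestCadence string

const (
	CadenceMicro   QuestCadence = "micro"
	CadenceHourly  QuestCadence = "hourly"
	CadenceDaily   QuestCadence = "daily"
	CadenceWeekly  QuestCadence = "weekly"
	CadenceOnetime QuestCadence = "onetime"
)

// cronForCadence maps a cadence to a default cron expression.
// Illustrative only; the real engine may use different defaults.
func cronForCadence(c QuestCadence) string {
	switch c {
	case CadenceMicro:
		return "*/1 * * * *" // every minute
	case CadenceHourly:
		return "0 * * * *" // top of every hour
	case CadenceDaily:
		return "0 0 * * *" // midnight, daily
	case CadenceWeekly:
		return "0 0 * * 0" // midnight on Sunday
	default:
		return "" // onetime quests are not cron-scheduled
	}
}

func main() {
	fmt.Println(cronForCadence(CadenceHourly))
}
```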
Cron Expressions
```go
// Define a quest with a custom cron schedule
quest := &Quest{
	Name:     "Funding Rate Analysis",
	Type:     QuestTypeRoutine,
	CronExpr: "0 */1 * * *", // every hour, on the hour
	Handler:  analyzeFundingRates,
}
questEngine.RegisterQuest(quest)
```
Scheduler Loop
The main scheduler runs every 5 seconds:
```go
func (qe *QuestEngine) schedulerLoop() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			qe.evaluateQuests()
		case <-qe.ctx.Done():
			return
		}
	}
}

func (qe *QuestEngine) evaluateQuests() {
	for _, quest := range qe.getActiveQuests() {
		if qe.shouldExecute(quest) {
			qe.executeQuest(quest)
		}
	}
}
```
Scheduler poll interval: services/backend-api/internal/services/quest_engine.go:30
Event Triggers
Triggered quests execute immediately when specific events occur.
Event Types
When arbitrage engines detect profitable opportunities:

```go
arbitrageService.OnOpportunity(func(opp *ArbitrageOpportunity) {
	quest := &Quest{
		Name:    fmt.Sprintf("Arbitrage: %s", opp.Symbol),
		Type:    QuestTypeArbitrage,
		Prompt:  BuildArbitragePrompt(opp),
		Handler: ExecuteArbitrageQuest,
	}
	questEngine.TriggerQuest(ctx, quest)
})
```
When risk thresholds are exceeded:

```go
riskManager.OnDrawdownAlert(func(alert *DrawdownAlert) {
	quest := &Quest{
		Name:     "Emergency Risk Review",
		Type:     QuestTypeTriggered,
		Priority: PriorityCritical,
		Handler:  ReviewRiskAndAct,
	}
	questEngine.TriggerQuest(ctx, quest)
})
```
When market conditions shift significantly:

```go
regimeDetector.OnRegimeChange(func(newRegime MarketRegime) {
	quest := &Quest{
		Name:     "Regime Adaptation",
		Type:     QuestTypeTriggered,
		Priority: PriorityHigh,
		Handler:  AdaptToRegime,
	}
	questEngine.TriggerQuest(ctx, quest)
})
```
When positions hit stop-loss levels:

```go
positionTracker.OnStopLoss(func(position *Position) {
	quest := &Quest{
		Name:     fmt.Sprintf("Stop Loss: %s", position.Symbol),
		Type:     QuestTypeTriggered,
		Priority: PriorityCritical,
		Handler:  ExecuteStopLoss,
	}
	questEngine.TriggerQuest(ctx, quest)
})
```
Priority Levels
Quests are executed in priority order to ensure critical tasks run first.
Priority Hierarchy
```go
type Priority string

const (
	PriorityCritical Priority = "CRITICAL" // emergency actions, stop-losses
	PriorityHigh     Priority = "HIGH"     // arbitrage execution, risk alerts
	PriorityNormal   Priority = "NORMAL"   // regular trading decisions
	PriorityLow      Priority = "LOW"      // analysis, reports
)
```
Execution Order
```go
func (qe *QuestEngine) getNextQuest() *Quest {
	qe.mu.RLock()
	defer qe.mu.RUnlock()
	// Sort quests by priority, then by creation time
	sorted := sortQuestsByPriority(qe.pendingQuests)
	for _, quest := range sorted {
		if !qe.isExecuting(quest.ID) {
			return quest
		}
	}
	return nil
}
```
CRITICAL priority quests can interrupt NORMAL or LOW priority quests if goroutine pool capacity is needed.
Redis Coordination
Redis provides distributed coordination for multi-instance deployments.
Distributed Locks
Ensure only one instance executes each quest:
```go
func (qe *QuestEngine) acquireLock(questID string) (bool, error) {
	lockKey := fmt.Sprintf("quest:lock:%s", questID)
	lockTTL := 3 * time.Minute
	// Try to acquire the lock with the NX (set if not exists) flag
	acquired, err := qe.redis.SetNX(
		ctx,
		lockKey,
		qe.instanceID,
		lockTTL,
	).Result()
	return acquired, err
}

func (qe *QuestEngine) releaseLock(questID string) error {
	lockKey := fmt.Sprintf("quest:lock:%s", questID)
	// Only release the lock if this instance owns it
	script := `
		if redis.call("get", KEYS[1]) == ARGV[1] then
			return redis.call("del", KEYS[1])
		else
			return 0
		end
	`
	return qe.redis.Eval(ctx, script, []string{lockKey}, qe.instanceID).Err()
}
```
Lock implementation: services/backend-api/internal/services/quest_engine.go:161
Lock TTL and Stale Detection
Locks automatically expire to prevent deadlocks:
```go
const (
	defaultQuestExecutionStale = 3 * time.Minute
	questExecutionLockTail     = 35 * time.Second
)

// Lock TTL = execution timeout + lock tail buffer
lockTTL := executionTimeout + questExecutionLockTail
```
Stale quest detection:
```go
func (qe *QuestEngine) detectStaleQuests() {
	for questID, startTime := range qe.executionStarts {
		if time.Since(startTime) > defaultQuestExecutionStale {
			qe.logger.Warn("Stale quest detected",
				"quest_id", questID,
				"elapsed", time.Since(startTime))
			qe.forceReleaseLock(questID)
		}
	}
}
```
Goroutine Pool
Concurrency is controlled via a goroutine pool with configurable limits.
Pool Configuration
```go
type QuestEngineConfig struct {
	MaxConcurrentQuests int // Default: 5
	WorkerPoolSize      int // Default: 10
}
```
Pool Implementation
```go
type WorkerPool struct {
	maxWorkers int
	sem        chan struct{} // semaphore for concurrency control
}

func (p *WorkerPool) Execute(ctx context.Context, fn func()) error {
	select {
	case p.sem <- struct{}{}: // acquire a slot
		defer func() { <-p.sem }() // release the slot
		fn()
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```
Execution Flow
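The components documented on this page compose into a single path per quest: acquire the distributed lock, mark the quest active, execute with timeout/retry/circuit-breaker protection, then record the outcome and release the lock. A stubbed sketch of that flow (all helper names and the status strings are illustrative):

```go
package main

import "fmt"

// executeQuestFlow sketches the end-to-end path described on this
// page. Each step is a stub; the real engine wires these to Redis,
// the worker pool, and the circuit breaker.
func executeQuestFlow(questID string) error {
	if !acquireLock(questID) { // another instance holds the lock
		return fmt.Errorf("quest %s locked elsewhere", questID)
	}
	defer releaseLock(questID)

	transitionStatus(questID, "active")
	if err := executeWithRetry(questID); err != nil { // wraps timeout + breaker
		transitionStatus(questID, "failed")
		return err
	}
	transitionStatus(questID, "completed")
	return nil
}

// Stubs standing in for the components documented in this page.
func acquireLock(string) bool         { return true }
func releaseLock(string)              {}
func transitionStatus(string, string) {}
func executeWithRetry(string) error   { return nil }

func main() {
	fmt.Println(executeQuestFlow("q-1") == nil)
}
```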
State Persistence
Quest state is persisted to survive restarts.
Checkpoint System
```go
type Quest struct {
	Checkpoint map[string]interface{} // arbitrary state
}

// Update the checkpoint during execution
func (qe *QuestEngine) updateCheckpoint(questID string, data map[string]interface{}) {
	quest := qe.getQuest(questID)
	if quest.Checkpoint == nil { // guard against writing to a nil map
		quest.Checkpoint = make(map[string]interface{})
	}
	for k, v := range data {
		quest.Checkpoint[k] = v
	}
	qe.persistQuest(quest)
}
```
Storage
Quests are stored in both Redis (hot) and SQLite (persistent):
```go
// Hot state in Redis (fast access)
func (qe *QuestEngine) cacheQuest(quest *Quest) error {
	key := fmt.Sprintf("quest:%s", quest.ID)
	data, err := json.Marshal(quest)
	if err != nil {
		return err
	}
	return qe.redis.Set(ctx, key, data, 24*time.Hour).Err()
}

// Persistent state in SQLite (durability)
func (qe *QuestEngine) persistQuest(quest *Quest) error {
	query := `
		INSERT OR REPLACE INTO quests (
			id, name, type, status, checkpoint, last_executed_at
		) VALUES (?, ?, ?, ?, ?, ?)
	`
	checkpointJSON, err := json.Marshal(quest.Checkpoint)
	if err != nil {
		return err
	}
	return qe.db.Exec(query, quest.ID, quest.Name, quest.Type,
		quest.Status, checkpointJSON, quest.LastExecutedAt).Error
}
```
Fault Tolerance
Timeout Management
```go
func (qe *QuestEngine) executeWithTimeout(quest *Quest) error {
	timeout := qe.getQuestTimeout(quest)
	ctx, cancel := context.WithTimeout(qe.ctx, timeout)
	defer cancel()
	errChan := make(chan error, 1)
	go func() {
		errChan <- quest.Handler(ctx, quest)
	}()
	select {
	case err := <-errChan:
		return err
	case <-ctx.Done():
		return fmt.Errorf("quest timeout after %v", timeout)
	}
}
```
Retry Logic
```go
func (qe *QuestEngine) executeWithRetry(quest *Quest) error {
	maxRetries := 3
	backoff := time.Second
	for attempt := 0; attempt < maxRetries; attempt++ {
		err := qe.execute(quest)
		if err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err // permanent failure, don't retry
		}
		time.Sleep(backoff)
		backoff *= 2 // exponential backoff: 1s, 2s, 4s
	}
	return fmt.Errorf("quest failed after %d attempts", maxRetries)
}
```
Circuit Breaker
```go
func (qe *QuestEngine) executeWithCircuitBreaker(quest *Quest) error {
	cb := qe.circuitBreakers[quest.Type]
	return cb.Execute(func() error {
		return qe.execute(quest)
	})
}
```
Circuit breaker config: services/backend-api/internal/services/quest_engine.go:38-52
Quest Lifecycle
Status Transitions
```go
func (qe *QuestEngine) transitionStatus(questID string, newStatus QuestStatus) {
	quest := qe.getQuest(questID)
	oldStatus := quest.Status
	// Validate the transition
	if !isValidTransition(oldStatus, newStatus) {
		return
	}
	quest.Status = newStatus
	quest.UpdatedAt = time.Now()
	// Emit an event
	qe.emitEvent(QuestStatusChanged{
		QuestID:   questID,
		OldStatus: oldStatus,
		NewStatus: newStatus,
	})
	qe.persistQuest(quest)
}
```
Monitoring
Metrics
```go
type QuestEngineMetrics struct {
	TotalQuests        int64
	ActiveQuests       int64
	CompletedQuests    int64
	FailedQuests       int64
	AverageExecTime    time.Duration
	LockContentionRate float64
}
```
Health Checks
```go
func (qe *QuestEngine) HealthCheck() HealthStatus {
	return HealthStatus{
		Healthy:      qe.isRunning,
		ActiveQuests: len(qe.executing),
		PoolCapacity: qe.pool.Available(),
		RedisHealth:  qe.redis.Ping(ctx).Err() == nil,
	}
}
```
Next Steps
- AI Agents: multi-agent system invoked by quests
- AI Reasoning: LLM provider registry and failover