Enki is built around pure synchronous state machines in the core crate, with a thin async coordinator layer in the cli crate. This design makes the orchestration logic trivially testable and easy to reason about.
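The command-in, events-out shape of such a state machine can be sketched as below; `Orchestrator`, `Command::WorkerDone`, and `Event::MergeQueued` are illustrative stand-ins, not Enki's actual types:

```rust
// A pure synchronous state machine: no I/O, no async. The orchestrator
// consumes a command, mutates internal state, and returns the events
// the coordinator layer should act on.
enum Command {
    WorkerDone { task_id: u32 },
}

#[derive(Debug, PartialEq)]
enum Event {
    MergeQueued { task_id: u32 },
}

struct Orchestrator {
    completed: Vec<u32>,
}

impl Orchestrator {
    fn handle(&mut self, cmd: Command) -> Vec<Event> {
        match cmd {
            Command::WorkerDone { task_id } => {
                self.completed.push(task_id);
                vec![Event::MergeQueued { task_id }]
            }
        }
    }
}

fn main() {
    let mut orch = Orchestrator { completed: Vec::new() };
    let events = orch.handle(Command::WorkerDone { task_id: 7 });
    assert_eq!(events, vec![Event::MergeQueued { task_id: 7 }]);
}
```

Because `handle` is a plain function from state and command to events, tests can drive the whole orchestration logic without spawning a single thread or task.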
**Critical Pattern:** All ACP code uses `Rc<RefCell<...>>`, making it `!Send`. The coordinator runs on its own OS thread with a `current_thread` runtime + `LocalSet`. Never send ACP types across threads.
- Without two phases: tier slots remain occupied during merge, blocking other workers.
- With two phases: merge runs in the background while new workers spawn, maximizing parallelism.

Example with a tier limit of 3 standard workers:

- Time 0: Workers A, B, C running (3/3 slots)
- Time 1: Worker A finishes → `WorkerDone` (2/3 slots, merge queued)
- Time 2: Worker D spawns (3/3 slots)
- Time 3: Worker A's merge completes → `MergeDone` → spawns Worker E (if dependencies allow)
If `cp` fails during worker spawn (disk full, filesystem issue), the coordinator sets `infra_broken = true` and auto-fails all subsequent spawn attempts without retrying.
```rust
if infra_broken {
    // Skip the spawn attempt, immediately fail
    let events = orchestrator.handle(Command::WorkerFailed {
        task_id,
        error: "Infrastructure broken".into(),
    });
    return events;
}

// Try to create CoW copy
if let Err(e) = create_copy(task_id) {
    error!("Copy failed: {e}");
    infra_broken = true; // Prevent cascading retries
    let events = orchestrator.handle(Command::WorkerFailed {
        task_id,
        error: e,
    });
    return events;
}
```
```rust
pub enum EdgeCondition {
    Merged,    // Default: dep must be Done (worker + merge complete)
    Completed, // Dep worker finished (WorkerDone or Done)
    Started,   // Dep just needs to be Running (or further)
}
```
Example: Tests can start as soon as implementation is running:
```rust
dag.add_edge("test", "impl", EdgeCondition::Started);
// impl transitions to Running → test becomes Ready
// impl doesn't need to finish or merge first
```
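One way the evaluation of these conditions might look, restating the enum in a self-contained sketch; the `TaskStatus` enum and its ordering are assumptions, not Enki's actual lifecycle type:

```rust
// Variant order encodes lifecycle progression, so the derived
// PartialOrd gives us "this status or further" comparisons.
#[derive(PartialEq, PartialOrd)]
enum TaskStatus {
    Pending,
    Running,
    WorkerDone,
    Done,
}

enum EdgeCondition {
    Merged,    // dep must be fully Done (worker + merge complete)
    Completed, // dep worker finished (WorkerDone or Done)
    Started,   // dep just needs to be Running (or further)
}

fn edge_satisfied(cond: EdgeCondition, dep: TaskStatus) -> bool {
    match cond {
        EdgeCondition::Merged => dep == TaskStatus::Done,
        EdgeCondition::Completed => dep >= TaskStatus::WorkerDone,
        EdgeCondition::Started => dep >= TaskStatus::Running,
    }
}

fn main() {
    // test can start as soon as impl is Running
    assert!(edge_satisfied(EdgeCondition::Started, TaskStatus::Running));
    // a default Merged edge still waits for the full Done state
    assert!(!edge_satisfied(EdgeCondition::Merged, TaskStatus::WorkerDone));
}
```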
```rust
pub fn tick(&mut self) -> Vec<SchedulerAction> {
    let mut actions = Vec::new();

    // Count active workers per tier (mutable: we account for the
    // spawns we queue up during this tick)
    let mut active = self.count_active_by_tier();

    // Find ready nodes across all executions
    for exec in &self.executions {
        for node in exec.dag.get_ready_nodes() {
            let tier = node.tier;
            // Check if we have capacity
            if active[tier] < self.limits[tier] {
                actions.push(SchedulerAction::Spawn {
                    task_id: node.task_id,
                    tier,
                });
                active[tier] += 1;
            }
        }
    }

    actions
}
```
**Fair scheduling:** Executions are evaluated round-robin to prevent starvation.
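A sketch of how that rotation could work; the function name and the tick-based rotation are illustrative, not the actual scheduler code:

```rust
// Rotate the starting index each tick so the scan order over
// executions changes every time: when capacity is scarce, the first
// execution in iteration order can no longer monopolize the slots.
fn round_robin_order(num_execs: usize, tick: usize) -> Vec<usize> {
    if num_execs == 0 {
        return Vec::new();
    }
    (0..num_execs).map(|i| (tick + i) % num_execs).collect()
}

fn main() {
    // Tick 0 scans executions 0, 1, 2; tick 1 scans 1, 2, 0; ...
    assert_eq!(round_robin_order(3, 0), vec![0, 1, 2]);
    assert_eq!(round_robin_order(3, 1), vec![1, 2, 0]);
}
```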
The DAG is the single source of truth. The database is write-behind for crash recovery and external visibility.
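A minimal sketch of the write-behind pattern under those assumptions (all names hypothetical): the in-memory DAG is mutated first and remains authoritative, while the store only records snapshots afterward.

```rust
struct Dag {
    nodes_done: u32,
}

struct Store {
    snapshots: Vec<u32>, // stand-in for serialized DAG rows
}

impl Store {
    fn persist(&mut self, dag: &Dag) {
        // In Enki this would serialize the DAG into the database;
        // here we just record the counter.
        self.snapshots.push(dag.nodes_done);
    }
}

// Apply the state change in memory, then write behind. Reads never go
// through the database, so a failed write degrades crash recovery and
// external visibility but cannot corrupt the running orchestration.
fn mark_node_done(dag: &mut Dag, store: &mut Store) {
    dag.nodes_done += 1;
    store.persist(dag);
}

fn main() {
    let mut dag = Dag { nodes_done: 0 };
    let mut store = Store { snapshots: Vec::new() };
    mark_node_done(&mut dag, &mut store);
    mark_node_done(&mut dag, &mut store);
    assert_eq!(dag.nodes_done, 2);
    assert_eq!(store.snapshots, vec![1, 2]);
}
```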
Schema (simplified):
```sql
CREATE TABLE executions (
    id         TEXT PRIMARY KEY,
    status     TEXT,
    dag        TEXT,  -- JSON blob of the entire DAG
    created_at TEXT
);

CREATE TABLE tasks (
    id        TEXT PRIMARY KEY,
    title     TEXT,
    status    TEXT,
    tier      TEXT,
    copy_path TEXT,
    branch    TEXT
);

CREATE TABLE merge_requests (
    id       INTEGER PRIMARY KEY,
    task_id  TEXT,
    branch   TEXT,
    status   TEXT,
    priority INTEGER
);
```
Auto-migration on every DB open:
```rust
pub fn auto_migrate(conn: &Connection) -> Result<()> {
    // Parse the expected schema
    let expected = parse_schema(SCHEMA_DDL);

    // Get the current schema
    let current = get_current_schema(conn)?;

    // Add any missing columns
    for (table, columns) in expected {
        for col in columns {
            if !current[&table].contains(&col) {
                conn.execute(
                    &format!("ALTER TABLE {} ADD COLUMN {}", table, col),
                    [],
                )?;
            }
        }
    }

    Ok(())
}
```
No migration files, no down-migrations: just parse the schema and add missing columns.