
Overview

The WorkerPool manages ephemeral Modal sandboxes for task execution. Unlike a traditional worker pool, it keeps no long-lived workers: each task gets its own short-lived sandbox, which is created, runs the task, and is terminated automatically.

Location: packages/orchestrator/src/worker-pool.ts

Architecture

Ephemeral Model:
Task → spawn sandbox → clone repo → exec worker → push branch → terminate
No persistent workers exist. start() and stop() are no-ops.

Class: WorkerPool

Constructor

new WorkerPool(
  config: {
    maxWorkers: number
    workerTimeout: number
    llm: HarnessConfig['llm']
    git: HarnessConfig['git']
    pythonPath: string
    gitToken?: string
  },
  workerPrompt: string,
  deps?: Partial<WorkerPoolDeps>
)
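  • config.maxWorkers (number, required): Maximum number of concurrent sandboxes (default: 50)
  • config.workerTimeout (number, required): Sandbox timeout in seconds (default: 1800, i.e. 30 minutes)
  • config.llm (LLMConfig, required): LLM endpoint configuration
  • config.git (GitConfig, required): Git repository URL and branch settings
  • config.pythonPath (string, default: "python3"): Path to the Python interpreter that runs infra/spawn_sandbox.py
  • config.gitToken (string, optional): Git access token, forwarded to the sandbox as the payload's gitToken field
  • workerPrompt (string, required): Worker agent system prompt, loaded from prompts/worker.md
  • deps (Partial<WorkerPoolDeps>, optional): Overrides for the subprocess and timer dependencies (see Dependency Injection)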

Core Methods

assignTask()

Spawns an ephemeral sandbox and executes a task.
async assignTask(task: Task, parentSpan?: Span): Promise<Handoff>
  • task (Task, required): Task to execute, including its description, scope, and acceptance criteria
  • parentSpan (Span, optional): Parent span for distributed tracing
Returns: a Handoff object with the task results.

Execution Flow:
  1. Creates a worker entry in the activeWorkers map
  2. Builds a JSON payload with the task, system prompt, and LLM config
  3. Spawns a Python subprocess: python3 -u infra/spawn_sandbox.py <payload> (the interpreter comes from config.pythonPath)
  4. Streams stdout line by line for real-time progress
  5. Parses the final JSON line as a Handoff
  6. Removes the worker entry
If the subprocess is still running after workerTimeout seconds, it is killed and the returned promise rejects.
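Steps 3 through 5 can be approximated with Node's child_process.spawn and readline. The sketch below is an illustration, not the WorkerPool source: runSandbox and onLine are hypothetical names, the timeout is left out, and the exit code is ignored because the caller validates the final line anyway.

```typescript
import { spawn } from 'node:child_process'
import { createInterface } from 'node:readline'

// Hypothetical helper (not the WorkerPool source): run a sandbox launcher,
// forward each non-empty stdout line as it arrives, and resolve with the
// final non-empty line once the process has exited.
function runSandbox(
  command: string,
  args: string[],
  onLine: (line: string) => void,
): Promise<string> {
  return new Promise((resolve, reject) => {
    const proc = spawn(command, args)
    const stdoutLines: string[] = []

    // Pass stderr through so an unread pipe can never stall the child
    proc.stderr.pipe(process.stderr)

    // readline splits stdout into lines for real-time forwarding
    const rl = createInterface({ input: proc.stdout })
    rl.on('line', (line) => {
      if (line.trim() === '') return
      stdoutLines.push(line)
      onLine(line)
    })

    // Spawn failures (e.g. interpreter not found) surface here
    proc.on('error', reject)

    // 'close' fires after stdout has ended, so every line has been seen
    proc.on('close', () => {
      if (stdoutLines.length === 0) {
        reject(new Error('Sandbox produced no output'))
        return
      }
      // The final line is expected to be the JSON-encoded Handoff
      resolve(stdoutLines[stdoutLines.length - 1])
    })
  })
}
```

In the pool's terms, command would be config.pythonPath and args something like ['-u', 'infra/spawn_sandbox.py', JSON.stringify(payload)]; the resolved line is then parsed and validated as a Handoff.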

start()

No-op in ephemeral mode.
async start(): Promise<void>
Logs: "Worker pool ready (ephemeral mode)"

stop()

No-op in ephemeral mode.
async stop(): Promise<void>
Logs active worker count.

Sandbox Payload

The payload sent to spawn_sandbox.py:
interface SandboxPayload {
  task: Task
  systemPrompt: string
  repoUrl: string
  gitToken: string
  llmConfig: {
    endpoint: string      // Normalized to end with /v1
    model: string
    maxTokens: number
    temperature: number
    apiKey: string
  }
  trace?: PropagationContext  // Distributed tracing context
}
Endpoint Normalization:
const baseUrl = endpoint.endpoint.replace(/\/+$/, '')
const llmEndpointUrl = baseUrl.endsWith('/v1') 
  ? baseUrl 
  : `${baseUrl}/v1`
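The same rule as a standalone function, which makes its edge cases easy to check. normalizeLlmEndpoint is a hypothetical name; only the strip-then-append logic comes from the snippet above.

```typescript
// Hypothetical standalone version of the endpoint normalization rule.
function normalizeLlmEndpoint(endpoint: string): string {
  // Drop any trailing slashes so ".../v1/" and ".../v1" are treated alike
  const baseUrl = endpoint.replace(/\/+$/, '')
  // Append /v1 only when it is not already the final path segment
  return baseUrl.endsWith('/v1') ? baseUrl : `${baseUrl}/v1`
}
```

Because the check only looks at the end of the URL, a base URL that already ends in a different version segment (for example /v2) would become /v2/v1.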

Streaming Output

Real-time Progress Forwarding

The worker pool streams sandbox output as structured logs:
private forwardWorkerLine(taskId: string, line: string): void
Output Patterns:

Spawn Events ([spawn] ...):
[spawn] Creating Modal sandbox...
[spawn] Repo cloned successfully
[spawn] Starting worker agent
Worker Events ([worker:*] ...):
[worker:task-001] Tool calls: 15
[worker:task-001] Files modified: 3
[worker:task-001] Pushing branch worker/task-001-...
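Tool Call Tracking (each report overwrites the task's previous count):
// detail: the log text that follows the [worker:...] prefix
const toolCallMatch = detail.match(/Tool calls:\s*(\d+)/)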
if (toolCallMatch) {
  this.activeToolCalls.set(taskId, parseInt(toolCallMatch[1], 10))
}
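A minimal sketch of how the two prefixes and the tool-call counter could be parsed. classifyLine and trackToolCalls are hypothetical; the regexes mirror the documented line formats. Unlike forwardWorkerLine, which receives the taskId as an argument, this sketch reads the task ID from the [worker:...] prefix.

```typescript
// Hypothetical classification of sandbox stdout lines.
type SandboxLine =
  | { kind: 'spawn'; detail: string }
  | { kind: 'worker'; taskId: string; detail: string; toolCalls?: number }
  | { kind: 'other'; detail: string }

function classifyLine(line: string): SandboxLine {
  const spawnMatch = line.match(/^\[spawn\]\s*(.*)$/)
  if (spawnMatch) return { kind: 'spawn', detail: spawnMatch[1] }

  // "[worker:task-001] Tool calls: 15" -> taskId "task-001", detail "Tool calls: 15"
  const workerMatch = line.match(/^\[worker:([^\]]+)\]\s*(.*)$/)
  if (workerMatch) {
    const detail = workerMatch[2]
    const toolCallMatch = detail.match(/Tool calls:\s*(\d+)/)
    return {
      kind: 'worker',
      taskId: workerMatch[1],
      detail,
      toolCalls: toolCallMatch ? parseInt(toolCallMatch[1], 10) : undefined,
    }
  }
  return { kind: 'other', detail: line }
}

// Latest reported tool-call count per task; each report replaces the last
const activeToolCalls = new Map<string, number>()

function trackToolCalls(line: string): void {
  const parsed = classifyLine(line)
  if (parsed.kind === 'worker' && parsed.toolCalls !== undefined) {
    activeToolCalls.set(parsed.taskId, parsed.toolCalls)
  }
}
```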

Timeout Handling

const timer = setTimeout(() => {
  proc.kill('SIGKILL')
  this.timedOutBranches.push(branchName)
  reject(new Error(`Sandbox timed out after ${timeout}s`))
}, timeout * 1000)
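Timed-out branch names are recorded so that finalization can be retried later. Callers collect them with drainTimedOutBranches():
const timedOut = workerPool.drainTimedOutBranches()
for (const branch of timedOut) {
  // Branches are preserved on the remote, so they can be retried or recovered manually
}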
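The timeout snippet shows only the path where the timer fires. A complete version also has to clear the timer when the sandbox finishes first; otherwise a late callback would record the branch of a task that already succeeded. A generic sketch follows (withTimeout and onTimeout are hypothetical names); onTimeout is the point at which the pool would call proc.kill('SIGKILL') and push the branch name.

```typescript
// Hypothetical helper: race `work` against a timer. If the timer wins,
// run onTimeout (kill + record branch, in the pool's case) and reject.
function withTimeout<T>(
  work: Promise<T>,
  timeoutSec: number,
  onTimeout: () => void,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      onTimeout()
      reject(new Error(`Sandbox timed out after ${timeoutSec}s`))
    }, timeoutSec * 1000)

    // Whichever way the work settles, cancel the pending timer first
    work.then(
      (value) => {
        clearTimeout(timer)
        resolve(value)
      },
      (err) => {
        clearTimeout(timer)
        reject(err)
      },
    )
  })
}
```

Clearing the timer in both branches keeps the recorded branch list limited to tasks that genuinely ran out of time.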

Handoff Validation

function isHandoff(value: unknown): value is Handoff
Required Fields:
  • taskId: string
  • status: string
  • summary: string
  • filesChanged: string[]
  • metrics: object
If validation fails, assignTask rejects with an error instead of returning the sandbox output.
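A sketch of the check, assuming the required fields above are the whole contract. This Handoff interface lists only those fields (the real type likely has more), and rejecting an array for metrics is an extra assumption. parseHandoff is a hypothetical helper that combines JSON.parse with the guard.

```typescript
// Minimal shape with only the required fields; the real Handoff may carry more.
interface Handoff {
  taskId: string
  status: string
  summary: string
  filesChanged: string[]
  metrics: Record<string, unknown>
}

// Runtime type guard: checks each required field without trusting the input.
// Arrays are rejected for metrics (an assumption of this sketch).
function isHandoff(value: unknown): value is Handoff {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  const files = v.filesChanged
  const metrics = v.metrics
  return (
    typeof v.taskId === 'string' &&
    typeof v.status === 'string' &&
    typeof v.summary === 'string' &&
    Array.isArray(files) &&
    files.every((f) => typeof f === 'string') &&
    typeof metrics === 'object' &&
    metrics !== null &&
    !Array.isArray(metrics)
  )
}

// JSON.parse throws SyntaxError on malformed input; well-formed JSON of the
// wrong shape throws 'Invalid Handoff shape' instead.
function parseHandoff(lastLine: string): Handoff {
  const parsed: unknown = JSON.parse(lastLine)
  if (!isHandoff(parsed)) {
    throw new Error('Invalid Handoff shape')
  }
  return parsed
}
```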

Worker Interface

interface Worker {
  id: string            // Format: "ephemeral-{taskId}"
  currentTask: Task
  startedAt: number     // Timestamp in ms
}

Metrics Methods

getAvailableWorkers()

getAvailableWorkers(): { id: string }[]
Returns virtual slots, one per unit of unused capacity (maxWorkers - activeWorkers.size), since no persistent workers exist.

getAllWorkers()

getAllWorkers(): Worker[]
Returns currently active workers.

getWorkerCount()

getWorkerCount(): number

getActiveTaskCount()

getActiveTaskCount(): number

getTotalActiveToolCalls()

getTotalActiveToolCalls(): number
Sums tool calls across all active workers.

getTimedOutBranches()

getTimedOutBranches(): string[]

drainTimedOutBranches()

drainTimedOutBranches(): string[]
Returns the list of timed-out branches and clears it.
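A minimal sketch of the bookkeeping these methods imply. ActiveWorkerRegistry, WorkerEntry, the slot IDs, and the add/remove/setToolCalls/recordTimeout helpers are all hypothetical; only the method names, the ephemeral-{taskId} ID format, and the maxWorkers - activeWorkers.size formula come from this page.

```typescript
// Hypothetical stand-ins for the real Task and Worker types
interface Task { id: string }
interface WorkerEntry { id: string; currentTask: Task; startedAt: number }

// Sketch of the state behind the metrics methods (not the real class)
class ActiveWorkerRegistry {
  private activeWorkers = new Map<string, WorkerEntry>()
  private activeToolCalls = new Map<string, number>()
  private timedOutBranches: string[] = []
  private readonly maxWorkers: number
  private readonly now: () => number

  constructor(maxWorkers: number, now: () => number = Date.now) {
    this.maxWorkers = maxWorkers
    this.now = now
  }

  add(task: Task): WorkerEntry {
    // Worker IDs follow the documented "ephemeral-{taskId}" format
    const worker = { id: `ephemeral-${task.id}`, currentTask: task, startedAt: this.now() }
    this.activeWorkers.set(task.id, worker)
    return worker
  }

  remove(taskId: string): void {
    this.activeWorkers.delete(taskId)
    this.activeToolCalls.delete(taskId)
  }

  setToolCalls(taskId: string, count: number): void {
    this.activeToolCalls.set(taskId, count)
  }

  recordTimeout(branch: string): void {
    this.timedOutBranches.push(branch)
  }

  // Virtual slots equal to unused capacity (clamped at zero in this sketch)
  getAvailableWorkers(): { id: string }[] {
    const free = Math.max(0, this.maxWorkers - this.activeWorkers.size)
    return Array.from({ length: free }, (_, i) => ({ id: `slot-${i}` }))
  }

  getAllWorkers(): WorkerEntry[] {
    return Array.from(this.activeWorkers.values())
  }

  getTotalActiveToolCalls(): number {
    return Array.from(this.activeToolCalls.values()).reduce((sum, n) => sum + n, 0)
  }

  // Hand the accumulated list to the caller and start a fresh one,
  // so each timed-out branch is returned by exactly one drain
  drainTimedOutBranches(): string[] {
    const drained = this.timedOutBranches
    this.timedOutBranches = []
    return drained
  }
}
```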

Callbacks

onTaskComplete()

onTaskComplete(callback: (handoff: Handoff) => void): void

onWorkerFailed()

onWorkerFailed(callback: (taskId: string, error: Error) => void): void

Usage Example

import { WorkerPool } from '@longshot/orchestrator'
// Assumes `config` (a HarnessConfig) and `workerPrompt` are already loaded

const workerPool = new WorkerPool(
  {
    maxWorkers: 50,
    workerTimeout: 1800,
    llm: config.llm,
    git: config.git,
    pythonPath: 'python3',
    gitToken: process.env.GIT_TOKEN
  },
  workerPrompt
)

await workerPool.start()  // No-op, but maintains interface

// Assign task to ephemeral sandbox
const handoff = await workerPool.assignTask({
  id: 'task-001',
  description: 'Implement user authentication',
  scope: ['src/auth/login.ts', 'src/auth/session.ts'],
  acceptance: 'Users can log in and receive JWT tokens',
  branch: 'worker/task-001-auth',
  status: 'pending',
  createdAt: Date.now(),
  priority: 5
})

console.log(`Task ${handoff.taskId} ${handoff.status}`)
console.log(`Files changed: ${handoff.filesChanged.join(', ')}`)
console.log(`Tokens used: ${handoff.metrics.tokensUsed}`)

await workerPool.stop()  // No-op

Dependency Injection

interface WorkerPoolDeps {
  spawn: typeof spawn
  createInterface: typeof createInterface
  setTimeout: typeof setTimeout
  clearTimeout: typeof clearTimeout
  now: () => number
}
Allows deterministic testing by injecting fake subprocess and timer behavior.
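As an example of what the injected timers enable, here is a hypothetical fake clock covering the setTimeout, clearTimeout, and now members. Virtual time moves only when advance() is called, so a 1800-second timeout can be exercised instantly. Its signatures are simplified (it returns numeric timer IDs), so passing it where typeof setTimeout is expected would likely need a cast.

```typescript
type Callback = () => void

// Hypothetical fake clock for the timer-related WorkerPoolDeps members.
class FakeClock {
  private current = 0
  private nextId = 1
  private timers = new Map<number, { at: number; fn: Callback }>()

  now = (): number => this.current

  setTimeout = (fn: Callback, ms: number): number => {
    const id = this.nextId++
    this.timers.set(id, { at: this.current + ms, fn })
    return id
  }

  clearTimeout = (id: number): void => {
    this.timers.delete(id)
  }

  // Move virtual time forward by `ms`, firing due timers earliest-first
  // (ties fire in scheduling order). Timers scheduled by a callback are
  // also fired if they fall inside the window.
  advance(ms: number): void {
    const target = this.current + ms
    for (;;) {
      let dueId = -1
      let dueAt = Infinity
      for (const [id, timer] of Array.from(this.timers.entries())) {
        if (timer.at <= target && timer.at < dueAt) {
          dueId = id
          dueAt = timer.at
        }
      }
      if (dueId === -1) break
      const timer = this.timers.get(dueId)
      this.timers.delete(dueId)
      this.current = dueAt
      timer?.fn()
    }
    this.current = target
  }
}
```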

Error Handling

Subprocess Errors:
proc.on('error', (err) => {
  reject(err)
})
No Output:
if (stdoutLines.length === 0) {
  reject(new Error('Sandbox produced no output'))
  return
}
Invalid JSON:
try {
  const parsed = JSON.parse(lastLine)
  if (!isHandoff(parsed)) {
    throw new Error('Invalid Handoff shape')
  }
  resolve(parsed)
} catch {
  // Malformed JSON and an invalid Handoff shape both end up here
  reject(new Error('Failed to parse Handoff JSON'))
}

Tracing Integration

const workerSpan = this.tracer?.startSpan('worker.execute', {
  taskId: task.id,
  agentId: 'worker-pool'
})

// Events logged during execution (workerSpan is undefined when no tracer is configured):
workerSpan?.event('sandbox.created')
workerSpan?.event('sandbox.cloned')
workerSpan?.event('sandbox.workerStarted')
workerSpan?.event('sandbox.pushed')

// Final attributes:
workerSpan?.setAttributes({
  status: handoff.status,
  filesChanged: handoff.filesChanged.length,
  tokensUsed: handoff.metrics.tokensUsed,
  toolCallCount: handoff.metrics.toolCallCount,
  durationMs: handoff.metrics.durationMs
})
