Overview
The WorkerPool manages ephemeral Modal sandboxes for task execution. Unlike traditional worker pools, each task gets its own short-lived sandbox that is created, executed, and terminated automatically.
Location: packages/orchestrator/src/worker-pool.ts
Architecture
Ephemeral Model:
Task → spawn sandbox → clone repo → exec worker → push branch → terminate
No persistent workers exist. start() and stop() are no-ops.
Class: WorkerPool
Constructor
new WorkerPool(
config: {
maxWorkers: number
workerTimeout: number
llm: HarnessConfig['llm']
git: HarnessConfig['git']
pythonPath: string
gitToken?: string
},
workerPrompt: string,
deps?: Partial<WorkerPoolDeps>
)
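maxWorkers: Maximum concurrent sandboxes (default: 50)
workerTimeout: Sandbox timeout in seconds (default: 1800 = 30 min)
llm: LLM endpoint configuration
git: Git repository URL and branch settings
pythonPath: Path to the Python interpreter
gitToken (optional): Git token forwarded to the sandbox in the payload
workerPrompt: Worker agent system prompt loaded from prompts/worker.md
deps (optional): Overrides for injected dependencies (see Dependency Injection)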
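The two numeric limits have documented defaults. The sketch below shows one way those defaults could be applied. `resolvePoolLimits` and `PoolLimits` are hypothetical names, and it assumes a default is used only when the value is missing.

```typescript
// Hypothetical helper: fills in the documented defaults for the numeric limits.
interface PoolLimits {
  maxWorkers: number
  workerTimeout: number // seconds
}

const DEFAULT_LIMITS: PoolLimits = {
  maxWorkers: 50, // maximum concurrent sandboxes
  workerTimeout: 1800, // 30 minutes, in seconds
}

function resolvePoolLimits(partial: Partial<PoolLimits>): PoolLimits {
  // `??` keeps explicit values (including 0) and only fills in missing ones
  return {
    maxWorkers: partial.maxWorkers ?? DEFAULT_LIMITS.maxWorkers,
    workerTimeout: partial.workerTimeout ?? DEFAULT_LIMITS.workerTimeout,
  }
}

const limits = resolvePoolLimits({ maxWorkers: 10 })
console.log(limits) // { maxWorkers: 10, workerTimeout: 1800 }
```

Because `workerTimeout` is in seconds, any timer built from it needs `workerTimeout * 1000` milliseconds.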
Core Methods
assignTask()
Spawns an ephemeral sandbox and executes a task.
async assignTask(task: Task, parentSpan?: Span): Promise<Handoff>
task: Task to execute, with description, scope, and acceptance criteria
parentSpan (optional): Parent span for distributed tracing
Returns: Handoff object with task results
Execution Flow:
- Creates a worker entry in the activeWorkers map
- Prepares a JSON payload with the task, prompts, and LLM config
- Spawns a Python subprocess: python3 -u infra/spawn_sandbox.py <payload>
- Streams stdout line-by-line for real-time progress
- Parses the final stdout line as a JSON Handoff
- Cleans up the worker entry
- Times out after workerTimeout seconds
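The execution flow is organized around registering and later removing a worker entry. The sketch below models only that bookkeeping. `EphemeralTracker` and the injected `runSandbox` function are hypothetical stand-ins for the real spawn, stream, and parse steps, and the types are trimmed to the fields used here.

```typescript
// Minimal sketch of assignTask bookkeeping; runSandbox is a hypothetical
// stand-in for spawning the Python subprocess and parsing its Handoff.
interface TaskLike { id: string }
interface HandoffLike { taskId: string; status: string }
interface WorkerEntry { id: string; currentTask: TaskLike; startedAt: number }

class EphemeralTracker {
  readonly activeWorkers = new Map<string, WorkerEntry>()
  private readonly runSandbox: (task: TaskLike) => Promise<HandoffLike>
  private readonly now: () => number

  constructor(runSandbox: (task: TaskLike) => Promise<HandoffLike>, now: () => number = Date.now) {
    this.runSandbox = runSandbox
    this.now = now
  }

  async assignTask(task: TaskLike): Promise<HandoffLike> {
    // Register the worker entry before anything is spawned
    this.activeWorkers.set(task.id, {
      id: `ephemeral-${task.id}`,
      currentTask: task,
      startedAt: this.now(),
    })
    try {
      // Spawn, stream, and parse are all delegated to runSandbox here
      return await this.runSandbox(task)
    } finally {
      // Remove the entry whether the run succeeded or failed
      this.activeWorkers.delete(task.id)
    }
  }
}
```

In this sketch, cleanup sits in `finally`, so a failed sandbox still frees its entry and its virtual slot.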
start()
No-op in ephemeral mode.
async start(): Promise<void>
Logs: "Worker pool ready (ephemeral mode)"
stop()
No-op in ephemeral mode.
async stop(): Promise<void>
Logs active worker count.
Sandbox Payload
The payload sent to spawn_sandbox.py:
interface SandboxPayload {
task: Task
systemPrompt: string
repoUrl: string
gitToken: string
llmConfig: {
endpoint: string // Normalized to end with /v1
model: string
maxTokens: number
temperature: number
apiKey: string
}
trace?: PropagationContext // Distributed tracing context
}
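Per the execution flow, the payload is serialized as JSON and passed as a single argument to `infra/spawn_sandbox.py`. The sketch below builds that argument vector. `buildSpawnArgs` and `SimplePayload` are hypothetical, and the payload type is simplified (no `Task` or trace types).

```typescript
// Hypothetical helper: builds the argv for `python3 -u infra/spawn_sandbox.py <payload>`.
// SimplePayload is a trimmed-down stand-in for SandboxPayload.
interface SimplePayload {
  task: { id: string; description: string }
  systemPrompt: string
  repoUrl: string
  gitToken: string
  llmConfig: { endpoint: string; model: string; maxTokens: number; temperature: number; apiKey: string }
}

function buildSpawnArgs(payload: SimplePayload, script = 'infra/spawn_sandbox.py'): string[] {
  // -u disables Python's stdout buffering so progress lines arrive as they are printed
  return ['-u', script, JSON.stringify(payload)]
}
```

The result would be passed as the arguments to `spawn(pythonPath, ...)`, with the configured `pythonPath` as the executable.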
Endpoint Normalization:
const baseUrl = endpoint.endpoint.replace(/\/+$/, '')
const llmEndpointUrl = baseUrl.endsWith('/v1')
? baseUrl
: `${baseUrl}/v1`
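The same normalization, wrapped in a self-contained function so it can be tested on its own. The name `normalizeEndpoint` is ours, not necessarily the one used in worker-pool.ts.

```typescript
// Self-contained version of the endpoint normalization above.
function normalizeEndpoint(endpoint: string): string {
  // Strip trailing slashes so ".../v1/" and ".../v1" are treated the same
  const baseUrl = endpoint.replace(/\/+$/, '')
  // Append /v1 only when the URL does not already end with it
  return baseUrl.endsWith('/v1') ? baseUrl : `${baseUrl}/v1`
}

console.log(normalizeEndpoint('http://localhost:8000'))     // http://localhost:8000/v1
console.log(normalizeEndpoint('http://localhost:8000/v1/')) // http://localhost:8000/v1
```

Since only the suffix is checked, a base URL ending in another version segment such as `/v2` would become `/v2/v1`.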
Streaming Output
Real-time Progress Forwarding
The worker pool streams sandbox output as structured logs:
private forwardWorkerLine(taskId: string, line: string): void
Output Patterns:
Spawn Events ([spawn] ...):
[spawn] Creating Modal sandbox...
[spawn] Repo cloned successfully
[spawn] Starting worker agent
Worker Events ([worker:*] ...):
[worker:task-001] Tool calls: 15
[worker:task-001] Files modified: 3
[worker:task-001] Pushing branch worker/task-001-...
Tool Call Tracking:
const toolCallMatch = detail.match(/Tool calls:\s*(\d+)/)
if (toolCallMatch) {
this.activeToolCalls.set(taskId, parseInt(toolCallMatch[1], 10))
}
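The output patterns above can be classified by their bracketed prefix. This is a hypothetical parser, assuming every line starts with either `[spawn]` or `[worker:<taskId>]`. `parseWorkerLine` and `ParsedLine` are our names, and the tool call regex is the one from the snippet above.

```typescript
// Hypothetical parser for the streamed output patterns; ParsedLine is our shape.
type ParsedLine =
  | { kind: 'spawn'; detail: string }
  | { kind: 'worker'; taskId: string; detail: string; toolCalls?: number }
  | { kind: 'other'; detail: string }

function parseWorkerLine(line: string): ParsedLine {
  // "[spawn] Creating Modal sandbox..." -> spawn event
  const spawnMatch = line.match(/^\[spawn\]\s*(.*)$/)
  if (spawnMatch) return { kind: 'spawn', detail: spawnMatch[1] }

  // "[worker:task-001] Tool calls: 15" -> worker event for task-001
  const workerMatch = line.match(/^\[worker:([^\]]+)\]\s*(.*)$/)
  if (workerMatch) {
    const detail = workerMatch[2]
    // Same regex as the tool call tracking snippet
    const toolCallMatch = detail.match(/Tool calls:\s*(\d+)/)
    const toolCalls = toolCallMatch ? parseInt(toolCallMatch[1], 10) : undefined
    return { kind: 'worker', taskId: workerMatch[1], detail, toolCalls }
  }

  return { kind: 'other', detail: line }
}

// Keep the latest count per task, as activeToolCalls does
const activeToolCalls = new Map<string, number>()
const sample = ['[spawn] Repo cloned successfully', '[worker:task-001] Tool calls: 15']
for (const line of sample) {
  const parsed = parseWorkerLine(line)
  if (parsed.kind === 'worker' && parsed.toolCalls !== undefined) {
    activeToolCalls.set(parsed.taskId, parsed.toolCalls)
  }
}
```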
Timeout Handling
const timer = setTimeout(() => {
proc.kill('SIGKILL')
this.timedOutBranches.push(branchName)
reject(new Error(`Sandbox timed out after ${timeout}s`))
}, timeout * 1000)
Timed-out branches are preserved for finalization retry:
const timedOut = workerPool.drainTimedOutBranches()
for (const branch of timedOut) {
// Branches preserved on remote for manual recovery
}
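The timeout snippet and the drain loop can be exercised together in a small sketch. `TimedOutBranches`, `withTimeout`, and the `kill` callback (standing in for `proc.kill('SIGKILL')`) are hypothetical. `get()` returns a copy here, which may differ from the real `getTimedOutBranches()`.

```typescript
// Hypothetical registry mirroring getTimedOutBranches / drainTimedOutBranches.
class TimedOutBranches {
  private branches: string[] = []

  record(branch: string): void {
    this.branches.push(branch)
  }

  // Returns a copy so callers cannot mutate the internal list
  get(): string[] {
    return [...this.branches]
  }

  // Returns every recorded branch and empties the list in one step
  drain(): string[] {
    return this.branches.splice(0, this.branches.length)
  }
}

// Races `work` against a timer; on timeout, kills, records the branch, and rejects
function withTimeout<T>(
  work: Promise<T>,
  timeoutSec: number,
  branch: string,
  registry: TimedOutBranches,
  kill: () => void,
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      kill()
      registry.record(branch)
      reject(new Error(`Sandbox timed out after ${timeoutSec}s`))
    }, timeoutSec * 1000)
    work.then(
      (value) => { clearTimeout(timer); resolve(value) },
      (err) => { clearTimeout(timer); reject(err) },
    )
  })
}
```

Clearing the timer on both outcomes matters: otherwise a finished sandbox would still be killed and recorded once the timer fires.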
Handoff Validation
function isHandoff(value: unknown): value is Handoff
Required Fields:
taskId: string
status: string
summary: string
filesChanged: string[]
metrics: object
If validation fails, the sandbox output is rejected with an error.
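A type guard for the required fields could look like the sketch below. The `Handoff` interface here is reduced to the listed fields, and checking that every `filesChanged` entry is a string is an assumption of this sketch; the source does not say whether the real guard goes that deep.

```typescript
// Reduced Handoff type: only the required fields listed above.
interface Handoff {
  taskId: string
  status: string
  summary: string
  filesChanged: string[]
  metrics: Record<string, unknown>
}

function isHandoff(value: unknown): value is Handoff {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  const files = v.filesChanged
  return (
    typeof v.taskId === 'string' &&
    typeof v.status === 'string' &&
    typeof v.summary === 'string' &&
    // Every changed path must be a string
    Array.isArray(files) &&
    files.every((f) => typeof f === 'string') &&
    // typeof null === 'object', so null is excluded explicitly
    typeof v.metrics === 'object' &&
    v.metrics !== null
  )
}
```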
Worker Interface
interface Worker {
id: string // Format: "ephemeral-{taskId}"
currentTask: Task
startedAt: number // Timestamp in ms
}
Metrics Methods
getAvailableWorkers()
getAvailableWorkers(): { id: string }[]
Returns virtual slots based on maxWorkers - activeWorkers.size.
getAllWorkers()
getAllWorkers(): Worker[]
Returns currently active workers.
getWorkerCount()
getActiveTaskCount()
getActiveTaskCount(): number
getTotalActiveToolCalls()
getTotalActiveToolCalls(): number
Sums tool calls across all active workers.
getTimedOutBranches()
getTimedOutBranches(): string[]
drainTimedOutBranches()
drainTimedOutBranches(): string[]
Returns and clears timed-out branches list.
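The slot and tool call metrics reduce to simple arithmetic over two maps. In this sketch, `PoolMetrics` is hypothetical, the `slot-N` IDs are invented, clamping at zero is our addition, and `getActiveTaskCount()` returning `activeWorkers.size` is an assumption.

```typescript
// Hypothetical metrics holder; both maps are keyed by task ID.
class PoolMetrics {
  readonly activeWorkers = new Map<string, { id: string }>()
  readonly activeToolCalls = new Map<string, number>()
  private readonly maxWorkers: number

  constructor(maxWorkers: number) {
    this.maxWorkers = maxWorkers
  }

  // One virtual slot per unused unit of capacity, never negative
  getAvailableWorkers(): { id: string }[] {
    const free = Math.max(0, this.maxWorkers - this.activeWorkers.size)
    return Array.from({ length: free }, (_, i) => ({ id: `slot-${i}` }))
  }

  getActiveTaskCount(): number {
    return this.activeWorkers.size
  }

  // Sum of the latest tool call count reported by each active worker
  getTotalActiveToolCalls(): number {
    return Array.from(this.activeToolCalls.values()).reduce((sum, n) => sum + n, 0)
  }
}
```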
Callbacks
onTaskComplete()
onTaskComplete(callback: (handoff: Handoff) => void): void
onWorkerFailed()
onWorkerFailed(callback: (taskId: string, error: Error) => void): void
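A minimal sketch of how these registrations might be stored and dispatched. It assumes multiple listeners are kept in arrays and invoked in registration order. The source does not say whether a second registration replaces the first. `PoolEvents`, `emitComplete`, and `emitFailed` are hypothetical, and `Handoff` is reduced to two fields.

```typescript
// Hypothetical listener storage; Handoff is trimmed to two fields.
type Handoff = { taskId: string; status: string }

class PoolEvents {
  private completeListeners: Array<(handoff: Handoff) => void> = []
  private failedListeners: Array<(taskId: string, error: Error) => void> = []

  onTaskComplete(callback: (handoff: Handoff) => void): void {
    this.completeListeners.push(callback)
  }

  onWorkerFailed(callback: (taskId: string, error: Error) => void): void {
    this.failedListeners.push(callback)
  }

  // Hypothetical hooks that assignTask would call on each outcome
  emitComplete(handoff: Handoff): void {
    for (const cb of this.completeListeners) cb(handoff)
  }

  emitFailed(taskId: string, error: Error): void {
    for (const cb of this.failedListeners) cb(taskId, error)
  }
}
```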
Usage Example
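import { WorkerPool } from '@longshot/orchestrator'
// `config` (HarnessConfig) and `workerPrompt` (contents of prompts/worker.md) are assumed to be loaded already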
const workerPool = new WorkerPool(
{
maxWorkers: 50,
workerTimeout: 1800,
llm: config.llm,
git: config.git,
pythonPath: 'python3',
gitToken: process.env.GIT_TOKEN
},
workerPrompt
)
await workerPool.start() // No-op, but maintains interface
// Assign task to ephemeral sandbox
const handoff = await workerPool.assignTask({
id: 'task-001',
description: 'Implement user authentication',
scope: ['src/auth/login.ts', 'src/auth/session.ts'],
acceptance: 'Users can log in and receive JWT tokens',
branch: 'worker/task-001-auth',
status: 'pending',
createdAt: Date.now(),
priority: 5
})
console.log(`Task ${handoff.taskId} ${handoff.status}`)
console.log(`Files changed: ${handoff.filesChanged.join(', ')}`)
console.log(`Tokens used: ${handoff.metrics.tokensUsed}`)
await workerPool.stop() // No-op
Dependency Injection
import { spawn } from 'node:child_process'
import { createInterface } from 'node:readline'
interface WorkerPoolDeps {
spawn: typeof spawn
createInterface: typeof createInterface
setTimeout: typeof setTimeout
clearTimeout: typeof clearTimeout
now: () => number
}
Allows deterministic testing by injecting fake subprocess and timer behavior.
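One way to make the timer dependencies deterministic is a manually advanced fake clock. The `TimerDeps` interface below is a simplified assumption. The real `WorkerPoolDeps` uses `typeof setTimeout` and `typeof clearTimeout`, so these fakes would need adapting to those signatures. `createFakeTimers` and `advance` are hypothetical.

```typescript
// Simplified timer dependencies (an assumption, not the real WorkerPoolDeps types).
interface TimerDeps {
  setTimeout: (fn: () => void, ms: number) => number
  clearTimeout: (id: number) => void
  now: () => number
}

function createFakeTimers() {
  let current = 0
  let nextId = 1
  const pending = new Map<number, { at: number; fn: () => void }>()

  const deps: TimerDeps = {
    setTimeout: (fn, ms) => {
      const id = nextId++
      pending.set(id, { at: current + ms, fn })
      return id
    },
    clearTimeout: (id) => {
      pending.delete(id)
    },
    now: () => current,
  }

  // Moves the clock forward and fires due timers in time order;
  // timers scheduled from inside a callback wait for the next advance
  function advance(ms: number): void {
    current += ms
    const due = Array.from(pending.entries())
      .filter(([, t]) => t.at <= current)
      .sort((a, b) => a[1].at - b[1].at)
    for (const [id, t] of due) {
      pending.delete(id)
      t.fn()
    }
  }

  return { deps, advance }
}
```

With this approach, a 30-minute sandbox timeout can be tested instantly by calling `advance(1800 * 1000)` instead of waiting.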
Error Handling
Subprocess Errors:
proc.on('error', (err) => {
reject(err)
})
No Output:
if (stdoutLines.length === 0) {
reject(new Error('Sandbox produced no output'))
return // stop before trying to parse a missing last line
}
Invalid JSON:
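try {
const parsed = JSON.parse(lastLine)
if (!isHandoff(parsed)) {
throw new Error('Invalid Handoff shape')
}
} catch {
// Catches both malformed JSON and the shape error thrown above,
// so either failure surfaces with this same message
reject(new Error('Failed to parse Handoff JSON'))
}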
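The three failure cases can also be expressed as one synchronous function over the collected stdout lines. `parseFinalLine` and its generic guard parameter are hypothetical. Unlike the snippet above, this sketch reports the shape error separately from the JSON error, which is a choice of the sketch rather than documented behavior.

```typescript
// Hypothetical parser: validates the last stdout line with a caller-supplied guard.
function parseFinalLine<T>(stdoutLines: string[], guard: (v: unknown) => v is T): T {
  if (stdoutLines.length === 0) {
    throw new Error('Sandbox produced no output')
  }
  // Earlier lines are progress output; only the last one carries the Handoff
  const lastLine = stdoutLines[stdoutLines.length - 1]
  let parsed: unknown
  try {
    parsed = JSON.parse(lastLine)
  } catch {
    throw new Error('Failed to parse Handoff JSON')
  }
  if (!guard(parsed)) {
    throw new Error('Invalid Handoff shape')
  }
  return parsed
}
```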
Tracing Integration
const workerSpan = this.tracer?.startSpan('worker.execute', {
taskId: task.id,
agentId: 'worker-pool'
})
// Events logged during execution:
workerSpan?.event('sandbox.created')
workerSpan?.event('sandbox.cloned')
workerSpan?.event('sandbox.workerStarted')
workerSpan?.event('sandbox.pushed')
// Final attributes (workerSpan is undefined when no tracer is configured):
workerSpan?.setAttributes({
status: handoff.status,
filesChanged: handoff.filesChanged.length,
tokensUsed: handoff.metrics.tokensUsed,
toolCallCount: handoff.metrics.toolCallCount,
durationMs: handoff.metrics.durationMs
})