Overview
The AgentMultiplexer class enables concurrent processing of multiple async iterables. It races all registered iterators and yields events from whichever is ready first, with support for pausing and resuming individual agents without blocking others.
Key Features
- Multiplex multiple async iterables into a single stream
- Race all iterators and yield from the first ready
- Pause and resume individual agents independently
- Wake mechanism for adding new agents dynamically
- Automatic cleanup when agents complete
Constructor
const multiplexer = new AgentMultiplexer<EventType>();
The type parameter T specifies the event type that agents will yield.
Methods
register()
Register an agent with its async iterable stream.
multiplexer.register("agent1", asyncIterable);
Unique identifier for this agent
The async iterable stream that produces events
The agent immediately starts being polled and its events will be yielded through events().
unregister()
Unregister an agent and clean up resources.
multiplexer.unregister("agent1");
The ID of the agent to unregister
Calls iterator.return() to allow the iterator to clean up properly.
pause()
Pause an agent without blocking other agents.
multiplexer.pause("agent1");
The ID of the agent to pause
The agent’s pending promise is kept but not included in racing. Other agents continue producing events normally.
resume()
Resume a paused agent.
multiplexer.resume("agent1");
The ID of the agent to resume
If the agent has a pending value, it will be yielded. Otherwise, starts polling for the next value. Wakes up the consumer to process the resumed agent.
events()
Async iterator that yields events from all registered agents.
for await (const { agentId, event } of multiplexer.events()) {
console.log(`${agentId}: ${event}`);
}
The agent that produced this event
The event data from the agent’s stream
Uses Promise.race() to yield from whichever agent is ready first. When all agents are done or paused with no pending values, the iterator waits for a wake signal.
Types
MultiplexedEvent
The event structure yielded by the multiplexer.
type MultiplexedEvent<T> = {
agentId: string;
event: T;
};
AgentState (Internal)
Internal state tracking for each agent.
type AgentState<T> = {
iterator: AsyncIterator<T>;
paused: boolean;
pending: Promise<{ agentId: string; result: IteratorResult<T> }> | null;
};
Examples
Basic Multiplexing
import { AgentMultiplexer } from "@llm-gateway/ai";
const multiplexer = new AgentMultiplexer<string>();
// Create some async generators
async function* agent1() {
yield "Hello from agent 1";
await delay(100);
yield "Agent 1 done";
}
async function* agent2() {
await delay(50);
yield "Hello from agent 2";
yield "Agent 2 done";
}
// Register agents
multiplexer.register("agent1", agent1());
multiplexer.register("agent2", agent2());
// Process events as they arrive
for await (const { agentId, event } of multiplexer.events()) {
console.log(`[${agentId}] ${event}`);
}
// Output:
// [agent2] Hello from agent 2
// [agent1] Hello from agent 1
// [agent2] Agent 2 done
// [agent1] Agent 1 done
Pause and Resume
const multiplexer = new AgentMultiplexer<{ type: string; data: any }>();
multiplexer.register("agent1", stream1);
multiplexer.register("agent2", stream2);
for await (const { agentId, event } of multiplexer.events()) {
if (event.type === "needs_approval") {
// Pause this agent while awaiting approval
multiplexer.pause(agentId);
const approved = await getUserApproval(event.data);
if (approved) {
// Resume the agent
multiplexer.resume(agentId);
} else {
// Don't resume - agent stays paused
multiplexer.unregister(agentId);
}
}
}
Dynamic Registration
const multiplexer = new AgentMultiplexer<Event>();
// Start with one agent
multiplexer.register("initial", initialStream);
for await (const { agentId, event } of multiplexer.events()) {
if (event.type === "spawn_subagent") {
// Register new agent dynamically
const subId = generateId();
multiplexer.register(subId, createSubagentStream(event.task));
}
if (event.type === "complete") {
multiplexer.unregister(agentId);
}
}
Concurrent Task Processing
import { AgentMultiplexer } from "@llm-gateway/ai";
async function* processTask(taskId: string, task: Task) {
yield { type: "start", taskId };
const result = await performWork(task);
yield { type: "progress", taskId, progress: 50 };
const final = await finalizeWork(result);
yield { type: "complete", taskId, result: final };
}
const multiplexer = new AgentMultiplexer<TaskEvent>();
const results = new Map<string, any>();
// Start multiple tasks concurrently
for (const task of tasks) {
multiplexer.register(task.id, processTask(task.id, task));
}
// Process events as they complete
for await (const { agentId, event } of multiplexer.events()) {
switch (event.type) {
case "start":
console.log(`Task ${agentId} started`);
break;
case "progress":
console.log(`Task ${agentId}: ${event.progress}%`);
break;
case "complete":
results.set(agentId, event.result);
console.log(`Task ${agentId} completed`);
break;
}
}
console.log("All tasks completed:", results);
Implementation Details
Racing Algorithm
The multiplexer uses Promise.race() to compete all non-paused pending promises:
- Collect all pending promises from non-paused agents
- Race them along with a wakeup promise
- Yield the winning event
- Pull the next value from that agent
- Repeat
Wake Mechanism
When all agents are paused or no pending promises exist, the multiplexer waits for a wake signal. Wake signals are triggered by:
register() - New agent added
resume() - Agent resumed
This allows dynamic agent addition without blocking the event loop.
Completion
The multiplexer completes when:
- All agents have finished (yielded done)
- All agents are unregistered
Paused agents keep the multiplexer alive - they may be resumed later.