Overview
The StreamProcessor module provides functions to process the fullStream from an AI SDK streamText call. It forwards every stream event to WebSocket clients in real time and collects the complete response data.
Unlike the other core managers, this is not a class but a standalone function module. It has no persistent state and operates as a pure stream processing pipeline.
Key responsibilities:
- Process AI SDK fullStream events
- Forward text deltas, reasoning, tool calls to WebSocket clients
- Collect complete response data (text, reasoning, tool results)
- Emit events on the agent for local handling
- Support streaming speech integration via callbacks
- Handle stream lifecycle (start, finish, error, abort)
Location: src/core/StreamProcessor.ts:40
Functions
processFullStream()
Process the fullStream from an AI SDK streamText call, forwarding events and collecting results.
processFullStream(
  result: ReturnType<typeof streamText>,
  callbacks: StreamProcessorCallbacks,
  extraResponseFields?: Record<string, unknown>
): Promise<StreamResult>
result
ReturnType<typeof streamText>
required
The result object from the AI SDK's streamText() call
callbacks
StreamProcessorCallbacks
required
Callbacks for handling stream events:
- onTextDelta(text: string) - Called for each text chunk (for streaming speech)
- onTextEnd() - Called when the text stream ends (flush the speech buffer)
- sendMessage(msg) - Function to send WebSocket messages
- emitEvent(event, data) - Function to emit events on the agent
extraResponseFields
Record<string, unknown>
optional
Optional additional fields to include in the response_complete message
Returns: Promise resolving to StreamResult containing:
fullText - Complete response text
fullReasoning - Complete reasoning text (if any)
allToolCalls - Array of all tool calls made
allToolResults - Array of all tool results
allSources - Array of all sources (if any)
allFiles - Array of all files (if any)
Example:
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { processFullStream } from './core/StreamProcessor';

const result = streamText({
  model: openai('gpt-4-turbo'),
  messages: conversationHistory,
  tools: { /* ... */ }
});

const streamResult = await processFullStream(result, {
  onTextDelta: (text) => {
    // Send to speech manager for streaming TTS
    speechManager.processTextDelta(text);
  },
  onTextEnd: () => {
    // Flush remaining text to speech
    speechManager.flushPendingText();
  },
  sendMessage: (msg) => {
    // Send to WebSocket client
    wsManager.send(msg);
  },
  emitEvent: (event, data) => {
    // Emit on agent
    agent.emit(event, data);
  }
});
console.log('Full response:', streamResult.fullText);
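The optional extraResponseFields argument is included in the final response_complete message. A minimal sketch of that merge, assuming a plain object spread; buildResponseComplete is a hypothetical helper name, not the module's actual internal function:

```typescript
// Hypothetical sketch: how extraResponseFields might merge into the
// response_complete message (the helper name is illustrative).
function buildResponseComplete(
  collected: { fullText: string; fullReasoning: string },
  extraResponseFields?: Record<string, unknown>
): Record<string, unknown> {
  return {
    type: 'response_complete',
    text: collected.fullText,
    reasoning: collected.fullReasoning,
    ...extraResponseFields, // e.g. { requestId: 'abc' } tagged by the caller
  };
}
```

This lets callers correlate a completed response with their own request metadata without changing the protocol.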
handleStreamChunk()
Handle individual stream chunks and emit them as events (for onChunk callback).
handleStreamChunk(
  chunk: any,
  emitEvent: (event: string, data?: unknown) => void
): void
chunk
any
required
A stream chunk from the AI SDK fullStream
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events
Example:
const result = streamText({
  model: openai('gpt-4-turbo'),
  messages,
  onChunk: (chunk) => {
    handleStreamChunk(chunk, (event, data) => {
      agent.emit(event, data);
    });
  }
});
Events emitted:
chunk:text_delta - Text chunk received
chunk:reasoning_delta - Reasoning chunk received
chunk:tool_call - Tool call chunk received
chunk:tool_result - Tool result chunk received
chunk:tool_input_start - Tool input started
chunk:tool_input_delta - Tool input chunk received
chunk:source - Source chunk received
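These chunk events can be consumed with an ordinary EventEmitter subscription. A sketch, where a bare EventEmitter stands in for the agent class and the payload shapes are assumptions based on the chunk names:

```typescript
import { EventEmitter } from 'node:events';

// A bare EventEmitter stands in for the agent; the payload shapes below
// ({ text }, { toolName }) are assumptions, not a documented contract.
const agent = new EventEmitter();
const transcript: string[] = [];

agent.on('chunk:text_delta', (data: { text: string }) => {
  transcript.push(data.text); // mirror streamed text into a local transcript
});

agent.on('chunk:tool_call', (data: { toolName: string }) => {
  console.log(`tool requested: ${data.toolName}`);
});
```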
Types
StreamResult
interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{
    toolName: string;
    toolCallId: string;
    input: unknown;
  }>;
  allToolResults: Array<{
    toolName: string;
    toolCallId: string;
    output: unknown;
  }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}
fullText - The complete text response accumulated from all text-delta events
fullReasoning - The complete reasoning text accumulated from all reasoning-delta events (empty string if no reasoning)
allToolCalls - All tool calls made during the stream, in order
allToolResults - All tool results returned during the stream, in order
allSources - All sources emitted during the stream (for RAG/citation)
allFiles - All files emitted during the stream
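A sketch of consuming a StreamResult once the promise resolves; summarizeStream is a hypothetical helper, and the interface is restated only so the snippet is self-contained:

```typescript
// StreamResult restated for a self-contained example.
interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>;
  allToolResults: Array<{ toolName: string; toolCallId: string; output: unknown }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}

// Hypothetical helper: a one-line summary for logging after the stream ends.
function summarizeStream(result: StreamResult): string {
  const tools = result.allToolCalls.map((c) => c.toolName).join(', ') || 'none';
  return `${result.fullText.length} chars, tools used: ${tools}`;
}
```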
StreamProcessorCallbacks
interface StreamProcessorCallbacks {
  onTextDelta?: (text: string) => void;
  onTextEnd?: () => void;
  sendMessage: (message: Record<string, unknown>) => void;
  emitEvent: (event: string, data?: unknown) => void;
}
onTextDelta
(text: string) => void
optional
Callback for each text chunk. Use this to feed text to SpeechManager for streaming TTS.
onTextEnd
() => void
optional
Callback for when the text stream ends. Use this to flush the SpeechManager buffer.
sendMessage
(message: Record<string, unknown>) => void
required
Function to send messages to the WebSocket client
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events on the agent EventEmitter
WebSocket Message Protocol
The StreamProcessor sends the following messages to clients:
Stream Lifecycle
Stream start:
{ "type": "stream_start" }
Stream finish:
{
  "type": "stream_finish",
  "finishReason": "stop",
  "usage": {
    "promptTokens": 150,
    "completionTokens": 75,
    "totalTokens": 225
  }
}
Stream error:
{
  "type": "stream_error",
  "error": "Model timeout"
}
Stream abort:
{
  "type": "stream_abort",
  "reason": "User interrupted"
}
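On the client, these lifecycle messages can be dispatched with a switch on type. A sketch, assuming messages arrive already parsed from JSON; lifecycleStatus is a hypothetical helper for driving a UI status line:

```typescript
// Lifecycle messages as a discriminated union, mirroring the shapes above.
type LifecycleMessage =
  | { type: 'stream_start' }
  | { type: 'stream_finish'; finishReason: string; usage: Record<string, number> }
  | { type: 'stream_error'; error: string }
  | { type: 'stream_abort'; reason: string };

// Hypothetical helper: map a lifecycle message to a display string.
function lifecycleStatus(msg: LifecycleMessage): string {
  switch (msg.type) {
    case 'stream_start':
      return 'streaming...';
    case 'stream_finish':
      return `done (${msg.finishReason})`;
    case 'stream_error':
      return `error: ${msg.error}`;
    case 'stream_abort':
      return `aborted: ${msg.reason}`;
  }
}
```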
Step Lifecycle
Step start:
{
  "type": "step_start",
  "warnings": []
}
Step finish:
{
  "type": "step_finish",
  "finishReason": "stop",
  "usage": { /* ... */ }
}
Text Streaming
Text start:
{
  "type": "text_start",
  "id": "text-0"
}
Text delta:
{
  "type": "text_delta",
  "id": "text-0",
  "text": "Hello, "
}
Text end:
{
  "type": "text_end",
  "id": "text-0"
}
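A client can rebuild the full text of each block by accumulating text_delta messages keyed by id. A sketch; TextAccumulator is hypothetical, not part of the module:

```typescript
// Hypothetical client-side accumulator: collects text_delta payloads per
// block id and returns the finished string on text_end.
class TextAccumulator {
  private blocks = new Map<string, string>();

  handle(msg: { type: string; id: string; text?: string }): string | undefined {
    if (msg.type === 'text_start') {
      this.blocks.set(msg.id, ''); // a new text block begins
    } else if (msg.type === 'text_delta') {
      this.blocks.set(msg.id, (this.blocks.get(msg.id) ?? '') + (msg.text ?? ''));
    } else if (msg.type === 'text_end') {
      return this.blocks.get(msg.id); // completed block text
    }
    return undefined;
  }
}
```

The same pattern applies to reasoning_start/reasoning_delta/reasoning_end.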
Reasoning Streaming
Reasoning start:
{
  "type": "reasoning_start",
  "id": "reasoning-0"
}
Reasoning delta:
{
  "type": "reasoning_delta",
  "id": "reasoning-0",
  "text": "Let me think... "
}
Reasoning end:
{
  "type": "reasoning_end",
  "id": "reasoning-0"
}
Tool Calls
Tool call:
{
  "type": "tool_call",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "input": { "location": "San Francisco" }
}
Tool result:
{
  "type": "tool_result",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "result": { "temperature": 72, "conditions": "sunny" }
}
Tool error:
{
  "type": "tool_error",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "error": "API timeout"
}
Tool input start:
{
  "type": "tool_input_start",
  "id": "input-0",
  "toolName": "search"
}
Tool input delta:
{
  "type": "tool_input_delta",
  "id": "input-0",
  "delta": { "query": "weather" }
}
Tool input end:
{
  "type": "tool_input_end",
  "id": "input-0"
}
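Because tool_call and tool_result/tool_error share a toolCallId, a client can correlate a result with the call that produced it. A sketch with a hypothetical handleToolMessage helper:

```typescript
// toolCallId -> toolName for calls still awaiting a result.
const pendingCalls = new Map<string, string>();

// Hypothetical helper: returns a log line when a call completes or fails.
function handleToolMessage(msg: {
  type: string;
  toolCallId: string;
  toolName?: string;
  error?: string;
}): string | undefined {
  if (msg.type === 'tool_call') {
    pendingCalls.set(msg.toolCallId, msg.toolName ?? 'unknown');
    return undefined;
  }
  if (msg.type === 'tool_result' || msg.type === 'tool_error') {
    const name = pendingCalls.get(msg.toolCallId) ?? 'unknown';
    pendingCalls.delete(msg.toolCallId);
    return msg.type === 'tool_error'
      ? `${name} failed: ${msg.error}`
      : `${name} returned`;
  }
  return undefined;
}
```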
Sources and Files
Source:
{
  "type": "source",
  "source": { /* source object */ }
}
File:
{
  "type": "file",
  "file": { /* file object */ }
}
Response Complete
Complete response:
{
  "type": "response_complete",
  "text": "The weather in San Francisco is 72 degrees and sunny.",
  "reasoning": "",
  "toolCalls": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "input": { "location": "San Francisco" }
    }
  ],
  "toolResults": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "output": { "temperature": 72, "conditions": "sunny" }
    }
  ],
  "sources": [],
  "files": []
}
Usage in Agent Architecture
class VoiceAgent {
  async handleUserInput(text: string) {
    // Add user message to history
    this.conversationManager.addMessage({
      role: 'user',
      content: text
    });

    // Start LLM stream
    const result = streamText({
      model: this.model,
      messages: this.conversationManager.getHistory(),
      tools: this.tools
    });

    // Process the stream
    const streamResult = await processFullStream(result, {
      onTextDelta: (text) => {
        // Feed text to speech manager for streaming TTS
        this.speechManager.processTextDelta(text);
      },
      onTextEnd: () => {
        // Flush remaining text to speech
        this.speechManager.flushPendingText();
      },
      sendMessage: (msg) => {
        // Send to WebSocket client
        this.wsManager.send(msg);
      },
      emitEvent: (event, data) => {
        // Emit on agent
        this.emit(event, data);
      }
    });

    // Add assistant message to history
    this.conversationManager.addMessage({
      role: 'assistant',
      content: streamResult.fullText
    });

    // Wait for speech to complete
    await this.speechManager.queueDonePromise;
  }
}
Event Flow
Typical event sequence for a simple text response:
1. stream_start
2. step_start
3. text_start (id: "text-0")
4. text_delta (id: "text-0", text: "Hello")
5. text_delta (id: "text-0", text: ", how")
6. text_delta (id: "text-0", text: " can I")
7. text_delta (id: "text-0", text: " help you?")
8. text_end (id: "text-0")
9. step_finish
10. stream_finish
11. response_complete
With tool calls:
1. stream_start
2. step_start
3. tool_input_start (id: "input-0", toolName: "get_weather")
4. tool_input_delta (id: "input-0", delta: {"location": "SF"})
5. tool_input_end (id: "input-0")
6. tool_call (toolName: "get_weather", input: {"location": "SF"})
7. tool_result (toolName: "get_weather", result: {"temp": 72})
8. step_finish
9. step_start (new step for response after tool call)
10. text_start (id: "text-0")
11. text_delta (id: "text-0", text: "The weather...")
12. text_end (id: "text-0")
13. step_finish
14. stream_finish
15. response_complete
Integration with Speech Manager
The StreamProcessor enables streaming TTS by calling onTextDelta for each text chunk:
// In processFullStream
case "text-delta":
  fullText += part.text;
  onTextDelta?.(part.text); // Feed to SpeechManager
  sendMessage({
    type: "text_delta",
    id: part.id,
    text: part.text
  });
  break;

case "text-end":
  onTextEnd?.(); // Flush SpeechManager buffer
  sendMessage({ type: "text_end", id: part.id });
  break;
This allows speech generation to start before the full response is complete, significantly reducing perceived latency.
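The SpeechManager side of this contract is not shown here; one plausible implementation buffers deltas until a sentence boundary before speaking, so the TTS engine receives natural phrases rather than word fragments. A sketch under that assumption (SentenceBuffer is hypothetical):

```typescript
// Hypothetical sketch: buffer text deltas and emit complete sentences to TTS.
class SentenceBuffer {
  private pending = '';

  constructor(private speak: (sentence: string) => void) {}

  processTextDelta(text: string): void {
    this.pending += text;
    // Emit every complete sentence: text up to .!? followed by whitespace.
    let m: RegExpMatchArray | null;
    while ((m = this.pending.match(/^([\s\S]*?[.!?])\s+([\s\S]*)$/))) {
      this.speak(m[1]);
      this.pending = m[2];
    }
  }

  flushPendingText(): void {
    if (this.pending.trim()) this.speak(this.pending.trim());
    this.pending = '';
  }
}
```

Wiring this up, onTextDelta maps to processTextDelta and onTextEnd maps to flushPendingText.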
Error Handling
Stream Errors
When the AI SDK stream encounters an error:
case "error":
emitEvent("error", part.error);
sendMessage({
type: "stream_error",
error: String(part.error)
});
break;
Abort Handling
When the stream is aborted:
case "abort":
emitEvent("abort", { reason: part.reason });
sendMessage({
type: "stream_abort",
reason: part.reason
});
break;
Memory Usage
- Text accumulation: Stores complete text/reasoning in memory
- Tool data: Stores all tool calls and results
- Stream processing: Minimal overhead, processes events one at a time
Optimization Tips
- Use onTextDelta wisely - Avoid heavy processing in this callback
- Batch WebSocket sends - The processor sends one message per event
- Monitor memory - For very long responses, consider streaming to storage
- Handle backpressure - WebSocket may not keep up with fast streams
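For the batching tip above, one approach is a thin wrapper that queues messages and flushes them on a short timer, sending one frame per tick instead of one per event. A sketch, assuming the transport accepts an array batch (createBatchedSender is hypothetical):

```typescript
// Hypothetical batching wrapper around a send function that accepts arrays.
function createBatchedSender(
  send: (batch: Record<string, unknown>[]) => void,
  intervalMs = 16
): { sendMessage: (msg: Record<string, unknown>) => void; flush: () => void } {
  let queue: Record<string, unknown>[] = [];
  let timer: ReturnType<typeof setTimeout> | null = null;

  const flush = () => {
    if (timer !== null) {
      clearTimeout(timer);
      timer = null;
    }
    if (queue.length > 0) {
      send(queue); // one WebSocket frame carrying many events
      queue = [];
    }
  };

  return {
    sendMessage(msg: Record<string, unknown>) {
      queue.push(msg);
      if (timer === null) timer = setTimeout(flush, intervalMs);
    },
    flush,
  };
}
```

Pass the returned sendMessage as the StreamProcessor callback and call flush on stream_finish so the tail of the response is not delayed.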
Thread Safety
The StreamProcessor itself is stateless: each processFullStream call keeps its accumulated text and tool data in local variables, so concurrent streams do not interfere. The callbacks, however, must be invoked on the same event loop as the agent and its managers.