
Overview

The StreamProcessor module provides functions to process the fullStream from an AI SDK streamText call. It forwards all stream events to WebSocket clients in real-time and collects the complete response data. Unlike the other core managers, this is not a class but a standalone function module. It has no persistent state and operates as a pure stream processing pipeline. Key responsibilities:
  • Process AI SDK fullStream events
  • Forward text deltas, reasoning, tool calls to WebSocket clients
  • Collect complete response data (text, reasoning, tool results)
  • Emit events on the agent for local handling
  • Support streaming speech integration via callbacks
  • Handle stream lifecycle (start, finish, error, abort)
Location: src/core/StreamProcessor.ts:40

Functions

processFullStream()

Process the fullStream from an AI SDK streamText call, forwarding events and collecting results.
processFullStream(
  result: ReturnType<typeof streamText>,
  callbacks: StreamProcessorCallbacks,
  extraResponseFields?: Record<string, unknown>
): Promise<StreamResult>
result
ReturnType<typeof streamText>
required
The result object from AI SDK’s streamText() call
callbacks
StreamProcessorCallbacks
required
Callbacks for handling stream events:
  • onTextDelta(text: string) - Called for each text chunk (for streaming speech)
  • onTextEnd() - Called when text stream ends (flush speech buffer)
  • sendMessage(msg) - Function to send WebSocket messages
  • emitEvent(event, data) - Function to emit events on the agent
extraResponseFields
Record<string, unknown>
Optional additional fields to include in the response_complete message
Returns: Promise resolving to StreamResult containing:
  • fullText - Complete response text
  • fullReasoning - Complete reasoning text (if any)
  • allToolCalls - Array of all tool calls made
  • allToolResults - Array of all tool results
  • allSources - Array of all sources (if any)
  • allFiles - Array of all files (if any)
Example:
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { processFullStream } from './core/StreamProcessor';

const result = streamText({
  model: openai('gpt-4-turbo'),
  messages: conversationHistory,
  tools: { /* ... */ }
});

const streamResult = await processFullStream(result, {
  onTextDelta: (text) => {
    // Send to speech manager for streaming TTS
    speechManager.processTextDelta(text);
  },
  onTextEnd: () => {
    // Flush remaining text to speech
    speechManager.flushPendingText();
  },
  sendMessage: (msg) => {
    // Send to WebSocket client
    wsManager.send(msg);
  },
  emitEvent: (event, data) => {
    // Emit on agent
    agent.emit(event, data);
  }
});

console.log('Full response:', streamResult.fullText);

handleStreamChunk()

Handle individual stream chunks and emit them as events (for use in the onChunk callback).
handleStreamChunk(
  chunk: any,
  emitEvent: (event: string, data?: unknown) => void
): void
chunk
any
required
A stream chunk from the AI SDK fullStream
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events
Example:
const result = streamText({
  model: openai('gpt-4-turbo'),
  messages,
  onChunk: ({ chunk }) => {
    handleStreamChunk(chunk, (event, data) => {
      agent.emit(event, data);
    });
  }
});
Events emitted:
  • chunk:text_delta - Text chunk received
  • chunk:reasoning_delta - Reasoning chunk received
  • chunk:tool_call - Tool call chunk received
  • chunk:tool_result - Tool result chunk received
  • chunk:tool_input_start - Tool input started
  • chunk:tool_input_delta - Tool input chunk received
  • chunk:source - Source chunk received
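The chunk:* events can be consumed with ordinary EventEmitter listeners. A minimal sketch, assuming the agent extends Node's EventEmitter and that chunk:text_delta payloads carry a text field (the payload shape here is an assumption):

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical stand-in for the agent, which emits the chunk:* events above.
const agent = new EventEmitter();

// Accumulate streamed text locally; the { text } payload shape is assumed.
let transcript = '';
agent.on('chunk:text_delta', (data: { text: string }) => {
  transcript += data.text;
});

// Simulate two chunk:text_delta emissions.
agent.emit('chunk:text_delta', { text: 'Hello, ' });
agent.emit('chunk:text_delta', { text: 'world' });
```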

Types

StreamResult

interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{
    toolName: string;
    toolCallId: string;
    input: unknown;
  }>;
  allToolResults: Array<{
    toolName: string;
    toolCallId: string;
    output: unknown;
  }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}
fullText
string
The complete text response accumulated from all text-delta events
fullReasoning
string
The complete reasoning text accumulated from all reasoning-delta events (empty string if no reasoning)
allToolCalls
Array<ToolCall>
All tool calls made during the stream, in order
allToolResults
Array<ToolResult>
All tool results returned during the stream, in order
allSources
Array<unknown>
All sources emitted during the stream (for RAG/citation)
allFiles
Array<unknown>
All files emitted during the stream
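As an illustration of consuming these fields, here is a small hypothetical helper (summarize is not part of the module; the interface is restated for self-containment):

```typescript
// StreamResult as documented above, restated so this sketch stands alone.
interface StreamResult {
  fullText: string;
  fullReasoning: string;
  allToolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>;
  allToolResults: Array<{ toolName: string; toolCallId: string; output: unknown }>;
  allSources: Array<unknown>;
  allFiles: Array<unknown>;
}

// Hypothetical helper: one-line summary of a completed stream.
function summarize(r: StreamResult): string {
  const tools = r.allToolCalls.map((c) => c.toolName).join(', ');
  return `${r.fullText.length} chars, tools: [${tools}]`;
}
```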

StreamProcessorCallbacks

interface StreamProcessorCallbacks {
  onTextDelta?: (text: string) => void;
  onTextEnd?: () => void;
  sendMessage: (message: Record<string, unknown>) => void;
  emitEvent: (event: string, data?: unknown) => void;
}
onTextDelta
(text: string) => void
Optional callback for each text chunk. Use this to feed text to SpeechManager for streaming TTS.
onTextEnd
() => void
Optional callback when text stream ends. Use this to flush the SpeechManager buffer.
sendMessage
(message: Record<string, unknown>) => void
required
Function to send messages to the WebSocket client
emitEvent
(event: string, data?: unknown) => void
required
Function to emit events on the agent EventEmitter

WebSocket Message Protocol

The StreamProcessor sends the following messages to clients:

Stream Lifecycle

Stream start:
{ "type": "stream_start" }
Stream finish:
{
  "type": "stream_finish",
  "finishReason": "stop",
  "usage": {
    "promptTokens": 150,
    "completionTokens": 75,
    "totalTokens": 225
  }
}
Stream error:
{
  "type": "stream_error",
  "error": "Model timeout"
}
Stream abort:
{
  "type": "stream_abort",
  "reason": "User interrupted"
}
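A client receiving these lifecycle messages might dispatch on type like this (describeLifecycle is a hypothetical helper; the message shapes follow the protocol above):

```typescript
// Discriminated union mirroring the four lifecycle messages documented above.
type LifecycleMessage =
  | { type: 'stream_start' }
  | { type: 'stream_finish'; finishReason: string; usage: { totalTokens: number } }
  | { type: 'stream_error'; error: string }
  | { type: 'stream_abort'; reason: string };

// Hypothetical client-side handler that renders a status line per message.
function describeLifecycle(msg: LifecycleMessage): string {
  switch (msg.type) {
    case 'stream_start':
      return 'stream started';
    case 'stream_finish':
      return `finished (${msg.finishReason}, ${msg.usage.totalTokens} tokens)`;
    case 'stream_error':
      return `error: ${msg.error}`;
    case 'stream_abort':
      return `aborted: ${msg.reason}`;
  }
}
```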

Step Lifecycle

Step start:
{
  "type": "step_start",
  "warnings": []
}
Step finish:
{
  "type": "step_finish",
  "finishReason": "stop",
  "usage": { /* ... */ }
}

Text Streaming

Text start:
{
  "type": "text_start",
  "id": "text-0"
}
Text delta:
{
  "type": "text_delta",
  "id": "text-0",
  "text": "Hello, "
}
Text end:
{
  "type": "text_end",
  "id": "text-0"
}
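On the client, text deltas can be accumulated per id, since each text segment is bracketed by text_start and text_end. A minimal sketch (onTextMessage is a hypothetical handler; message shapes follow the protocol above):

```typescript
// Per-id buffers for in-flight text segments.
const buffers = new Map<string, string>();

// Hypothetical client handler for text_start/text_delta messages.
function onTextMessage(msg: { type: string; id: string; text?: string }): void {
  if (msg.type === 'text_start') buffers.set(msg.id, '');
  if (msg.type === 'text_delta') {
    buffers.set(msg.id, (buffers.get(msg.id) ?? '') + (msg.text ?? ''));
  }
}

// Simulate the message sequence documented above.
onTextMessage({ type: 'text_start', id: 'text-0' });
onTextMessage({ type: 'text_delta', id: 'text-0', text: 'Hello, ' });
onTextMessage({ type: 'text_delta', id: 'text-0', text: 'world' });
```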

Reasoning Streaming

Reasoning start:
{
  "type": "reasoning_start",
  "id": "reasoning-0"
}
Reasoning delta:
{
  "type": "reasoning_delta",
  "id": "reasoning-0",
  "text": "Let me think... "
}
Reasoning end:
{
  "type": "reasoning_end",
  "id": "reasoning-0"
}

Tool Execution

Tool call:
{
  "type": "tool_call",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "input": { "location": "San Francisco" }
}
Tool result:
{
  "type": "tool_result",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "result": { "temperature": 72, "conditions": "sunny" }
}
Tool error:
{
  "type": "tool_error",
  "toolName": "get_weather",
  "toolCallId": "call_abc123",
  "error": "API timeout"
}
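Because tool_call and tool_result share a toolCallId, a client can correlate them. A sketch (onToolMessage is a hypothetical handler; note the tool_result message carries its payload under result, as shown above):

```typescript
// Map of in-flight calls: toolCallId -> toolName.
const pendingCalls = new Map<string, string>();
const completed: Array<{ toolName: string; output: unknown }> = [];

// Hypothetical client handler correlating results with earlier calls.
function onToolMessage(msg: Record<string, unknown>): void {
  if (msg.type === 'tool_call') {
    pendingCalls.set(msg.toolCallId as string, msg.toolName as string);
  } else if (msg.type === 'tool_result') {
    const toolName = pendingCalls.get(msg.toolCallId as string) ?? 'unknown';
    pendingCalls.delete(msg.toolCallId as string);
    completed.push({ toolName, output: msg.result });
  }
}

// Simulate the call/result pair documented above.
onToolMessage({ type: 'tool_call', toolName: 'get_weather', toolCallId: 'call_abc123', input: { location: 'San Francisco' } });
onToolMessage({ type: 'tool_result', toolName: 'get_weather', toolCallId: 'call_abc123', result: { temperature: 72, conditions: 'sunny' } });
```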

Tool Input Streaming

Tool input start:
{
  "type": "tool_input_start",
  "id": "input-0",
  "toolName": "search"
}
Tool input delta:
{
  "type": "tool_input_delta",
  "id": "input-0",
  "delta": { "query": "weather" }
}
Tool input end:
{
  "type": "tool_input_end",
  "id": "input-0"
}
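Tool input deltas can likewise be collected per id. Since this protocol shows delta as an object, the sketch keeps deltas in order rather than concatenating strings (onToolInputMessage is a hypothetical name):

```typescript
// Ordered delta lists for in-flight tool inputs, keyed by input id.
const inputDeltas = new Map<string, unknown[]>();

// Hypothetical client handler for tool input streaming messages.
function onToolInputMessage(msg: { type: string; id: string; delta?: unknown }): void {
  if (msg.type === 'tool_input_start') inputDeltas.set(msg.id, []);
  if (msg.type === 'tool_input_delta') inputDeltas.get(msg.id)?.push(msg.delta);
}

// Simulate the sequence documented above.
onToolInputMessage({ type: 'tool_input_start', id: 'input-0' });
onToolInputMessage({ type: 'tool_input_delta', id: 'input-0', delta: { query: 'weather' } });
onToolInputMessage({ type: 'tool_input_end', id: 'input-0' });
```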

Sources and Files

Source:
{
  "type": "source",
  "source": { /* source object */ }
}
File:
{
  "type": "file",
  "file": { /* file object */ }
}

Response Complete

Complete response:
{
  "type": "response_complete",
  "text": "The weather in San Francisco is 72 degrees and sunny.",
  "reasoning": "",
  "toolCalls": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "input": { "location": "San Francisco" }
    }
  ],
  "toolResults": [
    {
      "toolName": "get_weather",
      "toolCallId": "call_abc123",
      "output": { "temperature": 72, "conditions": "sunny" }
    }
  ],
  "sources": [],
  "files": []
}

Usage in Agent Architecture

class VoiceAgent {
  async handleUserInput(text: string) {
    // Add user message to history
    this.conversationManager.addMessage({
      role: 'user',
      content: text
    });
    
    // Start LLM stream
    const result = streamText({
      model: this.model,
      messages: this.conversationManager.getHistory(),
      tools: this.tools
    });
    
    // Process the stream
    const streamResult = await processFullStream(result, {
      onTextDelta: (text) => {
        // Feed text to speech manager for streaming TTS
        this.speechManager.processTextDelta(text);
      },
      onTextEnd: () => {
        // Flush remaining text to speech
        this.speechManager.flushPendingText();
      },
      sendMessage: (msg) => {
        // Send to WebSocket client
        this.wsManager.send(msg);
      },
      emitEvent: (event, data) => {
        // Emit on agent
        this.emit(event, data);
      }
    });
    
    // Add assistant message to history
    this.conversationManager.addMessage({
      role: 'assistant',
      content: streamResult.fullText
    });
    
    // Wait for speech to complete
    await this.speechManager.queueDonePromise;
  }
}

Event Flow

Typical event sequence for a simple text response:
1. stream_start
2. step_start
3. text_start (id: "text-0")
4. text_delta (id: "text-0", text: "Hello")
5. text_delta (id: "text-0", text: ", how")
6. text_delta (id: "text-0", text: " can I")
7. text_delta (id: "text-0", text: " help you?")
8. text_end (id: "text-0")
9. step_finish
10. stream_finish
11. response_complete
With tool calls:
1. stream_start
2. step_start
3. tool_input_start (id: "input-0", toolName: "get_weather")
4. tool_input_delta (id: "input-0", delta: {"location": "SF"})
5. tool_input_end (id: "input-0")
6. tool_call (toolName: "get_weather", input: {"location": "SF"})
7. tool_result (toolName: "get_weather", result: {"temp": 72})
8. step_finish
9. step_start (new step for response after tool call)
10. text_start (id: "text-0")
11. text_delta (id: "text-0", text: "The weather...")
12. text_end (id: "text-0")
13. step_finish
14. stream_finish
15. response_complete
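The sequences above can be sanity-checked with a small tracker. A sketch (StreamTracker is a hypothetical helper, not part of the module):

```typescript
// Counts steps and flags completion while replaying a message sequence.
class StreamTracker {
  steps = 0;
  complete = false;

  handle(msg: { type: string }): void {
    if (msg.type === 'step_start') this.steps += 1;
    if (msg.type === 'response_complete') this.complete = true;
  }
}

// Replay the tool-call sequence from the list above: two steps, then completion.
const tracker = new StreamTracker();
for (const type of [
  'stream_start', 'step_start', 'tool_input_start', 'tool_input_delta',
  'tool_input_end', 'tool_call', 'tool_result', 'step_finish',
  'step_start', 'text_start', 'text_delta', 'text_end', 'step_finish',
  'stream_finish', 'response_complete',
]) {
  tracker.handle({ type });
}
```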

Integration with Speech Manager

The StreamProcessor enables streaming TTS by calling onTextDelta for each text chunk:
// In processFullStream
case "text-delta":
  fullText += part.text;
  onTextDelta?.(part.text); // Feed to SpeechManager
  sendMessage({
    type: "text_delta",
    id: part.id,
    text: part.text
  });
  break;

case "text-end":
  onTextEnd?.(); // Flush SpeechManager buffer
  sendMessage({ type: "text_end", id: part.id });
  break;
This allows speech generation to start before the full response is complete, significantly reducing perceived latency.

Error Handling

Stream Errors

When the AI SDK stream encounters an error:
case "error":
  emitEvent("error", part.error);
  sendMessage({
    type: "stream_error",
    error: String(part.error)
  });
  break;

Abort Handling

When the stream is aborted:
case "abort":
  emitEvent("abort", { reason: part.reason });
  sendMessage({
    type: "stream_abort",
    reason: part.reason
  });
  break;

Performance Considerations

Memory Usage

  • Text accumulation: Stores complete text/reasoning in memory
  • Tool data: Stores all tool calls and results
  • Stream processing: Minimal overhead, processes events one at a time

Optimization Tips

  1. Use onTextDelta wisely - Avoid heavy processing in this callback
  2. Batch WebSocket sends - The processor sends one message per event, so coalesce messages on the transport side if per-message overhead matters
  3. Monitor memory - For very long responses, consider streaming to storage
  4. Handle backpressure - WebSocket may not keep up with fast streams
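Tip 2 can be sketched as a thin wrapper that coalesces outgoing messages before they hit the socket. This is a hypothetical helper (createBatchingSender and the batch envelope are assumptions, not part of the module or protocol):

```typescript
// Queues messages and flushes them as one JSON payload after flushMs,
// or immediately when flush() is called.
function createBatchingSender(
  rawSend: (payload: string) => void,
  flushMs = 16,
): { send: (msg: Record<string, unknown>) => void; flush: () => void } {
  let queue: Array<Record<string, unknown>> = [];
  let timer: ReturnType<typeof setTimeout> | undefined;

  const flush = () => {
    if (timer) clearTimeout(timer);
    timer = undefined;
    if (queue.length > 0) {
      // Hypothetical envelope: clients would need to unwrap { type: 'batch' }.
      rawSend(JSON.stringify({ type: 'batch', messages: queue }));
      queue = [];
    }
  };

  return {
    send: (msg) => {
      queue.push(msg);
      if (!timer) timer = setTimeout(flush, flushMs);
    },
    flush,
  };
}
```

Pass the wrapper's send as the sendMessage callback and call flush when the stream finishes; the trade-off is a small added latency per batch window.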

Thread Safety

The StreamProcessor is stateless, so concurrent calls to processFullStream do not interfere with each other. The callbacks, however, run on the same event loop as the agent and its managers, so keep them non-blocking.
