Streaming responses provide real-time feedback as the AI generates text, creating a more interactive experience. ElizaOS includes a sophisticated streaming system with multiple extractors for different use cases.

Streaming Architecture

ElizaOS streaming uses extractors to filter and process chunks as they arrive:
interface IStreamExtractor {
  done: boolean;                    // Whether streaming is complete
  push(chunk: string): string;      // Process chunk, return streamable content
  flush?(): string;                 // Optional final flush
  reset?(): void;                   // Reset state for retry
}
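A custom extractor only needs to satisfy this shape. As an illustrative sketch (not part of @elizaos/core), here is a hypothetical extractor that buffers the first line of output and streams only what follows it:

```typescript
// Hypothetical extractor matching the IStreamExtractor shape above.
// It drops the first line of output (e.g. a model preamble) and streams the rest.
class PrefixSkippingExtractor {
  done = false;
  private skipped = false;
  private buffer = '';

  push(chunk: string): string {
    if (this.skipped) return chunk;        // past the first line: stream as-is
    this.buffer += chunk;
    const newline = this.buffer.indexOf('\n');
    if (newline === -1) return '';          // still buffering the first line
    this.skipped = true;
    const rest = this.buffer.slice(newline + 1);
    this.buffer = '';
    return rest;
  }

  flush(): string {
    // If no newline ever arrived, emit whatever was buffered.
    const out = this.skipped ? '' : this.buffer;
    this.done = true;
    this.buffer = '';
    return out;
  }

  reset(): void {
    this.done = false;
    this.skipped = false;
    this.buffer = '';
  }
}
```

The same pattern applies to any custom filtering: buffer until you have enough context to decide, then release content.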

Built-in Extractors

ElizaOS provides several extractors for different scenarios:

PassthroughExtractor

Streams all content as-is:
import { PassthroughExtractor } from '@elizaos/core';

const extractor = new PassthroughExtractor();

// Everything passes through
extractor.push('Hello'); // Returns: 'Hello'
extractor.push(' world'); // Returns: ' world'

XmlTagExtractor

Extracts content from a specific XML tag:
import { XmlTagExtractor } from '@elizaos/core';

const extractor = new XmlTagExtractor('text');

extractor.push('<response><text>Hello'); // Returns: 'Hel' (holds back a safe margin in case the closing tag is split across chunks)
extractor.push(' world!</text></response>'); // Returns: 'lo world!'

console.log(extractor.done); // true

ResponseStreamExtractor

Action-aware extraction used by DefaultMessageService (reference: packages/typescript/src/utils/streaming.ts):
import { ResponseStreamExtractor } from '@elizaos/core';

// Understands <actions> and <text> tags
const extractor = new ResponseStreamExtractor();

// If <actions> contains REPLY, streams <text>
extractor.push('<actions>REPLY</actions>');
extractor.push('<text>Hello user!</text>'); // Streams: 'Hello user!'

// If <actions> contains other actions, skips <text>
extractor.push('<actions>SEND_EMAIL</actions>');
extractor.push('<text>Sending...</text>'); // Streams nothing (action will respond)

ActionStreamFilter

Auto-detects content type for action handlers:
import { ActionStreamFilter } from '@elizaos/core';

const filter = new ActionStreamFilter();

// JSON - doesn't stream
filter.push('{"result": true}'); // Returns: ''

// XML - extracts <text> tag
filter.push('<response><text>Hello'); // Returns: 'Hel'
filter.push('!</text></response>'); // Returns: 'lo!'

// Plain text - streams immediately  
filter.push('Plain text response'); // Returns: 'Plain text response'

Creating a Streaming Context

1. Choose an Extractor

Select the appropriate extractor for your use case:
import { XmlTagExtractor } from '@elizaos/core';

// For simple XML responses
const extractor = new XmlTagExtractor('text');

2. Create Streaming Context

import { createStreamingContext } from '@elizaos/core';

const onChunk = async (chunk: string, messageId?: string) => {
  // Send chunk to user (WebSocket, SSE, etc.)
  console.log('Chunk:', chunk);
  ws.send(JSON.stringify({ type: 'chunk', data: chunk }));
};

const context = createStreamingContext(
  extractor,
  onChunk,
  messageId  // optional: id of the message being generated
);

3. Use with Model

const response = await runtime.useModel(
  ModelType.TEXT_LARGE,
  {
    prompt: 'Tell me about AI',
    streamingContext: context  // Enable streaming
  }
);

Streaming in Actions

Actions can stream responses to users:
import { ActionStreamFilter, createStreamingContext, ModelType, type Action } from '@elizaos/core';

const streamingAction: Action = {
  name: 'STREAM_RESPONSE',
  description: 'Generate a streaming response',
  
  handler: async (runtime, message, state, options, callback) => {
    const filter = new ActionStreamFilter();
    
    // Create streaming context
    const streamingContext = createStreamingContext(
      filter,
      async (chunk: string) => {
        // Stream to user via callback
        await callback({ 
          text: chunk, 
          streaming: true 
        });
      }
    );
    
    // Generate with streaming
    const response = await runtime.useModel(
      ModelType.TEXT_LARGE,
      {
        prompt: `Generate a detailed response about: ${message.content.text}`,
        streamingContext
      }
    );
    
    // Send final response
    await callback({
      text: response,
      actions: ['STREAM_RESPONSE'],
      streaming: false  // Mark as complete
    });
    
    return { success: true };
  }
};

WebSocket Streaming Example

Complete example with WebSocket (assumes a runtime instance is in scope):
import { WebSocket, WebSocketServer } from 'ws';
import { ModelType, XmlTagExtractor, createStreamingContext } from '@elizaos/core';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws: WebSocket) => {
  ws.on('message', async (data: string) => {
    const message = JSON.parse(data);
    
    // Create extractor for this request
    const extractor = new XmlTagExtractor('text');
    
    // Stream chunks to client
    const context = createStreamingContext(
      extractor,
      async (chunk: string) => {
        ws.send(JSON.stringify({
          type: 'chunk',
          content: chunk
        }));
      }
    );
    
    // Generate with streaming
    const response = await runtime.useModel(
      ModelType.TEXT_LARGE,
      {
        prompt: message.prompt,
        streamingContext: context
      }
    );
    
    // Send completion
    ws.send(JSON.stringify({
      type: 'complete',
      fullResponse: response
    }));
  });
});

Server-Sent Events (SSE)

Stream using SSE for HTTP-based streaming (assumes a runtime instance is in scope):
import express from 'express';
import { ModelType, PassthroughExtractor, createStreamingContext } from '@elizaos/core';

const app = express();

app.get('/stream', async (req, res) => {
  // Set up SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  const extractor = new PassthroughExtractor();
  
  const context = createStreamingContext(
    extractor,
    async (chunk: string) => {
      // Send SSE event
      res.write(`data: ${JSON.stringify({ chunk })}\n\n`);
    }
  );
  
  try {
    await runtime.useModel(
      ModelType.TEXT_LARGE,
      {
        prompt: req.query.prompt as string,
        streamingContext: context
      }
    );
    
    // Send completion event
    res.write(`data: ${JSON.stringify({ done: true })}\n\n`);
    res.end();
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
    res.end();
  }
});

app.listen(3000);
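On the client, a browser can consume this endpoint with the standard EventSource API. The payload handling can be factored into a small, testable function; this is a sketch assuming the { chunk } / { done } / { error } payloads the server above emits:

```typescript
// Parse one SSE "data:" payload from the /stream endpoint and classify it.
// Payload shapes ({ chunk }, { done }, { error }) match what the server writes;
// the StreamEvent type and function names here are illustrative, not from @elizaos/core.
type StreamEvent =
  | { kind: 'chunk'; text: string }
  | { kind: 'done' }
  | { kind: 'error'; message: string };

function parseStreamEvent(raw: string): StreamEvent {
  const data = JSON.parse(raw);
  if (typeof data.chunk === 'string') return { kind: 'chunk', text: data.chunk };
  if (data.done) return { kind: 'done' };
  return { kind: 'error', message: data.error ?? 'unknown stream error' };
}

// Browser usage (EventSource is a standard browser API):
// const es = new EventSource('/stream?prompt=Tell+me+about+AI');
// es.onmessage = (e) => {
//   const ev = parseStreamEvent(e.data);
//   if (ev.kind === 'chunk') appendToUi(ev.text);
//   else es.close();  // 'done' or 'error' both end the stream
// };
```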

Validation-Aware Streaming

For advanced use cases, use ValidationStreamExtractor with retry support:
import { ValidationStreamExtractor } from '@elizaos/core';

const extractor = new ValidationStreamExtractor({
  level: 1,  // Progressive validation
  schema: [
    { field: 'summary', type: 'string' },
    { field: 'details', type: 'string' }
  ],
  streamFields: ['summary', 'details'],
  expectedCodes: new Map(),
  onChunk: (chunk, field) => {
    console.log(`Field ${field}: ${chunk}`);
  },
  onEvent: (event) => {
    if (event.eventType === 'field_validated') {
      console.log(`✓ ${event.field} validated`);
    }
  }
});

Error Handling

import { StreamError } from '@elizaos/core';

try {
  const context = createStreamingContext(
    extractor,
    async (chunk) => {
      await sendChunk(chunk);
    }
  );
  
  await runtime.useModel(ModelType.TEXT_LARGE, {
    prompt,
    streamingContext: context
  });
} catch (error) {
  if (StreamError.isStreamError(error)) {
    console.error(`Stream error: ${error.code} - ${error.message}`);
    console.error('Details:', error.details);
  } else {
    console.error('Unexpected error:', error);
  }
}

Streaming Retry State

Handle retries with accumulated text:
import { createStreamingRetryState } from '@elizaos/core';

const extractor = new XmlTagExtractor('text');
const retryState = createStreamingRetryState(extractor);

// First attempt
extractor.push('<text>Hello');

if (!retryState.isComplete()) {
  // Get what we have so far
  const partial = retryState.getStreamedText();
  console.log('Partial:', partial);
  
  // Retry
  retryState.reset();
  // Try again...
}

Best Practices

  • Use XmlTagExtractor for structured XML responses
  • Use ActionStreamFilter in action handlers for automatic content detection
  • Use PassthroughExtractor for plain text responses
  • Buffer chunks on the client side for smooth rendering
  • Implement connection retry logic for dropped connections
  • Show loading indicators until first chunk arrives
  • Don’t assume chunks arrive at word boundaries - they may split mid-word
  • Don’t forget to handle the done state - streaming may not be complete
  • Don’t skip error handling - network issues can interrupt streams
  • Don’t send huge chunks - break them up for smoother UX
  • Don’t use streaming for short responses - overhead isn’t worth it
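Two of the points above (buffer chunks client-side; don't assume word boundaries) combine into a simple pattern: accumulate raw chunks and release text only at whitespace boundaries so a mid-word split never renders. A minimal sketch (the WordBoundaryBuffer name is hypothetical, not an ElizaOS export):

```typescript
// Client-side chunk buffer: release text only up to the last space,
// holding back any partial word until more chunks (or flush) arrive.
class WordBoundaryBuffer {
  private pending = '';

  push(chunk: string): string {
    this.pending += chunk;
    const lastSpace = this.pending.lastIndexOf(' ');
    if (lastSpace === -1) return '';                  // no complete word yet
    const out = this.pending.slice(0, lastSpace + 1); // everything through the space
    this.pending = this.pending.slice(lastSpace + 1); // keep the partial word
    return out;
  }

  flush(): string {
    // Call when the stream completes to release the final word.
    const out = this.pending;
    this.pending = '';
    return out;
  }
}
```

Wrap your onmessage handler's render call with push(), and call flush() on the completion event.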

Performance Considerations

Chunk Size

The streaming system keeps a safe margin to avoid splitting tags:
// From streaming.ts
const SAFE_MARGIN = 10;
const MAX_BUFFER = 100 * 1024;  // 100KB max buffer
const MAX_CHUNK_SIZE = 1024 * 1024;  // 1MB max chunk

Buffer Management

// Buffers are automatically trimmed to prevent memory exhaustion
function trimBuffer(
  buffer: string,
  maxSize: number = MAX_BUFFER,
  keepSize: number = 1024
): string {
  if (buffer.length > maxSize) {
    return buffer.slice(-keepSize);
  }
  return buffer;
}

Client-Side Implementation

React Hook Example

import { useState, useEffect } from 'react';

function useStreamingResponse(prompt: string) {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  
  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8080');
    
    ws.onopen = () => {
      setIsStreaming(true);
      ws.send(JSON.stringify({ prompt }));
    };
    
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      
      if (data.type === 'chunk') {
        setResponse(prev => prev + data.content);
      } else if (data.type === 'complete') {
        setIsStreaming(false);
      }
    };
    
    return () => ws.close();
  }, [prompt]);
  
  return { response, isStreaming };
}

Next Steps

  • Custom Actions - build actions with streaming support
  • Deployment - deploy streaming applications
