
Why Streaming?

Streaming enables:
  • Real-time user feedback
  • Progressive rendering
  • Lower perceived latency
  • Interruptible generation
  • Tool call visibility

DeepAgents provides multiple streaming interfaces for different use cases.

Basic Streaming

Text Stream

Stream text chunks as they’re generated:
import { agent, execute } from '@deepagents/agent';
import { openai } from '@ai-sdk/openai';

const assistant = agent({
  name: 'assistant',
  model: openai('gpt-4o'),
  prompt: 'You are helpful.',
});

const stream = execute(assistant, 'Tell me about AI agents', {});

for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

Full Stream

Access all stream events (text, tool calls, etc.):
const stream = execute(assistant, message, context);

for await (const chunk of stream.fullStream) {
  switch (chunk.type) {
    case 'text-delta':
      process.stdout.write(chunk.textDelta);
      break;
    case 'tool-call':
      console.log('\nCalling tool:', chunk.toolName);
      break;
    case 'tool-result':
      console.log('Tool result:', chunk.result);
      break;
    case 'finish':
      console.log('\nFinished:', chunk.finishReason);
      break;
  }
}

Stream Results

The execute() function returns a stream result object:
interface StreamResult {
  textStream: AsyncIterable<string>;        // Text chunks only
  fullStream: AsyncIterable<StreamChunk>;   // All events
  text: Promise<string>;                     // Complete text
  usage: Promise<TokenUsage>;                // Token counts
  finishReason: Promise<FinishReason>;       // Why it stopped
  rawResponse: Promise<RawResponse>;         // Raw model response
}

Getting Complete Text

Wait for the full response:
const stream = execute(assistant, message, context);
const completeText = await stream.text;
console.log(completeText);

Getting Usage Info

const stream = execute(assistant, message, context);
const usage = await stream.usage;

console.log('Prompt tokens:', usage.promptTokens);
console.log('Completion tokens:', usage.completionTokens);
console.log('Total tokens:', usage.totalTokens);

Finish Reason

const stream = execute(assistant, message, context);
const reason = await stream.finishReason;

switch (reason) {
  case 'stop':
    console.log('Completed naturally');
    break;
  case 'length':
    console.log('Hit token limit');
    break;
  case 'content-filter':
    console.log('Blocked by content filter');
    break;
  case 'tool-calls':
    console.log('Finished with tool calls');
    break;
}

Stream Types

Different chunk types in the full stream:
type StreamChunk =
  | { type: 'text-delta'; textDelta: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; args: any }
  | { type: 'tool-result'; toolCallId: string; result: any }
  | { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string }
  | { type: 'finish'; finishReason: FinishReason; usage: TokenUsage }
  | { type: 'error'; error: Error };
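Because the chunk type is a discriminated union, a switch on chunk.type narrows each case to the right fields. The sketch below folds a full stream into final text plus the tool names used; the supporting types are restated from above so it is self-contained, and the mock generator stands in for stream.fullStream:

```typescript
// Restated from the StreamChunk union above, so this sketch compiles on its own.
type FinishReason = 'stop' | 'length' | 'content-filter' | 'tool-calls';
interface TokenUsage { promptTokens: number; completionTokens: number; totalTokens: number }

type StreamChunk =
  | { type: 'text-delta'; textDelta: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; args: any }
  | { type: 'tool-result'; toolCallId: string; result: any }
  | { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string }
  | { type: 'finish'; finishReason: FinishReason; usage: TokenUsage }
  | { type: 'error'; error: Error };

// Fold a full stream into final text plus the tool names that were called.
async function collect(chunks: AsyncIterable<StreamChunk>) {
  let text = '';
  const toolNames: string[] = [];
  for await (const chunk of chunks) {
    switch (chunk.type) {
      case 'text-delta':
        text += chunk.textDelta; // narrowed: textDelta exists in this branch
        break;
      case 'tool-call':
        toolNames.push(chunk.toolName);
        break;
      case 'error':
        throw chunk.error;
    }
  }
  return { text, toolNames };
}

// Mock stream standing in for stream.fullStream:
async function* mockStream(): AsyncGenerator<StreamChunk> {
  yield { type: 'tool-call', toolCallId: '1', toolName: 'search', args: {} };
  yield { type: 'text-delta', textDelta: 'Hello, ' };
  yield { type: 'text-delta', textDelta: 'world' };
  yield { type: 'finish', finishReason: 'stop', usage: { promptTokens: 5, completionTokens: 2, totalTokens: 7 } };
}

collect(mockStream()).then(r => console.log(r.text, r.toolNames));
```

In real code, pass stream.fullStream from execute() in place of the mock generator.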

Swarm Streaming

The swarm() function provides high-level streaming for multi-agent systems:
import { swarm } from '@deepagents/agent';

const stream = swarm(
  coordinator,
  'Complete this task',
  context,
  abortSignal
);

for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

Swarm Stream Result

interface SwarmResult {
  textStream: AsyncIterable<string>;
  text: Promise<string>;
  messages: Promise<UIMessage[]>;
  agent: Promise<Agent>;  // Final agent that completed
}
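The agent promise resolves to whichever agent ultimately completed the task, which is useful for attribution in multi-agent runs. A minimal sketch, with the SwarmResultLike shape mirroring the fields used here and a mock standing in for the value returned by swarm():

```typescript
// Illustrative shapes; the mock below stands in for
// swarm(coordinator, message, context).
interface AgentLike { name: string }

interface SwarmResultLike {
  text: Promise<string>;
  agent: Promise<AgentLike>;
}

// Prefix the final text with the name of the agent that produced it.
async function summarizeRun(result: SwarmResultLike): Promise<string> {
  // Both promises resolve once the swarm has finished.
  const [text, agent] = await Promise.all([result.text, result.agent]);
  return `[${agent.name}] ${text}`;
}

// Mock result for demonstration:
const mockResult: SwarmResultLike = {
  text: Promise.resolve('Task complete.'),
  agent: Promise.resolve({ name: 'researcher' }),
};

summarizeRun(mockResult).then(summary => console.log(summary));
```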

Getting Messages

const stream = swarm(coordinator, message, context);
const messages = await stream.messages;

for (const msg of messages) {
  console.log(msg.role, ':', msg.parts);
}

Abort Signals

Cancel streaming execution:
const controller = new AbortController();

const stream = swarm(
  agent,
  message,
  context,
  controller.signal
);

// Cancel after 10 seconds
setTimeout(() => controller.abort(), 10000);

try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('\nCancelled by user');
  }
}

Progress Tracking

Track tool calls and agent transitions:
const stream = execute(agent, message, context);

let toolCallCount = 0;
const tools: string[] = [];

for await (const chunk of stream.fullStream) {
  if (chunk.type === 'text-delta') {
    process.stdout.write(chunk.textDelta);
  } else if (chunk.type === 'tool-call') {
    toolCallCount++;
    tools.push(chunk.toolName);
    console.log(`\n[Tool ${toolCallCount}] ${chunk.toolName}`);
  } else if (chunk.type === 'tool-result') {
    console.log(`[Result] ${JSON.stringify(chunk.result).slice(0, 50)}...`);
  }
}

console.log(`\n\nUsed ${toolCallCount} tools: ${tools.join(', ')}`);

Buffering Strategies

Character Buffering

Buffer chunks for smoother output:
const stream = execute(agent, message, context);
let buffer = '';
const BUFFER_SIZE = 10;

for await (const chunk of stream.textStream) {
  buffer += chunk;
  if (buffer.length >= BUFFER_SIZE) {
    process.stdout.write(buffer);
    buffer = '';
  }
}

if (buffer) {
  process.stdout.write(buffer);
}

Line Buffering

Wait for complete lines:
const stream = execute(agent, message, context);
let buffer = '';

for await (const chunk of stream.textStream) {
  buffer += chunk;
  const lines = buffer.split('\n');
  buffer = lines.pop() || '';
  
  for (const line of lines) {
    console.log(line);
  }
}

if (buffer) {
  console.log(buffer);
}

Time Buffering

Flush periodically:
const stream = execute(agent, message, context);
let buffer = '';
let lastFlush = Date.now();
const FLUSH_INTERVAL = 100; // ms

for await (const chunk of stream.textStream) {
  buffer += chunk;
  const now = Date.now();
  
  if (now - lastFlush >= FLUSH_INTERVAL) {
    process.stdout.write(buffer);
    buffer = '';
    lastFlush = now;
  }
}

if (buffer) {
  process.stdout.write(buffer);
}

UI Integration

React Integration

import { useState } from 'react';

function ChatComponent() {
  const [text, setText] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  
  const handleSend = async (message: string) => {
    setIsStreaming(true);
    setText('');
    
    const stream = execute(agent, message, context);
    
    for await (const chunk of stream.textStream) {
      setText(prev => prev + chunk);
    }
    
    setIsStreaming(false);
  };
  
  return (
    <div>
      <div>{text}</div>
      {isStreaming && <span>...</span>}
      <button onClick={() => handleSend('Hello')}>Send</button>
    </div>
  );
}

Server-Sent Events (SSE)

// Assumes an Express app is already set up.

app.get('/stream', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  const stream = execute(agent, req.query.message, context);
  
  for await (const chunk of stream.textStream) {
    res.write(`data: ${JSON.stringify({ text: chunk })}\n\n`);
  }
  
  res.write('data: [DONE]\n\n');
  res.end();
});

WebSocket Streaming

import WebSocket from 'ws';

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', async (message) => {
    const stream = execute(agent, message.toString(), context);
    
    for await (const chunk of stream.fullStream) {
      ws.send(JSON.stringify(chunk));
    }
    
    ws.send(JSON.stringify({ type: 'done' }));
  });
});

Error Handling

Stream Errors

const stream = execute(agent, message, context);

try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  console.error('Stream error:', error);
  // Handle gracefully
}

Partial Results

Access partial text on error:
const stream = execute(agent, message, context);
let partialText = '';

try {
  for await (const chunk of stream.textStream) {
    partialText += chunk;
    process.stdout.write(chunk);
  }
} catch (error) {
  console.log('\n\nPartial result:', partialText);
}

Performance Tips

Chunk Size

Larger chunks mean fewer events and less per-event overhead, at the cost of choppier output.

Buffer Wisely

Balance smooth rendering against the extra latency buffering adds.

Async Iterators

Use for await...of so consumption naturally applies backpressure to the stream.

Memory

For long streams, avoid accumulating every chunk; keep only the text you still need.
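The memory tip above can be sketched as a bounded accumulator that keeps only the tail of the text (for example, for a scrollback preview). The helper name and the MAX_CHARS value are illustrative:

```typescript
// Illustrative cap; real UIs would use something far larger.
const MAX_CHARS = 10;

// Append a chunk to a buffer, discarding the oldest characters once the
// buffer would exceed `max`.
function appendBounded(buffer: string, chunk: string, max = MAX_CHARS): string {
  const joined = buffer + chunk;
  return joined.length > max ? joined.slice(joined.length - max) : joined;
}

let tail = '';
for (const chunk of ['abcde', 'fgh', 'ijkl']) {
  tail = appendBounded(tail, chunk);
}
console.log(tail); // only the last MAX_CHARS characters survive
```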

Best Practices

Error Boundaries

Always wrap stream consumption in try-catch; a stream can fail mid-iteration.

Abort Signals

Provide cancellation (via AbortController) for long-running tasks.

Progress Indicators

Surface tool calls and status updates so users can see work in progress.

Graceful Degradation

Fall back to the complete text, or a non-streaming retry, when streaming fails.
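The graceful-degradation pattern can be sketched as a small helper that streams when possible and retries via a non-streaming path on failure. The function names here are illustrative, not DeepAgents API, and the flaky mock stream exists only to exercise the fallback:

```typescript
// Stream chunks through `write`; if the stream errors part-way, fetch the
// complete text instead and render it in one piece.
async function renderWithFallback(
  streamText: () => AsyncIterable<string>,
  completeText: () => Promise<string>,
  write: (s: string) => void,
): Promise<void> {
  try {
    for await (const chunk of streamText()) {
      write(chunk);
    }
  } catch {
    // Streaming failed: retry via the non-streaming path.
    write(await completeText());
  }
}

// Mock stream that fails mid-generation, to exercise the fallback:
async function* flakyStream(): AsyncGenerator<string> {
  yield 'Hello, ';
  throw new Error('connection reset');
}

const rendered: string[] = [];
renderWithFallback(() => flakyStream(), async () => 'Hello, world.', s => rendered.push(s))
  .then(() => console.log(rendered.join('')));
```

In real code, streamText would consume stream.textStream from execute() and completeText would issue a fresh non-streaming request.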

Next Steps

Agent Package

Full API reference

Examples

See streaming examples
