The agent framework provides powerful streaming capabilities for real-time interaction with agents. Stream text, tool calls, and structured output as they’re generated.

Basic Streaming

Use execute() (or stream()) for streaming responses:
import { openai } from '@ai-sdk/openai';
import { agent, execute } from '@deepagents/agent';

const assistant = agent({
  name: 'assistant',
  model: openai('gpt-4o'),
  prompt: 'You are a helpful assistant.',
});

const stream = await execute(assistant, 'Tell me a story', {});

// Stream text chunks
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

Stream Types

Text Stream

Stream generated text as it’s produced:
const stream = await execute(assistant, 'Write a poem', {});

for await (const chunk of stream.textStream) {
  process.stdout.write(chunk); // Print each chunk immediately
}

Full Stream

Access all events including tool calls:
for await (const event of stream.fullStream) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.textDelta);
      break;
    case 'tool-call':
      console.log('Tool called:', event.toolName);
      break;
    case 'tool-result':
      console.log('Tool result:', event.result);
      break;
  }
}

UI Message Stream

Stream UI-compatible messages:
for await (const chunk of stream.toUIMessageStream()) {
  if (chunk.type === 'text-delta') {
    console.log('Text:', chunk.delta);
  }
  if (chunk.type === 'reasoning-delta') {
    console.log('Reasoning:', chunk.delta);
  }
}
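
The per-chunk handling above can be folded into complete strings for rendering. A minimal sketch, assuming chunks shaped like the `{ type, delta }` objects in the example; a mock chunk stream stands in for `toUIMessageStream()` so the snippet runs standalone:

```typescript
// Fold UI message chunks into complete text and reasoning strings.
// Any stream of { type, delta } chunks works, including the output
// of toUIMessageStream() above.
type UIChunk = { type: string; delta?: string };

async function collectUIMessage(chunks: AsyncIterable<UIChunk>) {
  let text = '';
  let reasoning = '';
  for await (const chunk of chunks) {
    if (chunk.type === 'text-delta') text += chunk.delta ?? '';
    if (chunk.type === 'reasoning-delta') reasoning += chunk.delta ?? '';
  }
  return { text, reasoning };
}

// Demonstration with a mock chunk stream:
async function* mockChunks(): AsyncGenerator<UIChunk> {
  yield { type: 'reasoning-delta', delta: 'Thinking... ' };
  yield { type: 'text-delta', delta: 'Hello, ' };
  yield { type: 'text-delta', delta: 'world!' };
}

collectUIMessage(mockChunks()).then((msg) => console.log(msg));
// { text: 'Hello, world!', reasoning: 'Thinking... ' }
```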

Getting Final Results

Final Text

const stream = await execute(assistant, 'What is AI?', {});
const text = await stream.text;
console.log(text); // Complete response text

Usage Information

const stream = await execute(assistant, 'Hello', {});
const usage = await stream.totalUsage;

console.log(usage);
// {
//   promptTokens: 150,
//   completionTokens: 50,
//   totalTokens: 200
// }
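
The usage counts can drive a simple per-request cost estimate. The prices below are placeholders, not real provider rates; substitute your provider's current pricing:

```typescript
// Estimate request cost from token usage. The per-million-token
// prices are hypothetical; replace them with your provider's rates.
interface Usage {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
}

const PROMPT_PRICE_PER_M = 2.5; // hypothetical $ per 1M prompt tokens
const COMPLETION_PRICE_PER_M = 10; // hypothetical $ per 1M completion tokens

function estimateCost(usage: Usage): number {
  return (
    (usage.promptTokens / 1_000_000) * PROMPT_PRICE_PER_M +
    (usage.completionTokens / 1_000_000) * COMPLETION_PRICE_PER_M
  );
}

console.log(
  estimateCost({ promptTokens: 150, completionTokens: 50, totalTokens: 200 }),
); // ~0.000875
```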

Sources (if available)

const stream = await execute(assistant, 'Research topic', {});
const sources = await stream.sources;

console.log(sources);
// [
//   { type: 'url', url: 'https://example.com', title: 'Source 1' },
//   { type: 'file', filename: 'data.pdf', mediaType: 'application/pdf' }
// ]

Streaming with Structured Output

Stream partial structured output:
import { z } from 'zod';

const taskSchema = z.object({
  title: z.string(),
  steps: z.array(z.string()),
  estimatedTime: z.number(),
});

const planner = agent({
  name: 'planner',
  model: openai('gpt-4o'),
  prompt: 'Create a task plan.',
  output: taskSchema,
});

const stream = await execute(planner, 'Plan a vacation', {});

// Stream partial output as it's generated
for await (const partial of stream.partialOutputStream) {
  console.log('Partial:', partial);
  // { title: 'Plan a Vacation' }
  // { title: 'Plan a Vacation', steps: ['Book flights'] }
  // { title: 'Plan a Vacation', steps: ['Book flights', 'Reserve hotel'], ... }
}

// Get final complete output
const final = await stream.output;
console.log('Final:', final);
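
Because each partial is a growing object, a UI can react as individual fields first appear. A sketch of that pattern, demonstrated with a mock partial stream (the field names match `taskSchema` above; the stream would be `partialOutputStream` in real code):

```typescript
// Record the order in which top-level fields first appear across a
// stream of growing partial objects, like partialOutputStream yields.
async function* mockPartials(): AsyncGenerator<Record<string, unknown>> {
  yield { title: 'Plan a Vacation' };
  yield { title: 'Plan a Vacation', steps: ['Book flights'] };
  yield { title: 'Plan a Vacation', steps: ['Book flights'], estimatedTime: 5 };
}

async function fieldArrivalOrder(
  partials: AsyncIterable<Record<string, unknown>>,
): Promise<string[]> {
  const seen = new Set<string>();
  const order: string[] = [];
  for await (const partial of partials) {
    for (const key of Object.keys(partial)) {
      if (!seen.has(key)) {
        seen.add(key); // field appeared for the first time
        order.push(key);
      }
    }
  }
  return order;
}

fieldArrivalOrder(mockPartials()).then((order) => console.log(order));
// [ 'title', 'steps', 'estimatedTime' ]
```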

Swarm Streaming

For multi-agent systems, use swarm() to handle handoffs:
import { swarm } from '@deepagents/agent';

const coordinator = agent({
  name: 'coordinator',
  model: openai('gpt-4o'),
  prompt: 'Coordinate tasks between specialists.',
  handoffs: [specialist1, specialist2],
});

const stream = swarm(coordinator, 'Complete this task', {});

for await (const chunk of stream) {
  if (chunk.type === 'text-delta') {
    process.stdout.write(chunk.delta);
  }
  if (chunk.type === 'reasoning-delta') {
    console.log('[Reasoning]', chunk.delta);
  }
}

Abort Streaming

Cancel ongoing streams:
const controller = new AbortController();

const stream = await execute(assistant, 'Write a long story', {}, {
  abortSignal: controller.signal,
});

// Start streaming
const streamingPromise = (async () => {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
})();

// Cancel after 5 seconds
setTimeout(() => {
  controller.abort();
  console.log('\nStream aborted');
}, 5000);

try {
  await streamingPromise;
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('Stream was cancelled');
  }
}

Custom Stream Processing

Transform Streams

Apply custom transformations:
import { smoothStream } from 'ai';

const stream = await execute(assistant, 'Tell me a story', {}, {
  transform: smoothStream(),
});
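
Beyond `smoothStream()`, you can write your own transform with the standard Web Streams API. Whether the `transform` option accepts an arbitrary `TransformStream` like this one is an assumption; the transform mechanics themselves are standard, shown here by piping a plain `ReadableStream` through it:

```typescript
// A custom Web TransformStream that uppercases each text chunk.
const upperCase = new TransformStream<string, string>({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

// Demonstration: pipe a plain ReadableStream through the transform.
const source = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('hello ');
    controller.enqueue('world');
    controller.close();
  },
});

const reader = source.pipeThrough(upperCase).getReader();
let out = '';
for (let r = await reader.read(); !r.done; r = await reader.read()) {
  out += r.value;
}
console.log(out); // HELLO WORLD
```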

Process Tool Calls

const stream = await execute(assistant, 'Use tools to help', {});

for await (const event of stream.fullStream) {
  if (event.type === 'tool-call') {
    console.log(`Calling ${event.toolName} with:`, event.args);
  }
  
  if (event.type === 'tool-result') {
    console.log(`Result from ${event.toolName}:`, event.result);
  }
  
  if (event.type === 'text-delta') {
    process.stdout.write(event.textDelta);
  }
}

Streaming Utilities

Last Helper

Get the last item from a stream:
import { last } from '@deepagents/agent';

const stream = await execute(assistant, 'Count to 10', {});

// Get last chunk
const lastChunk = await last(stream.fullStream);
console.log('Last event:', lastChunk);

Printer Utility

Print streams to stdout:
import { printer } from '@deepagents/agent';

const stream = await execute(assistant, 'Tell me a story', {});

// Print to stdout with formatting
await printer.stdout(stream, {
  reasoning: true,  // Include reasoning
  text: true,       // Include text
  wrapInTags: true, // Wrap in XML-style tags
});

Real-World Example

import { openai } from '@ai-sdk/openai';
import { agent, execute } from '@deepagents/agent';
import { tool } from 'ai';
import { z } from 'zod';

const searchTool = tool({
  description: 'Search for information',
  parameters: z.object({
    query: z.string(),
  }),
  execute: async ({ query }) => {
    // Simulate search
    return `Results for: ${query}`;
  },
});

const researcher = agent({
  name: 'researcher',
  model: openai('gpt-4o'),
  prompt: 'Research topics and provide detailed information.',
  tools: { search: searchTool },
});

const stream = await execute(
  researcher,
  'Research the history of TypeScript',
  {}
);

console.log('Research starting...\n');

// Track tool calls
let toolCallCount = 0;

for await (const event of stream.fullStream) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.textDelta);
      break;
      
    case 'tool-call':
      toolCallCount++;
      console.log(`\n[Tool Call #${toolCallCount}]: ${event.toolName}`);
      console.log(`[Args]: ${JSON.stringify(event.args)}`);
      break;
      
    case 'tool-result':
      console.log(`[Result]: ${event.result}\n`);
      break;
  }
}

console.log('\n\nResearch complete!');

const usage = await stream.totalUsage;
console.log('\nToken usage:');
console.log(`  Prompt: ${usage.promptTokens}`);
console.log(`  Completion: ${usage.completionTokens}`);
console.log(`  Total: ${usage.totalTokens}`);

Streaming to Files

Write streams directly to files:
import { createWriteStream } from 'fs';
import { Readable } from 'stream';

const stream = await execute(assistant, 'Write a long article', {});

const writeStream = createWriteStream('output.md');
Readable.fromWeb(stream.textStream as any).pipe(writeStream);

await new Promise((resolve, reject) => {
  writeStream.on('finish', resolve);
  writeStream.on('error', reject);
});

console.log('Article written to output.md');
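
The manual `'finish'`/`'error'` Promise can also be replaced with `pipeline()` from `node:stream/promises`, which awaits completion and propagates errors. In this sketch the agent text stream is simulated with an inline `ReadableStream` so the snippet runs standalone; substitute `stream.textStream` in real code:

```typescript
import { createWriteStream, readFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Simulated text stream; in real code this would be stream.textStream.
const textStream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('# Article\n');
    controller.enqueue('Streaming to a file.\n');
    controller.close();
  },
});

const outPath = join(tmpdir(), 'output.md');

// pipeline() resolves on 'finish' and rejects on 'error' automatically.
await pipeline(Readable.fromWeb(textStream as any), createWriteStream(outPath));

console.log('Article written to', outPath);
```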

Error Handling

Handle errors in streams:
try {
  const stream = await execute(assistant, 'Help me', {});
  
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
  
  const text = await stream.text;
  console.log('\nCompleted:', text.length, 'characters');
  
} catch (error) {
  if (error.name === 'AbortError') {
    console.error('Stream was aborted');
  } else if (error.message.includes('rate limit')) {
    console.error('Rate limit exceeded');
  } else {
    console.error('Stream error:', error.message);
  }
}

Best Practices

Always consume streams completely to avoid memory leaks:
// ✅ Good
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

// ❌ Incomplete consumption
const firstChunk = await stream.textStream[Symbol.asyncIterator]().next();
// Stream not fully consumed!

Wrap stream processing in try-catch:
try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  console.error('Stream error:', error);
}

Provide abort signals for long-running streams:
const controller = new AbortController();
const stream = await execute(assistant, message, {}, {
  abortSignal: controller.signal,
});

Track token usage to manage costs:
const usage = await stream.totalUsage;
console.log('Tokens used:', usage.totalTokens);

Next Steps

API Reference: full API documentation.

Execution Functions: the execute(), generate(), and swarm() APIs.
