
Streaming Responses

Streaming delivers real-time, token-by-token responses from agents, giving users immediate feedback and a more responsive experience during long-running operations.

Why Stream?

Benefits of streaming:
  • Immediate feedback - Users see responses as they’re generated
  • Better UX - Perceived performance improvement
  • Progress visibility - See tool calls and reasoning in real-time
  • Cancellation - Abort long-running operations
  • Lower latency - First token arrives faster
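The consumption pattern is the same regardless of where the tokens come from: a `for await` loop over an async iterable. As a quick, self-contained sketch of that mechanic, using a hypothetical mock in place of a real agent:

```typescript
// Hypothetical mock in place of a real agent stream: any AsyncIterable<string>
// can be consumed token by token with for await.
async function* mockTokenStream(tokens: string[]): AsyncGenerator<string> {
  for (const token of tokens) {
    // Each token is yielded as soon as it is available,
    // so consumers see output before the full response exists.
    yield token;
  }
}

async function collect(stream: AsyncIterable<string>): Promise<string> {
  let text = '';
  for await (const chunk of stream) {
    text += chunk; // in a real UI, render each chunk immediately instead
  }
  return text;
}
```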

Basic Streaming

From: packages/core/src/agent/agent.test.ts

Text Streaming

Stream plain text responses:
import { Agent } from '@mastra/core/agent';

const agent = new Agent({
  id: 'assistant',
  name: 'Assistant',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-4o',
});

// Start streaming
const stream = await agent.stream('Tell me a story about dragons');

// Consume text stream
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

// Get final text
const finalText = await stream.text;
console.log('\nComplete:', finalText);

Full Stream Events

Access all stream events, including tool calls and results:
const stream = await agent.stream('What is the weather in NYC?');

for await (const event of stream.fullStream) {
  switch (event.type) {
    case 'text-delta':
      process.stdout.write(event.textDelta);
      break;
      
    case 'tool-call':
      console.log(`\nCalling tool: ${event.toolName}`);
      console.log('Args:', event.args);
      break;
      
    case 'tool-result':
      console.log(`Tool ${event.toolName} completed`);
      console.log('Result:', event.result);
      break;
      
    case 'finish':
      console.log('\nStream finished');
      break;
  }
}
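If you need a full transcript after consuming the stream, you can fold the events into an accumulator as you go. A minimal sketch, assuming the simplified event shapes shown above:

```typescript
// Simplified event shapes mirroring the fullStream events above (assumed).
type StreamEvent =
  | { type: 'text-delta'; textDelta: string }
  | { type: 'tool-call'; toolName: string; args: unknown }
  | { type: 'tool-result'; toolName: string; result: unknown }
  | { type: 'finish' };

interface Transcript {
  text: string;
  toolCalls: string[];
  finished: boolean;
}

// Fold a sequence of events into the final text plus a log of tools used.
function reduceEvents(events: StreamEvent[]): Transcript {
  const transcript: Transcript = { text: '', toolCalls: [], finished: false };
  for (const event of events) {
    switch (event.type) {
      case 'text-delta':
        transcript.text += event.textDelta;
        break;
      case 'tool-call':
        transcript.toolCalls.push(event.toolName);
        break;
      case 'finish':
        transcript.finished = true;
        break;
    }
  }
  return transcript;
}
```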

Stream Options

Basic Options

const stream = await agent.stream('Hello', {
  // Memory configuration
  memory: {
    resource: 'user-123',
    thread: { id: 'thread-456' },
  },
  
  // Model settings
  modelSettings: {
    temperature: 0.7,
    maxTokens: 1000,
  },
  
  // Maximum agentic steps (tool calls)
  maxSteps: 5,
  
  // Request context for dynamic config
  requestContext: ctx,
});

Callbacks

React to stream events:
const stream = await agent.stream('Analyze this data', {
  // Called after each step completes
  onStepFinish: async ({ text, toolCalls, toolResults }) => {
    console.log('Step completed');
    console.log('Tool calls:', toolCalls.length);
  },
  
  // Called when stream finishes
  onFinish: async ({ text, finishReason }) => {
    console.log('Final text:', text);
    console.log('Finish reason:', finishReason);
  },
  
  // Called for each chunk
  onChunk: async ({ chunk }) => {
    // Process individual chunks
  },
  
  // Called on errors
  onError: async ({ error }) => {
    console.error('Stream error:', error);
  },
});

Streaming with Tools

From: examples/agent/src/mastra/agents/model-v2-agent.ts
import { Agent } from '@mastra/core/agent';
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';

const weatherTool = createTool({
  id: 'get-weather',
  description: 'Get current weather',
  inputSchema: z.object({
    location: z.string(),
  }),
  execute: async ({ location }) => {
    return { temp: 72, condition: 'sunny' };
  },
});

const agent = new Agent({
  id: 'weather-agent',
  name: 'Weather Agent',
  instructions: 'Help with weather information',
  model: 'openai/gpt-4o',
  tools: { weatherTool },
});

const stream = await agent.stream('What is the weather in San Francisco?');

for await (const event of stream.fullStream) {
  if (event.type === 'text-delta') {
    process.stdout.write(event.textDelta);
  }
  
  if (event.type === 'tool-call') {
    console.log(`\n[Calling ${event.toolName}]`);
  }
  
  if (event.type === 'tool-result') {
    console.log(`[${event.toolName} completed]\n`);
  }
}

// Wait for completion
const result = await stream.text;
const toolCalls = await stream.toolCalls;

console.log('\nFinal result:', result);
console.log('Tools used:', toolCalls.length);

Stream Abort

Cancel streaming operations:
const abortController = new AbortController();

const stream = await agent.stream('Generate a long document', {
  abortSignal: abortController.signal,
  onAbort: async () => {
    console.log('Stream aborted by user');
  },
});

// Cancel after 5 seconds
setTimeout(() => {
  abortController.abort();
}, 5000);

try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  if (error.name === 'AbortError') {
    console.log('\nStream was cancelled');
  }
}
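Under the hood, an aborted signal surfaces as an `AbortError` thrown out of the `for await` loop, which is why the catch block above checks `error.name`. A self-contained sketch of that propagation, using a hypothetical token generator:

```typescript
// Hypothetical generator showing how an AbortSignal interrupts a token loop:
// once the signal fires, the next iteration throws an AbortError.
async function* abortableStream(
  tokens: string[],
  signal: AbortSignal,
): AsyncGenerator<string> {
  for (const token of tokens) {
    if (signal.aborted) {
      const err = new Error('The operation was aborted');
      err.name = 'AbortError';
      throw err;
    }
    yield token;
  }
}
```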

Streaming with Memory

import { Memory } from '@mastra/memory';

const agent = new Agent({
  id: 'chat-agent',
  name: 'Chat Agent',
  instructions: 'You are a conversational assistant',
  model: 'openai/gpt-4o',
  memory: new Memory(),
});

// First message
const stream1 = await agent.stream('My name is Alice', {
  memory: {
    resource: 'user-123',
    thread: { id: 'conversation-1' },
  },
});

for await (const chunk of stream1.textStream) {
  process.stdout.write(chunk);
}

// Second message - agent remembers context
const stream2 = await agent.stream('What is my name?', {
  memory: {
    resource: 'user-123',
    thread: { id: 'conversation-1' },
  },
});

for await (const chunk of stream2.textStream) {
  process.stdout.write(chunk);
}
// Output: "Your name is Alice."

Advanced Patterns

Progress Tracking

From: examples/agent/src/mastra/agents/model-v2-agent.ts
const stream = await agent.stream('Analyze this dataset', {
  maxSteps: 10,
  onIterationComplete: async ({ iteration, maxIterations, toolCalls, text }) => {
    const progress = maxIterations ? (iteration / maxIterations) * 100 : 0;
    console.log(`Progress: ${progress.toFixed(0)}%`);
    console.log(`Iteration ${iteration}: Called ${toolCalls.length} tools`);
    
    // Optionally control continuation
    if (text.includes('COMPLETE')) {
      return { continue: false };
    }
    
    return { continue: true };
  },
});

Tool Approval

Require approval for sensitive tools:
import { createTool } from '@mastra/core/tools';
import { z } from 'zod';
import { promises as fs } from 'fs';

const deleteFileTool = createTool({
  id: 'delete-file',
  description: 'Delete a file',
  requireApproval: true,
  inputSchema: z.object({
    path: z.string(),
  }),
  execute: async ({ path }) => {
    await fs.unlink(path);
    return { deleted: true };
  },
});

const agent = new Agent({
  id: 'file-agent',
  name: 'File Agent',
  instructions: 'Manage files',
  model: 'openai/gpt-4o',
  tools: { deleteFileTool },
});

const stream = await agent.stream('Delete old logs', {
  requireToolApproval: true,
});

for await (const event of stream.fullStream) {
  if (event.type === 'tool-call' && event.approval) {
    console.log(`Approve deletion of ${event.args.path}? (y/n)`);
    
    // Get user input (simplified)
    const answer = await getUserInput();
    
    if (answer === 'y') {
      await event.approval.approve();
    } else {
      await event.approval.reject('User declined');
    }
  }
  
  if (event.type === 'text-delta') {
    process.stdout.write(event.textDelta);
  }
}
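The `event.approval` object pairs a pending decision with approve/reject handles. A hypothetical sketch of how such a gate can be built from a promise (names and shape are illustrative, not Mastra's implementation):

```typescript
// Illustrative approval gate: the consumer awaits `decision` while the
// UI layer calls approve() or reject() once the user answers.
interface ApprovalGate {
  decision: Promise<boolean>;
  approve: () => void;
  reject: (reason: string) => void;
}

function createApprovalGate(): ApprovalGate {
  let approve!: () => void;
  let reject!: (reason: string) => void;
  const decision = new Promise<boolean>((resolve) => {
    approve = () => resolve(true);
    reject = () => resolve(false); // reason could be logged or surfaced here
  });
  return { decision, approve, reject };
}
```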

Multi-Step Operations

const stream = await agent.stream(
  'Research quantum computing and create a report',
  {
    maxSteps: 15,
    onStepFinish: async ({ stepNumber, toolCalls, toolResults }) => {
      console.log(`\n--- Step ${stepNumber} ---`);
      
      toolCalls.forEach((call, i) => {
        console.log(`Tool: ${call.toolName}`);
        console.log(`Result: ${JSON.stringify(toolResults[i]?.result)}`);
      });
    },
  }
);

for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}

const finalResult = await stream.text;
console.log('\n\nFinal report:', finalResult);

Server-Sent Events (SSE)

Stream responses to web clients:
import express from 'express';
import { Agent } from '@mastra/core/agent';

const app = express();
app.use(express.json());

const agent = new Agent({
  id: 'chat',
  name: 'Chat Agent',
  instructions: 'You are helpful',
  model: 'openai/gpt-4o',
});

// Express/Node.js endpoint
app.post('/api/chat', async (req, res) => {
  const { message } = req.body;
  
  const stream = await agent.stream(message);
  
  // Convert to SSE
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  for await (const event of stream.fullStream) {
    res.write(`data: ${JSON.stringify(event)}\n\n`);
  }
  
  res.end();
});
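On the client side, each `data:` frame carries one JSON-encoded event. A minimal parser sketch for frames in that shape (real SSE frames can span multiple `data:` lines; this assumes the one-line-per-event format produced by the endpoint above):

```typescript
// Parse a text/event-stream payload ("data: <json>\n\n" frames) into events.
// Simplified: assumes each frame is a single data line with a JSON body.
function parseSSE(raw: string): unknown[] {
  return raw
    .split('\n\n')
    .filter((frame) => frame.startsWith('data: '))
    .map((frame) => JSON.parse(frame.slice('data: '.length)));
}
```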

WebSocket Streaming

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', async (message) => {
const { text } = JSON.parse(message.toString());
    
    const stream = await agent.stream(text);
    
    for await (const event of stream.fullStream) {
      ws.send(JSON.stringify({
        type: event.type,
        data: event,
      }));
    }
    
    ws.send(JSON.stringify({ type: 'done' }));
  });
});

React Integration

import { useChat } from '@mastra/react';

function ChatComponent() {
  const { messages, input, setInput, sendMessage, isLoading } = useChat({
    api: '/api/chat',
    onChunk: (chunk) => {
      // Handle streaming chunks
      console.log('Chunk:', chunk);
    },
  });
  
  return (
    <div>
      <div>
        {messages.map((msg, i) => (
          <div key={i}>
            <strong>{msg.role}:</strong> {msg.content}
          </div>
        ))}
      </div>
      
      <input
        value={input}
        onChange={(e) => setInput(e.target.value)}
        onKeyDown={(e) => {
          if (e.key === 'Enter') sendMessage();
        }}
        disabled={isLoading}
      />
      
      {isLoading && <div>Streaming...</div>}
    </div>
  );
}

Best Practices

Always wrap streaming in try-catch:
try {
  for await (const chunk of stream.textStream) {
    process.stdout.write(chunk);
  }
} catch (error) {
  console.error('Streaming failed:', error);
  // Show fallback UI
}
Allow users to cancel long operations:
const controller = new AbortController();

const stream = await agent.stream(prompt, {
  abortSignal: controller.signal,
});

// Cancel button
button.onclick = () => controller.abort();
Accumulate tokens before displaying:
let buffer = '';

for await (const chunk of stream.textStream) {
  buffer += chunk;
  
  // Flush every 5 chars or on punctuation
  if (buffer.length >= 5 || /[.!?]/.test(chunk)) {
    display(buffer);
    buffer = '';
  }
}

// Flush whatever remains once the stream ends
if (buffer) display(buffer);
Show streaming indicators:
setStatus('streaming');

try {
  for await (const chunk of stream.textStream) {
    appendToUI(chunk);
  }
  setStatus('complete');
} catch (error) {
  setStatus('error');
}

Next Steps

Structured Output

Get typed, validated responses from agents

Memory

Add conversation persistence

Network

Orchestrate multi-agent workflows

Stream API Reference

Complete streaming API documentation
