Skip to main content

Streaming Responses

Learn how to stream agent responses in real-time, providing immediate feedback and better user experience for long-running tasks.

Overview

Streaming enables:
  • Real-time output as the agent thinks
  • Better user experience with immediate feedback
  • Progress monitoring for long tasks
  • Integration with dashboards and UIs
  • Token-by-token response visualization

Basic Streaming

Enable streaming with a simple flag:
from swarms import Agent

# Create agent with streaming enabled
agent = Agent(
    agent_name="Streaming-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,  # single reasoning pass
    streaming_on=True,  # Enable streaming
)

# Run task - output streams in real-time
# (tokens are printed to the console as they arrive)
response = agent.run(
    "Write a detailed explanation of how neural networks work"
)

print(response)  # Complete response after streaming

Streaming with Callback

Use callbacks to process streaming tokens in real-time:
from swarms import Agent

def streaming_callback(token: str) -> None:
    """
    Handle one token emitted by the streaming agent.

    Args:
        token (str): The new token from the stream.
    """
    # Echo the token right away; flushing keeps the console in sync
    # with the stream instead of waiting for a full buffer.
    print(token, end="", flush=True)

    # Other possible per-token sinks:
    # - forward over a websocket
    # - refresh a UI widget
    # - append to a log file
    # - run sentiment analysis

agent = Agent(
    agent_name="Interactive-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
)

# Pass the callback per-run; it is invoked once for each streamed token.
response = agent.run(
    task="Explain quantum computing",
    streaming_callback=streaming_callback,
)

Advanced Streaming Modes

1. Detailed Streaming (stream=True)

Get detailed token information with metadata:
from swarms import Agent
import json

def detailed_callback(token_info: dict) -> None:
    """
    Process one detailed streaming event.

    Args:
        token_info (dict): Event payload; may carry 'content',
            'usage', and 'finish_reason' keys.
    """
    # Text content, when present, is echoed token-by-token.
    if "content" in token_info:
        print(token_info["content"], end="", flush=True)

    # Usage metadata (token counts etc.) is reported on its own lines.
    if "usage" in token_info:
        usage = token_info["usage"]
        print(f"\n\nTokens used: {usage}")

    # Completion reason, when the stream ends.
    if "finish_reason" in token_info:
        reason = token_info["finish_reason"]
        print(f"Finished: {reason}")

agent = Agent(
    agent_name="Detailed-Streaming-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
    stream=True,  # Enable detailed streaming
)

# With stream=True each callback invocation receives a dict
# (content plus metadata) rather than a bare token string.
response = agent.run(
    task="Analyze the current state of AI technology",
    streaming_callback=detailed_callback,
)

2. Silent Streaming

Collect chunks without printing:
from swarms import Agent

# Silent streaming - no console output
agent = Agent(
    agent_name="Silent-Streamer",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,  # Disable printing
)

# Chunks are collected but not displayed
# (streaming still happens internally; only the console echo is suppressed)
response = agent.run("Generate a report on market trends")

# Process complete response
print("\n=== Final Response ===")
print(response)

3. Custom Streaming Panel

Create custom visual displays:
from swarms import Agent
from rich.live import Live
from rich.panel import Panel
from rich.console import Console

# Shared state: the console used for drawing, and every token received so far.
console = Console()
collected_text = []

def custom_panel_callback(token: str):
    """
    Append *token* to the collected text and redraw the panel with the
    full text accumulated so far.

    Args:
        token (str): The new token from the stream.
    """
    collected_text.append(token)
    full_text = "".join(collected_text)
    
    # Update display
    # NOTE(review): clearing and reprinting on every token flickers and does
    # O(n) work per token; rich.live.Live (imported above but unused) is the
    # idiomatic way to update a panel in place — consider switching.
    console.clear()
    console.print(Panel(
        full_text,
        title="🤖 Agent Thinking...",
        border_style="blue",
    ))

agent = Agent(
    agent_name="Custom-Display-Agent",
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,  # suppress default echo; the panel is the only output
)

response = agent.run(
    task="Write a creative story",
    streaming_callback=custom_panel_callback,
)

Streaming in Multi-Agent Workflows

Concurrent Workflow with Streaming

from swarms import Agent, ConcurrentWorkflow

# Create agents with streaming
# All three agents run silently (print_on=False) so their token streams
# do not interleave on the console; the workflow dashboard renders them.
market_researcher = Agent(
    agent_name="Market-Researcher",
    system_prompt="""You are a market research specialist. Analyze market trends
    and provide actionable insights.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,  # Silent mode for concurrent workflows
)

financial_analyst = Agent(
    agent_name="Financial-Analyst",
    system_prompt="""You are a financial analysis expert. Evaluate investment
    opportunities and assess risks.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,
)

technical_analyst = Agent(
    agent_name="Technical-Analyst",
    system_prompt="""You are a technical analysis specialist. Analyze price patterns
    and provide trading recommendations.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,
)

# Create concurrent workflow with dashboard
workflow = ConcurrentWorkflow(
    name="market-analysis-workflow",
    agents=[market_researcher, financial_analyst, technical_analyst],
    max_loops=1,
    show_dashboard=True,  # Shows real-time streaming from all agents
)

# Run workflow - streams from all agents simultaneously
result = workflow.run(
    "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)

print(result)

Sequential Workflow with Streaming

from swarms import Agent, SequentialWorkflow

# Agent 1: Researcher with streaming
researcher = Agent(
    agent_name="Researcher",
    system_prompt="Research the topic and provide detailed analysis",
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# Agent 2: Writer with streaming
writer = Agent(
    agent_name="Writer",
    system_prompt="Write engaging content based on research",
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# Create sequential workflow
# (the researcher's output is handed to the writer as input)
workflow = SequentialWorkflow(
    agents=[researcher, writer],
    max_loops=1,
)

# Each agent streams output in sequence
final_output = workflow.run(
    "The future of renewable energy technology"
)

Streaming Configuration

Agent-Level Configuration

from swarms import Agent

agent = Agent(
    agent_name="Configured-Streamer",
    model_name="gpt-4o-mini",
    
    # Streaming options
    # streaming_on emits plain token strings; stream emits dict events
    # with metadata (content, usage, finish_reason) — see the sections above.
    streaming_on=True,     # Enable basic streaming
    stream=False,          # Disable detailed streaming (default)
    print_on=True,         # Show output (default)
    verbose=True,          # Show detailed logs
    
    # Other options
    max_loops=1,
    interactive=True,
)

Runtime Configuration

def dynamic_callback(token: str):
    # Echo each streamed token immediately.
    print(token, end="", flush=True)

# Pass callback at runtime
# (assumes `agent` was created earlier with streaming enabled)
response = agent.run(
    task="Analyze data",
    streaming_callback=dynamic_callback,  # Override default behavior
)

Real-World Examples

1. WebSocket Integration

import asyncio
import json  # was missing: json.dumps is used below
import websockets
from swarms import Agent

class WebSocketStreamer:
    """Forwards streaming tokens to a connected WebSocket client."""

    def __init__(self, websocket):
        self.websocket = websocket
    
    async def send_token(self, token: str):
        """Send one token to the WebSocket client as a JSON message."""
        await self.websocket.send(json.dumps({
            "type": "token",
            "content": token,
        }))

async def handle_client(websocket, path):
    # NOTE(review): websockets >= 11 invokes handlers without the `path`
    # argument — drop it there; kept here to match older versions.
    # Create agent (silent: tokens go only to the socket, not the console)
    agent = Agent(
        model_name="gpt-4o-mini",
        streaming_on=True,
        print_on=False,
    )
    
    # Create streamer
    streamer = WebSocketStreamer(websocket)
    
    # Stream to WebSocket
    # NOTE(review): agent.run appears to be synchronous, so it blocks the
    # event loop while streaming and the tasks created below only execute
    # after it returns. For production, run it in a thread executor
    # (loop.run_in_executor) and hand tokens back to the loop via a queue.
    response = agent.run(
        task="Generate analysis",
        streaming_callback=lambda token: asyncio.create_task(
            streamer.send_token(token)
        ),
    )
    
    # Send completion message carrying the full response
    await websocket.send(json.dumps({
        "type": "complete",
        "content": response,
    }))

async def main():
    # Serve until cancelled. asyncio.run replaces the deprecated
    # get_event_loop().run_until_complete pattern; the original also
    # exited immediately because it never called run_forever.
    async with websockets.serve(handle_client, "localhost", 8765):
        await asyncio.Future()  # block forever

# Start WebSocket server
asyncio.run(main())

2. Progress Bar Integration

from swarms import Agent
from rich.progress import Progress, TextColumn, BarColumn
from rich.console import Console

console = Console()
# NOTE(review): `console` is created but never used in this example.

def progress_streaming(task_description: str) -> str:
    """
    Run an agent on *task_description* while showing a progress bar that
    advances once per streamed token.

    Args:
        task_description (str): The task passed to the agent.

    Returns:
        str: The agent's complete response.
    """
    tokens = []  # collected tokens (kept for potential post-processing)
    
    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
    ) as progress:
        
        # total=100 is a rough estimate — the token count is unknown
        # up-front, so the bar may complete early or lag behind.
        task = progress.add_task(task_description, total=100)
        
        def callback(token: str):
            tokens.append(token)
            # Update progress (estimate)
            progress.update(task, advance=1)
        
        agent = Agent(
            model_name="gpt-4o-mini",
            streaming_on=True,
            print_on=False,
        )
        
        response = agent.run(
            task=task_description,
            streaming_callback=callback,
        )
    
    return response

result = progress_streaming("Generate comprehensive report")

3. File Logging

from swarms import Agent
import time

class StreamLogger:
    """Append each streamed token to a log file and keep a copy in memory."""

    def __init__(self, log_file: str):
        # Destination file for timestamped tokens (opened per write).
        self.log_file = log_file
        # In-memory record of every token seen so far.
        self.tokens = []
    
    def log_token(self, token: str):
        """
        Write *token* to the log file with a timestamp, then remember it.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        # Open in append mode each call so partial logs survive crashes.
        with open(self.log_file, "a") as handle:
            handle.write(f"[{timestamp}] {token}")
        self.tokens.append(token)
    
    def get_full_response(self) -> str:
        """Return all logged tokens joined into a single string."""
        return "".join(self.tokens)

# Create logger
logger = StreamLogger("agent_stream.log")

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# A bound method works as a callback; each token is appended to the log.
response = agent.run(
    task="Analyze quarterly results",
    streaming_callback=logger.log_token,
)

print(f"Full response logged to: {logger.log_file}")

Best Practices

1. Buffer Management

def buffered_callback(min_chunk_size: int = 10):
    """
    Build a streaming callback that buffers tokens before printing.

    Tokens accumulate until at least *min_chunk_size* of them are
    buffered; the joined chunk is then printed in a single write,
    reducing per-token output overhead.

    Args:
        min_chunk_size (int): Number of buffered tokens that triggers
            a write to stdout.

    Returns:
        Callable[[str], None]: The token callback. It also exposes a
        ``flush()`` attribute — call it once the stream ends to emit
        any tokens still below the threshold (the original version
        silently dropped that tail).
    """
    buffer = []

    def flush():
        # Emit whatever remains, even if below the threshold.
        if buffer:
            print("".join(buffer), end="", flush=True)
            buffer.clear()

    def callback(token: str):
        buffer.append(token)
        # Process when buffer reaches minimum size
        if len(buffer) >= min_chunk_size:
            flush()

    # Expose flush so callers can drain the tail of the stream.
    callback.flush = flush
    return callback

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,  # buffered chunks are the only console output
)

# buffered_callback(...) returns the actual token handler.
response = agent.run(
    task="Generate report",
    streaming_callback=buffered_callback(min_chunk_size=20),
)

2. Error Handling

def safe_streaming_callback(token: str) -> None:
    """
    Streaming callback that never lets a processing error kill the stream.

    Args:
        token (str): The new token from the stream.
    """
    try:
        # Process token
        print(token, end="", flush=True)
        
        # Additional processing
        # send_to_api(token)
        
    except Exception as e:
        # The original referenced an undefined `logger` (NameError inside
        # the handler); use the stdlib logging module so the snippet runs
        # as written.
        import logging
        logging.getLogger(__name__).error(f"Streaming error: {e}")
        # Continue streaming despite error

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# Errors inside the callback are handled there, so the run completes.
response = agent.run(
    task="Generate content",
    streaming_callback=safe_streaming_callback,
)

3. Rate Limiting

import time

class RateLimitedStreamer:
    """Throttle token output so consecutive prints are spaced by a minimum interval."""

    def __init__(self, min_interval: float = 0.1):
        # Smallest allowed gap, in seconds, between two printed tokens.
        self.min_interval = min_interval
        # Timestamp of the most recent print (0 means "never printed").
        self.last_time = 0
    
    def callback(self, token: str):
        """Print *token*, sleeping first if the previous print was too recent."""
        now = time.time()
        remaining = self.min_interval - (now - self.last_time)
        if remaining > 0:
            time.sleep(remaining)
        
        print(token, end="", flush=True)
        self.last_time = time.time()

streamer = RateLimitedStreamer(min_interval=0.05)

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,  # the rate-limited callback is the only output path
)

response = agent.run(
    task="Write story",
    streaming_callback=streamer.callback,
)

Output Examples

Basic Streaming Output

🤖 Agent: Streaming-Agent | Loops: 1
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Neural networks are computational models inspired by the human brain...
[text appears token by token in real-time]
...and this makes them powerful tools for pattern recognition.

Concurrent Workflow Dashboard

╭─────────────────────── Market Analysis Workflow ───────────────────────╮
│                                                                         │
│ 🔄 Market-Researcher                                                   │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ The current market shows strong momentum...                            │
│                                                                         │
│ 🔄 Financial-Analyst                                                   │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ Revenue growth indicates solid fundamentals...                         │
│                                                                         │
│ 🔄 Technical-Analyst                                                   │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ Price action suggests bullish trend...                                 │
│                                                                         │
╰─────────────────────────────────────────────────────────────────────────╯

Troubleshooting

Streaming Not Working

# Check if model supports streaming
from litellm.utils import supports_function_calling
# NOTE(review): the import above is never used — the check below relies on
# hasattr instead. Verify which litellm capability helper actually applies
# before depending on this snippet.

model_name = "gpt-4o-mini"  # NOTE(review): assigned but never used below
# (assumes `agent` was created earlier in your session)
if hasattr(agent.llm, "stream"):
    print("Model supports streaming")
else:
    print("Model may not support streaming")

# Ensure streaming is enabled
agent.streaming_on = True

Tokens Not Appearing

# Make sure to flush output
def callback(token: str) -> None:
    """Print *token* immediately as it streams in."""
    # Without flush=True, tokens can sit in the stdout buffer and
    # appear all at once instead of streaming.
    print(token, end="", flush=True)

Next Steps

Learn More

Build docs developers (and LLMs) love