# Streaming Responses

Learn how to stream agent responses in real time, providing immediate feedback and a better user experience for long-running tasks.

## Overview

Streaming enables:

- Real-time output as the agent thinks
- Better user experience with immediate feedback
- Progress monitoring for long tasks
- Integration with dashboards and UIs
- Token-by-token response visualization
## Basic Streaming

Enable streaming with a simple flag:

```python
from swarms import Agent

# Create agent with streaming enabled
agent = Agent(
    agent_name="Streaming-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,  # Enable streaming
)

# Run task - output streams in real-time
response = agent.run(
    "Write a detailed explanation of how neural networks work"
)

print(response)  # Complete response after streaming
```
## Streaming with Callback

Use callbacks to process streaming tokens in real time:

```python
from swarms import Agent

def streaming_callback(token: str):
    """
    Callback function called for each streaming token.

    Args:
        token (str): The new token from the stream
    """
    # Process token in real-time
    print(token, end="", flush=True)

    # Could also:
    # - Send to a websocket
    # - Update a UI
    # - Log to a file
    # - Analyze sentiment

agent = Agent(
    agent_name="Interactive-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
)

response = agent.run(
    task="Explain quantum computing",
    streaming_callback=streaming_callback,
)
```
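Because the callback fires once per streamed token, it is also a convenient instrumentation hook. Below is a minimal, self-contained sketch of a throughput-tracking callback; `ThroughputMonitor` is our own illustrative helper, not part of swarms, and the loop simulates a token stream so the snippet runs without an agent:

```python
import time

class ThroughputMonitor:
    """Callable streaming callback that counts tokens and measures rate.

    Hypothetical helper for illustration - not part of the swarms API.
    """

    def __init__(self):
        self.count = 0
        self.start = None

    def __call__(self, token: str):
        # First token starts the clock; every token increments the count
        if self.start is None:
            self.start = time.time()
        self.count += 1

    def tokens_per_second(self) -> float:
        if self.start is None:
            return 0.0
        elapsed = time.time() - self.start
        return self.count / elapsed if elapsed > 0 else 0.0

# Simulate a token stream instead of calling agent.run(...)
monitor = ThroughputMonitor()
for tok in ["Hello", ",", " ", "world"]:
    monitor(tok)

print(monitor.count)  # 4
```

In real use you would pass the instance directly, `streaming_callback=monitor`, and read `monitor.tokens_per_second()` after `agent.run(...)` returns.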
## Advanced Streaming Modes

### 1. Detailed Streaming (`stream=True`)

Get detailed token information with metadata:

```python
from swarms import Agent

def detailed_callback(token_info: dict):
    """
    Receive detailed token information.

    Args:
        token_info (dict): Contains token, usage, finish_reason, etc.
    """
    # Access the token content
    if 'content' in token_info:
        print(token_info['content'], end="", flush=True)

    # Access metadata
    if 'usage' in token_info:
        print(f"\n\nTokens used: {token_info['usage']}")

    if 'finish_reason' in token_info:
        print(f"Finished: {token_info['finish_reason']}")

agent = Agent(
    agent_name="Detailed-Streaming-Agent",
    model_name="gpt-4o-mini",
    max_loops=1,
    stream=True,  # Enable detailed streaming
)

response = agent.run(
    task="Analyze the current state of AI technology",
    streaming_callback=detailed_callback,
)
```
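A detailed callback can also accumulate metadata across the stream instead of printing it. A small standalone sketch (the dict keys `content`, `usage`, and `finish_reason` follow the shape described above, which we treat as an assumption; the loop simulates a detailed stream so the snippet runs without an agent):

```python
# Accumulate text and remember the final metadata fields
collected = {"text": [], "usage": None, "finish_reason": None}

def accumulate(token_info: dict):
    # Gather content chunks as they arrive
    if "content" in token_info:
        collected["text"].append(token_info["content"])
    # Metadata typically arrives with the final chunk
    if "usage" in token_info:
        collected["usage"] = token_info["usage"]
    if "finish_reason" in token_info:
        collected["finish_reason"] = token_info["finish_reason"]

# Simulated detailed stream instead of agent.run(...)
for info in [
    {"content": "AI "},
    {"content": "analysis"},
    {"usage": {"total_tokens": 2}, "finish_reason": "stop"},
]:
    accumulate(info)

print("".join(collected["text"]))  # AI analysis
print(collected["finish_reason"])  # stop
```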
### 2. Silent Streaming

Collect chunks without printing:

```python
from swarms import Agent

# Silent streaming - no console output
agent = Agent(
    agent_name="Silent-Streamer",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,  # Disable printing
)

# Chunks are collected but not displayed
response = agent.run("Generate a report on market trends")

# Process complete response
print("\n=== Final Response ===")
print(response)
```
### 3. Custom Streaming Panel

Create custom visual displays. Using a `rich` `Live` display updates the panel in place instead of clearing the console on every token:

```python
from swarms import Agent
from rich.live import Live
from rich.panel import Panel
from rich.console import Console

console = Console()
collected_text = []
live = Live(console=console, refresh_per_second=10)

def custom_panel_callback(token: str):
    """
    Display streaming output in a custom panel.
    """
    collected_text.append(token)
    full_text = "".join(collected_text)

    # Update the live display in place (no flicker)
    live.update(Panel(
        full_text,
        title="🤖 Agent Thinking...",
        border_style="blue",
    ))

agent = Agent(
    agent_name="Custom-Display-Agent",
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,
)

with live:
    response = agent.run(
        task="Write a creative story",
        streaming_callback=custom_panel_callback,
    )
```
## Streaming in Multi-Agent Workflows

### Concurrent Workflow with Streaming

```python
from swarms import Agent, ConcurrentWorkflow

# Create agents with streaming
market_researcher = Agent(
    agent_name="Market-Researcher",
    system_prompt="""You are a market research specialist. Analyze market trends
    and provide actionable insights.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,  # Silent mode for concurrent workflows
)

financial_analyst = Agent(
    agent_name="Financial-Analyst",
    system_prompt="""You are a financial analysis expert. Evaluate investment
    opportunities and assess risks.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,
)

technical_analyst = Agent(
    agent_name="Technical-Analyst",
    system_prompt="""You are a technical analysis specialist. Analyze price patterns
    and provide trading recommendations.""",
    model_name="gpt-4o-mini",
    max_loops=1,
    streaming_on=True,
    print_on=False,
)

# Create concurrent workflow with dashboard
workflow = ConcurrentWorkflow(
    name="market-analysis-workflow",
    agents=[market_researcher, financial_analyst, technical_analyst],
    max_loops=1,
    show_dashboard=True,  # Shows real-time streaming from all agents
)

# Run workflow - streams from all agents simultaneously
result = workflow.run(
    "Analyze Tesla (TSLA) stock from market, financial, and technical perspectives"
)

print(result)
```
### Sequential Workflow with Streaming

```python
from swarms import Agent, SequentialWorkflow

# Agent 1: Researcher with streaming
researcher = Agent(
    agent_name="Researcher",
    system_prompt="Research the topic and provide detailed analysis",
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# Agent 2: Writer with streaming
writer = Agent(
    agent_name="Writer",
    system_prompt="Write engaging content based on research",
    model_name="gpt-4o-mini",
    streaming_on=True,
)

# Create sequential workflow
workflow = SequentialWorkflow(
    agents=[researcher, writer],
    max_loops=1,
)

# Each agent streams output in sequence
final_output = workflow.run(
    "The future of renewable energy technology"
)
```
## Streaming Configuration

### Agent-Level Configuration

```python
from swarms import Agent

agent = Agent(
    agent_name="Configured-Streamer",
    model_name="gpt-4o-mini",

    # Streaming options
    streaming_on=True,  # Enable basic streaming
    stream=False,       # Disable detailed streaming (default)
    print_on=True,      # Show output (default)
    verbose=True,       # Show detailed logs

    # Other options
    max_loops=1,
    interactive=True,
)
```

### Runtime Configuration

```python
def dynamic_callback(token: str):
    print(token, end="", flush=True)

# Pass callback at runtime
response = agent.run(
    task="Analyze data",
    streaming_callback=dynamic_callback,  # Override default behavior
)
```
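Since the callback runs synchronously for each token, a common pattern is to push tokens onto a queue and let a separate thread consume them. A minimal, self-contained sketch (the queue wiring and sentinel convention are our own; the loop simulates a streamed response so the snippet runs without an agent):

```python
import queue
import threading

token_queue: "queue.Queue" = queue.Queue()

def queue_callback(token: str):
    # Producer side: called once for each streamed token
    token_queue.put(token)

def consume(collected: list):
    # Consumer side: runs in its own thread until it sees the sentinel
    while True:
        token = token_queue.get()
        if token is None:  # sentinel marks end of stream
            break
        collected.append(token)

collected: list = []
consumer = threading.Thread(target=consume, args=(collected,))
consumer.start()

# Simulate a streamed response instead of calling agent.run(...)
for tok in ["Streaming", " ", "works"]:
    queue_callback(tok)
token_queue.put(None)  # signal end of stream
consumer.join()

print("".join(collected))  # Streaming works
```

In real use you would pass `streaming_callback=queue_callback` to `agent.run(...)` and enqueue the sentinel once the call returns.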
## Real-World Examples

### 1. WebSocket Integration

The streaming callback is invoked synchronously, so the blocking `agent.run(...)` call is pushed to a worker thread and each token is handed back to the event loop thread-safely:

```python
import asyncio
import json
import websockets
from swarms import Agent

class WebSocketStreamer:
    def __init__(self, websocket, loop):
        self.websocket = websocket
        self.loop = loop

    def send_token(self, token: str):
        """Schedule a token send on the event loop.

        The streaming callback runs in a worker thread, so the
        coroutine must be submitted to the loop thread-safely.
        """
        asyncio.run_coroutine_threadsafe(
            self.websocket.send(json.dumps({
                "type": "token",
                "content": token,
            })),
            self.loop,
        )

async def handle_client(websocket):
    loop = asyncio.get_running_loop()

    # Create agent
    agent = Agent(
        model_name="gpt-4o-mini",
        streaming_on=True,
        print_on=False,
    )

    # Create streamer
    streamer = WebSocketStreamer(websocket, loop)

    # Run the blocking agent call in a worker thread so the event
    # loop stays free to deliver tokens as they arrive
    response = await loop.run_in_executor(
        None,
        lambda: agent.run(
            task="Generate analysis",
            streaming_callback=streamer.send_token,
        ),
    )

    # Send completion
    await websocket.send(json.dumps({
        "type": "complete",
        "content": response,
    }))

# Start WebSocket server
async def main():
    async with websockets.serve(handle_client, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())
```
### 2. Progress Bar Integration

```python
from swarms import Agent
from rich.progress import Progress, TextColumn, BarColumn
from rich.console import Console

console = Console()

def progress_streaming(task_description: str) -> str:
    """
    Stream with a progress bar.
    """
    tokens = []

    with Progress(
        TextColumn("[bold blue]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
    ) as progress:
        task = progress.add_task(task_description, total=100)

        def callback(token: str):
            tokens.append(token)
            # Update progress (rough estimate - the total token
            # count is not known ahead of time)
            progress.update(task, advance=1)

        agent = Agent(
            model_name="gpt-4o-mini",
            streaming_on=True,
            print_on=False,
        )

        response = agent.run(
            task=task_description,
            streaming_callback=callback,
        )

    return response

result = progress_streaming("Generate comprehensive report")
```
### 3. File Logging

```python
from swarms import Agent
import time

class StreamLogger:
    def __init__(self, log_file: str):
        self.log_file = log_file
        self.tokens = []

    def log_token(self, token: str):
        """
        Log a token to file with a timestamp, one entry per line.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        with open(self.log_file, "a") as f:
            f.write(f"[{timestamp}] {token}\n")
        self.tokens.append(token)

    def get_full_response(self) -> str:
        return "".join(self.tokens)

# Create logger
logger = StreamLogger("agent_stream.log")

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
)

response = agent.run(
    task="Analyze quarterly results",
    streaming_callback=logger.log_token,
)

print(f"Full response logged to: {logger.log_file}")
```
## Best Practices

### 1. Buffer Management

```python
from swarms import Agent

def make_buffered_callback(min_chunk_size: int = 10):
    """
    Buffer tokens before processing.

    Returns a (callback, flush) pair - call flush() after the run
    so tokens left in the buffer are not lost.
    """
    buffer = []

    def callback(token: str):
        buffer.append(token)

        # Process when buffer reaches minimum size
        if len(buffer) >= min_chunk_size:
            chunk = "".join(buffer)
            print(chunk, end="", flush=True)
            buffer.clear()

    def flush():
        if buffer:
            print("".join(buffer), end="", flush=True)
            buffer.clear()

    return callback, flush

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,
)

callback, flush = make_buffered_callback(min_chunk_size=20)

response = agent.run(
    task="Generate report",
    streaming_callback=callback,
)
flush()  # Emit any remaining buffered tokens
```
### 2. Error Handling

```python
import logging

from swarms import Agent

logger = logging.getLogger(__name__)

def safe_streaming_callback(token: str):
    """
    Callback with error handling.
    """
    try:
        # Process token
        print(token, end="", flush=True)

        # Additional processing
        # send_to_api(token)
    except Exception as e:
        # Log and continue streaming despite the error
        logger.error(f"Streaming error: {e}")

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
)

response = agent.run(
    task="Generate content",
    streaming_callback=safe_streaming_callback,
)
```
### 3. Rate Limiting

```python
import time

from swarms import Agent

class RateLimitedStreamer:
    def __init__(self, min_interval: float = 0.1):
        self.min_interval = min_interval
        self.last_time = 0.0

    def callback(self, token: str):
        # Enforce a minimum interval between outputs
        current_time = time.time()
        elapsed = current_time - self.last_time

        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)

        print(token, end="", flush=True)
        self.last_time = time.time()

streamer = RateLimitedStreamer(min_interval=0.05)

agent = Agent(
    model_name="gpt-4o-mini",
    streaming_on=True,
    print_on=False,
)

response = agent.run(
    task="Write story",
    streaming_callback=streamer.callback,
)
```
## Output Examples

### Basic Streaming Output

```
🤖 Agent: Streaming-Agent | Loops: 1
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Neural networks are computational models inspired by the human brain...
[text appears token by token in real-time]
...and this makes them powerful tools for pattern recognition.
```

### Concurrent Workflow Dashboard

```
╭─────────────────────── Market Analysis Workflow ───────────────────────╮
│                                                                        │
│  🔄 Market-Researcher                                                  │
│  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━  │
│  The current market shows strong momentum...                           │
│                                                                        │
│  🔄 Financial-Analyst                                                  │
│  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━  │
│  Revenue growth indicates solid fundamentals...                        │
│                                                                        │
│  🔄 Technical-Analyst                                                  │
│  ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━  │
│  Price action suggests bullish trend...                                │
│                                                                        │
╰────────────────────────────────────────────────────────────────────────╯
```
## Troubleshooting

### Streaming Not Working

```python
# Check whether the underlying model client exposes streaming
if hasattr(agent.llm, "stream"):
    print("Model supports streaming")
else:
    print("Model may not support streaming")

# Ensure streaming is enabled
agent.streaming_on = True
```

### Tokens Not Appearing

```python
# Make sure to flush output
def callback(token: str):
    print(token, end="", flush=True)  # flush=True is important!
```
## Next Steps

- Basic Agent - Learn agent fundamentals
- Multi-Agent Workflows - Stream from multiple agents
- Agent Output Types - Structure streaming outputs
- Interactive GroupChat - Real-time agent interactions