Why Stream?
- Immediate Feedback: Users see responses as they’re generated
- Better UX: Perceived performance improvement
- Tool Visibility: See tool calls and reasoning in real-time
- Lower Latency: First token appears faster
- Cancellation: Stop generation early if needed
Quick Start
Enable streaming with the `stream_chat` method:
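The surrounding `Agent` setup isn't shown on this page, so in this runnable sketch a mock async generator stands in for `agent.stream_chat`; the consumption pattern is the part that carries over:

```python
import asyncio

# Stand-in for agent.stream_chat: Agentor's real method streams events
# from the model; this mock just yields text chunks.
async def stream_chat(message: str):
    for chunk in ["Hello", " from", " a", " streamed", " response!"]:
        await asyncio.sleep(0)  # yield control, as a real stream would
        yield chunk

async def main() -> str:
    parts = []
    async for chunk in stream_chat("Hello"):
        print(chunk, end="", flush=True)  # render each chunk as it arrives
        parts.append(chunk)
    print()
    return "".join(parts)

if __name__ == "__main__":
    asyncio.run(main())
```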
Stream Event Types
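The exact event schema isn't reproduced on this page; only a `.chunk` attribute appears in the Best Practices code below. The sketch assumes events also carry a `.type` field (the `tool_call`/`chunk`/`done` names are placeholders, not Agentor's documented values) and stubs the stream:

```python
import asyncio
from dataclasses import dataclass

# Illustrative event shape -- Agentor's real event fields may differ.
@dataclass
class StreamEvent:
    type: str           # e.g. "chunk", "tool_call", "done" (placeholder names)
    chunk: str = ""
    tool: str = ""

# Mock stream: a tool call, some text, then a completion marker.
async def stream_chat(message: str, serialize: bool = True):
    yield StreamEvent(type="tool_call", tool="search")
    yield StreamEvent(type="chunk", chunk="The answer ")
    yield StreamEvent(type="chunk", chunk="is 42.")
    yield StreamEvent(type="done")

async def main() -> list[str]:
    log = []
    async for event in stream_chat("Hello", serialize=False):
        if event.type == "tool_call":
            log.append(f"[tool: {event.tool}]")   # surface tool use in real time
        elif event.type == "chunk":
            log.append(event.chunk)
        elif event.type == "done":
            log.append("[done]")
    return log

if __name__ == "__main__":
    print("".join(asyncio.run(main())))
```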
Agentor emits structured events during streaming.

JSON Serialization
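A sketch of the idea, assuming `serialize=True` makes `stream_chat` yield JSON strings (as the Performance Tips below suggest); here the serialization is done by hand over mock event objects so the snippet runs standalone:

```python
import asyncio
import json
from dataclasses import asdict, dataclass

@dataclass
class StreamEvent:          # illustrative event shape, not Agentor's schema
    type: str
    chunk: str = ""

async def stream_events():  # stand-in for agent.stream_chat(..., serialize=False)
    yield StreamEvent(type="chunk", chunk="Hi")
    yield StreamEvent(type="done")

# With serialize=True each yielded item would already be a JSON string;
# this performs the equivalent transformation explicitly.
async def stream_as_json() -> list[str]:
    return [json.dumps(asdict(e)) async for e in stream_events()]

if __name__ == "__main__":
    for line in asyncio.run(stream_as_json()):
        print(line)
```

JSON strings can be forwarded to a client verbatim, which is why serialization is the default for network transports.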
Get events as JSON strings for easy transmission.

HTTP Streaming
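One transport-agnostic way to do this is Server-Sent Events: wrap each serialized event in an SSE `data:` frame (a full FastAPI endpoint appears under Best Practices below). A runnable sketch with a mock stream:

```python
import asyncio

async def stream_chat(message: str):   # mock: yields pre-serialized JSON events
    yield '{"type": "chunk", "chunk": "Hi"}'
    yield '{"type": "done"}'

# Wrap each serialized event in the Server-Sent Events wire format.
# Any ASGI/WSGI framework can send these lines with a
# "text/event-stream" content type.
async def sse_lines(message: str) -> list[str]:
    return [f"data: {event}\n\n" async for event in stream_chat(message)]

if __name__ == "__main__":
    print("".join(asyncio.run(sse_lines("Hello"))), end="")
```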
Serve streaming responses over HTTP.

Built-in Server Streaming
Agentor’s built-in server supports streaming out of the box.

A2A Protocol Streaming
Stream responses using the A2A protocol.

Advanced Streaming Patterns
Custom Event Filtering
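A small reusable filter over a mocked event stream; the event shape and type names are illustrative, not Agentor's actual schema:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class StreamEvent:          # illustrative event shape
    type: str
    chunk: str = ""

async def stream_events():  # stand-in for agent.stream_chat(..., serialize=False)
    for e in [StreamEvent("tool_call"), StreamEvent("chunk", "Hi"),
              StreamEvent("chunk", "!"), StreamEvent("done")]:
        yield e

# Generic filter: re-yield only events whose type is in `wanted`.
async def filter_events(events, wanted: set[str]):
    async for event in events:
        if event.type in wanted:
            yield event

async def main() -> str:
    text = ""
    async for event in filter_events(stream_events(), {"chunk"}):
        text += event.chunk
    return text

if __name__ == "__main__":
    print(asyncio.run(main()))
```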
Filter specific event types.

Progress Tracking
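Total response length usually isn't known up front, so a common approach is to report running totals rather than percentages. A sketch over a mock stream:

```python
import asyncio

async def stream_chat(message: str):   # mock stream of text chunks
    for chunk in ["All ", "work ", "and ", "no ", "play"]:
        yield chunk

# Report running totals (chunk count and characters received) after
# each event; returns the snapshots so they can be inspected.
async def stream_with_progress(message: str) -> list[tuple[int, int]]:
    snapshots = []
    chunks = chars = 0
    async for chunk in stream_chat(message):
        chunks += 1
        chars += len(chunk)
        print(f"\rreceived {chunks} chunks, {chars} chars", end="", flush=True)
        snapshots.append((chunks, chars))
    print()
    return snapshots

if __name__ == "__main__":
    asyncio.run(stream_with_progress("Hello"))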
Track completion progress.

Buffered Streaming
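A sketch of fixed-size batching over a mock stream; batching amortizes per-event overhead such as logging or network writes:

```python
import asyncio

async def stream_chat(message: str):   # mock: ten small chunks
    for i in range(10):
        yield f"chunk{i} "

# Collect events into fixed-size batches; the final partial batch is
# flushed when the stream ends.
async def batched(events, size: int):
    batch = []
    async for event in events:
        batch.append(event)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

async def main() -> list[int]:
    sizes = []
    async for batch in batched(stream_chat("Hello"), size=4):
        sizes.append(len(batch))   # process the whole batch at once
    return sizes

if __name__ == "__main__":
    print(asyncio.run(main()))
```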
Buffer and process events in batches.

Multi-Agent Streaming
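A sketch that merges several streams through a shared `asyncio.Queue`, one drain task per agent; mock streams stand in for real `stream_chat` calls:

```python
import asyncio

async def stream_chat(name: str):          # mock: one stream per "agent"
    for i in range(3):
        await asyncio.sleep(0)             # yield control between chunks
        yield f"{name}-{i}"

# Drain every stream into one queue from its own task, so a slow agent
# never blocks a fast one; a sentinel marks each finished stream.
async def merge(streams):
    queue: asyncio.Queue = asyncio.Queue()
    done = object()

    async def drain(stream):
        async for item in stream:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(drain(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item

async def main() -> list[str]:
    streams = [stream_chat("alpha"), stream_chat("beta")]
    return [item async for item in merge(streams)]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Chunks arrive in completion order across agents, so don't rely on any particular interleaving.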
Stream from multiple agents concurrently.

WebSocket Streaming
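WebSocket wiring depends on your server framework, so this sketch shows only the bidirectional shape, with `asyncio.Queue`s standing in for the socket's receive and send sides (with FastAPI you would use `websocket.receive_text()` / `websocket.send_text()` instead):

```python
import asyncio

async def stream_chat(message: str):               # mock per-message stream
    for chunk in [message.upper(), "!"]:
        yield chunk

# Bidirectional loop: read user messages and stream replies back.
# The queues are stand-ins for a real WebSocket connection; None
# marks a client disconnect.
async def session(inbound: asyncio.Queue, outbound: asyncio.Queue):
    while True:
        message = await inbound.get()
        if message is None:
            break
        async for chunk in stream_chat(message):
            await outbound.put(chunk)

async def main() -> list[str]:
    inbound, outbound = asyncio.Queue(), asyncio.Queue()
    for msg in ["hi", "bye", None]:
        inbound.put_nowait(msg)
    await session(inbound, outbound)
    return [outbound.get_nowait() for _ in range(outbound.qsize())]

if __name__ == "__main__":
    print(asyncio.run(main()))
```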
For bidirectional streaming, use WebSockets.

Error Handling
Handle streaming errors gracefully.

Best Practices
Always consume the stream with `async for`; `stream_chat` returns an async generator, so a plain `for` loop raises a `TypeError`:

```python
# Good
async def main():
    async for event in agent.stream_chat("Hello"):
        print(event)

# Won't work
for event in agent.stream_chat("Hello"):  # Error: not iterable
    print(event)
```
Flush stdout after each chunk so text appears immediately instead of waiting for the output buffer to fill:

```python
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        print(event.chunk, end="", flush=True)  # flush=True is important
```
Chunks can split words at arbitrary points; buffer until a separator arrives when you need complete words (and remember to process whatever remains in the buffer once the stream ends):

```python
buffer = ""
async for event in agent.stream_chat("Hello", serialize=False):
    if event.chunk:
        buffer += event.chunk
        # Process complete words only
        if " " in buffer:
            words = buffer.split(" ")
            for word in words[:-1]:
                process_word(word)
            buffer = words[-1]
```
Guard long generations with a timeout:

```python
import asyncio

async def stream_with_timeout():
    try:
        async with asyncio.timeout(30):  # 30 second timeout (Python 3.11+)
            async for event in agent.stream_chat("Long task"):
                print(event)
    except asyncio.TimeoutError:
        print("Stream timeout")
```
When serving over HTTP, report failures as SSE error events rather than dropping the connection mid-response:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(message: str):
    async def event_stream():
        try:
            async for chunk in agent.stream_chat(message):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"event: error\ndata: {str(e)}\n\n"

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream"
    )
```
Performance Tips
- Use `serialize=True` (the default) when sending events over the network
- Use `serialize=False` for local processing to avoid JSON overhead
- Buffer small chunks for better network efficiency
- Set appropriate timeouts based on expected response time
- Close streams properly to free resources
Next Steps
- Deploy streaming agents with the Celesto CLI
- Enable observability to monitor stream performance
- Learn about agent communication with streaming A2A