Overview
DeerFlow supports streaming agent responses using Server-Sent Events (SSE). This allows you to:
Display agent responses in real-time as they’re generated
Show tool calls and results as they happen
Track task progress with sub-agents
Handle interrupts and clarification requests
Display thinking process for supported models
Streaming with LangGraph SDK
Use the stream method to receive events as they occur:
from langgraph_sdk import get_client
client = get_client( url = "http://localhost:2024" )
# Create thread
thread = await client.threads.create()
# Stream responses
async for event in client.runs.stream(
thread_id = thread[ "thread_id" ],
assistant_id = "lead_agent" ,
input = { "messages" : [{ "role" : "user" , "content" : "Hello!" }]},
stream_mode = "messages-tuple" # or "values"
):
print (event)
Stream Modes
DeerFlow supports multiple streaming modes:
Mode | Description | Use Case
--- | --- | ---
messages-tuple | Per-message updates | Real-time chat UI, incremental text
values | Full state snapshots | Complete state after each step
updates | State deltas | Track specific field changes
events | All graph events | Debugging, detailed logging
Use messages-tuple for chat UIs and values when you need complete state after each step.
Event Types
DeerFlow streams three primary event types:
1. Values Events
Full state snapshots after each agent step:
{
"event" : "values" ,
"data" : {
"messages" : [ ... ], # All messages
"title" : "Thread Title" ,
"artifacts" : [ "output.pdf" ],
"todos" : [ ... ],
"uploaded_files" : [ ... ],
"sandbox" : { "sandbox_id" : "local" },
"thread_data" : {
"workspace_path" : "..." ,
"uploads_path" : "..." ,
"outputs_path" : "..."
}
}
}
When to use : Need complete state, building history view, resuming interrupted sessions.
2. Messages-Tuple Events
Per-message updates for incremental rendering:
# AI response chunk
{
"event" : "messages-tuple" ,
"data" : {
"role" : "assistant" ,
"content" : "Let me help" , # Incremental text
"tool_calls" : [] # Empty until tools are called
}
}
# Tool call
{
"event" : "messages-tuple" ,
"data" : {
"role" : "assistant" ,
"content" : "" ,
"tool_calls" : [
{
"id" : "call_abc" ,
"type" : "function" ,
"function" : {
"name" : "bash" ,
"arguments" : '{"command": "ls -la"}'
}
}
]
}
}
# Tool result
{
"event" : "messages-tuple" ,
"data" : {
"role" : "tool" ,
"tool_call_id" : "call_abc" ,
"content": "total 24\ndrwxr-xr-x..."
}
}
When to use : Chat UI, showing tool execution, incremental text rendering.
3. End Event
Signals stream completion:
{
"event" : "end" ,
"data" : {}
}
When to use : Cleanup, final state snapshot, enabling UI controls.
SSE Protocol
DeerFlow follows the standard Server-Sent Events protocol:
event: values
data: {"messages": [...], "title": "..."}
event: messages-tuple
data: {"role": "assistant", "content": "Hello"}
event: messages-tuple
data: {"role": "assistant", "content": " there!"}
event: end
data: {}
Each event has:
event: line specifying the event type
data: line with JSON payload
Empty line separating events
Building a Chat UI
Here’s a complete example of streaming to a chat interface:
import asyncio
from langgraph_sdk import get_client
class ChatUI:
    """Minimal chat front-end that streams a DeerFlow agent response.

    Accumulates incremental assistant text into ``current_message`` and
    prints tool calls / tool results as they arrive.
    """

    def __init__(self):
        self.client = get_client(url="http://localhost:2024")
        # Text of the assistant turn currently being streamed.
        self.current_message = ""

    async def send_message(self, thread_id: str, content: str):
        """Send a message and stream the response."""
        self.current_message = ""
        async for event in self.client.runs.stream(
            thread_id=thread_id,
            assistant_id="lead_agent",
            input={"messages": [{"role": "user", "content": content}]},
            stream_mode="messages-tuple",
            config={
                "configurable": {
                    "thinking_enabled": True,
                    "subagent_enabled": True,
                }
            },
        ):
            await self.handle_event(event)

    async def handle_event(self, event):
        """Process one streaming event and update the display."""
        kind = event["event"]
        if kind == "messages-tuple":
            data = event["data"]
            role = data["role"]
            if role == "assistant":
                # Accumulate AI response text as it arrives.
                if content := data.get("content"):
                    self.current_message += content
                    self.display_message(self.current_message)
                # Show any tool calls carried on this message.
                if tool_calls := data.get("tool_calls"):
                    for call in tool_calls:
                        self.display_tool_call(call["function"]["name"])
            elif role == "tool":
                # Show the tool's result, keyed by its call id.
                self.display_tool_result(data["tool_call_id"], data["content"])
        elif kind == "end":
            self.finalize_message()

    def display_message(self, text: str):
        """Redraw the current line with the accumulated AI text."""
        print(f"\rAI: {text}", end="", flush=True)

    def display_tool_call(self, tool_name: str):
        """Announce that a tool is being executed."""
        print(f"\n[Using tool: {tool_name}]")

    def display_tool_result(self, tool_id: str, result: str):
        """Show a (truncated) tool result."""
        print(f"[Tool result: {result[:100]}...]")

    def finalize_message(self):
        """Called once the stream ends."""
        print("\n[Stream complete]")
# Usage
async def main ():
ui = ChatUI()
thread = await ui.client.threads.create()
await ui.send_message(thread[ "thread_id" ], "Analyze this data" )
asyncio.run(main())
Handling Sub-Agent Events
When subagent_enabled is true, you’ll receive task-related events:
# Task started
{
"event" : "messages-tuple" ,
"data" : {
"role" : "assistant" ,
"tool_calls" : [
{
"id" : "task_123" ,
"function" : {
"name" : "task" ,
"arguments" : '{"description": "Analyze logs", ...}'
}
}
]
}
}
# Task progress (custom events from SubagentExecutor)
{
"event" : "custom" ,
"data" : {
"type" : "task_running" ,
"task_id" : "task_123" ,
"status" : "processing"
}
}
# Task completed
{
"event" : "messages-tuple" ,
"data" : {
"role" : "tool" ,
"tool_call_id" : "task_123" ,
"content" : "Analysis complete: found 3 errors"
}
}
Sub-agent tasks execute in background threads. You can have up to max_concurrent_subagents tasks running in parallel (default: 3).
Thinking Mode
When thinking_enabled is true for supported models, you’ll receive extended thinking content:
{
"event" : "messages-tuple" ,
"data" : {
"role" : "assistant" ,
"content" : "" ,
"additional_kwargs" : {
"thinking" : "Let me analyze this step by step... \n 1. First, I need to..."
}
}
}
Display options :
Show in expandable section
Display in real-time like regular content
Hide from user but log for debugging
Thinking content can be lengthy. Consider truncating or collapsing it in your UI.
Interrupt Events
When the agent calls ask_clarification, the stream will pause:
# Clarification request
{
"event" : "messages-tuple" ,
"data" : {
"role" : "assistant" ,
"tool_calls" : [
{
"id" : "clarify_xyz" ,
"function" : {
"name" : "ask_clarification" ,
"arguments" : '{"question": "Which file did you mean?"}'
}
}
]
}
}
# Stream ends with interrupt
{
"event" : "end" ,
"data" : { "interrupted" : true}
}
Handling interrupts :
async def handle_interrupt(client, thread_id, question):
    """Prompt the user to answer a clarification question, then resume the run."""
    # Show the agent's question and collect an answer.
    print(f"Agent asks: {question}")
    user_response = input("Your answer: ")

    # Resume the thread by sending the answer as a new user message.
    async for event in client.runs.stream(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={"messages": [{"role": "user", "content": user_response}]},
    ):
        # Process continuation events here.
        pass
Error Handling
Handle errors gracefully during streaming:
try :
async for event in client.runs.stream(
thread_id = thread_id,
assistant_id = "lead_agent" ,
input = { "messages" : [{ "role" : "user" , "content" : message}]}
):
await handle_event(event)
except asyncio.TimeoutError:
print ( "Stream timed out" )
# Optionally retry or cancel
except Exception as e:
print ( f "Stream error: { e } " )
# Check thread state for recovery
state = await client.threads.get_state(thread_id)
if state[ "next" ]:
# Agent is still running
pass
Connection Management
Timeouts
Set timeouts to prevent hanging:
import asyncio
async def stream_with_timeout ( client , thread_id , message , timeout = 300 ):
"""Stream with 5-minute timeout."""
try :
async with asyncio.timeout(timeout):
async for event in client.runs.stream(
thread_id = thread_id,
assistant_id = "lead_agent" ,
input = { "messages" : [{ "role" : "user" , "content" : message}]}
):
yield event
except asyncio.TimeoutError:
print ( "Stream timed out after 5 minutes" )
Reconnection
If the connection drops, you can resume from state:
async def resilient_stream(client, thread_id, message):
    """Stream agent events, retrying the run up to 3 times on failure.

    NOTE: a retry re-streams from the start, so events yielded before a
    mid-stream failure may be delivered again.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            async for event in client.runs.stream(
                thread_id=thread_id,
                assistant_id="lead_agent",
                input={"messages": [{"role": "user", "content": message}]},
            ):
                yield event
            break  # Success
        except Exception:
            if attempt < max_retries - 1:
                print(f"Retry {attempt + 1}/{max_retries}...")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
Buffer events for batching
Reduce UI updates by batching events: async def buffered_stream ( client , thread_id , message , buffer_ms = 50 ):
buffer = []
last_flush = asyncio.get_event_loop().time()
async for event in client.runs.stream(
thread_id = thread_id,
assistant_id = "lead_agent" ,
input = { "messages" : [{ "role" : "user" , "content" : message}]}
):
buffer.append(event)
now = asyncio.get_event_loop().time()
if now - last_flush >= buffer_ms / 1000 :
yield buffer
buffer = []
last_flush = now
if buffer:
yield buffer
Use messages-tuple for chat UIs
messages-tuple mode is optimized for incremental rendering:

# Efficient for chat
stream_mode = "messages-tuple"

# Avoid for chat (too much data)
stream_mode = "values"  # Sends entire state every time
Prevent excessive re-renders: class ThrottledUI :
def __init__ ( self , throttle_ms = 100 ):
self .throttle_ms = throttle_ms
self .last_update = 0
self .pending_text = ""
async def update_text ( self , text : str ):
self .pending_text += text
now = asyncio.get_event_loop().time() * 1000
if now - self .last_update >= self .throttle_ms:
self .render( self .pending_text)
self .last_update = now
Complete Example
import asyncio
import json
from langgraph_sdk import get_client
async def interactive_chat():
    """Full-featured streaming chat loop: text, tool calls, clarifications."""
    client = get_client(url="http://localhost:2024")
    thread = await client.threads.create()
    thread_id = thread["thread_id"]

    print(f"Chat started (thread: {thread_id})")
    print("Type 'exit' to quit\n")

    while True:
        # Get user input
        user_input = input("You: ")
        if user_input.lower() == "exit":
            break

        print("AI: ", end="", flush=True)
        current_text = ""

        try:
            # Stream response
            async for event in client.runs.stream(
                thread_id=thread_id,
                assistant_id="lead_agent",
                input={"messages": [{"role": "user", "content": user_input}]},
                stream_mode="messages-tuple",
                config={
                    "configurable": {
                        "thinking_enabled": True,
                        "subagent_enabled": True,
                    }
                },
            ):
                if event["event"] == "messages-tuple":
                    data = event["data"]
                    if data["role"] == "assistant":
                        # Display AI text incrementally.
                        if content := data.get("content"):
                            print(content, end="", flush=True)
                            current_text += content
                        # Show tool calls.
                        if tool_calls := data.get("tool_calls"):
                            for call in tool_calls:
                                tool = call["function"]["name"]
                                args = json.loads(call["function"]["arguments"])
                                print(f"\n[{tool}({args})]")
                        # Handle clarification: pause streaming and return
                        # control to the user prompt.
                        if tool_calls and tool_calls[0]["function"]["name"] == "ask_clarification":
                            question = json.loads(tool_calls[0]["function"]["arguments"])["question"]
                            print(f"\n\nAgent asks: {question}")
                            break  # Exit inner loop to get user input
                    elif data["role"] == "tool":
                        # Show tool results (truncated).
                        result = data["content"][:200]
                        print(f" [result: {result}...]")
                elif event["event"] == "end":
                    print()  # Newline after complete response
        except Exception as e:
            print(f"\nError: {e}")
            # Try to recover: if the agent is interrupted, keep chatting.
            state = await client.threads.get_state(thread_id)
            if state["next"] == ["__interrupt__"]:
                print("Agent is waiting for input")
                continue

    # Cleanup
    print(f"\nEnding chat (thread: {thread_id})")
# Run — guarded so importing this module does not start the chat loop.
if __name__ == "__main__":
    asyncio.run(interactive_chat())
Next Steps
Thread Management Learn about thread lifecycle and state management
Python Client Use the embedded client for in-process streaming
Sub-Agents Understand parallel task execution with sub-agents
Middleware Chain Deep dive into middleware processing pipeline