Overview

DeerFlow supports streaming agent responses using Server-Sent Events (SSE). This allows you to:
  • Display agent responses in real-time as they’re generated
  • Show tool calls and results as they happen
  • Track task progress with sub-agents
  • Handle interrupts and clarification requests
  • Display thinking process for supported models

Streaming with LangGraph SDK

Use the stream method to receive events as they occur:
from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create thread
thread = await client.threads.create()

# Stream responses
async for event in client.runs.stream(
    thread_id=thread["thread_id"],
    assistant_id="lead_agent",
    input={"messages": [{"role": "user", "content": "Hello!"}]},
    stream_mode="messages-tuple"  # or "values"
):
    print(event)

Stream Modes

DeerFlow supports multiple streaming modes:
| Mode | Description | Use Case |
| --- | --- | --- |
| messages-tuple | Per-message updates | Real-time chat UI, incremental text |
| values | Full state snapshots | Complete state after each step |
| updates | State deltas | Track specific field changes |
| events | All graph events | Debugging, detailed logging |
Use messages-tuple for chat UIs and values when you need complete state after each step.

Event Types

DeerFlow streams three primary event types:

1. Values Events

Full state snapshots after each agent step:
{
    "event": "values",
    "data": {
        "messages": [...],  # All messages
        "title": "Thread Title",
        "artifacts": ["output.pdf"],
        "todos": [...],
        "uploaded_files": [...],
        "sandbox": {"sandbox_id": "local"},
        "thread_data": {
            "workspace_path": "...",
            "uploads_path": "...",
            "outputs_path": "..."
        }
    }
}
When to use: Need complete state, building history view, resuming interrupted sessions.
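For a history view, the fields above can be pulled out of a values event with a small helper. This is a sketch: `latest_state` is a hypothetical name, and the event shape is assumed to match the payload shown above.

```python
def latest_state(values_event: dict) -> dict:
    """Pull the fields a history view typically needs out of a values event."""
    data = values_event["data"]
    return {
        "title": data.get("title", ""),
        "message_count": len(data.get("messages", [])),
        "artifacts": data.get("artifacts", []),
    }

event = {
    "event": "values",
    "data": {
        "messages": [{"role": "user", "content": "hi"}],
        "title": "Thread Title",
        "artifacts": ["output.pdf"],
    },
}
print(latest_state(event))
# {'title': 'Thread Title', 'message_count': 1, 'artifacts': ['output.pdf']}
```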

2. Messages-Tuple Events

Per-message updates for incremental rendering:
# AI response chunk
{
    "event": "messages-tuple",
    "data": {
        "role": "assistant",
        "content": "Let me help",  # Incremental text
        "tool_calls": []  # Empty until tools are called
    }
}

# Tool call
{
    "event": "messages-tuple",
    "data": {
        "role": "assistant",
        "content": "",
        "tool_calls": [
            {
                "id": "call_abc",
                "type": "function",
                "function": {
                    "name": "bash",
                    "arguments": '{"command": "ls -la"}'
                }
            }
        ]
    }
}

# Tool result
{
    "event": "messages-tuple",
    "data": {
        "role": "tool",
        "tool_call_id": "call_abc",
        "content": "total 24\ndrwxr-xr-x..."
    }
}
When to use: Chat UI, showing tool execution, incremental text rendering.
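Because assistant content arrives in chunks, the client is responsible for concatenating it. A minimal sketch, where the `accumulate` helper is hypothetical and the chunk shapes are assumed from the payloads above:

```python
def accumulate(chunks: list[dict]) -> str:
    """Join incremental assistant text chunks into the full response."""
    return "".join(
        c["data"].get("content") or ""
        for c in chunks
        if c["event"] == "messages-tuple" and c["data"].get("role") == "assistant"
    )

chunks = [
    {"event": "messages-tuple", "data": {"role": "assistant", "content": "Let me"}},
    {"event": "messages-tuple", "data": {"role": "assistant", "content": " help"}},
    {"event": "end", "data": {}},
]
print(accumulate(chunks))  # Let me help
```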

3. End Event

Signals stream completion:
{
    "event": "end",
    "data": {}
}
When to use: Cleanup, final state snapshot, enabling UI controls.

SSE Protocol

DeerFlow follows the standard Server-Sent Events protocol:
event: values
data: {"messages": [...], "title": "..."}

event: messages-tuple
data: {"role": "assistant", "content": "Hello"}

event: messages-tuple
data: {"role": "assistant", "content": " there!"}

event: end
data: {}
Each event has:
  • event: line specifying the event type
  • data: line with JSON payload
  • Empty line separating events
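If you consume the raw SSE stream without the SDK, the framing described above can be parsed in a few lines of standard Python. This is a minimal sketch that handles only the `event:`/`data:` framing shown here; it ignores other SSE features such as `id:`, `retry:`, and comment lines.

```python
import json

def parse_sse(raw: str) -> list[tuple[str, dict]]:
    """Parse a raw SSE stream into a list of (event_type, payload) tuples."""
    events = []
    event_type, data_lines = None, []
    # A trailing empty line flushes the final event.
    for line in raw.splitlines() + [""]:
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and event_type is not None:
            events.append((event_type, json.loads("\n".join(data_lines))))
            event_type, data_lines = None, []
    return events

raw = 'event: messages-tuple\ndata: {"role": "assistant", "content": "Hi"}\n\nevent: end\ndata: {}\n'
print(parse_sse(raw))
# [('messages-tuple', {'role': 'assistant', 'content': 'Hi'}), ('end', {})]
```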

Building a Chat UI

Here’s a complete example of streaming to a chat interface:
import asyncio
from langgraph_sdk import get_client

class ChatUI:
    def __init__(self):
        self.client = get_client(url="http://localhost:2024")
        self.current_message = ""
    
    async def send_message(self, thread_id: str, content: str):
        """Send a message and stream the response."""
        self.current_message = ""
        
        async for event in self.client.runs.stream(
            thread_id=thread_id,
            assistant_id="lead_agent",
            input={"messages": [{"role": "user", "content": content}]},
            stream_mode="messages-tuple",
            config={
                "configurable": {
                    "thinking_enabled": True,
                    "subagent_enabled": True
                }
            }
        ):
            await self.handle_event(event)
    
    async def handle_event(self, event):
        """Process streaming events."""
        if event["event"] == "messages-tuple":
            data = event["data"]
            
            if data["role"] == "assistant":
                # Accumulate AI response text
                if content := data.get("content"):
                    self.current_message += content
                    self.display_message(self.current_message)
                
                # Show tool calls
                if tool_calls := data.get("tool_calls"):
                    for call in tool_calls:
                        tool_name = call["function"]["name"]
                        self.display_tool_call(tool_name)
            
            elif data["role"] == "tool":
                # Show tool results
                tool_id = data["tool_call_id"]
                result = data["content"]
                self.display_tool_result(tool_id, result)
        
        elif event["event"] == "end":
            self.finalize_message()
    
    def display_message(self, text: str):
        """Update UI with incremental text."""
        print(f"\rAI: {text}", end="", flush=True)
    
    def display_tool_call(self, tool_name: str):
        """Show tool execution."""
        print(f"\n[Using tool: {tool_name}]")
    
    def display_tool_result(self, tool_id: str, result: str):
        """Show tool output (truncated to 100 chars)."""
        suffix = "..." if len(result) > 100 else ""
        print(f"[Tool result: {result[:100]}{suffix}]")
    
    def finalize_message(self):
        """Cleanup after stream ends."""
        print("\n[Stream complete]")

# Usage
async def main():
    ui = ChatUI()
    thread = await ui.client.threads.create()
    await ui.send_message(thread["thread_id"], "Analyze this data")

asyncio.run(main())

Handling Sub-Agent Events

When subagent_enabled is true, you’ll receive task-related events:
# Task started
{
    "event": "messages-tuple",
    "data": {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "task_123",
                "function": {
                    "name": "task",
                    "arguments": '{"description": "Analyze logs", ...}'
                }
            }
        ]
    }
}

# Task progress (custom events from SubagentExecutor)
{
    "event": "custom",
    "data": {
        "type": "task_running",
        "task_id": "task_123",
        "status": "processing"
    }
}

# Task completed
{
    "event": "messages-tuple",
    "data": {
        "role": "tool",
        "tool_call_id": "task_123",
        "content": "Analysis complete: found 3 errors"
    }
}
Sub-agent tasks execute in background threads. You can have up to max_concurrent_subagents tasks running in parallel (default: 3).
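One way to surface parallel tasks in a UI is to track in-flight task ids: a `task` tool call marks a task as started, and the matching `tool` message with the same `tool_call_id` marks it as finished. A sketch, where `track_tasks` is a hypothetical helper and the event shapes follow the payloads above:

```python
def track_tasks(events: list[dict]) -> set[str]:
    """Return the ids of sub-agent tasks that have started but not finished."""
    running: set[str] = set()
    for ev in events:
        data = ev.get("data", {})
        if data.get("role") == "assistant":
            for call in data.get("tool_calls") or []:
                if call["function"]["name"] == "task":
                    running.add(call["id"])
        elif data.get("role") == "tool":
            # A tool message with a matching id means the task completed.
            running.discard(data.get("tool_call_id"))
    return running

events = [
    {"event": "messages-tuple", "data": {"role": "assistant", "tool_calls": [
        {"id": "task_123", "function": {"name": "task", "arguments": "{}"}}]}},
    {"event": "messages-tuple", "data": {"role": "tool",
                                         "tool_call_id": "task_123",
                                         "content": "Analysis complete"}},
]
print(track_tasks(events))  # set()
```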

Thinking Mode

When thinking_enabled is true for supported models, you’ll receive extended thinking content:
{
    "event": "messages-tuple",
    "data": {
        "role": "assistant",
        "content": "",
        "additional_kwargs": {
            "thinking": "Let me analyze this step by step...\n1. First, I need to..."
        }
    }
}
Display options:
  • Show in expandable section
  • Display in real-time like regular content
  • Hide from user but log for debugging
Thinking content can be lengthy. Consider truncating or collapsing it in your UI.
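A collapsed preview along those lines might look like the following sketch; `render_thinking` is a hypothetical helper, and the `additional_kwargs` shape is assumed from the payload above.

```python
def render_thinking(message: dict, limit: int = 80):
    """Return a collapsed preview of thinking content, or None if absent."""
    thinking = message.get("additional_kwargs", {}).get("thinking")
    if not thinking:
        return None
    if len(thinking) <= limit:
        return thinking
    # Truncate and mark as expandable for the UI.
    return thinking[:limit].rstrip() + "… [expand]"

msg = {
    "role": "assistant",
    "content": "",
    "additional_kwargs": {"thinking": "Let me analyze this step by step...\n1. First, I need to..." * 3},
}
print(render_thinking(msg))
```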

Interrupt Events

When the agent calls ask_clarification, the stream will pause:
# Clarification request
{
    "event": "messages-tuple",
    "data": {
        "role": "assistant",
        "tool_calls": [
            {
                "id": "clarify_xyz",
                "function": {
                    "name": "ask_clarification",
                    "arguments": '{"question": "Which file did you mean?"}'
                }
            }
        ]
    }
}

# Stream ends with interrupt
{
    "event": "end",
    "data": {"interrupted": true}
}
Handling interrupts:
async def handle_interrupt(client, thread_id, question):
    """Prompt user and resume."""
    # Show question to user
    print(f"Agent asks: {question}")
    user_response = input("Your answer: ")
    
    # Resume with answer
    async for event in client.runs.stream(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={"messages": [{"role": "user", "content": user_response}]}
    ):
        # Process continuation
        pass

Error Handling

Handle errors gracefully during streaming:
try:
    async for event in client.runs.stream(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={"messages": [{"role": "user", "content": message}]}
    ):
        await handle_event(event)
except asyncio.TimeoutError:
    print("Stream timed out")
    # Optionally retry or cancel
except Exception as e:
    print(f"Stream error: {e}")
    # Check thread state for recovery
    state = await client.threads.get_state(thread_id)
    if state["next"]:
        # Agent is still running
        pass

Connection Management

Timeouts

Set timeouts to prevent hanging:
import asyncio

async def stream_with_timeout(client, thread_id, message, timeout=300):
    """Stream with a configurable timeout (default: 5 minutes)."""
    try:
        async with asyncio.timeout(timeout):  # asyncio.timeout requires Python 3.11+
            async for event in client.runs.stream(
                thread_id=thread_id,
                assistant_id="lead_agent",
                input={"messages": [{"role": "user", "content": message}]}
            ):
                yield event
    except asyncio.TimeoutError:
        print(f"Stream timed out after {timeout} seconds")

Reconnection

If the connection drops, you can resume from state:
async def resilient_stream(client, thread_id, message):
    """Stream with automatic reconnection."""
    max_retries = 3
    
    for attempt in range(max_retries):
        try:
            async for event in client.runs.stream(
                thread_id=thread_id,
                assistant_id="lead_agent",
                input={"messages": [{"role": "user", "content": message}]}
            ):
                yield event
            break  # Success
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Retry {attempt + 1}/{max_retries}...")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise

Performance Optimization

Reduce UI updates by batching events:
async def buffered_stream(client, thread_id, message, buffer_ms=50):
    buffer = []
    last_flush = asyncio.get_event_loop().time()
    
    async for event in client.runs.stream(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={"messages": [{"role": "user", "content": message}]}
    ):
        buffer.append(event)
        now = asyncio.get_event_loop().time()
        
        if now - last_flush >= buffer_ms / 1000:
            yield buffer
            buffer = []
            last_flush = now
    
    if buffer:
        yield buffer
messages-tuple mode is optimized for incremental rendering:
# Efficient for chat
stream_mode="messages-tuple"

# Avoid for chat (too much data)
stream_mode="values"  # Sends entire state every time
Prevent excessive re-renders:
class ThrottledUI:
    def __init__(self, throttle_ms=100):
        self.throttle_ms = throttle_ms
        self.last_update = 0.0
        self.pending_text = ""
    
    async def update_text(self, text: str):
        self.pending_text += text
        now = asyncio.get_event_loop().time() * 1000
        
        if now - self.last_update >= self.throttle_ms:
            self.render(self.pending_text)
            self.last_update = now
    
    def render(self, text: str):
        """Repaint the accumulated text (replace with your UI update)."""
        print(f"\rAI: {text}", end="", flush=True)

Complete Example

import asyncio
import json
from langgraph_sdk import get_client

async def interactive_chat():
    """Full-featured streaming chat with all features."""
    client = get_client(url="http://localhost:2024")
    thread = await client.threads.create()
    thread_id = thread["thread_id"]
    
    print(f"Chat started (thread: {thread_id})")
    print("Type 'exit' to quit\n")
    
    while True:
        # Get user input
        user_input = input("You: ")
        if user_input.lower() == "exit":
            break
        
        print("AI: ", end="", flush=True)
        current_text = ""
        
        try:
            # Stream response
            async for event in client.runs.stream(
                thread_id=thread_id,
                assistant_id="lead_agent",
                input={"messages": [{"role": "user", "content": user_input}]},
                stream_mode="messages-tuple",
                config={
                    "configurable": {
                        "thinking_enabled": True,
                        "subagent_enabled": True
                    }
                }
            ):
                if event["event"] == "messages-tuple":
                    data = event["data"]
                    
                    if data["role"] == "assistant":
                        # Display AI text
                        if content := data.get("content"):
                            print(content, end="", flush=True)
                            current_text += content
                        
                        # Show tool calls
                        if tool_calls := data.get("tool_calls"):
                            for call in tool_calls:
                                tool = call["function"]["name"]
                                args = json.loads(call["function"]["arguments"])
                                print(f"\n[{tool}({args})]")
                        
                        # Handle clarification
                        if tool_calls and tool_calls[0]["function"]["name"] == "ask_clarification":
                            question = json.loads(tool_calls[0]["function"]["arguments"])["question"]
                            print(f"\n\nAgent asks: {question}")
                            break  # Exit inner loop to get user input
                    
                    elif data["role"] == "tool":
                        # Show tool results (truncated)
                        result = data["content"][:200]
                        print(f" [result: {result}...]")
                
                elif event["event"] == "end":
                    print()  # Newline after complete response
        
        except Exception as e:
            print(f"\nError: {e}")
            # Try to recover
            state = await client.threads.get_state(thread_id)
            if state["next"] == ["__interrupt__"]:
                print("Agent is waiting for input")
            continue
    
    # Cleanup
    print(f"\nEnding chat (thread: {thread_id})")

# Run
asyncio.run(interactive_chat())

Next Steps

Thread Management

Learn about thread lifecycle and state management

Python Client

Use the embedded client for in-process streaming

Sub-Agents

Understand parallel task execution with sub-agents

Middleware Chain

Deep dive into middleware processing pipeline
