
Overview

The stream() method streams a conversation turn, yielding events incrementally. Each call sends one user message and yields events until the agent finishes its turn.
for event in client.stream("hello"):
    print(event.type, event.data)

Method Signature

def stream(
    self,
    message: str,
    *,
    thread_id: str | None = None,
    **kwargs,
) -> Generator[StreamEvent, None, None]

Parameters

message (str, required)
    User message text to send to the agent.
thread_id (str | None, default: None)
    Thread ID for conversation context. Auto-generated if None.

The remaining options do not appear by name in the signature; they are passed as keyword arguments through **kwargs:

model_name (str, default: client default)
    Override the model for this specific call.
thinking_enabled (bool, default: client default)
    Override thinking mode for this call.
plan_mode (bool, default: client default)
    Override plan mode for this call.
subagent_enabled (bool, default: client default)
    Override subagent delegation for this call.
recursion_limit (int, default: 100)
    Maximum number of agent steps per turn.
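To make the "client default unless overridden" rule concrete, here is a minimal sketch of how per-call keyword arguments could be layered over client-level defaults. Everything in it is an assumption for illustration: `CLIENT_DEFAULTS`, its values, and `resolve_call_options` are hypothetical names, and the real client may resolve or validate options differently.

```python
from typing import Any

# Hypothetical client-level defaults (illustrative values, not the library's).
CLIENT_DEFAULTS: dict[str, Any] = {
    "model_name": "example-model",
    "thinking_enabled": True,
    "plan_mode": False,
    "subagent_enabled": False,
    "recursion_limit": 100,
}


def resolve_call_options(defaults: dict[str, Any], **overrides: Any) -> dict[str, Any]:
    """Return the options for one call: defaults with per-call overrides on top.

    Rejecting unknown names is a design choice of this sketch, not documented
    behavior of stream().
    """
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise TypeError(f"unexpected option(s): {sorted(unknown)}")
    # Build a new dict so the client-level defaults are never mutated.
    return {**defaults, **overrides}


options = resolve_call_options(CLIENT_DEFAULTS, plan_mode=True, recursion_limit=25)
```

The override applies to the returned dict only, which mirrors the "for this call" wording above: the next call starts again from the client defaults.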

Return Value

Yields StreamEvent objects, each representing a single event from the streaming agent response. Every event has a type (str) that names the event kind and a data (dict) payload whose contents depend on the type.

Event Types

Event types align with the LangGraph SSE protocol:

messages-tuple Events

Per-message updates emitted as the agent processes:

AI Text Message

{
    "type": "ai",
    "content": "Here's my response...",
    "id": "msg-abc123"
}

AI Tool Call

{
    "type": "ai",
    "content": "",
    "id": "msg-abc123",
    "tool_calls": [
        {
            "name": "bash",
            "args": {"cmd": "ls"},
            "id": "tc-xyz789"
        }
    ]
}

Tool Result

{
    "type": "tool",
    "content": "file1.txt\nfile2.txt",
    "name": "bash",
    "tool_call_id": "tc-xyz789",
    "id": "msg-def456"
}
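The three payload shapes above can be told apart by their `type` field and the presence of `tool_calls`, and a tool result can be matched to its call through `tool_call_id`. The following is a sketch that assumes payloads look exactly like the samples shown; `classify_message` and `pair_tool_results` are hypothetical helper names, not part of the client.

```python
from typing import Any, Iterable


def classify_message(data: dict[str, Any]) -> str:
    """Label a messages-tuple payload as tool_call, ai_text, tool_result, or other."""
    kind = data.get("type")
    if kind == "ai":
        if data.get("tool_calls"):  # non-empty list of tool calls
            return "tool_call"
        if data.get("content"):  # non-empty text
            return "ai_text"
        return "other"
    if kind == "tool":
        return "tool_result"
    return "other"


def pair_tool_results(payloads: Iterable[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Map each tool call id to its name, args, and result (None until one arrives)."""
    pairs: dict[str, dict[str, Any]] = {}
    for data in payloads:
        label = classify_message(data)
        if label == "tool_call":
            for tc in data["tool_calls"]:
                pairs[tc["id"]] = {"name": tc["name"], "args": tc["args"], "result": None}
        elif label == "tool_result":
            entry = pairs.get(data.get("tool_call_id"))
            if entry is not None:
                entry["result"] = data.get("content")
    return pairs
```

Results whose `tool_call_id` was never seen are ignored here; a stricter consumer might log or raise instead.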

values Events

Full state snapshot after each agent step:
{
    "title": "File Listing",
    "messages": [
        {"type": "human", "content": "list files", "id": "msg-1"},
        {"type": "ai", "content": "", "id": "msg-2", "tool_calls": [...]},
        {"type": "tool", "content": "...", "id": "msg-3"},
        {"type": "ai", "content": "Here are the files", "id": "msg-4"}
    ],
    "artifacts": [
        {"path": "/mnt/user-data/outputs/result.txt", "size": 1024}
    ]
}
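Because a values payload is a full snapshot, a consumer can read the latest answer and the produced files directly from it without tracking earlier events. This sketch assumes the snapshot has the shape shown above; `last_ai_text` and `artifact_paths` are hypothetical helper names.

```python
from typing import Any, Optional


def last_ai_text(snapshot: dict[str, Any]) -> Optional[str]:
    """Return the content of the most recent AI message that has text, if any."""
    for msg in reversed(snapshot.get("messages", [])):
        if msg.get("type") == "ai" and msg.get("content"):
            return msg["content"]
    return None


def artifact_paths(snapshot: dict[str, Any]) -> list[str]:
    """Return the path of every artifact entry that carries one."""
    return [a["path"] for a in snapshot.get("artifacts", []) if "path" in a]
```

Both helpers use `.get()` with defaults so that an early snapshot without messages or artifacts yields `None` or an empty list rather than an error.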

end Event

Signals stream completion. The payload is empty:
{}
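Since the end event marks the close of a turn, a consumer can use it to tell a finished turn from a stream that stopped early. The sketch below works on any iterable of events; it defines a local mirror of StreamEvent so it runs on its own, and `collect_turn` is a hypothetical helper name.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable


@dataclass
class StreamEvent:
    """Local stand-in with the same two fields as the client's StreamEvent."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


def collect_turn(events: Iterable[StreamEvent]) -> tuple[list[str], bool]:
    """Gather non-empty AI text payloads and report whether an end event was seen."""
    texts: list[str] = []
    finished = False
    for event in events:
        if event.type == "end":
            finished = True
            break
        if event.type == "messages-tuple" and event.data.get("type") == "ai":
            content = event.data.get("content")
            if content:
                texts.append(content)
    return texts, finished
```

With the real client, the call would look like `texts, finished = collect_turn(client.stream("hello"))`; a `finished` value of `False` means the iterator was exhausted without an end event.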

Examples

Basic Streaming

from src.client import DeerFlowClient

client = DeerFlowClient()

for event in client.stream("What is 2 + 2?"):
    if event.type == "messages-tuple" and event.data.get("type") == "ai":
        if content := event.data.get("content"):
            print(f"AI: {content}")
    elif event.type == "end":
        print("Stream complete")

Tool Call Detection

for event in client.stream("List files in /mnt/user-data/workspace"):
    if event.type == "messages-tuple":
        data = event.data
        
        # Tool call (.get() also skips an empty tool_calls list)
        if data.get("type") == "ai" and data.get("tool_calls"):
            for tc in data["tool_calls"]:
                print(f"Tool: {tc['name']} with args {tc['args']}")
        
        # Tool result
        elif data.get("type") == "tool":
            print(f"Result: {data['content']}")
        
        # AI text
        elif data.get("type") == "ai" and data.get("content"):
            print(f"AI: {data['content']}")

State Snapshots

for event in client.stream("Analyze this data", thread_id="analysis-1"):
    if event.type == "values":
        title = event.data.get("title")
        msg_count = len(event.data.get("messages", []))
        artifacts = event.data.get("artifacts", [])
        
        print(f"State: {title} | {msg_count} messages | {len(artifacts)} artifacts")

Multi-Tool Chain

tool_calls = []
tool_results = []

for event in client.stream("Read file.txt and count the lines"):
    if event.type == "messages-tuple":
        data = event.data
        
        if data.get("type") == "ai" and data.get("tool_calls"):
            tool_calls.extend(data["tool_calls"])
        
        elif data.get("type") == "tool":
            tool_results.append({
                "name": data.get("name"),
                "content": data.get("content")
            })

print(f"Used tools: {[tc['name'] for tc in tool_calls]}")
print(f"Got {len(tool_results)} results")

StreamEvent Class

The StreamEvent dataclass is defined as:
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StreamEvent:
    """A single event from the streaming agent response.
    
    Event types align with the LangGraph SSE protocol:
        - "values": Full state snapshot (title, messages, artifacts).
        - "messages-tuple": Per-message update (AI text, tool calls, tool results).
        - "end": Stream finished.
    
    Attributes:
        type: Event type.
        data: Event payload. Contents vary by type.
    """
    type: str
    data: dict[str, Any] = field(default_factory=dict)
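Because `type` is a plain string, one way to consume events is a dispatch table keyed by event type instead of an if/elif chain. This is a sketch of one possible design, not a pattern the library prescribes; the dataclass is repeated so the block runs on its own, and `dispatch` and the handler names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class StreamEvent:
    """Same fields as the dataclass above, repeated for self-containment."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[dict[str, Any]], None]


def dispatch(event: StreamEvent, handlers: dict[str, Handler]) -> bool:
    """Call the handler registered for event.type; return False if there is none."""
    handler = handlers.get(event.type)
    if handler is None:
        return False
    handler(event.data)
    return True


seen: list[str] = []
handlers: dict[str, Handler] = {
    "values": lambda data: seen.append(f"snapshot:{data.get('title')}"),
    "messages-tuple": lambda data: seen.append(f"message:{data.get('type')}"),
    "end": lambda data: seen.append("end"),
}
```

Returning `False` for unregistered types lets a consumer ignore event types it does not know about rather than fail on them.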

LangGraph SSE Protocol Alignment

Event types and data structures match the LangGraph SSE protocol so consumers can switch between HTTP streaming and embedded mode without changing their event-handling logic.

Protocol Mapping

| Event Type | LangGraph SSE | Python Client |
| --- | --- | --- |
| State snapshot | values | values |
| Message update | messages/partial | messages-tuple |
| Stream end | end | end |
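The mapping can be written down as a small lookup so that a handler written against the Python client's names also accepts names arriving over SSE. This is a sketch built only from the three rows above; `SSE_TO_CLIENT_EVENT` and `to_client_event_type` are hypothetical names.

```python
# SSE event name -> Python client event name, taken from the mapping above.
SSE_TO_CLIENT_EVENT: dict[str, str] = {
    "values": "values",
    "messages/partial": "messages-tuple",
    "end": "end",
}


def to_client_event_type(sse_event: str) -> str:
    """Translate an SSE event name; names not in the mapping pass through unchanged."""
    return SSE_TO_CLIENT_EVENT.get(sse_event, sse_event)
```

Passing unknown names through unchanged is a choice made for this sketch; it keeps the translation from hiding event types the mapping does not list.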

Migration Example

HTTP Gateway client:
import json

import requests

response = requests.post(
    "http://localhost:8000/threads/123/runs/stream",
    json={"message": "hello"},
    stream=True,
)

event_type = None  # set by the most recent "event: " line
for line in response.iter_lines():
    if line.startswith(b"event: "):
        event_type = line[7:].decode()
    elif line.startswith(b"data: "):
        data = json.loads(line[6:])
        process_event(event_type, data)  # process_event is your own handler
Python embedded client:
from src.client import DeerFlowClient

client = DeerFlowClient()

for event in client.stream("hello", thread_id="123"):
    process_event(event.type, event.data)
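To keep the HTTP path as close as possible to the embedded one, the line parsing can be pulled into a generator that yields (event type, data) pairs, which are the same two values the embedded client exposes as `event.type` and `event.data`. The sketch below is a simplified parser under stated assumptions: each event arrives as one `event:` line followed by one single-line JSON `data:` line, and `parse_sse_lines` is a hypothetical name. It does not handle multi-line data fields or other SSE fields.

```python
import json
from typing import Any, Iterable, Iterator


def parse_sse_lines(lines: Iterable[bytes]) -> Iterator[tuple[str, Any]]:
    """Yield (event_type, data) for each data line that follows an event line."""
    event_type = None
    for raw in lines:
        line = raw.strip()
        if line.startswith(b"event:"):
            event_type = line[len(b"event:"):].strip().decode()
        elif line.startswith(b"data:") and event_type is not None:
            payload = line[len(b"data:"):].strip()
            # An empty payload (as in the end event) becomes an empty dict.
            yield event_type, (json.loads(payload) if payload else {})
```

With `requests`, the generator would be fed from `response.iter_lines()`, so the consuming loop becomes `for event_type, data in parse_sse_lines(response.iter_lines()): process_event(event_type, data)`.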

Testing

From the live test suite:
def test_stream_yields_messages_tuple_and_end(self, client):
    """stream() produces at least one messages-tuple event and ends with end."""
    events = list(client.stream("Say hi in one word."))

    types = [e.type for e in events]
    assert "messages-tuple" in types
    assert "values" in types
    assert types[-1] == "end"

    for e in events:
        assert isinstance(e, StreamEvent)
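The live test above needs a running agent. For unit tests of your own event-handling code, a stand-in client that replays canned events may be enough. This is a sketch, not part of the library: `FakeClient` and `count_event_types` are hypothetical names, and the fake only mimics the `stream()` signature documented on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Iterator, Optional


@dataclass
class StreamEvent:
    """Local stand-in with the same two fields as the client's StreamEvent."""
    type: str
    data: dict[str, Any] = field(default_factory=dict)


class FakeClient:
    """Replays a fixed list of events, ignoring the message and all options."""

    def __init__(self, events: Iterable[StreamEvent]) -> None:
        self._events = list(events)

    def stream(self, message: str, *, thread_id: Optional[str] = None, **kwargs: Any) -> Iterator[StreamEvent]:
        yield from self._events


def count_event_types(client: Any, message: str) -> dict[str, int]:
    """Example code under test: count how many events of each type a turn yields."""
    counts: dict[str, int] = {}
    for event in client.stream(message):
        counts[event.type] = counts.get(event.type, 0) + 1
    return counts
```

Because `count_event_types` only relies on `client.stream()` and the `type` attribute, the same function should work unchanged when handed the real client.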

See Also

  • Chat - For simple request/response
  • Configuration - For file operations and model selection
  • Overview - For initialization options
