DeerFlow’s agent system uses a custom ThreadState schema that extends LangChain’s AgentState with domain-specific fields for sandbox management, file tracking, and user interactions. The state is managed by LangGraph’s checkpointing system and enhanced with custom reducers for intelligent state merging.

ThreadState Schema

Defined in backend/src/agents/thread_state.py:
from typing import Annotated, NotRequired, TypedDict
from langchain.agents import AgentState

# merge_artifacts and merge_viewed_images are custom reducers defined
# later in this module (see the artifacts and viewed_images sections below)

class SandboxState(TypedDict):
    sandbox_id: NotRequired[str | None]

class ThreadDataState(TypedDict):
    workspace_path: NotRequired[str | None]
    uploads_path: NotRequired[str | None]
    outputs_path: NotRequired[str | None]

class ViewedImageData(TypedDict):
    base64: str
    mime_type: str

class ThreadState(AgentState):
    sandbox: NotRequired[SandboxState | None]
    thread_data: NotRequired[ThreadDataState | None]
    title: NotRequired[str | None]
    artifacts: Annotated[list[str], merge_artifacts]
    todos: NotRequired[list | None]
    uploaded_files: NotRequired[list[dict] | None]
    viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images]

State Fields

sandbox

Type: SandboxState | None
Purpose: Tracks the active sandbox environment for isolated tool execution.
Structure:
{
    "sandbox_id": "local"  # or Docker container ID
}
Lifecycle:
  • Set by SandboxMiddleware in before_agent hook
  • Persisted across turns within same thread (sandbox not released)
  • Used by sandbox tools (bash, read_file, write_file, str_replace, ls)
Example:
state["sandbox"] = {"sandbox_id": "local"}

# Tools check sandbox_id to determine execution mode
from src.sandbox import is_local_sandbox
if is_local_sandbox(state["sandbox"]["sandbox_id"]):
    # Local filesystem execution with path translation
    real_path = replace_virtual_path(virtual_path, thread_id)
else:
    # Docker container execution
    sandbox = provider.get(state["sandbox"]["sandbox_id"])
    result = sandbox.execute_command(command)

thread_data

Type: ThreadDataState | None
Purpose: Provides path mappings between virtual (agent-visible) and physical (host) paths.
Structure:
{
    "workspace_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/workspace",
    "uploads_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/uploads",
    "outputs_path": "/path/to/backend/.deer-flow/threads/{thread_id}/user-data/outputs"
}
Virtual Path Mappings:
  • Agent sees: /mnt/user-data/workspace, /mnt/user-data/uploads, /mnt/user-data/outputs
  • Physical: backend/.deer-flow/threads/{thread_id}/user-data/{workspace,uploads,outputs}
Lifecycle:
  • Set by ThreadDataMiddleware in before_agent hook
  • With lazy_init=True (default): Paths computed but directories created on-demand
  • With lazy_init=False: Directories eagerly created in middleware
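The mapping above can be sketched as a small translation helper. This is a hypothetical illustration, not DeerFlow's actual implementation (the real helper is `replace_virtual_path`, whose signature may differ):

```python
from pathlib import PurePosixPath

VIRTUAL_ROOT = "/mnt/user-data"

def translate_virtual_path(virtual_path: str, thread_data: dict) -> str:
    """Map an agent-visible path to the physical host path (illustrative sketch)."""
    mapping = {
        f"{VIRTUAL_ROOT}/workspace": thread_data["workspace_path"],
        f"{VIRTUAL_ROOT}/uploads": thread_data["uploads_path"],
        f"{VIRTUAL_ROOT}/outputs": thread_data["outputs_path"],
    }
    for virtual_prefix, physical_root in mapping.items():
        if virtual_path == virtual_prefix or virtual_path.startswith(virtual_prefix + "/"):
            relative = virtual_path[len(virtual_prefix):].lstrip("/")
            # Join the remainder onto the physical root for this thread
            return str(PurePosixPath(physical_root) / relative) if relative else physical_root
    raise ValueError(f"Path outside virtual sandbox: {virtual_path}")
```

Keeping the agent's view anchored at `/mnt/user-data` means prompts and tool calls stay identical whether execution happens on the local filesystem or inside a Docker container.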

title

Type: str | None
Purpose: Human-readable thread title for UI display.
Lifecycle:
  • Set by TitleMiddleware after first complete user-assistant exchange
  • Generated via lightweight LLM based on first user message and assistant response
  • Persisted by LangGraph checkpointer
Generation (backend/src/agents/middlewares/title_middleware.py:46-81):
def _generate_title(self, state):
    config = get_title_config()  # max_words, max_chars, prompt_template
    model = create_chat_model(thinking_enabled=False)
    messages = state["messages"]
    
    user_msg = next(m.content for m in messages if m.type == "human")
    assistant_msg = next(m.content for m in messages if m.type == "ai")
    
    prompt = config.prompt_template.format(
        max_words=config.max_words,
        user_msg=user_msg[:500],
        assistant_msg=assistant_msg[:500]
    )
    
    try:
        response = model.invoke(prompt)
        title = response.content.strip()[: config.max_chars]
        return title
    except Exception:
        # Fallback: first 50 chars of user message
        return user_msg[:50].rstrip() + "..."
Configuration (config.yaml):
title:
  enabled: true
  max_words: 8
  max_chars: 80
  prompt_template: "Generate a concise title (max {max_words} words) for this conversation: {user_msg}\n{assistant_msg}"

artifacts

Type: Annotated[list[str], merge_artifacts]
Purpose: Tracks files presented to user via present_files tool.
Custom Reducer (backend/src/agents/thread_state.py:21-28):
def merge_artifacts(existing: list[str] | None, new: list[str] | None) -> list[str]:
    """Reducer for artifacts list - merges and deduplicates artifacts."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    # Use dict.fromkeys to deduplicate while preserving order
    return list(dict.fromkeys(existing + new))
Behavior:
  • Maintains insertion order (first occurrence preserved)
  • Automatically deduplicates paths
  • Survives across turns (cumulative)
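The cumulative, order-preserving behavior can be demonstrated directly (the reducer is reproduced here from the schema module above):

```python
def merge_artifacts(existing, new):
    """Reducer for artifacts list - merges and deduplicates artifacts."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    # dict.fromkeys keeps the first occurrence of each path, in order
    return list(dict.fromkeys(existing + new))

# Turn 1: first artifact presented
turn1 = merge_artifacts(None, ["/mnt/user-data/outputs/report.md"])
# Turn 2: a new artifact plus a re-presented duplicate
turn2 = merge_artifacts(turn1, ["/mnt/user-data/outputs/chart.png",
                                "/mnt/user-data/outputs/report.md"])
print(turn2)
# ['/mnt/user-data/outputs/report.md', '/mnt/user-data/outputs/chart.png']
```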
Usage in Tools:
# present_files tool (backend/src/tools/builtins/present_files.py)
def present_files(file_paths: list[str], runtime: ToolRuntimeContext):
    state = runtime.state
    thread_data = state.get("thread_data", {})
    outputs_dir = thread_data.get("outputs_path")
    
    # Validate files are in outputs directory
    for path in file_paths:
        if not path.startswith("/mnt/user-data/outputs"):
            raise ValueError(f"Files must be in /mnt/user-data/outputs: {path}")
    
    # Update state (triggers merge_artifacts reducer)
    return {"artifacts": file_paths}

todos

Type: list | None
Purpose: Stores task list when TodoListMiddleware is enabled (is_plan_mode=True).
Structure:
[
    {
        "id": "todo-1",
        "title": "Read configuration files",
        "status": "completed"
    },
    {
        "id": "todo-2",
        "title": "Implement authentication logic",
        "status": "in_progress"
    },
    {
        "id": "todo-3",
        "title": "Write unit tests",
        "status": "pending"
    }
]
Task States:
  • pending - Not yet started
  • in_progress - Currently working (one at a time, or multiple if parallel)
  • completed - Finished successfully
Managed By: TodoListMiddleware (LangChain built-in) with custom prompts
Tool: write_todos (injected by middleware)

uploaded_files

Type: list[dict] | None
Purpose: Tracks newly uploaded files for current turn.
Structure:
[
    {
        "filename": "report.pdf",
        "size": 1048576,  # bytes
        "path": "/mnt/user-data/uploads/report.pdf",
        "extension": ".pdf"
    }
]
Lifecycle:
  • Set by UploadsMiddleware in before_agent hook
  • Only includes files NOT already shown in previous messages (deduplication)
  • Cleared on next turn (not cumulative)
Deduplication Logic (backend/src/agents/middlewares/uploads_middleware.py:110-136):
def _extract_files_from_message(self, content: str) -> set[str]:
    """Extract filenames from <uploaded_files> tag in message history."""
    match = re.search(r"<uploaded_files>([\s\S]*?)</uploaded_files>", content)
    if not match:
        return set()
    
    files_content = match.group(1)
    filenames = set()
    for line in files_content.split("\n"):
        # Match: - filename.ext (size)
        file_match = re.match(r"^-\s+(.+?)\s*\(", line.strip())
        if file_match:
            filenames.add(file_match.group(1).strip())
    
    return filenames

def before_agent(self, state, runtime):
    # Scan all previous messages (except last) for shown files
    shown_files = set()
    for msg in state["messages"][:-1]:
        if isinstance(msg, HumanMessage):
            shown_files.update(self._extract_files_from_message(msg.content))
    
    # List only newly uploaded files (thread_id and updated_messages are
    # derived earlier in the full middleware; elided in this excerpt)
    new_files = self._list_newly_uploaded_files(thread_id, shown_files)
    
    return {"uploaded_files": new_files, "messages": updated_messages}
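The filename-extraction regex above can be exercised standalone (same patterns as in the middleware, wrapped in a self-contained function):

```python
import re

def extract_files(content: str) -> set[str]:
    """Extract filenames from an <uploaded_files> tag (same regexes as the middleware)."""
    match = re.search(r"<uploaded_files>([\s\S]*?)</uploaded_files>", content)
    if not match:
        return set()
    filenames = set()
    for line in match.group(1).split("\n"):
        # Match lines of the form: - filename.ext (size)
        file_match = re.match(r"^-\s+(.+?)\s*\(", line.strip())
        if file_match:
            filenames.add(file_match.group(1).strip())
    return filenames

message = """<uploaded_files>
- report.pdf (1.0 MB)
- data.csv (3 KB)
</uploaded_files>"""
print(sorted(extract_files(message)))  # ['data.csv', 'report.pdf']
```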

viewed_images

Type: Annotated[dict[str, ViewedImageData], merge_viewed_images]
Purpose: Tracks images loaded via view_image tool for vision model analysis.
Structure:
{
    "/mnt/user-data/uploads/diagram.png": {
        "base64": "iVBORw0KGgoAAAANSUhEUgA...",
        "mime_type": "image/png"
    },
    "/mnt/user-data/workspace/screenshot.jpg": {
        "base64": "/9j/4AAQSkZJRgABAQEAY...",
        "mime_type": "image/jpeg"
    }
}
Custom Reducer (backend/src/agents/thread_state.py:31-45):
def merge_viewed_images(
    existing: dict[str, ViewedImageData] | None,
    new: dict[str, ViewedImageData] | None
) -> dict[str, ViewedImageData]:
    """Reducer for viewed_images dict - merges image dictionaries.
    
    Special case: If new is an empty dict {}, it clears the existing images.
    This allows middlewares to clear the viewed_images state after processing.
    """
    if existing is None:
        return new or {}
    if new is None:
        return existing
    # Special case: empty dict means clear all viewed images
    if len(new) == 0:
        return {}
    # Merge dictionaries, new values override existing ones for same keys
    return {**existing, **new}
Behavior:
  • Normal updates: Merge dictionaries (new keys added, existing keys updated)
  • Empty dict {}: Clears all viewed images (reset)
  • Used by ViewImageMiddleware to inject images before LLM call
Lifecycle:
  1. Agent calls view_image tool → Tool returns base64 data in ToolMessage
  2. Tool also updates state: {"viewed_images": {path: {base64, mime_type}}}
  3. ViewImageMiddleware detects completed view_image tool calls in before_model
  4. Middleware injects HumanMessage with multimodal content (text + images)
  5. LLM analyzes images automatically
  6. Middleware clears state: {"viewed_images": {}} after processing
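The lifecycle above can be walked step by step, with the reducer reproduced from the schema module:

```python
def merge_viewed_images(existing, new):
    """Reducer for viewed_images dict - merges image dictionaries."""
    if existing is None:
        return new or {}
    if new is None:
        return existing
    if len(new) == 0:
        return {}  # special case: empty dict clears all viewed images
    return {**existing, **new}

# Step 1-2: view_image tool adds an image to state
state = merge_viewed_images(None, {
    "/mnt/user-data/uploads/diagram.png": {"base64": "iVBOR...", "mime_type": "image/png"},
})
# Another view_image call merges in a second image
state = merge_viewed_images(state, {
    "/mnt/user-data/workspace/shot.jpg": {"base64": "/9j/4...", "mime_type": "image/jpeg"},
})
assert len(state) == 2  # both images tracked

# Step 6: middleware clears after injecting the multimodal message
state = merge_viewed_images(state, {})
assert state == {}
```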

State Management Patterns

1. Middleware State Updates

Middlewares return state updates as dictionaries:
class MyMiddleware(AgentMiddleware[MyState]):
    def before_agent(self, state, runtime):
        return {
            "field1": "value1",
            "field2": ["item1", "item2"],
            "messages": [HumanMessage(content="injected message")]
        }
Merge Behavior:
  • Fields without custom reducers: Replace existing value
  • Fields with custom reducers: Call reducer function
  • messages: Uses LangChain’s add_messages reducer (append to list)
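The merge behavior above can be illustrated with a simplified sketch. This is not LangGraph's actual channel mechanics, only a toy model of "use the reducer if the field is Annotated with one, otherwise replace":

```python
from typing import Annotated, TypedDict, get_args, get_origin, get_type_hints

def apply_update(schema, state: dict, update: dict) -> dict:
    """Toy model of reducer-aware state merging (not LangGraph's implementation)."""
    hints = get_type_hints(schema, include_extras=True)
    result = dict(state)
    for key, value in update.items():
        hint = hints.get(key)
        reducer = None
        if hint is not None and get_origin(hint) is Annotated:
            args = get_args(hint)  # (base_type, metadata, ...)
            reducer = args[1] if len(args) > 1 and callable(args[1]) else None
        if reducer:
            result[key] = reducer(state.get(key), value)  # merge via reducer
        else:
            result[key] = value  # plain replace
    return result

def merge_lists(existing, new):
    return (existing or []) + (new or [])

class Demo(TypedDict):
    tags: Annotated[list, merge_lists]
    title: str

s = apply_update(Demo, {"tags": ["a"], "title": "old"}, {"tags": ["b"], "title": "new"})
print(s)  # {'tags': ['a', 'b'], 'title': 'new'}
```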

2. Tool State Updates

Tools can update state by returning dictionaries:
def my_tool(arg: str, runtime: ToolRuntimeContext):
    # Access current state
    state = runtime.state
    current_artifacts = state.get("artifacts", [])
    
    # Return state update; fields with custom reducers (like artifacts)
    # are merged by the reducer, so duplicates are handled automatically
    return {
        "artifacts": current_artifacts + ["/new/file.txt"]
    }

3. State Persistence

State persisted via LangGraph checkpointer:
from langgraph.checkpoint.postgres import PostgresSaver

# Configure programmatically (a LangGraph API server deployment via
# langgraph.json provisions its own checkpointer)
with PostgresSaver.from_conn_string(os.getenv("POSTGRES_URI")) as checkpointer:
    checkpointer.setup()  # create checkpoint tables on first run

    agent = create_agent(
        model=model,
        tools=tools,
        middleware=middlewares,
        state_schema=ThreadState,
        checkpointer=checkpointer,  # enables state persistence
    )
Persisted Fields:
  • All ThreadState fields
  • Full message history
  • Checkpoints created after each agent step
Retrieval:
# Get the latest state for a thread
config = {"configurable": {"thread_id": "thread-1"}}
state = agent.get_state(config)

# Get state at a specific checkpoint
state = agent.get_state(
    {"configurable": {"thread_id": "thread-1", "checkpoint_id": checkpoint_id}}
)

# Iterate over all checkpoints for a thread (newest first)
checkpoints = list(agent.get_state_history(config))

4. Thread Isolation

Each thread maintains independent state:
# Thread 1
agent.invoke(
    {"messages": [HumanMessage(content="Hello")]},
    config={"configurable": {"thread_id": "thread-1"}}
)

# Thread 2 (completely isolated)
agent.invoke(
    {"messages": [HumanMessage(content="Hello")]},
    config={"configurable": {"thread_id": "thread-2"}}
)
Physical Isolation:
  • Separate directories: backend/.deer-flow/threads/{thread_id}/
  • Separate sandboxes (if using Docker provider)
  • Separate checkpoint history

Custom Reducer Implementation

When to Use Custom Reducers

  1. Deduplication - Remove duplicates while merging (like merge_artifacts)
  2. Merging Dicts - Intelligently merge nested structures (like merge_viewed_images)
  3. Reset Semantics - Support clearing values (empty dict resets viewed_images)
  4. Aggregation - Accumulate values with custom logic

Creating Custom Reducers

from typing import Annotated

def merge_my_field(existing: list[str] | None, new: list[str] | None) -> list[str]:
    """Custom reducer for my_field."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    
    # Custom merge logic here
    combined = existing + new
    return sorted(set(combined))  # Deduplicate and sort

class MyState(AgentState):
    my_field: Annotated[list[str], merge_my_field]
Reducer Contract:
  • Takes two arguments: existing (current state) and new (update)
  • Both arguments can be None
  • Returns merged value of same type
  • Pure function (no side effects)
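The contract in practice, using the merge_my_field example from above:

```python
def merge_my_field(existing, new):
    """Custom reducer for my_field (reproduced from the example above)."""
    if existing is None:
        return new or []
    if new is None:
        return existing
    combined = existing + new
    return sorted(set(combined))  # deduplicate and sort

assert merge_my_field(None, None) == []               # first write, empty update
assert merge_my_field(None, ["b", "a"]) == ["b", "a"]  # first write passes through
assert merge_my_field(["b"], None) == ["b"]            # no-op update keeps state
assert merge_my_field(["b"], ["a", "b"]) == ["a", "b"]  # merged, deduped, sorted
```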

Testing Reducers

import pytest
from src.agents.thread_state import merge_artifacts, merge_viewed_images

def test_merge_artifacts_deduplication():
    existing = ["file1.txt", "file2.txt"]
    new = ["file2.txt", "file3.txt"]
    result = merge_artifacts(existing, new)
    assert result == ["file1.txt", "file2.txt", "file3.txt"]

def test_merge_viewed_images_clear():
    existing = {"img1.png": {"base64": "data", "mime_type": "image/png"}}
    new = {}  # Empty dict should clear
    result = merge_viewed_images(existing, new)
    assert result == {}

def test_merge_viewed_images_update():
    existing = {"img1.png": {"base64": "old", "mime_type": "image/png"}}
    new = {"img1.png": {"base64": "new", "mime_type": "image/png"}}
    result = merge_viewed_images(existing, new)
    assert result["img1.png"]["base64"] == "new"  # New value wins

State Debugging

Inspecting Current State

# In middleware
class DebugMiddleware(AgentMiddleware[ThreadState]):
    def before_agent(self, state, runtime):
        import json
        print("Current state:", json.dumps({
            "sandbox": state.get("sandbox"),
            "thread_data": state.get("thread_data"),
            "artifacts": state.get("artifacts"),
            "viewed_images": list(state.get("viewed_images", {}).keys())
        }, indent=2))
        return None

State Size Monitoring

import sys

class StateSizeMonitor(AgentMiddleware[ThreadState]):
    def after_agent(self, state, runtime):
        total_size = sum([
            sys.getsizeof(str(state.get("messages", []))),
            sys.getsizeof(str(state.get("viewed_images", {}))),
            sys.getsizeof(str(state.get("artifacts", [])))
        ])
        print(f"Total state size: {total_size / 1024:.2f} KB")
        
        if total_size > 1_000_000:  # 1 MB
            print("WARNING: State size exceeds 1 MB, consider summarization")
        
        return None

Performance Considerations

Memory Usage

  • messages: Grows unbounded without summarization (use SummarizationMiddleware)
  • viewed_images: Store base64 data (can be large, clear after processing)
  • artifacts: Small (just file paths)

Database Size

  • Each checkpoint persists full state to database
  • With PostgresSaver: One row per checkpoint
  • Recommend periodic cleanup of old threads

State Transfer

  • State serialized/deserialized on every agent step
  • Keep state schema simple (avoid deeply nested structures)
  • Use NotRequired for optional fields (reduces serialization overhead)
