Skip to main content

Overview

Threads represent individual conversation sessions in DeerFlow. Each thread maintains its own:
  • Conversation history - All messages exchanged
  • State data - Title, artifacts, todos, uploads, viewed images
  • Isolated workspace - Dedicated directories for files and outputs
  • Sandbox connection - Persistent or ephemeral execution environment

Creating Threads

Create a new thread using the LangGraph SDK:
from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create a thread
thread = await client.threads.create()
print(f"Thread ID: {thread['thread_id']}")

# Create with metadata
thread = await client.threads.create(
    metadata={
        "user_id": "user-123",
        "project": "my-project",
        "tags": ["research", "analysis"]
    }
)

Thread IDs

Thread IDs are UUIDs automatically generated by LangGraph. You can also provide your own:
import uuid

thread_id = str(uuid.uuid4())
thread = await client.threads.create(thread_id=thread_id)
Thread IDs must be unique. Creating a thread with an existing ID will fail.

Listing Threads

Retrieve all threads with pagination:
# List all threads
threads = await client.threads.list()

# Paginated listing
threads = await client.threads.list(
    limit=20,
    offset=0
)

# Filter by metadata
threads = await client.threads.list(
    metadata={"user_id": "user-123"}
)

Getting Thread Details

Retrieve a specific thread:
thread = await client.threads.get(thread_id)

print(thread["thread_id"])
print(thread["metadata"])
print(thread["created_at"])
print(thread["updated_at"])

Thread State

DeerFlow extends the standard LangGraph state with additional fields:

State Schema

class ThreadState:
    # Standard fields
    messages: list[Message]  # Conversation history
    
    # DeerFlow extensions
    sandbox: SandboxState  # {sandbox_id: str}
    thread_data: ThreadDataState  # {workspace_path, uploads_path, outputs_path}
    title: str | None  # Auto-generated after first exchange
    artifacts: list[str]  # Files presented to user via present_files tool
    todos: list | None  # Task list (plan_mode only)
    uploaded_files: list[dict]  # User uploads with metadata
    viewed_images: dict[str, ViewedImageData]  # Image cache for vision models

Getting State

# Get current state
state = await client.threads.get_state(thread_id)

print(state["values"]["messages"])  # All messages
print(state["values"]["title"])  # Thread title
print(state["values"]["artifacts"])  # Presented files
print(state["values"]["uploaded_files"])  # Uploads

State Operations

Update thread state directly:
# Update state
await client.threads.update_state(
    thread_id,
    values={
        "title": "Research Session",
        "artifacts": ["analysis.pdf", "report.md"]
    }
)

# Append to messages
await client.threads.update_state(
    thread_id,
    values={
        "messages": [
            {"role": "user", "content": "Additional context here"}
        ]
    },
    as_node="user"  # Simulate user input
)
State updates trigger the middleware chain, so changes like adding uploads will be processed by UploadsMiddleware.

Thread Configuration

Threads inherit runtime configuration from runs. You can specify different configs for each run:
# Run 1: Thinking mode enabled
await client.runs.create(
    thread_id=thread_id,
    assistant_id="lead_agent",
    input={"messages": [{"role": "user", "content": "Analyze this"}]},
    config={"configurable": {"thinking_enabled": True}}
)

# Run 2: Plan mode with different model
await client.runs.create(
    thread_id=thread_id,
    assistant_id="lead_agent",
    input={"messages": [{"role": "user", "content": "Create a plan"}]},
    config={
        "configurable": {
            "is_plan_mode": True,
            "model_name": "claude-3-5-sonnet-20241022"
        }
    }
)

Configuration Options

ParameterTypeDescription
thinking_enabledboolEnable extended thinking (model must support it)
reasoning_effortstrReasoning level: “low”, “medium”, “high”
model_namestrOverride default model
is_plan_modeboolEnable task tracking with write_todos tool
subagent_enabledboolEnable task tool for sub-agent delegation
max_concurrent_subagentsintParallel sub-agent limit (default: 3)
agent_namestrUse custom agent instead of lead_agent

Thread Isolation

Each thread gets isolated directories managed by ThreadDataMiddleware:
backend/.deer-flow/threads/{thread_id}/
├── user-data/
│   ├── workspace/    # Working directory for file operations
│   ├── uploads/      # User-uploaded files
│   └── outputs/      # Files for download via present_files

Virtual Path Mapping

Inside the sandbox, these paths are mapped:
Physical PathVirtual Path (in sandbox)
.deer-flow/threads/{id}/user-data/workspace//mnt/user-data/workspace/
.deer-flow/threads/{id}/user-data/uploads//mnt/user-data/uploads/
.deer-flow/threads/{id}/user-data/outputs//mnt/user-data/outputs/
skills//mnt/skills/
The agent sees only virtual paths. Path translation happens automatically in sandbox tools.

Working with Messages

Message Format

DeerFlow uses LangChain message types:
# User message
{
    "role": "user",
    "content": "Hello!"
}

# AI message
{
    "role": "assistant",
    "content": "Hi there!"
}

# AI message with tool calls
{
    "role": "assistant",
    "content": "",
    "tool_calls": [
        {
            "id": "call_abc123",
            "type": "function",
            "function": {
                "name": "bash",
                "arguments": '{"command": "ls -la"}'
            }
        }
    ]
}

# Tool result message
{
    "role": "tool",
    "tool_call_id": "call_abc123",
    "content": "total 24\ndrwxr-xr-x..."
}

Retrieving Messages

state = await client.threads.get_state(thread_id)
messages = state["values"]["messages"]

for msg in messages:
    if msg["role"] == "user":
        print(f"User: {msg['content']}")
    elif msg["role"] == "assistant":
        if msg.get("tool_calls"):
            print(f"AI used {len(msg['tool_calls'])} tools")
        else:
            print(f"AI: {msg['content']}")

Deleting Threads

Delete a thread and its associated data:
await client.threads.delete(thread_id)
This permanently deletes:
  • All conversation history
  • Thread state (title, artifacts, todos)
  • Isolated workspace files
  • Cannot be undone

Interrupts

DeerFlow supports interrupts via the ask_clarification tool. When the agent needs user input:
# Agent calls ask_clarification tool
# ClarificationMiddleware intercepts and interrupts

# Check for interrupts
state = await client.threads.get_state(thread_id)
if state["next"] == ["__interrupt__"]:
    print("Agent is waiting for input")
    
    # Resume with user response
    await client.threads.update_state(
        thread_id,
        values={
            "messages": [
                {"role": "user", "content": "Here's the clarification"}
            ]
        }
    )

Interrupt Flow

  1. Agent calls ask_clarification tool with a question
  2. ClarificationMiddleware intercepts in after_model hook
  3. Returns Command(goto=END) to halt execution
  4. Thread state shows next: ["__interrupt__"]
  5. Client provides clarification and resumes run
See Streaming - Interrupt Events for handling interrupts in streaming mode.

Checkpointing

LangGraph automatically checkpoints thread state after each step. This enables:
  • Time travel - Rewind to any previous state
  • Branching - Create alternate conversation paths
  • Recovery - Resume after crashes
# Get state history
history = await client.threads.get_history(thread_id, limit=10)

for checkpoint in history:
    print(f"Step {checkpoint['step']}: {checkpoint['values']['messages'][-1]}")

# Rewind to specific checkpoint
await client.threads.update_state(
    thread_id,
    values={},
    checkpoint_id=checkpoint["checkpoint_id"]
)

Example: Complete Thread Lifecycle

from langgraph_sdk import get_client

async def main():
    client = get_client(url="http://localhost:2024")
    
    # 1. Create thread
    thread = await client.threads.create(
        metadata={"project": "analysis"}
    )
    thread_id = thread["thread_id"]
    
    # 2. Start conversation
    await client.runs.create(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={
            "messages": [
                {"role": "user", "content": "Analyze sales data in /mnt/user-data/uploads/sales.csv"}
            ]
        },
        config={
            "configurable": {
                "is_plan_mode": True,
                "thinking_enabled": True
            }
        }
    )
    
    # 3. Get state
    state = await client.threads.get_state(thread_id)
    print(f"Title: {state['values']['title']}")
    print(f"Artifacts: {state['values']['artifacts']}")
    
    # 4. Continue conversation
    await client.runs.create(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={
            "messages": [
                {"role": "user", "content": "Now create a visualization"}
            ]
        }
    )
    
    # 5. Cleanup
    await client.threads.delete(thread_id)

Best Practices

Store thread IDs in your database to maintain conversation continuity across sessions.
# Store in DB
thread_id = thread["thread_id"]
db.save_conversation(user_id, thread_id)

# Retrieve later
thread_id = db.get_conversation(user_id)
state = await client.threads.get_state(thread_id)
Use metadata to organize and filter threads:
await client.threads.create(
    metadata={
        "user_id": "user-123",
        "type": "research",
        "project": "q4-analysis",
        "created_by": "web-app"
    }
)
Implement a cleanup strategy for inactive threads:
# Delete threads older than 30 days
threads = await client.threads.list()
for thread in threads:
    age = now - thread["updated_at"]
    if age > timedelta(days=30):
        await client.threads.delete(thread["thread_id"])
Always check for interrupt state before assuming completion:
state = await client.threads.get_state(thread_id)
if state["next"] == ["__interrupt__"]:
    # Handle clarification request
    last_message = state["values"]["messages"][-1]
    if last_message.get("tool_calls"):
        tool_call = last_message["tool_calls"][0]
        if tool_call["function"]["name"] == "ask_clarification":
            question = json.loads(tool_call["function"]["arguments"])["question"]
            # Prompt user for clarification

Next Steps

Streaming

Learn how to stream agent responses in real-time

File Uploads

Upload files for agent processing

Plan Mode

Enable task tracking with TodoList middleware

Memory

Understand how memory works across threads

Build docs developers (and LLMs) love