## Overview

Threads represent individual conversation sessions in DeerFlow. Each thread maintains its own:

- **Conversation history** - All messages exchanged
- **State data** - Title, artifacts, todos, uploads, viewed images
- **Isolated workspace** - Dedicated directories for files and outputs
- **Sandbox connection** - Persistent or ephemeral execution environment
## Creating Threads

Create a new thread using the LangGraph SDK:

```python
from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create a thread
thread = await client.threads.create()
print(f"Thread ID: {thread['thread_id']}")

# Create with metadata
thread = await client.threads.create(
    metadata={
        "user_id": "user-123",
        "project": "my-project",
        "tags": ["research", "analysis"]
    }
)
```
## Thread IDs

Thread IDs are UUIDs automatically generated by LangGraph. You can also provide your own:

```python
import uuid

thread_id = str(uuid.uuid4())
thread = await client.threads.create(thread_id=thread_id)
```

Thread IDs must be unique. Creating a thread with an existing ID will fail.
## Listing Threads

Retrieve all threads with pagination:

```python
# List all threads
threads = await client.threads.list()

# Paginated listing
threads = await client.threads.list(
    limit=20,
    offset=0
)

# Filter by metadata
threads = await client.threads.list(
    metadata={"user_id": "user-123"}
)
```
## Getting Thread Details

Retrieve a specific thread:

```python
thread = await client.threads.get(thread_id)

print(thread["thread_id"])
print(thread["metadata"])
print(thread["created_at"])
print(thread["updated_at"])
```
## Thread State

DeerFlow extends the standard LangGraph state with additional fields:

### State Schema

```python
class ThreadState:
    # Standard fields
    messages: list[Message]                    # Conversation history

    # DeerFlow extensions
    sandbox: SandboxState                      # {sandbox_id: str}
    thread_data: ThreadDataState               # {workspace_path, uploads_path, outputs_path}
    title: str | None                          # Auto-generated after first exchange
    artifacts: list[str]                       # Files presented to user via present_files tool
    todos: list | None                         # Task list (plan_mode only)
    uploaded_files: list[dict]                 # User uploads with metadata
    viewed_images: dict[str, ViewedImageData]  # Image cache for vision models
```
### Getting State

```python
# Get current state
state = await client.threads.get_state(thread_id)

print(state["values"]["messages"])        # All messages
print(state["values"]["title"])           # Thread title
print(state["values"]["artifacts"])       # Presented files
print(state["values"]["uploaded_files"])  # Uploads
```
### State Operations

Update thread state directly:

```python
# Update state
await client.threads.update_state(
    thread_id,
    values={
        "title": "Research Session",
        "artifacts": ["analysis.pdf", "report.md"]
    }
)

# Append to messages
await client.threads.update_state(
    thread_id,
    values={
        "messages": [
            {"role": "user", "content": "Additional context here"}
        ]
    },
    as_node="user"  # Simulate user input
)
```

State updates trigger the middleware chain, so changes such as adding uploads will be processed by UploadsMiddleware.
## Thread Configuration

Threads inherit runtime configuration from runs. You can specify a different config for each run:

```python
# Run 1: Thinking mode enabled
await client.runs.create(
    thread_id=thread_id,
    assistant_id="lead_agent",
    input={"messages": [{"role": "user", "content": "Analyze this"}]},
    config={"configurable": {"thinking_enabled": True}}
)

# Run 2: Plan mode with a different model
await client.runs.create(
    thread_id=thread_id,
    assistant_id="lead_agent",
    input={"messages": [{"role": "user", "content": "Create a plan"}]},
    config={
        "configurable": {
            "is_plan_mode": True,
            "model_name": "claude-3-5-sonnet-20241022"
        }
    }
)
```
### Configuration Options

| Parameter | Type | Description |
|---|---|---|
| `thinking_enabled` | bool | Enable extended thinking (model must support it) |
| `reasoning_effort` | str | Reasoning level: "low", "medium", "high" |
| `model_name` | str | Override default model |
| `is_plan_mode` | bool | Enable task tracking with write_todos tool |
| `subagent_enabled` | bool | Enable task tool for sub-agent delegation |
| `max_concurrent_subagents` | int | Parallel sub-agent limit (default: 3) |
| `agent_name` | str | Use custom agent instead of lead_agent |
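If you set several of these options per run, it can help to merge overrides onto a defaults dict shaped for `config={"configurable": ...}`. A minimal sketch; the default values below are illustrative assumptions except `max_concurrent_subagents`, whose default of 3 is documented in the table:

```python
# Illustrative defaults; only max_concurrent_subagents=3 is documented.
DEFAULT_CONFIGURABLE = {
    "thinking_enabled": False,
    "is_plan_mode": False,
    "subagent_enabled": False,
    "max_concurrent_subagents": 3,
}

def build_config(**overrides) -> dict:
    """Merge per-run overrides onto defaults, shaped for client.runs.create(config=...)."""
    return {"configurable": {**DEFAULT_CONFIGURABLE, **overrides}}
```

Then `build_config(is_plan_mode=True, model_name="claude-3-5-sonnet-20241022")` produces the same shape as the hand-written `config` dicts above.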
## Thread Isolation

Each thread gets isolated directories managed by ThreadDataMiddleware:

```
backend/.deer-flow/threads/{thread_id}/
├── user-data/
│   ├── workspace/   # Working directory for file operations
│   ├── uploads/     # User-uploaded files
│   └── outputs/     # Files for download via present_files
```
### Virtual Path Mapping

Inside the sandbox, these paths are mapped:

| Physical Path | Virtual Path (in sandbox) |
|---|---|
| `.deer-flow/threads/{id}/user-data/workspace/` | `/mnt/user-data/workspace/` |
| `.deer-flow/threads/{id}/user-data/uploads/` | `/mnt/user-data/uploads/` |
| `.deer-flow/threads/{id}/user-data/outputs/` | `/mnt/user-data/outputs/` |
| `skills/` | `/mnt/skills/` |

The agent sees only virtual paths. Path translation happens automatically in sandbox tools.
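Conceptually, the translation is a prefix rewrite. The following is a hypothetical sketch of that mapping (the function name and signature are illustrative, not DeerFlow's actual internals, which live inside the sandbox tools):

```python
# Hypothetical sketch of the virtual-to-physical mapping in the table above.
def to_physical(virtual_path: str, thread_id: str, root: str = ".deer-flow") -> str:
    """Translate a sandbox virtual path to a physical host path."""
    user_data = "/mnt/user-data/"
    skills = "/mnt/skills/"
    if virtual_path.startswith(user_data):
        rest = virtual_path[len(user_data):]
        return f"{root}/threads/{thread_id}/user-data/{rest}"
    if virtual_path.startswith(skills):
        return "skills/" + virtual_path[len(skills):]
    raise ValueError(f"Path outside sandbox mounts: {virtual_path}")
```

For example, `/mnt/user-data/workspace/report.md` in thread `t1` would resolve to `.deer-flow/threads/t1/user-data/workspace/report.md` on the host.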
## Working with Messages

DeerFlow uses LangChain message types:

```python
# User message
{
    "role": "user",
    "content": "Hello!"
}

# AI message
{
    "role": "assistant",
    "content": "Hi there!"
}

# AI message with tool calls
{
    "role": "assistant",
    "content": "",
    "tool_calls": [
        {
            "id": "call_abc123",
            "type": "function",
            "function": {
                "name": "bash",
                "arguments": '{"command": "ls -la"}'
            }
        }
    ]
}

# Tool result message
{
    "role": "tool",
    "tool_call_id": "call_abc123",
    "content": "total 24\ndrwxr-xr-x..."
}
```
### Retrieving Messages

```python
state = await client.threads.get_state(thread_id)
messages = state["values"]["messages"]

for msg in messages:
    if msg["role"] == "user":
        print(f"User: {msg['content']}")
    elif msg["role"] == "assistant":
        if msg.get("tool_calls"):
            print(f"AI used {len(msg['tool_calls'])} tools")
        else:
            print(f"AI: {msg['content']}")
```
## Deleting Threads

Delete a thread and its associated data:

```python
await client.threads.delete(thread_id)
```

This permanently deletes:

- All conversation history
- Thread state (title, artifacts, todos)
- Isolated workspace files

Deletion cannot be undone.
## Interrupts

DeerFlow supports interrupts via the ask_clarification tool. When the agent needs user input:

```python
# The agent calls the ask_clarification tool;
# ClarificationMiddleware intercepts and interrupts.

# Check for interrupts
state = await client.threads.get_state(thread_id)
if state["next"] == ["__interrupt__"]:
    print("Agent is waiting for input")

# Resume with the user's response
await client.threads.update_state(
    thread_id,
    values={
        "messages": [
            {"role": "user", "content": "Here's the clarification"}
        ]
    }
)
```
### Interrupt Flow

1. Agent calls the ask_clarification tool with a question
2. ClarificationMiddleware intercepts it in its after_model hook
3. The middleware returns Command(goto=END) to halt execution
4. Thread state shows next: ["__interrupt__"]
5. Client provides the clarification and resumes the run
## Checkpointing

LangGraph automatically checkpoints thread state after each step. This enables:

- **Time travel** - Rewind to any previous state
- **Branching** - Create alternate conversation paths
- **Recovery** - Resume after crashes

```python
# Get state history
history = await client.threads.get_history(thread_id, limit=10)

for checkpoint in history:
    print(f"Step {checkpoint['step']}: {checkpoint['values']['messages'][-1]}")

# Rewind to a specific checkpoint
await client.threads.update_state(
    thread_id,
    values={},
    checkpoint_id=checkpoint["checkpoint_id"]
)
```
## Example: Complete Thread Lifecycle

```python
from langgraph_sdk import get_client

async def main():
    client = get_client(url="http://localhost:2024")

    # 1. Create thread
    thread = await client.threads.create(
        metadata={"project": "analysis"}
    )
    thread_id = thread["thread_id"]

    # 2. Start conversation
    await client.runs.create(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={
            "messages": [
                {"role": "user", "content": "Analyze sales data in /mnt/user-data/uploads/sales.csv"}
            ]
        },
        config={
            "configurable": {
                "is_plan_mode": True,
                "thinking_enabled": True
            }
        }
    )

    # 3. Get state
    state = await client.threads.get_state(thread_id)
    print(f"Title: {state['values']['title']}")
    print(f"Artifacts: {state['values']['artifacts']}")

    # 4. Continue conversation
    await client.runs.create(
        thread_id=thread_id,
        assistant_id="lead_agent",
        input={
            "messages": [
                {"role": "user", "content": "Now create a visualization"}
            ]
        }
    )

    # 5. Cleanup
    await client.threads.delete(thread_id)
```
## Best Practices

### Use consistent thread IDs

Store thread IDs in your database to maintain conversation continuity across sessions:

```python
# Store in DB
thread_id = thread["thread_id"]
db.save_conversation(user_id, thread_id)

# Retrieve later
thread_id = db.get_conversation(user_id)
state = await client.threads.get_state(thread_id)
```
### Add metadata for filtering

Attach metadata such as `user_id` or `project` at creation time so threads can be filtered later (see Listing Threads).

### Clean up inactive threads

Implement a cleanup strategy for inactive threads:

```python
from datetime import datetime, timedelta, timezone

# Delete threads older than 30 days
threads = await client.threads.list()
now = datetime.now(timezone.utc)
for thread in threads:
    updated = datetime.fromisoformat(thread["updated_at"].replace("Z", "+00:00"))
    if now - updated > timedelta(days=30):
        await client.threads.delete(thread["thread_id"])
```
### Handle interrupts gracefully

Always check for interrupt state before assuming completion:

```python
import json

state = await client.threads.get_state(thread_id)
if state["next"] == ["__interrupt__"]:
    # Handle clarification request
    last_message = state["values"]["messages"][-1]
    if last_message.get("tool_calls"):
        tool_call = last_message["tool_calls"][0]
        if tool_call["function"]["name"] == "ask_clarification":
            question = json.loads(tool_call["function"]["arguments"])["question"]
            # Prompt the user for clarification
```
## Next Steps

- **Streaming** - Learn how to stream agent responses in real-time
- **File Uploads** - Upload files for agent processing
- **Plan Mode** - Enable task tracking with TodoList middleware
- **Memory** - Understand how memory works across threads