The DeerFlow agent system processes every agent invocation through a middleware chain of up to 11 components (several are added only when the corresponding feature is enabled). Each middleware runs at specific lifecycle hooks (before_agent, after_agent, before_model, after_model, wrap_model_call, wrap_tool_call) to augment agent behavior without modifying core logic.

Execution Order

Middlewares execute in the strict order defined in backend/src/agents/lead_agent/agent.py:217-250:
middlewares = [
    ThreadDataMiddleware(),
    UploadsMiddleware(),
    SandboxMiddleware(),
    DanglingToolCallMiddleware(),
    # Entries below with an "if ..." comment are added only when that condition holds:
    SummarizationMiddleware(),  # if enabled
    TodoListMiddleware(),        # if is_plan_mode
    TitleMiddleware(),
    MemoryMiddleware(),
    ViewImageMiddleware(),       # if model supports vision
    SubagentLimitMiddleware(),   # if subagent_enabled
    ClarificationMiddleware()    # must be last
]
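One way to picture the conditional entries above is a builder that appends each optional middleware only when its flag is set. This is a minimal sketch, not the code in agent.py: `build_middleware_names` and its keyword flags are hypothetical, and it returns class names as strings instead of real middleware instances.

```python
def build_middleware_names(
    *,
    summarization_enabled: bool,
    is_plan_mode: bool,
    supports_vision: bool,
    subagent_enabled: bool,
) -> list[str]:
    """Return middleware class names in execution order (hypothetical helper)."""
    names = [
        "ThreadDataMiddleware",
        "UploadsMiddleware",
        "SandboxMiddleware",
        "DanglingToolCallMiddleware",
    ]
    if summarization_enabled:
        names.append("SummarizationMiddleware")
    if is_plan_mode:
        names.append("TodoListMiddleware")
    names += ["TitleMiddleware", "MemoryMiddleware"]
    if supports_vision:
        names.append("ViewImageMiddleware")
    if subagent_enabled:
        names.append("SubagentLimitMiddleware")
    # ClarificationMiddleware is always appended last.
    names.append("ClarificationMiddleware")
    return names
```

With every flag off, the chain shrinks to seven entries, and ClarificationMiddleware still closes the list.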

Middleware Components

1. ThreadDataMiddleware

Purpose: Creates a per-thread isolated directory structure for workspace, uploads, and output files.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/thread_data_middleware.py):
class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
    def __init__(self, base_dir: str | None = None, lazy_init: bool = True):
        # lazy_init=True: Only compute paths, defer directory creation
        # lazy_init=False: Eagerly create directories
        self._paths = Paths(base_dir) if base_dir else get_paths()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        if self._lazy_init:
            paths = self._get_thread_paths(thread_id)
        else:
            paths = self._create_thread_directories(thread_id)
        
        return {
            "thread_data": {
                "workspace_path": str(paths["workspace_path"]),
                "uploads_path": str(paths["uploads_path"]),
                "outputs_path": str(paths["outputs_path"])
            }
        }
Directory Structure Created:
backend/.deer-flow/threads/{thread_id}/user-data/
├── workspace/  # Agent's working directory
├── uploads/    # User-uploaded files
└── outputs/    # Files presented to user via present_files tool
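The difference between lazy and eager initialization can be sketched with pathlib. The helper names `thread_paths` and `create_thread_directories` are hypothetical stand-ins for the private methods shown above, and the `threads/{thread_id}/user-data` layout is taken from the tree in this section.

```python
from pathlib import Path


def thread_paths(base_dir: str, thread_id: str) -> dict[str, Path]:
    """Lazy variant: compute the three per-thread paths without touching disk."""
    root = Path(base_dir) / "threads" / thread_id / "user-data"
    return {
        "workspace_path": root / "workspace",
        "uploads_path": root / "uploads",
        "outputs_path": root / "outputs",
    }


def create_thread_directories(base_dir: str, thread_id: str) -> dict[str, Path]:
    """Eager variant: compute the paths and create the directories."""
    paths = thread_paths(base_dir, thread_id)
    for path in paths.values():
        path.mkdir(parents=True, exist_ok=True)
    return paths
```

Because `exist_ok=True` makes creation idempotent, the eager variant can safely run on every turn of the same thread.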

2. UploadsMiddleware

Purpose: Injects uploaded file information into the conversation, tracking new uploads across turns.
Lifecycle: before_agent
Implementation (backend/src/agents/middlewares/uploads_middleware.py:139-220):
class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
    def before_agent(self, state, runtime):
        thread_id = runtime.context.get("thread_id")
        messages = state.get("messages", [])
        
        # Track previously shown files from message history
        shown_files = self._extract_files_from_previous_messages(messages[:-1])
        
        # List only newly uploaded files
        new_files = self._list_newly_uploaded_files(thread_id, shown_files)
        
        if new_files:
            # Prepend file list to last human message
            files_message = self._create_files_message(new_files)
            original_content = messages[-1].content
            updated_message = HumanMessage(
                content=f"{files_message}\n\n{original_content}"
            )
            messages[-1] = updated_message
        
        return {"uploaded_files": new_files, "messages": messages}
Key Features:
  • Deduplicates files already shown in previous turns
  • Formats file list with size and virtual path: /mnt/user-data/uploads/{filename}
  • Supports filenames with spaces via regex r"^-\s+(.+?)\s*\("
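To see how the regex above recovers filenames that contain spaces, here is a small sketch. The `- name (size)` line format and the helper name `extract_shown_filenames` are assumptions made for illustration; only the pattern itself comes from the source.

```python
import re

# Pattern quoted in the feature list above; MULTILINE lets ^ match at each line start.
FILE_LINE = re.compile(r"^-\s+(.+?)\s*\(", re.MULTILINE)


def extract_shown_filenames(text: str) -> set[str]:
    """Collect filenames from '- <name> (<size>)' lines (assumed format)."""
    return set(FILE_LINE.findall(text))


example = (
    "The following files were uploaded:\n"
    "- quarterly report.pdf (1.2 MB)\n"
    "- data.csv (48 KB)\n"
)
print(extract_shown_filenames(example))
```

The lazy group `(.+?)` stops at the first opening parenthesis, so a name that itself contains `(` would be cut short under this pattern.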

3. SandboxMiddleware

Purpose: Acquires and manages isolated execution environments for agent tool calls.
Lifecycle: before_agent
Implementation (backend/src/sandbox/middleware.py:18-61):
class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):
    def __init__(self, lazy_init: bool = True):
        # lazy_init=True: Acquire on first tool call
        # lazy_init=False: Acquire in before_agent()
        self._lazy_init = lazy_init

    def before_agent(self, state, runtime):
        if self._lazy_init:
            return None  # Defer acquisition
        
        if "sandbox" not in state or state["sandbox"] is None:
            thread_id = runtime.context["thread_id"]
            sandbox_id = self._acquire_sandbox(thread_id)
            return {"sandbox": {"sandbox_id": sandbox_id}}
Sandbox Lifecycle:
  • Sandbox reused across turns within same thread (not released after each call)
  • Cleanup occurs at application shutdown via SandboxProvider.shutdown()
  • Supports local filesystem (LocalSandboxProvider) and Docker (AioSandboxProvider)

4. DanglingToolCallMiddleware

Purpose: Fixes message history gaps caused by interrupted tool calls (e.g., user cancellation).
Lifecycle: wrap_model_call
Implementation (backend/src/agents/middlewares/dangling_tool_call_middleware.py:28-111):
class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
    def wrap_model_call(self, request, handler):
        # Scan for AIMessages with tool_calls that lack ToolMessage responses
        patched = self._build_patched_messages(request.messages)
        if patched:
            request = request.override(messages=patched)
        return handler(request)
    
    def _build_patched_messages(self, messages):
        existing_tool_msg_ids = {msg.tool_call_id for msg in messages 
                                  if isinstance(msg, ToolMessage)}
        
        patched = []
        for msg in messages:
            patched.append(msg)
            if getattr(msg, "type", None) == "ai":
                for tc in getattr(msg, "tool_calls", []):
                    if tc["id"] not in existing_tool_msg_ids:
                        # Inject placeholder ToolMessage
                        patched.append(ToolMessage(
                            content="[Tool call was interrupted and did not return a result.]",
                            tool_call_id=tc["id"],
                            status="error"
                        ))
        return patched
Why wrap_model_call instead of before_model: Ensures patches are inserted immediately after each dangling AIMessage, not appended to the end (which before_model + add_messages reducer would do).
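The positional point can be illustrated with plain dictionaries standing in for message objects. This is a simplified sketch: the dict shapes and the function name `patch_dangling` are assumptions, not the library's message types.

```python
def patch_dangling(messages: list[dict]) -> list[dict]:
    """Insert a placeholder tool result right after each unanswered AI tool call."""
    answered = {m["tool_call_id"] for m in messages if m["type"] == "tool"}
    patched = []
    for msg in messages:
        patched.append(msg)
        if msg["type"] == "ai":
            for tc in msg.get("tool_calls", []):
                if tc["id"] not in answered:
                    patched.append({
                        "type": "tool",
                        "tool_call_id": tc["id"],
                        "status": "error",
                        "content": "[Tool call was interrupted and did not return a result.]",
                    })
    return patched


history = [
    {"type": "human", "content": "search the docs"},
    {"type": "ai", "tool_calls": [{"id": "call_1"}]},  # interrupted before a result arrived
    {"type": "human", "content": "never mind, summarize instead"},
]
print([m["type"] for m in patch_dangling(history)])
```

The placeholder lands directly after the AI message that issued the call, ahead of the later human message. Appending it to the end of the history would put it after that human message instead.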

5. SummarizationMiddleware (Optional)

Purpose: Automatic context reduction when approaching token limits.
Lifecycle: before_model, after_model
Configuration (backend/src/config/summarization_config.py):
class SummarizationConfig(BaseModel):
    enabled: bool = False
    model_name: str | None = None  # None = use lightweight model
    trigger: ContextSize | list[ContextSize] | None
    keep: ContextSize = ContextSize(type="messages", value=20)
    trim_tokens_to_summarize: int | None = 4000
    summary_prompt: str | None = None
Trigger Types:
  • {"type": "fraction", "value": 0.8} - 80% of model's max input tokens
  • {"type": "tokens", "value": 4000} - 4000 tokens
  • {"type": "messages", "value": 50} - 50 messages
Keep Policies: Same types as triggers; each defines how much context to preserve after summarization.
Creation (backend/src/agents/lead_agent/agent.py:41-80):
def _create_summarization_middleware():
    config = get_summarization_config()
    if not config.enabled:
        return None
    
    # Convert config to middleware parameters
    if isinstance(config.trigger, list):
        trigger = [t.to_tuple() for t in config.trigger]
    else:
        # config.trigger may be None (no trigger configured)
        trigger = config.trigger.to_tuple() if config.trigger else None
    keep = config.keep.to_tuple()
    model = config.model_name or create_chat_model(thinking_enabled=False)
    
    return SummarizationMiddleware(
        model=model,
        trigger=trigger,
        keep=keep,
        trim_tokens_to_summarize=config.trim_tokens_to_summarize
    )
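The three trigger types could be evaluated roughly as follows. This is only a sketch under stated assumptions: triggers are (type, value) tuples, a list of triggers fires when any one of them is met, and thresholds are inclusive. The function `should_summarize` is hypothetical and does not describe the middleware's internals.

```python
def should_summarize(
    triggers: list[tuple[str, float]],
    *,
    message_count: int,
    token_count: int,
    max_input_tokens: int,
) -> bool:
    """Return True if any trigger threshold is reached (assumed any-of semantics)."""
    for kind, value in triggers:
        if kind == "messages" and message_count >= value:
            return True
        if kind == "tokens" and token_count >= value:
            return True
        if kind == "fraction" and token_count >= value * max_input_tokens:
            return True
    return False


triggers = [("fraction", 0.8), ("messages", 50)]
print(should_summarize(triggers, message_count=12, token_count=110_000, max_input_tokens=128_000))
```

In this example 110,000 tokens exceeds 80% of 128,000 (102,400), so the fraction trigger fires even though the message count is far below 50.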

6. TodoListMiddleware (Optional)

Purpose: Provides the write_todos tool for structured task tracking in complex multi-step workflows.
Lifecycle: Tool injection + state management
Activation: Enabled when config.configurable.is_plan_mode = True
Custom Configuration (backend/src/agents/lead_agent/agent.py:83-195):
def _create_todo_list_middleware(is_plan_mode: bool):
    if not is_plan_mode:
        return None
    
    system_prompt = """
    <todo_list_system>
    **CRITICAL RULES:**
    - Mark todos as completed IMMEDIATELY after finishing each step
    - Keep EXACTLY ONE task as `in_progress` at any time
    - Update in REAL-TIME - gives users visibility
    - DO NOT use for simple tasks (< 3 steps)
    </todo_list_system>
    """
    
    tool_description = """Use for complex tasks (3+ steps) only..."""
    
    return TodoListMiddleware(
        system_prompt=system_prompt,
        tool_description=tool_description
    )
Task States:
  • pending - Not started
  • in_progress - Currently working (one at a time, or multiple if parallel)
  • completed - Finished successfully

7. TitleMiddleware

Purpose: Auto-generates a thread title after the first complete user-assistant exchange.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/title_middleware.py:19-94):
class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
    def after_agent(self, state, runtime):
        if self._should_generate_title(state):
            title = self._generate_title(state)
            return {"title": title}
        return None
    
    def _should_generate_title(self, state):
        config = get_title_config()
        if not config.enabled or state.get("title"):
            return False
        
        messages = state.get("messages", [])
        user_messages = [m for m in messages if m.type == "human"]
        assistant_messages = [m for m in messages if m.type == "ai"]
        
        # Generate after first complete exchange
        return len(user_messages) == 1 and len(assistant_messages) >= 1
    
    def _generate_title(self, state):
        config = get_title_config()
        model = create_chat_model(thinking_enabled=False)  # Lightweight model
        messages = state.get("messages", [])
        
        user_msg = next(m.content for m in messages if m.type == "human")
        assistant_msg = next(m.content for m in messages if m.type == "ai")
        
        prompt = config.prompt_template.format(
            max_words=config.max_words,
            user_msg=user_msg[:500],
            assistant_msg=assistant_msg[:500]
        )
        
        response = model.invoke(prompt)
        title = response.content.strip()[:config.max_chars]
        return title
Fallback: If LLM fails, uses first 50 characters of user message.
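The fallback path might look like the sketch below. The function `generate_title`, the `invoke_model` callable, and the default `max_chars=60` are assumptions for illustration. Treating an empty model response like a failure is also an assumption; the source only states the 50-character fallback on failure.

```python
from typing import Callable


def generate_title(
    user_msg: str,
    invoke_model: Callable[[str], str],
    max_chars: int = 60,       # assumed default, not taken from the source
    fallback_chars: int = 50,  # fallback length stated in the source
) -> str:
    """Ask the model for a title; fall back to a prefix of the user message."""
    try:
        title = invoke_model(user_msg).strip()[:max_chars]
    except Exception:
        # Any model failure falls through to the fallback below.
        title = ""
    return title or user_msg[:fallback_chars]


def broken_model(prompt: str) -> str:
    raise RuntimeError("model unavailable")


print(generate_title("Please compare the three sandbox providers for me", broken_model))
```

Title generation is a convenience, so the sketch swallows the error and still returns a usable title.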

8. MemoryMiddleware

Purpose: Queues the conversation for asynchronous memory extraction and updates.
Lifecycle: after_agent
Implementation (backend/src/agents/middlewares/memory_middleware.py:53-117):
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
    def __init__(self, agent_name: str | None = None):
        # agent_name: If provided, uses per-agent memory storage
        self._agent_name = agent_name
    
    def after_agent(self, state, runtime):
        config = get_memory_config()
        if not config.enabled:
            return None
        
        thread_id = runtime.context.get("thread_id")
        messages = state.get("messages", [])
        
        # Filter to user inputs + final assistant responses (no tool calls)
        filtered_messages = _filter_messages_for_memory(messages)
        
        # Queue for debounced background processing
        queue = get_memory_queue()
        queue.add(
            thread_id=thread_id,
            messages=filtered_messages,
            agent_name=self._agent_name
        )
        
        return None  # No state changes
Message Filtering (backend/src/agents/middlewares/memory_middleware.py:19-50):
def _filter_messages_for_memory(messages):
    filtered = []
    for msg in messages:
        if msg.type == "human":
            filtered.append(msg)  # Always keep user messages
        elif msg.type == "ai" and not getattr(msg, "tool_calls", None):
            filtered.append(msg)  # Only keep final AI responses
    return filtered
Memory Workflow:
  1. Middleware queues conversation after agent completes
  2. Queue debounces (30s default) and batches updates
  3. Background thread invokes LLM to extract facts and context
  4. Updates stored atomically in backend/.deer-flow/memory.json
  5. Next interaction injects top 15 facts into system prompt
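Step 2 of the workflow above (debouncing) can be sketched deterministically by passing the clock in explicitly. `DebouncedQueue` is a hypothetical simplification: the queue described above runs on a background thread, and its API is not shown in this document.

```python
class DebouncedQueue:
    """Keep only the latest payload per thread and release it after a quiet period."""

    def __init__(self, debounce_seconds: float = 30.0):
        self._debounce = debounce_seconds
        self._pending: dict[str, tuple[list, float]] = {}  # thread_id -> (messages, due time)

    def add(self, thread_id: str, messages: list, now: float) -> None:
        # A newer update replaces the older one and restarts the timer.
        self._pending[thread_id] = (messages, now + self._debounce)

    def pop_due(self, now: float) -> dict[str, list]:
        """Remove and return every entry whose quiet period has elapsed."""
        due = {tid: msgs for tid, (msgs, when) in self._pending.items() if when <= now}
        for tid in due:
            del self._pending[tid]
        return due


queue = DebouncedQueue(debounce_seconds=30.0)
queue.add("thread-1", ["hello"], now=0.0)
queue.add("thread-1", ["hello", "follow-up"], now=10.0)  # restarts the 30 s window
print(queue.pop_due(now=30.0))  # nothing is due until t = 40
print(queue.pop_due(now=40.0))
```

Replacing the pending entry means a burst of turns in one thread results in a single extraction over the most recent messages.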

9. ViewImageMiddleware (Optional)

Purpose: Injects base64 image data into the conversation when the view_image tool completes.
Lifecycle: before_model
Activation: Only added if model_config.supports_vision = true
Implementation (backend/src/agents/middlewares/view_image_middleware.py:19-222):
class ViewImageMiddleware(AgentMiddleware[ViewImageMiddlewareState]):
    def before_model(self, state, runtime):
        return self._inject_image_message(state)
    
    def _should_inject_image_message(self, state):
        messages = state.get("messages", [])
        last_assistant_msg = self._get_last_assistant_message(messages)
        
        if not last_assistant_msg:
            return False
        
        # Check if it has view_image tool calls
        if not self._has_view_image_tool(last_assistant_msg):
            return False
        
        # Check if all tools completed
        if not self._all_tools_completed(messages, last_assistant_msg):
            return False
        
        # Check if we already injected the message
        # (prevents duplicate injections)
        return not self._already_injected(messages, last_assistant_msg)
    
    def _create_image_details_message(self, state):
        viewed_images = state.get("viewed_images", {})
        content_blocks = [
            {"type": "text", "text": "Here are the images you've viewed:"}
        ]
        
        for image_path, image_data in viewed_images.items():
            content_blocks.append({
                "type": "text",
                "text": f"\n- **{image_path}** ({image_data['mime_type']})"
            })
            content_blocks.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:{image_data['mime_type']};base64,{image_data['base64']}"
                }
            })
        
        return content_blocks
State Management: Uses viewed_images dict in ThreadState with custom reducer:
def merge_viewed_images(existing, new):
    if new == {}:  # Empty dict clears all viewed images
        return {}
    return {**existing, **new}  # Merge dictionaries
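A short usage example shows the reducer's two behaviors, merge and clear. The function body is copied from the snippet above; the image payloads are placeholder values.

```python
def merge_viewed_images(existing: dict, new: dict) -> dict:
    if new == {}:  # Empty dict clears all viewed images
        return {}
    return {**existing, **new}  # Merge dictionaries; new keys override existing ones


state: dict = {}
state = merge_viewed_images(state, {"/mnt/a.png": {"mime_type": "image/png", "base64": "AAAA"}})
state = merge_viewed_images(state, {"/mnt/b.jpg": {"mime_type": "image/jpeg", "base64": "BBBB"}})
print(sorted(state))  # both images are retained

state = merge_viewed_images(state, {})
print(state)  # cleared
```

Using an empty dict as the clear signal means an update cannot express "no change" with `{}`. A caller that wants no change should omit the key from its state update instead.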

10. SubagentLimitMiddleware (Optional)

Purpose: Enforces the maximum number of concurrent subagent calls by truncating excess task tool calls.
Lifecycle: after_model
Activation: Only added if config.configurable.subagent_enabled = True
Implementation (backend/src/agents/middlewares/subagent_limit_middleware.py:24-76):
class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
    def __init__(self, max_concurrent: int = MAX_CONCURRENT_SUBAGENTS):
        # max_concurrent clamped to [2, 4]
        self.max_concurrent = _clamp_subagent_limit(max_concurrent)
    
    def after_model(self, state, runtime):
        return self._truncate_task_calls(state)
    
    def _truncate_task_calls(self, state):
        messages = state.get("messages", [])
        if not messages:
            return None
        last_msg = messages[-1]
        
        if getattr(last_msg, "type", None) != "ai":
            return None
        
        # Nothing to truncate if the model made no tool calls
        tool_calls = getattr(last_msg, "tool_calls", None)
        if not tool_calls:
            return None
        
        task_indices = [i for i, tc in enumerate(tool_calls) 
                        if tc.get("name") == "task"]
        
        if len(task_indices) <= self.max_concurrent:
            return None
        
        # Keep only first max_concurrent task calls
        indices_to_drop = set(task_indices[self.max_concurrent:])
        truncated = [tc for i, tc in enumerate(tool_calls) 
                     if i not in indices_to_drop]
        
        logger.warning(f"Truncated {len(indices_to_drop)} excess task calls")
        
        updated_msg = last_msg.model_copy(update={"tool_calls": truncated})
        return {"messages": [updated_msg]}
Why This Works: Truncating in middleware is more reliable than prompt-based limits: the model may emit any number of task calls, and the middleware deterministically keeps only the first max_concurrent.
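The truncation rule can be exercised on plain dictionaries. This sketch assumes tool calls are dicts with a `name` key, as in the snippet above. `clamp_subagent_limit` is a hypothetical stand-in for `_clamp_subagent_limit`, using the [2, 4] range given in the comment above.

```python
def clamp_subagent_limit(value: int, low: int = 2, high: int = 4) -> int:
    """Clamp the configured limit into [low, high]."""
    return max(low, min(high, value))


def truncate_task_calls(tool_calls: list[dict], max_concurrent: int) -> list[dict]:
    """Keep all non-task calls and only the first max_concurrent task calls."""
    task_indices = [i for i, tc in enumerate(tool_calls) if tc.get("name") == "task"]
    indices_to_drop = set(task_indices[max_concurrent:])
    return [tc for i, tc in enumerate(tool_calls) if i not in indices_to_drop]


calls = [
    {"name": "task", "id": "a"},
    {"name": "bash", "id": "b"},
    {"name": "task", "id": "c"},
    {"name": "task", "id": "d"},
    {"name": "task", "id": "e"},
]
limit = clamp_subagent_limit(1)  # raised to the minimum of 2
print([tc["id"] for tc in truncate_task_calls(calls, limit)])
```

Only calls named `task` count toward the limit, so the unrelated `bash` call survives regardless of its position.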

11. ClarificationMiddleware

Purpose: Intercepts ask_clarification tool calls and interrupts execution to present questions to the user.
Lifecycle: wrap_tool_call
Position: MUST BE LAST in the middleware chain to intercept after all other processing.
Implementation (backend/src/agents/middlewares/clarification_middleware.py:20-174):
class ClarificationMiddleware(AgentMiddleware[ClarificationMiddlewareState]):
    def wrap_tool_call(self, request, handler):
        if request.tool_call.get("name") != "ask_clarification":
            return handler(request)  # Pass through
        
        return self._handle_clarification(request)
    
    def _handle_clarification(self, request):
        args = request.tool_call.get("args", {})
        formatted_message = self._format_clarification_message(args)
        
        tool_message = ToolMessage(
            content=formatted_message,
            tool_call_id=request.tool_call.get("id"),
            name="ask_clarification"
        )
        
        # Return Command that interrupts execution
        return Command(
            update={"messages": [tool_message]},
            goto=END  # Stop execution, wait for user response
        )
    
    def _format_clarification_message(self, args):
        question = args.get("question", "")
        clarification_type = args.get("clarification_type", "missing_info")
        context = args.get("context")
        options = args.get("options", [])
        
        type_icons = {
            "missing_info": "❓",
            "ambiguous_requirement": "🤔",
            "approach_choice": "🔀",
            "risk_confirmation": "⚠️",
            "suggestion": "💡"
        }
        
        icon = type_icons.get(clarification_type, "❓")
        
        message_parts = []
        if context:
            message_parts.append(f"{icon} {context}")
            message_parts.append(f"\n{question}")
        else:
            message_parts.append(f"{icon} {question}")
        
        if options:
            message_parts.append("")
            for i, option in enumerate(options, 1):
                message_parts.append(f"  {i}. {option}")
        
        return "\n".join(message_parts)
Key Behavior: Uses Command(goto=END) to interrupt graph execution, forcing wait for user input.
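The interception pattern can be sketched without the graph runtime. Here a plain dict stands in for the Command object and the string `END` for the graph's end marker; both are simplifications for illustration, not the library's types.

```python
from typing import Any, Callable

END = "__end__"  # stand-in for the graph's END marker (assumption)


def wrap_tool_call(tool_call: dict, handler: Callable[[dict], Any]) -> Any:
    """Run ordinary tools via handler; short-circuit ask_clarification."""
    if tool_call.get("name") != "ask_clarification":
        return handler(tool_call)  # Pass through to the real tool

    question = tool_call.get("args", {}).get("question", "")
    tool_message = {
        "type": "tool",
        "name": "ask_clarification",
        "tool_call_id": tool_call.get("id"),
        "content": question,
    }
    # The handler is never invoked: record the question and stop the run.
    return {"update": {"messages": [tool_message]}, "goto": END}


executed = []


def real_handler(tool_call: dict) -> str:
    executed.append(tool_call["name"])
    return "ran"


wrap_tool_call({"name": "bash", "id": "1", "args": {}}, real_handler)
result = wrap_tool_call(
    {"name": "ask_clarification", "id": "2", "args": {"question": "Which file?"}},
    real_handler,
)
print(executed, result["goto"])
```

Because the wrapper returns before calling `handler`, no tool code runs for the clarification request, and the question is recorded as the tool result for the user to answer.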

Middleware Ordering Rationale

The strict order ensures correct dependency resolution:
  1. ThreadDataMiddleware → Creates thread directories first (required by UploadsMiddleware, SandboxMiddleware)
  2. UploadsMiddleware → Injects file info before sandbox/model sees it
  3. SandboxMiddleware → Acquires environment before tool execution
  4. DanglingToolCallMiddleware → Patches message history before model sees it
  5. SummarizationMiddleware → Reduces context early (before other processing)
  6. TodoListMiddleware → Enables task tracking (before clarification)
  7. TitleMiddleware → Generates title after first exchange
  8. MemoryMiddleware → Queues after title generation (complete turn)
  9. ViewImageMiddleware → Injects images before model call (if vision supported)
  10. SubagentLimitMiddleware → Truncates after model generates tool calls
  11. ClarificationMiddleware → MUST BE LAST to intercept all tool calls

Runtime Configuration

Middlewares can be conditionally enabled via config.configurable:
config = {
    "configurable": {
        "thinking_enabled": True,
        "model_name": "gpt-4o",
        "is_plan_mode": False,      # Enables TodoListMiddleware
        "subagent_enabled": True,    # Enables SubagentLimitMiddleware
        "max_concurrent_subagents": 3
    }
}

agent = make_lead_agent(config)

State Schema Compatibility

All middlewares use state schemas compatible with ThreadState (backend/src/agents/thread_state.py:48-56):
class ThreadState(AgentState):
    sandbox: NotRequired[SandboxState | None]
    thread_data: NotRequired[ThreadDataState | None]
    title: NotRequired[str | None]
    artifacts: Annotated[list[str], merge_artifacts]
    todos: NotRequired[list | None]
    uploaded_files: NotRequired[list[dict] | None]
    viewed_images: Annotated[dict[str, ViewedImageData], merge_viewed_images]
Custom Reducers:
  • merge_artifacts - Deduplicates artifact paths while preserving order
  • merge_viewed_images - Merges image dicts, empty dict {} clears all

Debugging Middlewares

Several middlewares log or print their key actions, for example:
logger.warning(f"Injecting {count} placeholder ToolMessage(s) for dangling tool calls")
logger.warning(f"Truncated {count} excess task call(s) from model response (limit: {limit})")
print(f"[ViewImageMiddleware] Injecting image details message with images before LLM call")
print(f"[ClarificationMiddleware] Intercepted clarification request")
View logs via:
cd backend
make dev  # Watch logs in terminal
