
Agent Loop

The Agent Loop is the heart of nanobot — a focused iteration engine that processes messages through context building, LLM interaction, and tool execution.

Overview

Implemented in nanobot/agent/loop.py, the AgentLoop class orchestrates the entire agent processing pipeline:
class AgentLoop:
    """
    The agent loop is the core processing engine.

    It:
    1. Receives messages from the bus
    2. Builds context with history, memory, skills
    3. Calls the LLM
    4. Executes tool calls
    5. Sends responses back
    """

Initialization

The AgentLoop is initialized with configuration and dependencies:
def __init__(
    self,
    bus: MessageBus,
    provider: LLMProvider,
    workspace: Path,
    model: str | None = None,
    max_iterations: int = 40,
    temperature: float = 0.1,
    max_tokens: int = 4096,
    memory_window: int = 100,
    reasoning_effort: str | None = None,
    # ... more parameters
):
Default max_iterations: 40 — prevents infinite loops while allowing complex multi-step reasoning

Message Processing Flow

Step 1: Message Reception

The loop continuously polls the message bus for new InboundMessage objects:
async def run(self) -> None:
    self._running = True
    await self._connect_mcp()
    logger.info("Agent loop started")

    while self._running:
        try:
            msg = await asyncio.wait_for(
                self.bus.consume_inbound(), 
                timeout=1.0
            )
        except asyncio.TimeoutError:
            continue

        if msg.content.strip().lower() == "/stop":
            await self._handle_stop(msg)
        else:
            task = asyncio.create_task(self._dispatch(msg))
            # Track task for cancellation support
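The consume-with-timeout pattern above can be sketched with a plain asyncio.Queue standing in for the message bus (the queue and stop-flag names here are illustrative, not nanobot's actual API):

```python
import asyncio

async def poll_bus() -> list[str]:
    """Minimal sketch of polling a bus with a timeout (hypothetical in-memory bus)."""
    bus: asyncio.Queue[str] = asyncio.Queue()
    for text in ("hello", "/stop"):
        bus.put_nowait(text)

    received: list[str] = []
    running = True
    while running:
        try:
            # Short timeout so the loop can periodically check its running flag.
            msg = await asyncio.wait_for(bus.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        if msg.strip().lower() == "/stop":
            running = False          # stop command ends the loop
        else:
            received.append(msg)     # normal messages get dispatched
    return received

received = asyncio.run(poll_bus())
```

The timeout is what keeps the loop responsive: without it, a blocked consume call would delay shutdown indefinitely.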

Step 2: Context Building

Load session history and build the message context:
session = self.sessions.get_or_create(key)
history = session.get_history(max_messages=self.memory_window)
initial_messages = self.context.build_messages(
    history=history,
    current_message=msg.content,
    media=msg.media if msg.media else None,
    channel=msg.channel, 
    chat_id=msg.chat_id,
)
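The real context builder also injects memory and skills; a simplified sketch of the shape it produces (function name and system prompt are illustrative) looks like this:

```python
def build_messages(history, current, system="You are nanobot."):
    """Sketch of context assembly: system prompt, windowed history, new user turn."""
    msgs = [{"role": "system", "content": system}]
    msgs.extend(history)                              # prior turns, already windowed
    msgs.append({"role": "user", "content": current})  # the incoming message
    return msgs

msgs = build_messages(
    history=[
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello!"},
    ],
    current="what can you do?",
)
```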

Step 3: Agent Iteration Loop

Enter the core iteration loop that calls the LLM and executes tools:
async def _run_agent_loop(
    self,
    initial_messages: list[dict],
    on_progress: Callable[..., Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]:
    messages = initial_messages
    iteration = 0
    final_content = None
    tools_used: list[str] = []

    while iteration < self.max_iterations:
        iteration += 1

        response = await self.provider.chat(
            messages=messages,
            tools=self.tools.get_definitions(),
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            reasoning_effort=self.reasoning_effort,
        )

        if response.has_tool_calls:
            # Execute tools, append their results, and keep iterating
            ...
        else:
            # Got final text response, exit loop
            final_content = response.content
            break

Step 4: Session Persistence

Save the conversation turn to session history:
self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session)

Step 5: Response Publishing

Return the final response as an OutboundMessage, which is published back to the originating channel:
return OutboundMessage(
    channel=msg.channel, 
    chat_id=msg.chat_id, 
    content=final_content,
    metadata=msg.metadata or {},
)

Iteration Logic

The iteration loop continues until one of these conditions is met:
  • Text response: the LLM returns text without tool calls, meaning the task is complete
  • Max iterations: the loop reaches the max_iterations limit (default: 40), preventing infinite loops
  • Error response: the LLM returns an error finish reason, which is handled gracefully
  • Task cancelled: the user sends the /stop command, and all active tasks are cancelled

Tool Call Execution

When the LLM returns tool calls:
if response.has_tool_calls:
    # Show thinking/reasoning to user if provided
    if on_progress:
        thoughts = [
            self._strip_think(response.content),
            response.reasoning_content,
            # ... thinking blocks
        ]
        combined_thoughts = "\n\n".join(filter(None, thoughts))
        if combined_thoughts:
            await on_progress(combined_thoughts)
        await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)

    # Add assistant message with tool calls
    tool_call_dicts = [
        {
            "id": tc.id,
            "type": "function",
            "function": {
                "name": tc.name,
                "arguments": json.dumps(tc.arguments, ensure_ascii=False)
            }
        }
        for tc in response.tool_calls
    ]
    messages = self.context.add_assistant_message(
        messages, response.content, tool_call_dicts,
        reasoning_content=response.reasoning_content,
        thinking_blocks=response.thinking_blocks,
    )

    # Execute each tool call
    for tool_call in response.tool_calls:
        tools_used.append(tool_call.name)
        result = await self.tools.execute(
            tool_call.name, 
            tool_call.arguments
        )
        messages = self.context.add_tool_result(
            messages, tool_call.id, tool_call.name, result
        )
Tool results are added to the message history and sent back to the LLM in the next iteration.
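The dict shapes involved follow the OpenAI-style function-calling format shown above; a minimal, self-contained sketch of the two message builders (helper names are illustrative):

```python
import json

def tool_call_to_dict(tc_id, name, arguments):
    """OpenAI-style function tool call dict, as built in the loop above."""
    return {
        "id": tc_id,
        "type": "function",
        "function": {
            "name": name,
            # Arguments are serialized to a JSON string, not kept as a dict.
            "arguments": json.dumps(arguments, ensure_ascii=False),
        },
    }

def tool_result_message(tc_id, name, result):
    """Tool result message that references the originating call id."""
    return {"role": "tool", "tool_call_id": tc_id, "name": name, "content": result}

call = tool_call_to_dict("call_1", "web_search", {"query": "nanobot"})
result = tool_result_message("call_1", "web_search", "3 results found")
```

Matching tool_call_id values are what let the LLM pair each result with the call that produced it on the next iteration.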

Special Commands

The agent loop handles special slash commands:
The /new command consolidates the current session into memory and starts fresh:
if cmd == "/new":
    lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
    self._consolidating.add(session.key)
    try:
        async with lock:
            snapshot = session.messages[session.last_consolidated:]
            if snapshot:
                # Archive unconsolidated messages to memory
                temp = Session(key=session.key)
                temp.messages = list(snapshot)
                if not await self._consolidate_memory(temp, archive_all=True):
                    return OutboundMessage(
                        channel=msg.channel, chat_id=msg.chat_id,
                        content="Memory archival failed...",
                    )
    finally:
        self._consolidating.discard(session.key)

    session.clear()
    self.sessions.save(session)
    self.sessions.invalidate(session.key)
    return OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id,
        content="New session started."
    )
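The setdefault-a-Lock-per-key pattern used here serializes concurrent consolidations of the same session. A runnable sketch (session key and sleep are illustrative):

```python
import asyncio

async def consolidate(key: str, locks: dict, log: list) -> None:
    """Sketch of per-session locking: setdefault yields exactly one Lock per key."""
    lock = locks.setdefault(key, asyncio.Lock())
    async with lock:
        log.append(f"start:{key}")
        await asyncio.sleep(0)       # yield control while holding the lock
        log.append(f"end:{key}")

async def main():
    locks: dict[str, asyncio.Lock] = {}
    log: list[str] = []
    # Two concurrent requests for the same session serialize on one lock.
    await asyncio.gather(
        consolidate("tg:42", locks, log),
        consolidate("tg:42", locks, log),
    )
    return log

log = asyncio.run(main())
```

Because both coroutines receive the same Lock object, the second start is never interleaved with the first run's critical section.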
The /stop command cancels all running tasks and subagents for the session:
async def _handle_stop(self, msg: InboundMessage) -> None:
    tasks = self._active_tasks.pop(msg.session_key, [])
    cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
    for t in tasks:
        try:
            await t
        except (asyncio.CancelledError, Exception):
            pass
    sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
    total = cancelled + sub_cancelled
    content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop."
    await self.bus.publish_outbound(OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id, content=content,
    ))
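The cancel-then-await pattern above generalizes; a self-contained sketch (task bodies are placeholders):

```python
import asyncio

async def stop_all(tasks: list[asyncio.Task]) -> int:
    """Sketch of the /stop pattern: cancel pending tasks, then reap them."""
    cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
    for t in tasks:
        try:
            await t                      # wait for each task to actually finish
        except asyncio.CancelledError:
            pass                         # swallow the cancellation we requested
    return cancelled

async def main():
    async def long_job():
        await asyncio.sleep(60)          # stand-in for a long-running agent task

    tasks = [asyncio.create_task(long_job()) for _ in range(2)]
    await asyncio.sleep(0)               # let the tasks start running
    return await stop_all(tasks)

cancelled = asyncio.run(main())
```

Awaiting each task after cancelling ensures cleanup (finally blocks, context managers) completes before the confirmation message is sent.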
The /help command returns the list of available commands:
if cmd == "/help":
    return OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id,
        content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands"
    )

Memory Consolidation

When session history grows beyond memory_window, the agent automatically triggers background memory consolidation:
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and 
    session.key not in self._consolidating):
    self._consolidating.add(session.key)
    lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())

    async def _consolidate_and_unlock():
        try:
            async with lock:
                await self._consolidate_memory(session)
        finally:
            self._consolidating.discard(session.key)

    _task = asyncio.create_task(_consolidate_and_unlock())
    self._consolidation_tasks.add(_task)
Memory consolidation runs asynchronously without blocking the main agent loop.
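The create-task-and-track pattern keeps a strong reference to the background task (so it is not garbage-collected mid-flight) and removes it when done. A minimal sketch:

```python
import asyncio

async def main():
    """Sketch of fire-and-forget background work with a tracking set."""
    background: set[asyncio.Task] = set()
    done: list[str] = []

    async def consolidate(key: str):
        await asyncio.sleep(0)           # stand-in for the real consolidation work
        done.append(key)

    task = asyncio.create_task(consolidate("tg:42"))
    background.add(task)                 # strong reference prevents GC of the task
    task.add_done_callback(background.discard)

    await task                           # the real loop would keep running instead
    await asyncio.sleep(0)               # let the done-callback fire
    return done, len(background)

done, remaining = asyncio.run(main())
```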

Progress Streaming

The agent loop supports real-time progress updates to keep users informed during long operations:
async def _bus_progress(content: str, *, tool_hint: bool = False) -> None:
    meta = dict(msg.metadata or {})
    meta["_progress"] = True
    meta["_tool_hint"] = tool_hint
    await self.bus.publish_outbound(OutboundMessage(
        channel=msg.channel, 
        chat_id=msg.chat_id, 
        content=content, 
        metadata=meta,
    ))
Progress messages show:
  • Thinking content: Reasoning/planning from the LLM
  • Tool hints: Which tools are being called (e.g., web_search("query"))
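A progress callback of this shape can be stubbed without the bus; the sketch below records what would be published (the metadata keys mirror those shown above):

```python
import asyncio

async def main():
    """Sketch of a progress callback that tags outbound metadata."""
    sent: list[dict] = []

    async def on_progress(content: str, *, tool_hint: bool = False) -> None:
        # Flags let the channel adapter render progress differently from replies.
        meta = {"_progress": True, "_tool_hint": tool_hint}
        sent.append({"content": content, "metadata": meta})

    await on_progress("Thinking about the request...")
    await on_progress('web_search("nanobot docs")', tool_hint=True)
    return sent

sent = asyncio.run(main())
```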

MCP Integration

The agent loop connects to MCP (Model Context Protocol) servers on startup:
async def _connect_mcp(self) -> None:
    """Connect to configured MCP servers (one-time, lazy)."""
    if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
        return
    self._mcp_connecting = True
    from nanobot.agent.tools.mcp import connect_mcp_servers
    try:
        self._mcp_stack = AsyncExitStack()
        await self._mcp_stack.__aenter__()
        await connect_mcp_servers(
            self._mcp_servers, 
            self.tools, 
            self._mcp_stack
        )
        self._mcp_connected = True
    except Exception as e:
        logger.error("Failed to connect MCP servers: {}", e)
    finally:
        self._mcp_connecting = False
MCP servers are connected lazily on first use and reused for all subsequent messages.
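The connect-once pattern with an AsyncExitStack can be demonstrated with fake connections (the server names and LazyConnector class are illustrative, not nanobot's actual MCP code):

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []

@asynccontextmanager
async def fake_server(name: str):
    """Stand-in for an MCP server connection context."""
    events.append(f"connect:{name}")
    try:
        yield name
    finally:
        events.append(f"close:{name}")

class LazyConnector:
    """Sketch of the connect-once pattern guarded by a flag."""
    def __init__(self, names):
        self.names = names
        self.connected = False
        self.stack: AsyncExitStack | None = None

    async def connect(self):
        if self.connected or not self.names:
            return                        # idempotent: only the first call connects
        self.stack = AsyncExitStack()
        await self.stack.__aenter__()
        for name in self.names:
            await self.stack.enter_async_context(fake_server(name))
        self.connected = True

async def main():
    conn = LazyConnector(["search", "files"])
    await conn.connect()
    await conn.connect()                  # no-op on the second call
    await conn.stack.aclose()             # shutdown unwinds all connections
    return events

events_out = asyncio.run(main())
```

The AsyncExitStack keeps every connection open across messages and unwinds them all, in reverse order, at shutdown.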

Error Handling

The agent loop includes robust error handling:
async def _dispatch(self, msg: InboundMessage) -> None:
    async with self._processing_lock:
        try:
            response = await self._process_message(msg)
            if response is not None:
                await self.bus.publish_outbound(response)
            elif msg.channel == "cli":
                # Send empty response for CLI to complete prompt
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel, chat_id=msg.chat_id,
                    content="", metadata=msg.metadata or {},
                ))
        except asyncio.CancelledError:
            logger.info("Task cancelled for session {}", msg.session_key)
            raise
        except Exception:
            logger.exception("Error processing message for session {}", msg.session_key)
            await self.bus.publish_outbound(OutboundMessage(
                channel=msg.channel, chat_id=msg.chat_id,
                content="Sorry, I encountered an error.",
            ))
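The key policy here is that CancelledError is re-raised while everything else becomes a user-facing error message; a self-contained sketch (job names are placeholders):

```python
import asyncio

async def dispatch(job, outbound: list[str]) -> None:
    """Sketch of the dispatch error policy: re-raise cancellation, report the rest."""
    try:
        outbound.append(await job())
    except asyncio.CancelledError:
        raise                            # let /stop cancellation propagate
    except Exception:
        outbound.append("Sorry, I encountered an error.")

async def main():
    outbound: list[str] = []

    async def ok():
        return "done"

    async def boom():
        raise RuntimeError("tool exploded")

    await dispatch(ok, outbound)
    await dispatch(boom, outbound)
    return outbound

outbound = asyncio.run(main())
```

Catching CancelledError separately matters: in modern Python it subclasses BaseException, so a bare `except Exception` would miss it, but swallowing it by accident would break /stop.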

Direct Processing Mode

For CLI and cron usage, the agent loop provides a direct processing API:
async def process_direct(
    self,
    content: str,
    session_key: str = "cli:direct",
    channel: str = "cli",
    chat_id: str = "direct",
    on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
    """Process a message directly (for CLI or cron usage)."""
    await self._connect_mcp()
    msg = InboundMessage(
        channel=channel, 
        sender_id="user", 
        chat_id=chat_id, 
        content=content
    )
    response = await self._process_message(
        msg, 
        session_key=session_key, 
        on_progress=on_progress
    )
    return response.content if response else ""

See Also

  • Architecture: Overall system design
  • Tools: How tools are executed
  • Memory: Session persistence and consolidation