Agent Loop
The Agent Loop is the heart of nanobot — a focused iteration engine that processes messages through context building, LLM interaction, and tool execution.
Overview
Implemented in nanobot/agent/loop.py, the AgentLoop class orchestrates the entire agent processing pipeline:
class AgentLoop:
"""
The agent loop is the core processing engine.
It:
1. Receives messages from the bus
2. Builds context with history, memory, skills
3. Calls the LLM
4. Executes tool calls
5. Sends responses back
"""
Initialization
The AgentLoop is initialized with configuration and dependencies:
def __init__(
    self,
    bus: MessageBus,
    provider: LLMProvider,
    workspace: Path,
    model: str | None = None,
    max_iterations: int = 40,
    temperature: float = 0.1,
    max_tokens: int = 4096,
    memory_window: int = 100,
    reasoning_effort: str | None = None,
    # ... more parameters
):
Default max_iterations: 40 — prevents infinite loops while allowing complex multi-step reasoning.
Message Processing Flow
Message Reception
The loop continuously polls the message bus for new InboundMessage objects:

async def run(self) -> None:
    self._running = True
    await self._connect_mcp()
    logger.info("Agent loop started")
    while self._running:
        try:
            msg = await asyncio.wait_for(
                self.bus.consume_inbound(),
                timeout=1.0,
            )
        except asyncio.TimeoutError:
            continue
        if msg.content.strip().lower() == "/stop":
            await self._handle_stop(msg)
        else:
            task = asyncio.create_task(self._dispatch(msg))
            # Track task for cancellation support
Context Building
Load session history and build the message context:

session = self.sessions.get_or_create(key)
history = session.get_history(max_messages=self.memory_window)
initial_messages = self.context.build_messages(
    history=history,
    current_message=msg.content,
    media=msg.media if msg.media else None,
    channel=msg.channel,
    chat_id=msg.chat_id,
)
Agent Iteration Loop
Enter the core iteration loop that calls the LLM and executes tools:

async def _run_agent_loop(
    self,
    initial_messages: list[dict],
    on_progress: Callable[..., Awaitable[None]] | None = None,
) -> tuple[str | None, list[str], list[dict]]:
    messages = initial_messages
    iteration = 0
    final_content = None
    tools_used: list[str] = []
    while iteration < self.max_iterations:
        iteration += 1
        response = await self.provider.chat(
            messages=messages,
            tools=self.tools.get_definitions(),
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            reasoning_effort=self.reasoning_effort,
        )
        if response.has_tool_calls:
            # Execute tools and continue iteration
            # ...
        else:
            # Got final text response, exit loop
            final_content = response.content
            break
Session Persistence
Save the conversation turn to session history:

self._save_turn(session, all_msgs, 1 + len(history))
self.sessions.save(session)
Response Publishing
Publish the final response back to the channel:

return OutboundMessage(
    channel=msg.channel,
    chat_id=msg.chat_id,
    content=final_content,
    metadata=msg.metadata or {},
)
Iteration Logic
The iteration loop continues until one of these conditions is met:
Text Response LLM returns text without tool calls — task is complete
Max Iterations Reaches max_iterations limit (default: 40) — prevents infinite loops
Error Response LLM returns an error finish reason — handled gracefully
Task Cancelled User sends /stop command — all active tasks are cancelled
When the LLM returns tool calls:
if response.has_tool_calls:
    # Show thinking/reasoning to user if provided
    if on_progress:
        thoughts = [
            self._strip_think(response.content),
            response.reasoning_content,
            # ... thinking blocks
        ]
        combined_thoughts = "\n\n".join(filter(None, thoughts))
        if combined_thoughts:
            await on_progress(combined_thoughts)
        await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)
    # Add assistant message with tool calls
    tool_call_dicts = [
        {
            "id": tc.id,
            "type": "function",
            "function": {
                "name": tc.name,
                "arguments": json.dumps(tc.arguments, ensure_ascii=False),
            },
        }
        for tc in response.tool_calls
    ]
    messages = self.context.add_assistant_message(
        messages, response.content, tool_call_dicts,
        reasoning_content=response.reasoning_content,
        thinking_blocks=response.thinking_blocks,
    )
    # Execute each tool call
    for tool_call in response.tool_calls:
        tools_used.append(tool_call.name)
        result = await self.tools.execute(
            tool_call.name,
            tool_call.arguments,
        )
        messages = self.context.add_tool_result(
            messages, tool_call.id, tool_call.name, result
        )
Tool results are added to the message history and sent back to the LLM in the next iteration.
Special Commands
The agent loop handles special slash commands:
/new - Start Fresh Session

Consolidates the current session into memory and starts fresh:

if cmd == "/new":
    lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())
    self._consolidating.add(session.key)
    try:
        async with lock:
            snapshot = session.messages[session.last_consolidated:]
            if snapshot:
                # Archive unconsolidated messages to memory
                temp = Session(key=session.key)
                temp.messages = list(snapshot)
                if not await self._consolidate_memory(temp, archive_all=True):
                    return OutboundMessage(
                        channel=msg.channel, chat_id=msg.chat_id,
                        content="Memory archival failed...",
                    )
    finally:
        self._consolidating.discard(session.key)
    session.clear()
    self.sessions.save(session)
    self.sessions.invalidate(session.key)
    return OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id,
        content="New session started."
    )
/stop - Cancel Active Tasks
Cancels all running tasks and subagents for the session:

async def _handle_stop(self, msg: InboundMessage) -> None:
    tasks = self._active_tasks.pop(msg.session_key, [])
    cancelled = sum(1 for t in tasks if not t.done() and t.cancel())
    for t in tasks:
        try:
            await t
        except (asyncio.CancelledError, Exception):
            pass
    sub_cancelled = await self.subagents.cancel_by_session(msg.session_key)
    total = cancelled + sub_cancelled
    content = f"⏹ Stopped {total} task(s)." if total else "No active task to stop."
    await self.bus.publish_outbound(OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id, content=content,
    ))
/help - List Commands

Returns available commands:

if cmd == "/help":
    return OutboundMessage(
        channel=msg.channel, chat_id=msg.chat_id,
        content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands"
    )
Memory Consolidation
When session history grows beyond memory_window, the agent automatically triggers background memory consolidation:
unconsolidated = len(session.messages) - session.last_consolidated
if (unconsolidated >= self.memory_window and
        session.key not in self._consolidating):
    self._consolidating.add(session.key)
    lock = self._consolidation_locks.setdefault(session.key, asyncio.Lock())

    async def _consolidate_and_unlock():
        try:
            async with lock:
                await self._consolidate_memory(session)
        finally:
            self._consolidating.discard(session.key)

    _task = asyncio.create_task(_consolidate_and_unlock())
    self._consolidation_tasks.add(_task)
Memory consolidation runs asynchronously without blocking the main agent loop.
Progress Streaming
The agent loop supports real-time progress updates to keep users informed during long operations:
async def _bus_progress(content: str, *, tool_hint: bool = False) -> None:
    meta = dict(msg.metadata or {})
    meta["_progress"] = True
    meta["_tool_hint"] = tool_hint
    await self.bus.publish_outbound(OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=content,
        metadata=meta,
    ))
Progress messages show:
Thinking content: Reasoning/planning from the LLM
Tool hints: Which tools are being called (e.g., web_search("query"))
MCP Integration
The agent loop connects to MCP (Model Context Protocol) servers on startup:
async def _connect_mcp(self) -> None:
    """Connect to configured MCP servers (one-time, lazy)."""
    if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
        return
    self._mcp_connecting = True
    from nanobot.agent.tools.mcp import connect_mcp_servers
    try:
        self._mcp_stack = AsyncExitStack()
        await self._mcp_stack.__aenter__()
        await connect_mcp_servers(
            self._mcp_servers,
            self.tools,
            self._mcp_stack,
        )
        self._mcp_connected = True
    except Exception as e:
        logger.error("Failed to connect MCP servers: {}", e)
    finally:
        self._mcp_connecting = False
MCP servers are connected lazily on first use and reused for all subsequent messages.
Error Handling
The agent loop includes robust error handling:
async def _dispatch(self, msg: InboundMessage) -> None:
    async with self._processing_lock:
        try:
            response = await self._process_message(msg)
            if response is not None:
                await self.bus.publish_outbound(response)
            elif msg.channel == "cli":
                # Send empty response for CLI to complete prompt
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel, chat_id=msg.chat_id,
                    content="", metadata=msg.metadata or {},
                ))
        except asyncio.CancelledError:
            logger.info("Task cancelled for session {}", msg.session_key)
            raise
        except Exception:
            logger.exception("Error processing message for session {}", msg.session_key)
            await self.bus.publish_outbound(OutboundMessage(
                channel=msg.channel, chat_id=msg.chat_id,
                content="Sorry, I encountered an error.",
            ))
Direct Processing Mode
For CLI and cron usage, the agent loop provides a direct processing API:
async def process_direct(
    self,
    content: str,
    session_key: str = "cli:direct",
    channel: str = "cli",
    chat_id: str = "direct",
    on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
    """Process a message directly (for CLI or cron usage)."""
    await self._connect_mcp()
    msg = InboundMessage(
        channel=channel,
        sender_id="user",
        chat_id=chat_id,
        content=content,
    )
    response = await self._process_message(
        msg,
        session_key=session_key,
        on_progress=on_progress,
    )
    return response.content if response else ""
Architecture Overall system design
Tools How tools are executed
Memory Session persistence and consolidation