Subagents enable background task execution, allowing the main agent to spawn independent tasks that run asynchronously and report back when complete.

Overview

When you need to perform a long-running operation (like running tests, generating reports, or fetching data from multiple sources), subagents execute the task in the background while the main agent remains responsive. Key features:
  • Non-blocking background execution
  • Full agent capabilities (tools, LLM reasoning)
  • Automatic result announcement to the main agent
  • Session-based task tracking and cancellation
  • Separate tool registry (no message or spawn tools, which prevents channel spam and recursive spawning)
  • Bounded execution: a fixed limit of 15 iterations per subagent and a configurable shell timeout

Architecture

From nanobot/agent/subagent.py:21-51:
class SubagentManager:
    """Manages background subagent execution."""
    
    def __init__(
        self,
        provider: LLMProvider,
        workspace: Path,
        bus: MessageBus,
        model: str | None = None,
        temperature: float = 0.7,
        max_tokens: int = 4096,
        reasoning_effort: str | None = None,
        brave_api_key: str | None = None,
        web_proxy: str | None = None,
        exec_config: "ExecToolConfig | None" = None,
        restrict_to_workspace: bool = False,
    ):
        # ...
        self._running_tasks: dict[str, asyncio.Task[None]] = {}
        self._session_tasks: dict[str, set[str]] = {}  # session_key -> {task_id, ...}
Each subagent runs in its own asyncio task with:
  • Task ID: Short UUID for tracking (8 characters)
  • Label: Human-readable description
  • Origin: Channel and chat_id for result delivery
  • Tool access: Read, write, edit, list, exec, web search, web fetch
  • No message tool: Prevents sending intermediate updates
  • No spawn tool: Prevents recursive subagent creation

Spawning a Subagent

From nanobot/agent/subagent.py:53-83:
async def spawn(
    self,
    task: str,
    label: str | None = None,
    origin_channel: str = "cli",
    origin_chat_id: str = "direct",
    session_key: str | None = None,
) -> str:
    """Spawn a subagent to execute a task in the background."""
    task_id = str(uuid.uuid4())[:8]
    display_label = label or task[:30] + ("..." if len(task) > 30 else "")
    origin = {"channel": origin_channel, "chat_id": origin_chat_id}
    
    bg_task = asyncio.create_task(
        self._run_subagent(task_id, task, display_label, origin)
    )
    self._running_tasks[task_id] = bg_task
    if session_key:
        self._session_tasks.setdefault(session_key, set()).add(task_id)
    
    def _cleanup(_: asyncio.Task) -> None:
        self._running_tasks.pop(task_id, None)
        if session_key and (ids := self._session_tasks.get(session_key)):
            ids.discard(task_id)
            if not ids:
                del self._session_tasks[session_key]
    
    bg_task.add_done_callback(_cleanup)
    
    logger.info("Spawned subagent [{}]: {}", task_id, display_label)
    return f"Subagent [{display_label}] started (id: {task_id}). I'll notify you when it completes."

Tool Registry

Subagents have a restricted tool set to prevent unwanted side effects (nanobot/agent/subagent.py:96-110):
# Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry()
allowed_dir = self.workspace if self.restrict_to_workspace else None
tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ExecTool(
    working_dir=str(self.workspace),
    timeout=self.exec_config.timeout,
    restrict_to_workspace=self.restrict_to_workspace,
    path_append=self.exec_config.path_append,
))
tools.register(WebSearchTool(api_key=self.brave_api_key, proxy=self.web_proxy))
tools.register(WebFetchTool(proxy=self.web_proxy))
Available tools:
  • File operations: read, write, edit, list
  • Shell execution: run commands
  • Web: search and fetch
Excluded tools:
  • Message sending (prevents spamming channels)
  • Spawn (prevents recursive subagent creation)
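
To illustrate why leaving a tool out of the registry is enough to exclude it, here is a minimal sketch. MiniRegistry, read_file, and build_subagent_tools are hypothetical and do not reproduce the real ToolRegistry API; in particular, returning an error string for an unknown tool is an assumption made for this example.

```python
import asyncio
from typing import Awaitable, Callable

ToolFn = Callable[[dict], Awaitable[str]]


class MiniRegistry:
    """Hypothetical registry: only registered names can be executed."""

    def __init__(self) -> None:
        self._tools: dict[str, ToolFn] = {}

    def register(self, name: str, fn: ToolFn) -> None:
        self._tools[name] = fn

    def names(self) -> list[str]:
        return sorted(self._tools)

    async def execute(self, name: str, arguments: dict) -> str:
        fn = self._tools.get(name)
        if fn is None:
            # Assumption: unknown tools produce an error string for the model.
            return f"Error: tool '{name}' is not available"
        return await fn(arguments)


async def read_file(arguments: dict) -> str:
    # Placeholder body; a real tool would read from disk.
    return f"(contents of {arguments['path']})"


def build_subagent_tools() -> MiniRegistry:
    tools = MiniRegistry()
    tools.register("read_file", read_file)
    # "message" and "spawn" are deliberately never registered.
    return tools
```

With this shape, a model that asks for "spawn" gets an error result instead of a new subagent, which is the effect the restricted registry is meant to have.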

Execution Loop

From nanobot/agent/subagent.py:119-167:
# Run agent loop (limited iterations)
max_iterations = 15
iteration = 0
final_result: str | None = None

while iteration < max_iterations:
    iteration += 1
    
    response = await self.provider.chat(
        messages=messages,
        tools=tools.get_definitions(),
        model=self.model,
        temperature=self.temperature,
        max_tokens=self.max_tokens,
        reasoning_effort=self.reasoning_effort,
    )
    
    if response.has_tool_calls:
        # Add assistant message with tool calls
        tool_call_dicts = [
            {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.name,
                    "arguments": json.dumps(tc.arguments, ensure_ascii=False),
                },
            }
            for tc in response.tool_calls
        ]
        messages.append({
            "role": "assistant",
            "content": response.content or "",
            "tool_calls": tool_call_dicts,
        })
        
        # Execute tools
        for tool_call in response.tool_calls:
            args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
            logger.debug("Subagent [{}] executing: {} with arguments: {}", task_id, tool_call.name, args_str)
            result = await tools.execute(tool_call.name, tool_call.arguments)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "name": tool_call.name,
                "content": result,
            })
    else:
        final_result = response.content
        break

if final_result is None:
    final_result = "Task completed but no final response was generated."

Result Announcement

When a subagent finishes, whether it succeeded or failed, the manager announces the result to the main agent via the message bus (nanobot/agent/subagent.py:180-210):
async def _announce_result(
    self,
    task_id: str,
    label: str,
    task: str,
    result: str,
    origin: dict[str, str],
    status: str,
) -> None:
    """Announce the subagent result to the main agent via the message bus."""
    status_text = "completed successfully" if status == "ok" else "failed"
    
    announce_content = f"""[Subagent '{label}' {status_text}]

Task: {task}

Result:
{result}

Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs."""
    
    # Inject as system message to trigger main agent
    msg = InboundMessage(
        channel="system",
        sender_id="subagent",
        chat_id=f"{origin['channel']}:{origin['chat_id']}",
        content=announce_content,
    )
    
    await self.bus.publish_inbound(msg)
    logger.debug("Subagent [{}] announced result to {}:{}", task_id, origin['channel'], origin['chat_id'])
The main agent receives this as a system message and can summarize the results for the user.
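
The announcement packs the origin into a single chat_id string of the form channel:chat_id. How the main agent unpacks it is not shown here; the sketch below is one plausible approach, with encode_origin and decode_origin as hypothetical helpers. Splitting on the first colon only keeps chat IDs that themselves contain colons intact.

```python
def encode_origin(origin: dict[str, str]) -> str:
    """Combine channel and chat_id the way the announcement's chat_id is built."""
    return f"{origin['channel']}:{origin['chat_id']}"


def decode_origin(chat_id: str) -> dict[str, str]:
    """Hypothetical inverse: split on the first colon only."""
    channel, sep, rest = chat_id.partition(":")
    if not sep:
        # Assumption for this sketch: fall back to the CLI default channel from spawn().
        return {"channel": "cli", "chat_id": chat_id}
    return {"channel": channel, "chat_id": rest}
```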

System Prompt

Subagents use a focused prompt (nanobot/agent/subagent.py:212-232):
def _build_subagent_prompt(self) -> str:
    """Build a focused system prompt for the subagent."""
    from nanobot.agent.context import ContextBuilder
    from nanobot.agent.skills import SkillsLoader
    
    time_ctx = ContextBuilder._build_runtime_context(None, None)
    parts = [f"""# Subagent

{time_ctx}

You are a subagent spawned by the main agent to complete a specific task.
Stay focused on the assigned task. Your final response will be reported back to the main agent.

## Workspace
{self.workspace}"""]
    
    skills_summary = SkillsLoader(self.workspace).build_skills_summary()
    if skills_summary:
        parts.append(f"## Skills\n\nRead SKILL.md with read_file to use a skill.\n\n{skills_summary}")
    
    return "\n\n".join(parts)

Usage Examples

1. Running Tests

manager = SubagentManager(
    provider=llm_provider,
    workspace=Path("~/project").expanduser(),  # Path does not expand "~" on its own
    bus=message_bus,
    model="anthropic/claude-opus-4-5",
)

message = await manager.spawn(
    task="Run all unit tests and report any failures with details",
    label="Run tests",
    origin_channel="telegram",
    origin_chat_id="123456789",
    session_key="user:telegram:123456789",
)

print(message)  # "Subagent [Run tests] started (id: a1b2c3d4). I'll notify you when it completes."

2. Data Collection

await manager.spawn(
    task="Fetch weather data for San Francisco, New York, and London. Compare temperatures and summarize trends.",
    label="Weather comparison",
    origin_channel="discord",
    origin_chat_id="987654321",
)

3. Report Generation

await manager.spawn(
    task="Analyze Git commits from the last week, categorize by type (feature, bugfix, refactor), and generate a markdown report",
    label="Weekly commit report",
    origin_channel="slack",
    origin_chat_id="#engineering",
)

4. File Processing

await manager.spawn(
    task="Read all CSV files in the data/ directory, validate schema, and report any inconsistencies",
    label="CSV validation",
    origin_channel="cli",
    origin_chat_id="direct",
)

Session Management

Tracking Running Tasks

# Get count of running subagents
count = manager.get_running_count()
print(f"Currently running: {count} subagents")

Canceling Tasks by Session

# Cancel all subagents for a specific session
cancelled = await manager.cancel_by_session("user:telegram:123456789")
print(f"Cancelled {cancelled} tasks")
From nanobot/agent/subagent.py:234-242:
async def cancel_by_session(self, session_key: str) -> int:
    """Cancel all subagents for the given session. Returns count cancelled."""
    tasks = [self._running_tasks[tid] for tid in self._session_tasks.get(session_key, [])
             if tid in self._running_tasks and not self._running_tasks[tid].done()]
    for t in tasks:
        t.cancel()
    if tasks:
        await asyncio.gather(*tasks, return_exceptions=True)
    return len(tasks)
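
The cancel-then-gather pattern used above can be checked with plain asyncio tasks. In this sketch, cancel_all and demo_cancel are hypothetical names; the sleeping coroutine stands in for a long-running subagent.

```python
import asyncio


async def cancel_all(tasks: list[asyncio.Task]) -> int:
    """Cancel unfinished tasks and wait for them to unwind."""
    pending = [t for t in tasks if not t.done()]
    for t in pending:
        t.cancel()
    if pending:
        # return_exceptions=True collects each CancelledError instead of raising it here.
        await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def demo_cancel() -> tuple[int, bool]:
    async def slow() -> None:
        await asyncio.sleep(60)

    tasks = [asyncio.create_task(slow()) for _ in range(3)]
    await asyncio.sleep(0)  # give the tasks a chance to start
    cancelled = await cancel_all(tasks)
    return cancelled, all(t.cancelled() for t in tasks)
```

Awaiting the gather matters: cancel() only requests cancellation, and the wait ensures every task has actually stopped before the count is returned.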

Use Cases

1. Long-Running Analysis

Analyzing large datasets without blocking the main conversation:
await manager.spawn(
    task="Analyze all log files in /var/log, extract error patterns, and generate a summary report with recommendations",
    label="Log analysis",
)

2. Multi-Step Workflows

Complex tasks that require multiple tool calls:
await manager.spawn(
    task="1) Clone repo, 2) Run linter, 3) Run tests, 4) Generate coverage report, 5) Summarize findings",
    label="CI pipeline",
)

3. Parallel Research

Gathering information from multiple sources:
await manager.spawn(
    task="Research best practices for API rate limiting from 5 different sources and synthesize recommendations",
    label="Rate limiting research",
)

4. Background Monitoring

Time-boxed monitoring without blocking the conversation (the task still has to fit within the 15-iteration limit):
await manager.spawn(
    task="Monitor server CPU and memory usage for 10 minutes, alert if thresholds exceeded",
    label="Server monitoring",
)

Configuration

Parameter              Type            Default           Description
provider               LLMProvider     required          LLM provider for subagent reasoning
workspace              Path            required          Working directory for file operations
bus                    MessageBus      required          Message bus for result announcements
model                  str             provider default  Model to use for subagent
temperature            float           0.7               Sampling temperature
max_tokens             int             4096              Max tokens per response
reasoning_effort       str             None              Reasoning effort level (provider-specific)
brave_api_key          str             None              API key for web search
web_proxy              str             None              Proxy for web requests
exec_config            ExecToolConfig  default           Shell execution configuration
restrict_to_workspace  bool            False             Limit file/shell access to workspace

Best Practices

  1. Use descriptive labels: Labels appear in the status message and help track tasks
  2. Set session keys: Enable session-based cancellation for user-initiated tasks
  3. Keep tasks focused: Break complex workflows into smaller subagent tasks
  4. Monitor running count: Limit concurrent subagents to avoid resource exhaustion
  5. Handle failures gracefully: Subagents can fail; the main agent should handle error announcements
  6. Use workspace restriction: Set restrict_to_workspace=True in production to sandbox file access
  7. Plan around the iteration limit: The loop stops after 15 iterations, so scope each task to finish within that budget

Limitations

  • No message tool: Subagents cannot send intermediate updates to channels
  • No nested spawning: Subagents cannot spawn other subagents
  • Fixed iteration limit: Maximum 15 LLM calls per subagent
  • No progress tracking: Main agent only receives final result or error
  • Resource usage: Each subagent consumes memory and tokens; monitor usage
