
Overview

The Agent Orchestration Protocol (AOP) is a framework for deploying and managing agents as distributed services. Built on the Model Context Protocol (MCP), AOP exposes each agent as a tool that clients can discover, call, and monitor through a standard protocol. This makes it well suited to building scalable, production-oriented multi-agent systems.

Key Features

  • Distributed Deployment: Deploy agents as independent services accessible via MCP
  • Service Discovery: Automatic agent registration and discovery
  • Queue-Based Execution: Built-in task queuing for reliability and performance
  • Monitoring & Stats: Real-time queue statistics and performance metrics
  • Persistence Mode: Automatic restart and failsafe protection
  • Network Resilience: Connection monitoring and automatic retry capabilities

Architecture

AOP transforms agents into MCP tools that can be:
  • Accessed remotely via HTTP or other transports
  • Discovered through standardized protocols
  • Managed centrally with queue-based task execution
  • Monitored for performance and health
┌─────────────────────────────────────────────┐
│           AOP Server (MCP)                  │
├─────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐           │
│  │ Agent 1     │  │ Agent 2     │           │
│  │ (as Tool)   │  │ (as Tool)   │   ...     │
│  └─────────────┘  └─────────────┘           │
│         ↓                ↓                  │
│  ┌─────────────┐  ┌─────────────┐           │
│  │ Task Queue  │  │ Task Queue  │           │
│  │ + Workers   │  │ + Workers   │           │
│  └─────────────┘  └─────────────┘           │
└─────────────────────────────────────────────┘
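To make the diagram concrete, here is a toy, in-process model of the same shape: each registered tool name maps to a callable plus its own pool of worker threads. This is a hypothetical illustration built only on Python's standard library (`ToyToolServer`, `register`, and `call` are invented names). It is not how AOP is implemented and it does not speak MCP.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple


class ToyToolServer:
    """Hypothetical model of the diagram: one worker pool per registered tool."""

    def __init__(self) -> None:
        self._tools: Dict[str, Tuple[Callable[[str], str], ThreadPoolExecutor]] = {}

    def register(self, tool_name: str, fn: Callable[[str], str], max_workers: int = 2) -> None:
        # Each tool gets its own executor, so a slow tool cannot starve the others.
        if tool_name in self._tools:
            raise ValueError(f"tool {tool_name!r} is already registered")
        self._tools[tool_name] = (fn, ThreadPoolExecutor(max_workers=max_workers))

    def list_tools(self) -> List[str]:
        return sorted(self._tools)

    def call(self, tool_name: str, task: str) -> Future:
        # Submitting queues the task on that tool's executor and returns immediately.
        fn, pool = self._tools[tool_name]
        return pool.submit(fn, task)

    def shutdown(self) -> None:
        for _, pool in self._tools.values():
            pool.shutdown(wait=True)


server = ToyToolServer()
server.register("research_tool", lambda task: f"research: {task}")
server.register("analysis_tool", lambda task: f"analysis: {task}")
print(server.list_tools())  # ['analysis_tool', 'research_tool']
first_result = server.call("research_tool", "solar trends").result()
print(first_result)  # research: solar trends
server.shutdown()
```

Giving each tool a separate pool is the design choice the diagram implies: load on one agent stays isolated from the others.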

Basic Usage

Creating an AOP Server

from swarms import Agent
from swarms.structs.aop import AOP

# Create specialized agents
research_agent = Agent(
    agent_name="Research-Agent",
    agent_description="Expert in research and data collection",
    model_name="gpt-4o",
    max_loops=1,
)

analysis_agent = Agent(
    agent_name="Analysis-Agent",
    agent_description="Expert in data analysis and insights",
    model_name="gpt-4o",
    max_loops=1,
)

# Create AOP server with queue-based execution
deployer = AOP(
    server_name="ResearchCluster",
    description="A cluster of research and analysis agents",
    port=8000,
    verbose=True,
    queue_enabled=True,  # Enable task queuing
    max_workers_per_agent=2,  # Concurrent workers per agent
)

# Add agents to the server
deployer.add_agent(
    agent=research_agent,
    tool_name="research_tool",
    tool_description="Research and data collection tool",
    timeout=30,
    max_retries=3
)

deployer.add_agent(
    agent=analysis_agent,
    tool_name="analysis_tool",
    tool_description="Data analysis and insights tool",
    timeout=30,
    max_retries=3
)

# Start the server
deployer.run()

Adding Multiple Agents in Batch

from swarms import Agent
from swarms.structs.aop import AOP

# Create multiple agents
agents = [
    Agent(
        agent_name=f"Worker-{i}",
        agent_description=f"Specialized worker agent {i}",
        model_name="gpt-4o",
        max_loops=1,
    )
    for i in range(5)
]

# Create AOP server
deployer = AOP(
    server_name="WorkerCluster",
    agents=agents,  # Pass agents at initialization
    port=8000,
    queue_enabled=True,
)

# Or add them after initialization
# deployer.add_agents_batch(
#     agents=agents,
#     tool_names=[f"worker_{i}" for i in range(5)],
#     timeouts=[30] * 5,
#     max_retries_list=[3] * 5,
# )

deployer.run()

Task Queue System

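Each agent registered with AOP gets its own task queue and pool of worker threads, so incoming requests are buffered, retried on failure, and processed concurrently instead of being executed inline.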

Queue Features

  • Priority-based queuing: Tasks with higher priority execute first
  • Automatic retries: Failed tasks retry automatically with backoff
  • Worker threads: Multiple workers process tasks concurrently
  • Task tracking: Monitor task status (pending, processing, completed, failed)
  • Statistics: Real-time metrics on queue performance
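The priority rule can be modeled with the standard library's `queue.PriorityQueue`. This is a hypothetical sketch, not AOP's queue. `PriorityQueue` returns the smallest entry first, so the sketch stores the negated priority. A monotonic counter keeps equal-priority tasks in submission (FIFO) order. How AOP breaks ties between equal priorities is an assumption here.

```python
import itertools
import queue


class PriorityTaskQueue:
    """Hypothetical sketch: higher priority first, FIFO among equal priorities."""

    def __init__(self) -> None:
        self._q: queue.PriorityQueue = queue.PriorityQueue()
        self._counter = itertools.count()  # tie-breaker that preserves insertion order

    def put(self, task: str, priority: int = 0) -> None:
        # Negate the priority because PriorityQueue pops the smallest item first.
        self._q.put((-priority, next(self._counter), task))

    def get(self) -> str:
        _, _, task = self._q.get_nowait()
        return task

    def __len__(self) -> int:
        return self._q.qsize()


q = PriorityTaskQueue()
q.put("routine report", priority=0)
q.put("urgent incident", priority=10)
q.put("second routine report", priority=0)
order = [q.get() for _ in range(len(q))]
print(order)  # ['urgent incident', 'routine report', 'second routine report']
```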

Task Lifecycle

from swarms.structs.aop import TaskStatus

# Task flows through these states:
# PENDING -> PROCESSING -> COMPLETED
#                      -> FAILED (with retries)
#                      -> CANCELLED
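A minimal model of that lifecycle is sketched below. It assumes a task stays in PROCESSING while it is retried and only lands in FAILED after its last attempt fails. The `Status` enum is a local stand-in that mirrors the state names in the comment above. It is not an import of AOP's `TaskStatus`, and `run_task` together with its retry semantics is a hypothetical illustration.

```python
import time
from enum import Enum
from typing import Callable, List, Tuple


class Status(Enum):
    # Local stand-in mirroring the documented state names (not swarms' TaskStatus).
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


def run_task(
    fn: Callable[[], str],
    max_retries: int = 3,
    retry_delay: float = 1.0,
    cancelled: Callable[[], bool] = lambda: False,
) -> Tuple[Status, List[Status], str]:
    """Run fn with retries; return (final status, state history, result or last error)."""
    history = [Status.PENDING, Status.PROCESSING]
    last_error = ""
    for attempt in range(max_retries + 1):  # first try plus max_retries retries
        if cancelled():
            history.append(Status.CANCELLED)
            return Status.CANCELLED, history, "cancelled"
        try:
            result = fn()
            history.append(Status.COMPLETED)
            return Status.COMPLETED, history, result
        except Exception as exc:  # any exception counts as a failed attempt
            last_error = str(exc)
            if attempt < max_retries:
                time.sleep(retry_delay)
    history.append(Status.FAILED)
    return Status.FAILED, history, last_error


attempts = {"n": 0}


def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient error")
    return "done"


status, history, result = run_task(flaky, max_retries=3, retry_delay=0.0)
print(status, result, attempts["n"])  # Status.COMPLETED done 3
```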

Queue Configuration

deployer = AOP(
    server_name="ProductionCluster",
    queue_enabled=True,
    max_workers_per_agent=5,  # 5 concurrent workers
    max_queue_size_per_agent=1000,  # Queue capacity
    processing_timeout=60,  # Task timeout in seconds
    retry_delay=1.0,  # Delay between retries (seconds)
)
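`max_queue_size_per_agent` caps how many tasks can wait for a given agent. The sketch below shows what such a capacity limit means using a bounded `queue.Queue`. Once the queue holds `maxsize` items, a non-blocking `put` raises `queue.Full`, and the caller has to reject or defer the task. This section does not say whether AOP rejects, blocks, or reports an error when a queue is full, so treat `try_enqueue` as a hypothetical helper.

```python
import queue


def try_enqueue(q: queue.Queue, task: str) -> bool:
    """Hypothetical helper: enqueue without blocking; return False if the queue is full."""
    try:
        q.put_nowait(task)
        return True
    except queue.Full:
        return False


# A tiny capacity (3) so the limit is easy to see; the configuration above uses 1000.
agent_queue: queue.Queue = queue.Queue(maxsize=3)
accepted = [try_enqueue(agent_queue, f"task-{i}") for i in range(5)]
print(accepted)  # [True, True, True, False, False]
print(agent_queue.qsize())  # 3
```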

Monitoring and Statistics

Getting Queue Statistics

# Get stats for a specific agent
stats = deployer.get_queue_stats(tool_name="research_tool")
print(stats)
# {
#     "success": True,
#     "agent_name": "research_tool",
#     "stats": {
#         "total_tasks": 150,
#         "completed_tasks": 142,
#         "failed_tasks": 3,
#         "pending_tasks": 5,
#         "processing_tasks": 2,
#         "average_processing_time": 2.34,
#         "queue_size": 5,
#         "queue_status": "running"
#     }
# }

# Get stats for all agents
all_stats = deployer.get_queue_stats()
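The per-agent response is a plain dictionary, so you can compute derived health metrics with simple arithmetic. The sketch below assumes the single-agent response shape shown in the comment above. The shape returned when `tool_name` is omitted is not shown here, so the sketch does not handle it. Success rate is completed tasks divided by finished tasks (completed plus failed), and backlog is pending plus processing tasks. For the sample numbers, that gives 142 / (142 + 3) ≈ 0.979 and 5 + 2 = 7.

```python
from typing import Any, Dict


def summarize_queue_stats(response: Dict[str, Any]) -> Dict[str, Any]:
    """Derive simple health metrics from a single-agent get_queue_stats() response."""
    if not response.get("success"):
        raise ValueError("stats request did not succeed")
    s = response["stats"]
    finished = s["completed_tasks"] + s["failed_tasks"]
    return {
        # Share of finished tasks that completed; 1.0 when nothing has finished yet.
        "success_rate": s["completed_tasks"] / finished if finished else 1.0,
        # Tasks not yet finished: still waiting plus currently running.
        "backlog": s["pending_tasks"] + s["processing_tasks"],
        "avg_seconds": s["average_processing_time"],
    }


sample = {
    "success": True,
    "agent_name": "research_tool",
    "stats": {
        "total_tasks": 150,
        "completed_tasks": 142,
        "failed_tasks": 3,
        "pending_tasks": 5,
        "processing_tasks": 2,
        "average_processing_time": 2.34,
        "queue_size": 5,
        "queue_status": "running",
    },
}
summary = summarize_queue_stats(sample)
print(round(summary["success_rate"], 3), summary["backlog"])  # 0.979 7
```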

Agent Discovery

# List all registered agents
agent_names = deployer.list_agents()
print(f"Registered agents: {agent_names}")

# Get detailed info about an agent
info = deployer.get_agent_info("research_tool")
print(info)
# {
#     "tool_name": "research_tool",
#     "agent_name": "Research-Agent",
#     "agent_description": "Expert in research and data collection",
#     "model_name": "gpt-4o",
#     "timeout": 30,
#     "max_retries": 3,
#     "verbose": True
# }

Production Features

Persistence Mode

Enable automatic restart and failsafe protection:
deployer = AOP(
    server_name="ProductionCluster",
    persistence=True,  # Enable persistence mode
    max_restart_attempts=10,  # Max restart attempts
    restart_delay=5.0,  # Delay between restarts (seconds)
)
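Conceptually, persistence mode wraps the server in a supervisor loop. If the server exits with an error, the loop waits `restart_delay` seconds and starts it again, and it gives up after `max_restart_attempts`. The sketch below is a hypothetical, stdlib-only model of that loop (`run_with_restarts` and the `serve` callable are invented names), not AOP's implementation. In particular, this section does not specify whether AOP also restarts after a clean exit.

```python
import time
from typing import Callable


def run_with_restarts(
    serve: Callable[[], None],
    max_restart_attempts: int = 10,
    restart_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run serve(), restarting it after exceptions. Return the number of restarts used."""
    restarts = 0
    while True:
        try:
            serve()  # blocks while the server is healthy
            return restarts  # clean exit: stop supervising
        except Exception as exc:
            if restarts >= max_restart_attempts:
                raise RuntimeError(f"giving up after {restarts} restarts") from exc
            restarts += 1
            print(f"server crashed ({exc}); restart {restarts}/{max_restart_attempts}")
            sleep(restart_delay)


crashes = iter([True, True, False])


def fake_serve() -> None:
    # Simulated server: crashes on the first two starts, then exits cleanly.
    if next(crashes):
        raise ConnectionError("simulated crash")


restarts_used = run_with_restarts(fake_serve, restart_delay=0.0)
print(restarts_used)  # 2
```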

Network Monitoring

Automatic network connection monitoring and retry:
deployer = AOP(
    server_name="ProductionCluster",
    network_monitoring=True,  # Enable network monitoring
    max_network_retries=5,  # Max reconnection attempts
    network_retry_delay=10.0,  # Delay between retries (seconds)
    network_timeout=30.0,  # Connection timeout (seconds)
)
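You can sketch a connection check of the kind `network_monitoring` describes with `socket.create_connection`, which accepts a timeout in seconds. The function below is a hypothetical illustration of how `max_network_retries`, `network_retry_delay`, and `network_timeout` could interact. This section does not document what AOP actually probes or against which host.

```python
import socket
import time


def wait_for_endpoint(
    host: str,
    port: int,
    max_network_retries: int = 5,
    network_retry_delay: float = 10.0,
    network_timeout: float = 30.0,
) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False if all retries fail."""
    for attempt in range(max_network_retries + 1):  # first try plus retries
        try:
            with socket.create_connection((host, port), timeout=network_timeout):
                return True
        except OSError:  # covers refused connections and timeouts
            if attempt < max_network_retries:
                time.sleep(network_retry_delay)
    return False


# Self-check against a throwaway listener on an OS-assigned local port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen()
port = listener.getsockname()[1]
reachable = wait_for_endpoint("127.0.0.1", port, network_timeout=1.0)
print(reachable)  # True
listener.close()
```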

Error Handling and Logging

deployer = AOP(
    server_name="ProductionCluster",
    verbose=True,  # Enable verbose logging
    traceback_enabled=True,  # Enable detailed tracebacks
    log_level="INFO",  # Set log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
)

Transport Options

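AOP supports multiple transport types, selected with the `transport` parameter. The example below uses streamable HTTP, which serves the MCP endpoint on the given `host` and `port`: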
# HTTP transport (default)
deployer = AOP(
    server_name="HTTPCluster",
    transport="streamable-http",
    host="localhost",
    port=8000,
)

# Start the server
deployer.run()
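Clients reach the server over MCP, which frames requests as JSON-RPC 2.0 messages. A tool invocation uses the `tools/call` method with the tool's name and an `arguments` object. The sketch below builds only that message body with the standard library so you can see its shape. A real client would normally use an MCP client library, which also performs the `initialize` handshake first. The `task` argument name is an assumption about an AOP tool's default input schema, so check the schema your server actually advertises through `tools/list`.

```python
import json
from typing import Any, Dict


def build_tool_call(request_id: int, tool_name: str, arguments: Dict[str, Any]) -> str:
    """Serialize an MCP `tools/call` request as a JSON-RPC 2.0 message."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(message)


# "task" is an assumed argument name; use the fields from the tool's advertised input schema.
body = build_tool_call(1, "research_tool", {"task": "Summarize recent solar panel research"})
print(body)
```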

Advanced Configuration

Custom Tool Schemas

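from swarms import Agent

# An agent to expose with custom schemas (defined here so the example is complete)
search_agent = Agent(
    agent_name="Search-Agent",
    agent_description="Expert in searching and filtering information",
    model_name="gpt-4o",
    max_loops=1,
)

# Define custom input/output schemas
input_schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Search query"},
        "filters": {"type": "object", "description": "Search filters"},
    },
    "required": ["query"],
}

output_schema = {
    "type": "object",
    "properties": {
        "results": {"type": "array", "description": "Search results"},
        "total": {"type": "integer", "description": "Total results"},
    },
}

# `deployer` is the AOP server created earlier
deployer.add_agent(
    agent=search_agent,
    tool_name="search_tool",
    tool_description="Search tool with structured input and output",
    input_schema=input_schema,
    output_schema=output_schema,
)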
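To see what `input_schema` asks of callers, here is a deliberately tiny checker for the two keywords used above: `required` and per-property `type`. It is a hypothetical illustration that covers only a handful of JSON Schema types. For real validation, use a full JSON Schema validator rather than this sketch.

```python
from typing import Any, Dict, List

# Python types accepted for each JSON Schema type name this sketch understands.
_JSON_TYPES = {
    "string": str,
    "object": dict,
    "array": list,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
}


def check_arguments(schema: Dict[str, Any], args: Dict[str, Any]) -> List[str]:
    """Return a list of problems (empty if args satisfy 'required' and property 'type')."""
    problems = [
        f"missing required field '{name}'"
        for name in schema.get("required", [])
        if name not in args
    ]
    for name, value in args.items():
        spec = schema.get("properties", {}).get(name, {})
        expected = _JSON_TYPES.get(spec.get("type"))
        if expected is None:
            continue  # unknown, untyped, or unsupported fields are not checked here
        # bool is a subclass of int in Python, so reject it explicitly for numeric types.
        bool_as_number = isinstance(value, bool) and spec["type"] in ("integer", "number")
        if bool_as_number or not isinstance(value, expected):
            problems.append(f"field '{name}' should be {spec['type']}")
    return problems


input_schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Search query"},
        "filters": {"type": "object", "description": "Search filters"},
    },
    "required": ["query"],
}
print(check_arguments(input_schema, {"query": "solar", "filters": {"year": 2024}}))  # []
print(check_arguments(input_schema, {"filters": "recent"}))
# ["missing required field 'query'", "field 'filters' should be object"]
```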

Per-Agent Configuration

# Different settings for different agents
# (fast_agent and critical_agent are Agent instances created as in Basic Usage)
deployer.add_agent(
    agent=fast_agent,
    tool_name="fast_tool",
    timeout=10,  # Quick timeout
    max_retries=1,  # Minimal retries
    verbose=False,  # Quiet
)

deployer.add_agent(
    agent=critical_agent,
    tool_name="critical_tool",
    timeout=300,  # Long timeout
    max_retries=10,  # Many retries
    verbose=True,  # Detailed logging
)
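Timeouts and retries multiply. Consider a simple model where every attempt runs until its timeout and each retry first waits `retry_delay` seconds. In that model, the worst case for one task is `timeout * (max_retries + 1) + retry_delay * max_retries`. The model is an assumption, since AOP's exact retry and timeout accounting is not described here, but it makes a useful sanity check. With `retry_delay=1.0`, `fast_tool` tops out at about 21 seconds, while `critical_tool` could occupy a worker for roughly 3,310 seconds.

```python
def worst_case_seconds(timeout: float, max_retries: int, retry_delay: float = 1.0) -> float:
    """Upper bound for one task if every attempt hits its timeout (simplified model)."""
    attempts = max_retries + 1  # the first try plus each retry
    return timeout * attempts + retry_delay * max_retries


settings = {
    "fast_tool": {"timeout": 10, "max_retries": 1},
    "critical_tool": {"timeout": 300, "max_retries": 10},
}
for tool, cfg in settings.items():
    print(tool, worst_case_seconds(cfg["timeout"], cfg["max_retries"]))
# fast_tool 21.0
# critical_tool 3310.0
```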

Best Practices

1. Use Queue-Based Execution

Always enable queuing for production deployments:
deployer = AOP(
    queue_enabled=True,
    max_workers_per_agent=3,  # Adjust based on load
)
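One way to turn "adjust based on load" into a number is Little's law: the average number of tasks in service equals the arrival rate times the average processing time. Suppose `research_tool` receives about 1 task per second and `average_processing_time` is 2.34 seconds, as in the statistics example. Then roughly 2.34 workers are busy on average, so 3 is the minimum that keeps up, and extra headroom absorbs bursts. The arrival rate is a hypothetical figure, and the `headroom` factor below is an illustrative choice, not an AOP setting.

```python
import math


def workers_needed(arrival_rate_per_s: float, avg_processing_s: float, headroom: float = 1.0) -> int:
    """Little's law estimate of busy workers, rounded up, with an optional safety factor."""
    if arrival_rate_per_s < 0 or avg_processing_s < 0 or headroom < 1.0:
        raise ValueError("rates and times must be non-negative and headroom >= 1")
    busy_on_average = arrival_rate_per_s * avg_processing_s
    return max(1, math.ceil(busy_on_average * headroom))


print(workers_needed(1.0, 2.34))  # 3
print(workers_needed(1.0, 2.34, headroom=1.5))  # 4
```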

2. Set Appropriate Timeouts

Configure timeouts based on agent complexity:
# Quick agents
deployer.add_agent(quick_agent, tool_name="quick_tool", timeout=15)

# Complex agents
deployer.add_agent(complex_agent, tool_name="complex_tool", timeout=120)

3. Monitor Performance

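Regularly check queue statistics. Because deployer.run() blocks, run the check in a background thread started before the server:
import threading
import time

def monitor_queues(interval=60):
    while True:
        stats = deployer.get_queue_stats()
        print(f"Queue stats: {stats}")
        time.sleep(interval)  # Check every minute by default

# Daemon thread, so it does not keep the process alive after the server stops
threading.Thread(target=monitor_queues, daemon=True).start()
deployer.run()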

4. Handle Graceful Shutdown

import signal
import sys

def shutdown_handler(signum, frame):
    print("Shutting down gracefully...")
    # Stop all queue workers
    for task_queue in deployer.task_queues.values():
        task_queue.stop_workers()
    sys.exit(0)

signal.signal(signal.SIGINT, shutdown_handler)
deployer.run()

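5. Use Agent Tags and Capabilities

Attach descriptive metadata to agents so that their purpose is clear to anyone inspecting the cluster:
from swarms import Agent

agent = Agent(
    agent_name="Research-Agent",
    model_name="gpt-4o",
    tags=["research", "data-collection", "analysis"],
    capabilities=["web-search", "data-gathering", "report-generation"],
    role="researcher",
)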
