Overview

The AOP (Agent Orchestration Platform) class enables you to deploy multiple Swarms agents as individual tools in an MCP server. It provides production-ready features including queue-based task execution, automatic restart capabilities, network monitoring, and comprehensive error handling.

Constructor

Create an AOP instance to manage and deploy agents as MCP tools.
from swarms.structs.aop import AOP
from swarms import Agent

aop = AOP(
    server_name="Production Swarm",
    description="Multi-agent production cluster",
    agents=[agent1, agent2, agent3],
    port=8000,
    queue_enabled=True,
    persistence=True,
    verbose=True
)

Parameters

server_name
str
default:"AOP Cluster"
Name for the MCP server
description
str
Description of the AOP cluster
agents
any
default:"None"
Optional list of agents to add initially
port
int
default:"8000"
Port for the MCP server
transport
str
default:"streamable-http"
Transport type for the MCP server
verbose
bool
default:"False"
Enable verbose logging
traceback_enabled
bool
default:"True"
Enable traceback logging for errors
host
str
default:"localhost"
Host to bind the server to
queue_enabled
bool
default:"True"
Enable queue-based task execution
max_workers_per_agent
int
default:"1"
Maximum number of workers per agent
max_queue_size_per_agent
int
default:"1000"
Maximum queue size per agent
processing_timeout
int
default:"30"
Timeout for task processing in seconds
retry_delay
float
default:"1.0"
Delay between retries in seconds
persistence
bool
default:"False"
Enable automatic restart on shutdown (with failsafe)
max_restart_attempts
int
default:"10"
Maximum number of restart attempts before giving up
restart_delay
float
default:"5.0"
Delay between restart attempts in seconds
network_monitoring
bool
default:"True"
Enable network connection monitoring and retry
max_network_retries
int
default:"5"
Maximum number of network reconnection attempts
network_retry_delay
float
default:"10.0"
Delay between network retry attempts in seconds
log_level
Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
default:"INFO"
Logging level

Methods

add_agent

Add a single agent to the MCP server as a tool.
tool_name = aop.add_agent(
    agent=financial_agent,
    tool_name="financial_analyzer",
    tool_description="Analyzes financial data and generates reports",
    timeout=60,
    max_retries=3,
    verbose=True
)
agent
AgentType
required
The Swarms Agent instance to deploy
tool_name
str
default:"None"
Name for the tool (defaults to agent.agent_name)
tool_description
str
default:"None"
Description of the tool (defaults to agent.agent_description)
input_schema
Dict[str, Any]
default:"None"
JSON schema for input parameters
output_schema
Dict[str, Any]
default:"None"
JSON schema for output
timeout
int
default:"30"
Maximum execution time in seconds
max_retries
int
default:"3"
Number of retries on failure
verbose
bool
default:"None"
Enable verbose logging for this tool (defaults to deployer’s verbose setting)
traceback_enabled
bool
default:"None"
Enable traceback logging for this tool
return
str
The tool name that was registered
Raises:
  • ValueError: If agent is None or tool_name already exists

add_agents_batch

Add multiple agents to the MCP server in batch.
tool_names = aop.add_agents_batch(
    agents=[agent1, agent2, agent3],
    tool_names=["analyzer", "researcher", "writer"],
    tool_descriptions=[
        "Analyzes data",
        "Researches topics",
        "Writes reports"
    ],
    timeouts=[30, 60, 45],
    verbose_list=[True, True, False]
)
agents
List[Agent]
required
List of Swarms Agent instances
tool_names
List[str]
default:"None"
Optional list of tool names (defaults to agent names)
tool_descriptions
List[str]
default:"None"
Optional list of tool descriptions
input_schemas
List[Dict[str, Any]]
default:"None"
Optional list of input schemas
output_schemas
List[Dict[str, Any]]
default:"None"
Optional list of output schemas
timeouts
List[int]
default:"None"
Optional list of timeout values
max_retries_list
List[int]
default:"None"
Optional list of max retry values
verbose_list
List[bool]
default:"None"
Optional list of verbose settings for each agent
traceback_enabled_list
List[bool]
default:"None"
Optional list of traceback settings for each agent
return
List[str]
List of tool names that were registered
Raises:
  • ValueError: If agents list is empty or contains None values
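The example above pairs each list entry with the agent at the same index, so a list of the wrong length is an easy mistake to make. The following is a minimal sketch of a pre-flight check, assuming every optional list must either be omitted or have one entry per agent (an assumption about AOP's behavior, not something this page confirms). `check_batch_args` is a hypothetical helper and not part of Swarms.

```python
from typing import Any, Dict, Optional, Sequence


def check_batch_args(
    agents: Sequence[Any],
    **per_agent_lists: Optional[Sequence[Any]],
) -> Dict[str, Sequence[Any]]:
    """Return only the lists that were provided, after checking their lengths.

    Hypothetical helper: not part of the Swarms API.
    """
    if not agents or any(agent is None for agent in agents):
        # Mirrors the ValueError condition documented for add_agents_batch.
        raise ValueError("agents must be a non-empty list without None values")

    provided: Dict[str, Sequence[Any]] = {}
    for name, values in per_agent_lists.items():
        if values is None:
            continue  # Omitted lists are left for the library defaults.
        if len(values) != len(agents):
            raise ValueError(
                f"{name} has {len(values)} entries but there are {len(agents)} agents"
            )
        provided[name] = values
    return provided
```

The returned dictionary could then be passed on, for example `aop.add_agents_batch(agents=agents, **check_batch_args(agents, tool_names=names, timeouts=timeouts))`.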

remove_agent

Remove an agent from the MCP server.
success = aop.remove_agent("financial_analyzer")
tool_name
str
required
Name of the tool to remove
return
bool
True if agent was removed, False if not found

list_agents

Get a list of all registered agent tool names.
agent_names = aop.list_agents()
print(f"Active agents: {agent_names}")
return
List[str]
List of tool names

get_agent_info

Get detailed information about a specific agent tool.
info = aop.get_agent_info("financial_analyzer")
print(info)
# {
#   "tool_name": "financial_analyzer",
#   "agent_name": "FinancialAgent",
#   "agent_description": "...",
#   "model_name": "gpt-4",
#   "max_loops": 3,
#   "timeout": 60,
#   "max_retries": 3,
#   "verbose": True
# }
tool_name
str
required
Name of the tool
return
Optional[Dict[str, Any]]
Dictionary containing agent information, or None if not found

get_queue_stats

Get queue statistics for agents.
# Stats for specific agent
stats = aop.get_queue_stats(tool_name="financial_analyzer")

# Stats for all agents
all_stats = aop.get_queue_stats()

print(stats)
# {
#   "success": True,
#   "agent_name": "financial_analyzer",
#   "stats": {
#     "total_tasks": 150,
#     "completed_tasks": 145,
#     "failed_tasks": 2,
#     "pending_tasks": 3,
#     "processing_tasks": 1,
#     "average_processing_time": 2.34,
#     "queue_size": 3,
#     "queue_status": "running"
#   }
# }
tool_name
str
default:"None"
Optional specific agent name. If None, returns stats for all agents.
return
Dict[str, Any]
Dictionary containing queue statistics
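As a rough illustration of how the returned dictionary might be used, the sketch below derives a failure rate and a backlog count from a single-agent response. It assumes the response has the layout shown in the example above. `summarize_queue_stats` is a hypothetical helper, not a Swarms function.

```python
from typing import Any, Dict


def summarize_queue_stats(response: Dict[str, Any]) -> Dict[str, float]:
    """Derive simple health figures from a single-agent stats response.

    Assumes the layout shown in the get_queue_stats example.
    """
    stats = response.get("stats", {})
    completed = stats.get("completed_tasks", 0)
    failed = stats.get("failed_tasks", 0)
    finished = completed + failed
    return {
        # Share of finished tasks that failed; 0.0 when nothing has finished.
        "failure_rate": failed / finished if finished else 0.0,
        # Tasks still waiting or currently running.
        "backlog": stats.get("pending_tasks", 0) + stats.get("processing_tasks", 0),
    }


# Sample data copied from the example response above.
example = {
    "success": True,
    "agent_name": "financial_analyzer",
    "stats": {
        "completed_tasks": 145,
        "failed_tasks": 2,
        "pending_tasks": 3,
        "processing_tasks": 1,
    },
}
summary = summarize_queue_stats(example)
print(summary)
```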

Queue Management

When queue_enabled=True, each agent gets its own task queue with the following features:

TaskQueue Features

  1. Priority-based execution: Tasks can be assigned priorities
  2. Automatic retries: Failed tasks are automatically retried
  3. Worker threads: Background workers process tasks concurrently
  4. Statistics tracking: Comprehensive metrics on task execution
  5. Pause/resume: Queues can be paused and resumed
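
The first three features can be illustrated with the standard library alone. This is a minimal sketch of a priority queue drained by worker threads that re-queue failed tasks, not the TaskQueue implementation in Swarms. `Task`, `run_tasks`, and the "lower number runs first" convention are all assumptions made for the example.

```python
import itertools
import queue
import threading
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass(order=True)
class Task:
    priority: int  # lower value runs first (assumed convention)
    seq: int       # tie-breaker: keeps submission order within a priority
    func: Callable[[], str] = field(compare=False)
    retries_left: int = field(default=3, compare=False)


def run_tasks(tasks: List[Task], num_workers: int = 2) -> List[str]:
    """Process tasks by priority on background threads, retrying failures."""
    q: "queue.PriorityQueue[Task]" = queue.PriorityQueue()
    results: List[str] = []
    lock = threading.Lock()
    for task in tasks:
        q.put(task)

    def worker() -> None:
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return  # nothing left for this worker to do
            try:
                outcome = task.func()
            except Exception:
                if task.retries_left > 0:
                    task.retries_left -= 1
                    q.put(task)  # automatic retry: back into the queue
                continue
            with lock:
                results.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


counter = itertools.count()
attempts = {"n": 0}


def flaky() -> str:
    # Fails twice, then succeeds on the third attempt.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "flaky ok"


results = run_tasks(
    [
        Task(2, next(counter), lambda: "low priority"),
        Task(1, next(counter), lambda: "high priority"),
        Task(3, next(counter), flaky),
    ],
    num_workers=1,  # a single worker makes the ordering deterministic
)
print(results)
```

With one worker, the higher-priority task finishes before the lower-priority one, and the flaky task succeeds after two retries.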

Queue States

  • RUNNING: Queue is actively processing tasks
  • PAUSED: Queue is paused, workers wait for resume
  • STOPPED: Queue is stopped, workers are terminated

Task States

  • PENDING: Task is waiting in queue
  • PROCESSING: Task is currently being executed
  • COMPLETED: Task completed successfully
  • FAILED: Task failed after max retries
  • CANCELLED: Task was cancelled
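
One way to picture these states is as two enumerations plus a table of allowed task transitions. The sketch below is illustrative only. The enum values are lowercase to match the `"queue_status": "running"` field in the stats example, while the class names and the transition table are assumptions inferred from the descriptions above, not taken from the library.

```python
from enum import Enum
from typing import Dict, Set


class QueueStatus(Enum):
    RUNNING = "running"  # actively processing tasks
    PAUSED = "paused"    # workers wait for resume
    STOPPED = "stopped"  # workers are terminated


class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed transition table, inferred from the state descriptions.
TASK_TRANSITIONS: Dict[TaskStatus, Set[TaskStatus]] = {
    TaskStatus.PENDING: {TaskStatus.PROCESSING, TaskStatus.CANCELLED},
    # A failed attempt with retries left goes back to PENDING.
    TaskStatus.PROCESSING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.PENDING,
    },
    # Terminal states have no outgoing transitions.
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if the assumed table allows moving from current to new."""
    return new in TASK_TRANSITIONS[current]
```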

Persistence & Network Monitoring

Persistence Mode

When persistence=True:
  • Server automatically restarts on shutdown
  • Configurable restart attempts and delays
  • Failsafe protection prevents infinite restart loops
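
The failsafe idea can be sketched as a bounded restart loop. This is not the AOP implementation. It assumes an unexpected shutdown surfaces as an exception from a hypothetical `start_server` callable, and it reuses the `max_restart_attempts` and `restart_delay` names from the constructor only for readability.

```python
import time
from typing import Callable


def run_with_restarts(
    start_server: Callable[[], None],
    max_restart_attempts: int = 10,
    restart_delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call start_server until it returns cleanly or restarts are exhausted.

    Returns the number of restarts performed. KeyboardInterrupt is not a
    subclass of Exception, so a manual stop is never retried.
    """
    restarts = 0
    while True:
        try:
            start_server()
            return restarts  # clean exit: stop restarting
        except Exception:
            if restarts >= max_restart_attempts:
                raise  # failsafe: give up instead of looping forever
            restarts += 1
            sleep(restart_delay)


calls = {"n": 0}


def flaky_server() -> None:
    # Stand-in for a server that crashes twice before running cleanly.
    calls["n"] += 1
    if calls["n"] <= 2:
        raise RuntimeError("server crashed")


restart_count = run_with_restarts(
    flaky_server, max_restart_attempts=5, sleep=lambda seconds: None
)
print(f"Restarts needed: {restart_count}")
```

The `sleep` parameter is injectable so the loop can be exercised without waiting for the real delay.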

Network Monitoring

When network_monitoring=True:
  • Automatic detection of network issues
  • Retry logic for network failures
  • Configurable retry attempts and delays
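
A connectivity check with bounded retries might look like the sketch below. It is an illustration under stated assumptions, not the monitoring code in AOP: the probe target (`8.8.8.8` on port 53) and both function names are hypothetical, and the parameter names are borrowed from the constructor only for readability.

```python
import socket
import time
from typing import Callable, List


def is_network_available(
    host: str = "8.8.8.8", port: int = 53, timeout: float = 3.0
) -> bool:
    """Return True if a TCP connection to the probe target succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_network(
    check: Callable[[], bool] = is_network_available,
    max_network_retries: int = 5,
    network_retry_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll check() up to max_network_retries times, sleeping between tries."""
    for attempt in range(1, max_network_retries + 1):
        if check():
            return True
        if attempt < max_network_retries:
            sleep(network_retry_delay)  # no sleep after the final attempt
    return False


# Demonstration with a stubbed check so no real network access is needed.
outcomes = iter([False, False, True])
sleeps: List[float] = []
recovered = wait_for_network(
    check=lambda: next(outcomes),
    sleep=sleeps.append,
)
print(recovered, sleeps)
```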

Complete Example

from swarms import Agent
from swarms.structs.aop import AOP

# Create specialized agents
financial_agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt="You are a financial analysis expert...",
    model_name="gpt-4",
    max_loops=3,
    verbose=True
)

research_agent = Agent(
    agent_name="Research-Specialist",
    system_prompt="You are a research expert...",
    model_name="gpt-4",
    max_loops=2,
    verbose=True
)

writing_agent = Agent(
    agent_name="Content-Writer",
    system_prompt="You are a content writing expert...",
    model_name="gpt-4",
    max_loops=1,
    verbose=False
)

# Deploy all agents in an AOP cluster
aop = AOP(
    server_name="Production Analysis Swarm",
    description="Multi-agent cluster for financial analysis",
    agents=[financial_agent, research_agent, writing_agent],
    port=8000,
    host="0.0.0.0",
    queue_enabled=True,
    max_workers_per_agent=2,
    max_queue_size_per_agent=100,
    processing_timeout=120,
    persistence=True,
    max_restart_attempts=5,
    network_monitoring=True,
    log_level="INFO",
    verbose=True,
    traceback_enabled=True
)

# Add another agent dynamically
data_agent = Agent(
    agent_name="Data-Processor",
    system_prompt="You process and analyze data...",
    model_name="gpt-4"
)

aop.add_agent(
    agent=data_agent,
    tool_name="data_processor",
    timeout=60,
    max_retries=5
)

# Monitor queue statistics
stats = aop.get_queue_stats()
for agent_name, agent_stats in stats["stats"].items():
    print(f"{agent_name}: {agent_stats['completed_tasks']} tasks completed")

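# Get agent information. Agents passed to the constructor are assumed to be
# registered under their agent_name, the documented default for tool_name.
info = aop.get_agent_info("Financial-Analyst")
if info is not None:  # get_agent_info returns None if the tool is not found
    print(f"Model: {info['model_name']}")
    print(f"Max loops: {info['max_loops']}")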

# List all active agents
agents = aop.list_agents()
print(f"Active agents: {', '.join(agents)}")

# The server automatically handles:
# - Queue management and task execution
# - Automatic retries on failures
# - Network monitoring and recovery
# - Server persistence and restarts
# - Comprehensive error logging

Best Practices

  1. Enable queuing: Use queue_enabled=True for production reliability
  2. Set timeouts: Configure appropriate timeouts based on task complexity
  3. Monitor queues: Regularly check queue statistics to identify bottlenecks
  4. Use persistence: Enable persistence mode for critical production deployments
  5. Configure workers: Adjust max_workers_per_agent based on load requirements
  6. Enable logging: Use verbose=True and appropriate log_level for debugging
  7. Handle failures: Set max_retries appropriately for your use case
  8. Network resilience: Enable network_monitoring for unstable connections
  9. Gradual scaling: Start with fewer agents and scale up based on metrics
  10. Health monitoring: Regularly check get_queue_stats() for system health

Error Handling

add_agent raises ValueError when the agent is None or the tool name already exists. Catch that case separately from unexpected errors:
try:
    aop.add_agent(agent=my_agent)
except ValueError as e:
    print(f"Configuration error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
    if aop.traceback_enabled:
        import traceback
        traceback.print_exc()

Performance Tuning

# High-throughput configuration
aop = AOP(
    max_workers_per_agent=5,
    max_queue_size_per_agent=1000,
    processing_timeout=300,
    retry_delay=0.5,
    queue_enabled=True
)

# Low-latency configuration
aop = AOP(
    max_workers_per_agent=1,
    processing_timeout=10,
    retry_delay=0.1,
    queue_enabled=True
)

# High-reliability configuration
aop = AOP(
    persistence=True,
    max_restart_attempts=10,
    network_monitoring=True,
    max_network_retries=10,
    traceback_enabled=True
)
