Overview
Deploying multi-agent systems in production requires careful attention to error handling, retries, logging, monitoring, and system design. This guide covers the essential best practices for running Swarms agents reliably at scale.
Error Handling
Agent Error Hierarchy
Swarms provides a comprehensive exception hierarchy for different failure modes:
from loguru import logger

from swarms import Agent
from swarms.structs.agent import (
    AgentError,                # Base exception
    AgentInitializationError,  # Initialization failures
    AgentRunError,             # Runtime failures
    AgentLLMError,             # LLM-related errors
    AgentToolError,            # Tool execution errors
    AgentMemoryError,          # Memory errors
    AgentToolExecutionError,   # Tool execution failures
)
try:
    agent = Agent(
        agent_name="Production-Agent",
        model_name="gpt-4o",
        max_loops=3,
    )
    result = agent.run("Process this task")
except AgentInitializationError as e:
    logger.error(f"Failed to initialize agent: {e}")
    # Handle initialization failure (e.g., retry with different config)
except AgentLLMError as e:
    logger.error(f"LLM error: {e}")
    # Handle LLM failures (e.g., switch to fallback model)
except AgentToolError as e:
    logger.error(f"Tool error: {e}")
    # Handle tool failures (e.g., disable problematic tool)
except AgentRunError as e:
    logger.error(f"Runtime error: {e}")
    # Handle general runtime errors
except Exception as e:
    logger.error(f"Unexpected error: {e}")
    # Catch-all for unexpected errors
Graceful Error Recovery
import time
import traceback

from loguru import logger

from swarms.structs.agent import AgentLLMError, AgentRunError

def run_agent_with_recovery(agent, task, max_retries=3):
    """
    Run agent with automatic recovery from transient errors.
    """
    for attempt in range(max_retries):
        try:
            result = agent.run(task)
            return result
        except (AgentLLMError, AgentRunError) as e:
            logger.warning(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {2 ** attempt} seconds...")
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                logger.error(f"All {max_retries} attempts failed")
                raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            logger.error(traceback.format_exc())
            raise
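A minimal usage sketch, assuming an agent configured as in the error-handling example above (the task string is illustrative):
# Usage
agent = Agent(
    agent_name="Production-Agent",
    model_name="gpt-4o",
    max_loops=3,
)
result = run_agent_with_recovery(agent, "Summarize today's error reports")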
Retry Strategies
Built-in Retry Configuration
from swarms import Agent

agent = Agent(
    agent_name="Resilient-Agent",
    model_name="gpt-4o",
    retry_attempts=5,  # Number of retry attempts
    retry_interval=2,  # Initial retry interval (seconds)
    max_loops=3,
)
Exponential Backoff
import random
import time

from loguru import logger

def exponential_backoff_retry(
    func,
    max_retries=5,
    base_delay=1,
    max_delay=60,
    jitter=True,
):
    """
    Execute function with exponential backoff retry.
    """
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay *= (0.5 + random.random())  # Add jitter
            logger.warning(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {delay:.2f}s..."
            )
            time.sleep(delay)

# Usage
result = exponential_backoff_retry(
    lambda: agent.run("Complex task"),
    max_retries=5,
    base_delay=1,
    max_delay=60,
)
Fallback Models
Use fallback models for resilience:
agent = Agent(
    agent_name="Resilient-Agent",
    fallback_models=[
        "gpt-4o",         # Primary model
        "gpt-4o-mini",    # First fallback
        "gpt-3.5-turbo",  # Second fallback
    ],
    max_loops=3,
)

# Agent automatically tries fallback models if primary fails
result = agent.run("Generate a report")
Logging
Structured Logging with Loguru
Swarms uses Loguru for powerful, structured logging:
from loguru import logger
import sys

# Configure logging for production
logger.remove()  # Remove default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO",
    colorize=True,
)

# Add file logging with rotation
logger.add(
    "logs/agent_{time}.log",
    rotation="500 MB",    # Rotate when file reaches 500MB
    retention="10 days",  # Keep logs for 10 days
    compression="zip",    # Compress rotated logs
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
)

# Add error-only log file
logger.add(
    "logs/errors_{time}.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="ERROR",
    filter=lambda record: record["level"].name == "ERROR",
)
Agent-Specific Logging
# Enable verbose logging for specific agents
agent = Agent(
    agent_name="Debug-Agent",
    model_name="gpt-4o",
    verbose=True,   # Enable verbose logging
    print_on=True,  # Print outputs to console
)

# Control logging in AOP
from swarms.structs.aop import AOP

deployer = AOP(
    server_name="ProductionCluster",
    verbose=True,
    traceback_enabled=True,  # Enable detailed tracebacks
    log_level="INFO",        # Set log level
)
Contextual Logging
from loguru import logger
import contextvars

# Create context var for request ID
request_id_var = contextvars.ContextVar('request_id', default='unknown')

# Attach the request ID to every log record via the `extra` dict
# (reference it in a sink format string as {extra[request_id]})
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get()
    )
)

def process_request(request_id, task):
    request_id_var.set(request_id)
    logger.info(f"Processing task: {task}")
    try:
        result = agent.run(task)
        logger.info("Task completed successfully")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise
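A usage sketch, assuming UUID-based request IDs and the `agent` defined earlier:
import uuid

# Every log line emitted while this task runs carries the same request ID,
# which makes it easy to trace a single request through the logs.
process_request(str(uuid.uuid4()), "Generate the weekly usage summary")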
Monitoring
Performance Metrics
import time

from loguru import logger

class AgentMetrics:
    def __init__(self):
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.total_duration = 0.0

    def record_request(self, success: bool, duration: float):
        self.total_requests += 1
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1
        self.total_duration += duration

    def get_metrics(self):
        avg_duration = (
            self.total_duration / self.total_requests
            if self.total_requests > 0 else 0
        )
        success_rate = (
            self.successful_requests / self.total_requests
            if self.total_requests > 0 else 0
        )
        return {
            "total_requests": self.total_requests,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
        }

metrics = AgentMetrics()

def monitored_agent_run(agent, task):
    start_time = time.time()
    success = False
    try:
        result = agent.run(task)
        success = True
        return result
    except Exception as e:
        logger.error(f"Agent run failed: {e}")
        raise
    finally:
        duration = time.time() - start_time
        metrics.record_request(success, duration)
        logger.info(f"Request completed in {duration:.2f}s")
Health Checks
import time
import threading

from loguru import logger

def health_check(agent):
    """
    Perform health check on agent.
    """
    try:
        # Quick test run
        result = agent.run("Hello", max_loops=1)
        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "timestamp": time.time(),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            "error": str(e),
            "timestamp": time.time(),
        }

# Periodic health checks
def periodic_health_check(agent, interval=60):
    def check():
        while True:
            status = health_check(agent)
            logger.info(f"Health check: {status}")
            time.sleep(interval)

    thread = threading.Thread(target=check, daemon=True)
    thread.start()
Configuration Management
Environment-Based Configuration
import os

from dotenv import load_dotenv

from swarms import Agent

# Load environment variables
load_dotenv()

# Production configuration
PROD_CONFIG = {
    "model_name": os.getenv("MODEL_NAME", "gpt-4o"),
    "max_loops": int(os.getenv("MAX_LOOPS", "3")),
    "retry_attempts": int(os.getenv("RETRY_ATTEMPTS", "5")),
    "timeout": int(os.getenv("TIMEOUT", "120")),
    "verbose": os.getenv("VERBOSE", "false").lower() == "true",
}

agent = Agent(
    agent_name="Production-Agent",
    **PROD_CONFIG,
)
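The matching `.env` file might look like the following (the variable names come from the snippet above; the values are illustrative):
MODEL_NAME=gpt-4o
MAX_LOOPS=3
RETRY_ATTEMPTS=5
TIMEOUT=120
VERBOSE=false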
Configuration Validation
from pydantic import BaseModel, Field, validator

from swarms import Agent

class AgentConfig(BaseModel):
    agent_name: str = Field(..., min_length=1)
    model_name: str
    max_loops: int = Field(default=3, ge=1, le=100)
    retry_attempts: int = Field(default=3, ge=0, le=10)
    timeout: int = Field(default=120, ge=1)

    @validator('model_name')
    def validate_model(cls, v):
        allowed_models = ["gpt-4o", "gpt-4o-mini", "claude-sonnet-3.5"]
        if v not in allowed_models:
            raise ValueError(f"Model must be one of {allowed_models}")
        return v

# Load and validate config
config = AgentConfig(
    agent_name="Production-Agent",
    model_name="gpt-4o",
    max_loops=5,
)

agent = Agent(**config.dict())
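Invalid configurations then fail before any agent is constructed; a short sketch using pydantic's standard `ValidationError`:
from pydantic import ValidationError

try:
    # Empty name and unknown model both violate the schema above
    AgentConfig(agent_name="", model_name="unknown-model")
except ValidationError as e:
    logger.error(f"Invalid agent configuration: {e}")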
Security Best Practices
API Key Management
import os

# Never hardcode API keys; use environment variables or a secure vault
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

agent = Agent(
    model_name="gpt-4o",
    llm_api_key=OPENAI_API_KEY,
)
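It also helps to fail fast at startup if a required key is missing, rather than discovering it on the first LLM call; a minimal sketch:
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set; refusing to start")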
Input Validation
import re

def sanitize_input(text: str) -> str:
    """
    Sanitize user input to prevent injection attacks.
    """
    # Remove control characters
    text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
    # Limit length
    max_length = 10000
    if len(text) > max_length:
        text = text[:max_length]
    return text.strip()

# Use sanitized input (`request` stands in for your API framework's request payload)
user_task = sanitize_input(request.get("task"))
result = agent.run(user_task)
Safety Prompts
agent = Agent(
    agent_name="Safe-Agent",
    model_name="gpt-4o",
    safety_prompt_on=True,  # Enable safety guardrails
)
Resource Management
Memory Management
# Truncate conversation history to manage memory
agent = Agent(
    agent_name="Memory-Managed-Agent",
    model_name="gpt-4o",
    context_length=8000,  # Context window size
)

# Manual truncation
agent.truncate_history(max_messages=50)
Connection Pooling
from concurrent.futures import ThreadPoolExecutor

# Use thread pool for concurrent requests
executor = ThreadPoolExecutor(max_workers=10)

def process_tasks(tasks):
    futures = []
    for task in tasks:
        future = executor.submit(agent.run, task)
        futures.append(future)
    return [f.result() for f in futures]
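A usage sketch (the task list is illustrative); shutting the pool down on application exit releases the worker threads:
tasks = [
    "Summarize report A",
    "Summarize report B",
    "Summarize report C",
]
results = process_tasks(tasks)

# Release worker threads when the application shuts down
executor.shutdown(wait=True)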
State Management
Autosave
agent = Agent(
    agent_name="Stateful-Agent",
    model_name="gpt-4o",
    autosave=True,  # Auto-save state after each run
    saved_state_path="./agent_states/",
)
Manual State Management
# Save state
agent.save_state()

# Load state
from swarms import Agent

agent = Agent(
    agent_name="Restored-Agent",
    model_name="gpt-4o",
    load_state_path="./agent_states/agent_state.json",
)
Testing
Unit Testing
import unittest

from swarms import Agent
from swarms.structs.agent import AgentRunError

class TestAgent(unittest.TestCase):
    def setUp(self):
        self.agent = Agent(
            agent_name="Test-Agent",
            model_name="gpt-4o-mini",
            max_loops=1,
        )

    def test_basic_run(self):
        result = self.agent.run("Say hello")
        self.assertIsNotNone(result)
        self.assertIn("hello", result.lower())

    def test_error_handling(self):
        with self.assertRaises(AgentRunError):
            self.agent.run("")  # Empty task should fail
Integration Testing
def test_agent_integration():
    """Test agent with real LLM."""
    agent = Agent(
        agent_name="Integration-Test-Agent",
        model_name="gpt-4o-mini",
        max_loops=2,
    )

    # Test with various inputs
    test_cases = [
        "Simple task",
        "Multi-step reasoning task",
        "Task requiring tool use",
    ]

    for task in test_cases:
        result = agent.run(task)
        assert result is not None
        assert len(result) > 0
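Integration tests call a live LLM, so it is common to gate them on credentials being available; a sketch using pytest (the environment variable name is an assumption about your setup):
import os

import pytest

# Skip live-LLM tests in environments without credentials (e.g., plain CI runs)
pytestmark = pytest.mark.skipif(
    not os.getenv("OPENAI_API_KEY"),
    reason="OPENAI_API_KEY not set; skipping live integration tests",
)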