Production Best Practices for Swarms Agents

Overview

Deploying multi-agent systems in production requires careful attention to error handling, retries, logging, monitoring, and system design. This guide covers the essential best practices for running Swarms agents reliably at scale.

Error Handling

Agent Error Hierarchy

Swarms provides a comprehensive exception hierarchy for different failure modes:
from swarms.structs.agent import (
    AgentError,  # Base exception
    AgentInitializationError,  # Initialization failures
    AgentRunError,  # Runtime failures
    AgentLLMError,  # LLM-related errors
    AgentToolError,  # Tool execution errors
    AgentMemoryError,  # Memory errors
    AgentToolExecutionError,  # Tool execution failures
)

# Construct the agent and run one task, mapping each Swarms failure mode
# to its own recovery strategy. Order the handlers from most specific to
# the generic Exception catch-all so the most targeted recovery runs.
try:
    agent = Agent(
        agent_name="Production-Agent",
        model_name="gpt-4o",
        max_loops=3,
    )
    result = agent.run("Process this task")
except AgentInitializationError as e:
    logger.error(f"Failed to initialize agent: {e}")
    # Handle initialization failure (e.g., retry with different config)
except AgentLLMError as e:
    logger.error(f"LLM error: {e}")
    # Handle LLM failures (e.g., switch to fallback model)
except AgentToolError as e:
    logger.error(f"Tool error: {e}")
    # Handle tool failures (e.g., disable problematic tool)
except AgentRunError as e:
    logger.error(f"Runtime error: {e}")
    # Handle general runtime errors
except Exception as e:
    logger.error(f"Unexpected error: {e}")
    # Catch-all for unexpected errors. NOTE(review): swallowing here means
    # `result` may be unbound afterwards — confirm callers re-raise or
    # check before using it.

Graceful Error Recovery

import traceback
from loguru import logger

def run_agent_with_recovery(agent, task, max_retries=3):
    """Run *task* on *agent*, retrying transient failures automatically.

    Retries up to ``max_retries`` times on AgentLLMError / AgentRunError
    with exponential backoff (1s, 2s, 4s, ...). Any other exception is
    logged with a full traceback and re-raised immediately without retry.
    Requires ``time`` to be imported (see the retry-strategy snippets).
    """
    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return agent.run(task)
        except (AgentLLMError, AgentRunError) as e:
            # Transient failure: back off and retry unless out of attempts.
            logger.warning(f"Attempt {attempt + 1} failed: {e}")
            if attempt == last_attempt:
                logger.error(f"All {max_retries} attempts failed")
                raise
            logger.info(f"Retrying in {2 ** attempt} seconds...")
            time.sleep(2 ** attempt)
        except Exception as e:
            # Unexpected failures are never retried.
            logger.error(f"Unexpected error: {e}")
            logger.error(traceback.format_exc())
            raise

Retry Strategies

Built-in Retry Configuration

from swarms import Agent

agent = Agent(
    agent_name="Resilient-Agent",
    model_name="gpt-4o",
    retry_attempts=5,  # Number of retry attempts
    retry_interval=2,  # Initial retry interval (seconds)
    max_loops=3,
)

Exponential Backoff

import time
import random

def exponential_backoff_retry(
    func,
    max_retries=5,
    base_delay=1,
    max_delay=60,
    jitter=True,
):
    """Call *func* until it succeeds, sleeping between failed attempts.

    The delay doubles each attempt and is capped at *max_delay*; with
    *jitter* enabled it is scaled by a random factor in [0.5, 1.5) so
    many clients don't retry in lockstep. The exception from the final
    attempt is re-raised unchanged.
    """
    final = max_retries - 1
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            # Out of attempts: surface the last failure to the caller.
            if attempt == final:
                raise

            wait = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                wait *= 0.5 + random.random()

            logger.warning(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {wait:.2f}s..."
            )
            time.sleep(wait)

# Usage
result = exponential_backoff_retry(
    lambda: agent.run("Complex task"),
    max_retries=5,
    base_delay=1,
    max_delay=60,
)

Fallback Models

Use fallback models for resilience:
# Configure an ordered chain of models; per the snippet below, the agent
# falls through the list when the primary model's call fails.
agent = Agent(
    agent_name="Resilient-Agent",
    fallback_models=[
        "gpt-4o",           # Primary model
        "gpt-4o-mini",      # First fallback
        "gpt-3.5-turbo",    # Second fallback
    ],
    max_loops=3,
)

# Agent automatically tries fallback models if primary fails
result = agent.run("Generate a report")

Logging

Structured Logging with Loguru

Swarms uses Loguru for powerful, structured logging:
from loguru import logger
import sys

# Production Loguru setup. Sinks are additive: each logger.add() call
# registers one more destination, so the three calls below give a colored
# console stream plus two rotating, compressed log files.

# Configure logging for production
logger.remove()  # Remove default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO",
    colorize=True,
)

# Add file logging with rotation
logger.add(
    "logs/agent_{time}.log",
    rotation="500 MB",  # Rotate when file reaches 500MB
    retention="10 days",  # Keep logs for 10 days
    compression="zip",  # Compress rotated logs
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
)

# Add error-only log file
logger.add(
    "logs/errors_{time}.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="ERROR",
    # Matches exactly the ERROR level name, so CRITICAL records do NOT
    # land in this file (they still reach the other two sinks).
    filter=lambda record: record["level"].name == "ERROR",
)

Agent-Specific Logging

# Enable verbose logging for specific agents
agent = Agent(
    agent_name="Debug-Agent",
    model_name="gpt-4o",
    verbose=True,  # Enable verbose logging
    print_on=True,  # Print outputs to console
)

# Control logging in AOP
from swarms.structs.aop import AOP

deployer = AOP(
    server_name="ProductionCluster",
    verbose=True,
    traceback_enabled=True,  # Enable detailed tracebacks
    log_level="INFO",  # Set log level
)

Contextual Logging

from loguru import logger
import contextvars

# Context variable holding the current request ID; contextvars are
# async- and thread-aware, so concurrent requests don't clobber each other.
request_id_var = contextvars.ContextVar('request_id', default='unknown')

# Attach the request ID to every log record. Loguru's patcher contract is
# to mutate the record in place, and custom fields belong in
# record["extra"] (reachable in format strings as {extra[request_id]}) —
# not as ad-hoc top-level record keys.
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get()
    )
)

def process_request(request_id, task):
    """Run *task* on the shared agent with *request_id* bound for logging.

    Binds the ID into the context variable first so the logger patcher
    can stamp it onto every record emitted while the task runs.
    """
    request_id_var.set(request_id)
    logger.info(f"Processing task: {task}")

    try:
        output = agent.run(task)
        logger.info(f"Task completed successfully")
        return output
    except Exception as e:
        # Log with the bound request ID, then let the caller handle it.
        logger.error(f"Task failed: {e}")
        raise

Monitoring

Performance Metrics

import time
from loguru import logger

class AgentMetrics:
    """In-memory counters for agent request outcomes.

    Not synchronized — there is no locking here, so guard calls with a
    lock if requests are recorded from multiple threads.
    """

    def __init__(self):
        # Running totals; derived rates are computed lazily in get_metrics().
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.total_duration = 0.0

    def record_request(self, success: bool, duration: float):
        """Record one finished request and its wall-clock duration (s)."""
        self.total_requests += 1
        self.total_duration += duration
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1

    def get_metrics(self):
        """Return a snapshot dict; rates default to 0 with no requests."""
        count = self.total_requests
        return {
            "total_requests": count,
            "successful_requests": self.successful_requests,
            "failed_requests": self.failed_requests,
            "success_rate": self.successful_requests / count if count else 0,
            "avg_duration": self.total_duration / count if count else 0,
        }

metrics = AgentMetrics()

def monitored_agent_run(agent, task):
    """Run *task* on *agent*, recording duration and outcome in `metrics`.

    The finally block runs on both the return and the raise path, so
    every request is counted exactly once.
    """
    started = time.time()
    success = False

    try:
        output = agent.run(task)
        success = True
        return output
    except Exception as e:
        logger.error(f"Agent run failed: {e}")
        raise
    finally:
        elapsed = time.time() - started
        metrics.record_request(success, elapsed)
        logger.info(f"Request completed in {elapsed:.2f}s")

Health Checks

def health_check(agent):
    """Probe *agent* with a trivial task and report its liveness.

    Returns a dict whose "status" is "healthy" when a quick run
    succeeds, or "unhealthy" with the stringified error when it raises.
    """
    try:
        agent.run("Hello", max_loops=1)  # cheap smoke test
        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "timestamp": time.time(),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            "error": str(e),
            "timestamp": time.time(),
        }

# Periodic health checks
import threading

def periodic_health_check(agent, interval=60):
    """Start a daemon thread that health-checks *agent* every *interval* s."""

    def _loop():
        # Daemon thread: loops forever but dies with the main process.
        while True:
            status = health_check(agent)
            logger.info(f"Health check: {status}")
            time.sleep(interval)

    threading.Thread(target=_loop, daemon=True).start()

Configuration Management

Environment-Based Configuration

import os
from dotenv import load_dotenv

# Load environment variables from a local .env file (no-op if absent)
load_dotenv()

# Production configuration, overridable per deployment environment.
# NOTE(review): int() raises ValueError on non-numeric env values —
# validate (or use the pydantic config below) before casting in real code.
PROD_CONFIG = {
    "model_name": os.getenv("MODEL_NAME", "gpt-4o"),
    "max_loops": int(os.getenv("MAX_LOOPS", "3")),
    "retry_attempts": int(os.getenv("RETRY_ATTEMPTS", "5")),
    "timeout": int(os.getenv("TIMEOUT", "120")),
    # Any value other than "true" (case-insensitive) disables verbose mode
    "verbose": os.getenv("VERBOSE", "false").lower() == "true",
}

agent = Agent(
    agent_name="Production-Agent",
    **PROD_CONFIG,
)

Configuration Validation

from pydantic import BaseModel, Field, validator

class AgentConfig(BaseModel):
    """Validated agent settings (pydantic v1-style field validator)."""

    agent_name: str = Field(..., min_length=1)  # required, non-empty
    model_name: str
    max_loops: int = Field(default=3, ge=1, le=100)
    retry_attempts: int = Field(default=3, ge=0, le=10)
    timeout: int = Field(default=120, ge=1)  # seconds

    @validator('model_name')
    def validate_model(cls, v):
        # Whitelist of deployable models; everything else is rejected.
        allowed_models = ["gpt-4o", "gpt-4o-mini", "claude-sonnet-3.5"]
        if v in allowed_models:
            return v
        raise ValueError(f"Model must be one of {allowed_models}")

# Load and validate config
config = AgentConfig(
    agent_name="Production-Agent",
    model_name="gpt-4o",
    max_loops=5,
)

agent = Agent(**config.dict())

Security Best Practices

API Key Management

import os
from cryptography.fernet import Fernet
# NOTE(review): Fernet is imported but never used in this snippet —
# either demonstrate encrypting stored secrets with it or drop the import.

# Never hardcode API keys
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

# Use environment variables or secure vaults
agent = Agent(
    model_name="gpt-4o",
    llm_api_key=OPENAI_API_KEY,
)

Input Validation

import re

def sanitize_input(text: str) -> str:
    """Sanitize untrusted text before handing it to an agent.

    Strips ASCII/C1 control characters (note: this includes newlines and
    tabs), truncates to 10,000 characters, and trims surrounding
    whitespace. Returns "" for None or empty input, so callers using
    ``request.get("task")`` don't have to pre-check for a missing key.

    Args:
        text: Raw user-supplied text; may be None.

    Returns:
        The cleaned string, at most 10,000 characters.
    """
    if not text:
        # Covers None and "" — request.get(...) returns None on a missing key.
        return ""

    # Remove C0 control chars, DEL, and C1 controls in a single pass.
    text = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)

    # Bound the prompt size; slicing a shorter string is a no-op.
    max_length = 10000
    return text[:max_length].strip()

# Use sanitized input
user_task = sanitize_input(request.get("task"))
result = agent.run(user_task)

Safety Prompts

agent = Agent(
    agent_name="Safe-Agent",
    model_name="gpt-4o",
    safety_prompt_on=True,  # Enable safety guardrails
)

Resource Management

Memory Management

# Truncate conversation history to manage memory
agent = Agent(
    agent_name="Memory-Managed-Agent",
    model_name="gpt-4o",
    context_length=8000,  # Context window size
)

# Manual truncation
agent.truncate_history(max_messages=50)

Connection Pooling

from concurrent.futures import ThreadPoolExecutor

# Use thread pool for concurrent requests
executor = ThreadPoolExecutor(max_workers=10)

def process_tasks(tasks):
    """Fan *tasks* out to the shared thread pool and gather the results.

    Blocks until every task finishes; results preserve the input order.
    An exception raised by any task is re-raised from its result() call.
    """
    pending = [executor.submit(agent.run, task) for task in tasks]
    return [future.result() for future in pending]

State Management

Autosave

agent = Agent(
    agent_name="Stateful-Agent",
    model_name="gpt-4o",
    autosave=True,  # Auto-save state after each run
    saved_state_path="./agent_states/",
)

Manual State Management

# Save state
agent.save_state()

# Load state
from swarms import Agent

agent = Agent(
    agent_name="Restored-Agent",
    model_name="gpt-4o",
    load_state_path="./agent_states/agent_state.json",
)

Testing

Unit Testing

import unittest
from swarms import Agent

class TestAgent(unittest.TestCase):
    """Unit tests for basic Agent behavior (calls a live LLM endpoint)."""

    def setUp(self):
        # Cheap model and a single loop keep each test fast.
        self.agent = Agent(
            agent_name="Test-Agent",
            model_name="gpt-4o-mini",
            max_loops=1,
        )

    def test_basic_run(self):
        # A trivial prompt should produce a non-empty greeting.
        response = self.agent.run("Say hello")
        self.assertIsNotNone(response)
        self.assertIn("hello", response.lower())

    def test_error_handling(self):
        # An empty task is expected to raise AgentRunError.
        with self.assertRaises(AgentRunError):
            self.agent.run("")

Integration Testing

def test_agent_integration():
    """Smoke-test the agent end-to-end with a real LLM on varied tasks."""
    agent = Agent(
        agent_name="Integration-Test-Agent",
        model_name="gpt-4o-mini",
        max_loops=2,
    )

    # Each task exercises a different capability; every run must yield
    # a non-empty result.
    for task in (
        "Simple task",
        "Multi-step reasoning task",
        "Task requiring tool use",
    ):
        result = agent.run(task)
        assert result is not None
        assert len(result) > 0

Together — structured error handling, retries with exponential backoff, structured logging, monitoring, validated configuration, and testing — these practices let Swarms agents run reliably in production.