Skip to main content

Overview

Monitoring is essential for running production multi-agent systems. This guide covers how to monitor agent performance, track metrics, implement logging, and gain full observability into your Swarms deployment.

Logging Architecture

Loguru Integration

Swarms uses Loguru for powerful, structured logging:
from loguru import logger
import sys

# Basic configuration: replace Loguru's default handler with a single
# colorized stderr sink at INFO level.
CONSOLE_FORMAT = (
    "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
    "<level>{level: <8}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level>"
)

logger.remove()  # drop the handler Loguru installs by default
logger.add(sys.stderr, format=CONSOLE_FORMAT, level="INFO", colorize=True)

Multi-Level Logging

Configure different log levels for different outputs:
from loguru import logger
import sys

# Register several sinks, each with its own level/format/retention policy.
# Order matters only for handler IDs; records go to every matching sink.
_SINKS = [
    # Console: INFO and above
    dict(
        sink=sys.stderr,
        level="INFO",
        format="{time:HH:mm:ss} | {level} | {message}",
        colorize=True,
    ),
    # File: all logs with rotation
    dict(
        sink="logs/agent_{time}.log",
        rotation="500 MB",
        retention="10 days",
        compression="zip",
        level="DEBUG",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
    ),
    # Error file: errors only (filter excludes WARNING/CRITICAL etc.)
    dict(
        sink="logs/errors_{time}.log",
        rotation="100 MB",
        retention="30 days",
        compression="zip",
        level="ERROR",
        filter=lambda record: record["level"].name == "ERROR",
    ),
    # JSON sink for structured logging
    dict(
        sink="logs/structured_{time}.json",
        rotation="1 day",
        serialize=True,  # emit each record as a JSON object
        level="INFO",
    ),
]

for _cfg in _SINKS:
    logger.add(_cfg.pop("sink"), **_cfg)

Contextual Logging

Add context to logs for better traceability:
from loguru import logger
import contextvars

# Context variables travel with the request across threads and async tasks.
request_id_var = contextvars.ContextVar('request_id', default='unknown')
user_id_var = contextvars.ContextVar('user_id', default='anonymous')

# Patch every record so the current context lands in its "extra" dict —
# the documented place for custom fields (top-level record keys are
# reserved for Loguru itself).  Reference them in a sink format as
# "{extra[request_id]}" / "{extra[user_id]}".
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get(),
        user_id=user_id_var.get(),
    )
)

def process_request(request_id, user_id, task):
    """Run *task* with request/user context attached to every log record.

    Re-raises any exception from the agent after logging it.
    """
    # Set context for the duration of this request
    request_id_var.set(request_id)
    user_id_var.set(user_id)

    logger.info(f"Processing task: {task}")
    try:
        result = agent.run(task)
        logger.info("Task completed successfully")
        return result
    except Exception as e:
        # logger.exception records the traceback along with the message
        logger.exception(f"Task failed: {e}")
        raise

Agent-Level Monitoring

Built-in Verbose Mode

from swarms import Agent

# Build a fully-instrumented agent.  verbose=True logs task inputs,
# tool executions, LLM calls, responses, and errors/retries;
# print_on=True mirrors output to the console.
_agent_config = {
    "agent_name": "Monitored-Agent",
    "model_name": "gpt-4o",
    "verbose": True,
    "print_on": True,
}
agent = Agent(**_agent_config)

Custom Monitoring Wrapper

import time
from loguru import logger
from typing import Any, Dict

class MonitoredAgent:
    """Wrapper that records success/failure counts and latency for an agent.

    The wrapped object only needs a ``run(task, *args, **kwargs)`` method
    and an ``agent_name`` attribute.
    """

    def __init__(self, agent):
        self.agent = agent
        # Raw counters; get_metrics() adds derived averages/rates.
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_duration": 0.0,
            "errors": [],
        }

    def run(self, task: str, *args, **kwargs) -> Any:
        """Run *task* on the wrapped agent, recording metrics and logs.

        Re-raises any exception from the underlying agent after
        recording it in ``metrics["errors"]``.
        """
        start_time = time.time()
        self.metrics["total_requests"] += 1

        # Loguru attaches keyword arguments to the record's "extra" dict
        # directly; the stdlib-style extra={...} would nest one level deep.
        logger.info(
            f"Starting task for {self.agent.agent_name}",
            task=task[:100],  # truncate to keep log lines bounded
        )

        try:
            result = self.agent.run(task, *args, **kwargs)
            self.metrics["successful_requests"] += 1

            duration = time.time() - start_time
            self.metrics["total_duration"] += duration

            logger.info(
                "Task completed successfully",
                agent=self.agent.agent_name,
                duration=f"{duration:.2f}s",
                success=True,
            )

            return result

        except Exception as e:
            self.metrics["failed_requests"] += 1
            self.metrics["errors"].append({
                "timestamp": time.time(),
                "error": str(e),
                "task": task[:100],
            })

            # logger.exception also captures the traceback
            logger.exception(
                "Task failed",
                agent=self.agent.agent_name,
                error=str(e),
                duration=f"{time.time() - start_time:.2f}s",
            )
            raise

    def get_metrics(self) -> Dict:
        """Return raw counters plus derived avg_duration / success_rate."""
        total = self.metrics["total_requests"]
        avg_duration = self.metrics["total_duration"] / total if total else 0
        success_rate = self.metrics["successful_requests"] / total if total else 0

        return {
            **self.metrics,
            "avg_duration": avg_duration,
            "success_rate": success_rate,
        }

# Usage
monitored_agent = MonitoredAgent(agent)  # wraps the `agent` built earlier
result = monitored_agent.run("Process this task")
metrics = monitored_agent.get_metrics()  # counters + avg_duration/success_rate

Queue Monitoring (AOP)

Real-Time Queue Statistics

from swarms.structs.aop import AOP
import time
import threading

# Create AOP cluster
deployer = AOP(
    server_name="MonitoredCluster",
    agents=agents,  # list of Agent instances built elsewhere
    queue_enabled=True,  # required for get_queue_stats() to return data
    verbose=True,
)

# Monitor queue stats in background
def monitor_queues():
    # Poll per-agent queue statistics once a minute and log them.
    while True:
        stats = deployer.get_queue_stats()
        
        # stats["success"] is False while the cluster is still starting up;
        # simply skip that cycle.
        if stats["success"]:
            for agent_name, agent_stats in stats["stats"].items():
                logger.info(
                    f"Queue stats for {agent_name}",
                    extra=agent_stats
                )
        
        time.sleep(60)  # Check every minute

# Daemon thread: exits automatically with the main process.
monitor_thread = threading.Thread(target=monitor_queues, daemon=True)
monitor_thread.start()

deployer.run()

Queue Metrics Dashboard

from rich.console import Console
from rich.table import Table
import time

console = Console()

# (header, style) pairs for the stats table, in display order.
_COLUMNS = [
    ("Agent", "cyan"),
    ("Total", "magenta"),
    ("Completed", "green"),
    ("Failed", "red"),
    ("Pending", "yellow"),
    ("Processing", "blue"),
    ("Avg Time", "white"),
    ("Queue Size", "white"),
]

def display_queue_dashboard(deployer):
    """Redraw a live table of per-agent queue statistics every 5 seconds."""
    while True:
        console.clear()

        table = Table(title="Agent Queue Statistics")
        for header, style in _COLUMNS:
            table.add_column(header, style=style)

        all_stats = deployer.get_queue_stats()
        if all_stats["success"]:
            for agent_name, stats in all_stats["stats"].items():
                table.add_row(
                    agent_name,
                    str(stats["total_tasks"]),
                    str(stats["completed_tasks"]),
                    str(stats["failed_tasks"]),
                    str(stats["pending_tasks"]),
                    str(stats["processing_tasks"]),
                    f"{stats['average_processing_time']:.2f}s",
                    str(stats["queue_size"]),
                )

        console.print(table)
        time.sleep(5)  # Update every 5 seconds

# Run dashboard in a daemon thread so it never blocks shutdown.
import threading
dashboard_thread = threading.Thread(
    target=display_queue_dashboard,
    args=(deployer,),
    daemon=True,
)
dashboard_thread.start()

Performance Metrics

Response Time Tracking

import time
from collections import deque
import statistics

class PerformanceTracker:
    """Sliding-window latency statistics over the last *window_size* requests."""

    def __init__(self, window_size=100):
        # deque(maxlen=...) silently evicts the oldest sample when full.
        self.response_times = deque(maxlen=window_size)
        self.request_count = 0  # lifetime total, not capped by the window

    def record_request(self, duration: float):
        """Add one request duration (seconds) to the window."""
        self.response_times.append(duration)
        self.request_count += 1

    def get_stats(self):
        """Return summary statistics for the window, or None if empty."""
        samples = self.response_times
        if not samples:
            return None

        has_spread = len(samples) > 1  # stdev needs at least two samples
        return {
            "count": len(samples),
            "total_requests": self.request_count,
            "mean": statistics.mean(samples),
            "median": statistics.median(samples),
            "min": min(samples),
            "max": max(samples),
            "stdev": statistics.stdev(samples) if has_spread else 0,
        }

tracker = PerformanceTracker()

def timed_agent_run(agent, task):
    """Run *task* on *agent*, recording wall-clock duration even on failure."""
    start = time.time()
    try:
        return agent.run(task)
    finally:
        tracker.record_request(time.time() - start)

# Get stats
stats = tracker.get_stats()  # None until at least one request is recorded
logger.info(f"Performance stats: {stats}")

Throughput Monitoring

import time
from collections import deque

class ThroughputMonitor:
    """Request throughput over a sliding time window.

    Timestamps older than *window_seconds* are discarded, so the
    reported rate always reflects the recent window only.
    """

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.requests = deque()  # monotonically increasing timestamps

    def _purge(self, now: float):
        """Drop timestamps that have fallen out of the window."""
        cutoff = now - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def record_request(self):
        """Record one request at the current time."""
        now = time.time()
        self.requests.append(now)
        self._purge(now)

    def get_throughput(self):
        """Requests per second over the window.

        Purges stale entries first so the rate decays to zero when
        traffic stops (previously stale timestamps were only removed
        by the next record_request call, inflating the reading).
        """
        self._purge(time.time())
        return len(self.requests) / self.window_seconds

    def get_stats(self):
        """Return throughput figures plus window metadata."""
        throughput = self.get_throughput()
        return {
            "throughput_per_second": throughput,
            "throughput_per_minute": throughput * 60,
            "requests_in_window": len(self.requests),
            "window_seconds": self.window_seconds,
        }

monitor = ThroughputMonitor(window_seconds=60)

# Record each request
for task in tasks:
    result = agent.run(task)
    monitor.record_request()  # only completed requests are counted

# Get current throughput
stats = monitor.get_stats()
logger.info(f"Current throughput: {stats['throughput_per_minute']:.2f} req/min")

Error Tracking

Error Rate Monitoring

from collections import Counter
import time

class ErrorTracker:
    """Track request outcomes and per-type error counts.

    Call record_success() after each successful request and
    record_error() after each failed one; total_requests counts both,
    so get_error_rate() is errors / all requests.
    """

    def __init__(self):
        self.total_requests = 0
        self.errors = []              # chronological error records
        self.error_types = Counter()  # error class name -> count

    def record_error(self, error: Exception, context: dict = None):
        """Record one failed request, with optional context for debugging."""
        # Bug fix: failed requests must count toward the total, otherwise
        # get_error_rate() divides errors by successes only (and can
        # exceed 1.0, or report 0.0 when every request failed).
        self.total_requests += 1
        error_record = {
            "timestamp": time.time(),
            "error_type": type(error).__name__,
            "message": str(error),
            "context": context or {},
        }
        self.errors.append(error_record)
        self.error_types[type(error).__name__] += 1

    def record_success(self):
        """Record one successful request."""
        self.total_requests += 1

    def get_error_rate(self):
        """Fraction of all recorded requests that failed (0.0 if none yet)."""
        if self.total_requests == 0:
            return 0.0
        return len(self.errors) / self.total_requests

    def get_stats(self):
        """Return a snapshot of counters plus the 10 most recent errors."""
        return {
            "total_requests": self.total_requests,
            "total_errors": len(self.errors),
            "error_rate": self.get_error_rate(),
            "error_types": dict(self.error_types),
            "recent_errors": self.errors[-10:],  # last 10 errors
        }

error_tracker = ErrorTracker()

def tracked_run(agent, task):
    """Run *task* on *agent*, recording the outcome in error_tracker.

    Re-raises any exception after recording it.
    """
    try:
        result = agent.run(task)
        error_tracker.record_success()
        return result
    except Exception as e:
        error_tracker.record_error(e, {"task": task[:100], "agent": agent.agent_name})
        raise

Health Checks

Agent Health Check

import time
from typing import Dict, Any

def health_check(agent) -> Dict[str, Any]:
    """
    Perform a comprehensive health check on an agent.

    Sends a tiny probe task through the agent and returns either a
    "healthy" payload (with timing and response checks) or an
    "unhealthy" payload describing the failure.
    """
    try:
        # Cheap probe run; max_loops=1 keeps the check fast.
        started = time.time()
        probe = agent.run("Hello", max_loops=1)
        elapsed = time.time() - started

        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "response_time": elapsed,
            "timestamp": time.time(),
            "checks": {
                "llm_responsive": True,
                "response_valid": probe is not None,
            }
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time(),
        }

# Periodic health checks
import threading

def periodic_health_check(agents, interval=60):
    """Spawn a daemon thread that health-checks every agent each *interval* seconds."""
    def check():
        while True:
            for agent in agents:
                report = health_check(agent)
                is_unhealthy = report["status"] == "unhealthy"
                if is_unhealthy:
                    logger.error(f"Agent {agent.agent_name} is unhealthy: {report}")
                else:
                    logger.info(f"Agent {agent.agent_name} is healthy")
            time.sleep(interval)

    # Daemon thread: never blocks interpreter shutdown.
    thread = threading.Thread(target=check, daemon=True)
    thread.start()

System Health Check

import psutil

def system_health_check() -> Dict[str, Any]:
    """
    Check system resource health (CPU, memory, disk).

    Returns "warning" status when CPU > 90% or memory > 85%, plus one
    human-readable string per threshold actually exceeded.
    """
    cpu_percent = psutil.cpu_percent(interval=1)  # blocks ~1s to sample
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    # Bug fix: build only the warnings that triggered — the old version
    # left None placeholders in the list for every healthy resource.
    warnings = []
    if cpu_percent > 90:
        warnings.append(f"High CPU usage: {cpu_percent}%")
    if memory.percent > 85:
        warnings.append(f"High memory usage: {memory.percent}%")
    if disk.percent > 90:
        warnings.append(f"Low disk space: {disk.percent}%")

    return {
        # NOTE(review): status only considers CPU and memory, matching the
        # original behavior; disk pressure appears in "warnings" only.
        "status": "healthy" if cpu_percent < 90 and memory.percent < 85 else "warning",
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "disk_percent": disk.percent,
        "timestamp": time.time(),
        "warnings": warnings,
    }

Alerting

Simple Alert System

from enum import Enum
from typing import Callable, List

class AlertLevel(Enum):
    """Alert severity, ordered least to most severe."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertManager:
    """Fan an alert out to every registered handler, isolating failures."""

    def __init__(self):
        self.handlers: List[Callable] = []

    def add_handler(self, handler: Callable):
        """Register a callable that accepts one alert-data dict."""
        self.handlers.append(handler)

    def alert(self, level: AlertLevel, message: str, context: dict = None):
        """Dispatch an alert; a failing handler never blocks the others."""
        payload = {
            "level": level.value,
            "message": message,
            "context": context or {},
            "timestamp": time.time(),
        }

        for handler in self.handlers:
            try:
                handler(payload)
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")

# Alert handlers
def log_alert(alert_data):
    """Route an alert dict to the logger at the matching severity level."""
    message = alert_data["message"]
    # Map alert level to the matching log method; anything else is info.
    dispatch = {
        "critical": logger.critical,
        "error": logger.error,
        "warning": logger.warning,
    }
    dispatch.get(alert_data["level"], logger.info)(message)

def email_alert(alert_data):
    """Placeholder: send *alert_data* via email (SMTP, SES, etc.)."""
    # Implement email notification
    pass

# Setup alerts
alert_manager = AlertManager()
alert_manager.add_handler(log_alert)
# alert_manager.add_handler(email_alert)

# Use alerts
# `error_rate` comes from your own metrics (e.g. ErrorTracker.get_error_rate()).
if error_rate > 0.1:
    alert_manager.alert(
        AlertLevel.ERROR,
        "High error rate detected",
        {"error_rate": error_rate}
    )

Best Practices

1. Use Structured Logging

# Good: Structured
# NOTE(review): with Loguru specifically, fields are usually attached via
# logger.bind(...) or plain keyword arguments rather than the stdlib-style
# extra= dict — confirm against the configured sink format.
logger.info(
    "Task completed",
    extra={
        "agent": agent.agent_name,
        "duration": duration,
        "success": True,
    }
)

# Avoid: Unstructured
logger.info(f"Task completed by {agent.agent_name} in {duration}s")

2. Log at Appropriate Levels

  • DEBUG: Detailed diagnostic information
  • INFO: General informational messages
  • WARNING: Warning messages for recoverable issues
  • ERROR: Error messages for failures
  • CRITICAL: Critical errors requiring immediate attention

3. Include Context

# Include identifiers that let you correlate this line with related ones.
logger.info(
    "Processing request",
    extra={
        "request_id": request_id,
        "user_id": user_id,
        "agent": agent.agent_name,
        "task_length": len(task),  # avoid logging the full task payload
    }
)

4. Monitor Key Metrics

  • Response time (mean, median, p95, p99)
  • Throughput (requests per second/minute)
  • Error rate
  • Queue sizes (if using AOP)
  • Resource usage (CPU, memory)

5. Set Up Alerts

  • High error rates
  • Slow response times
  • Resource exhaustion
  • Queue backlogs

Build docs developers (and LLMs) love