Overview
Monitoring is essential for running production multi-agent systems. This guide covers how to monitor agent performance, track metrics, implement logging, and gain full observability into your Swarms deployment.
Logging Architecture
Loguru Integration
Swarms uses Loguru for powerful, structured logging:
from loguru import logger
import sys

# Basic configuration
logger.remove()  # Remove the default handler
logger.add(
    sys.stderr,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO",
    colorize=True,
)
Multi-Level Logging
Configure different log levels for different outputs:
from loguru import logger
import sys

# Console: INFO and above
logger.add(
    sys.stderr,
    level="INFO",
    format="{time:HH:mm:ss} | {level} | {message}",
    colorize=True,
)

# File: all logs, with rotation
logger.add(
    "logs/agent_{time}.log",
    rotation="500 MB",
    retention="10 days",
    compression="zip",
    level="DEBUG",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
)

# Error file: errors only
logger.add(
    "logs/errors_{time}.log",
    rotation="100 MB",
    retention="30 days",
    compression="zip",
    level="ERROR",
    filter=lambda record: record["level"].name == "ERROR",
)

# JSON format for structured logging
logger.add(
    "logs/structured_{time}.json",
    rotation="1 day",
    serialize=True,  # Emit each record as a JSON line
    level="INFO",
)
Contextual Logging
Add context to logs for better traceability:
from loguru import logger
import contextvars

# Create context variables
request_id_var = contextvars.ContextVar('request_id', default='unknown')
user_id_var = contextvars.ContextVar('user_id', default='anonymous')

# Patch every record so the context is available in format strings
# as {extra[request_id]} and {extra[user_id]}
logger.configure(
    patcher=lambda record: record["extra"].update(
        request_id=request_id_var.get(),
        user_id=user_id_var.get(),
    )
)

def process_request(request_id, user_id, task):
    # Set context for this request
    request_id_var.set(request_id)
    user_id_var.set(user_id)
    logger.info(f"Processing task: {task}")
    try:
        result = agent.run(task)  # `agent` is assumed to be defined elsewhere
        logger.info("Task completed successfully")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise
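With the patcher in place, the context shows up in any sink whose format string references it, for example:
import sys

logger.add(
    sys.stderr,
    format="{time:HH:mm:ss} | {level} | req={extra[request_id]} user={extra[user_id]} | {message}",
)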
Agent-Level Monitoring
Built-in Verbose Mode
from swarms import Agent

agent = Agent(
    agent_name="Monitored-Agent",
    model_name="gpt-4o",
    verbose=True,   # Enable detailed logging
    print_on=True,  # Print to console
)

# The agent will log:
# - Task inputs
# - Tool executions
# - LLM calls
# - Responses
# - Errors and retries
Custom Monitoring Wrapper
import time
from typing import Any, Dict

from loguru import logger

class MonitoredAgent:
    def __init__(self, agent):
        self.agent = agent
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "total_duration": 0.0,
            "errors": [],
        }

    def run(self, task: str, *args, **kwargs) -> Any:
        start_time = time.time()
        self.metrics["total_requests"] += 1
        logger.bind(task=task[:100]).info(
            f"Starting task for {self.agent.agent_name}"
        )
        try:
            result = self.agent.run(task, *args, **kwargs)
            self.metrics["successful_requests"] += 1
            duration = time.time() - start_time
            self.metrics["total_duration"] += duration
            logger.bind(
                agent=self.agent.agent_name,
                duration=f"{duration:.2f}s",
                success=True,
            ).info("Task completed successfully")
            return result
        except Exception as e:
            self.metrics["failed_requests"] += 1
            self.metrics["errors"].append({
                "timestamp": time.time(),
                "error": str(e),
                "task": task[:100],
            })
            logger.bind(
                agent=self.agent.agent_name,
                error=str(e),
                duration=f"{time.time() - start_time:.2f}s",
            ).error("Task failed")
            raise

    def get_metrics(self) -> Dict:
        avg_duration = (
            self.metrics["total_duration"] / self.metrics["total_requests"]
            if self.metrics["total_requests"] > 0 else 0
        )
        success_rate = (
            self.metrics["successful_requests"] / self.metrics["total_requests"]
            if self.metrics["total_requests"] > 0 else 0
        )
        return {
            **self.metrics,
            "avg_duration": avg_duration,
            "success_rate": success_rate,
        }

# Usage (`agent` is any Swarms Agent instance)
monitored_agent = MonitoredAgent(agent)
result = monitored_agent.run("Process this task")
metrics = monitored_agent.get_metrics()
Queue Monitoring (AOP)
Real-Time Queue Statistics
import threading
import time

from loguru import logger
from swarms.structs.aop import AOP

# Create AOP cluster (`agents` is a list of Agent instances defined elsewhere)
deployer = AOP(
    server_name="MonitoredCluster",
    agents=agents,
    queue_enabled=True,
    verbose=True,
)

# Monitor queue stats in a background thread
def monitor_queues():
    while True:
        stats = deployer.get_queue_stats()
        if stats["success"]:
            for agent_name, agent_stats in stats["stats"].items():
                logger.bind(**agent_stats).info(f"Queue stats for {agent_name}")
        time.sleep(60)  # Check every minute

monitor_thread = threading.Thread(target=monitor_queues, daemon=True)
monitor_thread.start()

deployer.run()
Queue Metrics Dashboard
import threading
import time

from rich.console import Console
from rich.table import Table

console = Console()

def display_queue_dashboard(deployer):
    while True:
        # Clear the console before redrawing
        console.clear()

        # Build the stats table
        table = Table(title="Agent Queue Statistics")
        table.add_column("Agent", style="cyan")
        table.add_column("Total", style="magenta")
        table.add_column("Completed", style="green")
        table.add_column("Failed", style="red")
        table.add_column("Pending", style="yellow")
        table.add_column("Processing", style="blue")
        table.add_column("Avg Time", style="white")
        table.add_column("Queue Size", style="white")

        # Get stats for all agents
        all_stats = deployer.get_queue_stats()
        if all_stats["success"]:
            for agent_name, stats in all_stats["stats"].items():
                table.add_row(
                    agent_name,
                    str(stats["total_tasks"]),
                    str(stats["completed_tasks"]),
                    str(stats["failed_tasks"]),
                    str(stats["pending_tasks"]),
                    str(stats["processing_tasks"]),
                    f"{stats['average_processing_time']:.2f}s",
                    str(stats["queue_size"]),
                )
        console.print(table)
        time.sleep(5)  # Update every 5 seconds

# Run the dashboard in a background thread
dashboard_thread = threading.Thread(
    target=display_queue_dashboard,
    args=(deployer,),
    daemon=True,
)
dashboard_thread.start()
Performance Metrics
Response Time Tracking
import statistics
import time
from collections import deque

from loguru import logger

class PerformanceTracker:
    def __init__(self, window_size=100):
        self.response_times = deque(maxlen=window_size)
        self.request_count = 0

    def record_request(self, duration: float):
        self.response_times.append(duration)
        self.request_count += 1

    def get_stats(self):
        if not self.response_times:
            return None
        return {
            "count": len(self.response_times),
            "total_requests": self.request_count,
            "mean": statistics.mean(self.response_times),
            "median": statistics.median(self.response_times),
            "min": min(self.response_times),
            "max": max(self.response_times),
            "stdev": statistics.stdev(self.response_times) if len(self.response_times) > 1 else 0,
        }

tracker = PerformanceTracker()

def timed_agent_run(agent, task):
    start = time.time()
    try:
        result = agent.run(task)
        return result
    finally:
        duration = time.time() - start
        tracker.record_request(duration)

# Get stats
stats = tracker.get_stats()
logger.info(f"Performance stats: {stats}")
Throughput Monitoring
import time
from collections import deque

from loguru import logger

class ThroughputMonitor:
    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.requests = deque()

    def record_request(self):
        now = time.time()
        self.requests.append(now)
        # Drop requests that fell outside the window
        cutoff = now - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def get_throughput(self):
        return len(self.requests) / self.window_seconds

    def get_stats(self):
        throughput = self.get_throughput()
        return {
            "throughput_per_second": throughput,
            "throughput_per_minute": throughput * 60,
            "requests_in_window": len(self.requests),
            "window_seconds": self.window_seconds,
        }

monitor = ThroughputMonitor(window_seconds=60)

# Record each request (`agent` and `tasks` are assumed to be defined elsewhere)
for task in tasks:
    result = agent.run(task)
    monitor.record_request()

# Get current throughput
stats = monitor.get_stats()
logger.info(f"Current throughput: {stats['throughput_per_minute']:.2f} req/min")
Error Tracking
Error Rate Monitoring
import time
from collections import Counter

class ErrorTracker:
    def __init__(self):
        self.total_requests = 0
        self.errors = []
        self.error_types = Counter()

    def record_error(self, error: Exception, context: dict = None):
        self.total_requests += 1  # Failed requests count toward the total too
        error_record = {
            "timestamp": time.time(),
            "error_type": type(error).__name__,
            "message": str(error),
            "context": context or {},
        }
        self.errors.append(error_record)
        self.error_types[type(error).__name__] += 1

    def record_success(self):
        self.total_requests += 1

    def get_error_rate(self):
        if self.total_requests == 0:
            return 0.0
        return len(self.errors) / self.total_requests

    def get_stats(self):
        return {
            "total_requests": self.total_requests,
            "total_errors": len(self.errors),
            "error_rate": self.get_error_rate(),
            "error_types": dict(self.error_types),
            "recent_errors": self.errors[-10:],  # Last 10 errors
        }

error_tracker = ErrorTracker()

def tracked_run(agent, task):
    try:
        result = agent.run(task)
        error_tracker.record_success()
        return result
    except Exception as e:
        error_tracker.record_error(e, {"task": task[:100], "agent": agent.agent_name})
        raise
Health Checks
Agent Health Check
import threading
import time
from typing import Any, Dict

from loguru import logger

def health_check(agent) -> Dict[str, Any]:
    """Perform a comprehensive health check on an agent."""
    try:
        # Quick test run
        start = time.time()
        result = agent.run("Hello", max_loops=1)
        duration = time.time() - start
        return {
            "status": "healthy",
            "agent_name": agent.agent_name,
            "model": agent.model_name,
            "response_time": duration,
            "timestamp": time.time(),
            "checks": {
                "llm_responsive": True,
                "response_valid": result is not None,
            },
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "agent_name": agent.agent_name,
            "error": str(e),
            "error_type": type(e).__name__,
            "timestamp": time.time(),
        }

# Periodic health checks
def periodic_health_check(agents, interval=60):
    def check():
        while True:
            for agent in agents:
                status = health_check(agent)
                if status["status"] == "unhealthy":
                    logger.error(f"Agent {agent.agent_name} is unhealthy: {status}")
                else:
                    logger.info(f"Agent {agent.agent_name} is healthy")
            time.sleep(interval)

    thread = threading.Thread(target=check, daemon=True)
    thread.start()
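Call it once at startup; the daemon thread then re-checks every interval seconds:
periodic_health_check(agents, interval=60)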
System Health Check
import time
from typing import Any, Dict

import psutil

def system_health_check() -> Dict[str, Any]:
    """Check system resource health."""
    cpu_percent = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    # Only keep warnings whose threshold was actually exceeded
    warnings = [
        w for w in (
            f"High CPU usage: {cpu_percent}%" if cpu_percent > 90 else None,
            f"High memory usage: {memory.percent}%" if memory.percent > 85 else None,
            f"Low disk space: {disk.percent}% used" if disk.percent > 90 else None,
        )
        if w is not None
    ]

    return {
        "status": "healthy" if cpu_percent < 90 and memory.percent < 85 else "warning",
        "cpu_percent": cpu_percent,
        "memory_percent": memory.percent,
        "disk_percent": disk.percent,
        "timestamp": time.time(),
        "warnings": warnings,
    }
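A minimal way to act on the result, using the thresholds defined above:
from loguru import logger

health = system_health_check()
if health["status"] != "healthy":
    for warning in health["warnings"]:
        logger.warning(warning)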
Alerting
Simple Alert System
import time
from enum import Enum
from typing import Callable, List

from loguru import logger

class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertManager:
    def __init__(self):
        self.handlers: List[Callable] = []

    def add_handler(self, handler: Callable):
        self.handlers.append(handler)

    def alert(self, level: AlertLevel, message: str, context: dict = None):
        alert_data = {
            "level": level.value,
            "message": message,
            "context": context or {},
            "timestamp": time.time(),
        }
        for handler in self.handlers:
            try:
                handler(alert_data)
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")

# Alert handlers
def log_alert(alert_data):
    level = alert_data["level"]
    message = alert_data["message"]
    if level == "critical":
        logger.critical(message)
    elif level == "error":
        logger.error(message)
    elif level == "warning":
        logger.warning(message)
    else:
        logger.info(message)

def email_alert(alert_data):
    # Implement email notification
    pass

# Set up alerts
alert_manager = AlertManager()
alert_manager.add_handler(log_alert)
# alert_manager.add_handler(email_alert)

# Use alerts (e.g., with the ErrorTracker defined earlier)
error_rate = error_tracker.get_error_rate()
if error_rate > 0.1:
    alert_manager.alert(
        AlertLevel.ERROR,
        "High error rate detected",
        {"error_rate": error_rate},
    )
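The email_alert stub above can be filled in with any notification backend. Here is a minimal sketch using Python's standard smtplib; the SMTP host, port, and addresses are placeholders to replace with your own:
import smtplib
from email.message import EmailMessage

def email_alert(alert_data):
    # Minimal sketch: send the alert as a plain-text email.
    # SMTP host, port, and addresses below are placeholders.
    msg = EmailMessage()
    msg["Subject"] = f"[{alert_data['level'].upper()}] Swarms alert"
    msg["From"] = "alerts@example.com"
    msg["To"] = "oncall@example.com"
    msg.set_content(f"{alert_data['message']}\n\nContext: {alert_data['context']}")
    with smtplib.SMTP("smtp.example.com", 587) as smtp:
        smtp.starttls()
        smtp.send_message(msg)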
Best Practices
1. Use Structured Logging
# Good: structured (bind() attaches fields to the log record)
logger.bind(
    agent=agent.agent_name,
    duration=duration,
    success=True,
).info("Task completed")

# Avoid: unstructured
logger.info(f"Task completed by {agent.agent_name} in {duration}s")
2. Log at Appropriate Levels
- DEBUG: Detailed diagnostic information
- INFO: General informational messages
- WARNING: Warning messages for recoverable issues
- ERROR: Error messages for failures
- CRITICAL: Critical errors requiring immediate attention
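A quick illustration of each level in practice:
from loguru import logger

logger.debug("Prompt token count: 512")            # diagnostic detail
logger.info("Task completed successfully")         # normal operation
logger.warning("Rate limited; retrying in 5s")     # recoverable issue
logger.error("Task failed: connection timeout")    # failure
logger.critical("All agents unreachable")          # needs immediate attention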
3. Include Context
logger.bind(
    request_id=request_id,
    user_id=user_id,
    agent=agent.agent_name,
    task_length=len(task),
).info("Processing request")
4. Monitor Key Metrics
- Response time (mean, median, p95, p99)
- Throughput (requests per second/minute)
- Error rate
- Queue sizes (if using AOP)
- Resource usage (CPU, memory)
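The PerformanceTracker above reports mean and median; a sketch of computing p95/p99 with the standard library (the helper name is illustrative):
import statistics

def percentile_stats(response_times):
    """Compute p95/p99 from a list of response times in seconds (sketch)."""
    if len(response_times) < 2:
        return {}
    # quantiles(n=100) returns the 99 cut points between percentiles
    cuts = statistics.quantiles(response_times, n=100)
    return {"p95": cuts[94], "p99": cuts[98]}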
5. Set Up Alerts
- High error rates
- Slow response times
- Resource exhaustion
- Queue backlogs
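These conditions can be wired to the AlertManager from the Alerting section. A sketch tying together the trackers defined earlier; the thresholds are illustrative and should be tuned for your deployment:
def check_alert_conditions(error_tracker, tracker, deployer=None):
    # High error rate (threshold illustrative)
    error_rate = error_tracker.get_error_rate()
    if error_rate > 0.1:
        alert_manager.alert(AlertLevel.ERROR, "High error rate",
                            {"error_rate": error_rate})

    # Slow responses (threshold illustrative)
    stats = tracker.get_stats()
    if stats and stats["mean"] > 30.0:
        alert_manager.alert(AlertLevel.WARNING, "Slow responses",
                            {"mean_duration": stats["mean"]})

    # Resource exhaustion
    health = system_health_check()
    if health["status"] != "healthy":
        alert_manager.alert(AlertLevel.WARNING, "Resource pressure",
                            {"warnings": health["warnings"]})

    # Queue backlogs (AOP only; threshold illustrative)
    if deployer is not None:
        queue_stats = deployer.get_queue_stats()
        if queue_stats["success"]:
            for name, s in queue_stats["stats"].items():
                if s["queue_size"] > 100:
                    alert_manager.alert(AlertLevel.WARNING,
                                        f"Queue backlog on {name}",
                                        {"queue_size": s["queue_size"]})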