Skip to main content

Monitoring and Observability

MoFA provides comprehensive monitoring capabilities through a web-based dashboard, metrics collection, WebSocket updates, and distributed tracing integration.

Dashboard Server

Quick Start

use mofa_monitoring::{DashboardServer, DashboardConfig};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let config = DashboardConfig::new()
        .with_host("127.0.0.1")
        .with_port(8080)
        .with_cors(true)
        .with_ws_interval(Duration::from_secs(1));

    let server = DashboardServer::new(config);
    server.start().await?;
    Ok(())
}
Access the dashboard at http://127.0.0.1:8080

Configuration

let config = DashboardConfig::new()
    .with_host("0.0.0.0")              // Bind address
    .with_port(3000)                   // Port number
    .with_cors(true)                   // Enable CORS
    .with_ws_interval(Duration::from_secs(2))  // WebSocket update interval
    .with_auth(Arc::new(token_auth));  // Optional authentication

let server = DashboardServer::new(config);

Metrics Collection

Agent Metrics

Track agent state and performance:
use mofa_monitoring::{MetricsCollector, AgentMetrics, MetricsConfig};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

let collector = Arc::new(MetricsCollector::new(
    MetricsConfig::default()
));

let metrics = AgentMetrics {
    agent_id: "agent-001".to_string(),
    name: "Research Agent".to_string(),
    state: "running".to_string(),
    tasks_completed: 42,
    tasks_failed: 2,
    tasks_in_progress: 3,
    messages_sent: 158,
    messages_received: 201,
    last_activity: SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs(),
    avg_task_duration_ms: 1250.5,
};

collector.update_agent(metrics).await;

Workflow Metrics

Monitor workflow execution:
use mofa_monitoring::WorkflowMetrics;

let metrics = WorkflowMetrics {
    workflow_id: "wf-001".to_string(),
    name: "Content Pipeline".to_string(),
    status: "running".to_string(),
    total_executions: 150,
    successful_executions: 145,
    failed_executions: 5,
    running_instances: 2,
    avg_execution_time_ms: 3400.0,
    node_count: 7,
};

collector.update_workflow(metrics).await;

LLM Metrics

Track LLM API usage and performance:
use mofa_monitoring::LLMMetrics;

let metrics = LLMMetrics {
    plugin_id: "openai-gpt4".to_string(),
    provider_name: "OpenAI".to_string(),
    model_name: "gpt-4".to_string(),
    state: "running".to_string(),
    total_requests: 1250,
    successful_requests: 1190,
    failed_requests: 60,
    total_tokens: 245000,
    prompt_tokens: 120000,
    completion_tokens: 125000,
    avg_latency_ms: 1850.5,
    tokens_per_second: Some(42.3),
    time_to_first_token_ms: Some(320.0),
    requests_per_minute: 25.5,
    error_rate: 4.8,
    last_request_timestamp: SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs(),
};

collector.update_llm(metrics).await;

Plugin Metrics

Monitor plugin health:
use mofa_monitoring::PluginMetrics;

let metrics = PluginMetrics {
    plugin_id: "plugin-001".to_string(),
    name: "Vector DB".to_string(),
    version: "2.0.0".to_string(),
    state: "running".to_string(),
    call_count: 5420,
    error_count: 12,
    avg_response_time_ms: 45.2,
    last_reload: Some(SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs()),
    reload_count: 3,
};

collector.update_plugin(metrics).await;

System Metrics

Track system resources:
use mofa_monitoring::SystemMetrics;

let metrics = SystemMetrics {
    cpu_usage: 45.2,        // Percentage
    memory_usage: 2048,     // MB
    memory_total: 16384,    // MB
    disk_usage: 50000,      // MB
    disk_total: 256000,     // MB
    network_rx_bytes: 1024000,
    network_tx_bytes: 512000,
    uptime_seconds: 86400,
    goroutines: 150,        // Active tasks/threads
};

collector.update_system(metrics).await;

REST API

The dashboard exposes REST endpoints:

Endpoints

# Dashboard overview
GET /api/overview

# Current metrics snapshot
GET /api/metrics

# List all agents
GET /api/agents

# Get specific agent
GET /api/agents/:id

# List all workflows
GET /api/workflows

# List all plugins
GET /api/plugins

# System status
GET /api/system

# Health check
GET /api/health

Example Response

{
  "agents": [
    {
      "agent_id": "agent-001",
      "name": "Research Agent",
      "state": "running",
      "tasks_completed": 42,
      "tasks_failed": 2,
      "tasks_in_progress": 3,
      "avg_task_duration_ms": 1250.5
    }
  ],
  "workflows": [...],
  "llm_providers": [...],
  "system": {...}
}

WebSocket Updates

Real-time updates via WebSocket:

Client Connection

const ws = new WebSocket('ws://localhost:8080/ws');

ws.onopen = () => {
    console.log('Connected to MoFA dashboard');
    
    // Subscribe to topics
    ws.send(JSON.stringify({
        type: 'subscribe',
        topics: ['agents', 'workflows', 'llm']
    }));
};

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    switch(data.type) {
        case 'metrics_update':
            updateDashboard(data.payload);
            break;
        case 'alert':
            showAlert(data.level, data.message);
            break;
    }
};

Server-side Alerts

use mofa_monitoring::WebSocketHandler;

if let Some(ws_handler) = server.ws_handler() {
    ws_handler.send_alert(
        "warning",
        "High error rate detected on agent-003",
        "monitoring-system"
    ).await;
}

Dashboard Integration

Integrate monitoring into your application:
use mofa_monitoring::{DashboardServer, DashboardConfig, MetricsCollector};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create dashboard
    let config = DashboardConfig::new().with_port(8080);
    let mut server = DashboardServer::new(config);
    
    // Get collector for metrics updates
    let collector = server.collector();
    
    // Start background data generator
    let collector_clone = collector.clone();
    tokio::spawn(async move {
        generate_metrics(collector_clone).await;
    });
    
    // Build and start server
    let router = server.build_router();
    
    // Start metrics collection
    collector.start_collection();
    
    // Start WebSocket updates
    // Start WebSocket updates (note: the future must be awaited —
    // spawned futures are lazy and do nothing until polled)
    if let Some(ws_handler) = server.ws_handler() {
        tokio::spawn(async move {
            ws_handler.start_updates().await;
        });
    }
    
    // Run server
    let addr: std::net::SocketAddr = "127.0.0.1:8080".parse()?;
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, router).await?;
    
    Ok(())
}

async fn generate_metrics(collector: Arc<MetricsCollector>) {
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    
    loop {
        interval.tick().await;
        
        // Update agent metrics
        collector.update_agent(get_agent_metrics()).await;
        
        // Update workflow metrics
        collector.update_workflow(get_workflow_metrics()).await;
        
        // Update LLM metrics
        collector.update_llm(get_llm_metrics()).await;
    }
}

Distributed Tracing

OpenTelemetry Integration

use mofa_monitoring::tracing::{init_tracing, TracingConfig};

let config = TracingConfig {
    service_name: "mofa-agent".to_string(),
    otlp_endpoint: Some("http://localhost:4317".to_string()),
    sample_ratio: 0.1,  // 10% sampling
};

init_tracing(config)?;

Span Instrumentation

use tracing::{info, instrument};

#[instrument(skip(agent))]
async fn process_task(agent: &Agent, task: Task) -> Result<Output> {
    info!("Processing task: {}", task.id);
    
    let result = agent.execute(task).await?;
    
    info!("Task completed in {}ms", result.duration_ms);
    Ok(result.output)
}

Context Propagation

use mofa_monitoring::tracing::propagate_context;

let trace_id = propagate_context(&request_headers)?;
info!(trace_id = %trace_id, "Request received");

Metrics Export

Prometheus Format

use mofa_monitoring::MetricsRegistry;

let registry = MetricsRegistry::new();

// Register metrics
registry.register_gauge("agent_tasks_completed", "Total tasks completed by agent");
registry.register_histogram("llm_latency_ms", "LLM API latency in milliseconds");

// Update values
registry.set_gauge("agent_tasks_completed", 42.0);
registry.observe_histogram("llm_latency_ms", 1250.5);

// Export Prometheus format
let metrics = registry.export_prometheus();

JSON Export

let snapshot = collector.get_snapshot().await;
let json = serde_json::to_string_pretty(&snapshot)?;
println!("{}", json);

Authentication

Token-based Auth

use mofa_monitoring::TokenAuthProvider;
use std::sync::Arc;

let auth = Arc::new(TokenAuthProvider::new(vec![
    "secret-token-1".to_string(),
    "secret-token-2".to_string(),
]));

let config = DashboardConfig::new()
    .with_auth(auth);

Custom Auth Provider

use mofa_monitoring::{AuthProvider, AuthInfo};
use async_trait::async_trait;

struct CustomAuth;

#[async_trait]
impl AuthProvider for CustomAuth {
    fn is_enabled(&self) -> bool {
        true
    }
    
    async fn authenticate(&self, token: &str) -> Result<AuthInfo, String> {
        // Custom authentication logic
        if validate_token(token) {
            Ok(AuthInfo {
                user_id: "user-123".to_string(),
                permissions: vec!["read".to_string()],
            })
        } else {
            Err("Invalid token".to_string())
        }
    }
}

let config = DashboardConfig::new()
    .with_auth(Arc::new(CustomAuth));

Best Practices

  1. Sample high-frequency events: Use sampling for high-throughput metrics
  2. Aggregate metrics: Compute averages and percentiles before export
  3. Set retention policies: Automatically clean old metrics data
  4. Monitor the monitor: Track dashboard performance and health
  5. Secure access: Use authentication in production environments
  6. Rate limit updates: Avoid overwhelming WebSocket clients
  7. Use structured logging: Enable correlation with tracing
  8. Alert on anomalies: Set up automated alerting for critical metrics

Performance Considerations

  • Metrics buffering: Metrics are buffered before updates
  • Async collection: Non-blocking metrics collection
  • Connection pooling: WebSocket connections are pooled
  • Lazy evaluation: Expensive metrics computed on-demand

Troubleshooting

High Memory Usage

let config = MetricsConfig {
    max_history_size: 1000,  // Limit history
    aggregation_window_secs: 60,  // Aggregate more frequently
    ..Default::default()
};

Slow Dashboard

let config = DashboardConfig::new()
    .with_ws_interval(Duration::from_secs(5));  // Reduce update frequency

WebSocket Disconnects

Implement reconnection logic:
function connectWebSocket() {
    const ws = new WebSocket('ws://localhost:8080/ws');
    
    ws.onclose = () => {
        setTimeout(connectWebSocket, 5000);  // Retry after 5s
    };
    
    return ws;
}

See Also

Build docs developers (and LLMs) love