Objectives
By the end of this lab you will be able to:
- Implement comprehensive Application Insights integration for MCP servers
- Design structured logging patterns for effective troubleshooting
- Create performance metrics collection and analysis systems
- Configure intelligent alerting with actionable notifications
- Build operational dashboards for real-time monitoring
- Establish effective troubleshooting workflows and runbooks
Prerequisites
- Completed Lab 10: Deployment Strategies
- Application Insights resource deployed (from Lab 3)
- `APPLICATIONINSIGHTS_CONNECTION_STRING` set in your environment
The three pillars of observability
| Pillar | What it answers | Implementation |
|---|---|---|
| Metrics | Is something wrong? | OpenTelemetry counters, histograms, gauges |
| Logs | What happened? | Structured JSON logs via StructuredFormatter |
| Traces | Where did it break? | Distributed traces via trace_operation context manager |
Step 1: Application Insights integration
# mcp_server/monitoring.py
import logging
import time
import psutil
from typing import Dict, Any
from contextlib import contextmanager
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace, metrics
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
class MCPTelemetryManager:
    """Comprehensive telemetry management for MCP server.

    Bundles Azure Monitor configuration, auto-instrumentation of the web
    and database layers, custom OpenTelemetry instruments, and a tracing
    context manager that records success/error metrics automatically.
    """

    def __init__(self, connection_string: str):
        # Application Insights connection string consumed by configure_azure_monitor.
        self.connection_string = connection_string
        # Populated by initialize_telemetry(); both stay None until then.
        self.tracer = None
        self.meter = None
        # Maps friendly metric keys to OpenTelemetry instrument objects.
        self.custom_metrics = {}

    def initialize_telemetry(self, app):
        """Configure Azure Monitor and instrument all major components.

        Args:
            app: FastAPI application instance to auto-instrument.
        """
        configure_azure_monitor(
            connection_string=self.connection_string,
            logger_name="mcp_server",
            # Keep offline buffering so telemetry survives transient outages.
            disable_offline_storage=False
        )
        self.tracer = trace.get_tracer(__name__)
        self.meter = metrics.get_meter(__name__)
        self._setup_custom_metrics()
        # Auto-instrument the FastAPI app and the asyncpg driver.
        FastAPIInstrumentor.instrument_app(app)
        AsyncPGInstrumentor().instrument()
        logging.info("Telemetry initialization complete")

    def _setup_custom_metrics(self):
        """Create every custom counter, histogram and gauge the server emits."""
        # (dict key, meter factory method, instrument name, description, unit)
        specs = [
            ("mcp_requests_total", "create_counter",
             "mcp_requests_total", "Total number of MCP requests", "1"),
            ("mcp_request_duration", "create_histogram",
             "mcp_request_duration_seconds", "MCP request duration", "s"),
            ("database_queries_total", "create_counter",
             "database_queries_total", "Total database queries executed", "1"),
            ("database_query_duration", "create_histogram",
             "database_query_duration_seconds", "Database query duration", "s"),
            ("database_connections_active", "create_up_down_counter",
             "database_connections_active", "Active database connections", "1"),
            ("tool_executions_total", "create_counter",
             "tool_executions_total", "Total tool executions", "1"),
            ("tool_execution_duration", "create_histogram",
             "tool_execution_duration_seconds", "Tool execution duration", "s"),
            ("system_cpu_usage", "create_gauge",
             "system_cpu_usage_percent", "System CPU usage percentage", "%"),
            ("system_memory_usage", "create_gauge",
             "system_memory_usage_bytes", "System memory usage", "byte"),
            ("errors_total", "create_counter",
             "errors_total", "Total errors", "1"),
        ]
        self.custom_metrics = {
            key: getattr(self.meter, factory)(name=name, description=description, unit=unit)
            for key, factory, name, description, unit in specs
        }

    @contextmanager
    def trace_operation(self, operation_name: str, attributes: Dict[str, Any] = None):
        """Trace an operation with automatic metric recording on success and failure."""
        with self.tracer.start_as_current_span(operation_name) as span:
            started_at = time.time()
            for attr_key, attr_value in (attributes or {}).items():
                span.set_attribute(attr_key, attr_value)
            try:
                yield span
                # Only successful operations contribute to the duration histograms.
                self._record_success_metric(operation_name, time.time() - started_at)
            except Exception as exc:
                # Mark the span failed, count the error, then re-raise for callers.
                span.record_exception(exc)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc)))
                self.custom_metrics["errors_total"].add(1, {
                    "operation": operation_name,
                    "error_type": type(exc).__name__
                })
                raise

    def _record_success_metric(self, operation_name: str, duration: float):
        """Route a successful operation's count and duration by name keyword."""
        lowered = operation_name.lower()
        # First matching keyword wins, mirroring request -> query -> tool priority.
        routing = (
            ("request", "mcp_requests_total", "mcp_request_duration"),
            ("query", "database_queries_total", "database_query_duration"),
            ("tool", "tool_executions_total", "tool_execution_duration"),
        )
        for keyword, counter_key, histogram_key in routing:
            if keyword in lowered:
                self.custom_metrics[counter_key].add(1, {"status": "success"})
                self.custom_metrics[histogram_key].record(duration)
                break

    def record_system_metrics(self):
        """Snapshot current CPU and memory usage."""
        # NOTE(review): cpu_percent(interval=1) blocks ~1s; call from a background task.
        self.custom_metrics["system_cpu_usage"].set(psutil.cpu_percent(interval=1))
        self.custom_metrics["system_memory_usage"].set(psutil.virtual_memory().used)
Step 2: Structured logging
# mcp_server/logging_config.py
import hashlib
import json
import logging
import sys
import traceback
from datetime import datetime, timezone
from typing import Optional
class StructuredFormatter(logging.Formatter):
    """Format log records as structured JSON for log aggregation.

    Emits one JSON object per line with standard fields (timestamp, level,
    logger, message, source location), optional exception details, and any
    custom fields attached to the record via ``extra``.
    """

    def format(self, record: logging.LogRecord) -> str:
        # Derive the timestamp from record.created (set when the log call was
        # made) rather than datetime.utcnow(), which is deprecated since
        # Python 3.12 and would reflect formatting time, not logging time.
        # Rewrite "+00:00" to "Z" to keep the original RFC 3339-style suffix.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_entry = {
            "timestamp": created.isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }
        if record.exc_info:
            log_entry["exception"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info)
            }
        # Merge any custom fields passed via `extra={"extra_data": {...}}`.
        # NOTE: keys in extra_data can shadow the standard fields above.
        if hasattr(record, 'extra_data'):
            log_entry.update(record.extra_data)
        if hasattr(record, 'correlation_id'):
            log_entry["correlation_id"] = record.correlation_id
        if hasattr(record, 'rls_user_id'):
            log_entry["rls_user_id"] = record.rls_user_id
        return json.dumps(log_entry, ensure_ascii=False)
class MCPLogger:
    """Domain-specific structured logging utilities."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        # logging.getLogger() returns a process-wide singleton per name, so
        # only attach a handler on first construction; otherwise every
        # additional MCPLogger(name) duplicates each emitted log line.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_mcp_request(self, method: str, user_id: str, rls_user_id: str,
                        duration: Optional[float] = None, status: str = "success"):
        """Log one MCP request with its outcome and optional duration (seconds)."""
        self.logger.info(f"MCP request: {method} - {status}", extra={"extra_data": {
            "event_type": "mcp_request",
            "method": method, "user_id": user_id, "rls_user_id": rls_user_id,
            "status": status,
            **({"duration_ms": duration * 1000} if duration is not None else {})
        }})

    def log_database_query(self, query: str, duration: float,
                           row_count: Optional[int] = None, user_id: Optional[str] = None):
        """Log a query; queries slower than 1s are escalated to WARNING."""
        level = logging.WARNING if duration > 1.0 else logging.INFO
        # Use a stable content hash: the builtin hash() is salted per process
        # (PYTHONHASHSEED), so it cannot correlate the same query across
        # restarts or replicas in the log backend.
        query_hash = hashlib.sha256(query.strip().encode("utf-8")).hexdigest()[:16]
        self.logger.log(level, f"Database query ({duration*1000:.2f}ms)", extra={"extra_data": {
            "event_type": "database_query",
            "query_hash": query_hash,
            "duration_ms": duration * 1000,
            "query_preview": query[:100] + "..." if len(query) > 100 else query,
            **({"row_count": row_count} if row_count is not None else {}),
            **({"user_id": user_id} if user_id else {})
        }})

    def log_security_event(self, event_type: str, user_id: Optional[str] = None,
                           success: bool = True, details: Optional[dict] = None):
        """Log an auth/security event; failures are logged at WARNING."""
        level = logging.INFO if success else logging.WARNING
        self.logger.log(level, f"Security: {event_type} - {'success' if success else 'failure'}",
                        extra={"extra_data": {
                            "event_type": "security_event",
                            "security_event_type": event_type, "success": success,
                            **({"user_id": user_id} if user_id else {}),
                            **({"details": details} if details else {})
                        }})
# Module-level singleton used throughout the server for structured logging.
mcp_logger = MCPLogger("mcp_server")
Step 3: Intelligent alerting system
# mcp_server/alerting.py
import asyncio
import time
from typing import Dict, Any, Callable
from enum import Enum
from dataclasses import dataclass
from collections import deque
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import aiohttp
import json
class AlertSeverity(Enum):
    """Severity levels, ordered from informational to page-worthy."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class AlertRule:
    """Declarative alert definition evaluated against a metrics snapshot."""
    name: str
    # Predicate over the metrics dict; True means the alert should fire.
    condition: Callable[[Dict[str, Any]], bool]
    severity: AlertSeverity
    # Minimum minutes between repeated notifications for the same rule.
    cooldown_minutes: int
    # str.format template; placeholders are filled from scalar metric values.
    message_template: str
    enabled: bool = True


@dataclass
class Alert:
    """A fired (or resolved) instance of an AlertRule."""
    rule_name: str
    severity: AlertSeverity
    message: str
    # Unix epoch seconds when the alert fired; also drives cooldown checks.
    timestamp: float
    details: Dict[str, Any]
    acknowledged: bool = False


class AlertManager:
    """Evaluate metrics against rules and dispatch notifications."""

    def __init__(self):
        self.alert_rules: Dict[str, AlertRule] = {}
        self.active_alerts: Dict[str, Alert] = {}
        # Bounded history so a long-running process cannot grow unbounded.
        self.alert_history: deque = deque(maxlen=1000)
        # name -> notifier exposing `async send_notification(alert)`.
        self.notification_channels = {}
        self._setup_default_rules()

    def _setup_default_rules(self):
        """Register the built-in rule set covering DB, errors, resources and auth."""
        rules = [
            AlertRule("database_connection_failure",
                      lambda m: m.get("database_status") != "healthy",
                      AlertSeverity.CRITICAL, 5,
                      "Database connection failure detected. Service may be unavailable."),
            AlertRule("high_error_rate",
                      lambda m: m.get("error_rate", 0) > 0.05,
                      AlertSeverity.HIGH, 10,
                      "High error rate detected: {error_rate:.2%}. Investigate immediately."),
            AlertRule("slow_query_performance",
                      lambda m: m.get("avg_query_duration", 0) > 2.0,
                      AlertSeverity.MEDIUM, 15,
                      "Slow query performance: avg {avg_query_duration:.2f}s"),
            AlertRule("high_cpu_usage",
                      lambda m: m.get("cpu_usage", 0) > 85,
                      AlertSeverity.MEDIUM, 10,
                      "High CPU usage: {cpu_usage:.1f}%"),
            AlertRule("high_memory_usage",
                      lambda m: m.get("memory_usage_percent", 0) > 90,
                      AlertSeverity.HIGH, 5,
                      "High memory usage: {memory_usage_percent:.1f}%"),
            AlertRule("authentication_failures",
                      lambda m: m.get("auth_failure_rate", 0) > 0.1,
                      AlertSeverity.HIGH, 5,
                      "High auth failure rate: {auth_failure_rate:.2%}. Possible security incident."),
        ]
        for rule in rules:
            self.alert_rules[rule.name] = rule

    async def evaluate_metrics(self, metrics: Dict[str, Any]):
        """Check all enabled rules against current metrics.

        A rule whose evaluation raises is logged and skipped so the
        remaining rules are still checked.
        """
        for rule_name, rule in self.alert_rules.items():
            if not rule.enabled:
                continue
            try:
                if rule.condition(metrics):
                    await self._trigger_alert(rule, metrics)
                else:
                    await self._clear_alert(rule_name)
            except Exception as e:
                mcp_logger.logger.error(f"Error evaluating rule {rule_name}: {e}")

    async def _trigger_alert(self, rule: AlertRule, metrics: Dict[str, Any]):
        """Fire `rule` unless the active alert is still inside its cooldown window."""
        current_time = time.time()
        # Respect cooldown period
        if rule.name in self.active_alerts:
            if current_time - self.active_alerts[rule.name].timestamp < rule.cooldown_minutes * 60:
                return
        # Only scalar values are safe to interpolate into the template.
        scalar_metrics = {k: v for k, v in metrics.items()
                          if isinstance(v, (int, float, str))}
        try:
            message = rule.message_template.format(**scalar_metrics)
        except (KeyError, IndexError, ValueError):
            # A placeholder was missing (e.g. filtered out above) or malformed.
            # Never let a formatting problem swallow the alert itself; fall
            # back to the raw template text.
            message = rule.message_template
        alert = Alert(rule.name, rule.severity, message, current_time, metrics.copy())
        self.active_alerts[rule.name] = alert
        self.alert_history.append(alert)
        await self._send_notifications(alert)
        mcp_logger.log_security_event("alert_triggered", details={
            "rule_name": rule.name, "severity": rule.severity.value, "message": message
        })

    async def _clear_alert(self, rule_name: str):
        """Drop an active alert; announce resolution for high/critical ones."""
        if rule_name in self.active_alerts:
            alert = self.active_alerts.pop(rule_name)
            if alert.severity in [AlertSeverity.HIGH, AlertSeverity.CRITICAL]:
                resolution = Alert(rule_name, AlertSeverity.LOW,
                                   f"RESOLVED: {alert.message}", time.time(), {"resolution": True})
                await self._send_notifications(resolution)

    async def _send_notifications(self, alert: Alert):
        """Fan out to every channel concurrently with a 30s overall deadline."""
        tasks = [asyncio.create_task(ch.send_notification(alert))
                 for ch in self.notification_channels.values()]
        if tasks:
            try:
                # return_exceptions=True: one failing channel must not cancel the rest.
                await asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=30.0)
            except asyncio.TimeoutError:
                mcp_logger.logger.warning("Some alert notifications timed out")
class TeamsNotifier:
    """Send alert notifications to Microsoft Teams via webhook."""

    def __init__(self, webhook_url: str):
        # Incoming-webhook URL of the target Teams channel.
        self.webhook_url = webhook_url

    async def send_notification(self, alert: Alert):
        """POST a MessageCard for `alert`; raises when Teams rejects the post."""
        # Card accent color per severity (hex, no leading '#').
        severity_colors = {
            AlertSeverity.LOW: "28a745", AlertSeverity.MEDIUM: "ffc107",
            AlertSeverity.HIGH: "fd7e14", AlertSeverity.CRITICAL: "dc3545"
        }
        section = {
            "activityTitle": f"{alert.severity.value.upper()} Alert",
            "activitySubtitle": alert.rule_name,
            "text": alert.message,
        }
        card = {
            "@type": "MessageCard",
            "@context": "http://schema.org/extensions",
            "themeColor": severity_colors.get(alert.severity, "0076D7"),
            "summary": f"MCP Alert: {alert.rule_name}",
            "sections": [section],
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.webhook_url, json=card) as response:
                if response.status != 200:
                    raise Exception(f"Teams webhook returned {response.status}")
# Module-level singleton; register notification channels on it at startup.
alert_manager = AlertManager()
Step 4: Custom metrics collector
# mcp_server/metrics_collector.py
import statistics
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class MetricPoint:
    """A single observation of a named metric."""
    # Unix epoch seconds at which the value was recorded.
    timestamp: float
    value: float
    # Attribute labels for the observation, e.g. {"tool": "search"}.
    dimensions: Dict[str, str]


class MetricsCollector:
    """Rolling-window metrics collection and statistical analysis."""

    def __init__(self, retention_minutes: int = 60):
        self.retention_seconds = retention_minutes * 60
        # Per-metric bounded buffers: maxlen caps memory even if time-based
        # cleanup lags behind a burst of observations.
        self.metrics_buffer: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))

    def record_metric(self, name: str, value: float, dimensions: Optional[Dict[str, str]] = None):
        """Append one observation and drop anything past the retention window."""
        self.metrics_buffer[name].append(
            MetricPoint(timestamp=time.time(), value=value, dimensions=dimensions or {})
        )
        self._cleanup_old_metrics(name)

    def _cleanup_old_metrics(self, metric_name: str):
        """Evict points older than the retention window (buffer is time-ordered)."""
        cutoff = time.time() - self.retention_seconds
        buf = self.metrics_buffer[metric_name]
        while buf and buf[0].timestamp < cutoff:
            buf.popleft()

    def get_metric_summary(self, name: str, time_window_minutes: int = 5) -> Dict[str, Any]:
        """Return count/min/max/mean/median/p95/p99 over the recent window.

        Returns {"error": "No data available"} when no points fall inside
        the window.
        """
        cutoff = time.time() - (time_window_minutes * 60)
        # Sort once and reuse for min/max and both percentiles (the original
        # sorted the same values twice).
        values = sorted(p.value for p in self.metrics_buffer[name] if p.timestamp >= cutoff)
        if not values:
            return {"error": "No data available"}
        n = len(values)

        def percentile(q: float) -> float:
            # Nearest-rank estimate, clamped so tiny samples stay in range.
            return values[min(int(q * n), n - 1)]

        return {
            "count": n,
            "min": values[0],
            "max": values[-1],
            "mean": statistics.mean(values),
            "median": statistics.median(values),
            "p95": percentile(0.95),
            "p99": percentile(0.99),
            "time_window_minutes": time_window_minutes
        }
# Module-level singleton shared by the application's request handlers.
metrics_collector = MetricsCollector()
Step 5: Azure Monitor dashboard (KQL queries)
{
"version": "Notebook/1.0",
"items": [
{
"type": 10,
"content": {
"version": "KqlItem/1.0",
"query": "requests\n| where timestamp >= ago(1h)\n| where name contains \"mcp\"\n| summarize RequestCount = count(), AvgDuration = avg(duration) by bin(timestamp, 5m)\n| order by timestamp asc",
"title": "MCP Request Volume and Performance",
"visualization": "timechart"
},
"name": "request-metrics"
},
{
"type": 10,
"content": {
"version": "KqlItem/1.0",
"query": "customMetrics\n| where name == \"database_query_duration_seconds\"\n| where timestamp >= ago(1h)\n| summarize AvgDuration = avg(value), P95Duration = percentile(value, 95), P99Duration = percentile(value, 99) by bin(timestamp, 5m)\n| order by timestamp asc",
"title": "Database Query Performance (P95/P99)",
"visualization": "timechart"
},
"name": "database-performance"
},
{
"type": 10,
"content": {
"version": "KqlItem/1.0",
"query": "exceptions\n| where timestamp >= ago(24h)\n| summarize ErrorCount = count() by bin(timestamp, 1h), type\n| order by timestamp asc",
"title": "Error Rate Analysis",
"visualization": "barchart"
},
"name": "error-analysis"
}
]
}
Step 6: Automated diagnostics
# mcp_server/diagnostics.py
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import psutil
@dataclass
class DiagnosticResult:
    """Outcome of a single health check, with remediation guidance on failure."""
    check_name: str
    status: str  # "pass", "fail", "warning"
    message: str
    details: Dict[str, Any]
    # Human-readable next step; None when the check passed cleanly.
    remediation: Optional[str] = None


class DiagnosticsEngine:
    """Run comprehensive health checks and return actionable results."""

    async def run_full_diagnostics(self) -> List[DiagnosticResult]:
        """Run every available check; one failing check never aborts the rest.

        Returns:
            One DiagnosticResult per executed check.
        """
        candidate_checks = [
            "_check_database_connectivity",
            "_check_azure_services",
            "_check_system_resources",
            "_check_configuration",
        ]
        # Resolve by name and skip checks this build does not define. The
        # original built the list from bare attribute references, so a build
        # shipping only a subset of checks crashed with AttributeError before
        # any check could run.
        checks = [getattr(self, check_name) for check_name in candidate_checks
                  if hasattr(self, check_name)]
        results: List[DiagnosticResult] = []
        for check in checks:
            try:
                results.append(await check())
            except Exception as e:
                # Surface the crash as a failed result instead of propagating.
                results.append(DiagnosticResult(
                    check_name=check.__name__, status="fail",
                    message=f"Diagnostic check raised: {e}", details={"exception": str(e)}
                ))
        return results

    async def _check_database_connectivity(self) -> DiagnosticResult:
        """Ping the database via db_provider.health_check() and time the call."""
        # NOTE(review): db_provider is expected to be in scope in this module's
        # real environment - confirm the import when integrating.
        try:
            start = time.time()
            health = await db_provider.health_check()
            duration = time.time() - start
            if health["status"] == "healthy":
                # Healthy but slow (>1s) is downgraded to a warning.
                status = "warning" if duration > 1.0 else "pass"
                return DiagnosticResult(
                    check_name="database_connectivity", status=status,
                    message=f"Database {'responsive but slow' if status == 'warning' else 'healthy'} ({duration:.2f}s)",
                    details=health,
                    remediation="Check database load and network latency" if status == "warning" else None
                )
            return DiagnosticResult(
                check_name="database_connectivity", status="fail",
                message="Database not healthy", details=health,
                remediation="Check database server status and connection parameters"
            )
        except Exception as e:
            return DiagnosticResult(
                check_name="database_connectivity", status="fail",
                message=f"Database connectivity failed: {e}", details={"error": str(e)},
                remediation="Verify database server is running and connection parameters are correct"
            )

    async def _check_system_resources(self) -> DiagnosticResult:
        """Check CPU, memory and disk; warn when any exceeds 85%."""
        cpu = psutil.cpu_percent(interval=1)  # blocks ~1s to sample CPU
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        warnings = []
        if cpu > 85:
            warnings.append(f"High CPU: {cpu:.1f}%")
        if memory.percent > 85:
            warnings.append(f"High memory: {memory.percent:.1f}%")
        if disk.percent > 85:
            warnings.append(f"High disk: {disk.percent:.1f}%")
        details = {"cpu_percent": cpu, "memory_percent": memory.percent,
                   "disk_percent": disk.percent}
        if warnings:
            return DiagnosticResult("system_resources", "warning",
                                    "; ".join(warnings), details,
                                    "Monitor resource usage and consider scaling")
        return DiagnosticResult("system_resources", "pass", "System resources normal", details)
Operational runbook (YAML)
# operational-runbooks.yml
runbooks:
database_connection_failure:
title: "Database Connection Failure"
severity: "critical"
steps:
- "Check docker-compose ps — verify PostgreSQL container is running"
- "Run: docker-compose logs postgres | tail -50"
- "Test direct connection: psql -h localhost -U postgres -d zava -c 'SELECT 1'"
- "Verify connection pool: curl http://localhost:8000/health/detailed"
- "Check firewall/network security group rules"
- "Review POSTGRES_* environment variables in .env"
high_error_rate:
title: "High Error Rate"
severity: "high"
steps:
- "Check Application Insights exceptions in the last 15 minutes"
- "Review structured logs: grep 'ERROR' /app/logs/mcp_server.log | tail -100"
- "Identify most common error types from the error-analysis dashboard"
- "Check recent deployments — roll back if a new version is implicated"
- "Run diagnostics: curl http://localhost:8000/dashboard/diagnostics"
authentication_failures:
title: "Authentication Failures"
severity: "high"
steps:
- "Query security audit log: SELECT * FROM retail.security_failed_auth;"
- "Check for IP-based brute force patterns"
- "Verify Azure Entra ID token issuer and audience configuration"
- "Review JWKS cache validity"
- "Consider temporary IP block if brute force is confirmed"
Key takeaways
- OpenTelemetry + Azure Monitor provides distributed tracing, metrics, and log correlation in one platform
- Structured JSON logs are machine-readable and searchable in Log Analytics
- Cooldown periods prevent alert storms when a rule condition persists
- Resolution notifications for high/critical alerts close the incident loop
- Automated diagnostics provide structured, actionable output for on-call engineers
- KQL dashboards in Azure Monitor Workbooks visualize trends without additional tooling
Next: Lab 12 — Best Practices
Apply performance optimization, security hardening, cost management, and operational excellence to make your MCP server production-ready.