Overview
DispatchAI includes basic observability infrastructure with plans for comprehensive metrics and tracing. Current implementation focuses on structured logging with hooks for future metric collection.
Logging
Configuration
Logging is configured in app/core/logging.py:
import logging
def setup_logging():
    """Configure the root logger: INFO level, plain-text timestamped lines.

    basicConfig is a no-op if the root logger already has handlers, so
    calling this more than once is harmless.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s"
    )


# Applied at import time so all modules inherit the configuration.
setup_logging()
Log levels:
INFO: Normal operations (call events, agent outputs)
WARNING: Non-critical issues (API fallbacks)
ERROR: Operation failures (transcription errors, API failures)
Log Output
All logs write to stdout by default:
# View live logs
./scripts/dev_start.sh
# Save to file
./scripts/dev_start.sh 2>&1 | tee logs/app.log
# Filter by component
./scripts/dev_start.sh 2>&1 | grep "\[telephony\]"
Structured Logging
Implement structured (JSON) logging for production:
# app/core/logging.py (recommended enhancement)
import json
import logging
from datetime import datetime, timezone
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object for machine ingestion."""

    def format(self, record):
        """Serialize *record* to a JSON string with a UTC "Z"-suffixed timestamp.

        Includes the formatted traceback under "exception" when the record
        carries exc_info.
        """
        # datetime.utcnow() is deprecated since Python 3.12 and returns a
        # naive datetime; use an aware UTC timestamp and normalize the
        # "+00:00" offset to the conventional "Z" suffix.
        log_data = {
            "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_data)
def setup_logging():
    """Install a JSON-emitting stream handler on the root logger at INFO level."""
    json_handler = logging.StreamHandler()
    json_handler.setFormatter(JSONFormatter())
    root = logging.getLogger()
    root.addHandler(json_handler)
    root.setLevel(logging.INFO)
Logs are tagged by component for easy filtering:
| Tag | Component | Location |
|---|---|---|
| [telephony] | Call control, webhooks | app/main.py |
| [stt] | Speech transcription | app/main.py:346 |
| [emotion] | Emotion classification | app/agents/emotion.py |
| [summary] | Summary generation | app/agents/summary.py |
| [triage:minimal] | Triage decisions | app/main.py:579 |
Example logs:
2026-03-03 23:45:12 INFO [telephony] webhook hit
2026-03-03 23:45:12 INFO [telephony] answering ccid=abc123
2026-03-03 23:45:15 INFO [telephony] answer -> 200
2026-03-03 23:45:15 INFO [telephony] speak -> 200
2026-03-03 23:45:18 INFO [telephony] streaming_start -> 200
2026-03-03 23:45:45 INFO [stt] deepgram transcription complete
2026-03-03 23:45:46 INFO [triage:minimal] {'call_id': 'call_123', 'risk': {'level': 'CRITICAL', 'score': 0.87}}
Log Analysis
# Count calls by risk level (triage logs are Python dict reprs with single quotes)
grep "triage:minimal" logs.txt | grep -o "'level': '[A-Z]*'" | sort | uniq -c
# Find transcription errors
grep "\[stt\].*error" logs.txt
# API failures
grep -E "(openai|deepgram|telnyx).*error" logs.txt -i
# Call lifecycle duration
grep "call_session_123" logs.txt | head -1 # Start
grep "call_session_123" logs.txt | tail -1 # End
Metrics
Current Implementation
Metric collection stubs exist in app/core/observability.py:
def record_metric(name: str, value: float) -> None:
    """Record a single metric sample.

    Stub implementation: writes the sample to stdout until the real
    Prometheus/OTel backend is attached.
    """
    # stub: attach to Prometheus/OTel later
    sample = f"metric {name}={value}"
    print(sample)
Usage (currently prints, does not persist):
from app.core.observability import record_metric
record_metric("calls.received", 1)
record_metric("distress.score", 0.87)
record_metric("transcription.latency_ms", 1234)
Recommended Metrics
Implement these metrics for production:
Call Metrics
# Counter: Total calls received
record_metric("calls.received.total", 1)
# Counter: Calls by risk level
record_metric(f"calls.risk.{risk_level.lower()}.total", 1)
# Counter: Calls by category
record_metric(f"calls.category.{category.lower()}.total", 1)
# Counter: Call outcomes
record_metric("calls.completed.total", 1)
record_metric("calls.failed.total", 1)
# Histogram: Call duration
record_metric("calls.duration_seconds", duration)
Processing Metrics
# Histogram: Transcription latency
record_metric("transcription.latency_ms", latency)
# Counter: Transcription failures
record_metric("transcription.failed.total", 1)
# Histogram: Emotion analysis latency
record_metric("emotion.latency_ms", latency)
# Histogram: Summary generation latency
record_metric("summary.latency_ms", latency)
# Counter: API provider usage
record_metric(f"api.{provider}.requests.total", 1)
record_metric(f"api.{provider}.errors.total", 1)
Queue Metrics
# Gauge: Current queue size
record_metric("queue.size", len(queue_items))
# Gauge: Queue by status
record_metric(f"queue.status.{status.lower()}", count)
# Histogram: Time in queue
record_metric("queue.wait_time_seconds", wait_time)
# Counter: Queue state transitions
record_metric(f"queue.transition.{from_status}_to_{to_status}.total", 1)
System Metrics
# Gauge: Active WebSocket connections
record_metric("websocket.connections.active", len(active_connections))
# Histogram: WebSocket message latency
record_metric("websocket.message.latency_ms", latency)
# Gauge: In-memory storage size
record_metric("storage.recent_calls", len(recent_calls))
record_metric("storage.queue_items", len(queue_items))
Prometheus Integration
Implement Prometheus metrics collection:
# app/core/observability.py (recommended)
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from fastapi import Response
# Define metrics
# --- Metric collectors (registered once, at import time) ---

calls_received = Counter(
    "dispatchai_calls_received_total",
    "Total calls received",
    # Label dimensions: each (risk_level, category) pair gets its own series.
    ["risk_level", "category"]
)

call_duration = Histogram(
    "dispatchai_call_duration_seconds",
    "Call duration in seconds",
    # Bucket upper bounds (seconds) spanning short hang-ups to long calls.
    buckets=[5, 10, 30, 60, 120, 300, 600]
)

transcription_latency = Histogram(
    "dispatchai_transcription_latency_seconds",
    "Transcription processing time",
    buckets=[0.5, 1, 2, 5, 10, 30]
)

queue_size = Gauge(
    "dispatchai_queue_size",
    "Current queue size",
    ["status"]
)


def record_metric(name: str, value: float, labels: dict | None = None) -> None:
    """Route a generic metric sample to the matching Prometheus collector.

    NOTE(review): when name == "calls.received" and labels is None, the
    `**labels` expansion below raises TypeError — callers must always pass
    labels for labelled metrics, or this should guard with `labels or {}`.
    """
    # Map to appropriate Prometheus metric
    if name == "calls.received":
        calls_received.labels(**labels).inc()
    elif name == "calls.duration":
        call_duration.observe(value)
    # ... etc


# Add to FastAPI app
@app.get("/metrics")
def metrics():
    """Expose collected metrics in Prometheus text exposition format."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
Scrape configuration (prometheus.yml):
scrape_configs:
- job_name: 'dispatchai'
scrape_interval: 15s
static_configs:
- targets: ['localhost:8000']
Tracing
Distributed Tracing (Future)
Implement OpenTelemetry for request tracing:
# app/core/tracing.py (planned)
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Install the global tracer provider before any spans are created.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Export to Jaeger/Tempo
# BatchSpanProcessor buffers spans and ships them asynchronously over OTLP/gRPC.
span_processor = BatchSpanProcessor(OTLPSpanExporter(
    endpoint="http://localhost:4317"
))
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrument FastAPI
# NOTE(review): `app` must already be constructed at this point — confirm
# import order against app/main.py.
FastAPIInstrumentor.instrument_app(app)
Trace call processing:
@tracer.start_as_current_span("process_call")
async def process_call(call_id: str):
    """Run the call-processing pipeline under a parent "process_call" span.

    Each stage (transcription, emotion analysis, summary generation) gets its
    own child span so per-stage latency is visible in the trace.
    """
    with tracer.start_as_current_span("transcribe"):
        # NOTE(review): wav_path is not defined in this snippet — presumably
        # derived from call_id; confirm against the real implementation.
        transcript = await transcribe(wav_path)
    with tracer.start_as_current_span("analyze_emotion"):
        emotion = await analyze_emotion(transcript)
    with tracer.start_as_current_span("generate_summary"):
        summary = await generate_summary(transcript)
    return {"transcript": transcript, "emotion": emotion, "summary": summary}
Monitoring Dashboards
Built-in Debug Views
DispatchAI includes browser-based monitoring:
Live Calls
http://localhost:8000/debug/live_calls/
Real-time view of active calls:
- Call status and duration
- Live transcript (1s polling)
- Audio distress metrics
- Emotion classification
- Risk assessment
- Service routing
Implementation: app/main.py:881-1185
Queue Status
http://localhost:8000/debug/list_queue/
Dispatcher queue interface:
- Sorted by priority (risk × status weight)
- Status badges (OPEN, IN_PROGRESS, etc.)
- Risk levels with color coding
- Emergency categories and tags
Implementation: app/main.py:1257-1550
Recent Calls
http://localhost:8000/debug/list_calls/
Historical call log:
- Last 200 calls (configurable)
- Full NLP analysis
- Audio signals
- Risk scoring
Implementation: app/main.py:1187-1255
Debug views are not secured and should be disabled in production or placed behind authentication.
Grafana Dashboard (Recommended)
Create a Grafana dashboard for production monitoring:
Panels:
-
Call Volume
- Query:
rate(dispatchai_calls_received_total[5m])
- Type: Graph
- Panel: Calls per second by risk level
-
Queue Depth
- Query:
dispatchai_queue_size{status="OPEN"}
- Type: Gauge
- Alert: > 10 pending calls
-
Processing Latency
- Query:
histogram_quantile(0.95, rate(dispatchai_transcription_latency_seconds_bucket[5m]))
- Type: Graph
- Panel: P95 transcription latency
-
Risk Distribution
- Query:
sum by (risk_level) (dispatchai_calls_received_total)
- Type: Pie chart
-
Error Rate
- Query:
rate(dispatchai_api_errors_total[5m])
- Type: Graph
- Alert: > 0.1 errors/sec
-
System Health
- Query:
up{job="dispatchai"}
- Type: Stat
- Alert: = 0
Alerting
Alert Conditions
Recommended alerts for production:
Critical Alerts
# Service Down
alert: DispatchAIDown
expr: up{job="dispatchai"} == 0
for: 1m
annotations:
summary: "DispatchAI service is down"
description: "No metrics received for 1 minute"
# Queue Backlog
alert: QueueBacklog
expr: dispatchai_queue_size{status="OPEN"} > 10
for: 5m
annotations:
summary: "Emergency queue backlog"
description: "{{ $value }} calls waiting for dispatch"
# High Error Rate
alert: HighErrorRate
expr: rate(dispatchai_api_errors_total[5m]) > 0.1
for: 2m
annotations:
summary: "High API error rate"
description: "{{ $value }} errors per second"
Warning Alerts
# Slow Transcription
alert: SlowTranscription
expr: histogram_quantile(0.95, rate(dispatchai_transcription_latency_seconds_bucket[5m])) > 10
for: 5m
annotations:
summary: "Transcription latency degraded"
description: "P95 latency: {{ $value }}s"
# Old Calls in Queue
alert: StaleQueueItems
expr: max(dispatchai_queue_item_age_seconds) > 600
for: 1m
annotations:
summary: "Call in queue for >10 minutes"
description: "Oldest call: {{ $value }}s old"
Alert Routing
Configure Alertmanager for notification routing:
route:
group_by: ['alertname']
group_wait: 10s
group_interval: 5m
repeat_interval: 3h
receiver: 'team-dispatch'
routes:
- match:
severity: critical
receiver: 'pagerduty'
continue: true
- match:
severity: warning
receiver: 'slack'
receivers:
- name: 'pagerduty'
pagerduty_configs:
- service_key: '<integration-key>'
- name: 'slack'
slack_configs:
- api_url: '<webhook-url>'
channel: '#dispatch-alerts'
- name: 'team-dispatch'
email_configs:
- to: '[email protected]'
Health Checks
Endpoint
Basic health check at app/main.py:703-705:
@app.get("/health")
def health_check():
    """Minimal health endpoint: returns a static OK payload with no dependency checks."""
    return {"status": "ok"}
Enhanced Health Check (Recommended)
Implement comprehensive health checks:
@app.get("/health")
async def health_check():
    """Aggregate health of external APIs and in-process state.

    Returns 200 with per-dependency results, or 503 with overall status
    "degraded" when any sub-check reports "unhealthy".
    """
    checks = {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "checks": {}
    }
    # Check external dependencies
    checks["checks"]["telnyx"] = await check_telnyx_api()
    checks["checks"]["deepgram"] = await check_deepgram_api()
    # OpenAI is optional: report "disabled" instead of failing when no key is set.
    checks["checks"]["openai"] = await check_openai_api() if OPENAI_API_KEY else {"status": "disabled"}
    # Check internal state
    checks["checks"]["storage"] = {
        "status": "healthy",
        "recent_calls": len(CALL_STORE.list_recent_calls(limit=1)),
        "queue_items": len(CALL_STORE.list_queue_items())
    }
    checks["checks"]["websocket"] = {
        "status": "healthy",
        "active_connections": len(LIVE_QUEUE)
    }
    # Determine overall status
    if any(c.get("status") == "unhealthy" for c in checks["checks"].values()):
        checks["status"] = "degraded"
        return JSONResponse(content=checks, status_code=503)
    return checks
async def check_telnyx_api() -> dict:
    """Probe the Telnyx API with a minimal authenticated request.

    Returns {"status": "healthy"} on HTTP 200, {"status": "unhealthy"} on any
    other status code, and adds an "error" field when the request raises.
    """
    try:
        async with httpx.AsyncClient(timeout=5) as client:
            r = await client.get(
                "https://api.telnyx.com/v2/available_phone_numbers",
                headers={"Authorization": f"Bearer {TELNYX_API_KEY}"},
                # Request a single result to keep the probe cheap.
                params={"filter[limit]": 1}
            )
            return {"status": "healthy" if r.status_code == 200 else "unhealthy"}
    except Exception as e:
        # Network/timeout failures count as unhealthy rather than crashing the probe.
        return {"status": "unhealthy", "error": str(e)}
Readiness vs Liveness
Implement separate probes for Kubernetes:
# Liveness: Is the process running?
@app.get("/healthz")
def liveness():
    """Kubernetes liveness probe: succeeds whenever the process can serve requests."""
    return {"status": "ok"}
# Readiness: Can it handle requests?
@app.get("/readyz")
async def readiness():
    """Kubernetes readiness probe: 503 until required provider credentials exist."""
    # Check critical dependencies
    if not TELNYX_API_KEY or not DEEPGRAM_API_KEY:
        return JSONResponse(
            content={"status": "not ready", "reason": "missing credentials"},
            status_code=503
        )
    return {"status": "ready"}
Request Timing
Add middleware to track request latency:
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Time every HTTP request, exposing the duration via header and metric."""
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    # Record metric
    # NOTE(review): labelling by raw url.path is high-cardinality for
    # parameterized routes — consider the route template instead.
    record_metric(
        "http.request.duration_seconds",
        process_time,
        labels={"method": request.method, "path": request.url.path}
    )
    return response
Database Query Timing (Future)
When PostgreSQL is implemented:
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Push a start timestamp onto the connection before each statement executes."""
    # A list is used as a stack so overlapping executions pair start/end correctly.
    conn.info.setdefault('query_start_time', []).append(time.time())


@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Pop the matching start time and record the statement's duration."""
    total = time.time() - conn.info['query_start_time'].pop(-1)
    record_metric("database.query.duration_seconds", total)
Debugging
Debug Mode
Enable FastAPI debug mode in development:
# app/main.py
# Enable FastAPI debug mode (verbose tracebacks in responses) only in development.
app = FastAPI(
    title="dispatchai",
    version="0.1.0",
    debug=(APP_ENV == AppEnv.DEV)
)
Request Tracing
Add correlation IDs to trace requests:
import uuid
from contextvars import ContextVar
# Context variable holding the current request's correlation id
# (None until the middleware below sets it).
request_id_var: ContextVar[str | None] = ContextVar('request_id', default=None)


@app.middleware("http")
async def add_correlation_id(request: Request, call_next):
    """Attach an X-Request-ID to every request/response for log correlation.

    Reuses the caller-supplied header when present; otherwise generates a UUID.
    """
    request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))
    request_id_var.set(request_id)
    response = await call_next(request)
    response.headers['X-Request-ID'] = request_id
    return response
# Use in logs
import logging
logger = logging.getLogger(__name__)
logger.info(f"[{request_id_var.get()}] Processing call")
Interactive Debugging
For local development, use debugpy:
# app/main.py (development only)
import os
# Opt-in remote debugger: set DEBUGPY_ENABLE=1 to listen for a VS Code attach.
if os.getenv("DEBUGPY_ENABLE") == "1":
    import debugpy
    debugpy.listen(("0.0.0.0", 5678))
    print("Debugger listening on port 5678")
Connect from VS Code with launch configuration:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
}
}
]
}