Skip to main content

Architecture Overview

DispatchAI is built as a modular monolith — a single FastAPI application with clear separation of concerns. This architecture keeps development simple while maintaining clean boundaries for future scaling.

Design Philosophy

Modular Monolith

Single process with well-defined module boundaries

Real-Time First

WebSocket streaming for instant audio analysis

Provider Agnostic

Swap STT, emotion, or storage providers via config

System Components

app/
├── agents/          # Streaming analysis pipeline
│   ├── pipeline.py      # Orchestrator (minimal)
│   ├── audio_track.py   # Audio distress analysis
│   ├── nlp_track.py     # NLP processing
│   ├── stt_client.py    # Deepgram WebSocket client
│   ├── emotion.py       # Emotion classification
│   ├── service_classify.py  # Category + tag extraction
│   ├── summary.py       # LLM-based summarization
│   └── merger.py        # Combine tracks into CallPacket
├── api/
│   ├── http/            # REST endpoints
│   │   ├── routes_health.py   # /health
│   │   └── routes_queue.py    # /api/v1/queue
│   └── ws/              # WebSocket handler
│       └── handler.py   # /ws (audio streaming)
├── core/                # Configuration & observability
│   ├── config.py        # Settings (Pydantic)
│   ├── logging.py       # Structured logging
│   ├── observability.py # Metrics/tracing stubs
│   └── state.py         # Shared in-memory state
├── ranking/             # Priority scoring
│   ├── ranking.py       # Weight calculation
│   ├── service.py       # Queue management
│   └── adapters/        # Storage interfaces
├── schemas/             # Pydantic models
│   ├── call_packet.py   # CallPacket, NLPResult, AudioResult
│   ├── queue.py         # Queue item schema
│   └── telephony.py     # Telnyx webhook types
├── services/            # Third-party provider boundaries
│   ├── stt_provider.py      # Speech-to-text (Deepgram)
│   ├── llm_provider.py      # LLM (OpenAI)
│   ├── emotion_provider.py  # Emotion detection
│   └── storage.py           # Persistence layer
├── telephony/           # Telnyx integration
│   ├── telnyx_client.py  # API client
│   ├── webhooks.py       # Webhook handlers
│   └── commands.py       # Call control (answer, speak, stream)
├── ui/static/           # Dashboard frontend (HTML/JS)
├── utils/               # Shared helpers
│   ├── ids.py           # ID generation
│   ├── time.py          # Timestamp utilities
│   └── errors.py        # Custom exceptions
├── workers/             # Background task workers
│   └── background.py    # Async task queue
└── main.py              # FastAPI app + routes

Data Flow

Here’s how a call moves through the system:

Core Modules

1. Telephony Layer (app/telephony/)

Responsibility: Handle incoming calls, webhooks, and Telnyx API interactions.

Webhook Flow (app/main.py:353-626)

The /api/v1/call/incoming endpoint handles three key events:
1

call.initiated

When a call is initiated:
if event == "call.initiated":
    call_control_id = payload["call_control_id"]
    call_id = payload["call_session_id"]
    
    # Map control ID to call ID for WebSocket
    CALL_CONTROL_TO_CALL_ID[call_control_id] = call_id
    
    # Answer + start streaming in background
    bg.add_task(_answer_and_stream, call_control_id)
See app/main.py:363-403
2

call.answered

When the call is answered:
if event == "call.answered":
    # Initialize LIVE_QUEUE entry
    LIVE_QUEUE[call_id] = {
        "id": call_id,
        "status": "LIVE",
        "risk_level": "UNKNOWN",
        "summary": "Listening…"
    }
See app/main.py:405-435
3

call.hangup

When the call ends:
if event == "call.hangup":
    # Finalize transcript with Deepgram batch API
    transcript = await _transcribe_wav_deepgram(wav_path)
    
    # Analyze emotion
    emotion = await analyze_emotion(transcript, distress_score)
    
    # Classify service category + tags
    service = classify_service_and_tags(transcript, distress_score)
    
    # Compute risk level (considers tags!)
    risk = compute_risk_level(distress_score, emotion, service["tags"])
    
    # Build priority ranking
    ranking = build_ranking(RankingInputs(...))
    
    # Add to dispatch queue
    CALL_STORE.upsert_queue_item(queue_item)
See app/main.py:438-625

Answer & Stream (app/main.py:175-214)

The _answer_and_stream function:
  1. Answers the call via Telnyx Call Control API
  2. Speaks a greeting using TTS: “This is Dispatch AI. I’m listening…”
  3. Starts audio streaming to the WebSocket endpoint:
await httpx.post(
    f"https://api.telnyx.com/v2/calls/{call_control_id}/actions/streaming_start",
    json={
        "stream_url": WS_PUBLIC_URL,  # wss://your-ngrok.io/ws
        "audio": {
            "direction": "inbound",
            "format": "pcm16",
            "sample_rate": 8000
        }
    }
)
See app/main.py:175-214

2. WebSocket Handler (app/api/ws/handler.py)

Responsibility: Receive audio frames from Telnyx, compute distress, and stream to STT.
Telnyx sends audio as PCMU (µ-law) encoded bytes. We convert to PCM16 (little-endian) for processing.

Audio Processing Pipeline

1

Connection

Client connects to /ws. Server accepts and initializes buffers:
await ws.accept()
pcm_wav_buffer = bytearray()  # For saving WAV file
emit_buffer = bytearray()     # For chunking
2

Start Event

Telnyx sends {"event": "start"} with metadata:
call_control_id = evt["start"]["call_control_id"]
call_id = CALL_CONTROL_TO_CALL_ID[call_control_id]

# Initialize per-call state
LIVE_SIGNALS[call_id] = {
    "chunks": 0,
    "voiced_seconds": 0.0,
    "distress": 0.0,
    "transcript_live": "",
    # ...
}

# Start Deepgram STT session
await stt.start(sample_rate=8000)
See app/api/ws/handler.py:395-431
3

Media Frames

Audio arrives as base64-encoded PCMU:
if kind == "media":
    b64 = evt["media"]["payload"]
    raw = base64.b64decode(b64)
    
    # Convert PCMU → PCM16 little-endian
    if len(raw) in (80, 160):  # PCMU frame sizes
        le16 = mulaw_to_pcm16le(raw)
    else:
        le16 = swap_endian_16(raw)
    
    # Stream to Deepgram
    await stt.feed(le16)
See app/api/ws/handler.py:433-460
4

Distress Calculation

Chunk audio into 160ms blocks and compute distress via EMA:
emit_buffer += le16
while len(emit_buffer) >= CHUNK_BYTES:  # 2560 bytes = 160ms @ 8kHz
    chunk = emit_buffer[:CHUNK_BYTES]
    
    # Voice Activity Detection (VAD)
    rms = rms_norm_pcm16le(chunk)  # RMS normalized to [0,1]
    voiced = rms >= 0.02  # 2% threshold
    
    if voiced:
        signals["voiced_chunks"] += 1
    
    # Exponential Moving Average (EMA) distress
    ema = alpha * rms + (1 - alpha) * ema
    diff = max(0.0, rms - ema)
    score = max(score * 0.9, min(1.0, diff * 8.0))
    
    signals["distress"] = score
    signals["max_distress"] = max(signals["max_distress"], score)
See app/api/ws/handler.py:462-498
5

Stop Event

When the stream ends, finalize the STT session:
if kind == "stop":
    result = await stt.finalize()
    if result.get("transcript"):
        signals["transcript"] = result["transcript"].strip()
    break
See app/api/ws/handler.py:502-512

VAD Algorithm

Voice Activity Detection uses RMS energy:
def rms_norm_pcm16le(pcm: bytes) -> float:
    total = 0
    count = 0
    for i in range(0, len(pcm), 2):
        s = int.from_bytes(pcm[i:i+2], "little", signed=True)
        total += s * s
        count += 1
    rms = math.sqrt(total / count)
    return min(1.0, rms / 32768.0)  # Normalize to [0,1]
See app/api/ws/handler.py:309-325

3. Agents Pipeline (app/agents/)

Responsibility: Parallel audio and NLP analysis tracks.
The current implementation runs analysis inline during webhook handling. A future refactor will extract this into a true parallel pipeline.

Emotion Classification (app/agents/emotion.py)

Supports three providers:
Rule-based classifier using distress score + keyword matching:
async def _analyze_emotion_heuristic(transcript: str, distress: float):
    # Base label from audio distress
    if distress >= 0.7:
        label = "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        label = "DISTRESSED"
    elif distress >= 0.15:
        label = "TENSE"
    else:
        label = "CALM"
    
    # Override for life-threatening keywords
    life_threatening = [
        "shot", "shooting", "stabbed", "can't breathe",
        "not breathing", "overdose", "unconscious", "bleeding out"
    ]
    
    if any(k in transcript.lower() for k in life_threatening):
        label = "HIGHLY_DISTRESSED"
        intensity = max(intensity, 0.8)
    
    return {"label": label, "intensity": intensity, "sentiment": sentiment}
See app/agents/emotion.py:27-116
Configure via .env:
EMOTION_PROVIDER=heuristic  # or deepgram, openai

Service Classification (app/agents/service_classify.py)

Extract category (EMS/FIRE/POLICE/OTHER) and semantic tags:
def classify_service_and_tags(transcript: str, distress: float) -> dict:
    text = transcript.lower()
    
    # Medical keywords
    if any(k in text for k in ["chest pain", "heart attack", "stroke", "seizure"]):
        category = "EMS"
        tags = ["CARDIAC_EVENT"] if "heart" in text else ["MEDICAL"]
    
    # Violence keywords
    elif any(k in text for k in ["shot", "stabbed", "assault"]):
        category = "EMS"  # Medical response needed
        tags = ["GUNSHOT"] if "shot" in text else ["STABBING"]
    
    # Fire keywords
    elif any(k in text for k in ["fire", "smoke", "burning"]):
        category = "FIRE"
        tags = ["FIRE", "SMOKE"]
    
    return {"category": category, "tags": tags, "confidence": 0.85}
Implementation varies; see app/agents/service_classify.py

4. Risk & Ranking (app/ranking/)

Responsibility: Score calls for priority queue.

Risk Level Computation (app/main.py:224-321)

Combines audio distress, emotion, and semantic tags:
def compute_risk_level(distress_score: float, emotion: dict, tags: list) -> dict:
    base = float(distress_score or 0.0)
    
    # Critical tags override low distress (shock/dissociation)
    critical_tags = {
        "ACTIVE_SHOOTER", "GUNSHOT", "NOT_BREATHING", "CARDIAC_ARREST",
        "UNCONSCIOUS", "SEVERE_BLEEDING", "OVERDOSE", "SUICIDE_ATTEMPT"
    }
    
    elevated_tags = {
        "TRAUMA", "VIOLENCE", "FIRE", "STROKE", "SEIZURE",
        "CHEST_PAIN", "VEHICLE_ACCIDENT"
    }
    
    tags_upper = {t.upper() for t in tags}
    
    if tags_upper & critical_tags:
        score = max(score, 0.9)
        level = "CRITICAL"
    elif tags_upper & elevated_tags:
        score = max(score, 0.6)
        level = "ELEVATED" if score < 0.7 else "CRITICAL"
    else:
        # Bucketing by distress score
        if score >= 0.7:
            level = "CRITICAL"
        elif score >= 0.4:
            level = "ELEVATED"
        elif score >= 0.15:
            level = "NORMAL"
        else:
            level = "LOW"
    
    return {"level": level, "score": float(round(score, 3))}
Critical Fix: Tags like GUNSHOT or NOT_BREATHING force CRITICAL risk even if the caller sounds calm (shock, numbness).
See app/main.py:224-321

Priority Weight Calculation (app/ranking/ranking.py:58-82)

Calls are sorted by a composite weight:
def build_ranking(inputs: RankingInputs) -> dict:
    base_risk = RISK_LEVEL_WEIGHT[inputs.risk_level]  # 1-4
    cat_weight = CATEGORY_PRIORITY[inputs.category]    # 1-5 (EMS=5, OTHER=1)
    
    tag_weight = sum(TAG_WEIGHTS.get(t.upper(), 0) for t in inputs.tags)
    # Example: ["GUNSHOT", "BLEEDING"] → 9 + 7 = 16
    
    weight = (
        base_risk * 20       # CRITICAL=80, LOW=20
        + cat_weight * 10    # EMS=50, OTHER=10
        + tag_weight * 1.5   # Tags stack up
        + inputs.risk_score * 10  # Fine-tune with 0-1 score
    )
    
    return {"weight": float(weight), "score": inputs.risk_score, ...}
Example:
  • Call A: CRITICAL risk (80) + EMS (50) + GUNSHOT tag (13.5) + 0.92 score (9.2) = 152.7
  • Call B: ELEVATED risk (60) + FIRE (40) + FIRE tag (12) + 0.65 score (6.5) = 118.5
Call A is ranked higher. See app/ranking/ranking.py:58-82

5. Queue & Storage (app/main.py + app/ranking/service.py)

Current Implementation: In-memory storage via InMemoryCallStore Production: Swap for PostgreSQL-backed CallStore implementation
class CallStore(ABC):
    @abstractmethod
    def save_call_record(self, packet: dict) -> None: ...
    
    @abstractmethod
    def upsert_queue_item(self, item: dict) -> None: ...
    
    @abstractmethod
    def list_queue_items(self) -> List[dict]: ...
See app/main.py:98-118 for interface definition

Queue Retrieval (app/main.py:732-775)

The /api/v1/queue endpoint returns ranked items:
@app.get("/api/v1/queue")
def queue_summary():
    items = CALL_STORE.list_queue_items()
    
    # Sort by weight (descending), then score, then created_at
    def sort_key(item: dict):
        lvl = item["risk_level"]  # CRITICAL, ELEVATED, ...
        score = float(item["risk_score"])
        status = item["status"]    # OPEN, IN_PROGRESS, RESOLVED
        
        combined_weight = RANK_WEIGHT[lvl] + STATUS_WEIGHT[status]
        return (-combined_weight, -score, item["created_at"])
    
    items_sorted = sorted(items, key=sort_key)
    return items_sorted
See app/main.py:732-775

State Management

Shared State (app/core/state.py)

# Map call_control_id → call_id for WebSocket routing
CALL_CONTROL_TO_CALL_ID: Dict[str, str] = {}

# Per-call real-time signals (distress, transcript, etc.)
LIVE_SIGNALS: Dict[str, dict] = {}

# Calls currently streaming (shown in /debug/live_calls/)
LIVE_QUEUE: Dict[str, dict] = {}

# All active calls (RINGING, ACTIVE, ENDED)
LIVE_CALLS: Dict[str, dict] = {}
In-memory state is lost on restart. Production should use Redis or PostgreSQL for persistence.

API Endpoints

REST API

MethodEndpointDescriptionImplementation
GET/healthHealth checkapp/main.py:703
POST/api/v1/call/incomingTelnyx webhook (call events)app/main.py:353
GET/api/v1/queueGet ranked dispatch queueapp/main.py:732
GET/api/v1/live_queueGet calls currently streamingapp/main.py:777
GET/api/v1/callsList recent callsapp/main.py:707
GET/api/v1/calls/{call_id}Get call detailapp/main.py:814
PATCH/api/v1/queue/{call_id}/statusUpdate call statusapp/main.py:847

WebSocket API

EndpointProtocolDescriptionImplementation
/wsWebSocketAudio frame streaming from Telnyxapp/api/ws/handler.py:344

Scaling Considerations

Current Limits (Development)

  • Concurrent Calls: ~10-20 (single process, in-memory)
  • Storage: In-memory (lost on restart)
  • STT Sessions: Limited by Deepgram concurrency

Production Scaling

1

Horizontal Scaling

Deploy multiple FastAPI instances behind a load balancer:
  • HTTP endpoints: Round-robin load balancing
  • WebSocket /ws: Sticky sessions via call_control_id
2

Persistent Storage

Replace InMemoryCallStore with PostgreSQL:
class PostgresCallStore(CallStore):
    def upsert_queue_item(self, item: dict):
        await db.execute(
            "INSERT INTO queue_items (...) VALUES (...) ON CONFLICT ...",
            item
        )
3

Async Task Queue

Offload heavy processing (emotion, summarization) to Celery/RQ:
@app.post("/api/v1/call/incoming")
async def incoming_call(...):
    if event == "call.hangup":
        # Queue background task
        task_queue.enqueue(process_call_packet, call_id=call_id)
4

WebSocket State

Use Redis for shared state across instances:
LIVE_SIGNALS = RedisDict(redis_client, prefix="live_signals:")

Configuration

Environment Variables

All configuration is loaded from .env via Pydantic:
class Settings(BaseSettings):
    telnyx_api_key: str | None = None
    deepgram_api_key: str | None = None
    openai_api_key: str | None = None
    emotion_provider: str = "heuristic"  # heuristic | deepgram | openai
    port: int = 8000
    
    class Config:
        env_prefix = ""
        case_sensitive = False

settings = Settings()
See app/core/config.py

Feature Flags

APP_ENV = os.getenv("APP_ENV", "dev")  # dev | prod

if APP_ENV == "dev":
    # Enable debug endpoints
    app.include_router(debug_routes)

Testing

The project includes unit, integration, and end-to-end tests:
pytest tests/               # Run all tests
pytest tests/unit/          # Unit tests only
pytest tests/integration/   # Integration tests (require API keys)
pytest tests/e2e/           # E2E tests (require Telnyx number)

Test Structure

tests/
├── unit/
│   ├── test_emotion.py      # Emotion classification
│   ├── test_ranking.py      # Weight calculation
│   └── test_vad.py          # VAD algorithm
├── integration/
│   ├── test_deepgram.py     # STT provider
│   └── test_telnyx.py       # Webhook handling
└── e2e/
    └── test_full_flow.py    # End-to-end call simulation

Next Steps

API Reference

Full REST and WebSocket API documentation

Production Deployment

Deploy to AWS/GCP with PostgreSQL and Redis

Custom Agents

Build your own audio or NLP analysis agents

Webhook Security

Verify Telnyx webhook signatures

Build docs developers (and LLMs) love