Architecture Overview
DispatchAI is built as a modular monolith — a single FastAPI application with clear separation of concerns. This architecture keeps development simple while maintaining clean boundaries for future scaling.
Design Philosophy
Modular Monolith: Single process with well-defined module boundaries
Real-Time First: WebSocket streaming for instant audio analysis
Provider Agnostic: Swap STT, emotion, or storage providers via config
System Components
app/
├── agents/ # Streaming analysis pipeline
│ ├── pipeline.py # Orchestrator (minimal)
│ ├── audio_track.py # Audio distress analysis
│ ├── nlp_track.py # NLP processing
│ ├── stt_client.py # Deepgram WebSocket client
│ ├── emotion.py # Emotion classification
│ ├── service_classify.py # Category + tag extraction
│ ├── summary.py # LLM-based summarization
│ └── merger.py # Combine tracks into CallPacket
├── api/
│ ├── http/ # REST endpoints
│ │ ├── routes_health.py # /health
│ │ └── routes_queue.py # /api/v1/queue
│ └── ws/ # WebSocket handler
│ └── handler.py # /ws (audio streaming)
├── core/ # Configuration & observability
│ ├── config.py # Settings (Pydantic)
│ ├── logging.py # Structured logging
│ ├── observability.py # Metrics/tracing stubs
│ └── state.py # Shared in-memory state
├── ranking/ # Priority scoring
│ ├── ranking.py # Weight calculation
│ ├── service.py # Queue management
│ └── adapters/ # Storage interfaces
├── schemas/ # Pydantic models
│ ├── call_packet.py # CallPacket, NLPResult, AudioResult
│ ├── queue.py # Queue item schema
│ └── telephony.py # Telnyx webhook types
├── services/ # Third-party provider boundaries
│ ├── stt_provider.py # Speech-to-text (Deepgram)
│ ├── llm_provider.py # LLM (OpenAI)
│ ├── emotion_provider.py # Emotion detection
│ └── storage.py # Persistence layer
├── telephony/ # Telnyx integration
│ ├── telnyx_client.py # API client
│ ├── webhooks.py # Webhook handlers
│ └── commands.py # Call control (answer, speak, stream)
├── ui/static/ # Dashboard frontend (HTML/JS)
├── utils/ # Shared helpers
│ ├── ids.py # ID generation
│ ├── time.py # Timestamp utilities
│ └── errors.py # Custom exceptions
├── workers/ # Background task workers
│ └── background.py # Async task queue
└── main.py # FastAPI app + routes
Data Flow
Here's how a call moves through the system: Telnyx delivers call webhooks, the app answers and starts media streaming, audio frames arrive over the WebSocket for live distress scoring and STT, and on hangup the transcript is analyzed (emotion, service classification, risk) and the call lands in the ranked dispatch queue.
Core Modules
1. Telephony Layer (app/telephony/)
Responsibility: Handle incoming calls, webhooks, and Telnyx API interactions.
Webhook Flow (app/main.py:353-626)
The /api/v1/call/incoming endpoint handles three key events:
call.initiated
When a call is initiated:

```python
if event == "call.initiated":
    call_control_id = payload["call_control_id"]
    call_id = payload["call_session_id"]

    # Map control ID to call ID for WebSocket
    CALL_CONTROL_TO_CALL_ID[call_control_id] = call_id

    # Answer + start streaming in background
    bg.add_task(_answer_and_stream, call_control_id)
```
See app/main.py:363-403
call.answered
When the call is answered:

```python
if event == "call.answered":
    # Initialize LIVE_QUEUE entry
    LIVE_QUEUE[call_id] = {
        "id": call_id,
        "status": "LIVE",
        "risk_level": "UNKNOWN",
        "summary": "Listening…",
    }
```
See app/main.py:405-435
call.hangup
When the call ends:

```python
if event == "call.hangup":
    # Finalize transcript with Deepgram batch API
    transcript = await _transcribe_wav_deepgram(wav_path)

    # Analyze emotion
    emotion = await analyze_emotion(transcript, distress_score)

    # Classify service category + tags
    service = classify_service_and_tags(transcript, distress_score)

    # Compute risk level (considers tags!)
    risk = compute_risk_level(distress_score, emotion, service["tags"])

    # Build priority ranking
    ranking = build_ranking(RankingInputs(...))

    # Add to dispatch queue
    CALL_STORE.upsert_queue_item(queue_item)
```
See app/main.py:438-625
Answer & Stream (app/main.py:175-214)
The `_answer_and_stream` function:

1. Answers the call via the Telnyx Call Control API
2. Speaks a greeting using TTS: "This is Dispatch AI. I'm listening…"
3. Starts audio streaming to the WebSocket endpoint:

```python
async with httpx.AsyncClient() as client:
    await client.post(
        f"https://api.telnyx.com/v2/calls/{call_control_id}/actions/streaming_start",
        json={
            "stream_url": WS_PUBLIC_URL,  # wss://your-ngrok.io/ws
            "audio": {
                "direction": "inbound",
                "format": "pcm16",
                "sample_rate": 8000,
            },
        },
    )
```
See app/main.py:175-214
2. WebSocket Handler (app/api/ws/handler.py)
Responsibility: Receive audio frames from Telnyx, compute distress, and stream to STT.
Telnyx sends audio as PCMU (µ-law) encoded bytes. We convert to PCM16 (little-endian) for processing.
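The µ-law conversion can be sketched as a standard G.711 decode. This is a minimal pure-Python version for illustration; the handler's actual `mulaw_to_pcm16le` may be implemented differently:

```python
def mulaw_to_pcm16le(data: bytes) -> bytes:
    """Decode 8-bit G.711 mu-law samples to 16-bit little-endian PCM."""
    out = bytearray()
    for b in data:
        b = ~b & 0xFF                     # mu-law bytes are stored bit-inverted
        sign = b & 0x80                   # high bit carries the sign
        exponent = (b >> 4) & 0x07        # 3-bit segment number
        mantissa = b & 0x0F               # 4-bit step within the segment
        sample = (((mantissa << 3) + 0x84) << exponent) - 0x84
        if sign:
            sample = -sample
        out += sample.to_bytes(2, "little", signed=True)
    return bytes(out)
```

Each 8 kHz µ-law byte expands to two PCM16 bytes, so an 80-byte Telnyx frame becomes 160 bytes of linear audio.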
Audio Processing Pipeline
Connection
Client connects to `/ws`. The server accepts and initializes buffers:

```python
await ws.accept()
pcm_wav_buffer = bytearray()  # For saving WAV file
emit_buffer = bytearray()     # For chunking
```
Start Event
Telnyx sends `{"event": "start"}` with metadata:

```python
call_control_id = evt["start"]["call_control_id"]
call_id = CALL_CONTROL_TO_CALL_ID[call_control_id]

# Initialize per-call state
LIVE_SIGNALS[call_id] = {
    "chunks": 0,
    "voiced_seconds": 0.0,
    "distress": 0.0,
    "transcript_live": "",
    # ...
}

# Start Deepgram STT session
await stt.start(sample_rate=8000)
```
See app/api/ws/handler.py:395-431
Media Frames
Audio arrives as base64-encoded PCMU:

```python
if kind == "media":
    b64 = evt["media"]["payload"]
    raw = base64.b64decode(b64)

    # Convert PCMU → PCM16 little-endian
    if len(raw) in (80, 160):  # PCMU frame sizes
        le16 = mulaw_to_pcm16le(raw)
    else:
        le16 = swap_endian_16(raw)

    # Stream to Deepgram
    await stt.feed(le16)
```
See app/api/ws/handler.py:433-460
Distress Calculation
Chunk audio into 160 ms blocks and compute distress via an exponential moving average (EMA):

```python
emit_buffer += le16
while len(emit_buffer) >= CHUNK_BYTES:  # 2560 bytes = 160ms @ 8kHz
    chunk = emit_buffer[:CHUNK_BYTES]
    del emit_buffer[:CHUNK_BYTES]  # consume the chunk

    # Voice Activity Detection (VAD)
    rms = rms_norm_pcm16le(chunk)  # RMS normalized to [0,1]
    voiced = rms >= 0.02           # 2% threshold
    if voiced:
        signals["voiced_chunks"] += 1

    # Exponential Moving Average (EMA) distress
    ema = alpha * rms + (1 - alpha) * ema
    diff = max(0.0, rms - ema)
    score = max(score * 0.9, min(1.0, diff * 8.0))

    signals["distress"] = score
    signals["max_distress"] = max(signals["max_distress"], score)
```
See app/api/ws/handler.py:462-498
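To see how this scoring behaves, here's a standalone sketch replaying the EMA update over a synthetic per-chunk RMS series. The `alpha` value is an assumption for illustration; the real smoothing factor lives in the handler:

```python
def distress_series(rms_values, alpha=0.1):
    """Replay the EMA distress update over per-chunk RMS values."""
    ema, score, out = 0.0, 0.0, []
    for rms in rms_values:
        ema = alpha * rms + (1 - alpha) * ema           # running energy baseline
        diff = max(0.0, rms - ema)                      # energy above baseline
        score = max(score * 0.9, min(1.0, diff * 8.0))  # decay by 10% or spike
        out.append(round(score, 3))
    return out

# A sudden loud chunk (e.g. a scream) spikes the score; quiet speech lets it decay.
scores = distress_series([0.05, 0.05, 0.5, 0.05, 0.05])
```

The point of tracking `rms - ema` rather than raw RMS is that a consistently loud line stays near its own baseline, while an abrupt jump above baseline reads as distress.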
Stop Event
When the stream ends, finalize the STT session:

```python
if kind == "stop":
    result = await stt.finalize()
    if result.get("transcript"):
        signals["transcript"] = result["transcript"].strip()
    break
```
See app/api/ws/handler.py:502-512
VAD Algorithm
Voice Activity Detection uses RMS energy:
```python
import math

def rms_norm_pcm16le(pcm: bytes) -> float:
    total = 0
    count = 0
    for i in range(0, len(pcm), 2):
        s = int.from_bytes(pcm[i:i + 2], "little", signed=True)
        total += s * s
        count += 1
    if count == 0:
        return 0.0
    rms = math.sqrt(total / count)
    return min(1.0, rms / 32768.0)  # Normalize to [0,1]
```
See app/api/ws/handler.py:309-325
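The per-sample loop works, but for 160 ms chunks an `array`-based version is noticeably faster. This equivalent sketch (same normalization as `rms_norm_pcm16le`) assumes little-endian PCM16 input:

```python
import array
import math
import sys

def rms_norm_pcm16le_fast(pcm: bytes) -> float:
    """RMS of PCM16 little-endian audio, normalized to [0, 1]."""
    samples = array.array("h")
    samples.frombytes(pcm)           # interpret bytes as native int16
    if sys.byteorder == "big":
        samples.byteswap()           # input is little-endian PCM16
    if not samples:
        return 0.0
    rms = math.sqrt(sum(s * s for s in samples) / len(samples))
    return min(1.0, rms / 32768.0)
```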
3. Agents Pipeline (app/agents/)
Responsibility: Parallel audio and NLP analysis tracks.
The current implementation runs analysis inline during webhook handling. A future refactor will extract this into a true parallel pipeline.
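The intended parallel shape could look like the following sketch, with placeholder track functions standing in for the real analysis (this is not the current inline implementation):

```python
import asyncio

async def audio_track(pcm: bytes) -> dict:
    # Placeholder: the real track computes distress from audio
    return {"distress": 0.42}

async def nlp_track(transcript: str) -> dict:
    # Placeholder: the real track runs emotion + service classification
    return {"tags": ["MEDICAL"]}

async def run_pipeline(pcm: bytes, transcript: str) -> dict:
    # Run both tracks concurrently, then merge into a CallPacket-like dict
    audio, nlp = await asyncio.gather(audio_track(pcm), nlp_track(transcript))
    return {**audio, **nlp}

packet = asyncio.run(run_pipeline(b"", "chest pain"))
```

`asyncio.gather` lets the audio and NLP tracks overlap their I/O waits, which is the main win over running the same steps sequentially inside the webhook handler.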
Emotion Classification (app/agents/emotion.py)
Supports three providers: Heuristic, Deepgram, and OpenAI.

Heuristic

Rule-based classifier using distress score + keyword matching:

```python
async def _analyze_emotion_heuristic(transcript: str, distress: float):
    # Base label from audio distress
    if distress >= 0.7:
        label = "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        label = "DISTRESSED"
    elif distress >= 0.15:
        label = "TENSE"
    else:
        label = "CALM"

    # Override for life-threatening keywords
    life_threatening = [
        "shot", "shooting", "stabbed", "can't breathe",
        "not breathing", "overdose", "unconscious", "bleeding out",
    ]
    if any(k in transcript.lower() for k in life_threatening):
        label = "HIGHLY_DISTRESSED"
        intensity = max(intensity, 0.8)

    # intensity and sentiment are derived earlier in the full function
    return {"label": label, "intensity": intensity, "sentiment": sentiment}
```
See app/agents/emotion.py:27-116

Deepgram

Uses Deepgram Text Intelligence (sentiment analysis):

```python
async def _analyze_emotion_deepgram(transcript: str, distress: float):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.deepgram.com/v1/read?sentiment=true&language=en",
            headers={"Authorization": f"Token {DEEPGRAM_API_KEY}"},
            json={"text": transcript},
        )
    data = response.json()
    sentiment = data["results"]["sentiments"]["average"]["sentiment"]
    # → "positive", "neutral", "negative"

    # Fuse sentiment with distress score
    if sentiment == "negative":
        label = "HIGHLY_DISTRESSED" if distress >= 0.4 else "DISTRESSED"
    return {"label": label, "sentiment": sentiment, "confidence": ...}
```
See app/agents/emotion.py:119-251

OpenAI

Prompt GPT-4 to classify emotion given transcript + distress:

```python
async def _analyze_emotion_openai(transcript: str, distress: float):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={
                "model": "gpt-4o",
                "temperature": 0,
                "response_format": {"type": "json_object"},
                "messages": [
                    {"role": "system", "content": "Classify emotion as JSON..."},
                    {"role": "user", "content": f"Transcript: {transcript}\nDistress: {distress}"},
                ],
            },
        )
    data = json.loads(response.json()["choices"][0]["message"]["content"])
    # → {"label": "HIGHLY_DISTRESSED", "sentiment": "negative", "intensity": 0.89}
```
See app/agents/emotion.py:254-331
Configure via `.env`:

```
EMOTION_PROVIDER=heuristic  # or deepgram, openai
```
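One plausible way this setting gates provider selection is a small dispatcher; this is a sketch only (the actual dispatch lives in app/agents/emotion.py, and the stub below stands in for the heuristic path):

```python
import asyncio
import os

async def _analyze_emotion_heuristic(transcript: str, distress: float) -> dict:
    # Stub standing in for the rule-based classifier described above
    return {"label": "DISTRESSED" if distress >= 0.3 else "CALM"}

async def analyze_emotion(transcript: str, distress: float) -> dict:
    provider = os.getenv("EMOTION_PROVIDER", "heuristic")
    if provider == "deepgram":
        raise NotImplementedError("would call _analyze_emotion_deepgram")
    if provider == "openai":
        raise NotImplementedError("would call _analyze_emotion_openai")
    return await _analyze_emotion_heuristic(transcript, distress)

result = asyncio.run(analyze_emotion("help, chest pain", 0.55))
```

Falling back to the heuristic when the variable is unset keeps local development working without any API keys.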
Service Classification (app/agents/service_classify.py)
Extract category (EMS/FIRE/POLICE/OTHER) and semantic tags:
```python
def classify_service_and_tags(transcript: str, distress: float) -> dict:
    text = transcript.lower()

    # Medical keywords
    if any(k in text for k in ["chest pain", "heart attack", "stroke", "seizure"]):
        category = "EMS"
        tags = ["CARDIAC_EVENT"] if "heart" in text else ["MEDICAL"]
    # Violence keywords
    elif any(k in text for k in ["shot", "stabbed", "assault"]):
        category = "EMS"  # Medical response needed
        tags = ["GUNSHOT"] if "shot" in text else ["STABBING"]
    # Fire keywords
    elif any(k in text for k in ["fire", "smoke", "burning"]):
        category = "FIRE"
        tags = ["FIRE", "SMOKE"]
    else:
        category = "OTHER"  # Fallback when no keywords match
        tags = []

    return {"category": category, "tags": tags, "confidence": 0.85}
```
Implementation varies; see app/agents/service_classify.py
4. Risk & Ranking (app/ranking/)
Responsibility: Score calls for priority queue.
Risk Level Computation (app/main.py:224-321)
Combines audio distress, emotion, and semantic tags :
```python
def compute_risk_level(distress_score: float, emotion: dict, tags: list) -> dict:
    score = float(distress_score or 0.0)

    # Critical tags override low distress (shock/dissociation)
    critical_tags = {
        "ACTIVE_SHOOTER", "GUNSHOT", "NOT_BREATHING", "CARDIAC_ARREST",
        "UNCONSCIOUS", "SEVERE_BLEEDING", "OVERDOSE", "SUICIDE_ATTEMPT",
    }
    elevated_tags = {
        "TRAUMA", "VIOLENCE", "FIRE", "STROKE", "SEIZURE",
        "CHEST_PAIN", "VEHICLE_ACCIDENT",
    }

    tags_upper = {t.upper() for t in tags}
    if tags_upper & critical_tags:
        score = max(score, 0.9)
        level = "CRITICAL"
    elif tags_upper & elevated_tags:
        score = max(score, 0.6)
        level = "ELEVATED" if score < 0.7 else "CRITICAL"
    else:
        # Bucketing by distress score
        if score >= 0.7:
            level = "CRITICAL"
        elif score >= 0.4:
            level = "ELEVATED"
        elif score >= 0.15:
            level = "NORMAL"
        else:
            level = "LOW"

    return {"level": level, "score": float(round(score, 3))}
```
Critical Fix: Tags like GUNSHOT or NOT_BREATHING force CRITICAL risk even if the caller sounds calm (shock, numbness).
See app/main.py:224-321
Priority Weight Calculation (app/ranking/ranking.py:58-82)
Calls are sorted by a composite weight:
```python
def build_ranking(inputs: RankingInputs) -> dict:
    base_risk = RISK_LEVEL_WEIGHT[inputs.risk_level]  # 1-4
    cat_weight = CATEGORY_PRIORITY[inputs.category]   # 1-5 (EMS=5, OTHER=1)
    tag_weight = sum(TAG_WEIGHTS.get(t.upper(), 0) for t in inputs.tags)
    # Example: ["GUNSHOT", "BLEEDING"] → 9 + 7 = 16

    weight = (
        base_risk * 20            # CRITICAL=80, LOW=20
        + cat_weight * 10         # EMS=50, OTHER=10
        + tag_weight * 1.5        # Tags stack up
        + inputs.risk_score * 10  # Fine-tune with 0-1 score
    )
    return {
        "weight": float(weight),
        "score": inputs.risk_score,
        # ... additional fields elided
    }
```
Example:
- Call A: CRITICAL risk (80) + EMS (50) + GUNSHOT tag (13.5) + 0.92 score (9.2) = 152.7
- Call B: ELEVATED risk (60) + FIRE (40) + FIRE tag (12) + 0.65 score (6.5) = 118.5

Call A is ranked higher.
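The arithmetic above can be checked directly. The multipliers are as documented; the tag base weights (GUNSHOT=9, FIRE=8) are inferred from the worked numbers, not confirmed against TAG_WEIGHTS:

```python
def composite_weight(base_risk, cat_weight, tag_weight, risk_score):
    """Composite priority weight, mirroring the formula in build_ranking."""
    return base_risk * 20 + cat_weight * 10 + tag_weight * 1.5 + risk_score * 10

call_a = composite_weight(4, 5, 9, 0.92)  # CRITICAL, EMS, GUNSHOT tag
call_b = composite_weight(3, 4, 8, 0.65)  # ELEVATED, FIRE, FIRE tag
# call_a rounds to 152.7, call_b to 118.5 — Call A outranks Call B
```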
See app/ranking/ranking.py:58-82
5. Queue & Storage (app/main.py + app/ranking/service.py)
Current Implementation: In-memory storage via InMemoryCallStore
Production: Swap in a PostgreSQL-backed CallStore implementation
```python
class CallStore(ABC):
    @abstractmethod
    def save_call_record(self, packet: dict) -> None: ...

    @abstractmethod
    def upsert_queue_item(self, item: dict) -> None: ...

    @abstractmethod
    def list_queue_items(self) -> List[dict]: ...
```
See app/main.py:98-118 for interface definition
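A minimal in-memory implementation of this interface might look like the following. This is a sketch of what `InMemoryCallStore` plausibly does; the internal field names are illustrative (the ABC is restated so the snippet is self-contained):

```python
from abc import ABC, abstractmethod
from typing import List

class CallStore(ABC):
    @abstractmethod
    def save_call_record(self, packet: dict) -> None: ...
    @abstractmethod
    def upsert_queue_item(self, item: dict) -> None: ...
    @abstractmethod
    def list_queue_items(self) -> List[dict]: ...

class InMemoryCallStore(CallStore):
    def __init__(self) -> None:
        self._records: List[dict] = []
        self._queue: dict = {}

    def save_call_record(self, packet: dict) -> None:
        self._records.append(packet)

    def upsert_queue_item(self, item: dict) -> None:
        self._queue[item["id"]] = item  # keyed by call id → idempotent upsert

    def list_queue_items(self) -> List[dict]:
        return list(self._queue.values())
```

Keying the queue by call id is what makes `upsert_queue_item` safe to call repeatedly as a call's risk level is revised.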
Queue Retrieval (app/main.py:732-775)
The /api/v1/queue endpoint returns ranked items:
```python
@app.get("/api/v1/queue")
def queue_summary():
    items = CALL_STORE.list_queue_items()

    # Sort by weight (descending), then score, then created_at
    def sort_key(item: dict):
        lvl = item["risk_level"]  # CRITICAL, ELEVATED, ...
        score = float(item["risk_score"])
        status = item["status"]   # OPEN, IN_PROGRESS, RESOLVED
        combined_weight = RANK_WEIGHT[lvl] + STATUS_WEIGHT[status]
        return (-combined_weight, -score, item["created_at"])

    items_sorted = sorted(items, key=sort_key)
    return items_sorted
```
See app/main.py:732-775
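With assumed weight maps (the real values are defined in app/main.py and may differ), the sort behaves like this:

```python
# Assumed weights, for illustration only
RANK_WEIGHT = {"CRITICAL": 4, "ELEVATED": 3, "NORMAL": 2, "LOW": 1}
STATUS_WEIGHT = {"OPEN": 2, "IN_PROGRESS": 1, "RESOLVED": 0}

items = [
    {"id": "b", "risk_level": "ELEVATED", "risk_score": 0.65, "status": "OPEN", "created_at": 1},
    {"id": "a", "risk_level": "CRITICAL", "risk_score": 0.92, "status": "OPEN", "created_at": 2},
    {"id": "c", "risk_level": "CRITICAL", "risk_score": 0.92, "status": "RESOLVED", "created_at": 3},
]

def sort_key(item):
    combined = RANK_WEIGHT[item["risk_level"]] + STATUS_WEIGHT[item["status"]]
    return (-combined, -float(item["risk_score"]), item["created_at"])

ranked = sorted(items, key=sort_key)
print([i["id"] for i in ranked])  # → ['a', 'b', 'c']
```

Folding status into the combined weight is what lets a resolved CRITICAL call sink below an open ELEVATED one instead of pinning the top of the queue.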
State Management
Shared State (app/core/state.py)
```python
# Map call_control_id → call_id for WebSocket routing
CALL_CONTROL_TO_CALL_ID: Dict[str, str] = {}

# Per-call real-time signals (distress, transcript, etc.)
LIVE_SIGNALS: Dict[str, dict] = {}

# Calls currently streaming (shown in /debug/live_calls/)
LIVE_QUEUE: Dict[str, dict] = {}

# All active calls (RINGING, ACTIVE, ENDED)
LIVE_CALLS: Dict[str, dict] = {}
```
In-memory state is lost on restart. Production should use Redis or PostgreSQL for persistence.
API Endpoints
REST API
| Method | Endpoint | Description | Implementation |
| --- | --- | --- | --- |
| GET | `/health` | Health check | app/main.py:703 |
| POST | `/api/v1/call/incoming` | Telnyx webhook (call events) | app/main.py:353 |
| GET | `/api/v1/queue` | Get ranked dispatch queue | app/main.py:732 |
| GET | `/api/v1/live_queue` | Get calls currently streaming | app/main.py:777 |
| GET | `/api/v1/calls` | List recent calls | app/main.py:707 |
| GET | `/api/v1/calls/{call_id}` | Get call detail | app/main.py:814 |
| PATCH | `/api/v1/queue/{call_id}/status` | Update call status | app/main.py:847 |
WebSocket API
| Endpoint | Protocol | Description | Implementation |
| --- | --- | --- | --- |
| `/ws` | WebSocket | Audio frame streaming from Telnyx | app/api/ws/handler.py:344 |
Scaling Considerations
Current Limits (Development)
Concurrent Calls: ~10-20 (single process, in-memory)
Storage: In-memory (lost on restart)
STT Sessions: Limited by Deepgram concurrency
Production Scaling
Horizontal Scaling
Deploy multiple FastAPI instances behind a load balancer:
HTTP endpoints : Round-robin load balancing
WebSocket /ws : Sticky sessions via call_control_id
Persistent Storage
Replace `InMemoryCallStore` with PostgreSQL:

```python
class PostgresCallStore(CallStore):
    async def upsert_queue_item(self, item: dict):
        await db.execute(
            "INSERT INTO queue_items (...) VALUES (...) ON CONFLICT ...",
            item,
        )
```
Async Task Queue
Offload heavy processing (emotion, summarization) to Celery/RQ:

```python
@app.post("/api/v1/call/incoming")
async def incoming_call(...):
    if event == "call.hangup":
        # Queue background task
        task_queue.enqueue(process_call_packet, call_id=call_id)
```
WebSocket State
Use Redis for shared state across instances:

```python
LIVE_SIGNALS = RedisDict(redis_client, prefix="live_signals:")
```
Configuration
Environment Variables
All configuration is loaded from .env via Pydantic:
```python
class Settings(BaseSettings):
    telnyx_api_key: str | None = None
    deepgram_api_key: str | None = None
    openai_api_key: str | None = None
    emotion_provider: str = "heuristic"  # heuristic | deepgram | openai
    port: int = 8000

    class Config:
        env_prefix = ""
        case_sensitive = False

settings = Settings()
```
See app/core/config.py
Feature Flags
```python
APP_ENV = os.getenv("APP_ENV", "dev")  # dev | prod

if APP_ENV == "dev":
    # Enable debug endpoints
    app.include_router(debug_routes)
```
Testing
The project includes unit, integration, and end-to-end tests:
```bash
pytest tests/              # Run all tests
pytest tests/unit/         # Unit tests only
pytest tests/integration/  # Integration tests (require API keys)
pytest tests/e2e/          # E2E tests (require Telnyx number)
```
Test Structure
tests/
├── unit/
│ ├── test_emotion.py # Emotion classification
│ ├── test_ranking.py # Weight calculation
│ └── test_vad.py # VAD algorithm
├── integration/
│ ├── test_deepgram.py # STT provider
│ └── test_telnyx.py # Webhook handling
└── e2e/
└── test_full_flow.py # End-to-end call simulation
Next Steps
API Reference: Full REST and WebSocket API documentation
Production Deployment: Deploy to AWS/GCP with PostgreSQL and Redis
Custom Agents: Build your own audio or NLP analysis agents
Webhook Security: Verify Telnyx webhook signatures