## Overview
DispatchAI processes emergency calls through a dual-track streaming pipeline that runs acoustic analysis and natural language processing (NLP) in parallel. This architecture enables real-time triage while the call is still in progress.
## Architecture
The pipeline consists of two independent, concurrent tracks that process the live audio stream:

- **Audio Track**: processes raw audio to detect distress signals, emotion, and acoustic hazards
- **NLP Track**: converts speech to text and extracts intent, entities, and emergency classification
## Audio Track Pipeline
The audio track analyzes the acoustic properties of the caller’s voice in real time:
```python
# Source: app/api/ws/handler.py:309-326
import math

def rms_norm_pcm16le(pcm: bytes) -> float:
    """RMS normalized to [0, 1] for int16 little-endian PCM."""
    if not pcm:
        return 0.0
    total = 0
    count = 0
    for i in range(0, len(pcm), 2):
        s = int.from_bytes(pcm[i : i + 2], "little", signed=True)
        total += s * s
        count += 1
    if count == 0:
        return 0.0
    rms = math.sqrt(total / count)
    return min(1.0, rms / 32768.0)
```
### Distress Detection
The system uses an Exponential Moving Average (EMA) to smooth audio loudness and detect distress:
```python
# Source: app/api/ws/handler.py:486-492
# Distress proxy: EMA of loudness, map difference to [0..1]
ema = alpha * rms + (1 - alpha) * ema
diff = max(0.0, rms - ema)
score = max(signals["distress"] * 0.9, min(1.0, diff * 8.0))
signals["ema"] = ema
signals["distress"] = score
signals["max_distress"] = max(signals["max_distress"], score)
```
**EMA Alpha Value**: The system uses α = 0.15 for smoothing. This balances responsiveness to sudden changes (screaming, panic) against noise stability.
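As a quick sanity check, the update rule can be simulated on a synthetic loudness trace (the RMS values and the warmed-up EMA starting point below are made up for illustration):

```python
# Simulate the distress update on synthetic RMS values: steady quiet
# speech followed by a sudden loud spike (e.g., a scream).
alpha = 0.15
ema = 0.05        # assume the EMA has already settled on quiet speech
score = 0.0
for rms in [0.05, 0.05, 0.05, 0.60, 0.55]:
    ema = alpha * rms + (1 - alpha) * ema
    diff = max(0.0, rms - ema)
    score = max(score * 0.9, min(1.0, diff * 8.0))
print(score)  # the spike saturates the distress score at 1.0
```

While loudness tracks the EMA, `diff` stays near zero and the score decays; a sudden jump far above the EMA immediately pins the score to 1.0.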
### Emotion Classification
The emotion analyzer fuses acoustic distress scores with transcript content to classify emotional state:
```python
# Source: app/agents/emotion.py:14-20
from typing import Literal

EmotionLabel = Literal[
    "CALM",
    "RELIEVED",
    "TENSE",
    "DISTRESSED",
    "HIGHLY_DISTRESSED",
]
```
The heuristic emotion classifier uses a multi-signal approach:
```python
# Source: app/agents/emotion.py:40-48
# Base label from distress
if distress >= 0.7:
    label = "HIGHLY_DISTRESSED"
elif distress >= 0.3:
    label = "DISTRESSED"
elif distress >= 0.15:
    label = "TENSE"
else:
    label = "CALM"
```
**Life-Threatening Override**: The system detects life-threatening keywords (“shot”, “not breathing”, “overdose”) and overrides low distress scores. This handles cases where callers sound calm due to shock or dissociation.
```python
# Source: app/agents/emotion.py:54-77
life_threatening = [
    "shot", "shooting", "stabbed", "stabbing",
    "can't breathe", "not breathing", "overdose",
    "heart attack", "unconscious", "bleeding out",
    "heavy bleeding", "suicide", "kill myself",
]
if any(k in txt for k in life_threatening):
    sentiment = "negative"
    label = "HIGHLY_DISTRESSED"
    intensity = max(intensity, 0.8)
```
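Putting the two stages together, a condensed version of the heuristic might look like this (the function name and the abbreviated keyword list are illustrative, not the project's actual API):

```python
def classify_emotion(distress: float, txt: str) -> str:
    """Two-stage heuristic: acoustic thresholds, then keyword override."""
    # Base label from the acoustic distress score
    if distress >= 0.7:
        label = "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        label = "DISTRESSED"
    elif distress >= 0.15:
        label = "TENSE"
    else:
        label = "CALM"
    # Life-threatening keywords escalate even a calm-sounding caller
    life_threatening = ["shot", "not breathing", "overdose", "heart attack"]
    if any(k in txt.lower() for k in life_threatening):
        label = "HIGHLY_DISTRESSED"
    return label

print(classify_emotion(0.05, "He's unresponsive and not breathing"))  # HIGHLY_DISTRESSED
print(classify_emotion(0.2, "There's smoke coming from next door"))   # TENSE
```

The first call illustrates the override: the caller's acoustic distress is low, but the transcript forces the highest label.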
## NLP Track Pipeline
The NLP track converts speech to text and performs semantic analysis:
### Speech-to-Text (STT)
The system uses Deepgram’s streaming STT API via WebSocket:
```python
# Source: app/agents/stt_client.py:30-49
async def start(self, sample_rate: int = 8000):
    if not DEEPGRAM_API_KEY:
        print("[stt] Deepgram key missing")
        return
    self.session = aiohttp.ClientSession()
    url = (
        "wss://api.deepgram.com/v1/listen?"
        f"model=nova-2&language=en-US&encoding=linear16&sample_rate={sample_rate}"
        "&punctuate=true&smart_format=true&numerals=true"
    )
    headers = {"Authorization": f"Token {DEEPGRAM_API_KEY}"}
    ssl_ctx = ssl.create_default_context(cafile=certifi.where())
    self.ws = await self.session.ws_connect(
        url, headers=headers, ssl=ssl_ctx, heartbeat=30
    )
    print("[stt] connected to Deepgram streaming")
    self._recv_task = asyncio.create_task(self._recv_loop())
```
Key parameters in the connection URL:

- `model=nova-2`: Deepgram’s streaming model optimized for real-time transcription
- `sample_rate`: the audio sample rate in Hz; phone calls typically use 8 kHz
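Deepgram streaming results arrive as JSON messages over the WebSocket; `_recv_loop` presumably pulls out the transcript text along these lines (the helper name is illustrative, but the message shape follows Deepgram's documented streaming result format):

```python
import json

def extract_transcript(message: str) -> tuple[str, bool]:
    """Extract the transcript text and is_final flag from a Deepgram
    streaming result message. Returns ("", False) for non-result messages."""
    data = json.loads(message)
    alt = data.get("channel", {}).get("alternatives", [{}])[0]
    return alt.get("transcript", ""), bool(data.get("is_final"))

msg = '{"is_final": true, "channel": {"alternatives": [{"transcript": "my house is on fire"}]}}'
print(extract_transcript(msg))  # ('my house is on fire', True)
```

Interim results (`is_final: false`) feed the live partial transcript; final results are appended to the confirmed transcript.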
### Service Classification
The NLP track classifies calls into emergency service categories:
```python
# Source: app/agents/service_classify.py:5
ServiceCategory = Literal["EMS", "FIRE", "POLICE", "OTHER"]
```
The classifier uses a layered keyword-matching system with:

- **Negation detection**: “not bleeding” vs. “bleeding”
- **Phonetic variations**: “cant breathe” → “can’t breathe”
- **ASR error tolerance**: “am balance” → “ambulance”
- **Context awareness**: “shooting a basketball” vs. “active shooter”
```python
# Source: app/agents/service_classify.py:45-50
def is_negated(phrase: str) -> bool:
    """Check if a phrase is negated (e.g., 'not bleeding', 'no gun')."""
    match = re.search(re.escape(phrase), text, re.IGNORECASE)
    if not match:
        return False
    # Check 20 chars before for negation words
    start = max(0, match.start() - 20)
    context = text[start : match.start()]
    # Final check against the negation word list (remainder elided in source)
    return any(neg in context.lower() for neg in NEGATION_WORDS)
```
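A self-contained version of the same check is easy to experiment with (the negation word list below is illustrative; the project's actual list may differ):

```python
import re

NEGATIONS = re.compile(r"\b(no|not|never|without)\b", re.IGNORECASE)

def is_negated(text: str, phrase: str, window: int = 20) -> bool:
    """Return True if `phrase` occurs in `text` with a negation word
    within `window` characters immediately before it."""
    match = re.search(re.escape(phrase), text, re.IGNORECASE)
    if not match:
        return False
    context = text[max(0, match.start() - window) : match.start()]
    return bool(NEGATIONS.search(context))

print(is_negated("he is not bleeding anymore", "bleeding"))  # True
print(is_negated("he is bleeding badly", "bleeding"))        # False
```

The fixed look-behind window keeps the check cheap enough to run on every keyword hit in a streaming transcript.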
## Live Signal Tracking
Each call maintains a live state object that is updated in real time:
```python
# Source: app/api/ws/handler.py:412-423
LIVE_SIGNALS[call_id] = {
    "chunks": 0,
    "voiced_chunks": 0,
    "voiced_seconds": 0.0,
    "ema": 0.0,
    "distress": 0.0,
    "max_distress": 0.0,
    "transcript": "",
    "transcript_live": "",
    "wav_path": None,
    "emotion": None,
}
```
- `distress`: current distress score (0.0–1.0) based on acoustic analysis
- `max_distress`: peak distress observed during the call
- `transcript_live`: real-time partial transcript as the caller speaks
- `transcript`: finalized, complete transcript once segments are confirmed
## Audio Processing
### µ-law Decoding
Phone systems use µ-law encoding (8-bit compressed audio). The pipeline decodes this to PCM16:
```python
# Source: app/api/ws/handler.py:443-448
if len(raw) in (80, 160):  # 10 ms / 20 ms mu-law frames at 8 kHz
    le16 = mulaw_to_pcm16le(raw)
else:
    le16 = swap_endian_16(raw)
```
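For reference, G.711 µ-law decoding is simple enough to sketch in pure Python; the project's actual `mulaw_to_pcm16le` helper may be implemented differently, but must compute the same mapping:

```python
def mulaw_to_pcm16le(data: bytes) -> bytes:
    """Decode 8-bit G.711 mu-law samples to 16-bit little-endian PCM."""
    out = bytearray()
    for b in data:
        b = ~b & 0xFF                    # mu-law bytes are stored inverted
        sign = b & 0x80
        exponent = (b >> 4) & 0x07       # 3-bit segment number
        mantissa = b & 0x0F              # 4-bit step within the segment
        sample = (((mantissa << 3) + 0x84) << exponent) - 0x84
        if sign:
            sample = -sample
        out += sample.to_bytes(2, "little", signed=True)
    return bytes(out)

print(mulaw_to_pcm16le(b"\xff"))  # b'\x00\x00' (mu-law 0xFF is digital silence)
```

µ-law is logarithmic: each 8-bit code covers a wider amplitude step at higher volumes, which is why 8-bit telephone audio expands to the full ±32124 range of 16-bit PCM.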
### Voice Activity Detection (VAD)
```python
# Source: app/api/ws/handler.py:469-478
rms = rms_norm_pcm16le(chunk)
voiced = rms >= vad_threshold  # threshold = 0.02
if voiced:
    signals["voiced_chunks"] += 1
signals["chunks"] += 1
signals["voiced_seconds"] = signals["voiced_chunks"] * 0.16
```
Each audio chunk covers 160 ms — 2,560 bytes of PCM16 at 8 kHz (0.16 s × 8,000 samples/s × 2 bytes). The VAD threshold of 0.02 (2% of full scale) filters out background noise.
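The bookkeeping is easy to trace with synthetic RMS values (the trace below is made up; the threshold and chunk duration come from the source):

```python
# Five 160 ms chunks: background noise, three voiced chunks, a quiet pause.
vad_threshold = 0.02
signals = {"chunks": 0, "voiced_chunks": 0, "voiced_seconds": 0.0}
for rms in [0.005, 0.03, 0.08, 0.01, 0.05]:
    if rms >= vad_threshold:
        signals["voiced_chunks"] += 1
    signals["chunks"] += 1
    signals["voiced_seconds"] = signals["voiced_chunks"] * 0.16
print(signals["voiced_chunks"], "voiced chunks out of", signals["chunks"])
```

Here three of five chunks clear the threshold, giving roughly 0.48 s of voiced speech.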
## Summary Generation
Once both tracks complete, the system generates a dispatcher-friendly summary:
```python
# Source: app/agents/summary.py:41-62
async def generate_summary(transcript: str, category: str, tags: list[str]) -> str:
    """Generate a concise dispatcher-friendly summary."""
    if not transcript:
        return "No transcript available."
    if not _client:
        return heuristic_summary(transcript)
    prompt = (
        "You are an emergency dispatcher assistant. "
        "Summarize the caller's situation in 1–2 clear, factual sentences. "
        "Avoid speculation. Include critical details. "
        f"Category: {category}. Tags: {', '.join(tags)}.\n\n"
        f"Transcript:\n{transcript}"
    )
    resp = await _client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=80,
        temperature=0,
    )
    return (resp.choices[0].message.content or "").strip()
```
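When no LLM client is configured, the code falls back to `heuristic_summary`. One plausible shape for such a fallback — a guess for illustration, not the project's actual implementation — is to keep the first couple of sentences of the transcript:

```python
import re

def heuristic_summary(transcript: str, max_len: int = 240) -> str:
    """Hypothetical fallback: keep the first two sentences, capped in length."""
    sentences = re.split(r"(?<=[.!?])\s+", transcript.strip())
    return " ".join(sentences[:2])[:max_len]

call = "There's a fire in my kitchen. Everyone is out of the house. Smoke is spreading fast."
print(heuristic_summary(call))
```

A deterministic fallback like this keeps the dispatcher view populated even when the LLM path is unavailable.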
## Latency

- **STT partial**: 100-300 ms from speech
- **Distress score**: updated every 160 ms chunk
- **Full analysis**: < 2 seconds after the call ends
- **CallPacket Structure**: learn about the data structure that merges both tracks
- **Priority Ranking**: see how pipeline outputs feed into the ranking system