
Overview

DispatchAI processes emergency calls through a dual-track streaming pipeline that runs acoustic analysis and natural language processing (NLP) in parallel. This architecture enables real-time triage while the call is still in progress.

Architecture

The pipeline consists of two independent, concurrent tracks that process the live audio stream:

Audio Track

Processes raw audio to detect distress signals, emotion, and acoustic hazards

NLP Track

Converts speech to text and extracts intent, entities, and emergency classification

Audio Track Pipeline

The audio track analyzes the acoustic properties of the caller’s voice in real time:
# Source: app/api/ws/handler.py:309-326
def rms_norm_pcm16le(pcm: bytes) -> float:
    """RMS normalized to [0,1] for int16"""
    if not pcm:
        return 0.0
    total = 0
    count = 0
    for i in range(0, len(pcm), 2):
        s = int.from_bytes(pcm[i : i + 2], "little", signed=True)
        total += s * s
        count += 1
    if count == 0:
        return 0.0
    import math
    rms = math.sqrt(total / count)
    return min(1.0, rms / 32768.0)
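A quick usage sketch of the loudness function (the function body is reproduced from the snippet above in compact form so the example runs standalone; the 440 Hz test tone is invented):

```python
import math

def rms_norm_pcm16le(pcm: bytes) -> float:
    """RMS normalized to [0, 1] for little-endian int16 audio."""
    if not pcm:
        return 0.0
    samples = [int.from_bytes(pcm[i : i + 2], "little", signed=True)
               for i in range(0, len(pcm), 2)]
    rms = math.sqrt(sum(s * s for s in samples) / len(samples))
    return min(1.0, rms / 32768.0)

# One 160 ms chunk at 8 kHz = 1280 samples.
silence = b"\x00\x00" * 1280
tone = b"".join(
    int(16000 * math.sin(2 * math.pi * 440 * n / 8000)).to_bytes(2, "little", signed=True)
    for n in range(1280)
)

print(rms_norm_pcm16le(silence))  # 0.0
print(rms_norm_pcm16le(tone))     # ~0.345 (a sine of amplitude 16000: 16000/sqrt(2)/32768)
```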

Distress Detection

The system uses an Exponential Moving Average (EMA) to smooth audio loudness and detect distress:
# Source: app/api/ws/handler.py:486-492
# Distress proxy: EMA of loudness, map difference to [0..1]
ema = alpha * rms + (1 - alpha) * ema
diff = max(0.0, rms - ema)
score = max(signals["distress"] * 0.9, min(1.0, diff * 8.0))
signals["ema"] = ema
signals["distress"] = score
signals["max_distress"] = max(signals["max_distress"], score)
EMA Alpha Value: The system uses α = 0.15 for smoothing. This balances responsiveness to sudden changes (screaming, panic) against noise stability.
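The smoothing step can be sketched as a standalone loop (alpha = 0.15, the 0.9 decay, and the x8 gain come from the snippet above; the RMS input series is invented):

```python
alpha = 0.15  # EMA smoothing factor described above

def update_distress(rms: float, ema: float, distress: float) -> tuple[float, float]:
    """One step of the EMA-based distress proxy."""
    ema = alpha * rms + (1 - alpha) * ema
    diff = max(0.0, rms - ema)                          # only upward loudness jumps count
    score = max(distress * 0.9, min(1.0, diff * 8.0))   # old score decays 10% per chunk
    return ema, score

ema, distress = 0.0, 0.0
# Quiet speech, then a sudden scream-like spike in loudness:
for rms in [0.02, 0.02, 0.02, 0.6, 0.6, 0.02]:
    ema, distress = update_distress(rms, ema, distress)
    print(f"rms={rms:.2f} ema={ema:.3f} distress={distress:.3f}")
```

The spike at rms = 0.6 saturates the score at 1.0, and once the caller quiets down the score decays by 10% per chunk instead of dropping instantly.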

Emotion Classification

The emotion analyzer fuses acoustic distress scores with transcript content to classify emotional state:
# Source: app/agents/emotion.py:14-20
EmotionLabel = Literal[
    "CALM",
    "RELIEVED",
    "TENSE",
    "DISTRESSED",
    "HIGHLY_DISTRESSED",
]
The heuristic emotion classifier uses a multi-signal approach:
# Source: app/agents/emotion.py:40-48
# Base label from distress
if distress >= 0.7:
    label = "HIGHLY_DISTRESSED"
elif distress >= 0.3:
    label = "DISTRESSED"
elif distress >= 0.15:
    label = "TENSE"
else:
    label = "CALM"
Life-Threatening Override: The system detects life-threatening keywords (“shot”, “not breathing”, “overdose”) and overrides low distress scores. This handles cases where callers sound calm due to shock or dissociation.
# Source: app/agents/emotion.py:54-77
life_threatening = [
    "shot", "shooting", "stabbed", "stabbing",
    "can't breathe", "not breathing", "overdose",
    "heart attack", "unconscious", "bleeding out",
    "heavy bleeding", "suicide", "kill myself",
]

if any(k in txt for k in life_threatening):
    sentiment = "negative"
    label = "HIGHLY_DISTRESSED"
    intensity = max(intensity, 0.8)
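Putting the two excerpts together, a minimal self-contained sketch of the heuristic (the function name and return shape are illustrative; the real module also tracks sentiment and intensity):

```python
LIFE_THREATENING = [
    "shot", "shooting", "stabbed", "stabbing",
    "can't breathe", "not breathing", "overdose",
    "heart attack", "unconscious", "bleeding out",
    "heavy bleeding", "suicide", "kill myself",
]

def classify_emotion(distress: float, txt: str) -> str:
    # Base label from the acoustic distress score
    if distress >= 0.7:
        label = "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        label = "DISTRESSED"
    elif distress >= 0.15:
        label = "TENSE"
    else:
        label = "CALM"

    # Override: a calm-sounding caller describing a lethal situation
    if any(k in txt.lower() for k in LIFE_THREATENING):
        label = "HIGHLY_DISTRESSED"
    return label

print(classify_emotion(0.05, "my husband is not breathing"))  # HIGHLY_DISTRESSED
print(classify_emotion(0.2, "my cat is stuck in a tree"))     # TENSE
```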

NLP Track Pipeline

The NLP track converts speech to text and performs semantic analysis:

Speech-to-Text (STT)

The system uses Deepgram’s streaming STT API via WebSocket:
# Source: app/agents/stt_client.py:30-49
async def start(self, sample_rate: int = 8000):
    if not DEEPGRAM_API_KEY:
        print("[stt] Deepgram key missing")
        return
    self.session = aiohttp.ClientSession()
    url = (
        "wss://api.deepgram.com/v1/listen?"
        f"model=nova-2&language=en-US&encoding=linear16&sample_rate={sample_rate}"
        "&punctuate=true&smart_format=true&numerals=true"
    )
    headers = {"Authorization": f"Token {DEEPGRAM_API_KEY}"}
    ssl_ctx = ssl.create_default_context(cafile=certifi.where())
    
    self.ws = await self.session.ws_connect(
        url, headers=headers, ssl=ssl_ctx, heartbeat=30
    )
    print("[stt] connected to Deepgram streaming")
    self._recv_task = asyncio.create_task(self._recv_loop())
  • model (string, default "nova-2"): Deepgram streaming model used for real-time transcription
  • sample_rate (integer, default 8000): audio sample rate in Hz; phone calls typically use 8 kHz
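The receive loop itself is not shown; a hedged sketch of the message-parsing step, assuming Deepgram's standard streaming result shape (`channel.alternatives[0].transcript` plus an `is_final` flag):

```python
import json

def parse_deepgram_message(raw: str) -> "tuple[str, bool] | None":
    """Extract (transcript, is_final) from one streaming result message.

    Returns None for empty transcripts and keepalive messages.
    """
    msg = json.loads(raw)
    alts = msg.get("channel", {}).get("alternatives", [])
    if not alts or not alts[0].get("transcript"):
        return None
    return alts[0]["transcript"], bool(msg.get("is_final"))

# Example message, trimmed to the fields used above:
sample = json.dumps({
    "is_final": True,
    "channel": {"alternatives": [{"transcript": "there's a fire on elm street"}]},
})
print(parse_deepgram_message(sample))  # ("there's a fire on elm street", True)
```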

Service Classification

The NLP track classifies calls into emergency service categories:
# Source: app/agents/service_classify.py:5
ServiceCategory = Literal["EMS", "FIRE", "POLICE", "OTHER"]
The classifier uses keyword matching with several robustness features:
  • Negation detection (“not bleeding” vs “bleeding”)
  • Phonetic variations (“cant breathe” → “can’t breathe”)
  • ASR error tolerance (“am balance” → “ambulance”)
  • Context awareness (“shooting a basketball” vs “active shooter”)
# Source: app/agents/service_classify.py:45-50
def is_negated(phrase: str) -> bool:
    """Check if a phrase is negated (e.g., 'not bleeding', 'no gun')"""
    match = re.search(re.escape(phrase), text, re.IGNORECASE)
    if not match:
        return False
    # Check 20 chars before for negation words
    start = max(0, match.start() - 20)
    context = text[start : match.start()]
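The excerpt above closes over a `text` variable from its enclosing scope and is cut off before the final check; a self-contained version with an illustrative negation word list:

```python
import re

# Illustrative negation list; the project's actual list may differ.
NEGATIONS = ("not", "no", "isn't", "aren't", "don't", "without")

def is_negated(text: str, phrase: str) -> bool:
    """True if `phrase` appears in `text` preceded by a negation word."""
    match = re.search(re.escape(phrase), text, re.IGNORECASE)
    if not match:
        return False
    # Check the 20 characters before the phrase for a negation word
    start = max(0, match.start() - 20)
    context = text[start : match.start()].lower()
    return any(re.search(rf"\b{n}\b", context) for n in NEGATIONS)

print(is_negated("he is not bleeding", "bleeding"))   # True
print(is_negated("he is bleeding badly", "bleeding")) # False
```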

Live Signal Tracking

Each call maintains a live state object that’s updated in real-time:
# Source: app/api/ws/handler.py:412-423
LIVE_SIGNALS[call_id] = {
    "chunks": 0,
    "voiced_chunks": 0,
    "voiced_seconds": 0.0,
    "ema": 0.0,
    "distress": 0.0,
    "max_distress": 0.0,
    "transcript": "",
    "transcript_live": "",
    "wav_path": None,
    "emotion": None,
}
  • distress (float): current distress score (0.0 to 1.0) based on acoustic analysis
  • max_distress (float): peak distress observed during the call
  • transcript_live (string): real-time partial transcript as the caller speaks
  • transcript (string): finalized, complete transcript, built up as segments are confirmed
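As an illustration of how the two transcript fields interact (the helper functions here are invented for the example, not the handler's actual API):

```python
from typing import Any

LIVE_SIGNALS: "dict[str, dict[str, Any]]" = {}

def new_call_state() -> "dict[str, Any]":
    """Fresh per-call state, mirroring the fields documented above."""
    return {
        "chunks": 0, "voiced_chunks": 0, "voiced_seconds": 0.0,
        "ema": 0.0, "distress": 0.0, "max_distress": 0.0,
        "transcript": "", "transcript_live": "",
        "wav_path": None, "emotion": None,
    }

def on_partial_transcript(call_id: str, text: str) -> None:
    # Partials overwrite the live field as the caller speaks
    LIVE_SIGNALS[call_id]["transcript_live"] = text

def on_final_transcript(call_id: str, text: str) -> None:
    # Finals are appended to the confirmed transcript; the live field resets
    sig = LIVE_SIGNALS[call_id]
    sig["transcript"] = (sig["transcript"] + " " + text).strip()
    sig["transcript_live"] = ""

LIVE_SIGNALS["call-1"] = new_call_state()
on_partial_transcript("call-1", "there's a fi")
on_final_transcript("call-1", "there's a fire")
print(LIVE_SIGNALS["call-1"]["transcript"])  # "there's a fire"
```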

Audio Processing

µ-law Decoding

Phone systems use µ-law encoding (8-bit compressed audio). The pipeline decodes this to PCM16:
# Source: app/api/ws/handler.py:443-448
# 80- and 160-byte chunks are 10 ms / 20 ms mu-law frames at 8 kHz
if len(raw) in (80, 160):
    le16 = mulaw_to_pcm16le(raw)
else:
    # any other size is treated as big-endian PCM16 and byte-swapped
    le16 = swap_endian_16(raw)
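For reference, a pure-Python sketch of the G.711 µ-law expansion that a `mulaw_to_pcm16le` helper performs (this is the standard CCITT reference algorithm, not necessarily the project's actual implementation):

```python
def mulaw_decode_byte(b: int) -> int:
    """Expand one G.711 mu-law byte into a signed 16-bit PCM sample."""
    b = ~b & 0xFF                  # mu-law bytes are transmitted inverted
    sign = b & 0x80
    exponent = (b >> 4) & 0x07
    mantissa = b & 0x0F
    # Undo the logarithmic compression (0x84 is the encoder bias of 132)
    sample = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -sample if sign else sample

def mulaw_to_pcm16le(raw: bytes) -> bytes:
    """Decode a mu-law chunk to little-endian PCM16."""
    return b"".join(
        mulaw_decode_byte(b).to_bytes(2, "little", signed=True) for b in raw
    )

print(mulaw_decode_byte(0xFF))  # 0 (near-silence)
print(mulaw_decode_byte(0x80))  # 32124 (positive full scale)
```

Each input byte expands to two output bytes, so an 80-byte µ-law frame becomes a 160-byte PCM16 frame.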

Voice Activity Detection (VAD)

# Source: app/api/ws/handler.py:469-478
rms = rms_norm_pcm16le(chunk)
voiced = rms >= vad_threshold  # threshold = 0.02
if voiced:
    signals["voiced_chunks"] += 1
signals["chunks"] += 1
signals["voiced_seconds"] = signals["voiced_chunks"] * 0.16
Each audio chunk is 160ms (2560 bytes at 8kHz). The VAD threshold of 0.02 (2% of full scale) filters out background noise.
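The chunk arithmetic above can be verified directly:

```python
SAMPLE_RATE = 8000      # Hz, telephone audio
CHUNK_MS = 160          # chunk duration used by the handler
BYTES_PER_SAMPLE = 2    # PCM16

# Bytes per 160 ms chunk: 8000 samples/s * 0.16 s * 2 bytes/sample
chunk_bytes = SAMPLE_RATE * CHUNK_MS // 1000 * BYTES_PER_SAMPLE
print(chunk_bytes)  # 2560

# voiced_seconds grows by 0.16 s per voiced chunk, so for example:
print(25 * CHUNK_MS / 1000)  # 4.0 seconds of speech after 25 voiced chunks
```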

Summary Generation

Once both tracks complete, the system generates a dispatcher-friendly summary:
# Source: app/agents/summary.py:41-62
async def generate_summary(transcript: str, category: str, tags: list[str]) -> str:
    """Generate a concise dispatcher-friendly summary."""
    if not transcript:
        return "No transcript available."
    
    if not _client:
        return heuristic_summary(transcript)
    
    prompt = (
        "You are an emergency dispatcher assistant. "
        "Summarize the caller's situation in 1–2 clear, factual sentences. "
        "Avoid speculation. Include critical details. "
        f"Category: {category}. Tags: {', '.join(tags)}.\n\n"
        f"Transcript:\n{transcript}"
    )
    
    resp = await _client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=80,
        temperature=0,
    )
    return (resp.choices[0].message.content or "").strip()
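`heuristic_summary` is referenced as the no-LLM fallback but not shown; a plausible sketch (purely illustrative, the real helper may differ):

```python
import re

def heuristic_summary(transcript: str, max_sentences: int = 2) -> str:
    """Fallback summary when no LLM client is configured: keep the
    first couple of sentences of the transcript."""
    sentences = re.split(r"(?<=[.!?])\s+", transcript.strip())
    summary = " ".join(sentences[:max_sentences]).strip()
    return summary if summary else "No transcript available."

print(heuristic_summary(
    "House fire on Elm Street. Two people inside. Smoke everywhere."
))  # House fire on Elm Street. Two people inside.
```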

Performance Characteristics

Latency

  • STT partial: 100-300ms from speech
  • Distress score: Updated every 160ms chunk
  • Full analysis: < 2 seconds after call ends

See also:

  • CallPacket Structure: the data structure that merges both tracks
  • Priority Ranking: how pipeline outputs feed into the ranking system
