DispatchAI uses a pipeline of specialized agents to analyze emergency calls. This guide explains how each agent works and how to customize them.

Architecture

The agent pipeline runs in parallel tracks:
┌─────────────────────────────────────────┐
│            Incoming Call                │
└──────────────┬──────────────────────────┘

      ┌────────┴────────┐
      │                 │
┌─────▼────┐      ┌────▼─────┐
│  Audio   │      │   NLP    │
│  Track   │      │  Track   │
└─────┬────┘      └────┬─────┘
      │                │
      │  ┌─────────┐   │
      ├─►│ STT     │◄──┤
      │  └─────────┘   │
      │                │
      │  ┌─────────┐   │
      ├─►│ Distress│   │
      │  └─────────┘   │
      │                │
      │  ┌─────────┐   │
      └─►│ Emotion │◄──┤
         └────┬────┘   │
              │        │
         ┌────▼────────▼────┐
         │   Service        │
         │   Classify       │
          └────┬─────────────┘
               │
          ┌────▼────┐
          │ Summary │
          └────┬────┘
               │
          ┌────▼────┐
         │ Ranking │
         └─────────┘

Speech-to-Text (STT)

Implementation: app/agents/stt_client.py

Deepgram Integration

DispatchAI uses Deepgram Nova-2 for real-time transcription:
class STTClient:
    def __init__(self, on_partial=None, on_final=None):
        self.on_partial = on_partial  # Live transcript updates
        self.on_final = on_final      # Final sentence completions
        self.call_id = None
    
    async def start(self, sample_rate: int = 8000):
        # Connect to Deepgram WebSocket
        await self.deepgram_ws.connect(
            model="nova-2",
            language="en-US",
            sample_rate=sample_rate,
            encoding="linear16"
        )
    
    async def feed(self, pcm_data: bytes):
        # Stream audio chunks
        await self.deepgram_ws.send(pcm_data)
    
    async def finalize(self) -> dict:
        # Close connection, return final transcript
        await self.deepgram_ws.close()
        return {"transcript": self.final_text}

Callback System

Transcripts update in real time via callbacks.

Partial transcripts (interim results):
def _on_partial(text: str, call_id: str):
    LIVE_SIGNALS[call_id]["transcript_live"] = text
    print(f"[live] {text}")
Final transcripts (sentence completions):
def _on_final(text: str, call_id: str):
    LIVE_SIGNALS[call_id]["transcript"] = text
Referenced in: app/api/ws/handler.py:328-341, app/main.py:467
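Before being handed to the STT client, the callbacks are bound to a call ID. A minimal sketch of that wiring (the `functools.partial` binding and the `LIVE_SIGNALS` shape here are illustrative assumptions, not the exact handler code):

```python
from functools import partial

LIVE_SIGNALS: dict[str, dict] = {}

def _on_partial(text: str, call_id: str):
    # Interim results overwrite the live transcript field
    LIVE_SIGNALS.setdefault(call_id, {})["transcript_live"] = text

def _on_final(text: str, call_id: str):
    # Sentence completions land in the final transcript field
    LIVE_SIGNALS.setdefault(call_id, {})["transcript"] = text

call_id = "428c31b6"
# Bind the call ID so the STT client only needs to pass the text:
on_partial = partial(_on_partial, call_id=call_id)
on_final = partial(_on_final, call_id=call_id)

on_partial("Help there's")        # interim result
on_final("Help there's a fire")   # sentence completion
```

The bound `on_partial`/`on_final` pair would then be passed to `STTClient(on_partial=..., on_final=...)` as shown above.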

Emotion Detection

Implementation: app/agents/emotion.py

Emotion analysis combines acoustic distress with text sentiment.

Provider Options

Choose via EMOTION_PROVIDER environment variable:

1. Heuristic (Default)

When to use: No API key available, or instant results are needed
Method: Rule-based keyword matching + distress score
async def _analyze_emotion_heuristic(transcript: str, distress: float) -> dict:
    txt = transcript.lower()
    intensity = distress     # default, so the keyword branches below can max() against it
    sentiment = "neutral"    # default until a keyword match flips it
    
    # Base label from acoustic distress
    if distress >= 0.7:
        label = "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        label = "DISTRESSED"
    elif distress >= 0.15:
        label = "TENSE"
    else:
        label = "CALM"
    
    # Override with life-threatening keywords
    life_threatening = [
        "shot", "stabbed", "can't breathe", "not breathing",
        "overdose", "heart attack", "unconscious"
    ]
    if any(k in txt for k in life_threatening):
        label = "HIGHLY_DISTRESSED"
        intensity = max(distress, 0.8)
    
    # Negative keywords increase intensity
    neg_keywords = ["angry", "scared", "emergency", "hurt", "panic"]
    if any(k in txt for k in neg_keywords):
        sentiment = "negative"
        intensity = max(intensity, 0.5)
    
    return {
        "label": label,
        "intensity": intensity,
        "sentiment": sentiment,
        "source": "heuristic"
    }
Customization:
  • Add keywords to life_threatening, neg_keywords, pos_keywords
  • Adjust distress thresholds (lines 42-48)
  • Modify intensity boost factors (lines 76-77, 98)
File: app/agents/emotion.py:27-116

2. Deepgram Sentiment

When to use: Already using Deepgram for STT, want accurate sentiment
Method: Deepgram Text Intelligence API
async def _analyze_emotion_deepgram(transcript: str, distress: float) -> dict:
    # httpx.post is synchronous; streaming code must use the async client
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.deepgram.com/v1/read?sentiment=true&language=en",
            headers={"Authorization": f"Token {DEEPGRAM_API_KEY}"},
            json={"text": transcript}
        )
    
    avg = resp.json()["results"]["sentiments"]["average"]
    sentiment = avg["sentiment"]
    confidence = avg["sentiment_score"]
    
    # Fusion logic: combine sentiment + distress + crisis keywords
    intensity = distress
    if sentiment == "negative" and has_crisis_keywords(transcript):
        label = "HIGHLY_DISTRESSED"
        intensity = max(distress, 0.7)
    elif sentiment == "positive":
        label = "RELIEVED" if distress < 0.4 else "CALM"
    else:
        label = label_from_distress(distress)
    
    return {"label": label, "sentiment": sentiment, "intensity": intensity}
Customization:
  • Adjust crisis keyword list (lines 172-185)
  • Modify fusion thresholds (lines 204-238)
File: app/agents/emotion.py:119-251

3. OpenAI LLM

When to use: Highest accuracy, don't mind API cost
Method: GPT-4o-mini structured classification
async def _analyze_emotion_openai(transcript: str, distress: float) -> dict:
    resp = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "Classify emotion as JSON..."},
            {"role": "user", "content": f"Transcript: {transcript}\nDistress: {distress}"}
        ]
    )
    
    result = json.loads(resp.choices[0].message.content)
    return {
        "label": result["label"],
        "sentiment": result["sentiment"],
        "intensity": result["intensity"]
    }
Customization:
  • Edit system prompt (lines 264-272)
  • Change model via OPENAI_MODEL_EMOTION env var
  • Adjust temperature (line 291)
File: app/agents/emotion.py:254-331

Emotion Labels

All providers output one of these labels:
Label              | Intensity Range | Description
-------------------|-----------------|-------------------------------
CALM               | 0.0 - 0.15      | Normal conversation, no stress
RELIEVED           | 0.15 - 0.3      | Positive emotions, gratitude
TENSE              | 0.15 - 0.3      | Mild stress, concern
DISTRESSED         | 0.3 - 0.7       | Elevated stress, fear
HIGHLY_DISTRESSED  | 0.7 - 1.0       | Extreme stress, panic
Output format:
{
  "label": "HIGHLY_DISTRESSED",
  "intensity": 0.87,
  "sentiment": "negative",
  "distress_input": 0.82,
  "has_transcript": true
}
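The Deepgram fusion code above calls a `label_from_distress` helper that is not shown. Given the thresholds used by the heuristic provider, it plausibly looks like this (a sketch, not the exact source):

```python
def label_from_distress(distress: float) -> str:
    # Thresholds mirror the heuristic provider's distress buckets
    if distress >= 0.7:
        return "HIGHLY_DISTRESSED"
    elif distress >= 0.3:
        return "DISTRESSED"
    elif distress >= 0.15:
        return "TENSE"
    return "CALM"
```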

Service Classification

Implementation: app/agents/service_classify.py

Classifies calls into emergency service categories with semantic tags.

Categories

ServiceCategory = Literal["EMS", "FIRE", "POLICE", "OTHER"]
Examples:
  • EMS: Heart attack, overdose, trauma, breathing problems
  • FIRE: Structure fire, gas leak, smoke, explosion
  • POLICE: Active shooter, assault, burglary, domestic violence
  • OTHER: Noise complaint, lost pet, non-emergency

Classification Logic

Uses keyword detection with contextual rules:
def classify_service_and_tags(transcript: str, distress: float) -> dict:
    text = normalize_transcript(transcript)  # Fix ASR errors
    tags = set()
    cat_scores = {"EMS": 0.0, "FIRE": 0.0, "POLICE": 0.0, "OTHER": 0.0}
    
    # Gunshot detection
    if has_any(["shot", "shooting", "gunshot", "active shooter"]):
        if not has_any(["basketball", "hoops"]):  # Avoid false positives
            tags.add("ACTIVE_SHOOTER")
            tags.add("TRAUMA")
            cat_scores["EMS"] += 0.9
            cat_scores["POLICE"] += 0.9
    
    # Breathing emergency
    if has_any(["can't breathe", "not breathing", "turning blue"]):
        if not is_negated("breathe"):
            tags.add("NOT_BREATHING")
            cat_scores["EMS"] += 1.0
    
    # Fire detection
    if has_word("fire") and not has_any(["fireplace", "campfire"]):
        tags.add("FIRE")
        cat_scores["FIRE"] += 1.0
    
    # Select category with highest score
    category = max(cat_scores, key=cat_scores.get)
    confidence = calculate_confidence(cat_scores, tags, distress)
    
    return {"category": category, "confidence": confidence, "tags": sorted(tags)}
File: app/agents/service_classify.py:8-1192
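The excerpt above calls `has_any` and `has_word` with the transcript implicitly in scope (they are presumably nested helpers in the real module). Standalone versions that take the text explicitly might look like this (illustrative sketch):

```python
import re

def has_any(text: str, phrases: list[str]) -> bool:
    # Plain substring match against the normalized transcript
    return any(p in text for p in phrases)

def has_word(text: str, word: str) -> bool:
    # Whole-word match, so "fire" doesn't trigger on "firefly"
    return re.search(rf"\b{re.escape(word)}\b", text) is not None
```

The distinction matters: substring matching is forgiving for multi-word phrases like "can't breathe", while single keywords need word boundaries to avoid false positives.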

Semantic Tags

Over 60 tags across four domains:

Medical (EMS):
  • ACTIVE_SHOOTER - Gunshot wounds
  • STABBING - Knife/blade injuries
  • MAJOR_BLEEDING - Severe hemorrhage
  • NOT_BREATHING - Respiratory arrest
  • CARDIAC_ARREST - No pulse/heartbeat
  • OVERDOSE - Drug/poison ingestion
  • UNCONSCIOUS - Unresponsive patient
  • STROKE - Neurological emergency
  • SEIZURE - Convulsions
Fire/Hazmat:
  • FIRE - Structure fire
  • SMOKE - Smoke detected
  • GAS_LEAK - Natural gas/propane
  • EXPLOSION - Blast/detonation
  • HAZMAT - Chemical spill
Law Enforcement:
  • VIOLENCE - Physical altercation
  • ASSAULT - Attacking someone
  • DOMESTIC_VIOLENCE - Partner abuse
  • WEAPON_INVOLVED - Gun/knife present
  • ROBBERY - Armed robbery
  • BURGLARY - Breaking and entering
  • SUICIDAL - Suicide threat
Other:
  • VEHICLE_ACCIDENT - Car crash
  • TRAPPED - Stuck in elevator/building
  • EMOTIONAL_DISTRESS - Mental health
  • NOISE_COMPLAINT - Non-emergency

Customizing Keywords

Add your own detection rules:
# Example: Add opioid-specific detection
opioid_keywords = [
    "fentanyl", "heroin", "oxycodone", "narcan"
]

if has_word_any(opioid_keywords):
    add_tag("OPIOID_OVERDOSE")
    add_tag("OVERDOSE")  # Parent tag
    bump("EMS", 0.95, urgent=True)
Location to add: After line 483 in app/agents/service_classify.py

Negation Detection

Avoids false positives from negative statements:
def is_negated(phrase: str) -> bool:
    # "not bleeding" → True
    # "bleeding" → False
    # `text` is the normalized transcript, in scope from the enclosing function
    match = re.search(re.escape(phrase), text)
    if not match:
        return False
    
    context = text[max(0, match.start() - 20):match.start()]
    negations = [r"\bno\b", r"\bnot\b", r"\bwithout\b", r"\bcan't\b"]
    return any(re.search(neg, context) for neg in negations)
Example:
  • ✅ “He’s bleeding” → BLEEDING tag
  • ❌ “He’s not bleeding” → No tag
File: app/agents/service_classify.py:45-71
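For quick experimentation outside the module, the same check can be written with the transcript passed explicitly (a standalone sketch of the logic above, not the module's actual signature):

```python
import re

def is_negated(phrase: str, text: str) -> bool:
    # Look back up to 20 chars before the phrase for a negation word
    match = re.search(re.escape(phrase), text)
    if not match:
        return False
    context = text[max(0, match.start() - 20):match.start()]
    negations = [r"\bno\b", r"\bnot\b", r"\bwithout\b", r"\bcan't\b"]
    return any(re.search(neg, context) for neg in negations)

print(is_negated("bleeding", "he's bleeding"))      # False
print(is_negated("bleeding", "he's not bleeding"))  # True
```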

ASR Error Normalization

Common speech-to-text mistakes are fixed:
def _normalize_transcript(text: str) -> str:
    text = re.sub(r"\bam balance\b", "ambulance", text)
    text = re.sub(r"\bkeep myself\b", "kill myself", text)  # ASR error
    text = re.sub(r"\band my life\b", "end my life", text)
    return text
File: app/agents/service_classify.py:1195-1235
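The substitutions can also be written table-driven, which makes adding a new ASR fix a one-line change (a sketch of an alternative structure, not the module's actual layout):

```python
import re

# Pattern → replacement table for common ASR mistakes
ASR_FIXES = {
    r"\bam balance\b": "ambulance",
    r"\bkeep myself\b": "kill myself",
    r"\band my life\b": "end my life",
}

def normalize(text: str) -> str:
    # Apply every fix in sequence; order rarely matters for disjoint patterns
    for pattern, repl in ASR_FIXES.items():
        text = re.sub(pattern, repl, text)
    return text

print(normalize("send an am balance please"))  # "send an ambulance please"
```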

Summary Generation

Implementation: app/agents/summary.py

Creates dispatcher-friendly 1-2 sentence summaries.

OpenAI Mode

async def generate_summary(transcript: str, category: str, tags: list) -> str:
    prompt = (
        "You are an emergency dispatcher assistant. "
        "Summarize the caller's situation in 1–2 clear, factual sentences. "
        "Avoid speculation. Include critical details. "
        f"Category: {category}. Tags: {', '.join(tags)}.\n\n"
        f"Transcript:\n{transcript}"
    )
    
    resp = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=80,
        temperature=0
    )
    
    return resp.choices[0].message.content.strip()
Example output:
“Caller reports an active shooter at a shopping mall. Multiple victims with gunshot wounds. Police and EMS needed urgently.”
File: app/agents/summary.py:41-83

Heuristic Fallback

When no API key is available:
def heuristic_summary(transcript: str) -> str:
    # Extract first sentence
    text = re.sub(r"\s+", " ", transcript.strip())
    sentences = re.split(r"[.?!]", text)
    first = sentences[0].strip()
    
    # Cap at 200 chars
    return first[:200] + "..." if len(first) > 200 else first
Example output:
“There’s someone in my house with a gun and I’m hiding in the closet…”
File: app/agents/summary.py:14-38
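Reproducing the fallback standalone shows the 200-character cap kicking in on run-on speech with no sentence breaks (self-contained sketch of the logic above):

```python
import re

def heuristic_summary(transcript: str) -> str:
    # First sentence, whitespace-collapsed, capped at 200 chars
    text = re.sub(r"\s+", " ", transcript.strip())
    first = re.split(r"[.?!]", text)[0].strip()
    return first[:200] + "..." if len(first) > 200 else first

long_call = "help " * 60  # ~300 chars with no sentence-ending punctuation
print(len(heuristic_summary(long_call)))  # 203 (200 chars + "...")
```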

Risk Scoring

Implementation: app/main.py:224-321

Combines multiple signals into a unified risk assessment.

Scoring Algorithm

def compute_risk_level(distress_score: float, emotion: dict, tags: list) -> dict:
    # Base score from audio distress
    score = max(0.0, min(1.0, distress_score))
    
    # Amplify with emotion intensity
    emo_intensity = emotion.get("intensity", 0.0)
    score = max(score, emo_intensity)
    
    # Boost for distress-related emotions
    if emotion.get("label") in ("DISTRESSED", "TENSE"):
        score = max(score, 0.5)
    
    # Life-threatening tags force CRITICAL
    critical_tags = {
        "ACTIVE_SHOOTER", "CARDIAC_ARREST", "NOT_BREATHING",
        "UNCONSCIOUS", "MAJOR_BLEEDING", "OVERDOSE"
    }
    # Serious-emergency tags force at least ELEVATED
    elevated_tags = {
        "ASSAULT", "FIRE", "STROKE", "SEIZURE",
        "BREATHING_DIFFICULTY", "VEHICLE_ACCIDENT"
    }
    
    if set(tags) & critical_tags:
        # CRITICAL OVERRIDE
        score = max(score, 0.9)
        level = "CRITICAL"
    elif set(tags) & elevated_tags:
        # ELEVATED OVERRIDE
        score = max(score, 0.6)
        level = "ELEVATED" if score < 0.7 else "CRITICAL"
    else:
        # Standard bucketing
        if score >= 0.7:
            level = "CRITICAL"
        elif score >= 0.4:
            level = "ELEVATED"
        elif score >= 0.15:
            level = "NORMAL"
        else:
            level = "LOW"
    
    return {"level": level, "score": round(score, 3)}

Risk Levels

Level    | Score Range | Description                          | Example
---------|-------------|--------------------------------------|-------------------------------
CRITICAL | 0.7 - 1.0   | Life-threatening, immediate response | Active shooter, cardiac arrest
ELEVATED | 0.4 - 0.7   | Serious emergency, urgent response   | Assault, breathing difficulty
NORMAL   | 0.15 - 0.4  | Standard emergency, timely response  | Minor injury, property crime
LOW      | 0.0 - 0.15  | Non-emergency, routine response      | Noise complaint, lost item
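The override behavior is worth seeing in isolation: a quiet caller can still rank CRITICAL if a life-threatening tag is present. A condensed, runnable sketch of that path (simplified from compute_risk_level above):

```python
CRITICAL_TAGS = {"ACTIVE_SHOOTER", "CARDIAC_ARREST", "NOT_BREATHING",
                 "UNCONSCIOUS", "MAJOR_BLEEDING", "OVERDOSE"}

def risk_level(score: float, tags: set[str]) -> tuple[str, float]:
    # Tag override first, then standard score bucketing
    if tags & CRITICAL_TAGS:
        return "CRITICAL", max(score, 0.9)
    if score >= 0.7:
        return "CRITICAL", score
    if score >= 0.4:
        return "ELEVATED", score
    if score >= 0.15:
        return "NORMAL", score
    return "LOW", score

# A quiet caller reporting an unconscious patient still ranks CRITICAL:
print(risk_level(0.2, {"UNCONSCIOUS"}))  # ('CRITICAL', 0.9)
print(risk_level(0.2, set()))            # ('NORMAL', 0.2)
```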

Customizing Risk Thresholds

Adjust score thresholds in compute_risk_level():
# Make system more sensitive (lower thresholds)
if score >= 0.5:  # Was 0.7
    level = "CRITICAL"
elif score >= 0.3:  # Was 0.4
    level = "ELEVATED"

# Add custom overrides
if "DOMESTIC_VIOLENCE" in tags and distress_score >= 0.5:
    score = max(score, 0.75)  # Boost domestic violence priority
File: app/main.py:224-321

Priority Ranking

Implementation: app/ranking/ranking.py

Converts risk levels into queue priorities:
def build_ranking(inputs: RankingInputs) -> dict:
    # Weight by risk level
    weights = {
        "CRITICAL": 1000,
        "ELEVATED": 500,
        "NORMAL": 100,
        "LOW": 10
    }
    
    base_weight = weights[inputs.risk_level]
    
    # Amplify by risk score
    score_mult = inputs.risk_score  # 0-1
    final_weight = int(base_weight * (1 + score_mult))
    
    return {
        "priority": get_priority_number(inputs.risk_level),
        "weight": final_weight,
        "score": inputs.risk_score,
        "created_at": inputs.created_at or now_iso()
    }
Queue sorting:
def sort_key(item):
    return (
        -item["ranking"]["weight"],  # Highest weight first
        -item["risk"]["score"],      # Then highest score
        item["ranking"]["created_at"] # Then oldest call
    )
File: app/ranking/ranking.py, app/main.py:741-750

Testing Agents

Unit Tests

# tests/test_emotion.py
import pytest
from app.agents.emotion import analyze_emotion

@pytest.mark.asyncio
async def test_life_threatening_override():
    result = await analyze_emotion(
        transcript="Someone's been shot",
        distress=0.2  # Low acoustic distress
    )
    assert result["label"] == "HIGHLY_DISTRESSED"
    assert result["intensity"] >= 0.8

# tests/test_service_classify.py
from app.agents.service_classify import classify_service_and_tags

def test_active_shooter_detection():
    result = classify_service_and_tags(
        transcript="There's an active shooter at the mall",
        distress=0.9
    )
    assert result["category"] == "EMS"
    assert "ACTIVE_SHOOTER" in result["tags"]
    assert "TRAUMA" in result["tags"]  # "POLICE" is a category, not a tag
Run tests:
pytest tests/

Manual Testing

Use the call replay script:
# Replay a recorded call through the full pipeline
python scripts/replay_call.py data/calls/1234567890.wav

Live Testing

Make a test call and check real-time processing:
  1. Start dev server: ./scripts/dev_start.sh
  2. Call your Telnyx number
  3. Speak test scenario: “Help, there’s a fire in my kitchen!”
  4. Monitor logs:
[live-partial] call=428c31b6... "Help there's"
[live-partial] call=428c31b6... "Help there's a fire"
[live-final] call=428c31b6... "Help there's a fire in my kitchen"
[emotion] label=DISTRESSED, sentiment=negative, intensity=0.72
[service] category=FIRE, tags=['FIRE'], confidence=0.88
[risk] level=CRITICAL, score=0.86

Performance Tuning

Latency Optimization

Use faster STT:
# Switch to Deepgram Base (faster but less accurate)
model="base"
Disable OpenAI:
.env
EMOTION_PROVIDER=heuristic  # No API calls
OPENAI_API_KEY=  # Unset
Parallel execution:
# Run emotion + service classification concurrently
emotion_task = asyncio.create_task(analyze_emotion(transcript, distress))
service_task = asyncio.create_task(classify_service_and_tags(transcript, distress))

emotion, service = await asyncio.gather(emotion_task, service_task)

Accuracy Optimization

Use best providers:
.env
EMOTION_PROVIDER=openai
OPENAI_MODEL_EMOTION=gpt-4o
Add more keywords: Expand detection rules in service_classify.py with domain-specific terms.

Tune thresholds: Adjust risk score thresholds based on historical data.

Next Steps

  • API Reference: Explore REST endpoints and data schemas
  • Quickstart: Build your first integration