Skip to main content

Overview

The NLP Track processes speech-to-text transcripts through multiple language agents to extract semantic meaning, classify emergencies, and generate summaries.

Source Code

Location: app/agents/nlp_track.py
def process_text(text: str) -> dict:
    """Stub NLP pipeline: echo the transcript with empty analysis fields.

    Placeholder for the STT -> LLM flow; the production version fills in
    intent and summary via the agent pipeline.
    """
    return dict(transcript=text, intent=None, summary=None)
Current implementation is a stub. Production version orchestrates multiple NLP agents in sequence.

NLP Pipeline

The full NLP track processes text through multiple stages:
Transcript → Service Classification → Emotion Analysis → Summary Generation

Pipeline Implementation

import asyncio
from app.agents.emotion import analyze_emotion
from app.agents.service_classify import classify_service_and_tags
from app.agents.summary import generate_summary

async def process_text(text: str, distress: float = 0.5) -> dict:
    """
    Run transcript through NLP agent pipeline.

    Args:
        text: Transcribed speech from STT
        distress: Audio distress score (0.0-1.0)

    Returns:
        Complete NLP analysis with service, emotion, tags, and summary
    """
    # The classifier is synchronous, so push it to a worker thread; emotion
    # analysis is already a coroutine. Both run concurrently via gather.
    classification = asyncio.to_thread(
        classify_service_and_tags,
        transcript=text,
        distress=distress,
    )
    emotion_task = analyze_emotion(transcript=text, distress=distress)
    service_result, emotion_result = await asyncio.gather(
        classification, emotion_task
    )

    # Summary depends on the classification output, so it runs afterwards.
    summary = await generate_summary(
        transcript=text,
        category=service_result['category'],
        tags=service_result['tags'],
    )

    return {
        "transcript": text,
        "service": service_result['category'],
        "confidence": service_result['confidence'],
        "tags": service_result['tags'],
        "emotion": emotion_result,
        "summary": summary,
    }

Agent Sequence

1. Service Classification

First, determine what type of emergency this is:
from app.agents.service_classify import classify_service_and_tags

service = classify_service_and_tags(
    transcript="Someone's having a heart attack!",
    distress=0.8
)
# Returns:
# {
#   "category": "EMS",
#   "confidence": 0.95,
#   "tags": ["CARDIAC_EVENT", "TRAUMA"]
# }
See Service Classification for details.

2. Emotion Analysis

Next, assess the caller’s emotional state:
from app.agents.emotion import analyze_emotion

emotion = await analyze_emotion(
    transcript="Someone's having a heart attack!",
    distress=0.8
)
# Returns:
# {
#   "label": "HIGHLY_DISTRESSED",
#   "sentiment": "negative",
#   "intensity": 0.8
# }
See Emotion Classification for details.

3. Summary Generation

Finally, create a dispatcher-friendly brief:
from app.agents.summary import generate_summary

summary = await generate_summary(
    transcript="My dad is clutching his chest and can't breathe...",
    category="EMS",
    tags=["CARDIAC_EVENT", "BREATHING_DIFFICULTY"]
)
# Returns: "Caller reports father experiencing chest pain and difficulty breathing."
See Summary Generation for details.

Real-Time Processing

Partial Transcripts

The NLP track handles streaming transcripts with incremental updates:
def on_partial_transcript(text: str, call_id: str):
    """
    Process partial transcript for quick initial classification.

    Called by STT client as speech is recognized.
    """
    # Too little text to classify meaningfully yet — skip this update.
    if len(text) < 20:
        return

    # Quick, provisional classification from the partial text.
    preliminary = classify_service_and_tags(text, distress=0.5)

    # Push the preliminary signal to the dispatcher UI right away.
    update_live_signal(call_id, {
        "service_preliminary": preliminary['category'],
        "tags_preliminary": preliminary['tags'][:3],  # Top 3 tags
    })

async def on_final_transcript(text: str, call_id: str):
    """
    Process complete transcript for full analysis.

    Called when speech segment is finalized.

    NOTE: declared `async` because it awaits process_text(); the original
    plain `def` with `await` in the body is a SyntaxError.
    """
    # Run complete pipeline with the call's current audio distress score
    results = await process_text(text, distress=get_distress(call_id))

    # Update with final results
    update_live_signal(call_id, results)

Progressive Enhancement

# T+0s: Partial "Someone's been..."
{
  "service_preliminary": "OTHER",
  "tags_preliminary": []
}

# T+2s: Partial "Someone's been shot"
{
  "service_preliminary": "EMS",
  "tags_preliminary": ["ACTIVE_SHOOTER", "TRAUMA"]
}

# T+5s: Final "Someone's been shot, they're bleeding badly"
{
  "service": "EMS",
  "confidence": 0.95,
  "tags": ["ACTIVE_SHOOTER", "TRAUMA", "MAJOR_BLEEDING"],
  "emotion": {"label": "HIGHLY_DISTRESSED", "intensity": 0.9},
  "summary": "Caller reports shooting victim with severe bleeding."
}

Error Handling

The NLP track handles failures gracefully:
async def process_text(text: str, distress: float = 0.5) -> dict:
    """
    Run the NLP pipeline with per-agent fallbacks.

    Each agent failure degrades to a neutral default so the dispatcher UI
    never sees an empty state.

    Args:
        text: Transcribed speech from STT
        distress: Audio distress score (0.0-1.0)

    Returns:
        Same keys as the happy-path pipeline (transcript, service,
        confidence, tags, emotion, summary) so callers and tests need no
        special-casing on partial failure.
    """
    try:
        service_result = classify_service_and_tags(text, distress)
    except Exception as e:
        print(f"[nlp] Service classification failed: {e}")
        service_result = {
            "category": "OTHER",
            "confidence": 0.0,
            "tags": []
        }

    try:
        emotion_result = await analyze_emotion(text, distress)
    except Exception as e:
        print(f"[nlp] Emotion analysis failed: {e}")
        emotion_result = {
            "label": "UNKNOWN",
            "intensity": distress
        }

    # Attempt the real summary; fall back to the raw transcript on failure.
    try:
        summary = await generate_summary(
            transcript=text,
            category=service_result['category'],
            tags=service_result['tags']
        )
    except Exception as e:
        print(f"[nlp] Summary generation failed: {e}")
        summary = text[:200]  # Fallback to raw transcript

    # Always return the full result shape, even on partial failure
    return {
        "transcript": text,
        "service": service_result['category'],
        "confidence": service_result['confidence'],
        "tags": service_result['tags'],
        "emotion": emotion_result,
        "summary": summary
    }
Always provide fallback values so the dispatcher UI never shows empty states, even when AI services fail.

Performance

Latency Budget

| Agent | Target | Typical |
| --- | --- | --- |
| Service Classification | < 100ms | 20-50ms |
| Emotion Analysis | < 1s | 200-800ms |
| Summary Generation | < 2s | 500-1500ms |
| Total Pipeline | < 3s | 1-2s |

Optimization Strategies

1. Parallel Execution
# Run independent agents concurrently
service, emotion = await asyncio.gather(
    classify_service_and_tags(...),
    analyze_emotion(...)
)
2. Caching
from functools import lru_cache

@lru_cache(maxsize=1000)
def classify_cached(text: str, distress: float) -> dict:
    """Memoized classification: repeated transcripts skip the model call.

    NOTE: the original took `text_hash` but the body referenced an
    undefined `text` (NameError). Strings are hashable, so lru_cache can
    key on the transcript directly — no manual hashing needed.
    """
    return classify_service_and_tags(text, distress)
3. Early Exit
# Skip expensive operations for obvious cases
if "fire" in text.lower() and len(text) < 50:
    return {"category": "FIRE", "confidence": 0.9}

Testing

Unit Tests

import pytest
from app.agents.nlp_track import process_text

@pytest.mark.asyncio
async def test_process_text_medical():
    # A medical transcript should route to EMS with cardiac tags,
    # a distressed emotion label, and a non-empty summary.
    outcome = await process_text(
        text="My mom is having chest pain",
        distress=0.7
    )

    assert outcome['service'] == 'EMS'
    assert 'CARDIAC_EVENT' in outcome['tags']
    assert outcome['emotion']['label'] in ['DISTRESSED', 'HIGHLY_DISTRESSED']
    assert len(outcome['summary']) > 0

@pytest.mark.asyncio
async def test_process_text_empty():
    # Empty input must degrade gracefully rather than raise.
    outcome = await process_text(text="", distress=0.0)

    assert outcome['service'] == 'OTHER'
    assert outcome['tags'] == []

Integration Tests

@pytest.mark.asyncio
async def test_full_nlp_pipeline():
    """Test complete flow from transcript to final results"""

    # Simulate a streaming transcript: a fragment, then the finalized text.
    fragment = "Someone's been"
    complete = "Someone's been shot and they're not breathing"

    # A fragment may classify loosely; either preliminary answer is fine.
    early = await process_text(fragment, distress=0.6)
    assert early['service'] in ['EMS', 'OTHER']

    # The finalized transcript should yield a confident, fully tagged result.
    late = await process_text(complete, distress=0.9)
    assert late['service'] == 'EMS'
    assert 'ACTIVE_SHOOTER' in late['tags']
    assert 'NOT_BREATHING' in late['tags']
    assert late['confidence'] > 0.8

Next Steps

Service Classification

Emergency type detection

Emotion Analysis

Emotional state classification

Summary Generation

AI-generated summaries

Speech-to-Text

Real-time transcription

Build docs developers (and LLMs) love