Overview

The pipeline is the central coordinator that manages the flow of data through the agent system. It kicks off parallel processing tracks and merges results into a unified analysis.

Source Code

Location: app/agents/pipeline.py
def run_pipeline(call_id: str):
    # kick off audio & nlp tracks; for now, just return a placeholder packet
    return {"call_id": call_id, "status": "processed"}
The current implementation is a minimal stub. In production, this would orchestrate the full agent workflow.

Pipeline Workflow

The full pipeline (in development) follows this pattern:

1. Initialization

async def run_pipeline(call_id: str):
    # Create STT client for real-time transcription
    # (async: the function awaits the client's startup)
    stt_client = STTClient(
        on_partial=handle_partial_transcript,
        on_final=handle_final_transcript
    )
    await stt_client.start(sample_rate=8000)

2. Parallel Track Processing

Audio Track (audio_track.py)
def analyze_audio_frame(frame: bytes) -> dict:
    # Computes distress score from audio features
    return {"distress_score": 0.0, "hazards": []}
NLP Track (nlp_track.py)
def process_text(text: str) -> dict:
    # Runs text through LLM agents
    return {"transcript": text, "intent": None, "summary": None}
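The two track functions above are synchronous stubs, so a minimal sketch of running them concurrently (assuming these stub signatures; the real tracks may be async) uses worker threads:

```python
import asyncio

def analyze_audio_frame(frame: bytes) -> dict:
    # Stub audio analysis (mirrors audio_track.py)
    return {"distress_score": 0.0, "hazards": []}

def process_text(text: str) -> dict:
    # Stub NLP analysis (mirrors nlp_track.py)
    return {"transcript": text, "intent": None, "summary": None}

async def run_tracks(frame: bytes, text: str) -> list[dict]:
    # Run both track stubs in worker threads so neither blocks the event loop
    return await asyncio.gather(
        asyncio.to_thread(analyze_audio_frame, frame),
        asyncio.to_thread(process_text, text),
    )

audio, nlp = asyncio.run(run_tracks(b"\x00" * 160, "caller reports smoke"))
```

Once the tracks become truly async (network-bound STT and LLM calls), the `asyncio.to_thread` wrappers drop away and the coroutines can be gathered directly.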

3. Results Merging

Location: app/agents/merger.py
def merge_tracks(call_id: str, nlp: dict, audio: dict) -> dict:
    return {"call_id": call_id, "nlp": nlp, "audio": audio}
The merger combines:
  • Audio distress metrics
  • Transcript and NLP results
  • Emotion classification
  • Service categorization
  • Generated summary
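Using the `merge_tracks` implementation above, a merged packet nests each track's output under its own key (the field values here are illustrative):

```python
def merge_tracks(call_id: str, nlp: dict, audio: dict) -> dict:
    # Current merger: nest each track's output under its own key
    return {"call_id": call_id, "nlp": nlp, "audio": audio}

packet = merge_tracks(
    "call-42",
    nlp={"transcript": "smoke in the kitchen", "summary": "possible fire"},
    audio={"distress_score": 0.7, "hazards": ["alarm"]},
)
```

Downstream consumers can then read `packet["audio"]["distress_score"]` and `packet["nlp"]["summary"]` without the two tracks' fields colliding.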

Integration Points

WebSocket Communication

The pipeline receives audio streams via WebSocket:
# Pseudo-code for integration
async def handle_call_stream(websocket, call_id):
    # Start the pipeline; assume it returns the STT client used for streaming
    stt_client = await run_pipeline(call_id)

    # Feed audio chunks
    async for audio_chunk in websocket:
        await stt_client.feed(audio_chunk)

    # Finalize and get results
    results = await stt_client.finalize()
    return results

State Management

The pipeline maintains call state in LIVE_SIGNALS (see API documentation):
LIVE_SIGNALS[call_id] = {
    "partial_transcript": "",
    "final_transcript": "",
    "emotion": {},
    "service": {},
    "summary": ""
}
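A sketch of how per-call state could be updated as agent results arrive (the `update_signals` helper is hypothetical; `LIVE_SIGNALS` is the store described in the API documentation):

```python
LIVE_SIGNALS: dict[str, dict] = {}

def update_signals(call_id: str, **fields) -> dict:
    # Hypothetical helper: create the call's entry on first use, then merge updates
    state = LIVE_SIGNALS.setdefault(call_id, {
        "partial_transcript": "",
        "final_transcript": "",
        "emotion": {},
        "service": {},
        "summary": "",
    })
    state.update(fields)
    return state

update_signals("call-1", partial_transcript="there is a fire")
update_signals("call-1", emotion={"label": "FEAR", "intensity": 0.8})
```

Each agent can then write only the fields it owns, while readers (e.g. the dispatcher UI) always see a complete, consistent record.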

Error Handling

The pipeline implements graceful degradation:
  1. API Failures: Falls back to heuristic analysis
  2. Missing Transcripts: Uses audio-only analysis
  3. Network Issues: Queues results for retry
try:
    emotion = await analyze_emotion(transcript, distress)
except Exception as e:
    print(f"[pipeline] Emotion analysis failed: {e}")
    emotion = {"label": "UNKNOWN", "intensity": distress}
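For the network-issue case, one plausible pattern (a sketch, not the shipped implementation) is to queue failed result deliveries and retry them later:

```python
import collections

RETRY_QUEUE: collections.deque = collections.deque()

def deliver_or_queue(call_id: str, payload: dict, send) -> bool:
    # Try to push results downstream; on failure, queue for a later retry
    try:
        send(call_id, payload)
        return True
    except ConnectionError:
        RETRY_QUEUE.append((call_id, payload))
        return False

def flush_retries(send) -> int:
    # Re-attempt queued deliveries; anything that still fails is re-queued
    delivered = 0
    for _ in range(len(RETRY_QUEUE)):
        call_id, payload = RETRY_QUEUE.popleft()
        if deliver_or_queue(call_id, payload, send):
            delivered += 1
    return delivered

def flaky_send_factory():
    # Test double: fails on the first call, succeeds afterwards
    attempts = {"n": 0}
    def send(call_id, payload):
        attempts["n"] += 1
        if attempts["n"] == 1:
            raise ConnectionError("network down")
    return send

send = flaky_send_factory()
ok = deliver_or_queue("call-1", {"summary": "fire"}, send)  # first attempt fails
retried = flush_retries(send)                                # retry succeeds
```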

Performance Considerations

Latency Targets

  • Partial Transcript: < 500ms
  • Emotion Classification: < 1s
  • Service Classification: < 1s
  • Summary Generation: < 2s
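To verify the targets above, a simple timing wrapper can record per-stage latency (illustrative; `STAGE_LATENCY_MS` and the stub stage are not from the codebase):

```python
import time
from functools import wraps

STAGE_LATENCY_MS: dict[str, float] = {}

def timed(stage: str):
    # Decorator: record the wall-clock latency of a pipeline stage in ms
    def wrap(fn):
        @wraps(fn)
        def inner(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                STAGE_LATENCY_MS[stage] = (time.perf_counter() - start) * 1000.0
        return inner
    return wrap

@timed("emotion")
def classify_emotion_stub(text: str) -> dict:
    return {"label": "CALM", "intensity": 0.1}

result = classify_emotion_stub("all clear")
```

Recorded latencies can then be compared against the budgets (e.g. `STAGE_LATENCY_MS["emotion"] < 1000`) in monitoring or CI.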

Parallel Execution

Agents run concurrently to minimize total latency:
# Run independent analyses in parallel
results = await asyncio.gather(
    analyze_emotion(transcript, distress),
    classify_service_and_tags(transcript, distress),
    generate_summary(transcript, category, tags)
)
Use asyncio.gather() to run independent agent calls in parallel. Because these calls are I/O-bound (network waits dominate), overlapping them shrinks total latency to roughly that of the slowest call — about a 2-3x reduction for three similar-length calls.
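Combined with the graceful-degradation rules above, `return_exceptions=True` lets one failing agent call fall back without cancelling its siblings (a sketch with stand-in agent functions):

```python
import asyncio

async def analyze_emotion(text: str, distress: float) -> dict:
    raise RuntimeError("LLM API unavailable")  # simulate an API failure

async def classify_service(text: str, distress: float) -> dict:
    return {"category": "FIRE", "tags": ["smoke"]}

async def run_agents(text: str, distress: float) -> dict:
    emotion, service = await asyncio.gather(
        analyze_emotion(text, distress),
        classify_service(text, distress),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    if isinstance(emotion, Exception):
        # Heuristic fallback, mirroring the error-handling section
        emotion = {"label": "UNKNOWN", "intensity": distress}
    return {"emotion": emotion, "service": service}

results = asyncio.run(run_agents("smoke everywhere", 0.6))
```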

Real-Time Updates

The pipeline provides progressive enhancement:
  1. Partial Transcripts → Shown immediately to dispatcher
  2. Initial Classification → Based on first few seconds
  3. Refined Analysis → Updated as more context arrives
  4. Final Summary → Generated when call ends
# Example progressive update
async def on_partial(text: str, call_id: str):
    # Quick classification from partial text (distress not yet known; neutral default)
    quick_service = await classify_service_and_tags(text, distress=0.5)
    update_dispatcher_ui(call_id, {"service": quick_service})

async def on_final(text: str, call_id: str):
    # Full analysis with complete context
    full_results = await run_all_agents(text)
    update_dispatcher_ui(call_id, full_results)

Testing

Test the pipeline with mock calls:
import pytest
from app.agents.pipeline import run_pipeline

def test_pipeline_returns_call_id():
    result = run_pipeline("test-call-123")
    assert result["call_id"] == "test-call-123"
    assert result["status"] == "processed"
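Beyond the stub test, individual stages can be exercised with mocked agent calls using the standard library's AsyncMock (a sketch; the stage wrapper and its fallback mirror the error-handling section but are not the shipped code):

```python
import asyncio
from unittest.mock import AsyncMock

async def run_emotion_stage(analyze, transcript: str, distress: float) -> dict:
    # Stage wrapper with the fallback behaviour from the error-handling section
    try:
        return await analyze(transcript, distress)
    except Exception:
        return {"label": "UNKNOWN", "intensity": distress}

ok_mock = AsyncMock(return_value={"label": "FEAR", "intensity": 0.9})
bad_mock = AsyncMock(side_effect=ConnectionError("network down"))

good = asyncio.run(run_emotion_stage(ok_mock, "help", 0.9))
fallback = asyncio.run(run_emotion_stage(bad_mock, "help", 0.4))
```

Injecting the agent call as a parameter keeps the stage testable without patching module paths or hitting real LLM APIs.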

Next Steps

  • Audio Track — audio distress analysis
  • NLP Track — text processing pipeline