Overview
The pipeline is the central coordinator that manages the flow of data through the agent system. It kicks off parallel processing tracks and merges results into a unified analysis.
Source Code
Location: app/agents/pipeline.py
```python
def run_pipeline(call_id: str):
    # kick off audio & nlp tracks; for now, just return a placeholder packet
    return {"call_id": call_id, "status": "processed"}
```
The current implementation is a minimal stub. In production, this would orchestrate the full agent workflow.
Pipeline Workflow
The full pipeline (in development) follows this pattern:
1. Initialization
```python
async def run_pipeline(call_id: str):
    # Create STT client for real-time transcription
    stt_client = STTClient(
        on_partial=handle_partial_transcript,
        on_final=handle_final_transcript,
    )
    await stt_client.start(sample_rate=8000)
```

Note that the function must be `async` here, since it awaits the STT client's startup.
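The `handle_partial_transcript` and `handle_final_transcript` callbacks referenced above are not shown in the source. A minimal sketch of what they might look like, assuming they write into the `LIVE_SIGNALS` store described under State Management (the callback signatures are assumptions):

```python
# Hypothetical transcript handlers; LIVE_SIGNALS mirrors the per-call
# state structure described under "State Management" below.
LIVE_SIGNALS: dict = {}

def _blank_state() -> dict:
    return {"partial_transcript": "", "final_transcript": ""}

def handle_partial_transcript(call_id: str, text: str) -> None:
    # Overwrite the in-progress transcript as new partials arrive
    state = LIVE_SIGNALS.setdefault(call_id, _blank_state())
    state["partial_transcript"] = text

def handle_final_transcript(call_id: str, text: str) -> None:
    # Append the finalized segment and clear the partial buffer
    state = LIVE_SIGNALS.setdefault(call_id, _blank_state())
    state["final_transcript"] = (state["final_transcript"] + " " + text).strip()
    state["partial_transcript"] = ""
```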
2. Parallel Track Processing
Audio Track (audio_track.py)
```python
def analyze_audio_frame(frame: bytes) -> dict:
    # Computes distress score from audio features
    return {"distress_score": 0.0, "hazards": []}
```
NLP Track (nlp_track.py)
```python
def process_text(text: str) -> dict:
    # Runs text through LLM agents
    return {"transcript": text, "intent": None, "summary": None}
```
3. Results Merging
Location: app/agents/merger.py
```python
def merge_tracks(call_id: str, nlp: dict, audio: dict) -> dict:
    return {"call_id": call_id, "nlp": nlp, "audio": audio}
```
The merger combines:
Audio distress metrics
Transcript and NLP results
Emotion classification
Service categorization
Generated summary
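Concretely, a merged packet covering the fields listed above might look like this. The field values are illustrative only; the `merge_tracks` stub is the one shown above:

```python
def merge_tracks(call_id: str, nlp: dict, audio: dict) -> dict:
    # Current stub implementation from merger.py
    return {"call_id": call_id, "nlp": nlp, "audio": audio}

# Illustrative inputs covering the fields the merger combines
audio = {"distress_score": 0.72, "hazards": ["shouting"]}
nlp = {
    "transcript": "There is a fire on the second floor",
    "emotion": {"label": "FEAR", "intensity": 0.8},                # emotion classification
    "service": {"category": "FIRE", "tags": ["structure_fire"]},   # service categorization
    "summary": "Caller reports a fire on the second floor.",       # generated summary
}

packet = merge_tracks("call-42", nlp, audio)
```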
Integration Points
WebSocket Communication
The pipeline receives audio streams via WebSocket:
```python
# Pseudo-code for integration
async def handle_call_stream(websocket, call_id):
    # Start pipeline
    pipeline = run_pipeline(call_id)

    # Feed audio chunks
    async for audio_chunk in websocket:
        await stt_client.feed(audio_chunk)

    # Finalize and get results
    results = await stt_client.finalize()
```
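The pseudo-code above can be exercised end-to-end with mocks: here the WebSocket is replaced by an async generator and the STT client by a stub that just buffers chunks. All names besides the overall shape are stand-ins, not the real interfaces:

```python
import asyncio

class MockSTTClient:
    # Stand-in for the real STT client: buffers audio, "finalizes" to a byte count
    def __init__(self):
        self.chunks = []

    async def feed(self, chunk: bytes):
        self.chunks.append(chunk)

    async def finalize(self) -> dict:
        return {"bytes_received": sum(len(c) for c in self.chunks)}

async def mock_websocket():
    # Simulated audio stream: three 160-byte frames (20 ms at 8 kHz, 8-bit)
    for _ in range(3):
        yield b"\x00" * 160

async def handle_call_stream(websocket, call_id, stt_client):
    # Same loop as the pseudo-code, with the STT client passed in explicitly
    async for audio_chunk in websocket:
        await stt_client.feed(audio_chunk)
    return await stt_client.finalize()

results = asyncio.run(handle_call_stream(mock_websocket(), "call-1", MockSTTClient()))
```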
State Management
The pipeline maintains call state in LIVE_SIGNALS (see API documentation):
```python
LIVE_SIGNALS[call_id] = {
    "partial_transcript": "",
    "final_transcript": "",
    "emotion": {},
    "service": {},
    "summary": "",
}
```
Error Handling
The pipeline implements graceful degradation:
API Failures : Falls back to heuristic analysis
Missing Transcripts : Uses audio-only analysis
Network Issues : Queues results for retry
```python
try:
    emotion = await analyze_emotion(transcript, distress)
except Exception as e:
    print(f"[pipeline] Emotion analysis failed: {e}")
    emotion = {"label": "UNKNOWN", "intensity": distress}
```
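A similar guard can cover the missing-transcript case. This sketch falls back to audio-only analysis when no text is available; the function name and result shape are assumptions for illustration:

```python
from typing import Optional

def analyze_call(transcript: Optional[str], audio: dict) -> dict:
    # Missing transcript: degrade gracefully to audio-only analysis
    if not transcript:
        return {
            "source": "audio_only",
            "distress_score": audio.get("distress_score", 0.0),
            "hazards": audio.get("hazards", []),
        }
    # Transcript available: full analysis path
    return {"source": "full", "transcript": transcript, "audio": audio}
```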
Latency Targets
Partial Transcript : < 500ms
Emotion Classification : < 1s
Service Classification : < 1s
Summary Generation : < 2s
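These budgets can be checked with simple wall-clock timing around each agent call. A sketch, where `analyze_emotion` is a stand-in stub rather than the real agent:

```python
import asyncio
import time

async def analyze_emotion(transcript: str, distress: float) -> dict:
    # Stand-in for the real emotion agent call
    await asyncio.sleep(0.01)
    return {"label": "FEAR", "intensity": distress}

async def timed(coro, budget_s: float):
    # Run a coroutine and report whether it met its latency budget
    start = time.perf_counter()
    result = await coro
    elapsed = time.perf_counter() - start
    return result, elapsed, elapsed < budget_s

result, elapsed, within_budget = asyncio.run(
    timed(analyze_emotion("help", 0.7), budget_s=1.0)  # 1 s emotion budget
)
```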
Parallel Execution
Agents run concurrently to minimize total latency:
```python
# Run independent analyses in parallel
results = await asyncio.gather(
    analyze_emotion(transcript, distress),
    classify_service_and_tags(transcript, distress),
    generate_summary(transcript, category, tags),
)
```
Use asyncio.gather() to run independent agent calls in parallel. This can reduce total processing time by 2-3x.
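Combined with the error handling above, `asyncio.gather(..., return_exceptions=True)` lets one failing agent degrade gracefully instead of cancelling its siblings. The stub agents below are stand-ins for the real calls:

```python
import asyncio

async def analyze_emotion(text: str) -> dict:
    raise RuntimeError("emotion API down")  # simulate an API failure

async def classify_service_and_tags(text: str) -> dict:
    return {"category": "MEDICAL", "tags": []}

async def run_agents(text: str) -> dict:
    emotion, service = await asyncio.gather(
        analyze_emotion(text),
        classify_service_and_tags(text),
        return_exceptions=True,  # failures come back as exception objects
    )
    if isinstance(emotion, Exception):
        # Heuristic fallback, mirroring the UNKNOWN-label pattern above
        emotion = {"label": "UNKNOWN", "intensity": 0.0}
    return {"emotion": emotion, "service": service}

results = asyncio.run(run_agents("chest pain, difficulty breathing"))
```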
Real-Time Updates
The pipeline provides progressive enhancement:
Partial Transcripts → Shown immediately to dispatcher
Initial Classification → Based on first few seconds
Refined Analysis → Updated as more context arrives
Final Summary → Generated when call ends
```python
# Example progressive update
async def on_partial(text: str, call_id: str):
    # Quick classification from partial text
    quick_service = await classify_service_and_tags(text, distress=0.5)
    update_dispatcher_ui(call_id, {"service": quick_service})

async def on_final(text: str, call_id: str):
    # Full analysis with complete context
    full_results = await run_all_agents(text)
    update_dispatcher_ui(call_id, full_results)
```

Both handlers must be `async`, since they await the agent calls.
Testing
Test the pipeline with mock calls:
```python
import pytest
from app.agents.pipeline import run_pipeline

def test_pipeline_returns_call_id():
    result = run_pipeline("test-call-123")
    assert result["call_id"] == "test-call-123"
    assert result["status"] == "processed"
```
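Merger behavior can be covered the same way. A sketch assuming the stub `merge_tracks` shown earlier (in the repo, the import would be `from app.agents.merger import merge_tracks`):

```python
# Inlined stand-in for: from app.agents.merger import merge_tracks
def merge_tracks(call_id: str, nlp: dict, audio: dict) -> dict:
    return {"call_id": call_id, "nlp": nlp, "audio": audio}

def test_merge_tracks_preserves_inputs():
    nlp = {"transcript": "hello"}
    audio = {"distress_score": 0.1, "hazards": []}
    result = merge_tracks("test-call-123", nlp, audio)
    assert result["call_id"] == "test-call-123"
    assert result["nlp"] is nlp
    assert result["audio"] is audio
```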
Next Steps
Audio Track: audio distress analysis
NLP Track: text processing pipeline