System Overview
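Sonore Phone Agent is built as a distributed system of two FastAPI applications, each handling a different stage of the call lifecycle. The Call Handler accepts incoming calls and runs live sessions. The Post-Call Processor summarizes and analyzes calls after they end. The architecture targets horizontal scalability, reliability, and real-time performance.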
Core Components
1. Call Handler Application
The primary service that handles incoming calls and manages active sessions.
Webhook Endpoint
Receives and processes OpenAI SIP call events with built-in security and deduplication:
# From src/apps/calls/api/v1/endpoints/openai_webhook.py:52
@router.post("/webhook")
async def openai_webhook(request: Request) -> JSONResponse:
    """
    Main webhook endpoint for OpenAI SIP call events.
    Handles:
    - Webhook verification
    - Deduplication
    - Capacity gating
    - Call acceptance/rejection
    - Call ended/hangup events
    """
Key responsibilities:
Verify webhook signatures using the OpenAI SDK
Deduplicate incoming events (30-minute window)
Route events to appropriate handlers
Enforce capacity limits at global and tenant levels
The webhook endpoint returns 200 OK for every valid request, even when it rejects the call. A non-2xx response would make OpenAI treat the delivery as failed and retry it, which would only repeat the rejection.
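The 30-minute deduplication window can be pictured as a time-bounded set of webhook IDs. The sketch below is a minimal in-memory version, assuming deduplication keys on the webhook ID. The class name `SeenWebhookIds` and its methods are hypothetical and do not reproduce the project's `seen_webhook_ids` implementation.

```python
import time
from collections import OrderedDict
from typing import Callable


class SeenWebhookIds:
    """Hypothetical TTL set: remembers webhook IDs for a fixed window."""

    def __init__(self, ttl_seconds: float = 30 * 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        # Insertion-ordered, so the oldest entries sit at the front.
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def _evict_expired(self, now: float) -> None:
        # Drop entries older than the window; stop at the first fresh one.
        while self._seen:
            seen_at = next(iter(self._seen.values()))
            if now - seen_at < self.ttl_seconds:
                break
            self._seen.popitem(last=False)

    def check_and_add(self, webhook_id: str) -> bool:
        """Return True if webhook_id was already seen within the window."""
        now = self._clock()
        self._evict_expired(now)
        if webhook_id in self._seen:
            return True
        self._seen[webhook_id] = now
        return False
```

Because this state lives in process memory, each Call Handler instance would deduplicate independently. Sharing the window across instances would need an external store.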
Call Manager
Orchestrates multiple concurrent call sessions with lifecycle management:
# From src/apps/calls/app/call_manager.py:17
class CallManager:
    """
    Owns many CallSession instances.
    Responsibilities:
    - Start/stop sessions on demand
    - Enforce max concurrent sessions
    - Track active sessions and expose metrics
    - Cleanup finished sessions automatically
    """

    def __init__(
        self,
        db_client: AsyncMongoClient,
        metrics_store: LiveMetricsStore,
        max_concurrent: int = settings.max_concurrent_calls,
    ) -> None:
        self.max_concurrent = max_concurrent
        self._sessions: dict[str, CallSession] = {}
        self._session_tasks: dict[str, asyncio.Task[None]] = {}
Capacity management:
# From src/apps/calls/app/call_manager.py:124
def active_count(self) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.status in (
            CallSessionStatus.RUNNING,
            CallSessionStatus.NEW,
            CallSessionStatus.STOPPING,
        )
    )

def active_count_by_tenant(self, tenant_id: str) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.tenant_id == tenant_id and s.status in (...)
    )
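The "start on demand, enforce the limit, clean up automatically" responsibilities can be sketched with plain asyncio. Each session runs as a task, and a done-callback removes it from the tracking dict when it ends. This hypothetical `MiniCallManager` is a simplified illustration, not the project's `CallManager`. The `run_session` factory parameter and `CapacityError` are assumptions.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable, Coroutine

SessionFactory = Callable[[], Coroutine[Any, Any, None]]


class CapacityError(RuntimeError):
    """Raised when starting a session would exceed max_concurrent."""


class MiniCallManager:
    """Hypothetical, simplified stand-in for CallManager."""

    def __init__(self, max_concurrent: int) -> None:
        self.max_concurrent = max_concurrent
        # Holding the task here also keeps a strong reference to it.
        self._tasks: dict[str, asyncio.Task[None]] = {}

    def active_count(self) -> int:
        return len(self._tasks)

    def start_call(self, call_id: str,
                   run_session: SessionFactory) -> asyncio.Task[None]:
        """Start one session task; must be called from a running loop."""
        if call_id in self._tasks:
            raise ValueError(f"call {call_id} is already running")
        if self.active_count() >= self.max_concurrent:
            raise CapacityError(f"limit of {self.max_concurrent} reached")
        task = asyncio.create_task(run_session())
        self._tasks[call_id] = task
        # Automatic cleanup: forget the session once its task completes,
        # whether it finished normally, failed, or was cancelled.
        task.add_done_callback(lambda _t: self._tasks.pop(call_id, None))
        return task
```

Unlike the real `active_count`, this version counts every tracked task rather than filtering by `CallSessionStatus`.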
Call Session
Manages the complete lifecycle of a single phone call:
# From src/apps/calls/app/call_session.py:38
class CallSession(ToolInvocationDispatcher):
    """
    Owns one call lifecycle:
    - owns one WSOpenaiCallClient (which owns the CallRegistry)
    - runs the WS session
    - reconstructs transcripts after WS ends
    - exposes final TranscriptResult
    """
Session states:
# From src/apps/calls/app/call_session.py:29
class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"
State transitions:
WebSocket Client
Handles bidirectional communication with OpenAI’s Realtime API:
# From src/apps/calls/ws/client_openai_calls.py:20
class WSOpenaiCallClient(WSOpenaiClient):
    def __init__(
        self,
        call_id: str,
        tenant_id: str,
        tool_dispatcher: ToolInvocationDispatcher | None,
        registry: CallRegistry,
        instructions: ActiveInstructions,
        tools_build: ToolBuildResult | None,
        cfg: TenantConfig,
    ) -> None:
        self.call_id = call_id
        self.endpoint = settings.ws_server_url + "call_id=" + self.call_id
Event processing:
# From src/apps/calls/ws/client_openai_calls.py:71
async def receive_events(self) -> None:
    async for message in self._ws:
        event = json.loads(message)
        event_type = event.get("type")
        # Session events
        if event_type == "session.updated": ...
        # Response events
        elif event_type == "response.created": ...
        elif event_type == "response.done": ...
        # Tool events
        elif event_type == "response.function_call_arguments.done": ...
        # Transcript events
        elif event_type == "conversation.item.input_audio_transcription.completed": ...
        elif event_type == "response.output_audio_transcript.done": ...
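The `if`/`elif` chain in `receive_events` routes each event by its `type` field. An equivalent pattern maps event types to handler coroutines, so new event types can be registered without growing the chain. It is shown here as a hypothetical alternative, not the project's code. `EventRouter` and the handler names are illustrative.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class EventRouter:
    """Hypothetical type-based router for Realtime API events."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type."""
        def register(fn: Handler) -> Handler:
            self._handlers[event_type] = fn
            return fn
        return register

    async def dispatch(self, message: str) -> bool:
        """Parse one WebSocket message; return False if no handler matched."""
        event = json.loads(message)
        handler = self._handlers.get(event.get("type", ""))
        if handler is None:
            return False
        await handler(event)
        return True


if __name__ == "__main__":
    router = EventRouter()

    @router.on("response.output_audio_transcript.done")
    async def on_assistant_transcript(event: Any) -> None:
        print("assistant:", event.get("transcript"))

    asyncio.run(router.dispatch(
        '{"type": "response.output_audio_transcript.done", "transcript": "Hello!"}'
    ))
```

The chain and the table behave the same for known events. The table makes unhandled types explicit through the `False` return value.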
2. Post-Call Processor Application
Handles asynchronous processing after calls complete:
# From src/apps/post_call/main.py:17
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan management.
    Initializes:
    - MongoDB client
    - OpenAI async client
    - Call queue and worker loop
    """
    # (MongoDB and OpenAI client setup omitted)
    app.state.call_queue = CallQueue()
    worker = WorkerLoop(
        call_queue=app.state.call_queue,
        client=app.state.mongo_client,
    )
    app.state.worker_task = asyncio.create_task(worker.run_forever())
    yield  # the app serves requests while suspended here
Processing pipeline:
Call session triggers webhook to post-call service
Request added to queue for async processing
Worker loop picks up queued calls
Generates summaries and analytics
Updates MongoDB with processed results
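The queue-plus-worker pipeline above can be sketched with `asyncio.Queue`. This is a minimal in-process stand-in. It assumes `CallQueue` behaves like a FIFO of `{call_id, tenant_id}` payloads. The hypothetical `process_call` here only records the job, whereas the real worker generates summaries and writes to MongoDB.

```python
from __future__ import annotations

import asyncio
import logging
from typing import Any

logger = logging.getLogger(__name__)


class InProcessWorker:
    """Hypothetical FIFO worker for post-call jobs (single process only)."""

    def __init__(self) -> None:
        # Create inside a running event loop (for example, from lifespan).
        self.queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self.processed: list[str] = []

    async def process_call(self, call_data: dict[str, Any]) -> None:
        # Placeholder: the real worker generates summaries and updates MongoDB.
        self.processed.append(call_data["call_id"])

    async def run_forever(self) -> None:
        while True:
            call_data = await self.queue.get()
            try:
                await self.process_call(call_data)
            except Exception:
                # Keep the loop alive; one bad job must not stop the worker.
                logger.exception("post-call processing failed: %s",
                                 call_data.get("call_id"))
            finally:
                self.queue.task_done()
```

In the service, the `/api/v1/process` handler would enqueue jobs and `lifespan` would start `run_forever`, as in the lifespan excerpt above.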
3. Data Layer
MongoDB stores all persistent data with the following collections:
Call Registries
Complete call metadata and transcripts:
# From src/models/registry/call.py
class CallRegistry(BaseModel):
    call_id: str
    tenant_id: str
    caller_number: str
    created_at: datetime
    call_duration: float
    transcript_registry: TranscriptRegistry
    metadata: CallMetadata
Tenant Configurations
Per-tenant settings and feature flags:
# From src/models/instructions/tenant_config.py
class TenantConfig(BaseModel):
    tenant_id: str
    dialed_numbers: list[str]
    features: dict[str, Any]
    created_at: datetime
    updated_at: datetime
Prompts and Instructions
System prompts and greeting messages:
# From src/models/instructions/prompts.py
class ActiveInstructions(BaseModel):
    tenant_id: str
    greeting_text: str
    instruction_text: str
    greeting_id: str | None
    instruction_id: str | None
    updated_at: datetime | None
Request Flow
Incoming Call Acceptance
Webhook Reception
OpenAI sends a realtime.call.incoming event:
{
  "type": "realtime.call.incoming",
  "data": {
    "call_id": "call_abc123",
    "sip_headers": [...]
  }
}
Signature Verification
Validate webhook authenticity:
event = client.webhooks.unwrap(
    raw_body,
    headers=headers,
    secret=settings.openai_webhook_secret.get_secret_value(),
)
Deduplication Check
Prevent duplicate processing:
if webhook_id in request.app.state.seen_webhook_ids:
    return JSONResponse(
        status_code=200,
        content={"ok": True, "deduped": True},
    )
Tenant Resolution
Map the dialed phone number to a tenant:
tenant_resolver = TenantResolver()
tenant_id = await tenant_resolver.resolve_tenant(dialed_number)
Capacity Gating
Check concurrent call limits:
tenant_in_use = tenant_active + tenant_pending
global_in_use = global_active + global_pending
reject_capacity = (
    (tenant_in_use >= tenant_limit)
    or (global_in_use >= global_limit)
)
Instruction Loading
Fetch tenant-specific prompts:
instruction_reader = InstructionReader(client=mongo_client)
instructions = await instruction_reader.get_prompt_by_tenant(tenant_id)
Call Acceptance
Accept the call via the OpenAI API:
response = await openai_calls_service.accept_call(
    call_id,
    idempotency_key=f"accept_{webhook_id}",
)
Session Creation
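Start the call session in the background:
# asyncio keeps only weak references to tasks; keep a reference to this
# task somewhere so it cannot be garbage-collected mid-execution.
asyncio.create_task(
    call_manager.start_call(
        call_id=call_id,
        tenant_id=tenant_id,
        instructions=instructions,
        cfg=tenant_config,
        tools_build=tools_build,
    )
)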
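The Tenant Resolution step above needs the dialed number from the incoming event. The sketch below makes two assumptions that should be checked against real payloads: `sip_headers` is a list of `{"name": ..., "value": ...}` objects, and the dialed number is the user part of the SIP `To` URI. `extract_dialed_number` and the in-memory `resolve_tenant` lookup are hypothetical stand-ins for `TenantResolver`.

```python
from __future__ import annotations

import re
from typing import Any

# Matches the user part of a SIP URI such as <sip:+15551234567@example.com>.
_SIP_USER = re.compile(r"sips?:([^@;>]+)@")


def extract_dialed_number(event: dict[str, Any]) -> str | None:
    """Return the user part of the To header, or None if absent."""
    for header in event.get("data", {}).get("sip_headers", []):
        if header.get("name", "").lower() == "to":
            match = _SIP_USER.search(header.get("value", ""))
            if match:
                return match.group(1)
    return None


def resolve_tenant(dialed_number: str,
                   configs: list[dict[str, Any]]) -> str | None:
    """Find the tenant whose dialed_numbers list contains the number."""
    for cfg in configs:
        if dialed_number in cfg.get("dialed_numbers", []):
            return cfg["tenant_id"]
    return None
```

In the service, the `configs` scan would be a MongoDB query on `tenant_configs.dialed_numbers`, which the `dialed_numbers` index under Database Optimization supports.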
Active Call Processing
WebSocket Connection
CallSession establishes a WebSocket connection to OpenAI:
await self.ws_client.run()
Greeting Delivery
Send the initial greeting message:
await self.create_response(greeting=True)
Conversation Loop
Process bidirectional audio and events:
Receive audio from caller
Send to OpenAI for transcription
Get AI response
Stream audio back to caller
Tool Execution
Handle function calls during the conversation:
# From src/apps/calls/app/call_session.py:321
async def submit_tool_invocation(
    self, invocation: ToolInvocation
) -> None:
    # function_call_id: definition omitted in this excerpt
    self._tool_tasks[function_call_id] = asyncio.create_task(
        self._run_tool_and_respond(invocation)
    )
Transcript Collection
Build the transcript registry from events:
# User transcription
if event_type == "conversation.item.input_audio_transcription.completed":
    self.registry.transcript_registry.items[item_id].user_asr[
        str(content_index)
    ] = transcript
# Assistant transcription
elif event_type == "response.output_audio_transcript.done":
    self.registry.transcript_registry.items[item_id].assistant_asr[
        str(content_index)
    ] = transcript
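After the WebSocket closes, the per-item `user_asr` and `assistant_asr` maps have to be flattened into an ordered transcript. The following is a minimal sketch. It assumes items are kept in conversation order and that content indices are stored as strings, as in the excerpt above. The `TranscriptItem` dataclass and `reconstruct` function are hypothetical, not the project's `TranscriptRegistry`.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TranscriptItem:
    # Keys are str(content_index), matching the excerpt above.
    user_asr: dict[str, str] = field(default_factory=dict)
    assistant_asr: dict[str, str] = field(default_factory=dict)


def reconstruct(items: dict[str, TranscriptItem]) -> list[tuple[str, str]]:
    """Flatten items (in insertion order) into (speaker, text) turns."""
    turns: list[tuple[str, str]] = []
    for item in items.values():
        for speaker, parts in (("user", item.user_asr),
                               ("assistant", item.assistant_asr)):
            # Sort numerically so "10" comes after "2".
            for key in sorted(parts, key=int):
                text = parts[key].strip()
                if text:  # skip empty or whitespace-only transcriptions
                    turns.append((speaker, text))
    return turns
```

Sorting by `int` matters because string keys would otherwise order `"10"` before `"2"`.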
Post-Call Processing
Session Completion
CallSession finishes and writes the call data to MongoDB.
Webhook Trigger
Notify the post-call service, with retry logic:
async def post_process(self, max_retries: int = 3) -> None:
    url = f"{settings.post_call_uri}/api/v1/process"
    payload = {"call_id": self.call_id, "tenant_id": self.tenant_id}
    # Exponential backoff retry logic (omitted)
    ...
Queue Processing
Worker loop picks up the call:
# From src/apps/post_call/app/worker_loop.py
async def run_forever(self):
    while True:
        call_data = await self.call_queue.dequeue()
        await self.process_call(call_data)
Summary Generation
Generate AI summary and analytics using OpenAI
Data Persistence
Update MongoDB with processed results
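The retry loop in `post_process` is elided in the excerpt. One way to implement exponential backoff with three attempts is sketched below. It assumes a delay that doubles after each failure (1 s, then 2 s) and an injectable async `send` callable, so no particular HTTP client is fixed. The function name `post_with_backoff`, `base_delay`, and the doubling schedule are assumptions, not the project's values.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

Send = Callable[[str, Dict[str, Any]], Awaitable[None]]


async def post_with_backoff(
    send: Send,
    url: str,
    payload: Dict[str, Any],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Call send(url, payload) up to max_retries times.

    Returns the 1-based attempt number that succeeded and re-raises the
    last error once every attempt has failed.
    """
    for attempt in range(1, max_retries + 1):
        try:
            await send(url, payload)
            return attempt
        except Exception:
            if attempt == max_retries:
                raise
            # Wait base_delay, 2*base_delay, 4*base_delay, ... between tries.
            await sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_retries must be >= 1")
```

With an HTTP client such as httpx, `send` could wrap `AsyncClient.post(url, json=payload)` followed by `raise_for_status()`, so non-2xx responses also trigger a retry.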
Scaling Considerations
Horizontal Scaling
Call Handler
Instances do not share in-memory state, so you can run several behind a load balancer. Each instance manages its own CallManager and sessions, which means webhook events for a call must reach the instance that owns that call. Use sticky sessions or call ID hashing for webhook routing.
Post-Call Processor
Queue-based processing lets you run multiple workers. Back the queue with a shared store such as MongoDB or Redis so that each worker can pick up calls independently.
Capacity Management
# Global settings
MAX_CONCURRENT_CALLS = 50 # System-wide limit
MAX_CONCURRENT_CALLS_PER_TENANT = 10 # Per-tenant limit
Limits enforce:
System stability under load
Fair resource distribution across tenants
Predictable cost management
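The capacity check from the Capacity Gating step (active plus pending calls compared against each limit) can be expressed as a small pure function, which makes the limits easy to unit-test. This is a sketch. It assumes "pending" means calls that have been accepted but whose session has not yet registered. The `CapacityDecision` type and the reason strings are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CapacityDecision:
    accept: bool
    reason: str | None = None


def gate_capacity(
    tenant_active: int, tenant_pending: int, tenant_limit: int,
    global_active: int, global_pending: int, global_limit: int,
) -> CapacityDecision:
    """Reject when either the tenant or the global slot count is exhausted."""
    tenant_in_use = tenant_active + tenant_pending
    global_in_use = global_active + global_pending
    # Check the tenant first so the rejection reason is the narrower one.
    if tenant_in_use >= tenant_limit:
        return CapacityDecision(False, "tenant_capacity")
    if global_in_use >= global_limit:
        return CapacityDecision(False, "global_capacity")
    return CapacityDecision(True)
```

Counting pending calls presumably covers the gap between `accept_call` and the background `start_call` task. During that gap a newly accepted call would not yet appear in `active_count`, so without the pending count a burst of webhooks could all pass the check.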
Database Optimization
Recommended indexes for MongoDB:
// Call registries
db.call_registries.createIndex({ call_id: 1 }, { unique: true })
db.call_registries.createIndex({ tenant_id: 1, created_at: -1 })
// Tenant configs
db.tenant_configs.createIndex({ tenant_id: 1 }, { unique: true })
db.tenant_configs.createIndex({ dialed_numbers: 1 })
// Prompts
db.prompts.createIndex({ tenant_id: 1, prompt_type: 1, active: 1 })
Error Handling
Each layer handles its own failure modes:
Webhook Level
Invalid signature → Return 401 Unauthorized
Duplicate webhook → Return 200 OK (deduped)
Capacity exceeded → Reject call gracefully
Missing tenant → Reject call, record metric
Session Level
WebSocket disconnect → Retry connection, fallback to cleanup
Tool execution failure → Return error to AI, continue call
Timeout → Graceful shutdown, save partial transcript
Post-Call Level
Processing failure → Retry with exponential backoff (3 attempts)
Database error → Log and alert, queue for manual review
All errors are logged as structured events for easier debugging:
log_event(
    logging.ERROR,
    "call_session_failed",
    call_id=self.call_id,
    error=str(e),
)
Monitoring and Observability
Structured Logging
All components use structured logging with context:
# From src/core/logger.py
call_id_var = ContextVar("call_id", default="")
tenant_id_var = ContextVar("tenant_id", default="")

def log_event(level: int, event_name: str, **kwargs):
    # Automatic context injection
    ...
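The body of `log_event` is omitted above. A minimal sketch of the context-injection idea follows. It assumes JSON-formatted log lines and the two context variables shown. The output format, the `sonore` logger name, and returning the line are assumptions made for illustration.

```python
from __future__ import annotations

import json
import logging
from contextvars import ContextVar

call_id_var: ContextVar[str] = ContextVar("call_id", default="")
tenant_id_var: ContextVar[str] = ContextVar("tenant_id", default="")

logger = logging.getLogger("sonore")  # hypothetical logger name


def log_event(level: int, event_name: str, **kwargs: object) -> str:
    """Log one JSON line with call/tenant context merged in; return it."""
    record = {
        "event": event_name,
        "call_id": call_id_var.get(),
        "tenant_id": tenant_id_var.get(),
        **kwargs,  # explicit keyword arguments override the context values
    }
    line = json.dumps(record, default=str)
    logger.log(level, line)
    return line
```

Each asyncio task runs in its own copy of the context. Setting `call_id_var` at the start of a session's task therefore keeps concurrent calls from mixing their log context.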
Live Metrics
Real-time metrics endpoint:
GET /api/v1/metrics/live
# Response:
{
  "timestamp": "2024-03-02T14:30:00Z",
  "active_calls": 5,
  "active_by_tenant": {
    "tenant-a": 3,
    "tenant-b": 2
  },
  "total_accepted": 150,
  "total_rejected": 10
}
Health Checks
GET /health
# Response:
{"status": "ok"}
Security
All webhooks are verified using OpenAI's signature verification:
event = client.webhooks.unwrap(
    raw_body,
    headers=headers,
    secret=settings.openai_webhook_secret.get_secret_value(),
)
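`client.webhooks.unwrap` performs the verification for you. To show what it checks, here is a stdlib sketch of the Standard Webhooks signing scheme, which OpenAI webhooks are assumed to follow. The scheme computes an HMAC-SHA256 over `"{webhook-id}.{webhook-timestamp}.{body}"`, keyed with the base64-decoded part of a `whsec_` secret, and compares it against the `v1,` entries in the `webhook-signature` header. The function `verify_standard_webhook`, the lowercase header keys, and the 5-minute tolerance are assumptions. Keep using the SDK in production.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import time


def verify_standard_webhook(body: bytes, headers: dict[str, str], secret: str,
                            tolerance_s: int = 300,
                            now: float | None = None) -> bool:
    """Return True if a v1 entry in webhook-signature matches the body."""
    msg_id = headers.get("webhook-id")
    timestamp = headers.get("webhook-timestamp")
    if not msg_id or not timestamp or not timestamp.isdigit():
        return False
    # Reject stale or future-dated deliveries to limit replay attacks.
    current = time.time() if now is None else now
    if abs(current - int(timestamp)) > tolerance_s:
        return False
    key = base64.b64decode(secret.removeprefix("whsec_"))
    signed = f"{msg_id}.{timestamp}.".encode() + body
    expected = base64.b64encode(
        hmac.new(key, signed, hashlib.sha256).digest()
    ).decode()
    # The header may carry several space-separated "v1,<sig>" entries.
    for entry in headers.get("webhook-signature", "").split():
        version, _, signature = entry.partition(",")
        if version == "v1" and hmac.compare_digest(signature, expected):
            return True
    return False
```

Verification must use the raw request body exactly as received. Re-serializing parsed JSON can change bytes and break the signature.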
Secrets are managed via Pydantic SecretStr:
class Settings(BaseSettings):
    openai_api_key: SecretStr
    openai_webhook_secret: SecretStr
    mongodb_uri: SecretStr
Complete data isolation between tenants:
Separate configurations
Isolated capacity limits
Tenant ID in all database queries
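One way to guarantee a tenant ID in every database query is to build each MongoDB filter through a single helper. This is a hypothetical sketch, not the project's data-access layer. `tenant_scoped` returns a copy of the filter and refuses filters that already target a different tenant.

```python
from __future__ import annotations

from typing import Any


def tenant_scoped(tenant_id: str,
                  query: dict[str, Any] | None = None) -> dict[str, Any]:
    """Return a copy of a MongoDB filter constrained to one tenant."""
    if not tenant_id:
        raise ValueError("tenant_id is required")
    scoped = dict(query or {})
    existing = scoped.get("tenant_id")
    if existing is not None and existing != tenant_id:
        # Refuse filters (including operator dicts) aimed at other tenants.
        raise PermissionError(f"filter targets tenant {existing!r}")
    scoped["tenant_id"] = tenant_id
    return scoped
```

The result can be passed directly as the filter argument to collection methods such as `find` or `find_one`.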
Next Steps
Call Lifecycle: Deep dive into call state management
Multi-Tenancy: Configure tenant isolation and routing
Custom Tools: Build tools for call transfers and actions
Deployment: Production deployment patterns