
System Overview

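Sonore Phone Agent is built as a distributed system with two main FastAPI applications, each handling a different stage of the call lifecycle: the Call Handler accepts and runs live calls, and the Post-Call Processor summarizes and analyzes them after they end. The architecture is designed for scalability, reliability, and real-time performance.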

Core Components

1. Call Handler Application

The primary service that handles incoming calls and manages active sessions.

Webhook Endpoint

Receives and processes OpenAI SIP call events with built-in security and deduplication:
# From src/apps/calls/api/v1/endpoints/openai_webhook.py:52
@router.post("/webhook")
async def openai_webhook(request: Request) -> JSONResponse:
    """
    Main webhook endpoint for OpenAI SIP call events.

    Handles:
    - Webhook verification
    - Deduplication
    - Capacity gating
    - Call acceptance/rejection
    - Call ended/hangup events
    """
Key responsibilities:
  • Verify webhook signatures using the OpenAI SDK
  • Deduplicate incoming events (30-minute window)
  • Route events to appropriate handlers
  • Enforce capacity limits at global and tenant levels
The webhook endpoint returns 200 OK for every verified request, even when it rejects the call. A non-2xx response would make OpenAI retry a delivery that has already been handled.

Call Manager

Orchestrates multiple concurrent call sessions with lifecycle management:
# From src/apps/calls/app/call_manager.py:17
class CallManager:
    """
    Owns many CallSession instances.

    Responsibilities:
    - Start/stop sessions on demand
    - Enforce max concurrent sessions
    - Track active sessions and expose metrics
    - Cleanup finished sessions automatically
    """

    def __init__(
        self,
        db_client: AsyncMongoClient,
        metrics_store: LiveMetricsStore,
        max_concurrent: int = settings.max_concurrent_calls,
    ) -> None:
        self.max_concurrent = max_concurrent
        self._sessions: dict[str, CallSession] = {}
        self._session_tasks: dict[str, asyncio.Task[None]] = {}
Capacity management:
# From src/apps/calls/app/call_manager.py:124
def active_count(self) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.status in (
            CallSessionStatus.RUNNING,
            CallSessionStatus.NEW,
            CallSessionStatus.STOPPING,
        )
    )

def active_count_by_tenant(self, tenant_id: str) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.tenant_id == tenant_id and s.status in (...)
    )
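The CallManager responsibilities listed above (start on demand, cap concurrency, automatic cleanup) can be illustrated with plain asyncio. The class below is a hypothetical, stripped-down stand-in, not the project's CallManager; the names SimpleCallManager, start, and run_session are assumptions. It refuses a new session at the limit and removes each task from its registry as soon as the task finishes.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any

# Hypothetical: a zero-argument factory that returns the session coroutine
SessionFactory = Callable[[], Coroutine[Any, Any, None]]


class SimpleCallManager:
    """Hypothetical stand-in for CallManager: one asyncio task per call_id."""

    def __init__(self, max_concurrent: int) -> None:
        self.max_concurrent = max_concurrent
        self._tasks: dict[str, asyncio.Task[None]] = {}

    def active_count(self) -> int:
        return len(self._tasks)

    def start(self, call_id: str, run_session: SessionFactory) -> bool:
        """Start a session task; return False when at capacity or already running."""
        if call_id in self._tasks or self.active_count() >= self.max_concurrent:
            return False
        task = asyncio.create_task(run_session())
        self._tasks[call_id] = task
        # Automatic cleanup: forget the task once it finishes, whatever the outcome
        task.add_done_callback(lambda _t: self._tasks.pop(call_id, None))
        return True
```

Doing the cleanup in a done callback means a session that crashes or is cancelled still frees its slot.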

Call Session

Manages the complete lifecycle of a single phone call:
# From src/apps/calls/app/call_session.py:38
class CallSession(ToolInvocationDispatcher):
    """
    Owns one call lifecycle:
    - owns one WSOpenaiCallClient (which owns the CallRegistry)
    - runs the WS session
    - reconstructs transcripts after WS ends
    - exposes final TranscriptResult
    """
Session states:
# From src/apps/calls/app/call_session.py:29
class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"
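State transitions: sessions in NEW, RUNNING, or STOPPING count toward capacity (they are the statuses active_count checks), while FINISHED, FAILED, and CANCELLED do not.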
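One way to make session state rules enforceable is an explicit transition table. The edges below are an assumption inferred from the state names (for example, that only NEW and RUNNING sessions can be cancelled), not taken from the source; ALLOWED, ACTIVE, and transition are hypothetical names, and the enum is redeclared so the snippet runs on its own.

```python
from enum import Enum


class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"


S = CallSessionStatus

# Hypothetical transition table; terminal states map to an empty set
ALLOWED: dict[CallSessionStatus, frozenset[CallSessionStatus]] = {
    S.NEW: frozenset({S.RUNNING, S.FAILED, S.CANCELLED}),
    S.RUNNING: frozenset({S.STOPPING, S.FAILED, S.CANCELLED}),
    S.STOPPING: frozenset({S.FINISHED, S.FAILED}),
    S.FINISHED: frozenset(),
    S.FAILED: frozenset(),
    S.CANCELLED: frozenset(),
}

# The statuses active_count() treats as occupying a slot
ACTIVE = frozenset({S.NEW, S.RUNNING, S.STOPPING})


def transition(current: CallSessionStatus, new: CallSessionStatus) -> CallSessionStatus:
    """Return the new status, or raise if the move is not in the table."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    return new
```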

WebSocket Client

Handles bidirectional communication with OpenAI’s Realtime API:
# From src/apps/calls/ws/client_openai_calls.py:20
class WSOpenaiCallClient(WSOpenaiClient):
    def __init__(
        self,
        call_id: str,
        tenant_id: str,
        tool_dispatcher: ToolInvocationDispatcher | None,
        registry: CallRegistry,
        instructions: ActiveInstructions,
        tools_build: ToolBuildResult | None,
        cfg: TenantConfig,
    ) -> None:
        self.call_id = call_id
        self.endpoint = settings.ws_server_url + "call_id=" + call_id
Event processing:
# From src/apps/calls/ws/client_openai_calls.py:71
async def receive_events(self) -> None:
    async for message in self._ws:
        event = json.loads(message)
        event_type = event.get("type")

        # Session events
        if event_type == "session.updated": ...

        # Response events
        elif event_type == "response.created": ...
        elif event_type == "response.done": ...

        # Tool events
        elif event_type == "response.function_call_arguments.done": ...

        # Transcript events
        elif event_type == "conversation.item.input_audio_transcription.completed": ...
        elif event_type == "response.output_audio_transcript.done": ...
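The if/elif chain above routes each message on its "type" field. An equivalent pattern, shown as an alternative rather than the project's code, is a dispatch table that maps each event type to a handler and ignores types it does not know. EventRouter, on, and dispatch are hypothetical names.

```python
import json
from collections.abc import Callable
from typing import Any

Handler = Callable[[dict[str, Any]], None]


class EventRouter:
    """Hypothetical dispatch-table router for Realtime API events."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type."""
        def register(fn: Handler) -> Handler:
            self._handlers[event_type] = fn
            return fn
        return register

    def dispatch(self, message: str) -> bool:
        """Decode one WebSocket message; return True if a handler ran."""
        event = json.loads(message)
        handler = self._handlers.get(event.get("type", ""))
        if handler is None:
            return False  # unknown types are skipped, like falling through the if/elif chain
        handler(event)
        return True
```

Adding support for a new event then means registering one function instead of extending a long conditional.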

2. Post-Call Processor Application

Handles asynchronous processing after calls complete:
# From src/apps/post_call/main.py:17
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan management.

    Initializes:
    - MongoDB client
    - OpenAI async client
    - Call queue and worker loop
    """
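    # (MongoDB and OpenAI client setup omitted; it sets app.state.mongo_client)
    app.state.call_queue = CallQueue()
    worker = WorkerLoop(
        call_queue=app.state.call_queue,
        client=app.state.mongo_client,
    )
    app.state.worker_task = asyncio.create_task(worker.run_forever())
    yield  # the app serves requests while suspended here
    # Shutdown: stop the background worker
    app.state.worker_task.cancel()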
Processing pipeline:
  1. Call session triggers webhook to post-call service
  2. Request added to queue for async processing
  3. Worker loop picks up queued calls
  4. Generates summaries and analytics
  5. Updates MongoDB with processed results

3. Data Layer

MongoDB stores all persistent data with the following collections:

Call Registries

Complete call metadata and transcripts:
# From src/models/registry/call.py
class CallRegistry(BaseModel):
    call_id: str
    tenant_id: str
    caller_number: str
    created_at: datetime
    call_duration: float
    transcript_registry: TranscriptRegistry
    metadata: CallMetadata

Tenant Configurations

Per-tenant settings and feature flags:
# From src/models/instructions/tenant_config.py
class TenantConfig(BaseModel):
    tenant_id: str
    dialed_numbers: list[str]
    features: dict[str, Any]
    created_at: datetime
    updated_at: datetime

Prompts and Instructions

System prompts and greeting messages:
# From src/models/instructions/prompts.py
class ActiveInstructions(BaseModel):
    tenant_id: str
    greeting_text: str
    instruction_text: str
    greeting_id: str | None
    instruction_id: str | None
    updated_at: datetime | None

Request Flow

Incoming Call Acceptance

1. Webhook Reception

OpenAI sends a realtime.call.incoming event:
{
  "type": "realtime.call.incoming",
  "data": {
    "call_id": "call_abc123",
    "sip_headers": [...]
  }
}
2. Signature Verification

Validate webhook authenticity:
event = client.webhooks.unwrap(
    raw_body,
    headers=headers,
    secret=settings.openai_webhook_secret.get_secret_value(),
)
3. Deduplication Check

Prevent duplicate processing:
if webhook_id in request.app.state.seen_webhook_ids:
    return JSONResponse(
        status_code=200,
        content={"ok": True, "deduped": True}
    )
4. Tenant Resolution

Map phone number to tenant:
tenant_resolver = TenantResolver()
tenant_id = await tenant_resolver.resolve_tenant(dialed_number)
5. Capacity Gating

Check concurrent call limits:
tenant_in_use = tenant_active + tenant_pending
global_in_use = global_active + global_pending

reject_capacity = (
    (tenant_in_use >= tenant_limit) or
    (global_in_use >= global_limit)
)
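Counting pending calls presumably matters because a call is accepted before its session is running; without them, a burst of webhooks could all pass the check before any session shows up as active. A hypothetical sketch of that reservation idea (CapacityGate, try_reserve, promote, and release are assumed names) applies the same tenant_in_use and global_in_use test:

```python
from collections import Counter


class CapacityGate:
    """Hypothetical global + per-tenant limiter that counts pending reservations."""

    def __init__(self, global_limit: int, tenant_limit: int) -> None:
        self.global_limit = global_limit
        self.tenant_limit = tenant_limit
        self.active: Counter[str] = Counter()   # tenant_id -> running sessions
        self.pending: Counter[str] = Counter()  # tenant_id -> accepted, session not started

    def try_reserve(self, tenant_id: str) -> bool:
        """Apply the capacity check; on success hold a pending slot."""
        tenant_in_use = self.active[tenant_id] + self.pending[tenant_id]
        global_in_use = sum(self.active.values()) + sum(self.pending.values())
        if tenant_in_use >= self.tenant_limit or global_in_use >= self.global_limit:
            return False
        # No await between check and increment, so this is atomic on one event loop
        self.pending[tenant_id] += 1
        return True

    def promote(self, tenant_id: str) -> None:
        """Session started: move its slot from pending to active."""
        self.pending[tenant_id] -= 1
        self.active[tenant_id] += 1

    def release(self, tenant_id: str, was_active: bool) -> None:
        """Session ended (or acceptance failed): free its slot."""
        bucket = self.active if was_active else self.pending
        bucket[tenant_id] -= 1
```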
6. Instruction Loading

Fetch tenant-specific prompts:
instruction_reader = InstructionReader(client=mongo_client)
instructions = await instruction_reader.get_prompt_by_tenant(tenant_id)
7. Call Acceptance

Accept the call via OpenAI API:
response = await openai_calls_service.accept_call(
    call_id,
    idempotency_key=f"accept_{webhook_id}"
)
8. Session Creation

Start call session in background:
asyncio.create_task(
    call_manager.start_call(
        call_id=call_id,
        tenant_id=tenant_id,
        instructions=instructions,
        cfg=tenant_config,
        tools_build=tools_build,
    )
)

Active Call Processing

1. WebSocket Connection

CallSession opens a WebSocket connection to OpenAI:
await self.ws_client.run()
2. Greeting Delivery

Send initial greeting message:
await self.create_response(greeting=True)
3. Conversation Loop

Process bidirectional audio and events:
  • Receive audio from caller
  • Send to OpenAI for transcription
  • Get AI response
  • Stream audio back to caller
4. Tool Execution

Handle function calls during conversation:
# From src/apps/calls/app/call_session.py:321
async def submit_tool_invocation(
    self, invocation: ToolInvocation
) -> None:
    function_call_id = invocation.function_call_id  # attribute name assumed; not shown in the excerpt
    self._tool_tasks[function_call_id] = asyncio.create_task(
        self._run_tool_and_respond(invocation)
    )
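A plausible shape for _run_tool_and_respond, sketched under assumptions: the tool is an async callable taking the parsed JSON arguments, and the result goes back to the model as a Realtime API conversation.item.create event carrying a function_call_output item, followed by response.create so the assistant continues. Here call_id is the function call's ID from response.function_call_arguments.done, not the phone call's ID. Failures are reported to the model as an error payload rather than ending the call, matching the session-level policy under Error Handling. The function returns the events instead of sending them so it can run without a WebSocket; its name is hypothetical.

```python
import json
from collections.abc import Awaitable, Callable
from typing import Any

Tool = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_tool_and_build_events(
    tool: Tool, call_id: str, arguments_json: str
) -> list[dict[str, Any]]:
    """Run one tool call and return the Realtime events that report its result."""
    try:
        result = await tool(json.loads(arguments_json))
        output = json.dumps({"ok": True, "result": result})
    except Exception as exc:  # bad arguments or tool errors go back to the model
        output = json.dumps({"ok": False, "error": str(exc)})
    return [
        {
            "type": "conversation.item.create",
            "item": {"type": "function_call_output", "call_id": call_id, "output": output},
        },
        {"type": "response.create"},  # ask the model to continue with the tool result
    ]
```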
5. Transcript Collection

Build transcript registry from events:
# User transcription
if event_type == "conversation.item.input_audio_transcription.completed":
    item_id, content_index = event["item_id"], event["content_index"]
    self.registry.transcript_registry.items[item_id].user_asr[
        str(content_index)
    ] = event["transcript"]

# Assistant transcription
elif event_type == "response.output_audio_transcript.done":
    item_id, content_index = event["item_id"], event["content_index"]
    self.registry.transcript_registry.items[item_id].assistant_asr[
        str(content_index)
    ] = event["transcript"]
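Because the registry keys each transcript by str(content_index), rebuilding the conversation needs a numeric sort: as strings, "10" sorts before "2". A hypothetical sketch that flattens items into speaker-labelled lines, assuming items are stored in conversation order and carry the same user_asr and assistant_asr dicts (TranscriptItem and flatten_transcript are assumed names):

```python
from dataclasses import dataclass, field


@dataclass
class TranscriptItem:
    """Hypothetical stand-in for one entry of transcript_registry.items."""
    user_asr: dict[str, str] = field(default_factory=dict)
    assistant_asr: dict[str, str] = field(default_factory=dict)


def flatten_transcript(items: dict[str, TranscriptItem]) -> list[str]:
    """Return 'role: text' lines; assumes dict insertion order is conversation order."""
    lines: list[str] = []
    for item in items.values():
        for role, parts in (("user", item.user_asr), ("assistant", item.assistant_asr)):
            for key in sorted(parts, key=int):  # numeric, not lexicographic, order
                lines.append(f"{role}: {parts[key]}")
    return lines
```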

Post-Call Processing

1. Session Completion

CallSession finishes and writes to MongoDB:
await self.write_to_db()
2. Webhook Trigger

Notify post-call service with retry logic:
async def post_process(self, max_retries: int = 3) -> None:
    url = f"{settings.post_call_uri}/api/v1/process"
    payload = {"call_id": self.call_id, "tenant_id": self.tenant_id}
    # Exponential backoff retry logic
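The retry loop itself is elided above. A sketch of exponential backoff with full jitter, written against an injected async send callable so it stays transport-agnostic (in the service, send would be the HTTP POST of payload to url); post_with_retries, the base delay, the cap, and the jitter strategy are assumptions. As with max_retries in post_process, the count is total attempts, matching the three attempts listed under Error Handling.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable


async def post_with_retries(
    send: Callable[[], Awaitable[None]],
    max_retries: int = 3,
    base_delay_s: float = 0.5,
    max_delay_s: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Call send() until it succeeds; return the number of attempts used.

    Re-raises the last error after max_retries failed attempts.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(1, max_retries + 1):
        try:
            await send()
            return attempt
        except Exception:
            if attempt == max_retries:
                raise
            # Full jitter: wait a random time up to base * 2^(attempt - 1), capped
            delay = min(max_delay_s, base_delay_s * 2 ** (attempt - 1))
            await sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")
```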
3. Queue Processing

Worker loop picks up the call:
# From src/apps/post_call/app/worker_loop.py
async def run_forever(self):
    while True:
        call_data = await self.call_queue.dequeue()
        await self.process_call(call_data)
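As written, an exception from process_call would end run_forever and stop all post-call processing. A sketch of a more defensive loop, assuming an asyncio.Queue in place of CallQueue and a hypothetical process coroutine: each failure is logged and the loop moves on, and task_done() keeps queue.join() usable for a graceful shutdown.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger("post_call.worker")  # hypothetical logger name


async def run_forever(
    queue: asyncio.Queue[dict[str, Any]],
    process: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
    """Consume queued calls forever; one failing call never stops the loop."""
    while True:
        call_data = await queue.get()
        try:
            await process(call_data)
        except Exception:
            # CancelledError derives from BaseException, so cancellation still stops the loop
            logger.exception("post_call_failed call_id=%s", call_data.get("call_id"))
        finally:
            queue.task_done()
```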
4. Summary Generation

Generate an AI summary and analytics using OpenAI.
5. Data Persistence

Update MongoDB with the processed results.

Scaling Considerations

Horizontal Scaling

Call Handler

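Call Handler instances share no state with one another, so multiple instances can run behind a load balancer. Each instance manages its own CallManager and sessions; the deduplication set (app.state.seen_webhook_ids) and the CallManager's active counts are held in process memory, so each instance enforces the limits on its own unless that state is moved to a shared store. Use sticky sessions or call ID hashing for webhook routing.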

Post-Call Processor

Queue-based processing lets multiple workers run in parallel. Back the queue with MongoDB or Redis so that it is shared across instances; each worker then picks up calls independently.

Capacity Management

# Global settings
MAX_CONCURRENT_CALLS=50          # System-wide limit
MAX_CONCURRENT_CALLS_PER_TENANT=10  # Per-tenant limit
Limits enforce:
  • System stability under load
  • Fair resource distribution across tenants
  • Predictable cost management

Database Optimization

Recommended indexes for MongoDB:
// Call registries
db.call_registries.createIndex({ call_id: 1 }, { unique: true })
db.call_registries.createIndex({ tenant_id: 1, created_at: -1 })

// Tenant configs
db.tenant_configs.createIndex({ tenant_id: 1 }, { unique: true })
db.tenant_configs.createIndex({ "dialed_numbers": 1 })

// Prompts
db.prompts.createIndex({ tenant_id: 1, prompt_type: 1, active: 1 })

Error Handling

The system handles errors at each layer as follows:

Webhook Level

  • Invalid signature → Return 401 Unauthorized
  • Duplicate webhook → Return 200 OK (deduped)
  • Capacity exceeded → Reject call gracefully
  • Missing tenant → Reject call, record metric

Session Level

  • WebSocket disconnect → Retry the connection; if that fails, clean up the session
  • Tool execution failure → Return error to AI, continue call
  • Timeout → Graceful shutdown, save partial transcript

Post-Call Level

  • Processing failure → Retry with exponential backoff (3 attempts)
  • Database error → Log and alert, queue for manual review
All errors are logged with structured events for easy debugging:
log_event(
    logging.ERROR,
    "call_session_failed",
    call_id=self.call_id,
    error=str(e)
)

Monitoring and Observability

Structured Logging

All components use structured logging with context:
# From src/core/logger.py
call_id_var = ContextVar("call_id", default="")
tenant_id_var = ContextVar("tenant_id", default="")

def log_event(level: int, event_name: str, **kwargs):
    ...  # Automatic context injection (body omitted)
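The body of log_event is not shown. A minimal sketch of how ContextVar-based injection can work: a session sets call_id_var and tenant_id_var once, and every later log_event call in that asyncio task (and in tasks it creates, which copy the context) picks them up, while concurrent calls stay isolated. Rendering one JSON object per event, the "sonore" logger name, and returning the rendered line are assumptions made for illustration.

```python
import json
import logging
from contextvars import ContextVar
from typing import Any

call_id_var: ContextVar[str] = ContextVar("call_id", default="")
tenant_id_var: ContextVar[str] = ContextVar("tenant_id", default="")
logger = logging.getLogger("sonore")  # hypothetical logger name


def log_event(level: int, event_name: str, **kwargs: Any) -> str:
    """Log one structured event as JSON and return the rendered line."""
    payload: dict[str, Any] = {
        "event": event_name,
        # Automatic context injection from the current task's context
        "call_id": call_id_var.get(),
        "tenant_id": tenant_id_var.get(),
        **kwargs,  # explicit keyword arguments override injected values
    }
    line = json.dumps(payload, default=str)
    logger.log(level, line)
    return line
```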

Live Metrics

Real-time metrics endpoint:
GET /api/v1/metrics/live

# Response:
{
  "timestamp": "2024-03-02T14:30:00Z",
  "active_calls": 5,
  "active_by_tenant": {
    "tenant-a": 3,
    "tenant-b": 2
  },
  "total_accepted": 150,
  "total_rejected": 10
}

Health Checks

GET /health

# Response:
{"status": "ok"}

Security

All webhooks are verified using OpenAI’s signature verification:
event = client.webhooks.unwrap(
    raw_body,
    headers=headers,
    secret=settings.openai_webhook_secret.get_secret_value(),
)
Secrets managed via Pydantic SecretStr:
class Settings(BaseSettings):
    openai_api_key: SecretStr
    openai_webhook_secret: SecretStr
    mongodb_uri: SecretStr
Complete data isolation between tenants:
  • Separate configurations
  • Isolated capacity limits
  • Tenant ID in all database queries

Next Steps

Call Lifecycle

Deep dive into call state management

Multi-Tenancy

Configure tenant isolation and routing

Custom Tools

Build tools for call transfers and actions

Deployment

Production deployment patterns
