The Sonore Phone Agent manages each call through a defined lifecycle that runs from initial webhook reception to final cleanup. This document explains each stage of that lifecycle.
Overview
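A call flows through eight stages:
- Webhook Reception: verify the signature and drop duplicate deliveries
- Tenant Resolution: map the dialed number to a tenant
- Capacity Management: check global and per-tenant limits
- Configuration Loading: load instructions and build tools
- Call Acceptance: accept the call via the OpenAI API and start the session in the background
- Call Session Execution: run the WebSocket session against the Realtime API
- Call Termination: handle call-ended events and stop the session
- Persistence & Cleanup: write the call registry to MongoDB and trigger post-processing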
Stage 1: Webhook Reception
When a call arrives, OpenAI sends a webhook to your configured endpoint.
Webhook Verification
The first step is verifying the webhook signature to ensure authenticity:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:70-84
client = OpenAI(api_key=settings.openai_api_key.get_secret_value())
try:
    event = client.webhooks.unwrap(
        raw_body,
        headers=headers,
        secret=settings.openai_webhook_secret.get_secret_value(),
    )
except Exception as e:
    log_event(logging.ERROR, "webhook_verification_failed", str(e))
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid signature"
    ) from e
Webhook verification uses the OpenAI SDK’s built-in signature validation to prevent unauthorized requests.
Deduplication
The system maintains a cache of recently seen webhook IDs to prevent duplicate processing:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:118-135
async with request.app.state.dedup_lock:
    now = time.time()
    # Cleanup old entries every 30 minutes
    if now - request.app.state.last_dedup_cleanup > 1800:
        request.app.state.seen_webhook_ids = {
            wid: ts
            for wid, ts in request.app.state.seen_webhook_ids.items()
            if now - ts < 1800
        }
        request.app.state.last_dedup_cleanup = now
    if webhook_id in request.app.state.seen_webhook_ids:
        log_event(logging.WARNING, "duplicate_webhook_id", webhook_id)
        return JSONResponse(
            status_code=status.HTTP_200_OK, content={"ok": True, "deduped": True}
        )
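The deduplication logic can also be read in isolation. The following is a minimal sketch, not the project's implementation: the `WebhookDeduplicator` class, its `ttl_seconds` parameter, and the injectable `clock` are hypothetical names introduced for illustration. The excerpt above does not show where a new ID is recorded, so the sketch assumes it is stored on first sight.

```python
import asyncio
import time
from typing import Callable, Dict


class WebhookDeduplicator:
    """Hypothetical helper: remembers webhook IDs for a fixed time window."""

    def __init__(
        self,
        ttl_seconds: float = 1800.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen: Dict[str, float] = {}
        self._last_cleanup = clock()
        self._lock = asyncio.Lock()

    async def is_duplicate(self, webhook_id: str) -> bool:
        """Return True if the ID was already seen; otherwise record it."""
        async with self._lock:
            now = self._clock()
            # Periodically drop entries older than the TTL.
            if now - self._last_cleanup > self._ttl:
                self._seen = {
                    wid: ts for wid, ts in self._seen.items() if now - ts < self._ttl
                }
                self._last_cleanup = now
            if webhook_id in self._seen:
                return True
            # Assumption: first sight of an ID is recorded here.
            self._seen[webhook_id] = now
            return False
```

Because cleanup only runs once per TTL window, an entry can linger for up to roughly twice the TTL before it is dropped, which trades a little memory for less work on each request.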
Stage 2: Tenant Resolution
Each incoming call must be mapped to a tenant based on the dialed number.
Phone Number Lookup
The TenantResolver queries the database to find which tenant owns the dialed number:
# From: src/apps/calls/app/tenant_resolution.py:18-41
class TenantResolver:
    async def resolve_tenant(self, phone_number: str) -> str:
        query = {"phone_number": phone_number}
        try:
            doc = await fetch_from_mongodb(
                query=query,
                client=self.client,
                db=self.db,
                collection=self.collection,
            )
            if not doc:
                raise ValueError(
                    f"No document found for the given phone number : {phone_number}"
                )
            tenant_map = TenantNumberMap(**doc)
            return tenant_map.tenant_id
        except Exception as e:
            logger.error(
                f"TenantResolver.resolve_tenant: error fetching tenant for phone number {phone_number}: {e}"
            )
            raise
If tenant resolution fails, the call is immediately rejected. Ensure all phone numbers are properly configured in the phone-tenant-map collection.
Stage 3: Capacity Management
Before accepting a call, the system checks both global and per-tenant capacity limits.
Capacity Checks
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:276-288
tenant_pending = len(request.app.state.pending_by_tenant.get(tenant_id, set()))
global_pending = len(request.app.state.pending_call_ids)
tenant_active = call_manager.active_count_by_tenant(tenant_id)
global_active = call_manager.active_count()
tenant_in_use = tenant_active + tenant_pending
global_in_use = global_active + global_pending
reject_capacity = (tenant_in_use >= tenant_limit) or (
    global_in_use >= global_limit
)
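- Check Pending: Count calls that have been accepted but whose sessions have not yet started
- Check Active: Count calls that are currently running
- Calculate Total: Sum pending + active for both tenant and global scopes
- Enforce Limits: Reject if either the tenant or the global total has reached its limit (the comparison is `>=`, so a call is rejected as soon as usage equals the limit)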
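The capacity check can be expressed as a small pure function, which makes the boundary behaviour easy to test. This is a sketch under stated assumptions: `CapacitySnapshot` and `should_reject` are hypothetical names, not part of the codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CapacitySnapshot:
    """Hypothetical container for the four counters read in the excerpt."""

    tenant_active: int
    tenant_pending: int
    global_active: int
    global_pending: int


def should_reject(
    snapshot: CapacitySnapshot, tenant_limit: int, global_limit: int
) -> bool:
    """Reject when either scope has reached (not merely exceeded) its limit."""
    tenant_in_use = snapshot.tenant_active + snapshot.tenant_pending
    global_in_use = snapshot.global_active + snapshot.global_pending
    return tenant_in_use >= tenant_limit or global_in_use >= global_limit


# One active and one pending call already fill a tenant limit of 2.
full_tenant = CapacitySnapshot(
    tenant_active=1, tenant_pending=1, global_active=3, global_pending=1
)
# A single active call leaves room under the same limits.
has_room = CapacitySnapshot(
    tenant_active=1, tenant_pending=0, global_active=3, global_pending=1
)
```

Counting pending calls alongside active ones keeps a burst of simultaneous webhooks from all being accepted before any of their sessions has started.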
Stage 4: Configuration Loading
Once capacity is confirmed, the system loads configuration for the tenant.
Loading Instructions
Instructions are loaded from the database with caching:
# From: src/apps/calls/app/instructions_service.py:96-116
async def get_prompt_by_tenant(self, tenant_id: str) -> ActiveInstructions:
    # Check cache first
    try:
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached
    except CacheEntryExpiredError:
        logger.info("Cache entry expired for tenant_id=%s", tenant_id)
    async with self._lock_for_tenant(tenant_id):
        # Re-check cache inside lock
        try:
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached
        except CacheEntryExpiredError:
            logger.info("Cache entry expired for tenant_id=%s", tenant_id)
        prompt_pointer = await self.get_active_prompts_by_tenant(tenant_id)
        # ... fetch greeting and instruction text
See Instructions for more details.
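The instruction cache uses a double-checked pattern: look in the cache, take a per-tenant lock on a miss, then look again before loading. The sketch below illustrates that pattern on its own. `TenantCache`, `loader`, and `ttl_seconds` are hypothetical names, and unlike the excerpt, an expired entry is reported as a plain miss rather than by raising `CacheEntryExpiredError`.

```python
import asyncio
import time
from typing import Awaitable, Callable, Dict, Optional, Tuple


class TenantCache:
    """Hypothetical per-tenant cache with a TTL and one lock per tenant."""

    def __init__(
        self, loader: Callable[[str], Awaitable[str]], ttl_seconds: float = 60.0
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, str]] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    def _get_cached(self, tenant_id: str) -> Optional[str]:
        entry = self._entries.get(tenant_id)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at >= self._ttl:
            # Expired entries are treated as misses in this sketch.
            del self._entries[tenant_id]
            return None
        return value

    async def get(self, tenant_id: str) -> str:
        # First check, without the lock: the common fast path.
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached
        lock = self._locks.setdefault(tenant_id, asyncio.Lock())
        async with lock:
            # Second check: another coroutine may have filled the cache
            # while this one was waiting for the lock.
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached
            value = await self._loader(tenant_id)
            self._entries[tenant_id] = (time.monotonic(), value)
            return value
```

With this shape, several calls for the same tenant arriving at once trigger a single load, while calls for different tenants do not block each other.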
Tools are dynamically built based on tenant configuration:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:532-542
try:
    if tenant_config:
        tool_builder: ToolBuilder = request.app.state.tool_builder
        tools_build = tool_builder.build_tools(tenant_config)
except ToolBuildError as e:
    log_event(logging.ERROR, "tool_build_error", str(e))
    tools_build = None
except Exception as e:
    log_event(logging.ERROR, "tool_build_unexpected_error", str(e))
    tools_build = None
See Tools for more details.
Stage 5: Call Acceptance
After all validations pass, the call is accepted via the OpenAI API.
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:545-586
response = await openai_calls_service.accept_call(
    call_id, idempotency_key=f"accept_{webhook_id}"
)
if response.status_code == 200:
    log_event(logging.INFO, "call_accepted")
    await metrics_store.record_accept(tenant_id=tenant_id, call_id=call_id)
    # Start the call session in background
    asyncio.create_task(
        _start_call_session(
            request=request,
            call_id=call_id,
            tenant_id=tenant_id,
            instructions=instructions,
            tools_build=tools_build,
            cfg=tenant_config,
            caller_number=caller_number,
            dialed_number=dialed_number,
            tool_executor=tool_executor,
        ),
        name=f"start-{call_id}",
    )
The call session starts in a background task, allowing the webhook to return quickly to OpenAI.
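The fire-and-forget pattern can be sketched without the web framework. Everything named here (`background_tasks`, `start_call_session`, `handle_webhook`) is hypothetical. The sketch also keeps a strong reference to each task, which the asyncio documentation recommends because the event loop itself only holds weak references to tasks; whether the project does this elsewhere is not shown in the excerpt.

```python
import asyncio
from typing import Dict, List, Set

# Strong references keep pending tasks from being garbage-collected.
background_tasks: Set[asyncio.Task] = set()


async def start_call_session(call_id: str, started: List[str]) -> None:
    """Stand-in for slow session startup work."""
    await asyncio.sleep(0.01)
    started.append(call_id)


async def handle_webhook(call_id: str, started: List[str]) -> Dict[str, bool]:
    """Schedule session startup and return without waiting for it."""
    task = asyncio.create_task(
        start_call_session(call_id, started), name=f"start-{call_id}"
    )
    background_tasks.add(task)
    # Drop the reference once the task has finished.
    task.add_done_callback(background_tasks.discard)
    return {"ok": True}
```

The handler's response is available as soon as the task is scheduled, so the time spent starting the session never counts against the webhook's response time.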
Stage 6: Call Session Execution
The CallSession manages the active call lifecycle.
Session Startup
# From: src/apps/calls/app/call_session.py:113-128
async def start(self) -> None:
    """
    Starts the call session in background.
    - Does not block until completion.
    """
    async with self._lock:
        if self.status != CallSessionStatus.NEW:
            return  # Already started
        self.status = CallSessionStatus.RUNNING
        self._run_task = asyncio.create_task(
            self._run(), name=f"CallSession-{self.call_id}"
        )
        await self.metrics_store.record_start(
            tenant_id=self.tenant_id,
            call_id=self.call_id,
        )
WebSocket Connection
The session establishes a WebSocket connection to OpenAI’s Realtime API:
# From: src/apps/calls/app/call_session.py:259-294
async def _run(self) -> None:
    """
    Runs WS client, then reconstructs transcript from client's registry.
    """
    call_id_var.set(self.call_id)
    tenant_id_var.set(self.tenant_id)
    try:
        await self.ws_client.run()
        self.status = CallSessionStatus.FINISHED
        self._end_reason = EndReason.COMPLETED
        log_event(logging.INFO, "call_session_finished", call_id=self.call_id)
    except asyncio.CancelledError:
        self.status = CallSessionStatus.CANCELLED
        self._end_reason = EndReason.ERROR
        log_event(logging.INFO, "call_session_cancelled", call_id=self.call_id)
        raise
    except BaseException as e:
        self.error = e
        self.status = CallSessionStatus.FAILED
        self._end_reason = EndReason.ERROR
        log_event(
            logging.ERROR, "call_session_failed", call_id=self.call_id, error=str(e)
        )
    finally:
        await self._cancel_all_tool_tasks()
        self._done.set()
        await self.metrics_store.record_end(
            tenant_id=self.tenant_id,
            call_id=self.call_id,
            end_reason=self._end_reason,
        )
Stage 7: Call Termination
Calls can end in several ways:
Normal Completion
The caller or AI ends the conversation normally:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:675-709
async def _handle_call_ended(
    request: Request, call_id: str, webhook_id: str
) -> JSONResponse:
    """
    Handle call ended/hangup events.
    """
    if not call_id:
        log_event(
            logging.ERROR, "missing_call_id", "Call ID is missing in call ended event"
        )
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"ok": True, "ignored": True, "reason": "missing_call_id"},
        )
    try:
        call_manager: CallManager = request.app.state.call_manager
        # Remove from pending calls if present
        await _release_pending_capacity_state(request, call_id)
        async with request.app.state.capacity_lock:
            request.app.state.accepted_call_ids.pop(call_id, None)
        # Stop the call session
        await call_manager.stop_call(call_id, reason="call_ended")
        log_event(logging.INFO, "call_ended_handled", call_id)
        return JSONResponse(status_code=status.HTTP_200_OK, content={"ok": True})
    except Exception as e:
        log_event(logging.ERROR, "call_ended_handling_failed", call_id, error=str(e))
        return JSONResponse(
            status_code=status.HTTP_200_OK, content={"ok": False, "error": str(e)}
        )
Graceful Shutdown
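CallSession.stop marks the session as STOPPING, cancels the run task, waits for it to finish, and then cancels any outstanding tool tasks. Calling it on a session that is already stopping or finished is a no-op: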
# From: src/apps/calls/app/call_session.py:218-250
async def stop(self, reason: str | None = None) -> None:
    """
    Stops the call session gracefully.
    """
    async with self._lock:
        if self.status in (
            CallSessionStatus.STOPPING,
            CallSessionStatus.FINISHED,
            CallSessionStatus.FAILED,
            CallSessionStatus.CANCELLED,
        ):
            return  # Already stopping or finished
        self.status = CallSessionStatus.STOPPING
        if reason:
            log_event(
                logging.INFO,
                "call_session_stopping",
                call_id=self.call_id,
                reason=reason,
            )
    if self._run_task and not self._run_task.done():
        self._run_task.cancel()
    await self.wait_done()
    if self._run_task:
        try:
            await asyncio.shield(self._run_task)
        except asyncio.CancelledError:
            pass
    await self._cancel_all_tool_tasks()
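The cancel-then-wait step at the heart of a graceful stop can be shown on a bare asyncio task. This is a simplified sketch, not the session's code: `cancel_and_wait` is a hypothetical helper, and it omits the status bookkeeping, the `wait_done` event, and the `asyncio.shield` call used in the excerpt.

```python
import asyncio
import contextlib


async def cancel_and_wait(task: asyncio.Task) -> None:
    """Request cancellation, then wait until the task has actually unwound.

    Caveat: suppressing CancelledError here also hides a cancellation
    aimed at the caller, so a production version may need to re-raise.
    """
    if not task.done():
        task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        # Awaiting gives the task's own finally blocks time to run.
        await task
```

Waiting after `cancel()` matters because cancellation is only a request: the task's cleanup code runs on later event-loop iterations, and resources it releases are not free until the await returns.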
Stage 8: Persistence & Cleanup
Writing to Database
The call registry is persisted to MongoDB:
# From: src/apps/calls/app/call_session.py:298-316
async def write_to_db(self) -> None:
    try:
        log_event(
            logging.INFO,
            "call_session_writing_to_db",
            call_id=self.call_id,
            object=self.call_registry.model_dump_json(),
        )
        await push_registry_to_mongodb(self.call_registry, client=self.db_client)
        log_event(
            logging.INFO, "call_session_db_write_successful", call_id=self.call_id
        )
    except Exception as e:
        log_event(
            logging.ERROR,
            "call_session_db_write_failed",
            call_id=self.call_id,
            error=str(e),
        )
Post-Processing Webhook
After the call ends, a post-processing webhook is triggered:
# From: src/apps/calls/app/call_session.py:643-658
base = (settings.post_call_uri or "").rstrip("/")
url = f"{base}/api/v1/process"
payload: dict[str, Any] = {"call_id": self.call_id, "tenant_id": self.tenant_id}
timeout = httpx.Timeout(connect=5.0, read=10.0, write=10.0, pool=5.0)
last_exc: Exception | None = None
attempt = 0
while attempt <= max_retries:
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(url, json=payload)
        if 200 <= resp.status_code < 300:
            log_event(
                logging.INFO,
                "call_session_post_process_successful",
                # ... (excerpt ends here)
Post-processing webhooks use exponential backoff with configurable retry limits for transient errors.
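Since the excerpt stops before the retry branch, here is a minimal sketch of how exponential backoff with a retry limit might look. It is not the project's code: `post_with_retries`, `base_delay`, `max_delay`, and the injectable `send` and `sleep` callables are hypothetical, and treating 5xx, 429, connection errors, and timeouts as the transient cases is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def post_with_retries(
    send: Callable[[], Awaitable[int]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call send() until it returns a 2xx status or retries are exhausted.

    send() is a stand-in for the HTTP POST and returns a status code.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            status_code = await send()
            if 200 <= status_code < 300:
                return True
            # Assumption: other 4xx responses will not succeed on retry.
            if status_code < 500 and status_code != 429:
                return False
        except (ConnectionError, TimeoutError):
            pass  # Assumption: network-level failures are transient.
        if attempt < max_retries:
            # Delay doubles each attempt: base, 2*base, 4*base, ... capped.
            await sleep(min(max_delay, base_delay * (2 ** attempt)))
        attempt += 1
    return False
```

Passing `sleep` as a parameter keeps the function testable without real waiting. A production version would often add random jitter to the delay so that many failing calls do not retry in lockstep.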
Session States
Calls transition through well-defined states:
# From: src/apps/calls/app/call_session.py:29-35
class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"
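The transitions between these states can be written down as a table. The one below is inferred only from the `start`, `_run`, and `stop` excerpts in this document, so treat it as an assumption: the real code may permit or forbid other transitions, and `ALLOWED_TRANSITIONS`, `can_transition`, and `is_terminal` are hypothetical names. The enum is repeated so the sketch is self-contained.

```python
from enum import Enum
from typing import Dict, FrozenSet


class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"


S = CallSessionStatus

# Inferred from the excerpts: start() moves NEW to RUNNING, stop() moves any
# non-terminal state to STOPPING, and _run() sets one of the terminal states.
ALLOWED_TRANSITIONS: Dict[S, FrozenSet[S]] = {
    S.NEW: frozenset({S.RUNNING, S.STOPPING}),
    S.RUNNING: frozenset({S.STOPPING, S.FINISHED, S.FAILED, S.CANCELLED}),
    S.STOPPING: frozenset({S.FINISHED, S.FAILED, S.CANCELLED}),
    S.FINISHED: frozenset(),
    S.FAILED: frozenset(),
    S.CANCELLED: frozenset(),
}


def can_transition(current: S, target: S) -> bool:
    """True if the table above allows moving from current to target."""
    return target in ALLOWED_TRANSITIONS[current]


def is_terminal(state: S) -> bool:
    """Terminal states have no outgoing transitions."""
    return not ALLOWED_TRANSITIONS[state]
```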
Error Handling
The system handles errors at multiple levels:
Webhook Level
- Tenant Resolution Failed: Call is rejected
- Capacity Exceeded: Call is rejected with capacity metrics
- Instructions Missing: Call is rejected or uses fallback
- Configuration Invalid: Call is rejected
Session Level
- WebSocket Connection Failed: Call ends with ERROR reason
- Tool Execution Failed: Error returned to AI, call continues
- Database Write Failed: Logged but doesn’t fail the call
Cleanup Guarantees
The system ensures cleanup happens even on errors:
# From: src/apps/calls/app/call_manager.py:208-230
async def _await_session(
    self, session: CallSession, *, timeout: float | None
) -> None:
    """
    Manager-level awaiter. It centralizes:
    - waiting for session completion
    - status-based metric updates
    - future hooks: write to DB, trigger post-call pipeline, etc.
    """
    try:
        await session.run(timeout=timeout)
    except Exception as e:
        log_event(
            logging.ERROR,
            "call_session_run_failed",
            call_id=session.call_id,
            error=str(e),
        )
    finally:
        # Cleanup session references
        self._sessions.pop(session.call_id, None)
        self._session_tasks.pop(session.call_id, None)
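The guarantee rests on `try`/`finally`: the `finally` block runs whether the session returns, raises, or is cancelled. A stripped-down sketch, with a hypothetical `SessionRegistry` standing in for the manager's bookkeeping:

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class SessionRegistry:
    """Hypothetical stand-in for the manager's session bookkeeping."""

    def __init__(self) -> None:
        self.sessions: Dict[str, Callable[[], Awaitable[None]]] = {}
        self.errors: List[str] = []

    async def run_and_release(
        self, call_id: str, run: Callable[[], Awaitable[None]]
    ) -> None:
        self.sessions[call_id] = run
        try:
            await run()
        except Exception as exc:
            # Record the failure and continue, as the excerpt does with log_event.
            self.errors.append(f"{call_id}: {exc}")
        finally:
            # Runs on success, on failure, and on cancellation.
            self.sessions.pop(call_id, None)
```

Using `pop(call_id, None)` rather than `del` makes the cleanup idempotent, so it is safe even if another code path has already removed the entry.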