The Sonore Phone Agent manages calls through a well-defined lifecycle that spans from initial webhook reception to final cleanup. This document explains each stage of the call lifecycle.

Overview

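A call flows through eight stages: webhook reception, tenant resolution, capacity management, configuration loading, call acceptance, call session execution, call termination, and persistence and cleanup.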

Stage 1: Webhook Reception

When a call arrives, OpenAI sends a webhook to your configured endpoint.

Webhook Verification

The first step is verifying the webhook signature to ensure authenticity:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:70-84
client = OpenAI(api_key=settings.openai_api_key.get_secret_value())

try:
    event = client.webhooks.unwrap(
        raw_body,
        headers=headers,
        secret=settings.openai_webhook_secret.get_secret_value(),
    )
except Exception as e:
    log_event(logging.ERROR, "webhook_verification_failed", str(e))
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid signature"
    ) from e
Webhook verification uses the OpenAI SDK’s built-in signature validation to prevent unauthorized requests.
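
The SDK call hides the details of the check. As a rough illustration of the general idea behind signed webhooks, the sketch below compares an HMAC over the timestamp and raw body against the signature sent with the request. The message layout, the hex encoding, the five-minute tolerance, and the names `sign_payload` and `is_authentic` are all assumptions made for this example; it is not the exact scheme the OpenAI SDK verifies.

```python
import hashlib
import hmac


def sign_payload(secret: bytes, timestamp: str, body: bytes) -> str:
    """Hex HMAC-SHA256 over "<timestamp>.<body>" (illustrative layout, an assumption)."""
    message = timestamp.encode("utf-8") + b"." + body
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def is_authentic(
    secret: bytes,
    timestamp: str,
    body: bytes,
    signature: str,
    *,
    now: float,
    tolerance_seconds: int = 300,
) -> bool:
    """Return True only if the signature matches and the timestamp is recent."""
    try:
        sent_at = int(timestamp)
    except ValueError:
        return False

    # Reject stale or far-future timestamps to limit replay attacks.
    if abs(now - sent_at) > tolerance_seconds:
        return False

    expected = sign_payload(secret, timestamp, body)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

Whatever the scheme, the signature must be computed over the raw request bytes. Re-serializing parsed JSON can change whitespace or key order and break the comparison.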

Deduplication

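The system keeps an in-process map of recently seen webhook IDs, pruning entries older than 30 minutes, so that a redelivered webhook is acknowledged with 200 OK without being processed twice: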
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:118-135
async with request.app.state.dedup_lock:
    now = time.time()

    # Cleanup old entries every 30 minutes
    if now - request.app.state.last_dedup_cleanup > 1800:
        request.app.state.seen_webhook_ids = {
            wid: ts
            for wid, ts in request.app.state.seen_webhook_ids.items()
            if now - ts < 1800
        }
        request.app.state.last_dedup_cleanup = now

    if webhook_id in request.app.state.seen_webhook_ids:
        log_event(logging.WARNING, "duplicate_webhook_id", webhook_id)
        return JSONResponse(
            status_code=status.HTTP_200_OK, content={"ok": True, "deduped": True}
        )
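
The same idea can be sketched as a small standalone class. This is a minimal sketch, not the project's implementation: the class name `SeenWebhookCache` and the method `check_and_mark` are hypothetical, and recording the ID on first sight is an assumption, since the excerpt above does not show where new IDs are stored.

```python
import time
from typing import Dict, Optional


class SeenWebhookCache:
    """Hypothetical in-memory dedup cache keyed by webhook ID."""

    def __init__(self, ttl_seconds: float = 1800.0) -> None:
        self.ttl_seconds = ttl_seconds
        self._seen: Dict[str, float] = {}
        self._last_cleanup = 0.0

    def check_and_mark(self, webhook_id: str, now: Optional[float] = None) -> bool:
        """Return True if webhook_id was already seen; otherwise record it."""
        now = time.time() if now is None else now

        # Prune entries older than the TTL, at most once per TTL window.
        if now - self._last_cleanup > self.ttl_seconds:
            self._seen = {
                wid: ts
                for wid, ts in self._seen.items()
                if now - ts < self.ttl_seconds
            }
            self._last_cleanup = now

        if webhook_id in self._seen:
            return True

        # Assumption: the ID is recorded the first time it is seen.
        self._seen[webhook_id] = now
        return False
```

Because pruning runs only once per window, an entry can outlive the TTL by up to one further window before it is dropped. In an async handler the call would still need to run under a lock, as in the excerpt.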

Stage 2: Tenant Resolution

Each incoming call must be mapped to a tenant based on the dialed number.

Phone Number Lookup

The TenantResolver queries the database to find which tenant owns the dialed number:
# From: src/apps/calls/app/tenant_resolution.py:18-41
class TenantResolver:
    async def resolve_tenant(self, phone_number: str) -> str:
        query = {"phone_number": phone_number}

        try:
            doc = await fetch_from_mongodb(
                query=query,
                client=self.client,
                db=self.db,
                collection=self.collection,
            )

            if not doc:
                raise ValueError(
                    f"No document found for the given phone number : {phone_number}"
                )

            tenant_map = TenantNumberMap(**doc)
            return tenant_map.tenant_id

        except Exception as e:
            logger.error(
                f"TenantResolver.resolve_tenant: error fetching tenant for phone number {phone_number}: {e}"
            )
            raise
If tenant resolution fails, the call is immediately rejected. Ensure all phone numbers are properly configured in the phone-tenant-map collection.

Stage 3: Capacity Management

Before accepting a call, the system checks both global and per-tenant capacity limits.

Capacity Checks

# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:276-288
tenant_pending = len(request.app.state.pending_by_tenant.get(tenant_id, set()))
global_pending = len(request.app.state.pending_call_ids)

tenant_active = call_manager.active_count_by_tenant(tenant_id)
global_active = call_manager.active_count()

tenant_in_use = tenant_active + tenant_pending
global_in_use = global_active + global_pending

reject_capacity = (tenant_in_use >= tenant_limit) or (
    global_in_use >= global_limit
)

The check follows four steps:

  1. Check Pending: Count calls that have been accepted but not yet started.
  2. Check Active: Count calls that are currently running.
  3. Calculate Total: Sum the pending and active counts for both the tenant and global scopes.
  4. Enforce Limits: Reject the call if either total has reached its limit (the comparison is >=, so a scope at exactly its limit is already full).
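
The capacity rule can be exercised in isolation with a few numbers. The sketch below is a pure function with the same comparison as the excerpt; `CapacitySnapshot` and `should_reject` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CapacitySnapshot:
    """Hypothetical container for the four counts used by the check."""

    tenant_active: int
    tenant_pending: int
    global_active: int
    global_pending: int


def should_reject(
    snapshot: CapacitySnapshot, tenant_limit: int, global_limit: int
) -> bool:
    """Reject when either scope has reached its limit."""
    tenant_in_use = snapshot.tenant_active + snapshot.tenant_pending
    global_in_use = snapshot.global_active + snapshot.global_pending
    return tenant_in_use >= tenant_limit or global_in_use >= global_limit
```

For example, a tenant with one active and one pending call is rejected at a tenant limit of 2, even though no limit has been exceeded yet. Counting pending calls prevents a burst of simultaneous webhooks from all passing the check before any session has started.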

Stage 4: Configuration Loading

Once capacity is confirmed, the system loads configuration for the tenant.

Loading Instructions

Instructions are loaded from the database with caching:
# From: src/apps/calls/app/instructions_service.py:96-116
async def get_prompt_by_tenant(self, tenant_id: str) -> ActiveInstructions:
    # Check cache first
    try:
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached
    except CacheEntryExpiredError:
        logger.info("Cache entry expired for tenant_id=%s", tenant_id)

    async with self._lock_for_tenant(tenant_id):
        # Re-check cache inside lock
        try:
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached
        except CacheEntryExpiredError:
            logger.info("Cache entry expired for tenant_id=%s", tenant_id)

        prompt_pointer = await self.get_active_prompts_by_tenant(tenant_id)
        # ... fetch greeting and instruction text
See Instructions for more details.
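
The cache lookup above follows a double-checked locking pattern: check the cache, take a per-tenant lock, then check again before loading. A self-contained sketch of that pattern is shown below. `TenantCache`, its `loader` callback, and the fixed TTL are assumptions for illustration, and an expired entry is reported as a miss here rather than through an exception like `CacheEntryExpiredError`.

```python
import asyncio
import time
from collections import defaultdict
from typing import Awaitable, Callable, Dict, Optional, Tuple


class TenantCache:
    """Hypothetical per-tenant cache with double-checked locking."""

    def __init__(
        self, loader: Callable[[str], Awaitable[str]], ttl_seconds: float = 60.0
    ) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, str]] = {}
        # One lock per tenant, created lazily on first use.
        self._locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    def _get_cached(self, tenant_id: str) -> Optional[str]:
        entry = self._entries.get(tenant_id)
        if entry is None:
            return None
        stored_at, value = entry
        if time.monotonic() - stored_at >= self._ttl:
            return None  # expired entries count as a miss
        return value

    async def get(self, tenant_id: str) -> str:
        # Fast path: no lock needed on a cache hit.
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached

        async with self._locks[tenant_id]:
            # Re-check: another coroutine may have loaded it while we waited.
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached

            value = await self._loader(tenant_id)
            self._entries[tenant_id] = (time.monotonic(), value)
            return value
```

With this structure, several calls arriving for the same tenant at once trigger a single load, while lookups for other tenants are not blocked.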

Building Tools

Tools are dynamically built based on tenant configuration:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:532-542
try:
    if tenant_config:
        tool_builder: ToolBuilder = request.app.state.tool_builder
        tools_build = tool_builder.build_tools(tenant_config)
except ToolBuildError as e:
    log_event(logging.ERROR, "tool_build_error", str(e))
    tools_build = None
except Exception as e:
    log_event(logging.ERROR, "tool_build_unexpected_error", str(e))
    tools_build = None
See Tools for more details.

Stage 5: Call Acceptance

After all validations pass, the call is accepted via the OpenAI API.
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:545-586
response = await openai_calls_service.accept_call(
    call_id, idempotency_key=f"accept_{webhook_id}"
)

if response.status_code == 200:
    log_event(logging.INFO, "call_accepted")

    await metrics_store.record_accept(tenant_id=tenant_id, call_id=call_id)

    # Start the call session in background
    asyncio.create_task(
        _start_call_session(
            request=request,
            call_id=call_id,
            tenant_id=tenant_id,
            instructions=instructions,
            tools_build=tools_build,
            cfg=tenant_config,
            caller_number=caller_number,
            dialed_number=dialed_number,
            tool_executor=tool_executor,
        ),
        name=f"start-{call_id}",
    )
The call session starts in a background task, allowing the webhook to return quickly to OpenAI.

Stage 6: Call Session Execution

The CallSession manages the active call lifecycle.

Session Startup

# From: src/apps/calls/app/call_session.py:113-128
async def start(self) -> None:
    """
    Starts the call session in background.
    - Does not block until completion.
    """
    async with self._lock:
        if self.status != CallSessionStatus.NEW:
            return  # Already started
        self.status = CallSessionStatus.RUNNING
        self._run_task = asyncio.create_task(
            self._run(), name=f"CallSession-{self.call_id}"
        )
    await self.metrics_store.record_start(
        tenant_id=self.tenant_id,
        call_id=self.call_id,
    )

WebSocket Connection

The session establishes a WebSocket connection to OpenAI’s Realtime API:
# From: src/apps/calls/app/call_session.py:259-294
async def _run(self) -> None:
    """
    Runs WS client, then reconstructs transcript from client's registry.
    """
    call_id_var.set(self.call_id)
    tenant_id_var.set(self.tenant_id)
    try:
        await self.ws_client.run()

        self.status = CallSessionStatus.FINISHED
        self._end_reason = EndReason.COMPLETED
        log_event(logging.INFO, "call_session_finished", call_id=self.call_id)

    except asyncio.CancelledError:
        self.status = CallSessionStatus.CANCELLED
        self._end_reason = EndReason.ERROR
        log_event(logging.INFO, "call_session_cancelled", call_id=self.call_id)
        raise

    except BaseException as e:
        self.error = e
        self.status = CallSessionStatus.FAILED
        self._end_reason = EndReason.ERROR
        log_event(
            logging.ERROR, "call_session_failed", call_id=self.call_id, error=str(e)
        )

    finally:
        await self._cancel_all_tool_tasks()
        self._done.set()
        await self.metrics_store.record_end(
            tenant_id=self.tenant_id,
            call_id=self.call_id,
            end_reason=self._end_reason,
        )

Stage 7: Call Termination

Calls can end in several ways:

Normal Completion

The caller or AI ends the conversation normally:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:675-709
async def _handle_call_ended(
    request: Request, call_id: str, webhook_id: str
) -> JSONResponse:
    """
    Handle call ended/hangup events.
    """
    if not call_id:
        log_event(
            logging.ERROR, "missing_call_id", "Call ID is missing in call ended event"
        )
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"ok": True, "ignored": True, "reason": "missing_call_id"},
        )

    try:
        call_manager: CallManager = request.app.state.call_manager

        # Remove from pending calls if present
        await _release_pending_capacity_state(request, call_id)
        async with request.app.state.capacity_lock:
            request.app.state.accepted_call_ids.pop(call_id, None)

        # Stop the call session
        await call_manager.stop_call(call_id, reason="call_ended")

        log_event(logging.INFO, "call_ended_handled", call_id)

        return JSONResponse(status_code=status.HTTP_200_OK, content={"ok": True})

    except Exception as e:
        log_event(logging.ERROR, "call_ended_handling_failed", call_id, error=str(e))
        return JSONResponse(
            status_code=status.HTTP_200_OK, content={"ok": False, "error": str(e)}
        )

Graceful Shutdown

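The session's stop method marks the session as STOPPING, cancels the run task, waits for it to finish, and then cancels any remaining tool tasks. Calling it on a session that is already stopping or finished is a no-op: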
# From: src/apps/calls/app/call_session.py:218-250
async def stop(self, reason: str | None = None) -> None:
    """
    Stops the call session gracefully.
    """
    async with self._lock:
        if self.status in (
            CallSessionStatus.STOPPING,
            CallSessionStatus.FINISHED,
            CallSessionStatus.FAILED,
            CallSessionStatus.CANCELLED,
        ):
            return  # Already stopping or finished

        self.status = CallSessionStatus.STOPPING
        if reason:
            log_event(
                logging.INFO,
                "call_session_stopping",
                call_id=self.call_id,
                reason=reason,
            )

        if self._run_task and not self._run_task.done():
            self._run_task.cancel()

    await self.wait_done()
    if self._run_task:
        try:
            await asyncio.shield(self._run_task)
        except asyncio.CancelledError:
            pass

    await self._cancel_all_tool_tasks()

Stage 8: Persistence & Cleanup

Writing to Database

The call registry is persisted to MongoDB:
# From: src/apps/calls/app/call_session.py:298-316
async def write_to_db(self) -> None:
    try:
        log_event(
            logging.INFO,
            "call_session_writing_to_db",
            call_id=self.call_id,
            object=self.call_registry.model_dump_json(),
        )
        await push_registry_to_mongodb(self.call_registry, client=self.db_client)
        log_event(
            logging.INFO, "call_session_db_write_successful", call_id=self.call_id
        )
    except Exception as e:
        log_event(
            logging.ERROR,
            "call_session_db_write_failed",
            call_id=self.call_id,
            error=str(e),
        )

Post-Processing Webhook

After the call ends, a post-processing webhook is triggered:
# From: src/apps/calls/app/call_session.py:643-658
base = (settings.post_call_uri or "").rstrip("/")
url = f"{base}/api/v1/process"
payload: dict[str, Any] = {"call_id": self.call_id, "tenant_id": self.tenant_id}

timeout = httpx.Timeout(connect=5.0, read=10.0, write=10.0, pool=5.0)

last_exc: Exception | None = None
attempt = 0

while attempt <= max_retries:
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(url, json=payload)

        if 200 <= resp.status_code < 300:
            log_event(
                logging.INFO,
                "call_session_post_process_successful",
                # ... (excerpt truncated)
Post-processing webhooks use exponential backoff with configurable retry limits for transient errors.
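
The excerpt is cut off before the retry logic, so the sketch below only illustrates the general shape of a retry loop with exponential backoff. The function names, the base delay and cap, and the rule that 5xx responses and connection errors count as transient are all assumptions, not values taken from the project.

```python
import asyncio
from typing import Awaitable, Callable


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 8.0) -> float:
    """Delay before the next retry: base * 2^attempt, limited to cap seconds."""
    return min(cap, base * (2 ** attempt))


async def post_with_retries(
    send: Callable[[], Awaitable[int]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call send() until it returns a 2xx status or retries are exhausted."""
    attempt = 0
    while attempt <= max_retries:
        try:
            status_code = await send()
            if 200 <= status_code < 300:
                return True
            if status_code < 500:
                # Assumption: other non-5xx responses are not worth retrying.
                return False
        except (ConnectionError, TimeoutError):
            pass  # assumption: network-level failures are transient

        if attempt < max_retries:
            await sleep(backoff_delay(attempt))
        attempt += 1
    return False
```

Passing `sleep` as a parameter keeps the loop testable without real delays. Production code often adds random jitter to the delay so that many failed calls do not retry in lockstep.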

Session States

Calls transition through well-defined states:
# From: src/apps/calls/app/call_session.py:29-35
class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"
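
The transitions between these states are not listed explicitly, but they can be inferred from the start, stop, and run excerpts above. The table below is a sketch of that reading, not a structure that exists in the codebase; `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names.

```python
from enum import Enum
from typing import Dict, FrozenSet


class CallSessionStatus(str, Enum):
    NEW = "new"
    RUNNING = "running"
    STOPPING = "stopping"
    FINISHED = "finished"
    FAILED = "failed"
    CANCELLED = "cancelled"


S = CallSessionStatus

# Inferred from the excerpts: start() moves NEW to RUNNING, stop() moves any
# non-terminal state to STOPPING, and the run task sets a terminal state.
ALLOWED_TRANSITIONS: Dict[CallSessionStatus, FrozenSet[CallSessionStatus]] = {
    S.NEW: frozenset({S.RUNNING, S.STOPPING}),
    S.RUNNING: frozenset({S.STOPPING, S.FINISHED, S.FAILED, S.CANCELLED}),
    S.STOPPING: frozenset({S.FINISHED, S.FAILED, S.CANCELLED}),
    S.FINISHED: frozenset(),
    S.FAILED: frozenset(),
    S.CANCELLED: frozenset(),
}


def can_transition(current: CallSessionStatus, target: CallSessionStatus) -> bool:
    """Return True if moving from current to target matches the inferred table."""
    return target in ALLOWED_TRANSITIONS[current]
```

FINISHED, FAILED, and CANCELLED are terminal: once a session reaches one of them, both start and stop return without changing the status.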

Error Handling

The system handles errors at multiple levels:

Webhook Level

  • Tenant Resolution Failed: Call is rejected
  • Capacity Exceeded: Call is rejected with capacity metrics
  • Instructions Missing: Call is rejected or uses fallback
  • Configuration Invalid: Call is rejected

Session Level

  • WebSocket Connection Failed: Call ends with ERROR reason
  • Tool Execution Failed: Error returned to AI, call continues
  • Database Write Failed: Logged but doesn’t fail the call

Cleanup Guarantees

The system ensures cleanup happens even on errors:
# From: src/apps/calls/app/call_manager.py:208-230
async def _await_session(
    self, session: CallSession, *, timeout: float | None
) -> None:
    """
    Manager-level awaiter. It centralizes:
    - waiting for session completion
    - status-based metric updates
    - future hooks: write to DB, trigger post-call pipeline, etc.
    """

    try:
        await session.run(timeout=timeout)
    except Exception as e:
        log_event(
            logging.ERROR,
            "call_session_run_failed",
            call_id=session.call_id,
            error=str(e),
        )
    finally:
        # Cleanup session references
        self._sessions.pop(session.call_id, None)
        self._session_tasks.pop(session.call_id, None)
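
The guarantee comes from the try/finally structure: the bookkeeping is removed whether the session returns, raises, or is cancelled. A reduced sketch of the same shape is shown below; `SessionRegistry` and `run_tracked` are hypothetical names standing in for the manager's internals.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class SessionRegistry:
    """Hypothetical stand-in for the manager's session bookkeeping."""

    def __init__(self) -> None:
        self.sessions: Dict[str, str] = {}
        self.errors: List[str] = []

    async def run_tracked(
        self, call_id: str, run: Callable[[], Awaitable[None]]
    ) -> None:
        self.sessions[call_id] = "running"
        try:
            await run()
        except Exception as exc:
            # Log and swallow: one failed session must not crash the manager.
            self.errors.append(f"{call_id}: {exc}")
        finally:
            # Runs on success, failure, and cancellation alike.
            self.sessions.pop(call_id, None)
```

Since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so `except Exception` does not swallow cancellation. The `finally` block still runs in that case, which is what keeps the registry from leaking entries.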
