Sonore Phone Agent is built from the ground up as a multi-tenant system, allowing a single deployment to serve multiple customers with complete isolation and per-tenant configuration.
Architecture Overview
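The multi-tenancy system operates at several levels: tenant resolution, per-tenant configuration, capacity management, data and context isolation, tenant-specific instructions and tools, and per-tenant metrics.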
Tenant Resolution
When a call arrives, the system must determine which tenant owns the dialed phone number.
Phone Number Mapping
The TenantResolver looks up the dialed number in the database:
# From: src/apps/calls/app/tenant_resolution.py:7-41
class TenantResolver:
    def __init__(
        self,
        client=mongo_client,
        db: str = "sonore-phone-agent",
        collection: str = "phone-tenant-map",
    ):
        self.client = client
        self.db = db
        self.collection = collection

    async def resolve_tenant(self, phone_number: str) -> str:
        query = {"phone_number": phone_number}
        try:
            doc = await fetch_from_mongodb(
                query=query,
                client=self.client,
                db=self.db,
                collection=self.collection,
            )
            if not doc:
                raise ValueError(
                    f"No document found for the given phone number : {phone_number}"
                )
            tenant_map = TenantNumberMap(**doc)
            return tenant_map.tenant_id
        except Exception as e:
            logger.error(
                f"TenantResolver.resolve_tenant: error fetching tenant for phone number {phone_number}: {e}"
            )
            raise
Database Schema
The phone-tenant-map collection stores the mapping:
{
  "phone_number": "+14155551234",
  "tenant_id": "acme-corp",
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-15T10:30:00Z"
}
If no mapping is found for a phone number, the call is immediately rejected. Always ensure phone numbers are configured before routing calls to them.
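Because the lookup query matches phone_number exactly, the stored value and the dialed value must use the same format. The following is a minimal in-memory sketch of that behavior. The dataclass is a hypothetical stand-in for TenantNumberMap (the real model is not shown in this page), and the list replaces the MongoDB collection for illustration only.

```python
from dataclasses import dataclass
from typing import Optional

# In-memory stand-in for the phone-tenant-map collection (illustrative data only).
PHONE_TENANT_MAP = [
    {"phone_number": "+14155551234", "tenant_id": "acme-corp"},
]


@dataclass(frozen=True)
class TenantNumberMap:
    """Hypothetical shape of a mapping document; the real model may differ."""

    phone_number: str
    tenant_id: str
    created_at: Optional[str] = None
    updated_at: Optional[str] = None


def resolve_tenant(phone_number: str) -> str:
    """Return the tenant_id for an exact phone-number match, or raise ValueError."""
    for doc in PHONE_TENANT_MAP:
        if doc["phone_number"] == phone_number:
            return TenantNumberMap(**doc).tenant_id
    raise ValueError(f"No document found for the given phone number: {phone_number}")
```

In this sketch, resolve_tenant("+14155551234") returns "acme-corp", while the same number without the leading "+" raises ValueError, which corresponds to a rejected call.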
Webhook Integration
Tenant resolution happens early in the webhook processing:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:213-235
try:
    tenant_resolver: TenantResolver = request.app.state.tenant_resolver
    tenant_id = await tenant_resolver.resolve_tenant(dialed_number)
    tenant_id_var.set(tenant_id)
except Exception as e:
    log_event(logging.ERROR, "tenant_resolution_failed", str(e))
    try:
        await openai_calls_service.reject_call(
            call_id, idempotency_key=f"reject_tenant_resolve_{webhook_id}"
        )
    except Exception as reject_e:
        log_event(logging.ERROR, "tenant_resolution_reject_failed", str(reject_e))
    finally:
        await _release_pending_capacity_state(request, call_id)
    await metrics_store.record_reject_tenant_not_configured(
        call_id=call_id,
        tenant_id=None,
    )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"ok": True, "rejected": "tenant_resolve_failed"},
    )
Per-Tenant Configuration
Once a tenant is identified, the system loads tenant-specific configuration.
Tenant Config Structure
Each tenant has a configuration document that controls features and behavior:
{
  "tenant_id": "acme-corp",
  "features": {
    "refer": {
      "enabled": true,
      "require_confirmation": true,
      "handoff_phrase": "Je vous transfère vers {{label}}",
      "destinations": [
        {
          "destination_id": "sales",
          "label": "Service Commercial",
          "description_for_model": "For sales inquiries and new customer questions",
          "target_uri": "sip:[email protected]",
          "enabled": true,
          "priority": 10
        },
        {
          "destination_id": "support",
          "label": "Service Technique",
          "description_for_model": "For technical support and troubleshooting",
          "target_uri": "+14155552000",
          "enabled": true,
          "priority": 5
        }
      ]
    }
  }
}
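To illustrate how such a document might be consumed, here is a small sketch that filters the enabled destinations and renders the handoff phrase. It rests on assumptions not confirmed by the source: that a larger priority value ranks first, and that {{label}} is filled by plain string substitution. The helper names are hypothetical.

```python
from typing import Any

# Trimmed copy of the "refer" feature; the third destination is added for illustration.
refer_config: dict[str, Any] = {
    "enabled": True,
    "handoff_phrase": "Je vous transfère vers {{label}}",
    "destinations": [
        {"destination_id": "support", "label": "Service Technique",
         "enabled": True, "priority": 5},
        {"destination_id": "sales", "label": "Service Commercial",
         "enabled": True, "priority": 10},
        {"destination_id": "billing", "label": "Facturation",
         "enabled": False, "priority": 20},
    ],
}


def enabled_destinations(refer: dict[str, Any]) -> list[dict[str, Any]]:
    """Return enabled destinations, highest priority first (ordering is an assumption)."""
    if not refer.get("enabled"):
        return []
    active = [d for d in refer.get("destinations", []) if d.get("enabled")]
    return sorted(active, key=lambda d: d.get("priority", 0), reverse=True)


def render_handoff(phrase: str, label: str) -> str:
    """Fill the {{label}} placeholder with a destination label."""
    return phrase.replace("{{label}}", label)
```

With this data, enabled_destinations(refer_config) yields "sales" then "support", and the disabled "billing" entry is skipped.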
Configuration Loading
The ConfigReader loads tenant configuration:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:459-508
try:
    config_reader: ConfigReader = request.app.state.config_reader
    tenant_config = await config_reader.get_tenant_config(tenant_id)
except TenantNotConfiguredError as e:
    log_event(logging.ERROR, "tenant_config_not_configured", str(e))
    await metrics_store.record_reject_tenant_not_configured(
        call_id=call_id,
        tenant_id=tenant_id,
    )
    try:
        await openai_calls_service.reject_call(
            call_id,
            idempotency_key=f"reject_tenant_config_not_configured_{webhook_id}",
        )
    except Exception as reject_e:
        log_event(
            logging.ERROR,
            "tenant_config_not_configured_reject_failed",
            str(reject_e),
        )
    finally:
        await _release_pending_capacity_state(request, call_id)
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"ok": True, "rejected": "tenant_config_not_configured"},
    )
except TenantConfigParseError as e:
    log_event(logging.ERROR, "tenant_config_parse_error", str(e))
    # ... handle parse error
Capacity Management
Sonore enforces capacity limits at both global and per-tenant levels.
Capacity Tracking
The CallManager tracks active calls per tenant:
# From: src/apps/calls/app/call_manager.py:124-147
def active_count(self) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.status
        in (
            CallSessionStatus.RUNNING,
            CallSessionStatus.NEW,
            CallSessionStatus.STOPPING,
        )
    )

def active_count_by_tenant(self, tenant_id: str) -> int:
    return sum(
        1
        for s in self._sessions.values()
        if s.tenant_id == tenant_id
        and s.status
        in (
            CallSessionStatus.RUNNING,
            CallSessionStatus.NEW,
            CallSessionStatus.STOPPING,
        )
    )
Capacity Enforcement
Before accepting a call, both limits are checked under a shared lock, counting active and pending calls together:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:237-297
tenant_limit = getattr(
    settings, "max_concurrent_calls_per_tenant", settings.max_concurrent_calls
)
global_limit = settings.max_concurrent_calls

async with request.app.state.capacity_lock:
    now = time.time()
    tenant_pending = len(request.app.state.pending_by_tenant.get(tenant_id, set()))
    global_pending = len(request.app.state.pending_call_ids)
    tenant_active = call_manager.active_count_by_tenant(tenant_id)
    global_active = call_manager.active_count()
    tenant_in_use = tenant_active + tenant_pending
    global_in_use = global_active + global_pending
    reject_capacity = (tenant_in_use >= tenant_limit) or (
        global_in_use >= global_limit
    )
    if not reject_capacity:
        # reserve a slot for this tenant until the session starts
        request.app.state.pending_call_ids.add(call_id)
        request.app.state.pending_tenant_by_call_id[call_id] = tenant_id
        request.app.state.pending_by_tenant.setdefault(tenant_id, set()).add(
            call_id
        )
1. Count Active Calls: Count sessions in the RUNNING, NEW, or STOPPING state, for the tenant and globally.
2. Count Pending Calls: Count calls that hold a reserved slot but whose session has not started yet, for the tenant and globally.
3. Check Limits: Reject if the tenant's or the global in-use count (active plus pending) has reached its limit.
4. Reserve Slot: If capacity is available, reserve a slot until the session starts.
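The check-then-reserve sequence can be sketched in isolation as follows. This is a simplified stand-in, not the service's implementation: CapacityGate and its method names are hypothetical, and active and pending calls are kept in plain dictionaries instead of the CallManager and application state.

```python
import asyncio
from collections import defaultdict


class CapacityGate:
    """Illustrative lock-protected capacity check with slot reservation."""

    def __init__(self, tenant_limit: int, global_limit: int) -> None:
        self.tenant_limit = tenant_limit
        self.global_limit = global_limit
        self._lock = asyncio.Lock()
        self._active: dict[str, set[str]] = defaultdict(set)
        self._pending: dict[str, set[str]] = defaultdict(set)

    async def try_reserve(self, tenant_id: str, call_id: str) -> bool:
        """Reserve a pending slot; return False once either limit is reached."""
        async with self._lock:
            tenant_in_use = len(self._active[tenant_id]) + len(self._pending[tenant_id])
            global_in_use = sum(len(s) for s in self._active.values()) + sum(
                len(s) for s in self._pending.values()
            )
            if tenant_in_use >= self.tenant_limit or global_in_use >= self.global_limit:
                return False
            self._pending[tenant_id].add(call_id)
            return True

    async def mark_started(self, tenant_id: str, call_id: str) -> None:
        """Move a call from pending to active once its session starts."""
        async with self._lock:
            self._pending[tenant_id].discard(call_id)
            self._active[tenant_id].add(call_id)

    async def release(self, tenant_id: str, call_id: str) -> None:
        """Free the slot, whether the call was pending or active."""
        async with self._lock:
            self._pending[tenant_id].discard(call_id)
            self._active[tenant_id].discard(call_id)
```

Counting pending reservations alongside active sessions is what prevents two webhooks that arrive close together from both taking the last slot.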
Rejection on Capacity
When either limit has been reached, the call is rejected and the webhook still responds with HTTP 200:
# From: src/apps/calls/api/v1/endpoints/openai_webhook.py:299-337
if reject_capacity:
    # Reject the call
    try:
        response = await openai_calls_service.reject_call(
            call_id, idempotency_key=f"reject_{webhook_id}"
        )
        if response.status_code == 200:
            # Increment reject counter
            request.app.state.total_rejected_capacity = (
                getattr(request.app.state, "total_rejected_capacity", 0) + 1
            )
    # ... (exception handling is not shown in this excerpt)
    await metrics_store.record_reject_capacity(
        call_id=call_id,
        tenant_id=tenant_id,
    )
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"ok": True, "rejected": "capacity"},
    )
Capacity rejections are tracked in metrics, allowing you to monitor when tenants are hitting their limits.
Tenant Isolation
The system ensures complete isolation between tenants.
Data Isolation
Each tenant’s data is isolated in the database:
- Instructions: Stored in tenant-prompt-state with tenant_id as key
- Configuration: Stored in tenant-config with tenant_id as key
- Call Records: Tagged with tenant_id for filtering
- Metrics: Tracked per-tenant for observability
Resource Isolation
Tenants cannot access each other’s resources:
# All database queries include tenant_id filtering
doc = await fetch_from_mongodb(
    query={"_id": oid, "tenant_id": tenant_id},  # Always filter by tenant
    client=self.client,
    collection=self.texts_collection,
)
Context Isolation
The tenant ID is propagated through context variables:
# From: src/apps/calls/app/call_session.py:263-264
call_id_var.set(self.call_id)
tenant_id_var.set(self.tenant_id)
This ensures all logs and traces include the correct tenant context.
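The effect of context variables can be seen in a small standalone sketch. The variable names mirror the excerpt above, but format_log and handle_call are hypothetical helpers. Each asyncio task runs in its own copy of the context, so a value set for one call is not visible to another call running concurrently.

```python
import asyncio
import contextvars

# Names mirror the excerpt above; the "-" default marks "no call in progress".
tenant_id_var = contextvars.ContextVar("tenant_id", default="-")
call_id_var = contextvars.ContextVar("call_id", default="-")


def format_log(event: str) -> str:
    """Build a log line that picks up tenant and call IDs from the context."""
    return f"{event} tenant_id={tenant_id_var.get()} call_id={call_id_var.get()}"


async def handle_call(tenant_id: str, call_id: str) -> str:
    tenant_id_var.set(tenant_id)
    call_id_var.set(call_id)
    await asyncio.sleep(0)  # yield control so the two calls interleave
    return format_log("call_accepted")


async def main() -> list[str]:
    # gather() wraps each coroutine in a Task, and each Task copies the context.
    return await asyncio.gather(
        handle_call("acme-corp", "call-1"),
        handle_call("globex", "call-2"),
    )
```

Running asyncio.run(main()) returns one log line per call, each carrying its own tenant_id, and the values set inside the tasks do not leak back into the caller's context.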
Tenant-Specific Instructions
Each tenant can have their own greeting and instruction prompts.
Instruction Loading
Instructions are loaded per-tenant with caching:
# From: src/apps/calls/app/instructions_service.py:96-256
async def get_prompt_by_tenant(self, tenant_id: str) -> ActiveInstructions:
    # Check cache first
    try:
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached
    except CacheEntryExpiredError:
        logger.info("Cache entry expired for tenant_id=%s", tenant_id)

    async with self._lock_for_tenant(tenant_id):
        # Re-check cache inside lock
        try:
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached
        except CacheEntryExpiredError:
            logger.info("Cache entry expired for tenant_id=%s", tenant_id)

        prompt_pointer = await self.get_active_prompts_by_tenant(tenant_id)
        # ... load greeting and instruction text
See Instructions for more details.
Cache Isolation
Each tenant has independent cache entries with per-tenant locks:
# From: src/apps/calls/app/instructions_service.py:65-75
self._cache: dict[str, CacheEntry] = {}
self._locks: dict[str, asyncio.Lock] = {}

def _lock_for_tenant(self, tenant_id: str) -> asyncio.Lock:
    lock = self._locks.get(tenant_id)
    if lock is None:
        lock = asyncio.Lock()
        self._locks[tenant_id] = lock
    return lock

def invalidate_cache_for_tenant(self, tenant_id: str) -> None:
    self._cache.pop(tenant_id, None)
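The combination of a per-tenant lock and a second cache check inside the lock can be sketched as a self-contained class. TenantCache, its TTL handling, and the loader callable are assumptions made for illustration; the real service signals expiry with CacheEntryExpiredError, whereas this sketch treats an expired entry as a miss.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class CacheEntry:
    value: str
    expires_at: float


class TenantCache:
    """Illustrative per-tenant cache with double-checked loading."""

    def __init__(self, loader: Callable[[str], Awaitable[str]], ttl_seconds: float = 60.0) -> None:
        self._loader = loader  # hypothetical async loader: tenant_id -> value
        self._ttl = ttl_seconds
        self._cache: dict[str, CacheEntry] = {}
        self._locks: dict[str, asyncio.Lock] = {}

    def _lock_for_tenant(self, tenant_id: str) -> asyncio.Lock:
        lock = self._locks.get(tenant_id)
        if lock is None:
            lock = asyncio.Lock()
            self._locks[tenant_id] = lock
        return lock

    def _get_cached(self, tenant_id: str) -> Optional[str]:
        entry = self._cache.get(tenant_id)
        if entry is None or entry.expires_at <= time.monotonic():
            return None
        return entry.value

    async def get(self, tenant_id: str) -> str:
        cached = self._get_cached(tenant_id)
        if cached is not None:
            return cached
        async with self._lock_for_tenant(tenant_id):
            # Re-check: another task may have filled the cache while we waited.
            cached = self._get_cached(tenant_id)
            if cached is not None:
                return cached
            value = await self._loader(tenant_id)
            self._cache[tenant_id] = CacheEntry(value, time.monotonic() + self._ttl)
            return value

    def invalidate(self, tenant_id: str) -> None:
        self._cache.pop(tenant_id, None)
```

With this design, several concurrent requests for the same tenant trigger a single load, while a slow load for one tenant does not block lookups for another.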
Tenant-Specific Tools
Tools are built dynamically based on tenant configuration.
The ToolBuilder creates tools based on tenant features:
# From: src/apps/calls/tools/tool_builder.py:25-78
def build_tools(self, cfg: TenantConfig) -> ToolBuildResult:
    log_event(
        logging.INFO,
        "tool_build_started",
        tenant_id=cfg.tenant_id,
        builder_count=len(self._builders),
    )
    tools: list[FunctionTool] = []
    for build_fn in self._builders:
        try:
            log_event(
                logging.INFO,
                "tool_builder_phase_started",
                tenant_id=cfg.tenant_id,
                builder=build_fn.__name__,
            )
            tool = build_fn(cfg)
            if tool is not None:
                tools.append(tool)
                log_event(
                    logging.INFO,
                    "tool_builder_phase_built_tool",
                    tenant_id=cfg.tenant_id,
                    builder=build_fn.__name__,
                    tool_name=tool.name,
                )
        # ... (rest of the method is not shown in this excerpt)
See Tools for more details.
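The builder pattern in the excerpt can be reduced to a few lines: each builder inspects the tenant configuration and returns either a tool or None. The sketch below uses hypothetical stand-ins (DemoTenantConfig, DemoTool, build_refer_tool) and omits logging and error handling; the actual TenantConfig and FunctionTool types are not shown in this page.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class DemoTenantConfig:
    """Hypothetical stand-in for TenantConfig."""

    tenant_id: str
    features: dict = field(default_factory=dict)


@dataclass
class DemoTool:
    """Hypothetical stand-in for FunctionTool."""

    name: str


Builder = Callable[[DemoTenantConfig], Optional[DemoTool]]


def build_refer_tool(cfg: DemoTenantConfig) -> Optional[DemoTool]:
    """Return a tool only when the tenant has the refer feature enabled."""
    refer = cfg.features.get("refer", {})
    return DemoTool(name="refer") if refer.get("enabled") else None


def build_tools(cfg: DemoTenantConfig, builders: list[Builder]) -> list[DemoTool]:
    """Run every builder and keep the tools that apply to this tenant."""
    tools: list[DemoTool] = []
    for build_fn in builders:
        tool = build_fn(cfg)
        if tool is not None:
            tools.append(tool)
    return tools
```

A tenant with refer enabled gets one tool from build_tools(cfg, [build_refer_tool]); a tenant without it gets an empty list.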
Metrics and Observability
All metrics are tracked per-tenant for visibility.
Metrics Store
The LiveMetricsStore tracks per-tenant metrics:
# Metrics include tenant_id for filtering
await metrics_store.record_accept(tenant_id=tenant_id, call_id=call_id)
await metrics_store.record_reject_capacity(call_id=call_id, tenant_id=tenant_id)
await metrics_store.record_end(
    tenant_id=tenant_id,
    call_id=call_id,
    end_reason=self._end_reason,
)
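To show why tagging every event with tenant_id is useful, here is a minimal in-memory sketch of per-tenant counters and a derived capacity rejection rate. InMemoryTenantMetrics and its event names are hypothetical and do not reflect how LiveMetricsStore is implemented.

```python
from collections import Counter, defaultdict


class InMemoryTenantMetrics:
    """Illustrative per-tenant event counters (not the real LiveMetricsStore)."""

    def __init__(self) -> None:
        self._counts: dict[str, Counter] = defaultdict(Counter)

    def record(self, tenant_id: str, event: str) -> None:
        """Increment the counter for one event type of one tenant."""
        self._counts[tenant_id][event] += 1

    def rejection_rate(self, tenant_id: str) -> float:
        """Share of a tenant's call attempts rejected for capacity (0.0 if none)."""
        counts = self._counts[tenant_id]
        total = counts["accept"] + counts["reject_capacity"]
        return counts["reject_capacity"] / total if total else 0.0
```

A tenant whose rejection rate keeps rising is a candidate for a higher per-tenant limit.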
Logging Context
All logs include tenant context:
log_event(
    logging.INFO,
    "call_accepted",
    tenant_id=tenant_id,
    call_id=call_id,
)
Configuration Management
Adding a New Tenant
1. Create Phone Number Mapping: Add a document to phone-tenant-map:
   {
     "phone_number": "+14155551234",
     "tenant_id": "new-tenant"
   }
2. Create Tenant Configuration: Add a document to tenant-config with the required features enabled or disabled.
3. Create Instructions: Add greeting and instruction prompts to tenant-prompt-state and prompt-texts.
4. Test the Configuration: Place a test call to verify tenant resolution and configuration loading.
Updating Tenant Configuration
Configuration changes take effect immediately for new calls. Active calls use the configuration loaded when they started.
To apply configuration changes to active calls, you would need to reload the configuration or restart the call session.
Removing a Tenant
Before removing a tenant, ensure:
- No active calls for that tenant
- Phone numbers are reassigned or disconnected
- Historical call data is archived if needed
Best Practices
Tenant Naming
- Use lowercase, hyphenated strings (e.g., acme-corp)
- Keep tenant IDs stable; avoid renaming
- Use descriptive names that identify the customer
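The naming convention can be enforced with a short check before a tenant is created. The pattern below is one possible reading of "lowercase, hyphenated" (lowercase letters and digits in groups separated by single hyphens); it is an assumption, not a rule taken from the codebase.

```python
import re

# Assumed convention: lowercase alphanumeric groups joined by single hyphens.
TENANT_ID_PATTERN = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_valid_tenant_id(tenant_id: str) -> bool:
    """Return True if the whole string follows the assumed naming convention."""
    return TENANT_ID_PATTERN.fullmatch(tenant_id) is not None
```

Under this pattern "acme-corp" is accepted, while "Acme-Corp", "acme_corp", and "acme--corp" are rejected.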
Capacity Planning
- Set per-tenant limits based on expected call volume
- Monitor capacity rejection metrics
- Adjust limits as tenant usage grows
Configuration Validation
- Validate tenant configuration before saving to database
- Test configuration changes in a staging environment
- Use schema validation to prevent invalid configurations
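A pre-save check might look like the sketch below, which returns a list of problems instead of raising on the first one. The rules are illustrative assumptions based on the example configuration document (required destination fields, unique destination_id); the service's own schema may be stricter or different.

```python
from typing import Any


def validate_tenant_config(doc: dict[str, Any]) -> list[str]:
    """Return human-readable problems found in a tenant config document."""
    errors: list[str] = []
    tenant_id = doc.get("tenant_id")
    if not isinstance(tenant_id, str) or not tenant_id:
        errors.append("tenant_id must be a non-empty string")

    refer = doc.get("features", {}).get("refer")
    if refer is None:
        return errors  # nothing else to check in this sketch

    if not isinstance(refer.get("enabled"), bool):
        errors.append("features.refer.enabled must be a boolean")

    seen: set[str] = set()
    for i, dest in enumerate(refer.get("destinations", [])):
        # Assumed required fields, taken from the example document.
        for key in ("destination_id", "label", "target_uri"):
            if not dest.get(key):
                errors.append(f"destinations[{i}].{key} is required")
        dest_id = dest.get("destination_id")
        if dest_id and dest_id in seen:
            errors.append(f"duplicate destination_id: {dest_id}")
        if dest_id:
            seen.add(dest_id)
    return errors
```

An empty result means the document passed these checks; collecting all errors at once makes it easier to fix a configuration in one pass during staging tests.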
Monitoring
- Track metrics per tenant for capacity planning
- Set up alerts for tenant-specific issues
- Monitor tenant-level error rates