Overview

The Unmute backend is a FastAPI application that orchestrates real-time voice conversations by coordinating between the frontend, STT, LLM, and TTS services.

Technology Stack:
  • Framework: FastAPI (async Python web framework)
  • WebSocket: Native FastAPI WebSocket support
  • Async: Python asyncio for concurrent operations
  • Serialization: Pydantic for message validation, MessagePack for STT/TTS
  • Monitoring: Prometheus metrics
  • Audio: sphn (Opus codec), NumPy (processing)

Application Structure

Main File: unmute/main_websocket.py
from fastapi import FastAPI, WebSocket
from prometheus_fastapi_instrumentator import Instrumentator

app = FastAPI()

# Endpoints
@app.get("/")                    # Health check
@app.get("/v1/health")           # Service status
@app.get("/v1/voices")           # Available voices
@app.post("/v1/voices")          # Voice cloning
@app.websocket("/v1/realtime")   # Main conversation endpoint

# Metrics
Instrumentator().instrument(app).expose(app)

Request Lifecycle

HTTP Endpoints

WebSocket Endpoint

Concurrency Control

File: main_websocket.py:69
# Limit concurrent WebSocket connections
MAX_CLIENTS = 4
SEMAPHORE = asyncio.Semaphore(MAX_CLIENTS)

@app.websocket("/v1/realtime")
async def websocket_route(websocket: WebSocket):
    mt.SESSIONS.inc()
    mt.ACTIVE_SESSIONS.inc()
    
    async with SEMAPHORE:
        # Only MAX_CLIENTS can be in this block simultaneously
        try:
            await websocket.accept(subprotocol="realtime")
            handler = UnmuteHandler()
            
            async with handler:
                await handler.start_up()
                await _run_route(websocket, handler)
        finally:
            mt.ACTIVE_SESSIONS.dec()
Why Limit Concurrency:
  • Python GIL limits true parallelism
  • Better to scale horizontally (more backend instances)
  • Prevents resource exhaustion
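The gating pattern above can be reduced to a minimal sketch: at most MAX_CLIENTS coroutines hold the semaphore at once, and later arrivals simply wait at `async with` until a slot frees up.

```python
import asyncio

MAX_CLIENTS = 4

async def run_demo(n_clients: int = 10) -> int:
    semaphore = asyncio.Semaphore(MAX_CLIENTS)
    state = {"active": 0, "peak": 0}

    async def handle(_client_id: int) -> None:
        async with semaphore:
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
            await asyncio.sleep(0.01)  # stand-in for serving the conversation
            state["active"] -= 1

    await asyncio.gather(*(handle(i) for i in range(n_clients)))
    return state["peak"]

peak = asyncio.run(run_demo())
print(f"peak concurrency: {peak}")  # never exceeds MAX_CLIENTS
```

Ten clients connect, but the measured peak concurrency stays capped at four; the rest queue up fairly in arrival order.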

WebSocket Protocol

Subprotocol Negotiation

File: main_websocket.py:314
await websocket.accept(subprotocol="realtime")
Purpose: OpenAI Realtime API compatibility. The client lists the subprotocols it supports in the Sec-WebSocket-Protocol header; the server selects one.

Message Validation

File: main_websocket.py:79
from pydantic import TypeAdapter, Field

ClientEventAdapter = TypeAdapter(
    Annotated[ora.ClientEvent, Field(discriminator="type")]
)

# Usage
message_raw = await websocket.receive_text()
message: ora.ClientEvent = ClientEventAdapter.validate_json(message_raw)
Benefits:
  • Type safety
  • Automatic validation
  • Clear error messages
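A self-contained sketch of the same discriminated-union pattern, with two toy event models standing in for the real `ora.ClientEvent` union: Pydantic reads the `type` field first and dispatches straight to the matching model, so invalid payloads fail fast with a precise error.

```python
from typing import Annotated, Literal, Union
from pydantic import BaseModel, Field, TypeAdapter, ValidationError

# Toy event types standing in for the real ora.ClientEvent union
class SessionUpdate(BaseModel):
    type: Literal["session.update"]
    instructions: str

class InputAudioBufferAppend(BaseModel):
    type: Literal["input_audio_buffer.append"]
    audio: str  # base64-encoded Opus

Event = Annotated[
    Union[SessionUpdate, InputAudioBufferAppend],
    Field(discriminator="type"),  # dispatch on the "type" field
]
EventAdapter = TypeAdapter(Event)

msg = EventAdapter.validate_json(
    '{"type": "session.update", "instructions": "Be brief."}'
)
print(type(msg).__name__)  # → SessionUpdate
```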

Two-Loop Architecture

File: main_websocket.py:380

The backend uses two concurrent loops:
  1. Receive Loop: Handle incoming messages from client
  2. Emit Loop: Send messages to client
async def _run_route(websocket: WebSocket, handler: UnmuteHandler):
    emit_queue: asyncio.Queue[ora.ServerEvent] = asyncio.Queue()
    
    async with asyncio.TaskGroup() as tg:
        tg.create_task(
            receive_loop(websocket, handler, emit_queue),
            name="receive_loop()"
        )
        tg.create_task(
            emit_loop(websocket, handler, emit_queue),
            name="emit_loop()"
        )
        tg.create_task(
            handler.quest_manager.wait(),
            name="quest_manager.wait()"
        )
TaskGroup Benefits:
  • All tasks cancelled if one fails
  • Automatic exception propagation
  • Clean shutdown

Receive Loop

File: main_websocket.py:406
async def receive_loop(
    websocket: WebSocket,
    handler: UnmuteHandler,
    emit_queue: asyncio.Queue[ora.ServerEvent],
):
    opus_reader = sphn.OpusStreamReader(SAMPLE_RATE)
    wait_for_first_opus = True
    
    while True:
        # 1. Receive message
        try:
            message_raw = await websocket.receive_text()
        except WebSocketDisconnect:
            raise WebSocketClosedError()
        
        # 2. Validate
        try:
            message: ora.ClientEvent = ClientEventAdapter.validate_json(
                message_raw
            )
        except ValidationError as e:
            await emit_queue.put(
                ora.Error(
                    error=ora.ErrorDetails(
                        type="invalid_request_error",
                        message="Invalid message",
                        details=json.loads(e.json()),
                    )
                )
            )
            continue
        
        # 3. Process by type
        if isinstance(message, ora.InputAudioBufferAppend):
            # Decode Opus
            opus_bytes = base64.b64decode(message.audio)
            
            # Handle reconnection edge case
            if wait_for_first_opus:
                if opus_bytes[5] & 2:  # First packet bit
                    wait_for_first_opus = False
                else:
                    continue  # Skip old packets
            
            # Decode to PCM
            pcm = await asyncio.to_thread(
                opus_reader.append_bytes, opus_bytes
            )
            
            # Send to handler
            if pcm.size:
                await handler.receive((SAMPLE_RATE, pcm[np.newaxis, :]))
            
        elif isinstance(message, ora.SessionUpdate):
            await handler.update_session(message.session)
            await emit_queue.put(ora.SessionUpdated(session=message.session))
        
        # 4. Record event
        if handler.recorder is not None:
            await handler.recorder.add_event("client", message)

Opus Decoding

File: main_websocket.py:461
import sphn

opus_reader = sphn.OpusStreamReader(SAMPLE_RATE)

# Decode Opus packet to PCM
opus_bytes = base64.b64decode(message.audio)
pcm = await asyncio.to_thread(
    opus_reader.append_bytes,
    opus_bytes
)
Why asyncio.to_thread:
  • Opus decoding is CPU-bound
  • Run in thread pool to avoid blocking event loop
  • Other connections can process concurrently

Reconnection Handling

File: main_websocket.py:462
if wait_for_first_opus:
    # Check for "first packet" bit in Opus header
    if opus_bytes[5] & 2:
        wait_for_first_opus = False
    else:
        continue  # Skip old packets from previous connection
Problem: Browser sometimes sends old Opus packets on reconnect.
Solution: Wait for a packet marked as “first” before processing.
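The magic-looking `opus_bytes[5] & 2` is most likely reading the Ogg page header that wraps the Opus data: bytes 0-3 are the `OggS` capture pattern, byte 4 is the version, and byte 5 is the header-type flag, where bit 0x02 marks the first page of a logical bitstream (BOS). A sketch of that check (the function name is ours; the backend tests only the flag bit, not the magic):

```python
def is_first_ogg_page(packet: bytes) -> bool:
    """Check the beginning-of-stream (BOS) flag in an Ogg page header.

    Layout: bytes 0-3 = b"OggS" capture pattern, byte 4 = version,
    byte 5 = header-type flags (0x02 = first page of a logical stream).
    """
    return packet[:4] == b"OggS" and bool(packet[5] & 0x02)

# Minimal synthetic headers (real pages carry more fields after byte 5)
first_page = b"OggS" + bytes([0, 0x02])
later_page = b"OggS" + bytes([0, 0x00])

print(is_first_ogg_page(first_page), is_first_ogg_page(later_page))  # → True False
```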

Emit Loop

File: main_websocket.py:512
async def emit_loop(
    websocket: WebSocket,
    handler: UnmuteHandler,
    emit_queue: asyncio.Queue[ora.ServerEvent],
):
    opus_writer = sphn.OpusStreamWriter(SAMPLE_RATE)
    
    while True:
        # 1. Check if WebSocket still connected
        if (
            websocket.application_state == WebSocketState.DISCONNECTED
            or websocket.client_state == WebSocketState.DISCONNECTED
        ):
            raise WebSocketClosedError()
        
        # 2. Get next item to emit
        try:
            # Priority: emit_queue (high-priority events)
            to_emit = emit_queue.get_nowait()
        except asyncio.QueueEmpty:
            # Otherwise: handler output
            emitted_by_handler = await handler.emit()
            
            if emitted_by_handler is None:
                continue
            elif isinstance(emitted_by_handler, CloseStream):
                await websocket.close()
                break
            elif isinstance(emitted_by_handler, ora.ServerEvent):
                to_emit = emitted_by_handler
            elif isinstance(emitted_by_handler, tuple):
                # Audio: (sample_rate, pcm_array)
                _sr, audio = emitted_by_handler
                audio = audio_to_float32(audio)
                
                # Encode to Opus
                opus_bytes = await asyncio.to_thread(
                    opus_writer.append_pcm, audio
                )
                
                if opus_bytes:
                    to_emit = ora.ResponseAudioDelta(
                        delta=base64.b64encode(opus_bytes).decode("utf-8")
                    )
                else:
                    continue  # Opus buffering, no output yet
        
        # 3. Record event
        if handler.recorder is not None:
            await handler.recorder.add_event("server", to_emit)
        
        # 4. Send to client
        try:
            await websocket.send_text(to_emit.model_dump_json())
        except WebSocketDisconnect:
            raise WebSocketClosedError()

Opus Encoding

File: main_websocket.py:550
import sphn

opus_writer = sphn.OpusStreamWriter(SAMPLE_RATE)

# Encode PCM to Opus
audio_float32 = audio_to_float32(audio)
opus_bytes = await asyncio.to_thread(
    opus_writer.append_pcm,
    audio_float32
)

if opus_bytes:
    # Opus encoder has output
    to_emit = ora.ResponseAudioDelta(
        delta=base64.b64encode(opus_bytes).decode("utf-8")
    )
else:
    # Still buffering, wait for more audio
    continue
Buffering: The Opus encoder buffers internally, so it does not necessarily produce output on every input; empty output simply means it is waiting for more audio.

Error Handling

Exception Reporter

File: main_websocket.py:334
async def _report_websocket_exception(
    websocket: WebSocket,
    exc: Exception
):
    if isinstance(exc, ExceptionGroup):
        exceptions = exc.exceptions
    else:
        exceptions = [exc]
    
    error_message = None
    
    for exc in exceptions:
        if isinstance(exc, MissingServiceAtCapacity):
            mt.FATAL_SERVICE_MISSES.inc()
            error_message = (
                f"Too many people are connected to service '{exc.service}'. "
                "Please try again later."
            )
        elif isinstance(exc, MissingServiceTimeout):
            mt.FATAL_SERVICE_MISSES.inc()
            error_message = f"Service '{exc.service}' timed out."
        elif isinstance(exc, WebSocketClosedError):
            # Normal disconnection
            return
        else:
            logger.exception("Unexpected error: %r", exc)
            mt.HARD_ERRORS.inc()
            error_message = "Internal server error"
    
    if error_message:
        mt.FORCE_DISCONNECTS.inc()
        
        # Try to send error to client
        try:
            await websocket.send_text(
                make_ora_error(type="fatal", message=error_message)
                .model_dump_json()
            )
        except WebSocketDisconnect:
            pass
        
        # Close connection
        try:
            await websocket.close(
                code=status.WS_1011_INTERNAL_ERROR,
                reason=error_message,
            )
        except RuntimeError:
            pass  # Already closed

CORS Error Handling

File: main_websocket.py:594
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    # Add CORS headers even on error
    return JSONResponse(
        status_code=exc.status_code,
        content={"detail": exc.detail},
        headers=_cors_headers_for_error(request),
    )

def _cors_headers_for_error(request: Request):
    origin = request.headers.get("origin")
    allow_origin = origin if origin in CORS_ALLOW_ORIGINS else None
    
    headers = {"Access-Control-Allow-Credentials": "true"}
    if allow_origin:
        headers["Access-Control-Allow-Origin"] = allow_origin
    
    return headers
Why Important: Without CORS headers on error responses, the browser reports a confusing CORS failure instead of the actual error.

Health Checks

File: main_websocket.py:137
class HealthStatus(BaseModel):
    tts_up: bool
    stt_up: bool
    llm_up: bool
    voice_cloning_up: bool
    
    @computed_field
    @property
    def ok(self) -> bool:
        # Voice cloning optional
        return self.tts_up and self.stt_up and self.llm_up

@partial(async_ttl_cached, ttl_sec=0.5)
async def _get_health(_none: None):
    async with asyncio.TaskGroup() as tg:
        tts_up = tg.create_task(
            asyncio.to_thread(
                _check_server_status,
                _ws_to_http(TTS_SERVER) + "/api/build_info"
            )
        )
        stt_up = tg.create_task(
            asyncio.to_thread(
                _check_server_status,
                _ws_to_http(STT_SERVER) + "/api/build_info"
            )
        )
        llm_up = tg.create_task(
            asyncio.to_thread(
                _check_server_status,
                _ws_to_http(LLM_SERVER) + "/v1/models",
                headers={"Authorization": f"Bearer {KYUTAI_LLM_API_KEY}"},
            )
        )
        voice_cloning_up = tg.create_task(
            asyncio.to_thread(
                _check_server_status,
                _ws_to_http(VOICE_CLONING_SERVER) + "/api/build_info",
            )
        )
    
    return HealthStatus(
        tts_up=await tts_up,
        stt_up=await stt_up,
        llm_up=await llm_up,
        voice_cloning_up=await voice_cloning_up,
    )

@app.get("/v1/health")
async def get_health():
    health = await _get_health(None)
    mt.HEALTH_OK.observe(health.ok)
    return health
Features:
  • Parallel health checks (TaskGroup)
  • Cached for 0.5s (avoid hammering services)
  • Used before accepting WebSocket connections
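`async_ttl_cached` is a project helper whose implementation is not shown here. A minimal hypothetical version, matching the `@partial(async_ttl_cached, ttl_sec=0.5)` call site above (single hashable argument, wall-clock TTL, no locking against concurrent misses), might look like:

```python
import asyncio
import functools
import time

def async_ttl_cached(func, ttl_sec: float):
    """Cache an async function's result for ttl_sec seconds (hypothetical sketch)."""
    cache: dict = {}

    @functools.wraps(func)
    async def wrapper(arg):
        now = time.monotonic()
        hit = cache.get(arg)
        if hit is not None and now - hit[0] < ttl_sec:
            return hit[1]  # fresh enough: skip the real call
        result = await func(arg)
        cache[arg] = (time.monotonic(), result)
        return result

    return wrapper

calls = {"n": 0}

@functools.partial(async_ttl_cached, ttl_sec=0.5)
async def get_status(_none):
    calls["n"] += 1  # counts how often the underlying check really runs
    return "ok"

async def main() -> int:
    await get_status(None)
    await get_status(None)  # served from cache within the TTL window
    return calls["n"]

print(asyncio.run(main()))  # → 1
```

A production version would likely also deduplicate concurrent cache misses (e.g. with a lock), so that a burst of health requests triggers only one round of service checks.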

CORS Configuration

File: main_websocket.py:84
CORS_ALLOW_ORIGINS = [
    "http://localhost",
    "http://localhost:3000"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOW_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
Production: In the Docker Swarm deployment, Traefik handles CORS instead.

Configuration

File: unmute/kyutai_constants.py
import os

# Audio settings
SAMPLE_RATE = 24000
FRAME_TIME_SEC = 0.02
SAMPLES_PER_FRAME = 480

# Service URLs
STT_SERVER = os.getenv("KYUTAI_STT_URL", "ws://localhost:8080")
TTS_SERVER = os.getenv("KYUTAI_TTS_URL", "ws://localhost:8080")
LLM_SERVER = os.getenv("KYUTAI_LLM_URL", "http://localhost:8000")
VOICE_CLONING_SERVER = os.getenv(
    "KYUTAI_VOICE_CLONING_URL", "ws://localhost:8080"
)

# LLM settings
KYUTAI_LLM_API_KEY = os.getenv("KYUTAI_LLM_API_KEY")
KYUTAI_LLM_MODEL = os.getenv("KYUTAI_LLM_MODEL")

# STT settings
STT_DELAY_SEC = float(os.getenv("KYUTAI_STT_DELAY_SEC", "2.5"))

# Paths
SPEECH_TO_TEXT_PATH = "/api/stt"
TEXT_TO_SPEECH_PATH = "/api/tts"

# Recording
RECORDINGS_DIR = os.getenv("KYUTAI_RECORDINGS_DIR")

# Headers
HEADERS = {
    # Custom headers if needed
}

# Voice cloning
MAX_VOICE_FILE_SIZE_MB = 10
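The audio constants are mutually consistent: at a 24 kHz sample rate, one 20 ms frame holds exactly 480 samples, so SAMPLES_PER_FRAME is derivable rather than independent.

```python
SAMPLE_RATE = 24_000      # Hz
FRAME_TIME_SEC = 0.02     # 20 ms per frame

SAMPLES_PER_FRAME = int(SAMPLE_RATE * FRAME_TIME_SEC)
print(SAMPLES_PER_FRAME)  # → 480
```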

Deployment

Docker Compose

File: docker-compose.yml:32
backend:
  image: unmute-backend:latest
  build:
    context: ./
    target: hot-reloading
  volumes:
    - ./unmute:/app/unmute  # Hot reload in dev
  environment:
    - KYUTAI_STT_URL=ws://stt:8080
    - KYUTAI_TTS_URL=ws://tts:8080
    - KYUTAI_LLM_URL=http://llm:8000
  labels:
    - "traefik.enable=true"
    - "traefik.http.routers.backend.rule=PathPrefix(`/api`)"
    - "traefik.http.routers.backend.middlewares=strip-api"
    - "traefik.http.services.backend.loadbalancer.server.port=80"

Dockerfile

File: Dockerfile
FROM python:3.11-slim AS base

# Install uv
RUN pip install uv

# Copy dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync

# Copy code
COPY unmute/ ./unmute/
COPY voices.yaml ./

# Hot-reloading target
FROM base as hot-reloading
CMD ["uv", "run", "fastapi", "dev", "unmute/main_websocket.py", "--host", "0.0.0.0", "--port", "80"]

# Production target
FROM base as production
CMD ["uv", "run", "fastapi", "run", "unmute/main_websocket.py", "--host", "0.0.0.0", "--port", "80"]

Running Locally

# Development (auto-reload)
uv run fastapi dev unmute/main_websocket.py

# Production
uv run fastapi run unmute/main_websocket.py

Monitoring

Prometheus Metrics

File: main_websocket.py:74
from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)
Metrics Available:
  • /metrics endpoint
  • HTTP request duration, status codes, etc.
  • Custom metrics from unmute/metrics.py
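The `mt.*` counters used throughout the route handlers (SESSIONS, ACTIVE_SESSIONS, HARD_ERRORS, ...) would be declared roughly like this with prometheus_client; the metric names below are illustrative, not the project's exact ones.

```python
from prometheus_client import Counter, Gauge

# Illustrative equivalents of the mt.* metrics used in the route handlers
SESSIONS = Counter("unmute_sessions_total", "Total WebSocket sessions started")
ACTIVE_SESSIONS = Gauge("unmute_active_sessions", "Currently open sessions")
HARD_ERRORS = Counter("unmute_hard_errors_total", "Unexpected exceptions")

# Mirrors the websocket_route pattern: count the session, track it while open
SESSIONS.inc()
ACTIVE_SESSIONS.inc()
# ... session runs ...
ACTIVE_SESSIONS.dec()
```

Counters only ever go up (total sessions served), while the Gauge moves both ways (sessions open right now); both are scraped from the same /metrics endpoint the Instrumentator exposes.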

Grafana Dashboard

File: services/grafana/dashboards/unmute-monitoring-*.json

Pre-configured dashboard for:
  • Active sessions
  • Latency percentiles (STT TTFT, TTS TTFT, VLLM TTFT)
  • Error rates
  • Throughput (words/sec)
