
Backend Architecture

FastAPI Application

File: unmute/main_websocket.py The main backend server is a FastAPI application that exposes the following endpoints:
app = FastAPI()

# Key endpoints
@app.get("/")                    # Health check
@app.get("/v1/health")         # Service status
@app.get("/v1/voices")         # Available voices
@app.post("/v1/voices")        # Voice cloning upload
@app.websocket("/v1/realtime") # Main WebSocket endpoint
Concurrency Control (main_websocket.py:71):
MAX_CLIENTS = 4
SEMAPHORE = asyncio.Semaphore(MAX_CLIENTS)
Each WebSocket connection acquires the semaphore, limiting concurrent sessions to prevent resource exhaustion.
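A dependency-free sketch of this pattern (session and connection handling simplified to a plain coroutine; names other than MAX_CLIENTS and SEMAPHORE are illustrative):

```python
import asyncio

MAX_CLIENTS = 4
SEMAPHORE = asyncio.Semaphore(MAX_CLIENTS)

async def handle_session(session_id: int, results: list[str]) -> None:
    # Reject immediately instead of queueing when every slot is taken.
    if SEMAPHORE.locked():
        results.append(f"session {session_id}: rejected")
        return
    async with SEMAPHORE:  # released automatically, even on errors
        results.append(f"session {session_id}: accepted")
        await asyncio.sleep(0.01)  # stand-in for the conversation loop

async def main() -> list[str]:
    results: list[str] = []
    await asyncio.gather(*(handle_session(i, results) for i in range(6)))
    return results

results = asyncio.run(main())
print(results)
```

With six simultaneous connections and four slots, the first four are accepted and the remaining two are turned away rather than queued.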

UnmuteHandler

File: unmute/unmute_handler.py The core orchestration class that manages a single conversation session. Key Responsibilities:
  • Receive audio from frontend
  • Route audio to STT
  • Manage conversation state
  • Generate LLM responses
  • Stream TTS audio back to frontend
  • Handle interruptions
Architecture:
class UnmuteHandler(AsyncStreamHandler):
    def __init__(self):
        self.output_queue: asyncio.Queue[HandlerOutput]
        self.chatbot = Chatbot()
        self.quest_manager = QuestManager()
        self.recorder = Recorder()  # Optional recording
        
    async def receive(self, frame: tuple[int, np.ndarray]):
        # Process incoming audio frames
        
    async def emit(self) -> HandlerOutput | None:
        # Emit audio/events to frontend
Service Management: Services (STT, TTS, LLM) are managed as “Quests” for clean lifecycle management:
quest = Quest(
    name="stt",
    init=connect_to_stt,      # Async initialization
    run=stt_message_loop,     # Long-running task
    close=disconnect_stt      # Cleanup
)
await quest_manager.add(quest)
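The init/run/close contract can be demonstrated with a hypothetical minimal runner (not unmute's actual Quest class) that guarantees cleanup even when the long-running task fails or is cancelled:

```python
import asyncio
from typing import Awaitable, Callable

log: list[str] = []

async def run_quest(
    name: str,
    init: Callable[[], Awaitable[object]],
    run: Callable[[object], Awaitable[None]],
    close: Callable[[object], Awaitable[None]],
) -> None:
    resource = await init()
    try:
        await run(resource)
    finally:
        await close(resource)  # runs even if run() raises or is cancelled

async def connect() -> str:
    log.append("init")
    return "stt-connection"

async def message_loop(conn: object) -> None:
    log.append(f"run with {conn}")

async def disconnect(conn: object) -> None:
    log.append("close")

asyncio.run(run_quest("stt", connect, message_loop, disconnect))
print(log)
```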

WebSocket Protocol Handler

Files:
  • unmute/main_websocket.py:380-404 - Main route handler
  • unmute/main_websocket.py:406-492 - Receive loop
  • unmute/main_websocket.py:512-582 - Emit loop
Two-Loop Architecture:
  1. Receive Loop - Handles incoming messages:
    async def receive_loop(websocket, handler, emit_queue):
        while True:
            message = await websocket.receive_text()
            event = ClientEventAdapter.validate_json(message)
            
            if isinstance(event, InputAudioBufferAppend):
                # Decode Opus, send to STT
            elif isinstance(event, SessionUpdate):
                # Update voice/instructions
    
  2. Emit Loop - Sends messages to frontend:
    async def emit_loop(websocket, handler, emit_queue):
        while True:
            # Check emit_queue first (high priority)
            if not emit_queue.empty():
                to_emit = await emit_queue.get()
            else:
                # Otherwise get from handler
                to_emit = await handler.emit()
            
            if isinstance(to_emit, tuple):
                # Audio - encode to Opus and send
                ...
            elif isinstance(to_emit, ServerEvent):
                # JSON event
                await websocket.send_text(to_emit.model_dump_json())
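A toy model of the two-loop design, with queues standing in for the WebSocket and the handler (all names here are illustrative). The two loops run concurrently for the whole session, and a None sentinel models disconnection:

```python
import asyncio

async def receive_loop(inbox: asyncio.Queue, emit_queue: asyncio.Queue) -> None:
    while True:
        message = await inbox.get()
        if message is None:  # client disconnected
            await emit_queue.put(None)
            return
        await emit_queue.put(f"echo: {message}")

async def emit_loop(emit_queue: asyncio.Queue, sent: list[str]) -> None:
    while True:
        to_emit = await emit_queue.get()
        if to_emit is None:
            return
        sent.append(to_emit)  # stand-in for websocket.send_text(...)

async def main() -> list[str]:
    inbox: asyncio.Queue = asyncio.Queue()
    emit_queue: asyncio.Queue = asyncio.Queue()
    sent: list[str] = []
    for msg in ("hello", "world", None):
        inbox.put_nowait(msg)
    # Run both loops concurrently, as the real handler does for a session.
    await asyncio.gather(receive_loop(inbox, emit_queue), emit_loop(emit_queue, sent))
    return sent

sent = asyncio.run(main())
print(sent)
```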
    

Quest Manager

File: unmute/quest_manager.py Manages lifecycle of background services with clean cancellation.
class Quest[T]:
    def __init__(
        self,
        name: str,
        init: Callable[[], Awaitable[T]],
        run: Callable[[T], Awaitable[None]],
        close: Callable[[T], Awaitable[None]],
    )
Features:
  • Async context manager for automatic cleanup
  • Service initialization with retries
  • Graceful shutdown on errors
  • Quest removal (for interruptions)
Example Usage:
async with quest_manager:
    # STT quest runs entire session
    await quest_manager.add(stt_quest)
    
    # TTS quest created per response
    tts_quest = await start_up_tts()
    
    # Can be removed on interruption
    await quest_manager.remove("tts")

Chatbot State Manager

File: unmute/llm/chatbot.py Manages conversation history and state transitions.
class Chatbot:
    chat_history: list[dict[Any, Any]]  # OpenAI message format
    
    def conversation_state(self) -> ConversationState:
        # Returns: waiting_for_user | user_speaking | bot_speaking
    
    async def add_chat_message_delta(self, delta: str, role: str):
        # Append or create message with automatic spacing
    
    def preprocessed_messages(self):
        # Clean messages for LLM (remove interruption chars, etc.)
Message Format:
[
    {"role": "system", "content": "You are..."},
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there!"},
    ...
]
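A hypothetical sketch of the delta-merging behavior described above (the exact spacing rules in unmute may differ): append to the last message when the role matches, otherwise start a new one, inserting a space where two deltas would otherwise run together:

```python
def add_chat_message_delta(history: list[dict], delta: str, role: str) -> None:
    if history and history[-1]["role"] == role:
        prev = history[-1]["content"]
        # Insert a separator unless one side already provides whitespace.
        sep = " " if prev and not prev.endswith(" ") and not delta.startswith(" ") else ""
        history[-1]["content"] = prev + sep + delta
    else:
        history.append({"role": role, "content": delta})

history: list[dict] = [{"role": "system", "content": "You are..."}]
add_chat_message_delta(history, "Hello", "user")
add_chat_message_delta(history, "there", "user")
add_chat_message_delta(history, "Hi!", "assistant")
print(history)
```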

Service Discovery

File: unmute/service_discovery.py Finds available service instances with capacity.
async def find_instance[T](
    service_name: str,
    factory: Callable[[], T],
) -> T:
    # In Docker Compose: use environment variable URL
    # In Docker Swarm: query DNS for service instances
    # Try each instance until one accepts connection
Capacity Handling:
  • Services can reject with Error message when at capacity
  • Backend tries next instance
  • Raises MissingServiceAtCapacity if all exhausted
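The try-each-instance loop can be sketched as follows (the connection callback and candidate URLs are stand-ins, not unmute's real signatures):

```python
import asyncio

class MissingServiceAtCapacity(Exception):
    pass

async def find_instance(service_name: str, candidates: list[str], try_connect):
    for url in candidates:
        try:
            return await try_connect(url)
        except ConnectionError:
            continue  # instance at capacity or unreachable; try the next one
    raise MissingServiceAtCapacity(service_name)

async def fake_connect(url: str) -> str:
    if "busy" in url:
        raise ConnectionError("at capacity")
    return f"connected to {url}"

result = asyncio.run(
    find_instance("stt", ["ws://busy-1", "ws://busy-2", "ws://ok-3"], fake_connect)
)
print(result)
```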

Metrics Collection

File: unmute/metrics.py Prometheus metrics using prometheus_client. Counter Metrics:
SESSIONS = Counter('unmute_sessions_total', 'Total sessions started')
STT_SENT_FRAMES = Counter('unmute_stt_sent_frames_total', 'Audio frames sent to STT')
VLLM_SENT_WORDS = Counter('unmute_vllm_sent_words_total', 'Words sent to the LLM')
Gauge Metrics:
ACTIVE_SESSIONS = Gauge('unmute_active_sessions', 'Currently active sessions')
STT_ACTIVE_SESSIONS = Gauge('unmute_stt_active_sessions', 'Active STT sessions')
Histogram Metrics:
STT_TTFT = Histogram(
    'unmute_stt_ttft_seconds',
    'STT time to first token, in seconds',
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
Integration:
from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)
# Metrics available at /metrics endpoint
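A common pattern with the gauge metrics is increment-on-connect, decrement-on-disconnect inside try/finally, so a crashed session cannot leak a count. Sketched here with a dependency-free stand-in for prometheus_client.Gauge:

```python
import asyncio

class Gauge:
    # Minimal stand-in for prometheus_client.Gauge.
    def __init__(self) -> None:
        self.value = 0
    def inc(self) -> None:
        self.value += 1
    def dec(self) -> None:
        self.value -= 1

ACTIVE_SESSIONS = Gauge()

async def session() -> None:
    ACTIVE_SESSIONS.inc()
    try:
        await asyncio.sleep(0)  # stand-in for the conversation
    finally:
        ACTIVE_SESSIONS.dec()  # decrement even if the session errors out

asyncio.run(session())
print(ACTIVE_SESSIONS.value)
```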

Audio Processing

Opus Codec:
import sphn

# Encoding (emit loop): append PCM, then read out any encoded Opus bytes
opus_writer = sphn.OpusStreamWriter(SAMPLE_RATE)
await asyncio.to_thread(opus_writer.append_pcm, audio)
opus_bytes = opus_writer.read_bytes()

# Decoding (receive loop): append Opus bytes, then read out decoded PCM
opus_reader = sphn.OpusStreamReader(SAMPLE_RATE)
await asyncio.to_thread(opus_reader.append_bytes, opus_bytes)
pcm = opus_reader.read_pcm()
Audio Format Conversion:
from fastrtc import audio_to_float32

# Normalize to float32 range [-1, 1]
float_audio = audio_to_float32(int16_audio)
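The same conversion written out by hand, for reference: int16 PCM spans [-32768, 32767], so dividing by 32768 maps it into [-1.0, 1.0):

```python
import numpy as np

def int16_to_float32(audio: np.ndarray) -> np.ndarray:
    # Cast first, then scale, to avoid integer division.
    return audio.astype(np.float32) / 32768.0

int16_audio = np.array([-32768, 0, 16384, 32767], dtype=np.int16)
float_audio = int16_to_float32(int16_audio)
print(float_audio)
```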

Voice Management

File: unmute/tts/voices.py Loads and validates voices from voices.yaml.
class Voice(BaseModel):
    name: str
    good: bool  # Should appear in UI?
    instructions: Instructions
    source: VoiceSource

class VoiceList:
    voices: list[Voice]
    
    def __init__(self):
        # Load from voices.yaml
        self.voices = self._load_voices()
Voice Sources:
  1. File: Pre-recorded audio on server
  2. Freesound: Creative Commons audio from Freesound.org
  3. Custom: User-uploaded voice cloning
API Endpoint (main_websocket.py:200):
@app.get("/v1/voices")
def voices():
    voice_list = VoiceList()
    return [
        voice.model_dump(exclude={"comment"})
        for voice in voice_list.voices
        if voice.good
    ]

Voice Cloning

File: unmute/tts/voice_cloning.py Generates voice embeddings from uploaded audio.
def clone_voice(audio_bytes: bytes) -> str:
    # Send to voice cloning service
    # Returns: "custom:xyz123"
    
voice_embeddings_cache: dict[str, bytes] = {}
# Maps voice IDs to msgpack-encoded embeddings
Upload Endpoint (main_websocket.py:240):
@app.post("/v1/voices")
async def post_voices(file: UploadFile):
    name = clone_voice(file.file.read())
    return {"name": name}  # "custom:xyz123"

Recording System

File: unmute/recorder.py Optional conversation recording for debugging/analysis.
class Recorder:
    async def add_event(self, source: str, event: BaseEvent):
        # Append to JSONL file
        
    async def shutdown(self, keep_recording: bool = True):
        # Close file, optionally delete
Privacy:
  • Audio data anonymized (only sample counts, not PCM)
  • Users can opt out via allow_recording: false
  • Recordings stored in RECORDINGS_DIR (configurable)
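A hypothetical minimal recorder illustrating the JSONL approach: one JSON object per line, so the file stays parseable even if a session dies mid-recording, and the file is deleted on shutdown when the user has opted out:

```python
import json
import tempfile
from pathlib import Path

class Recorder:
    def __init__(self, path: Path) -> None:
        self.path = path
        self._file = path.open("a")

    def add_event(self, source: str, event: dict) -> None:
        # One self-contained JSON object per line (JSONL).
        self._file.write(json.dumps({"source": source, **event}) + "\n")

    def shutdown(self, keep_recording: bool = True) -> None:
        self._file.close()
        if not keep_recording:
            self.path.unlink()  # honor the opt-out by deleting the file

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "session.jsonl"
    rec = Recorder(path)
    rec.add_event("client", {"type": "input_audio_buffer.append", "samples": 480})
    rec.shutdown()
    lines = path.read_text().splitlines()
print(lines)
```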

Timer Utilities

File: unmute/timer.py Stopwatch for accurate timing measurements.
class Stopwatch:
    def __init__(self, autostart: bool = True):
        self.start_time: float | None
        
    def time(self) -> float:
        # Elapsed time in seconds
        
    def stop(self) -> float | None:
        # Stop and return elapsed, or None if already stopped
Usage:
watch = Stopwatch()
# ... do work ...
mt.VLLM_TTFT.observe(watch.time())

Exception Handling

File: unmute/exceptions.py Custom exceptions for service failures.
class MissingServiceTimeout(Exception):
    service: str

class MissingServiceAtCapacity(Exception):
    service: str

class WebSocketClosedError(Exception):
    pass

def make_ora_error(
    type: str,  # "warning" | "fatal" | "invalid_request_error"
    message: str
) -> Error:
    # Creates OpenAI Realtime API error event
Error Reporting (main_websocket.py:334):
async def _report_websocket_exception(
    websocket: WebSocket,
    exc: Exception
):
    # Send error message to client
    # Close WebSocket with reason
    # Record metrics
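A hedged sketch of what make_ora_error might build: the field names follow the "error" event shape of the OpenAI Realtime API, not necessarily unmute's exact code:

```python
def make_ora_error(type: str, message: str) -> dict:
    # OpenAI-Realtime-style error event: a top-level "error" type wrapping
    # an error object with its own type and human-readable message.
    return {
        "type": "error",
        "error": {"type": type, "message": message},
    }

event = make_ora_error("warning", "TTS instance at capacity, retrying")
print(event)
```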

CORS Configuration

File: unmute/main_websocket.py:84 CORS middleware for local development.
CORS_ALLOW_ORIGINS = [
    "http://localhost",
    "http://localhost:3000"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=CORS_ALLOW_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Middleware

Upload Size Limiting

File: unmute/main_websocket.py:213 Limits voice upload file size.
class LimitUploadSizeForPath(BaseHTTPMiddleware):
    def __init__(self, max_upload_size: int, path: str):
        # Check Content-Length header
        # Return 413 if too large

app.add_middleware(
    LimitUploadSizeForPath,
    max_upload_size=MAX_VOICE_FILE_SIZE_MB * 1024 * 1024,
    path="/v1/voices",
)

Prometheus Instrumentation

File: unmute/main_websocket.py:74 Automatic HTTP request metrics.
Instrumentator().instrument(app).expose(app)
# Adds metrics for all HTTP endpoints
# Exposes at /metrics

Configuration

File: unmute/kyutai_constants.py Centralized configuration from environment variables.
SAMPLE_RATE = 24000
FRAME_TIME_SEC = 0.02
SAMPLES_PER_FRAME = 480

STT_SERVER = os.getenv("KYUTAI_STT_URL", "ws://localhost:8080")
TTS_SERVER = os.getenv("KYUTAI_TTS_URL", "ws://localhost:8080")
LLM_SERVER = os.getenv("KYUTAI_LLM_URL", "http://localhost:8000")

KYUTAI_LLM_API_KEY = os.getenv("KYUTAI_LLM_API_KEY")
KYUTAI_LLM_MODEL = os.getenv("KYUTAI_LLM_MODEL")

RECORDINGS_DIR = os.getenv("KYUTAI_RECORDINGS_DIR")
