
Overview

The Text-to-Speech (TTS) component uses Kyutai’s streaming TTS models to synthesize natural-sounding speech from LLM-generated text in real time, with support for voice cloning.

Key Features:
  • Real-time streaming synthesis
  • Voice cloning from 30-second audio samples
  • Word-level timing synchronization
  • WebSocket-based binary protocol (MessagePack)
  • Configurable voice parameters (temperature, CFG scale)

Architecture

TTS Service

  • Technology: Rust (moshi-server)
  • Location: services/moshi-server/
  • Model: Kyutai TTS 1.6B
  • Deployment: Docker container with GPU access

Service Configuration

Docker Compose (docker-compose.yml:55):
tts:
  image: moshi-server:latest
  command: ["worker", "--config", "configs/tts.toml"]
  environment:
    - HUGGING_FACE_HUB_TOKEN=$HUGGING_FACE_HUB_TOKEN
  deploy:
    resources:
      reservations:
        devices:
          - driver: nvidia
            count: all
            capabilities: [gpu]
Resource Usage:
  • VRAM: ~5.3 GB
  • Concurrent streams: Limited by capacity management

Python Client

File: unmute/tts/text_to_speech.py

TextToSpeech Class

class TextToSpeech(ServiceWithStartup):
    def __init__(
        self,
        tts_instance: str = TTS_SERVER,
        recorder: Recorder | None = None,
        get_time: Callable[[], float] | None = None,
        voice: str | None = None,
    ):
        self.websocket: websockets.ClientConnection | None
        self.voice: str | None = voice
        self.query: TtsStreamingQuery
        self.text_output_queue: RealtimeQueue
        
        # Timing
        self.received_samples: int = 0
        self.received_samples_yielded: int = 0

Connection Flow

Startup Sequence

File: text_to_speech.py:206
async def start_up(self):
    # 1. Build URL with query parameters
    url = (
        self.tts_instance
        + TEXT_TO_SPEECH_PATH
        + self.query.to_url_params()
    )
    
    # 2. Connect WebSocket
    self.websocket = await websockets.connect(
        url,
        additional_headers=HEADERS,
    )
    
    # 3. Send custom voice if needed
    if self.voice and self.voice.startswith("custom:"):
        voice_embedding = voice_embeddings_cache.get(self.voice)
        if voice_embedding:
            await self.websocket.send(voice_embedding)
    
    # 4. Wait for Ready message
    for _ in range(10):  # Handle race condition
        message_bytes = await self.websocket.recv(decode=False)
        message = TTSMessageAdapter.validate_python(
            msgpack.unpackb(message_bytes)
        )
        
        if isinstance(message, TTSReadyMessage):
            return  # Ready!
        elif isinstance(message, TTSErrorMessage):
            raise MissingServiceAtCapacity("tts")

Sending Text

File: text_to_speech.py:181
async def send(self, message: str | TTSClientMessage):
    # String messages are preprocessed
    if isinstance(message, str):
        message = TTSClientTextMessage(
            type="Text",
            text=prepare_text_for_tts(message)
        )
    
    # Don't send empty messages
    if isinstance(message, TTSClientTextMessage):
        if message.text == "":
            return
        
        mt.TTS_SENT_FRAMES.inc()
        self.time_since_first_text_sent.start_if_not_started()
    
    # Send as MessagePack
    await self.websocket.send(
        msgpack.packb(message.model_dump())
    )

Text Preprocessing

File: text_to_speech.py:97
def prepare_text_for_tts(text: str) -> str:
    text = text.strip()
    
    # Remove unpronounceable characters
    unpronounceable = "*_`"
    for char in unpronounceable:
        text = text.replace(char, "")
    
    # Normalize quotes
    text = text.replace("“", '"').replace("”", '"')
    text = text.replace("‘", "'").replace("’", "'")
    
    # Remove awkward spacing
    text = text.replace(" : ", " ")
    
    return text
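For example, running the preprocessor on a typical LLM reply (the function is reproduced here so the snippet is self-contained):

```python
def prepare_text_for_tts(text: str) -> str:
    # Reproduced from above for a self-contained example.
    text = text.strip()
    for char in "*_`":
        text = text.replace(char, "")  # strip Markdown markers
    text = text.replace("“", '"').replace("”", '"')
    text = text.replace("‘", "'").replace("’", "'")
    text = text.replace(" : ", " ")
    return text

cleaned = prepare_text_for_tts(' *Sure!* She said : “hello there” ')
# cleaned == 'Sure! She said "hello there"'
```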

Receiving Audio

File: text_to_speech.py:267

The TTS client is an async iterator with built-in timing control:
async def __aiter__(self) -> AsyncIterator[TTSMessage]:
    mt.TTS_SESSIONS.inc()
    mt.TTS_ACTIVE_SESSIONS.inc()
    
    output_queue: RealtimeQueue[TTSMessage] = RealtimeQueue()
    
    try:
        async for message_bytes in self.websocket:
            message = TTSMessageAdapter.validate_python(
                msgpack.unpackb(message_bytes)
            )
            
            if isinstance(message, TTSAudioMessage):
                # Queue audio with timestamp
                output_queue.put(
                    message,
                    release_time=(
                        self.received_samples / SAMPLE_RATE
                        - AUDIO_BUFFER_SEC
                    )
                )
                self.received_samples += len(message.pcm)
                
                # Record TTFT
                if self.waiting_first_audio:
                    self.waiting_first_audio = False
                    ttft = self.time_since_first_text_sent.time()
                    mt.TTS_TTFT.observe(ttft)
                
            elif isinstance(message, TTSTextMessage):
                # Queue text with its timestamp
                output_queue.put(message, message.start_s)
            
            # Release messages at correct time
            for _, msg in output_queue.get_nowait():
                if isinstance(msg, TTSAudioMessage):
                    self.received_samples_yielded += len(msg.pcm)
                yield msg
    
    finally:
        # Empty remaining queue
        async for _, msg in output_queue:
            yield msg

Message Types

Client → Server

Text Message

class TTSClientTextMessage(BaseModel):
    type: Literal["Text"] = "Text"
    text: str
Example:
{"type": "Text", "text": "Hello world"}

Voice Message

class TTSClientVoiceMessage(BaseModel):
    type: Literal["Voice"] = "Voice"
    embeddings: list[float]
    shape: list[int]
Sent at connection start for custom voices.

End-of-Stream Message

class TTSClientEosMessage(BaseModel):
    type: Literal["Eos"] = "Eos"
Signals no more text will be sent.
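Putting the client messages together, one utterance serializes to a sequence like this (shown as plain dicts before MessagePack encoding):

```python
# One client turn: stream the text word by word, then close with Eos.
turn = [
    {"type": "Text", "text": "Hello"},
    {"type": "Text", "text": " world"},
    {"type": "Eos"},
]
```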

Server → Client

Ready Message

class TTSReadyMessage(BaseModel):
    type: Literal["Ready"]
Sent when TTS is ready to receive text.

Text Message

class TTSTextMessage(BaseModel):
    type: Literal["Text"]
    text: str
    start_s: float  # When this word starts (in generated audio)
    stop_s: float   # When this word ends
Example:
{
    "type": "Text",
    "text": "hello",
    "start_s": 0.5,
    "stop_s": 0.8
}
Purpose: Synchronize text display with audio playback.

Audio Message

class TTSAudioMessage(BaseModel):
    type: Literal["Audio"]
    pcm: list[float]  # Variable length, float32
Example:
{
    "type": "Audio",
    "pcm": [0.001, -0.002, 0.003, ...]  # ~1000-5000 samples
}
Chunk Size: Variable (typically 1000-5000 samples)
Format: PCM float32, 24 kHz, mono
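At 24 kHz, those chunk sizes translate to the following playback durations (a quick sanity check, not from the source):

```python
SAMPLE_RATE = 24_000  # Hz, mono float32 PCM per the format above

def chunk_duration_ms(num_samples: int) -> float:
    # Playback duration of a PCM chunk, in milliseconds.
    return num_samples / SAMPLE_RATE * 1000

short_ms = chunk_duration_ms(1000)  # smallest typical chunk, ≈ 41.7 ms
long_ms = chunk_duration_ms(5000)   # largest typical chunk, ≈ 208.3 ms
```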

Error Message

class TTSErrorMessage(BaseModel):
    type: Literal["Error"]
    message: str

Real-Time Queue

File: unmute/tts/realtime_queue.py

Manages the timed release of audio and text messages.
class RealtimeQueue[T]:
    def __init__(self, get_time: Callable[[], float] | None = None):
        self.items: list[tuple[float, T]] = []  # (release_time, item)
        self.get_time = get_time or time.time
    
    def put(self, item: T, release_time: float):
        # Add item with release timestamp
        self.items.append((release_time, item))
    
    def get_nowait(self) -> list[tuple[float, T]]:
        # Return items ready to be released
        current_time = self.get_time()
        ready = [
            (t, item)
            for t, item in self.items
            if t <= current_time
        ]
        # Remove from queue
        self.items = [
            (t, item)
            for t, item in self.items
            if t > current_time
        ]
        return ready
    
    async def __aiter__(self):
        # Wait and yield remaining items
        while self.items:
            next_time = min(t for t, _ in self.items)
            wait_time = next_time - self.get_time()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            for item in self.get_nowait():
                yield item
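The timed-release behavior can be exercised deterministically by injecting a fake clock through get_time (a simplified, untyped copy of the class above, reproduced so the example is self-contained):

```python
import time

class RealtimeQueue:
    # Simplified version of the class above.
    def __init__(self, get_time=None):
        self.items = []  # (release_time, item)
        self.get_time = get_time or time.time

    def put(self, item, release_time):
        self.items.append((release_time, item))

    def get_nowait(self):
        now = self.get_time()
        ready = [(t, i) for t, i in self.items if t <= now]
        self.items = [(t, i) for t, i in self.items if t > now]
        return ready

# Inject a fake clock so release timing is deterministic.
clock = {"now": 0.0}
queue = RealtimeQueue(get_time=lambda: clock["now"])
queue.put("chunk-a", release_time=0.0)
queue.put("chunk-b", release_time=0.5)

first = [item for _, item in queue.get_nowait()]   # only chunk-a is due at t=0
clock["now"] = 0.6
second = [item for _, item in queue.get_nowait()]  # chunk-b is due after advancing
```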
Audio Buffering (text_to_speech.py:94):
AUDIO_BUFFER_SEC = FRAME_TIME_SEC * 4  # 80ms

# Release audio 80ms ahead of playback time
release_time = (samples_received / SAMPLE_RATE) - AUDIO_BUFFER_SEC
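Working through the numbers (assuming FRAME_TIME_SEC is 20 ms, so four frames give the 80 ms buffer): after one second of audio has arrived, the next chunk is released 80 ms ahead of its nominal playback position.

```python
SAMPLE_RATE = 24_000                   # Hz, per the audio format
FRAME_TIME_SEC = 0.02                  # assumed 20 ms frames
AUDIO_BUFFER_SEC = FRAME_TIME_SEC * 4  # 80 ms head start

# One second of audio already received:
samples_received = 24_000
release_time = samples_received / SAMPLE_RATE - AUDIO_BUFFER_SEC  # 0.92 s
```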

Voice Configuration

Query Parameters

File: text_to_speech.py:111
class TtsStreamingQuery(BaseModel):
    seed: int | None = None
    temperature: float | None = None
    top_k: int | None = None
    format: str = "PcmMessagePack"
    voice: str | None = None  # Pre-defined voices
    voices: list[str] | None = None
    max_seq_len: int | None = None
    cfg_alpha: float | None = None  # Classifier-Free Guidance
    auth_id: str | None = None
Example URL:
ws://tts:8080/api/tts?voice=watercooler&cfg_alpha=1.5&format=PcmMessagePack
Default Settings:
TtsStreamingQuery(
    voice=self.voice,  # From session config
    cfg_alpha=1.5,     # Quality vs. speed trade-off
)
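The document doesn't show to_url_params() itself; a plausible sketch using only the standard library (the field names come from TtsStreamingQuery above, but the exact serialization is an assumption):

```python
from urllib.parse import urlencode

def to_url_params(params: dict) -> str:
    # Hypothetical sketch of TtsStreamingQuery.to_url_params(): drop unset
    # fields, then URL-encode the rest. The real serialization may differ.
    set_fields = {k: v for k, v in params.items() if v is not None}
    return "?" + urlencode(set_fields)

query = to_url_params({
    "voice": "watercooler",
    "cfg_alpha": 1.5,
    "format": "PcmMessagePack",
    "seed": None,  # unset fields are omitted
})
# query == "?voice=watercooler&cfg_alpha=1.5&format=PcmMessagePack"
```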

Voice Sources

File: voices.yaml
- name: Watercooler
  good: true
  instructions:
    type: smalltalk
  source:
    source_type: file
    path_on_server: unmute-prod-website/p329_022.wav
    description: From the Device Recorded VCTK dataset.
Voice Types:
  1. Pre-defined: Voices from voices.yaml (loaded from HuggingFace)
  2. Custom: User-uploaded voice cloning (custom:xyz123)

Voice Cloning

File: unmute/tts/voice_cloning.py
def clone_voice(audio_bytes: bytes) -> str:
    # 1. Send audio to voice cloning service
    response = requests.post(
        VOICE_CLONING_SERVER + "/api/clone",
        files={"audio": audio_bytes}
    )
    
    # 2. Get voice embeddings
    embeddings = response.json()["embeddings"]
    shape = response.json()["shape"]
    
    # 3. Generate unique ID
    voice_id = f"custom:{uuid.uuid4().hex[:8]}"
    
    # 4. Cache embeddings
    voice_embeddings_cache[voice_id] = msgpack.packb({
        "type": "Voice",
        "embeddings": embeddings,
        "shape": shape,
    })
    
    return voice_id

Integration with UnmuteHandler

Startup

File: unmute/unmute_handler.py:470
async def start_up_tts(self, generating_message_i: int):
    async def _init() -> TextToSpeech:
        factory = partial(
            TextToSpeech,
            recorder=self.recorder,
            get_time=self.audio_received_sec,
            voice=self.tts_voice,
        )
        
        # Retry with backoff
        for trial in range(5):
            try:
                return await find_instance("tts", factory)
            except Exception:
                if trial == 4:
                    raise
                await asyncio.sleep(0.05 * (1.5 ** trial))
                await emit_warning("Looking for resources...")
    
    async def _run(tts: TextToSpeech):
        await self._tts_loop(tts, generating_message_i)
    
    async def _close(tts: TextToSpeech):
        await tts.shutdown()
    
    return await self.quest_manager.add(
        Quest("tts", _init, _run, _close)
    )
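The backoff schedule in the retry loop above adds only a few hundred milliseconds in the worst case:

```python
# Sleep before each retry in start_up_tts: 0.05 * 1.5**trial seconds.
delays = [0.05 * (1.5 ** trial) for trial in range(4)]
total = sum(delays)
# delays ≈ [0.05, 0.075, 0.1125, 0.169]; total ≈ 0.41 s worst case
```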

Message Loop

File: unmute/unmute_handler.py:508
async def _tts_loop(self, tts: TextToSpeech, generating_message_i: int):
    output_queue = self.output_queue  # Snapshot for interruption
    audio_started = None
    
    try:
        async for message in tts:
            # Check for interruption
            if len(self.chatbot.chat_history) > generating_message_i:
                break  # User interrupted
            
            if isinstance(message, TTSAudioMessage):
                # Convert to numpy array
                audio = np.array(message.pcm, dtype=np.float32)
                
                # Send to frontend (via emit loop)
                await output_queue.put((SAMPLE_RATE, audio))
                
                if audio_started is None:
                    audio_started = self.audio_received_sec()
                
            elif isinstance(message, TTSTextMessage):
                # Send text delta to frontend
                await output_queue.put(
                    ora.ResponseTextDelta(delta=message.text)
                )
                
                # Update chat history
                await self.add_chat_message_delta(
                    message.text,
                    "assistant",
                    generating_message_i=generating_message_i,
                )
    
    finally:
        # Flush Opus encoder with silence
        await output_queue.put(
            (SAMPLE_RATE, np.zeros(SAMPLES_PER_FRAME, dtype=np.float32))
        )
        
        await output_queue.put(ora.ResponseAudioDone())

LLM → TTS Pipeline

File: unmute/unmute_handler.py:184
async def _generate_response_task(self):
    # Start TTS connection
    quest = await self.start_up_tts(generating_message_i)
    
    # Stream from LLM
    llm = VLLMStream(self.openai_client, temperature=0.7)
    
    tts = None
    async for delta in rechunk_to_words(llm.chat_completion(messages)):
        # Send to frontend (for subtitles)
        await self.output_queue.put(
            ora.UnmuteResponseTextDeltaReady(delta=delta)
        )
        
        # Get TTS connection
        if tts is None:
            tts = await quest.get()
        
        # Check for interruption
        if len(self.chatbot.chat_history) > generating_message_i:
            break
        
        # Send word to TTS
        await tts.send(delta)
    
    # Signal end of text
    if tts is not None:
        await tts.send(TTSClientEosMessage())
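The pipeline above relies on rechunk_to_words to turn arbitrary LLM deltas into whole words before they reach the TTS. A minimal sketch of what that helper might do (the real implementation may differ): buffer incoming fragments and re-emit them at space boundaries, keeping each word's leading space attached.

```python
import asyncio
from typing import AsyncIterator

async def rechunk_to_words(deltas: AsyncIterator[str]) -> AsyncIterator[str]:
    # Hypothetical sketch: regroup token-sized text deltas into whole words.
    buffer = ""
    async for delta in deltas:
        buffer += delta
        while True:
            # Search past position 0 so a leading space stays glued
            # to the word that follows it.
            idx = buffer.find(" ", 1)
            if idx == -1:
                break
            yield buffer[:idx]
            buffer = buffer[idx:]
    if buffer:
        yield buffer  # flush the final partial word

async def _demo() -> list[str]:
    async def deltas():
        # Token-sized fragments, as an LLM might stream them
        for chunk in ["Hel", "lo wo", "rld ho", "w"]:
            yield chunk
    return [w async for w in rechunk_to_words(deltas())]

words = asyncio.run(_demo())
# words == ["Hello", " world", " how"]
```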

Timing & Latency

Time to First Token (TTFT)

File: text_to_speech.py:286
if self.waiting_first_audio and self.time_since_first_text_sent.started:
    self.waiting_first_audio = False
    ttft = self.time_since_first_text_sent.time()
    mt.TTS_TTFT.observe(ttft)
Typical Latency:
  • Single GPU: 750ms
  • Dedicated GPU: 450ms
  • Depends on: GPU model, model size, queue depth

Real-Time Factor

TTS generates audio faster than real-time:
# Example metrics
time_received = 5.2      # audio duration received (seconds)
time_since_start = 2.1   # wall-clock time (seconds)
rtf = time_received / time_since_start  # ≈ 2.48x real time
Why Important: An RTF above 1.0 means audio is generated faster than it plays back, so playback proceeds smoothly without stuttering.

Synchronization

File: unmute/unmute_handler.py:523
self.debug_dict["tts_throughput"] = {
    "time_received": time_received,        # Audio duration received
    "time_received_yielded": time_yielded, # Audio duration sent
    "time_since_start": elapsed,           # Wall clock time
    "ratio": time_yielded / (elapsed + 0.01),  # RTF
}

Metrics

File: unmute/metrics.py

TTS-Specific Metrics

# Session metrics
TTS_SESSIONS = Counter('unmute_tts_sessions_total')
TTS_ACTIVE_SESSIONS = Gauge('unmute_tts_active_sessions')

# Frame metrics
TTS_SENT_FRAMES = Counter('unmute_tts_sent_frames_total')
TTS_RECV_FRAMES = Counter('unmute_tts_recv_frames_total')

# Word metrics
TTS_RECV_WORDS = Counter('unmute_tts_recv_words_total')

# Duration metrics
TTS_AUDIO_DURATION = Histogram('unmute_tts_audio_duration_seconds')
TTS_GEN_DURATION = Histogram('unmute_tts_gen_duration_seconds')

# Latency metrics
TTS_TTFT = Histogram(
    'unmute_tts_ttft_seconds',
    buckets=[0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
)

Error Handling

Connection Failures

try:
    tts = await find_instance("tts", factory)
except MissingServiceAtCapacity:
    await emit_error("TTS service at capacity")
except MissingServiceTimeout:
    await emit_error("TTS service timeout")

Race Condition Handling

File: text_to_speech.py:226
# Sometimes we receive stale packets from a previous client
for _ in range(10):
    message_bytes = await self.websocket.recv(decode=False)
    message = TTSMessageAdapter.validate_python(
        msgpack.unpackb(message_bytes)
    )
    if isinstance(message, TTSReadyMessage):
        return  # Found the Ready message
    else:
        logger.warning(f"Unexpected message: {message.type}")

Voice Donation

File: unmute/tts/voice_donation.py

A system for collecting voice donations:
class VoiceDonationSubmission(BaseModel):
    verification_text: str
    verification_hash: str
    email: str | None
    donate_publicly: bool

def submit_voice_donation(
    submission: VoiceDonationSubmission,
    audio_bytes: bytes,
):
    # 1. Verify text matches recording
    # 2. Upload to S3 or storage
    # 3. Send confirmation email
