Overview

Unmute integrates with any OpenAI-compatible LLM server to generate conversational responses. It streams completions to keep latency low and to feed text-to-speech synthesis in real time.

Key Features:
  • OpenAI-compatible API (works with VLLM, OpenAI, Ollama, etc.)
  • Streaming completions for low latency
  • Word-level chunking for TTS
  • Dynamic system prompts and character personalities
  • Context management and preprocessing

Architecture

LLM Server

Default: VLLM

Technology: VLLM
Default Model: Llama 3.2 1B Instruct
API: OpenAI-compatible /v1/chat/completions

Docker Compose (docker-compose.yml:101):
llm:
  image: vllm/vllm-openai:v0.11.0
  command:
    - "--model=meta-llama/Llama-3.2-1B-Instruct"
    - "--max-model-len=1536"
    - "--dtype=bfloat16"
    - "--gpu-memory-utilization=0.4"
  environment:
    - HUGGING_FACE_HUB_TOKEN=$HUGGING_FACE_HUB_TOKEN
  deploy:
    resources:
      reservations:
        devices:
          - driver: nvidia
            count: all
            capabilities: [gpu]
Resource Usage:
  • VRAM: ~6.1 GB (Llama 3.2 1B)
  • Concurrent requests: Batched automatically by VLLM
Alternative Models:
  • Mistral Small 3.2 24B: Better quality, more VRAM
  • Gemma 3 12B: Good balance

External LLM Servers

Unmute supports any OpenAI-compatible server.

Ollama:
backend:
  environment:
    - KYUTAI_LLM_URL=http://host.docker.internal:11434
    - KYUTAI_LLM_MODEL=gemma3
    - KYUTAI_LLM_API_KEY=ollama
OpenAI:
backend:
  environment:
    - KYUTAI_LLM_URL=https://api.openai.com  # no /v1 here: get_openai_client appends it
    - KYUTAI_LLM_MODEL=gpt-4.1
    - KYUTAI_LLM_API_KEY=sk-...
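
The Python client documented on this page builds its base URL as server_url + "/v1", so a KYUTAI_LLM_URL value that already ends in /v1 would yield a doubled path. The following is a minimal sketch of a defensive normalizer that accepts both forms. The helper name build_base_url is hypothetical and is not part of Unmute.

```python
def build_base_url(server_url: str) -> str:
    """Return an OpenAI-style base URL ending in exactly one '/v1'.

    Hypothetical helper for illustration; not part of Unmute.
    """
    trimmed = server_url.rstrip("/")
    # Drop an existing '/v1' suffix so it is not appended twice.
    if trimmed.endswith("/v1"):
        trimmed = trimmed[: -len("/v1")]
    return trimmed + "/v1"
```

With this in place, both http://llm:8000 and https://api.openai.com/v1 resolve to a URL with a single /v1 suffix.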

Python Client

File: unmute/llm/llm_utils.py

OpenAI Client

from openai import AsyncOpenAI

def get_openai_client(
    server_url: str = LLM_SERVER,
    api_key: str | None = KYUTAI_LLM_API_KEY,
) -> AsyncOpenAI:
    # Use dummy key for VLLM (doesn't validate)
    return AsyncOpenAI(
        api_key=api_key or "EMPTY",
        base_url=server_url + "/v1"
    )

Model Selection

File: llm_utils.py:110
@cache
def autoselect_model() -> str:
    if KYUTAI_LLM_MODEL is not None:
        return KYUTAI_LLM_MODEL
    
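    # Auto-detect if exactly one model is available.
    # A synchronous client is used because this function is not async.
    async_client = get_openai_client()
    client_sync = OpenAI(
        api_key=async_client.api_key or "EMPTY",
        base_url=async_client.base_url
    )
    models = client_sync.models.list()
    
    if len(models.data) != 1:
        raise ValueError(
            f"Expected exactly one model, found {len(models.data)}. "
            "Specify KYUTAI_LLM_MODEL"
        )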
    
    return models.data[0].id
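
The selection rule can be checked without a running server. This is a minimal sketch that takes the configured name and the list of served model IDs as plain arguments. The function pick_model is a hypothetical stand-in for illustration, not Unmute's API.

```python
from typing import List, Optional


def pick_model(configured: Optional[str], available: List[str]) -> str:
    """Mirror the auto-selection rule on plain data (hypothetical helper)."""
    # An explicit setting always wins, even if the server lists other models.
    if configured is not None:
        return configured
    # Auto-detection is only unambiguous when exactly one model is served.
    if len(available) != 1:
        raise ValueError(
            f"Expected exactly one model, found {len(available)}. "
            "Specify KYUTAI_LLM_MODEL"
        )
    return available[0]
```

Note that an empty model list fails the same check as a list with several entries.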

VLLMStream Class

File: llm_utils.py:126
class VLLMStream:
    def __init__(
        self,
        client: AsyncOpenAI,
        temperature: float = 1.0,
    ):
        self.client = client
        self.model = autoselect_model()
        self.temperature = temperature
    
    async def chat_completion(
        self,
        messages: list[dict[str, str]]
    ) -> AsyncIterator[str]:
        # Create streaming completion
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
            temperature=self.temperature,
        )
        
        # Yield text deltas
        async with stream:
            async for chunk in stream:
                chunk_content = chunk.choices[0].delta.content
                
                if chunk_content:  # Skip null chunks
                    yield chunk_content
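
The delta-filtering step can be exercised without an LLM server. This is a minimal sketch in which a stand-in generator replaces the real OpenAI stream. The names fake_deltas, text_only, and collect are hypothetical.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def fake_deltas() -> AsyncIterator[Optional[str]]:
    """Stand-in for streamed delta contents, including empty ones."""
    for piece in ["Hel", None, "lo", "", " world"]:
        yield piece


async def text_only(deltas: AsyncIterator[Optional[str]]) -> AsyncIterator[str]:
    """Forward only non-empty text, as chat_completion does."""
    async for piece in deltas:
        if piece:  # Skips both None and ""
            yield piece


async def collect() -> List[str]:
    return [piece async for piece in text_only(fake_deltas())]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```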

Conversation Management

Chatbot Class

File: unmute/llm/chatbot.py
class Chatbot:
    def __init__(self):
        self.chat_history: list[dict[Any, Any]] = [
            {
                "role": "system",
                "content": ConstantInstructions().make_system_prompt()
            }
        ]
        self._instructions: Instructions | None = None

Message Format

OpenAI chat completion format:
[
    {
        "role": "system",
        "content": "You are a helpful assistant..."
    },
    {
        "role": "user",
        "content": "Hello!"
    },
    {
        "role": "assistant",
        "content": "Hi! How can I help you today?"
    },
    ...
]

Adding Messages

File: chatbot.py:39
async def add_chat_message_delta(
    self,
    delta: str,
    role: Literal["user", "assistant"],
    generating_message_i: int | None = None,
) -> bool:
    # Check for interruption
    if (
        generating_message_i is not None
        and len(self.chat_history) > generating_message_i
    ):
        return False  # Message was interrupted
    
    # Create new message or append to existing
    if not self.chat_history or self.chat_history[-1]["role"] != role:
        self.chat_history.append({"role": role, "content": delta})
        return True  # New message
    else:
        last_message = self.chat_history[-1]["content"]
        
        # Add space if needed
        needs_space = (
            last_message and not last_message[-1].isspace()
            and delta and not delta[0].isspace()
        )
        
        if needs_space:
            delta = " " + delta
        
        self.chat_history[-1]["content"] += delta
        return last_message == ""  # New if previously empty
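
The append-or-create rule and the space insertion can be seen on a plain list. This is a minimal sketch that omits the interruption check. The function append_delta is a hypothetical free-function version of the method above.

```python
from typing import Dict, List


def append_delta(history: List[Dict[str, str]], delta: str, role: str) -> bool:
    """Append delta to the last message of the same role, or start a new one.

    Returns True when a new message begins. Hypothetical helper.
    """
    if not history or history[-1]["role"] != role:
        history.append({"role": role, "content": delta})
        return True
    last = history[-1]["content"]
    # Insert one space only when neither side already supplies whitespace.
    needs_space = bool(
        last and not last[-1].isspace() and delta and not delta[0].isspace()
    )
    history[-1]["content"] = last + (" " if needs_space else "") + delta
    return last == ""


history: List[Dict[str, str]] = []
started = [append_delta(history, word, "user") for word in ["Hello", "there", " friend"]]
```

Here the three deltas collapse into one user message, and only the first call reports a new message.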

Conversation State

File: chatbot.py:21
def conversation_state(self) -> ConversationState:
    if not self.chat_history:
        return "waiting_for_user"
    
    last_message = self.chat_history[-1]
    
    if last_message["role"] == "assistant":
        return "bot_speaking"
    elif last_message["role"] == "user":
        if last_message["content"].strip():
            return "user_speaking"
        else:
            return "waiting_for_user"
    elif last_message["role"] == "system":
        return "waiting_for_user"
States:
  • waiting_for_user: No user speech yet (empty history, system prompt only, or an empty user message); ready for input
  • user_speaking: Non-empty user message, accumulating speech
  • bot_speaking: Assistant message, TTS active

Message Preprocessing

File: llm_utils.py:16

Messages are preprocessed before being sent to the LLM:
def preprocess_messages_for_llm(
    chat_history: list[dict[str, str]],
) -> list[dict[str, str]]:
    output = []
    
    for message in chat_history:
        message = deepcopy(message)
        
        # 1. Remove empty interruptions
        if message["content"].replace(INTERRUPTION_CHAR, "") == "":
            continue
        
        # 2. Remove interruption markers
        message["content"] = message["content"].strip().removesuffix(
            INTERRUPTION_CHAR  # em-dash (—)
        )
        
        # 3. Merge consecutive same-role messages
        if output and message["role"] == output[-1]["role"]:
            output[-1]["content"] += " " + message["content"]
        else:
            output.append(message)
    
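    # 4. Ensure proper message order
    # Guard the lookups: output may hold fewer than two messages.
    first_role = output[0]["role"] if output else None
    second_role = output[1]["role"] if len(output) > 1 else None
    if first_role == "system" and second_role in [None, "assistant"]:
        # Some models need user message first
        output = [
            output[0],
            {"role": "user", "content": "Hello."},
            *output[1:]
        ]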
    
    # 5. Handle silence markers
    for message in output:
        if (
            message["role"] == "user"
            and message["content"].startswith(USER_SILENCE_MARKER)
            and message["content"] != USER_SILENCE_MARKER
        ):
            # User spoke after silence marker
            message["content"] = message["content"][len(USER_SILENCE_MARKER):]
    
    return output
Special Markers:
  • INTERRUPTION_CHAR = "—" (em-dash): Marks interrupted messages
  • USER_SILENCE_MARKER = "...": User silent for >7s
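
A worked example makes the effect of the interruption marker easier to follow. This is a minimal sketch covering only steps 1 to 3 (drop empty interruptions, strip the marker, merge same-role neighbours). The function name clean_history and the sample history are hypothetical.

```python
from copy import deepcopy
from typing import Dict, List

INTERRUPTION_CHAR = "—"  # Marker value as documented above


def clean_history(chat_history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Apply steps 1-3 of the preprocessing (hypothetical helper)."""
    output: List[Dict[str, str]] = []
    for message in chat_history:
        message = deepcopy(message)  # Never mutate the live history
        if message["content"].replace(INTERRUPTION_CHAR, "") == "":
            continue  # Message holds nothing but markers
        message["content"] = message["content"].strip().removesuffix(INTERRUPTION_CHAR)
        if output and message["role"] == output[-1]["role"]:
            output[-1]["content"] += " " + message["content"]
        else:
            output.append(message)
    return output


sample = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Tell me a story"},
    {"role": "assistant", "content": "Once upon a time—"},
    {"role": "user", "content": "Wait"},
    {"role": "assistant", "content": "—"},
    {"role": "user", "content": "make it short"},
]
cleaned = clean_history(sample)
```

The marker-only assistant message disappears, which leaves two user messages adjacent, so they are merged into one.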

Word-Level Chunking

File: llm_utils.py:65

LLM output is rechunked to word boundaries for TTS:
async def rechunk_to_words(iterator: AsyncIterator[str]) -> AsyncIterator[str]:
    """Rechunk token stream to whole words.
    
    Input:  ["Hel", "lo", " wor", "ld"]
    Output: ["Hello", " world"]
    
    Spaces included with next word.
    """
    buffer = ""
    space_re = re.compile(r"\s+")
    prefix = ""
    
    async for delta in iterator:
        buffer += delta
        
        # Extract complete words
        while True:
            match = space_re.search(buffer)
            if match is None:
                break
            
            # Yield word (with previous space)
            chunk = buffer[:match.start()]
            buffer = buffer[match.end():]  # Skip whitespace
            
            if chunk:
                yield prefix + chunk
            prefix = " "
    
    # Yield remaining buffer
    if buffer:
        yield prefix + buffer
Why Needed: LLM tokens can split a word in the middle, and TTS needs whole words to pronounce them correctly.

Example:
# LLM tokens: ["The", " quick", " brown", " fox"]
# Rechunked:  ["The", " quick", " brown", " fox"]

# LLM tokens: ["Hel", "lo", " wor", "ld!"]
# Rechunked:  ["Hello", " world!"]
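
The two examples above can be reproduced with a small driver. This is a minimal sketch that repeats the rechunking logic so the block is self-contained. The helpers tokens and rechunk_list are hypothetical.

```python
import asyncio
import re
from typing import AsyncIterator, List


async def rechunk_to_words(iterator: AsyncIterator[str]) -> AsyncIterator[str]:
    """Same logic as the function documented above."""
    buffer = ""
    space_re = re.compile(r"\s+")
    prefix = ""
    async for delta in iterator:
        buffer += delta
        while True:
            match = space_re.search(buffer)
            if match is None:
                break
            chunk = buffer[: match.start()]
            buffer = buffer[match.end():]
            if chunk:
                yield prefix + chunk
            prefix = " "
    if buffer:
        yield prefix + buffer


async def tokens(parts: List[str]) -> AsyncIterator[str]:
    """Stand-in for an LLM token stream."""
    for part in parts:
        yield part


async def rechunk_list(parts: List[str]) -> List[str]:
    return [word async for word in rechunk_to_words(tokens(parts))]


if __name__ == "__main__":
    print(asyncio.run(rechunk_list(["Hel", "lo", " wor", "ld!"])))
```

A word is emitted only once whitespace after it has arrived or the stream has ended, so the final word always waits for the end of the stream.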

System Prompts

File: unmute/llm/system_prompt.py

Instructions Classes

class Instructions(BaseModel):
    type: str
    
    def make_system_prompt(self) -> str:
        # Override in subclasses
        raise NotImplementedError

class ConstantInstructions(Instructions):
    type: Literal["constant"] = "constant"
    text: str
    language: str = "en"
    
    def make_system_prompt(self) -> str:
        return self.text + "\n\n" + COMMON_INSTRUCTIONS

class SmalltalkInstructions(Instructions):
    type: Literal["smalltalk"] = "smalltalk"
    language: str = "en"
    
    def make_system_prompt(self) -> str:
        return f"""
You are a friendly conversational partner.
Your role is to engage in casual smalltalk.
{COMMON_INSTRUCTIONS}
"""

Common Instructions

COMMON_INSTRUCTIONS = """
You're in a voice conversation.

IMPORTANT RULES:
- Keep responses SHORT and CONCISE (1-3 sentences)
- Be natural and conversational
- Don't use markdown or formatting
- If interrupted (message ends with —), the user started speaking
- If user says "...", they've been silent for a while
- End with "Bye!" to close the conversation

CONVERSATION FLOW:
- First message: Introduce yourself briefly
- Following messages: Respond naturally to the user
- If user silent ("..."): Ask if they're still there or prompt them
- Be concise - this is voice, not text chat!
"""

Dynamic Prompts

Quiz Show (system_prompt.py:200):
class QuizShowInstructions(Instructions):
    type: Literal["quiz_show"] = "quiz_show"
    
    def make_system_prompt(self) -> str:
        # Randomly select 5 questions
        questions = random.sample(QUIZ_QUESTIONS, 5)
        
        return f"""
You are a quiz show host.
Ask these 5 questions one at a time:

{format_questions(questions)}

Wait for the user's answer before moving to the next question.
{COMMON_INSTRUCTIONS}
"""
News (system_prompt.py:300):
class NewsInstructions(Instructions):
    type: Literal["news"] = "news"
    
    def make_system_prompt(self) -> str:
        # Fetch latest headlines from NewsAPI
        headlines = fetch_news_headlines()
        
        return f"""
You are a news anchor.
Discuss these recent headlines:

{format_headlines(headlines)}

{COMMON_INSTRUCTIONS}
"""

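Because the quiz prompt is rebuilt from a random sample, each session can receive a different set of questions. This is a minimal sketch of that idea. The question list, format_questions, and make_quiz_prompt are all hypothetical stand-ins and do not reproduce Unmute's actual data or helpers.

```python
import random
from typing import List

# Hypothetical question pool, for illustration only.
QUIZ_QUESTIONS = [
    "What is the capital of France?",
    "How many legs does a spider have?",
    "Which planet is closest to the Sun?",
    "What gas do plants absorb from the air?",
    "How many minutes are in an hour?",
    "What is the largest ocean on Earth?",
]


def format_questions(questions: List[str]) -> str:
    """Number the questions, one per line."""
    return "\n".join(f"{i}. {q}" for i, q in enumerate(questions, start=1))


def make_quiz_prompt(rng: random.Random, count: int = 5) -> str:
    """Build a prompt from `count` distinct, randomly chosen questions."""
    questions = rng.sample(QUIZ_QUESTIONS, count)
    return (
        "You are a quiz show host.\n"
        f"Ask these {count} questions one at a time:\n\n"
        + format_questions(questions)
    )


prompt = make_quiz_prompt(random.Random(0))
```

Passing a seeded random.Random makes the selection reproducible in tests, while the default module-level generator gives variety in production.
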
Response Generation

File: unmute/unmute_handler.py:184

Full Pipeline

async def _generate_response_task(self):
    generating_message_i = len(self.chatbot.chat_history)
    
    # 1. Notify frontend
    await self.output_queue.put(
        ora.ResponseCreated(
            response=ora.Response(
                status="in_progress",
                voice=self.tts_voice,
                chat_history=self.chatbot.chat_history,
            )
        )
    )
    
    # 2. Start TTS connection
    tts_quest = await self.start_up_tts(generating_message_i)
    
    # 3. Create LLM stream
    llm = VLLMStream(
        self.openai_client,
        temperature=(
            FIRST_MESSAGE_TEMPERATURE if generating_message_i == 2
            else FURTHER_MESSAGES_TEMPERATURE
        )
    )
    
    # 4. Get preprocessed messages
    messages = self.chatbot.preprocessed_messages()
    
    # 5. Stream response
    tts = None
    response_words: list[str] = []
    async for delta in rechunk_to_words(llm.chat_completion(messages)):
        response_words.append(delta)
        
        # Notify frontend (for subtitles)
        await self.output_queue.put(
            ora.UnmuteResponseTextDeltaReady(delta=delta)
        )
        
        # Get TTS connection (waits if not ready)
        if tts is None:
            tts = await tts_quest.get()
        
        # Check for interruption
        if len(self.chatbot.chat_history) > generating_message_i:
            break  # User interrupted
        
        # Send to TTS
        await tts.send(delta)
    
    # 6. Signal end
    if tts is not None:
        await tts.send(TTSClientEosMessage())
    
    # Rechunked words already carry their leading space, so join without a separator
    await self.output_queue.put(
        ora.ResponseTextDone(text="".join(response_words))
    )

Temperature Settings

File: unmute/unmute_handler.py:58
FIRST_MESSAGE_TEMPERATURE = 0.7   # More creative greeting
FURTHER_MESSAGES_TEMPERATURE = 0.3  # More consistent responses
Why Different:
  • First message: Variety in greetings
  • Later messages: Consistent personality
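
The choice between the two temperatures depends only on the index of the message being generated. This is a minimal sketch of that rule. The function pick_temperature is hypothetical, and the reading of index 2 as "system prompt plus one earlier message" is an assumption based on the pipeline above.

```python
FIRST_MESSAGE_TEMPERATURE = 0.7
FURTHER_MESSAGES_TEMPERATURE = 0.3


def pick_temperature(generating_message_i: int) -> float:
    """Use the higher temperature only for the first generated reply."""
    # Index 2 presumably means the history holds the system prompt and one
    # earlier message when generation starts (assumption).
    if generating_message_i == 2:
        return FIRST_MESSAGE_TEMPERATURE
    return FURTHER_MESSAGES_TEMPERATURE
```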

Interruption Handling

File: unmute/unmute_handler.py:583
async def interrupt_bot(self):
    if self.chatbot.conversation_state() != "bot_speaking":
        raise RuntimeError("Can't interrupt when not speaking")
    
    # 1. Mark interruption in chat history
    await self.add_chat_message_delta(INTERRUPTION_CHAR, "assistant")
    
    # 2. Clear output queue
    if self._clear_queue is not None:
        self._clear_queue()  # FastRTC's audio queue
    self.output_queue = asyncio.Queue()  # Our event queue
    
    # 3. Send silence to flush Opus encoder
    await self.output_queue.put(
        (SAMPLE_RATE, np.zeros(SAMPLES_PER_FRAME, dtype=np.float32))
    )
    
    # 4. Notify frontend
    await self.output_queue.put(ora.UnmuteInterruptedByVAD())
    
    # 5. Cancel LLM and TTS tasks
    await self.quest_manager.remove("tts")
    await self.quest_manager.remove("llm")
Result:
  • LLM stream cancelled (no more tokens)
  • TTS connection closed (no more audio)
  • Chat history contains the partial response, ending with INTERRUPTION_CHAR
  • The next preprocessing pass removes the trailing INTERRUPTION_CHAR

Special Behaviors

Long Silence Detection

File: unmute/unmute_handler.py:626
USER_SILENCE_TIMEOUT = 7.0  # seconds

async def detect_long_silence(self):
    if (
        self.chatbot.conversation_state() == "waiting_for_user"
        and (self.audio_received_sec() - self.waiting_for_user_start_time)
        > USER_SILENCE_TIMEOUT
    ):
        # Add silence marker
        await self.add_chat_message_delta(USER_SILENCE_MARKER, "user")
Effect: The LLM sees "..." and can prompt the user or check whether they are still there.
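
The timeout condition is a pure comparison, so it can be tested in isolation. This is a minimal sketch. The function is_long_silence and its parameter names are hypothetical.

```python
USER_SILENCE_TIMEOUT = 7.0  # seconds


def is_long_silence(
    state: str,
    audio_received_sec: float,
    waiting_since_sec: float,
    timeout: float = USER_SILENCE_TIMEOUT,
) -> bool:
    """True when the bot has waited for the user longer than the timeout."""
    waited = audio_received_sec - waiting_since_sec
    # Strict comparison: waiting exactly `timeout` seconds does not trigger.
    return state == "waiting_for_user" and waited > timeout
```

Time is measured in seconds of audio received rather than wall-clock time, which matches the audio_received_sec() call in the handler.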

Goodbye Detection

File: unmute/unmute_handler.py:609
async def check_for_bot_goodbye(self):
    last_assistant_message = next(
        (
            msg for msg in reversed(self.chatbot.chat_history)
            if msg["role"] == "assistant"
        ),
        {"content": ""}
    )["content"]
    
    if last_assistant_message.lower().endswith("bye!"):
        await self.output_queue.put(
            CloseStream("The assistant ended the conversation.")
        )
Effect: Connection closes gracefully when bot says “Bye!”
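
The goodbye check is a case-insensitive suffix match on the most recent assistant message. This is a minimal sketch on plain data. The function bot_said_goodbye is hypothetical.

```python
from typing import Dict, List


def bot_said_goodbye(chat_history: List[Dict[str, str]]) -> bool:
    """True when the latest assistant message ends with 'bye!' (any case)."""
    last_assistant_message = next(
        (
            msg["content"]
            for msg in reversed(chat_history)
            if msg["role"] == "assistant"
        ),
        "",  # No assistant message yet
    )
    return last_assistant_message.lower().endswith("bye!")
```

The match is exact about the ending, so trailing whitespace or a missing exclamation mark prevents the stream from closing.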

Metrics

File: unmute/metrics.py

LLM-Specific Metrics

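from prometheus_client import Counter, Gauge, Histogram

# prometheus_client requires a help string as the second argument;
# the help strings below are illustrative.

# Session metrics
VLLM_ACTIVE_SESSIONS = Gauge('unmute_vllm_active_sessions', 'Active LLM sessions')
VLLM_INTERRUPTS = Counter('unmute_vllm_interrupts_total', 'Interrupted generations')

# Word metrics
VLLM_SENT_WORDS = Counter('unmute_vllm_sent_words_total', 'Words sent to the LLM')
VLLM_RECV_WORDS = Counter('unmute_vllm_recv_words_total', 'Words received from the LLM')

# Request/response metrics
VLLM_REQUEST_LENGTH = Histogram('unmute_vllm_request_length_words', 'Request length in words')
VLLM_REPLY_LENGTH = Histogram('unmute_vllm_reply_length_words', 'Reply length in words')

# Duration metrics
VLLM_GEN_DURATION = Histogram('unmute_vllm_gen_duration_seconds', 'Generation duration')

# Latency metrics
VLLM_TTFT = Histogram(
    'unmute_vllm_ttft_seconds',
    'Time to first token',
    buckets=[0.05, 0.1, 0.2, 0.5, 1.0, 2.0]
)

# Error metrics
VLLM_HARD_ERRORS = Counter('unmute_vllm_hard_errors_total', 'Unrecoverable LLM errors')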

Metrics Recording

File: unmute/unmute_handler.py:220
# Before generation
num_words_sent = sum(
    len(message.get("content", "").split())
    for message in messages
)
mt.VLLM_SENT_WORDS.inc(num_words_sent)
mt.VLLM_REQUEST_LENGTH.observe(num_words_sent)
mt.VLLM_ACTIVE_SESSIONS.inc()

# During generation
if time_to_first_token is None:
    time_to_first_token = llm_stopwatch.time()
    mt.VLLM_TTFT.observe(time_to_first_token)

mt.VLLM_RECV_WORDS.inc()  # Per word

# After generation
mt.VLLM_ACTIVE_SESSIONS.dec()
mt.VLLM_REPLY_LENGTH.observe(len(response_words))
mt.VLLM_GEN_DURATION.observe(llm_stopwatch.time())
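
The request-length figure is a whitespace-based word count over every message, system prompt included. This is a minimal sketch of that computation on sample data. The function count_words is hypothetical.

```python
from typing import Dict, List


def count_words(messages: List[Dict[str, str]]) -> int:
    """Count whitespace-separated words across all message contents."""
    return sum(len(message.get("content", "").split()) for message in messages)


sample_messages = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Hello there, friend"},
]
num_words_sent = count_words(sample_messages)
```

These are words, not tokens, so the numbers only approximate what the model actually processes.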

Configuration

Environment Variables:
# LLM server URL
KYUTAI_LLM_URL=http://llm:8000

# Model name (optional, auto-detected if only one)
KYUTAI_LLM_MODEL=meta-llama/Llama-3.2-1B-Instruct

# API key (optional, for OpenAI/external servers)
KYUTAI_LLM_API_KEY=sk-...

# News API key (for news character)
NEWSAPI_API_KEY=...
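
These variables can be read into a small settings object at startup. This is a minimal sketch. LLMSettings and load_llm_settings are hypothetical names, and falling back to http://llm:8000 when KYUTAI_LLM_URL is unset is an assumption taken from the example value above, not a confirmed default.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LLMSettings:
    url: str
    model: Optional[str]    # None means "auto-detect"
    api_key: Optional[str]  # None means "no key configured"


def load_llm_settings(env: Mapping[str, str] = os.environ) -> LLMSettings:
    """Read the LLM settings from an environment-like mapping."""
    return LLMSettings(
        url=env.get("KYUTAI_LLM_URL", "http://llm:8000"),  # Assumed default
        model=env.get("KYUTAI_LLM_MODEL"),
        api_key=env.get("KYUTAI_LLM_API_KEY"),
    )
```

Accepting a mapping instead of reading os.environ directly keeps the function easy to test with a plain dictionary.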
