Overview

The llm_utils module provides utilities for interacting with Large Language Models via the OpenAI-compatible API, preprocessing conversation history, and streaming text responses.

Constants

INTERRUPTION_CHAR

INTERRUPTION_CHAR = "—"  # em-dash
Character appended to assistant messages when the bot is interrupted by the user.

USER_SILENCE_MARKER

USER_SILENCE_MARKER = "..."
Marker inserted into user messages when they remain silent for an extended period.
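Taken together, the two markers might appear in a raw conversation history like this (illustrative values only; the history itself is made up):

```python
INTERRUPTION_CHAR = "—"      # em-dash appended when the bot is cut off
USER_SILENCE_MARKER = "..."  # inserted when the user stays silent

raw_history = [
    {"role": "user", "content": "Tell me a long story."},
    # The user interrupted mid-sentence, so the assistant turn ends with the marker:
    {"role": "assistant", "content": "Once upon a time" + INTERRUPTION_CHAR},
    # The user then said nothing for a while:
    {"role": "user", "content": USER_SILENCE_MARKER},
]

print(raw_history[1]["content"])  # Once upon a time—
```

preprocess_messages_for_llm (below) strips these markers back out before the history is sent to the model.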

Classes

VLLMStream

class VLLMStream:
    def __init__(
        self,
        client: AsyncOpenAI,
        temperature: float = 1.0,
    )
Streaming LLM client for chat completions.
Parameters:
  • client (AsyncOpenAI, required): AsyncOpenAI client instance
  • temperature (float, default 1.0): Sampling temperature (0.0 to 2.0). Lower values are more deterministic.

Methods

chat_completion
async def chat_completion(
    self,
    messages: list[dict[str, str]]
) -> AsyncIterator[str]
Generates a streaming chat completion.
Parameters:
  • messages (list[dict[str, str]], required): Conversation history in OpenAI format. Each dict must have "role" and "content" keys.
Returns: AsyncIterator[str], a stream of text chunks.
Example:
from unmute.llm.llm_utils import VLLMStream, get_openai_client

client = get_openai_client()
llm = VLLMStream(client, temperature=0.7)

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Python?"}
]

async for chunk in llm.chat_completion(messages):
    print(chunk, end="", flush=True)

Functions

get_openai_client

def get_openai_client(
    server_url: str = LLM_SERVER,
    api_key: str | None = KYUTAI_LLM_API_KEY
) -> AsyncOpenAI
Creates an AsyncOpenAI client instance.
Parameters:
  • server_url (str, default LLM_SERVER): Base URL of the LLM server
  • api_key (str | None, default KYUTAI_LLM_API_KEY): API key for authentication. Defaults to "EMPTY" for vLLM servers that don't require keys.
Returns: AsyncOpenAI client instance.
Example:
client = get_openai_client(
    server_url="http://localhost:8000",
    api_key="your-api-key"
)

autoselect_model

@cache
def autoselect_model() -> str
Automatically selects an LLM model from the server.
Returns: str, the model identifier.
Raises: ValueError if multiple models are available (explicit selection is required).
Notes:
  • Uses KYUTAI_LLM_MODEL environment variable if set
  • Otherwise queries the server and selects the model if only one is available
  • Result is cached for performance
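The selection logic described in the notes can be sketched as follows. This is a simplified stand-in, not the module's actual code: the real function queries the LLM server, which is stubbed out here with a hardcoded model list.

```python
import os
from functools import cache

def _list_server_models() -> list[str]:
    """Stand-in for the server query; the real code asks the LLM server."""
    return ["my-only-model"]

@cache  # result is cached, so the server is queried at most once
def autoselect_model_sketch() -> str:
    # 1. The KYUTAI_LLM_MODEL environment variable wins if set.
    if model := os.environ.get("KYUTAI_LLM_MODEL"):
        return model
    # 2. Otherwise the server must expose exactly one model.
    models = _list_server_models()
    if len(models) != 1:
        raise ValueError(
            f"Cannot auto-select among {models}; set KYUTAI_LLM_MODEL explicitly."
        )
    return models[0]

print(autoselect_model_sketch())
```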

preprocess_messages_for_llm

def preprocess_messages_for_llm(
    chat_history: list[dict[str, str]],
) -> list[dict[str, str]]
Preprocesses conversation history before sending to the LLM.
Parameters:
  • chat_history (list[dict[str, str]], required): Raw conversation history with "role" and "content" keys
Returns: list[dict[str, str]], the cleaned conversation history.
Processing steps:
  1. Removes messages containing only the INTERRUPTION_CHAR
  2. Strips INTERRUPTION_CHAR suffix from interrupted messages
  3. Merges consecutive messages from the same role
  4. Adds dummy “Hello.” user message if needed for model compatibility
  5. Removes USER_SILENCE_MARKER prefix when user continues talking
Example:
from unmute.llm.llm_utils import preprocess_messages_for_llm

raw_history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there—"},  # interrupted
    {"role": "user", "content": "What's Python?"}
]

processed = preprocess_messages_for_llm(raw_history)
# Result: interruption marker removed, messages cleaned
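The first three processing steps can be sketched roughly like this. This is a simplified reimplementation for illustration only; the real function also handles the dummy-message and silence-marker steps.

```python
INTERRUPTION_CHAR = "—"

def clean_history_sketch(history: list[dict[str, str]]) -> list[dict[str, str]]:
    cleaned: list[dict[str, str]] = []
    for msg in history:
        content = msg["content"]
        if content == INTERRUPTION_CHAR:
            continue  # step 1: drop messages that are only the marker
        content = content.removesuffix(INTERRUPTION_CHAR)  # step 2: strip the suffix
        if cleaned and cleaned[-1]["role"] == msg["role"]:
            # step 3: merge consecutive messages from the same role
            cleaned[-1]["content"] += " " + content
        else:
            cleaned.append({"role": msg["role"], "content": content})
    return cleaned

history = [
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there—"},  # interrupted
    {"role": "assistant", "content": "—"},          # marker-only message
    {"role": "user", "content": "What's Python?"},
]
cleaned = clean_history_sketch(history)
print(cleaned)
```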

rechunk_to_words

async def rechunk_to_words(
    iterator: AsyncIterator[str]
) -> AsyncIterator[str]
Rechunks a text stream into whole words for better TTS pronunciation.
Parameters:
  • iterator (AsyncIterator[str], required): Stream of text chunks that may break mid-word
Returns: AsyncIterator[str], a stream of complete words.
Behavior:
  • Spaces are attached to the following word: "foo bar baz" yields "foo", " bar", " baz"
  • Multiple whitespace characters are merged into a single space
  • Buffers partial words until whitespace is encountered
Example:
from unmute.llm.llm_utils import rechunk_to_words

async def stream_text():
    chunks = ["Hel", "lo wo", "rld! How", " are you?"]
    for chunk in chunks:
        yield chunk

async for word in rechunk_to_words(stream_text()):
    print(f"[{word}]")
# Output:
# [Hello]
# [ world!]
# [ How]
# [ are]
# [ you?]
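The buffering behavior above can be sketched roughly as follows. This is a simplified reimplementation for illustration, not the module's actual code.

```python
import asyncio
from typing import AsyncIterator

async def rechunk_sketch(iterator: AsyncIterator[str]) -> AsyncIterator[str]:
    """Buffer partial words; emit whole words, space-prefixed after the first."""
    buffer = ""
    first = True
    async for chunk in iterator:
        buffer += chunk
        words = buffer.split()  # collapses runs of whitespace
        # If the buffer does not end in whitespace, the last word may be
        # partial: hold it back until more text (or end of stream) arrives.
        if words and not buffer[-1].isspace():
            buffer = words.pop()
        else:
            buffer = ""
        for word in words:
            yield word if first else " " + word
            first = False
    if buffer:  # flush the final word
        yield buffer if first else " " + buffer

async def collect_words() -> list[str]:
    async def chunks() -> AsyncIterator[str]:
        for c in ["Hel", "lo wo", "rld!"]:
            yield c
    return [w async for w in rechunk_sketch(chunks())]

words_out = asyncio.run(collect_words())
print(words_out)  # ['Hello', ' world!']
```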

Protocol

LLMStream

class LLMStream(Protocol):
    async def chat_completion(
        self,
        messages: list[dict[str, str]]
    ) -> AsyncIterator[str]:
        ...
Protocol for LLM streaming clients. Any class implementing chat_completion() can be used as an LLM stream.
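Because this is a structural protocol, any object with a matching chat_completion method conforms; no inheritance is needed. For example, a canned stream useful as a test double (a hypothetical helper, not part of the module):

```python
import asyncio
from typing import AsyncIterator

class CannedStream:
    """Hypothetical test double satisfying the LLMStream protocol."""

    def __init__(self, chunks: list[str]):
        self._chunks = chunks

    async def chat_completion(
        self, messages: list[dict[str, str]]
    ) -> AsyncIterator[str]:
        # Ignores the messages and replays the canned chunks.
        for chunk in self._chunks:
            yield chunk

async def run_canned() -> str:
    stream = CannedStream(["Hello, ", "world!"])
    return "".join([c async for c in stream.chat_completion([])])

text = asyncio.run(run_canned())
print(text)  # Hello, world!
```

This makes it easy to exercise code that consumes an LLMStream without a live server.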

Complete Example

import asyncio
from unmute.llm.llm_utils import (
    get_openai_client,
    VLLMStream,
    preprocess_messages_for_llm,
    rechunk_to_words,
)

async def chat_with_llm():
    # Initialize client
    client = get_openai_client()
    llm = VLLMStream(client, temperature=0.7)
    
    # Prepare conversation
    raw_history = [
        {"role": "system", "content": "You are a helpful AI assistant."},
        {"role": "user", "content": "Tell me about Python programming."},
    ]
    
    # Preprocess messages
    messages = preprocess_messages_for_llm(raw_history)
    
    # Stream response as complete words
    print("Assistant: ", end="")
    async for word in rechunk_to_words(llm.chat_completion(messages)):
        print(word, end="", flush=True)
    print()  # newline

asyncio.run(chat_with_llm())

Advanced Usage: Integration with TTS

import asyncio
from unmute.llm.llm_utils import get_openai_client, VLLMStream, rechunk_to_words
from unmute.tts.text_to_speech import TextToSpeech, TTSClientEosMessage

async def speak_llm_response():
    # Initialize services
    client = get_openai_client()
    llm = VLLMStream(client, temperature=0.7)
    tts = TextToSpeech()
    await tts.start_up()
    
    messages = [
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "Explain machine learning briefly."}
    ]
    
    # Stream LLM response directly to TTS
    async for word in rechunk_to_words(llm.chat_completion(messages)):
        await tts.send(word)
    
    # Signal end of text
    await tts.send(TTSClientEosMessage())
    
    # Receive synthesized audio
    async for message in tts:
        pass  # Process audio messages
    
    await tts.shutdown()

asyncio.run(speak_llm_response())

Environment Variables

  • KYUTAI_LLM_MODEL: Model identifier to use (if not set, auto-selects)
  • KYUTAI_LLM_API_KEY: API key for LLM server
  • LLM_SERVER: Base URL of the LLM server

Notes

  • The VLLMStream class auto-selects the model if not explicitly configured
  • Message preprocessing handles common conversation artifacts (interruptions, silence markers)
  • Word rechunking is essential for natural TTS pronunciation
  • All async functions should be run within an event loop
  • The client supports any OpenAI-compatible API (vLLM, llama.cpp, etc.)
