Overview
The llm_utils module provides utilities for interacting with Large Language Models via the OpenAI-compatible API, preprocessing conversation history, and streaming text responses.
Constants
INTERRUPTION_CHAR
```python
INTERRUPTION_CHAR = "—"  # em-dash
```
Character appended to assistant messages when the bot is interrupted by the user.
USER_SILENCE_MARKER
```python
USER_SILENCE_MARKER = "..."
```
Marker inserted into user messages when they remain silent for an extended period.
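As a concrete illustration, a transcript containing both markers might look like the following (the message contents are made up for illustration; only the marker values come from the definitions above):

```python
INTERRUPTION_CHAR = "—"  # em-dash, appended when the bot is cut off
USER_SILENCE_MARKER = "..."  # inserted when the user stays silent

# Hypothetical transcript showing where each marker appears.
history = [
    {"role": "assistant", "content": "The capital of France is" + INTERRUPTION_CHAR},
    {"role": "user", "content": USER_SILENCE_MARKER},  # user said nothing
]

print(history[0]["content"])  # -> The capital of France is—
```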
Classes
VLLMStream
```python
class VLLMStream:
    def __init__(
        self,
        client: AsyncOpenAI,
        temperature: float = 1.0,
    )
```
Streaming LLM client for chat completions.
client
AsyncOpenAI
required
AsyncOpenAI client instance
temperature
float
default: 1.0
Sampling temperature (0.0 to 2.0). Lower values are more deterministic.
Methods
chat_completion
```python
async def chat_completion(
    self,
    messages: list[dict[str, str]]
) -> AsyncIterator[str]
```
Generates streaming chat completion.
messages
list[dict[str, str]]
required
Conversation history in OpenAI format. Each dict should have "role" and "content" keys.
Returns: AsyncIterator[str] - Stream of text chunks
Example:
```python
from unmute.llm.llm_utils import VLLMStream, get_openai_client

client = get_openai_client()
llm = VLLMStream(client, temperature=0.7)

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Python?"},
]

# Within an async context:
async for chunk in llm.chat_completion(messages):
    print(chunk, end="", flush=True)
```
Functions
get_openai_client
```python
def get_openai_client(
    server_url: str = LLM_SERVER,
    api_key: str | None = KYUTAI_LLM_API_KEY
) -> AsyncOpenAI
```
Creates an AsyncOpenAI client instance.
server_url
str
default: LLM_SERVER
Base URL of the LLM server
api_key
str | None
default: KYUTAI_LLM_API_KEY
API key for authentication. Defaults to "EMPTY" for vLLM servers that don't require keys.
Returns: AsyncOpenAI client
Example:
```python
client = get_openai_client(
    server_url="http://localhost:8000",
    api_key="your-api-key",
)
```
autoselect_model
```python
@cache
def autoselect_model() -> str
```
Automatically selects an LLM model from the server.
Returns: str - Model identifier
Raises: ValueError if multiple models are available (requires explicit selection)
Notes:
- Uses the KYUTAI_LLM_MODEL environment variable if set
- Otherwise queries the server and selects the model only if exactly one is available
- The result is cached for performance
preprocess_messages_for_llm
```python
def preprocess_messages_for_llm(
    chat_history: list[dict[str, str]],
) -> list[dict[str, str]]
```
Preprocesses conversation history before sending to the LLM.
chat_history
list[dict[str, str]]
required
Raw conversation history with "role" and "content" keys
Returns: list[dict[str, str]] - Cleaned conversation history
Processing steps:
- Removes messages containing only the INTERRUPTION_CHAR
- Strips the INTERRUPTION_CHAR suffix from interrupted messages
- Merges consecutive messages from the same role
- Adds a dummy "Hello." user message if needed for model compatibility
- Removes the USER_SILENCE_MARKER prefix when the user continues talking
Example:
from unmute.llm.llm_utils import preprocess_messages_for_llm
```python
from unmute.llm.llm_utils import preprocess_messages_for_llm

raw_history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "Hello"},
    {"role": "assistant", "content": "Hi there—"},  # interrupted
    {"role": "user", "content": "What's Python?"},
]

processed = preprocess_messages_for_llm(raw_history)
# Result: interruption marker removed, messages cleaned
```
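For intuition, the first three processing steps could look roughly like this sketch. It is a hypothetical partial re-implementation of the documented behavior, not the module's actual code, and it omits the dummy-message and silence-marker steps:

```python
INTERRUPTION_CHAR = "—"

def preprocess_sketch(history: list[dict[str, str]]) -> list[dict[str, str]]:
    # Hypothetical sketch: drop marker-only messages, strip the interruption
    # suffix, and merge consecutive messages from the same role.
    cleaned: list[dict[str, str]] = []
    for msg in history:
        content = msg["content"]
        if content == INTERRUPTION_CHAR:
            continue  # message contained only the interruption marker
        content = content.removesuffix(INTERRUPTION_CHAR)
        if cleaned and cleaned[-1]["role"] == msg["role"]:
            cleaned[-1]["content"] += " " + content  # merge same-role runs
        else:
            cleaned.append({"role": msg["role"], "content": content})
    return cleaned

out = preprocess_sketch([
    {"role": "assistant", "content": "Hi there—"},  # interrupted
    {"role": "user", "content": "—"},               # marker-only, dropped
    {"role": "user", "content": "What's Python?"},
])
print(out)
```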
rechunk_to_words
```python
async def rechunk_to_words(
    iterator: AsyncIterator[str]
) -> AsyncIterator[str]
```
Rechunks a text stream into whole words for better TTS pronunciation.
iterator
AsyncIterator[str]
required
Stream of text chunks (may break mid-word)
Returns: AsyncIterator[str] - Stream of complete words
Behavior:
- Spaces are included with the following word:
"foo bar baz" → "foo", " bar", " baz"
- Multiple whitespace characters are merged into a single space
- Buffers partial words until whitespace is encountered
Example:
```python
import asyncio

from unmute.llm.llm_utils import rechunk_to_words

async def stream_text():
    chunks = ["Hel", "lo wo", "rld! How", " are you?"]
    for chunk in chunks:
        yield chunk

async def main():
    async for word in rechunk_to_words(stream_text()):
        print(f"[{word}]")

asyncio.run(main())
# Output:
# [Hello]
# [ world!]
# [ How]
# [ are]
# [ you?]
```
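The buffering behavior described above can be approximated with a short sketch. This is an illustrative re-implementation, not the module's actual code:

```python
import asyncio
from typing import AsyncIterator

async def rechunk_sketch(iterator: AsyncIterator[str]) -> AsyncIterator[str]:
    # Buffer characters until whitespace completes a word; emit each word
    # with one leading space (except the first), merging runs of whitespace.
    buf = ""
    first = True
    async for chunk in iterator:
        buf += chunk
        parts = buf.split()
        # Keep a trailing partial word in the buffer for the next chunk.
        buf = parts.pop() if parts and not buf[-1].isspace() else ""
        for word in parts:
            yield word if first else " " + word
            first = False
    if buf:  # flush any final partial word
        yield buf if first else " " + buf

async def demo() -> list[str]:
    async def chunks():
        for c in ["Hel", "lo wo", "rld! How", " are you?"]:
            yield c
    return [w async for w in rechunk_sketch(chunks())]

print(asyncio.run(demo()))  # ['Hello', ' world!', ' How', ' are', ' you?']
```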
Protocol
LLMStream
```python
class LLMStream(Protocol):
    async def chat_completion(
        self,
        messages: list[dict[str, str]]
    ) -> AsyncIterator[str]:
        ...
```
Protocol for LLM streaming clients. Any class implementing chat_completion() can be used as an LLM stream.
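Because LLMStream is a structural Protocol, any object with a matching chat_completion method satisfies it. For example, a canned stream can stand in for a real LLM in tests (CannedStream is a hypothetical class, not part of the module):

```python
import asyncio
from typing import AsyncIterator, Protocol

class LLMStream(Protocol):
    # Copy of the protocol above, repeated here so the sketch is self-contained.
    async def chat_completion(
        self, messages: list[dict[str, str]]
    ) -> AsyncIterator[str]: ...

class CannedStream:
    # Hypothetical stand-in that replays fixed chunks instead of calling an LLM.
    def __init__(self, chunks: list[str]) -> None:
        self.chunks = chunks

    async def chat_completion(
        self, messages: list[dict[str, str]]
    ) -> AsyncIterator[str]:
        for chunk in self.chunks:
            yield chunk

async def collect(stream) -> str:
    # Works with any LLMStream-compatible object.
    return "".join([c async for c in stream.chat_completion([])])

print(asyncio.run(collect(CannedStream(["Hello", ", world"]))))  # Hello, world
```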
Complete Example
```python
import asyncio

from unmute.llm.llm_utils import (
    get_openai_client,
    VLLMStream,
    preprocess_messages_for_llm,
    rechunk_to_words,
)

async def chat_with_llm():
    # Initialize client
    client = get_openai_client()
    llm = VLLMStream(client, temperature=0.7)

    # Prepare conversation
    raw_history = [
        {"role": "system", "content": "You are a helpful AI assistant."},
        {"role": "user", "content": "Tell me about Python programming."},
    ]

    # Preprocess messages
    messages = preprocess_messages_for_llm(raw_history)

    # Stream response as complete words
    print("Assistant: ", end="")
    async for word in rechunk_to_words(llm.chat_completion(messages)):
        print(word, end="", flush=True)
    print()  # newline

asyncio.run(chat_with_llm())
```
Advanced Usage: Integration with TTS
```python
import asyncio

from unmute.llm.llm_utils import get_openai_client, VLLMStream, rechunk_to_words
from unmute.tts.text_to_speech import TextToSpeech, TTSClientEosMessage

async def speak_llm_response():
    # Initialize services
    client = get_openai_client()
    llm = VLLMStream(client, temperature=0.7)
    tts = TextToSpeech()
    await tts.start_up()

    messages = [
        {"role": "system", "content": "You are helpful."},
        {"role": "user", "content": "Explain machine learning briefly."},
    ]

    # Stream LLM response directly to TTS
    async for word in rechunk_to_words(llm.chat_completion(messages)):
        await tts.send(word)

    # Signal end of text
    await tts.send(TTSClientEosMessage())

    # Receive synthesized audio
    async for message in tts:
        pass  # Process audio messages

    await tts.shutdown()

asyncio.run(speak_llm_response())
```
Environment Variables
KYUTAI_LLM_MODEL: Model identifier to use (if not set, auto-selects)
KYUTAI_LLM_API_KEY: API key for LLM server
LLM_SERVER: Base URL of the LLM server
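For a local vLLM setup, these might be set as follows. All values here are illustrative examples, not defaults:

```shell
# Illustrative values for a local vLLM server; adjust to your deployment.
export LLM_SERVER="http://localhost:8000"
export KYUTAI_LLM_API_KEY="EMPTY"            # vLLM servers often accept any key
export KYUTAI_LLM_MODEL="my-org/my-model"    # omit to auto-select when one model is served
```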
Notes
- The VLLMStream class auto-selects the model if not explicitly configured
- Message preprocessing handles common conversation artifacts (interruptions, silence markers)
- Word rechunking is essential for natural TTS pronunciation
- All async functions should be run within an event loop
- The client supports any OpenAI-compatible API (vLLM, llama.cpp, etc.)