Skip to main content
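Unmute provides several utility classes and functions for async task lifecycle management, timing, queuing, service discovery, caching, session recording, signal smoothing, and WebSocket URL conversion.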

Quest and QuestManager

Async task lifecycle management for services with init/run/close phases.

Class: Quest[T]

Represents an async unit of work with initialization, execution, and cleanup phases.
from unmute.quest_manager import Quest

# Create a quest with full lifecycle
quest = Quest(
    name="my-service",
    init=async_init_function,
    run=async_run_function,
    close=async_close_function
)

# Or create a simple quest
quest = Quest.from_run_step(
    name="simple-task",
    run=async_function
)

Methods

async get
method
Wait for initialization to complete and return the initialized resource.
resource = await quest.get()
get_nowait
method
Get the initialized resource immediately, or None if not ready.
resource = quest.get_nowait()
if resource:
    print("Ready!")
async remove
method
Cancel the quest and run cleanup.
await quest.remove()

Class: QuestManager

Manages multiple quests and ensures proper cleanup.
from unmute.quest_manager import QuestManager

manager = QuestManager()

# Add quests
await manager.add(stt_quest)
await manager.add(tts_quest)

# Wait for all quests to complete
await manager.wait()

# Remove a specific quest
await manager.remove("stt")

Timing Utilities

Class: Stopwatch

Simple timer for measuring elapsed time.
from unmute.timer import Stopwatch

# Auto-start on creation
timer = Stopwatch()

# Or start manually
timer = Stopwatch(autostart=False)
timer.start_if_not_started()

# Get elapsed time
elapsed = timer.time()  # seconds

# Stop and get final time
final_time = timer.stop()

Properties and Methods

started
property
Whether the stopwatch has been started.
time
method
Get elapsed time in seconds. Returns 0.0 if not started.
stop
method
Stop the timer and return elapsed time, or None if already stopped.

Class: PhasesStopwatch

Track timing for multiple sequential phases.
from unmute.timer import PhasesStopwatch

# Define phases
phases = ["stt_ttft", "llm_ttft", "tts_ttft", "total"]
timer = PhasesStopwatch(phases)

# Mark phase completion
timer.time_phase_if_not_started("stt_ttft")
# ... time passes ...
timer.time_phase_if_not_started("llm_ttft")

# Get timing for specific phase
stt_time = timer.get_time_for_phase("stt_ttft")

# Get all timings
timings = timer.phase_dict()
# {"stt_ttft": 0.05, "llm_ttft": 0.12, ...}

# Get partial timings (includes None for incomplete phases)
partial = timer.phase_dict_partial()
# {"stt_ttft": 0.05, "llm_ttft": 0.12, "tts_ttft": None, ...}

# Reset all phases
timer.reset()

Queue Utilities

Class: RealtimeQueue[T]

Priority queue that releases items based on timestamps for real-time processing.
from unmute.tts.realtime_queue import RealtimeQueue

queue = RealtimeQueue()

# Start the timer (optional, auto-starts on first put)
queue.start_if_not_started()

# Add items with release times
queue.put(audio_chunk_1, time=0.5)  # Release at 0.5 seconds
queue.put(audio_chunk_2, time=1.0)  # Release at 1.0 seconds

# Get items as they become ready (async iterator)
async for timestamp, item in queue.get():
    print(f"Released at {timestamp}: {item}")

# Get ready items immediately (non-blocking)
for timestamp, item in queue.get_nowait():
    process(item)

# Check if empty
if queue.empty():
    print("No items waiting")
Used by TTS for buffering audio chunks to ensure smooth playback timing.

Service Discovery

Functions for finding and connecting to backend services with failover support.

get_instances

async get_instances
function
Get all available instances of a service.
Parameters:
  • service_name (str) - Service name (e.g., “stt”, “tts”, “llm”)
from unmute.service_discovery import get_instances

instances = await get_instances("stt")
# ["ws://stt-1:8080", "ws://stt-2:8080"]

find_instance

async find_instance
function
Find a working instance of a service by trying connections.
Parameters:
  • service_name (str) - Service name
  • client_factory (Callable) - Function to create client from URL
  • timeout_sec (float) - Connection timeout per attempt (default: 0.5)
  • max_trials (int) - Maximum connection attempts (default: 3)
Raises:
  • MissingServiceAtCapacity - All instances are at capacity
  • MissingServiceTimeout - Connection attempts timed out
from unmute.service_discovery import find_instance
from unmute.stt.speech_to_text import SpeechToText

# Find working STT instance
stt = await find_instance(
    service_name="stt",
    client_factory=lambda url: SpeechToText(stt_instance=url),
    timeout_sec=1.0,
    max_trials=5
)

async_ttl_cached

async_ttl_cached
function
Decorator to cache async function results with a time-to-live (TTL).
Parameters:
  • func (Callable) - Async function to cache
  • ttl_sec (float) - Cache TTL in seconds (default: 0.1)
from unmute.service_discovery import async_ttl_cached

@async_ttl_cached(ttl_sec=5.0)
async def get_expensive_data(key: str) -> str:
    # Expensive operation
    return await fetch_from_db(key)

# First call fetches
data1 = await get_expensive_data("user123")

# Second call within 5 seconds uses cache
data2 = await get_expensive_data("user123")

Caching

get_cache

Get a cache instance (local or Redis-backed).
from unmute.cache import get_cache

# Get cache with TTL
cache = get_cache(prefix="voices", ttl_seconds=3600)

# Set value
cache.set("key", {"name": "Alice", "voice": "voice-123"})

# Get value
value = cache.get("key")  # Returns dict or None

# Delete value
cache.delete("key")

# Cleanup expired entries (LocalCache only)
cache.cleanup()
Cache Implementation:
  • If KYUTAI_REDIS_URL is set, uses RedisCache for distributed caching
  • Otherwise uses LocalCache for single-instance in-memory caching

Recording

Class: Recorder

Records WebSocket session events to disk for replay and analysis.
from unmute.recorder import Recorder
from pathlib import Path

# Create recorder
recorder = Recorder(recordings_dir=Path("/tmp/recordings"))

# Record events
await recorder.add_event("client", client_event)
await recorder.add_event("server", server_event)

# Shutdown and save
await recorder.shutdown(keep_recording=True)
EventSender is either "client" or "server".
Recordings are saved as JSON files in the specified directory, one file per session.

Audio Processing

ExponentialMovingAverage

Signal smoothing with configurable attack and release times.
from unmute.stt.exponential_moving_average import ExponentialMovingAverage

# Create EMA for Voice Activity Detection
ema = ExponentialMovingAverage(
    attack_time=0.05,   # Fast response to speech start
    release_time=0.5,   # Slow decay for pauses
    initial_value=0.0
)

# Update with new values
smoothed = ema.update(dt=0.01, new_value=0.8)

# Calculate decay time
time_to_threshold = ema.time_to_decay_to(value=0.1)
Used by the STT component for pause detection and VAD.

WebSocket Utilities

URL Conversion

Convert between HTTP and WebSocket URLs.
from unmute.websocket_utils import http_to_ws, ws_to_http

# Convert HTTP to WebSocket
ws_url = http_to_ws("http://localhost:8080/api")
# "ws://localhost:8080/api"

ws_url = http_to_ws("https://example.com/api")
# "wss://example.com/api"

# Convert WebSocket to HTTP
http_url = ws_to_http("ws://localhost:8080/api")
# "http://localhost:8080/api"

Usage Example

from unmute.quest_manager import Quest, QuestManager
from unmute.timer import Stopwatch, PhasesStopwatch
from unmute.service_discovery import find_instance
from unmute.stt.speech_to_text import SpeechToText

async def main():
    # Set up service discovery
    stt = await find_instance("stt", lambda url: SpeechToText(stt_instance=url))
    
    # Create quest for STT service
    stt_quest = Quest(
        name="stt",
        init=lambda: stt,
        run=lambda s: s.start_up(),
        close=lambda s: s.shutdown()
    )
    
    # Manage with QuestManager
    manager = QuestManager()
    await manager.add(stt_quest)
    
    # Track timing
    timer = PhasesStopwatch(["init", "process", "total"])
    timer.time_phase_if_not_started("init")
    
    # ... do work ...
    
    timer.time_phase_if_not_started("process")
    print(timer.phase_dict())
    
    # Cleanup
    await manager.wait()

Build docs developers (and LLMs) love