Unmute provides utility classes and functions for task lifecycle management, timing, queuing, service discovery, caching, session recording, signal smoothing, and WebSocket URL handling.
Quest and QuestManager
Async task lifecycle management for services with init/run/close phases.
Class: Quest[T]
Represents an async unit of work with initialization, execution, and cleanup phases.
from unmute.quest_manager import Quest
# Create a quest with full lifecycle
quest = Quest(
    name="my-service",
    init=async_init_function,
    run=async_run_function,
    close=async_close_function,
)
# Or create a simple quest
quest = Quest.from_run_step(
    name="simple-task",
    run=async_function,
)
Methods
Wait for initialization to complete and return the initialized resource.
resource = await quest.get()
Get the initialized resource immediately, or None if not ready.
resource = quest.get_nowait()
if resource is not None:
    print("Ready!")
Cancel the quest and run cleanup.
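The init/run/close lifecycle described above can be sketched in plain asyncio. `MiniQuest` is a hypothetical stand-in written for illustration, not the actual `Quest` implementation; the `start` and `cancel` method names are assumptions, and the only behaviour taken from this page is that `get()` waits for init, `get_nowait()` returns None until init finishes, and cleanup runs on cancellation.

```python
import asyncio
from typing import Awaitable, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class MiniQuest(Generic[T]):
    """Hypothetical stand-in for Quest: init, then run, then always close."""

    def __init__(
        self,
        name: str,
        init: Callable[[], Awaitable[T]],
        run: Callable[[T], Awaitable[None]],
        close: Callable[[T], Awaitable[None]],
    ) -> None:
        self.name = name
        self._init, self._run, self._close = init, run, close
        self._resource: Optional[T] = None
        self._ready = asyncio.Event()
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._lifecycle())

    async def _lifecycle(self) -> None:
        self._resource = await self._init()
        self._ready.set()
        try:
            await self._run(self._resource)
        finally:
            # Runs on normal completion, on error, and on cancellation.
            await self._close(self._resource)

    async def get(self) -> T:
        await self._ready.wait()
        return self._resource

    def get_nowait(self) -> Optional[T]:
        return self._resource if self._ready.is_set() else None

    async def cancel(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass


async def demo() -> list:
    log = []

    async def init() -> str:
        log.append("init")
        return "resource"

    async def run(resource: str) -> None:
        log.append(f"run:{resource}")
        await asyncio.sleep(3600)  # pretend to serve until cancelled

    async def close(resource: str) -> None:
        log.append("close")

    quest = MiniQuest("my-service", init, run, close)
    assert quest.get_nowait() is None  # nothing initialized yet
    quest.start()
    await quest.get()      # returns once init has finished
    await quest.cancel()   # interrupts run, then close executes
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the close step in a `finally` block is what guarantees cleanup even when the run step is cancelled mid-flight.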
Class: QuestManager
Manages multiple quests and ensures proper cleanup.
from unmute.quest_manager import QuestManager
manager = QuestManager()
# Add quests
await manager.add(stt_quest)
await manager.add(tts_quest)
# Wait for all quests to complete
await manager.wait()
# Remove a specific quest
await manager.remove("stt")
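As a rough illustration of what managing several quests involves, the sketch below tracks named asyncio tasks, cancels one on removal, and waits for the rest. `MiniManager` is hypothetical and much simpler than `QuestManager`; in particular, it takes plain coroutine functions rather than `Quest` objects, which is an assumption made to keep the example self-contained.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict


class MiniManager:
    """Hypothetical stand-in: tracks named tasks, cancels on remove."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def add(self, name: str, work: Callable[[], Coroutine[Any, Any, None]]) -> None:
        self._tasks[name] = asyncio.create_task(work())

    async def remove(self, name: str) -> None:
        task = self._tasks.pop(name)
        task.cancel()
        try:
            await task  # let the task's own cleanup finish
        except asyncio.CancelledError:
            pass

    async def wait(self) -> None:
        await asyncio.gather(*self._tasks.values())


async def demo() -> list:
    events = []

    async def short_job() -> None:
        await asyncio.sleep(0.01)
        events.append("short done")

    async def endless_job() -> None:
        try:
            await asyncio.sleep(3600)
        finally:
            events.append("endless cleaned up")

    manager = MiniManager()
    manager.add("short", short_job)
    manager.add("endless", endless_job)
    await asyncio.sleep(0)            # give both tasks a chance to start
    await manager.remove("endless")   # cancel one task by name
    await manager.wait()              # wait for whatever is left
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```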
Timing Utilities
Class: Stopwatch
Simple timer for measuring elapsed time.
from unmute.timer import Stopwatch
# Auto-start on creation
timer = Stopwatch()
# Or start manually
timer = Stopwatch(autostart=False)
timer.start_if_not_started()
# Get elapsed time
elapsed = timer.time() # seconds
# Stop and get final time
final_time = timer.stop()
Properties and Methods
Whether the stopwatch has been started.
Get elapsed time in seconds. Returns 0.0 if not started.
Stop the timer and return elapsed time, or None if already stopped.
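The behaviour listed above can be reproduced with a few lines around `time.perf_counter`. This is a sketch, not the library's code: `MiniStopwatch` is hypothetical, the property name `started` is an assumption, and returning None from `stop()` when the timer was never started is also an assumption.

```python
import time
from typing import Optional


class MiniStopwatch:
    """Hypothetical stand-in mirroring the behaviour described above."""

    def __init__(self, autostart: bool = True) -> None:
        self._start: Optional[float] = None
        self._end: Optional[float] = None
        if autostart:
            self.start_if_not_started()

    @property
    def started(self) -> bool:
        return self._start is not None

    def start_if_not_started(self) -> None:
        if self._start is None:
            self._start = time.perf_counter()

    def time(self) -> float:
        if self._start is None:
            return 0.0  # not started yet
        end = self._end if self._end is not None else time.perf_counter()
        return end - self._start

    def stop(self) -> Optional[float]:
        if self._start is None or self._end is not None:
            return None  # never started, or already stopped
        self._end = time.perf_counter()
        return self._end - self._start


idle = MiniStopwatch(autostart=False)
running = MiniStopwatch()
time.sleep(0.01)
first_stop = running.stop()    # elapsed seconds
second_stop = running.stop()   # None, because the timer is already stopped
```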
Class: PhasesStopwatch
Track timing for multiple sequential phases.
from unmute.timer import PhasesStopwatch
# Define phases
phases = ["stt_ttft", "llm_ttft", "tts_ttft", "total"]
timer = PhasesStopwatch(phases)
# Mark phase completion
timer.time_phase_if_not_started("stt_ttft")
# ... time passes ...
timer.time_phase_if_not_started("llm_ttft")
# Get timing for specific phase
stt_time = timer.get_time_for_phase("stt_ttft")
# Get all timings
timings = timer.phase_dict()
# {"stt_ttft": 0.05, "llm_ttft": 0.12, ...}
# Get partial timings (includes None for incomplete phases)
partial = timer.phase_dict_partial()
# {"stt_ttft": 0.05, "llm_ttft": 0.12, "tts_ttft": None, ...}
# Reset all phases
timer.reset()
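One way to picture phase tracking is a dictionary of timestamps where the first call for each phase wins. The sketch below is an assumption-laden illustration, not the real `PhasesStopwatch`: `MiniPhases` is hypothetical, it records each phase as seconds since creation or reset, it raises `ValueError` from `phase_dict()` when a phase is missing, and it accepts an injectable `clock` so the example is deterministic.

```python
import time
from typing import Callable, Dict, List, Optional


class MiniPhases:
    """Hypothetical stand-in: one timestamp per phase, first call wins."""

    def __init__(
        self, phases: List[str], clock: Callable[[], float] = time.perf_counter
    ) -> None:
        self._phases = list(phases)
        self._clock = clock
        self.reset()

    def reset(self) -> None:
        self._origin = self._clock()
        self._marks: Dict[str, Optional[float]] = {p: None for p in self._phases}

    def time_phase_if_not_started(self, phase: str) -> None:
        if self._marks[phase] is None:  # later calls are ignored
            self._marks[phase] = self._clock() - self._origin

    def phase_dict_partial(self) -> Dict[str, Optional[float]]:
        return dict(self._marks)

    def phase_dict(self) -> Dict[str, float]:
        missing = [p for p, t in self._marks.items() if t is None]
        if missing:
            raise ValueError(f"phases not timed yet: {missing}")
        return dict(self._marks)


# A fake clock makes the recorded values predictable.
ticks = iter([0.0, 0.05, 0.12])
timer = MiniPhases(["stt_ttft", "llm_ttft", "tts_ttft"], clock=lambda: next(ticks))
timer.time_phase_if_not_started("stt_ttft")
timer.time_phase_if_not_started("llm_ttft")
timer.time_phase_if_not_started("stt_ttft")  # ignored, already recorded
partial = timer.phase_dict_partial()
# {"stt_ttft": 0.05, "llm_ttft": 0.12, "tts_ttft": None}
```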
Queue Utilities
Class: RealtimeQueue[T]
Priority queue that releases items based on timestamps for real-time processing.
from unmute.tts.realtime_queue import RealtimeQueue
queue = RealtimeQueue()
# Start the timer (optional, auto-starts on first put)
queue.start_if_not_started()
# Add items with release times
queue.put(audio_chunk_1, time=0.5) # Release at 0.5 seconds
queue.put(audio_chunk_2, time=1.0) # Release at 1.0 seconds
# Get items as they become ready (async iterator)
async for timestamp, item in queue.get():
    print(f"Released at {timestamp}: {item}")
# Get ready items immediately (non-blocking)
for timestamp, item in queue.get_nowait():
    process(item)
# Check if empty
if queue.empty():
    print("No items waiting")
Used by TTS for buffering audio chunks to ensure smooth playback timing.
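The timestamp-based release can be illustrated with a heap keyed on release time. `MiniRealtimeQueue` below is a hypothetical, synchronous sketch; it omits the async `get()` iterator, and the injectable `clock` parameter is an assumption added so the example does not depend on wall-clock time.

```python
import heapq
import itertools
from time import monotonic
from typing import Any, Callable, Iterator, List, Optional, Tuple


class MiniRealtimeQueue:
    """Hypothetical stand-in: items become available once their time has passed."""

    def __init__(self, clock: Callable[[], float] = monotonic) -> None:
        self._clock = clock
        self._start: Optional[float] = None
        self._heap: List[Tuple[float, int, Any]] = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def start_if_not_started(self) -> None:
        if self._start is None:
            self._start = self._clock()

    def put(self, item: Any, time: float) -> None:
        self.start_if_not_started()
        heapq.heappush(self._heap, (time, next(self._counter), item))

    def get_nowait(self) -> Iterator[Tuple[float, Any]]:
        if self._start is None:
            return
        elapsed = self._clock() - self._start
        # Pop every item whose release time is not in the future.
        while self._heap and self._heap[0][0] <= elapsed:
            release_time, _, item = heapq.heappop(self._heap)
            yield release_time, item

    def empty(self) -> bool:
        return not self._heap


now = [0.0]  # fake clock, advanced by hand
queue = MiniRealtimeQueue(clock=lambda: now[0])
queue.put("chunk-2", time=1.0)
queue.put("chunk-1", time=0.5)

now[0] = 0.6
early = list(queue.get_nowait())  # [(0.5, "chunk-1")]
now[0] = 1.2
late = list(queue.get_nowait())   # [(1.0, "chunk-2")]
```

Items come out in release-time order regardless of insertion order, which is the property a playback buffer needs.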
Service Discovery
Functions for finding and connecting to backend services with failover support.
get_instances
Get all available instances of a service.
Parameters:
service_name (str) - Service name (e.g., “stt”, “tts”, “llm”)
from unmute.service_discovery import get_instances
instances = await get_instances("stt")
# ["ws://stt-1:8080", "ws://stt-2:8080"]
find_instance
Find a working instance of a service by trying connections.
Parameters:
service_name (str) - Service name
client_factory (Callable) - Function to create client from URL
timeout_sec (float) - Connection timeout per attempt (default: 0.5)
max_trials (int) - Maximum connection attempts (default: 3)
Raises:
MissingServiceAtCapacity - All instances are at capacity
MissingServiceTimeout - Connection attempts timed out
from unmute.service_discovery import find_instance
from unmute.stt.speech_to_text import SpeechToText
# Find working STT instance
stt = await find_instance(
    service_name="stt",
    client_factory=lambda url: SpeechToText(stt_instance=url),
    timeout_sec=1.0,
    max_trials=5,
)
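The failover idea behind this kind of lookup is to try candidates one by one, with a timeout per attempt, and give up after a bounded number of trials. The sketch below shows that pattern with plain asyncio. `find_first_working` and `NoInstanceAvailable` are hypothetical names, the shuffling is an assumption, and the real `find_instance` raises its own exception types as listed above.

```python
import asyncio
import random
from typing import Awaitable, Callable, List, TypeVar

C = TypeVar("C")


class NoInstanceAvailable(Exception):
    """Hypothetical error used only in this sketch."""


async def find_first_working(
    urls: List[str],
    connect: Callable[[str], Awaitable[C]],
    timeout_sec: float = 0.5,
    max_trials: int = 3,
) -> C:
    candidates = list(urls)
    random.shuffle(candidates)  # assumption: spread load across instances
    last_error = None
    for url in candidates[:max_trials]:
        try:
            # Bound each attempt so one hung instance cannot stall the caller.
            return await asyncio.wait_for(connect(url), timeout=timeout_sec)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
    raise NoInstanceAvailable("no instance reachable") from last_error


async def demo() -> str:
    async def connect(url: str) -> str:
        if url == "ws://stt-1:8080":
            await asyncio.sleep(10)           # simulates a hung instance
        if url == "ws://stt-2:8080":
            raise ConnectionError("refused")  # simulates a dead instance
        return f"client for {url}"

    urls = ["ws://stt-1:8080", "ws://stt-2:8080", "ws://stt-3:8080"]
    return await find_first_working(urls, connect, timeout_sec=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # client for ws://stt-3:8080
```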
async_ttl_cached
Decorator to cache async function results with TTL.
Parameters:
func (Callable) - Async function to cache
ttl_sec (float) - Cache TTL in seconds (default: 0.1)
from unmute.service_discovery import async_ttl_cached
@async_ttl_cached(ttl_sec=5.0)
async def get_expensive_data(key: str) -> str:
    # Expensive operation
    return await fetch_from_db(key)
# First call fetches
data1 = await get_expensive_data("user123")
# Second call within 5 seconds uses cache
data2 = await get_expensive_data("user123")
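For readers curious how a TTL cache for coroutines can work, here is a minimal sketch. `ttl_cached` is a hypothetical decorator, not the source of `async_ttl_cached`; it assumes hashable positional arguments, caches per argument tuple, and takes an injectable `clock` so expiry can be demonstrated without sleeping.

```python
import asyncio
import functools
import time
from typing import Any, Awaitable, Callable, Dict, Tuple


def ttl_cached(ttl_sec: float = 0.1, clock: Callable[[], float] = time.monotonic):
    """Hypothetical decorator: reuse a coroutine's result for ttl_sec seconds."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        cache: Dict[Tuple, Tuple[float, Any]] = {}  # args -> (stored_at, value)

        @functools.wraps(func)
        async def wrapper(*args: Any) -> Any:
            now = clock()
            hit = cache.get(args)
            if hit is not None and now - hit[0] < ttl_sec:
                return hit[1]  # still fresh
            value = await func(*args)
            cache[args] = (now, value)
            return value

        return wrapper

    return decorator


now = [0.0]  # fake clock, advanced by hand
calls = []


@ttl_cached(ttl_sec=5.0, clock=lambda: now[0])
async def get_expensive_data(key: str) -> str:
    calls.append(key)
    return f"data for {key}"


async def demo() -> int:
    await get_expensive_data("user123")  # miss: calls the function
    now[0] = 4.0
    await get_expensive_data("user123")  # hit: within 5 seconds
    now[0] = 6.0
    await get_expensive_data("user123")  # miss: the entry has expired
    return len(calls)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 2
```

Note that this sketch does not deduplicate concurrent misses; two callers arriving at the same time would both invoke the wrapped function.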
Caching
get_cache
Get a cache instance (local or Redis-backed).
from unmute.cache import get_cache
# Get cache with TTL
cache = get_cache(prefix="voices", ttl_seconds=3600)
# Set value
cache.set("key", {"name": "Alice", "voice": "voice-123"})
# Get value
value = cache.get("key") # Returns dict or None
# Delete value
cache.delete("key")
# Cleanup expired entries (LocalCache only)
cache.cleanup()
Cache Implementation:
- If KYUTAI_REDIS_URL is set, uses RedisCache for distributed caching
- Otherwise, uses LocalCache for single-instance in-memory caching
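The selection rule and the in-memory variant can be sketched as follows. Both `choose_backend` and `MiniLocalCache` are hypothetical illustrations rather than the library's code; the Redis URL in the usage is a made-up example, and the `clock` parameter is an assumption that keeps the TTL demonstration deterministic.

```python
import os
import time
from typing import Any, Callable, Dict, Mapping, Optional, Tuple


def choose_backend(env: Mapping[str, str] = os.environ) -> str:
    """Apply the rule above: Redis if KYUTAI_REDIS_URL is set, else local."""
    return "redis" if env.get("KYUTAI_REDIS_URL") else "local"


class MiniLocalCache:
    """Hypothetical in-memory cache with per-entry expiry."""

    def __init__(
        self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}  # key -> (expiry, value)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self._ttl, value)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None or entry[0] <= self._clock():
            return None
        return entry[1]

    def delete(self, key: str) -> None:
        self._data.pop(key, None)

    def cleanup(self) -> int:
        """Drop expired entries and report how many were removed."""
        now = self._clock()
        expired = [k for k, (expiry, _) in self._data.items() if expiry <= now]
        for key in expired:
            del self._data[key]
        return len(expired)


now = [0.0]  # fake clock, advanced by hand
cache = MiniLocalCache(ttl_seconds=3600, clock=lambda: now[0])
cache.set("key", {"name": "Alice", "voice": "voice-123"})
fresh = cache.get("key")    # the stored dict
now[0] = 3601.0
stale = cache.get("key")    # None: the entry outlived its TTL
removed = cache.cleanup()   # 1
```

An explicit `cleanup()` matters for an in-memory cache because expired entries that are never read again would otherwise stay in the dictionary indefinitely.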
Recording
Class: Recorder
Records WebSocket session events to disk for replay and analysis.
from unmute.recorder import Recorder
from pathlib import Path
# Create recorder
recorder = Recorder(recordings_dir=Path("/tmp/recordings"))
# Record events
await recorder.add_event("client", client_event)
await recorder.add_event("server", server_event)
# Shutdown and save
await recorder.shutdown(keep_recording=True)
The first argument to add_event is the EventSender, which is either "client" or "server".
Recordings are saved as JSON files in the specified directory, one file per session.
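A session recorder of this kind can be approximated by buffering events in memory and writing one JSON file at shutdown. The sketch below is hypothetical throughout: `MiniRecorder` is synchronous for brevity, and the file naming, the on-disk layout, and the example event payloads are all assumptions rather than the format Unmute actually writes.

```python
import json
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional


class MiniRecorder:
    """Hypothetical stand-in: buffers events, writes one JSON file per session."""

    def __init__(self, recordings_dir: Path) -> None:
        # Assumption: a random session ID is used as the file name.
        self._path = recordings_dir / f"{uuid.uuid4().hex}.json"
        self._events: List[Dict[str, Any]] = []

    def add_event(self, sender: str, event: Dict[str, Any]) -> None:
        if sender not in ("client", "server"):
            raise ValueError(f"unknown sender: {sender!r}")
        self._events.append({"sender": sender, "event": event})

    def shutdown(self, keep_recording: bool = True) -> Optional[Path]:
        if not keep_recording:
            return None  # discard the session
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._path.write_text(json.dumps(self._events, indent=2))
        return self._path


with tempfile.TemporaryDirectory() as tmp:
    recorder = MiniRecorder(Path(tmp))
    recorder.add_event("client", {"type": "example.request"})   # made-up payload
    recorder.add_event("server", {"type": "example.response"})  # made-up payload
    saved = recorder.shutdown(keep_recording=True)
    replayed = json.loads(saved.read_text())
```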
Audio Processing
ExponentialMovingAverage
Smooth signal processing with configurable attack and release times.
from unmute.stt.exponential_moving_average import ExponentialMovingAverage
# Create EMA for Voice Activity Detection
ema = ExponentialMovingAverage(
    attack_time=0.05,   # Fast response to speech start
    release_time=0.5,   # Slow decay for pauses
    initial_value=0.0,
)
# Update with new values
smoothed = ema.update(dt=0.01, new_value=0.8)
# Calculate decay time
time_to_threshold = ema.time_to_decay_to(value=0.1)
Used by the STT component for pause detection and VAD.
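An asymmetric moving average uses one time constant when the signal rises and another when it falls. The sketch below assumes that `attack_time` and `release_time` are half-lives, that is, the time needed to close half of the gap to the new value; the library may use a different convention, and `MiniEMA` is a hypothetical class written for illustration.

```python
import math


class MiniEMA:
    """Hypothetical asymmetric EMA; times are treated as half-lives."""

    def __init__(
        self, attack_time: float, release_time: float, initial_value: float = 0.0
    ) -> None:
        self.attack_time = attack_time
        self.release_time = release_time
        self.value = initial_value

    def update(self, dt: float, new_value: float) -> float:
        # Rising input uses the attack time, falling input the release time.
        half_life = self.attack_time if new_value > self.value else self.release_time
        alpha = 1.0 - 0.5 ** (dt / half_life)  # fraction of the gap closed in dt
        self.value += alpha * (new_value - self.value)
        return self.value

    def time_to_decay_to(self, value: float) -> float:
        # With zero input, the value halves every release_time seconds.
        return self.release_time * math.log2(self.value / value)


ema = MiniEMA(attack_time=0.05, release_time=0.5, initial_value=0.0)
rising = ema.update(dt=0.05, new_value=1.0)   # 0.5: one attack half-life
falling = ema.update(dt=0.5, new_value=0.0)   # 0.25: one release half-life
wait = ema.time_to_decay_to(0.125)            # 0.5: one more halving
```

With a short attack and a long release, the smoothed value jumps quickly when speech starts and lingers during short pauses, which is why this shape suits pause detection.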
WebSocket Utilities
URL Conversion
Convert between HTTP and WebSocket URLs.
from unmute.websocket_utils import http_to_ws, ws_to_http
# Convert HTTP to WebSocket
ws_url = http_to_ws("http://localhost:8080/api")
# "ws://localhost:8080/api"
ws_url = http_to_ws("https://example.com/api")
# "wss://example.com/api"
# Convert WebSocket to HTTP
http_url = ws_to_http("ws://localhost:8080/api")
# "http://localhost:8080/api"
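The conversion amounts to swapping the URL scheme while leaving host, path, and query untouched. A minimal sketch with the standard library's `urllib.parse` follows; `to_ws` and `to_http` are hypothetical names, and raising `ValueError` on other schemes is an assumption rather than documented behaviour of `http_to_ws` or `ws_to_http`.

```python
from urllib.parse import urlsplit, urlunsplit

_HTTP_TO_WS = {"http": "ws", "https": "wss"}
_WS_TO_HTTP = {ws: http for http, ws in _HTTP_TO_WS.items()}


def _swap_scheme(url: str, mapping: dict) -> str:
    parts = urlsplit(url)
    if parts.scheme not in mapping:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # Only the scheme changes; netloc, path, query, and fragment are kept.
    return urlunsplit(parts._replace(scheme=mapping[parts.scheme]))


def to_ws(url: str) -> str:
    return _swap_scheme(url, _HTTP_TO_WS)


def to_http(url: str) -> str:
    return _swap_scheme(url, _WS_TO_HTTP)


secure = to_ws("https://example.com/api?x=1")   # "wss://example.com/api?x=1"
plain = to_http("ws://localhost:8080/api")      # "http://localhost:8080/api"
```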
Usage Example
from unmute.quest_manager import Quest, QuestManager
from unmute.timer import Stopwatch, PhasesStopwatch
from unmute.service_discovery import find_instance
from unmute.stt.speech_to_text import SpeechToText
import asyncio

async def main():
    # Set up service discovery
    stt = await find_instance("stt", lambda url: SpeechToText(stt_instance=url))

    # The init step is an async callable, so wrap the already-connected client
    async def init_stt():
        return stt

    # Create quest for STT service
    stt_quest = Quest(
        name="stt",
        init=init_stt,
        run=lambda s: s.start_up(),
        close=lambda s: s.shutdown(),
    )
    # Manage with QuestManager
    manager = QuestManager()
    await manager.add(stt_quest)
    # Track timing
    timer = PhasesStopwatch(["init", "process", "total"])
    timer.time_phase_if_not_started("init")
    # ... do work ...
    timer.time_phase_if_not_started("process")
    # phase_dict_partial() is used because "total" has not been timed yet
    print(timer.phase_dict_partial())
    # Cleanup
    await manager.wait()

asyncio.run(main())