Overview

The SpeechToText class provides an async interface to the Unmute STT server via WebSocket. It streams audio data and receives real-time transcription along with Voice Activity Detection (VAD) pause predictions.

Class Definition

class SpeechToText(ServiceWithStartup)

Constructor

def __init__(
    self,
    stt_instance: str = STT_SERVER,
    delay_sec: float = STT_DELAY_SEC
)
Initializes the speech-to-text client.
stt_instance
str
default:"STT_SERVER"
URL of the STT server instance
delay_sec
float
default:"STT_DELAY_SEC"
Processing delay in seconds for the STT pipeline

Properties

pause_prediction

pause_prediction: ExponentialMovingAverage
Exponential moving average of pause prediction scores (0-1 range). Higher values indicate more confidence that the user has paused speaking. Configured with:
  • attack_time: 0.01 seconds
  • release_time: 0.01 seconds
  • initial_value: 1.0
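The `ExponentialMovingAverage` implementation itself lives in unmute; as an illustrative sketch of how an attack/release EMA behaves (the formula and names below are assumptions, not the library's code):

```python
import math

class ExponentialMovingAverage:
    """Illustrative attack/release EMA sketch (not unmute's implementation)."""

    def __init__(self, attack_time: float, release_time: float,
                 initial_value: float = 0.0):
        self.attack_time = attack_time
        self.release_time = release_time
        self.value = initial_value

    def update(self, dt: float, new_value: float) -> float:
        # Rising inputs are smoothed with the attack time constant,
        # falling inputs with the release time constant
        tau = self.attack_time if new_value > self.value else self.release_time
        alpha = 1.0 - math.exp(-dt / tau)
        self.value += alpha * (new_value - self.value)
        return self.value
```

With short time constants like 0.01 s, the smoothed value tracks new scores almost immediately while still suppressing single-step noise.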

sent_samples

sent_samples: int
Total number of audio samples sent to the STT server.

current_time

current_time: float
Current processing time in seconds, accounting for the STT delay.
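One plausible reading of this property, assuming the 24 kHz sample rate from the Notes section (this is a hypothetical reconstruction; the actual formula is not shown in this reference):

```python
SAMPLE_RATE = 24000  # the Notes section states audio is expected at 24 kHz

def current_time(sent_samples: int, delay_sec: float) -> float:
    # Hypothetical reconstruction: elapsed audio time, shifted back
    # by the STT pipeline's processing delay
    return sent_samples / SAMPLE_RATE - delay_sec
```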

received_words

received_words: int
Count of words received from the STT server.

Core Methods

send_audio

async def send_audio(self, audio: np.ndarray) -> None
Sends audio data to the STT server for transcription.
audio
np.ndarray
required
1D numpy array of audio samples (float32 format)
Raises:
  • ValueError: If audio is not a 1D array
Notes:
  • Automatically converts audio to float32 if needed
  • Updates sent_samples counter
  • Increments metrics for monitoring
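The validation and conversion described in the notes above can be sketched as a standalone helper (a sketch of the documented behavior, not the method's actual body):

```python
import numpy as np

def validate_audio(audio: np.ndarray) -> np.ndarray:
    # Reject anything that is not a 1D array, as send_audio does
    if audio.ndim != 1:
        raise ValueError(f"Expected a 1D array, got {audio.ndim} dimensions")
    # Convert to float32 if needed before sending over the wire
    if audio.dtype != np.float32:
        audio = audio.astype(np.float32)
    return audio
```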

send_marker

async def send_marker(self, id: int) -> None
Sends a marker message to the STT server for synchronization.
id
int
required
Unique marker identifier
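Markers are useful for flushing: send one after the final audio chunk, then read messages until the server echoes the marker back, at which point everything sent before it has been transcribed. A sketch of this pattern (`transcribe_until_marker` is a hypothetical helper; it duck-types on the `type` field rather than importing the message models):

```python
async def transcribe_until_marker(stt, marker_id: int) -> list[str]:
    # Hypothetical helper: collect words until the marker is echoed back
    words: list[str] = []
    await stt.send_marker(marker_id)
    async for message in stt:
        if message.type == "Marker" and message.id == marker_id:
            break
        if message.type == "Word":
            words.append(message.text)
    return words
```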

start_up

async def start_up(self) -> None
Establishes WebSocket connection to the STT server and waits for ready signal. Raises:
  • MissingServiceAtCapacity: If STT server is at capacity
  • RuntimeError: If unexpected message type received during startup

shutdown

async def shutdown(self) -> None
Closes the WebSocket connection and records session metrics. Metrics recorded:
  • Session duration
  • Total audio duration
  • Number of words transcribed

state

def state(self) -> WebsocketState
Returns the current WebSocket connection state. Returns: Literal["not_created", "connecting", "connected", "closing", "closed"]

Async Iterator

aiter

async def __aiter__(self) -> AsyncIterator[STTWordMessage | STTMarkerMessage]
Iterates over messages received from the STT server. Yields:
  • STTWordMessage: Transcribed word with timing information
  • STTMarkerMessage: Synchronization marker
Example:
async for message in stt:
    if isinstance(message, STTWordMessage):
        print(f"Word: {message.text} at {message.start_time}s")
    elif isinstance(message, STTMarkerMessage):
        print(f"Marker: {message.id}")

Message Types

STTWordMessage

class STTWordMessage(BaseModel):
    type: Literal["Word"]
    text: str
    start_time: float
Represents a transcribed word or phrase.

STTMarkerMessage

class STTMarkerMessage(BaseModel):
    type: Literal["Marker"]
    id: int
Synchronization marker echoed back from the server.

STTStepMessage

class STTStepMessage(BaseModel):
    type: Literal["Step"]
    step_idx: int
    prs: list[float]
Processing step update with pause prediction scores.

STTErrorMessage

class STTErrorMessage(BaseModel):
    type: Literal["Error"]
    message: str
Error message from the server.

STTReadyMessage

class STTReadyMessage(BaseModel):
    type: Literal["Ready"]
Server ready signal.

Example Usage

import asyncio
import numpy as np
from unmute.stt.speech_to_text import SpeechToText, STTWordMessage

async def transcribe_audio():
    stt = SpeechToText()
    
    try:
        # Connect to STT server
        await stt.start_up()
        print(f"STT state: {stt.state()}")
        
        # Start receiving transcriptions
        async def receive_transcriptions():
            async for message in stt:
                if isinstance(message, STTWordMessage):
                    print(f"[{message.start_time:.2f}s] {message.text}")
        
        # Send audio in a separate task
        async def send_audio_stream():
            # Example: send 100ms chunks of audio
            chunk_size = int(24000 * 0.1)  # 100ms at 24kHz
            
            for _ in range(100):
                audio = np.random.randn(chunk_size).astype(np.float32)
                await stt.send_audio(audio)
                await asyncio.sleep(0.1)
        
        # Receive transcriptions in the background. Note: gathering on
        # receive_transcriptions() directly would hang, because the
        # iteration only ends once the connection is closed.
        receive_task = asyncio.create_task(receive_transcriptions())
        await send_audio_stream()
        
        # Give in-flight transcriptions a moment to arrive, then stop
        await asyncio.sleep(1.0)
        receive_task.cancel()
        
    finally:
        await stt.shutdown()

asyncio.run(transcribe_audio())

Advanced Usage: Pause Detection

async def monitor_pause_detection():
    stt = SpeechToText()
    await stt.start_up()
    
    # Pause scores arrive in Step messages, which are processed while
    # iterating over the client, so consume messages in the background
    async def consume_messages():
        async for _ in stt:
            pass
    
    consume_task = asyncio.create_task(consume_messages())
    
    try:
        # Monitor the smoothed pause prediction score
        while True:
            pause_score = stt.pause_prediction.value
            
            if pause_score > 0.6:
                print("User likely paused speaking")
            elif pause_score < 0.4:
                print("User actively speaking")
            
            await asyncio.sleep(0.1)
    finally:
        consume_task.cancel()
        await stt.shutdown()

Metrics

The class automatically tracks the following metrics:
  • STT_ACTIVE_SESSIONS: Active transcription sessions
  • STT_SENT_FRAMES: Audio frames sent to server
  • STT_RECV_FRAMES: Processing steps received
  • STT_RECV_WORDS: Words transcribed
  • STT_TTFT: Time to first token (transcription)
  • STT_SESSION_DURATION: Total session duration
  • STT_AUDIO_DURATION: Total audio processed
  • STT_NUM_WORDS: Total words per session

Notes

  • Audio is expected at 24kHz sample rate
  • The STT pipeline has an inherent delay (configurable via delay_sec)
  • Pause predictions are smoothed using exponential moving average
  • First 12 processing steps are ignored for pause prediction to avoid initial noise
  • Connection is automatically closed when iteration completes
