Overview
The SpeechToText class provides an async interface to the Unmute STT server via WebSocket. It streams audio data and receives real-time transcription along with Voice Activity Detection (VAD) pause predictions.
Class Definition
class SpeechToText(ServiceWithStartup)
Constructor
def __init__(
    self,
    stt_instance: str = STT_SERVER,
    delay_sec: float = STT_DELAY_SEC,
)
Initializes the speech-to-text client.
Parameters:
- stt_instance (str, default: STT_SERVER): URL of the STT server instance
- delay_sec (float, default: STT_DELAY_SEC): Processing delay in seconds for the STT pipeline
Properties
pause_prediction
pause_prediction: ExponentialMovingAverage
Exponential moving average of pause prediction scores (0-1 range). Higher values indicate more confidence that the user has paused speaking.
Configured with:
- attack_time: 0.01 seconds
- release_time: 0.01 seconds
- initial_value: 1.0
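ExponentialMovingAverage itself is not defined on this page. As a rough sketch, an attack/release EMA with the settings above could look like the following (the class body and the update() signature are assumptions for illustration, not the actual unmute implementation):

```python
import math

class ExponentialMovingAverage:
    """Minimal attack/release EMA sketch (hypothetical, not the real class)."""

    def __init__(self, attack_time: float, release_time: float, initial_value: float):
        self.attack_time = attack_time
        self.release_time = release_time
        self.value = initial_value

    def update(self, new_value: float, dt: float) -> float:
        # Rising inputs are smoothed with attack_time, falling inputs
        # with release_time; alpha is the per-step smoothing factor.
        tau = self.attack_time if new_value > self.value else self.release_time
        alpha = 1.0 - math.exp(-dt / tau)
        self.value += alpha * (new_value - self.value)
        return self.value
```

With attack and release both at 0.01 s, the average tracks new scores quickly in either direction while still suppressing single-step spikes.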
sent_samples
Total number of audio samples sent to the STT server.
current_time
Current processing time in seconds, accounting for the STT delay.
received_words
Count of words received from the STT server.
Core Methods
send_audio
async def send_audio(self, audio: np.ndarray) -> None
Sends audio data to the STT server for transcription.
Parameters:
- audio (np.ndarray): 1D numpy array of audio samples (float32 format)
Raises:
ValueError: If audio is not a 1D array
Notes:
- Automatically converts audio to float32 if needed
- Updates the sent_samples counter
- Increments metrics for monitoring
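The input checks described above can be sketched as a standalone helper (a hypothetical function approximating what send_audio does before transmitting, not the actual method body):

```python
import numpy as np

def validate_audio(audio: np.ndarray) -> np.ndarray:
    """Approximate send_audio()'s input handling: require a 1D array,
    coerce to float32 if needed."""
    if audio.ndim != 1:
        raise ValueError(f"Expected a 1D array, got shape {audio.shape}")
    if audio.dtype != np.float32:
        # astype copies, so callers' arrays are never mutated in place
        audio = audio.astype(np.float32)
    return audio
```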
send_marker
async def send_marker(self, id: int) -> None
Sends a marker message to the STT server for synchronization.
start_up
Establishes WebSocket connection to the STT server and waits for ready signal.
Raises:
MissingServiceAtCapacity: If STT server is at capacity
RuntimeError: If unexpected message type received during startup
shutdown
Closes the WebSocket connection and records session metrics.
Metrics recorded:
- Session duration
- Total audio duration
- Number of words transcribed
state
def state(self) -> WebsocketState
Returns the current WebSocket connection state.
Returns: Literal["not_created", "connecting", "connected", "closing", "closed"]
Async Iterator
__aiter__
async def __aiter__(self) -> AsyncIterator[STTWordMessage | STTMarkerMessage]
Iterates over messages received from the STT server.
Yields:
STTWordMessage: Transcribed word with timing information
STTMarkerMessage: Synchronization marker
Example:
async for message in stt:
    if isinstance(message, STTWordMessage):
        print(f"Word: {message.text} at {message.start_time}s")
    elif isinstance(message, STTMarkerMessage):
        print(f"Marker: {message.id}")
Message Types
STTWordMessage
class STTWordMessage(BaseModel):
    type: Literal["Word"]
    text: str
    start_time: float
Represents a transcribed word or phrase.
STTMarkerMessage
class STTMarkerMessage(BaseModel):
    type: Literal["Marker"]
    id: int
Synchronization marker echoed back from the server.
STTStepMessage
class STTStepMessage(BaseModel):
    type: Literal["Step"]
    step_idx: int
    prs: list[float]
Processing step update with pause prediction scores.
STTErrorMessage
class STTErrorMessage(BaseModel):
    type: Literal["Error"]
    message: str
Error message from the server.
STTReadyMessage
class STTReadyMessage(BaseModel):
    type: Literal["Ready"]
Server ready signal.
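All of these models carry a literal type field, so a client can dispatch raw WebSocket frames on it. A stdlib-only sketch of that dispatch (the real client validates frames into the pydantic models above; this helper is purely illustrative):

```python
import json

def parse_stt_message(raw: str) -> tuple[str, dict]:
    """Dispatch a raw server frame on its "type" field.

    Returns (kind, payload); in the real client the payload would be
    validated into STTWordMessage, STTMarkerMessage, etc.
    """
    data = json.loads(raw)
    kind = data.get("type")
    if kind not in {"Word", "Marker", "Step", "Error", "Ready"}:
        raise ValueError(f"Unknown message type: {kind!r}")
    return kind, data
```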
Example Usage
import asyncio
import numpy as np
from unmute.stt.speech_to_text import SpeechToText, STTWordMessage
async def transcribe_audio():
    stt = SpeechToText()
    try:
        # Connect to the STT server
        await stt.start_up()
        print(f"STT state: {stt.state()}")

        # Receive transcriptions as they arrive
        async def receive_transcriptions():
            async for message in stt:
                if isinstance(message, STTWordMessage):
                    print(f"[{message.start_time:.2f}s] {message.text}")

        # Send audio in a separate task
        async def send_audio_stream():
            # Example: send 100 ms chunks of audio
            chunk_size = int(24000 * 0.1)  # 100 ms at 24 kHz
            for _ in range(100):
                audio = np.random.randn(chunk_size).astype(np.float32)
                await stt.send_audio(audio)
                await asyncio.sleep(0.1)

        # Run both tasks concurrently
        await asyncio.gather(
            receive_transcriptions(),
            send_audio_stream(),
        )
    finally:
        await stt.shutdown()

asyncio.run(transcribe_audio())
Advanced Usage: Pause Detection
async def monitor_pause_detection():
    stt = SpeechToText()
    await stt.start_up()

    # Poll the smoothed pause prediction score
    while True:
        pause_score = stt.pause_prediction.value
        if pause_score > 0.6:
            print("User likely paused speaking")
        elif pause_score < 0.4:
            print("User actively speaking")
        await asyncio.sleep(0.1)
Metrics
The class automatically tracks the following metrics:
- STT_ACTIVE_SESSIONS: Active transcription sessions
- STT_SENT_FRAMES: Audio frames sent to the server
- STT_RECV_FRAMES: Processing steps received
- STT_RECV_WORDS: Words transcribed
- STT_TTFT: Time to first token (transcription)
- STT_SESSION_DURATION: Total session duration
- STT_AUDIO_DURATION: Total audio processed
- STT_NUM_WORDS: Total words per session
Notes
- Audio is expected at 24kHz sample rate
- The STT pipeline has an inherent delay (configurable via delay_sec)
- Pause predictions are smoothed using exponential moving average
- First 12 processing steps are ignored for pause prediction to avoid initial noise
- Connection is automatically closed when iteration completes
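The warm-up rule above can be sketched as a small filter (the constant 12 comes from the note; representing each step as a (step_idx, score) pair is a simplification of STTStepMessage, whose prs list is not further specified here):

```python
WARMUP_STEPS = 12  # per the note above: early steps are too noisy to trust

def pause_scores(steps):
    """Yield pause-prediction scores, skipping the warm-up steps.

    `steps` is an iterable of (step_idx, score) pairs; extracting a
    single score per step from STTStepMessage.prs is an assumption.
    """
    for step_idx, score in steps:
        if step_idx < WARMUP_STEPS:
            continue  # ignore initial noise
        yield score
```

Scores that survive the filter would then feed the pause_prediction moving average described under Properties.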