Moonshine Voice’s streaming architecture is designed for low-latency, real-time voice applications. This guide covers advanced streaming patterns and concepts.
How Streaming Works
Streaming models in Moonshine perform incremental processing:
- Audio arrives in chunks - audio is added to the stream as it’s captured
- Encoder caching - Input encoding is cached between updates
- Decoder state - Partial decoder state is preserved
- Incremental updates - Only new audio is fully processed
- Event emission - Listeners are notified of changes
Streaming models do most of their work while the user is still speaking, resulting in much lower response latency compared to traditional batch processing.
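In code, that lifecycle looks roughly like the sketch below: feed audio as it arrives, then stop to get the final transcript. (Here `recording` and `get_next_chunk()` are hypothetical stand-ins for your capture loop; creating the transcriber and stream is covered in the next sections.)
stream = transcriber.create_stream()
stream.start()
while recording:                    # hypothetical capture condition
    chunk = get_next_chunk()        # hypothetical microphone read
    stream.add_audio(chunk, 16000)  # only the new audio is fully processed
transcript = stream.stop()          # final transcript for the session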
Stream Objects
A Stream handles audio input for transcription. Transcribers can manage multiple streams simultaneously without duplicating model resources.
Creating Streams
from moonshine_voice import Transcriber, ModelArch
transcriber = Transcriber(
    model_path=model_path,
    model_arch=ModelArch.SMALL_STREAMING
)
# Create a stream with default settings
stream = transcriber.create_stream()
# Create a stream with custom update interval
stream = transcriber.create_stream(update_interval=0.3)
Using the Default Stream
The transcriber provides convenience methods that operate on a default stream:
# These use the default stream internally
transcriber.start()
transcriber.add_audio(audio_data, sample_rate)
transcriber.stop()
# Equivalent to:
stream = transcriber.get_default_stream()
stream.start()
stream.add_audio(audio_data, sample_rate)
stream.stop()
Multiple Concurrent Streams
Process multiple audio sources simultaneously with shared model resources:
from moonshine_voice import Transcriber, TranscriptEventListener
class StreamListener(TranscriptEventListener):
    def __init__(self, stream_name):
        self.name = stream_name

    def on_line_completed(self, event):
        print(f"[{self.name}] {event.line.text}")
# Create transcriber
transcriber = Transcriber(
    model_path=model_path,
    model_arch=model_arch
)
# Create multiple streams
stream1 = transcriber.create_stream()
stream2 = transcriber.create_stream()
stream3 = transcriber.create_stream()
# Add listeners
stream1.add_listener(StreamListener("Mic 1"))
stream2.add_listener(StreamListener("Mic 2"))
stream3.add_listener(StreamListener("Mic 3"))
# Start all streams
stream1.start()
stream2.start()
stream3.start()
# Feed audio from different sources
stream1.add_audio(mic1_audio, 16000)
stream2.add_audio(mic2_audio, 16000)
stream3.add_audio(phone_audio, 8000) # Different sample rates OK
# Stop and clean up
stream1.stop()
stream2.stop()
stream3.stop()
stream1.close()
stream2.close()
stream3.close()
transcriber.close()
Multiple streams share the model weights in memory, making multi-stream processing memory-efficient.
Update Intervals
The update interval controls how often transcription updates occur during streaming:
# Fast updates (200ms) - more responsive, higher compute
stream = transcriber.create_stream(update_interval=0.2)
# Standard updates (500ms) - balanced
stream = transcriber.create_stream(update_interval=0.5)
# Slow updates (1s) - less compute, delayed feedback
stream = transcriber.create_stream(update_interval=1.0)
# Very large interval - only get start/complete events
stream = transcriber.create_stream(update_interval=100.0)
Update Timing
class TimingListener(TranscriptEventListener):
    def on_line_text_changed(self, event):
        latency_ms = event.line.last_transcription_latency_ms
        print(f"Update latency: {latency_ms:.0f}ms")

    def on_line_completed(self, event):
        latency_ms = event.line.last_transcription_latency_ms
        duration = event.line.duration
        print(f"Completed in {latency_ms:.0f}ms (audio: {duration:.2f}s)")
For streaming models, the update interval has minimal impact on overall latency because most processing happens incrementally as audio arrives.
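To use it, attach the listener before feeding audio, just as in the multi-stream example above (a brief sketch):
stream = transcriber.create_stream(update_interval=0.5)
stream.add_listener(TimingListener())
stream.start()
# ...add audio as usual; latency is printed on every text change...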
Manual Transcription Updates
Trigger transcription updates manually:
stream.start()
# Add audio chunks
for chunk in audio_chunks:
    stream.add_audio(chunk, sample_rate)

# Force an update immediately
transcript = stream.update_transcription(
    flags=Transcriber.MOONSHINE_FLAG_FORCE_UPDATE
)

# Process transcript
for line in transcript.lines:
    print(line.text)
stream.stop()
Audio Input
Streams accept audio in flexible formats:
Chunk Size
# Small chunks (low latency)
chunk_duration = 0.05 # 50ms
chunk_size = int(chunk_duration * sample_rate)
# Medium chunks (balanced)
chunk_duration = 0.1 # 100ms
chunk_size = int(chunk_duration * sample_rate)
# Large chunks (higher throughput)
chunk_duration = 0.5 # 500ms
chunk_size = int(chunk_duration * sample_rate)
for i in range(0, len(audio_data), chunk_size):
    chunk = audio_data[i:i + chunk_size]
    stream.add_audio(chunk, sample_rate)
Chunk size doesn’t significantly affect transcription quality. Choose based on your application’s latency requirements.
Sample Rate Conversion
# Library handles arbitrary sample rates automatically
stream.add_audio(audio_8khz, 8000) # Telephone quality
stream.add_audio(audio_16khz, 16000) # Standard speech
stream.add_audio(audio_44khz, 44100) # CD quality
stream.add_audio(audio_48khz, 48000) # Professional audio
Input requirements:
- Channels: Mono (single channel)
- Format: Float32 PCM, values between -1.0 and 1.0
- Sample Rate: Any rate (16kHz recommended)
Convert other formats before adding audio:
import numpy as np
# Convert stereo to mono
if audio_stereo.ndim == 2:
    audio_mono = np.mean(audio_stereo, axis=1)

# Convert int16 to float32
if audio_int16.dtype == np.int16:
    audio_float = audio_int16.astype(np.float32) / 32768.0
# Ensure range [-1.0, 1.0]
audio_normalized = np.clip(audio_float, -1.0, 1.0)
stream.add_audio(audio_normalized, sample_rate)
Session Management
Each start/stop cycle creates a new session:
# Session 1
stream.start()
stream.add_audio(audio1, sample_rate)
transcript1 = stream.stop() # Returns final transcript
# Transcript is reset here
# Session 2 - fresh transcript
stream.start()
stream.add_audio(audio2, sample_rate)
transcript2 = stream.stop()
# transcript1 and transcript2 are independent
Transcripts are cleared on every start() call. Save any data you need before starting a new session.
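For example, copy what you need out of the returned transcript before the next start() (a minimal sketch):
transcript = stream.stop()
saved_text = [line.text for line in transcript.lines]  # survives the reset
stream.start()  # fresh session; the previous transcript is cleared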
Event Flow Guarantees
The streaming event system provides these guarantees:
- LineStarted is called exactly once per segment
- LineCompleted is called exactly once per segment
- LineUpdated / LineTextChanged only occur between start and complete
- Only one line is active at a time per stream
- Completed lines never change
- Each line has a unique line_id
Event Ordering Example
class OrderListener(TranscriptEventListener):
    def on_line_started(self, event):
        print(f"1. STARTED: line_id={event.line.line_id}")

    def on_line_text_changed(self, event):
        print(f"2. TEXT: '{event.line.text}'")

    def on_line_updated(self, event):
        print(f"3. UPDATED: duration={event.line.duration:.2f}s")

    def on_line_completed(self, event):
        print(f"4. COMPLETED: '{event.line.text}'")
        print(f"   Final line_id={event.line.line_id}\n")
Output:
1. STARTED: line_id=12345
2. TEXT: 'Hello'
3. UPDATED: duration=0.30s
2. TEXT: 'Hello world'
3. UPDATED: duration=0.65s
4. COMPLETED: 'Hello world'
   Final line_id=12345
1. STARTED: line_id=12346
...
Streaming vs Non-Streaming Models
Streaming Models
- TINY_STREAMING (34M params)
- SMALL_STREAMING (123M params)
- MEDIUM_STREAMING (245M params)
Benefits:
- Incremental processing with caching
- Very low latency for live speech
- Efficient for real-time applications
Non-Streaming Models
- TINY (26M params)
- BASE (58M params)
Benefits:
- Slightly smaller model size
- Good for batch/offline processing
# For real-time applications, prefer streaming models
transcriber = Transcriber(
    model_path=streaming_model_path,
    model_arch=ModelArch.SMALL_STREAMING  # Recommended
)
# For batch processing, either works
transcriber = Transcriber(
    model_path=base_model_path,
    model_arch=ModelArch.BASE
)
Advanced: Stream State Tracking
class StatefulListener(TranscriptEventListener):
    def __init__(self):
        self.active_lines = {}     # stream_handle -> line_id
        self.completed_lines = []  # list of completed line_ids

    def on_line_started(self, event):
        # Track the active line for this stream
        self.active_lines[event.stream_handle] = event.line.line_id
        print(f"Stream {event.stream_handle}: started line {event.line.line_id}")

    def on_line_completed(self, event):
        # Move from active to completed
        line_id = self.active_lines.pop(event.stream_handle, None)
        if line_id is not None:  # don't drop an id that happens to be falsy
            self.completed_lines.append(line_id)
        print(f"Stream {event.stream_handle}: completed {len(self.completed_lines)} lines total")

    def get_active_count(self):
        return len(self.active_lines)

    def get_completed_count(self):
        return len(self.completed_lines)
listener = StatefulListener()
stream.add_listener(listener)
# Later...
print(f"Active: {listener.get_active_count()}")
print(f"Completed: {listener.get_completed_count()}")
Context Manager Pattern
# Automatic cleanup with context managers
with transcriber.create_stream() as stream:
    stream.start()
    for audio_chunk in audio_source:
        stream.add_audio(audio_chunk, sample_rate)
    stream.stop()
# Stream automatically closed
Performance Monitoring
Measure update frequency and latency with a listener:
import time
class PerfListener(TranscriptEventListener):
    def __init__(self):
        self.start_time = None
        self.update_count = 0

    def on_line_started(self, event):
        self.start_time = time.time()
        self.update_count = 0

    def on_line_text_changed(self, event):
        self.update_count += 1
        elapsed = time.time() - self.start_time
        print(f"Update #{self.update_count} at {elapsed:.2f}s: {event.line.text}")

    def on_line_completed(self, event):
        elapsed = time.time() - self.start_time
        latency = event.line.last_transcription_latency_ms
        print(f"Completed after {elapsed:.2f}s (final latency: {latency:.0f}ms)")
See Also