The Stream class represents an individual audio processing stream. Streams let you process multiple audio sources simultaneously with a single transcriber instance, sharing model resources efficiently.

Overview

Each Stream maintains:
  • Its own transcript (separate from other streams)
  • Independent audio buffer and VAD state
  • Separate event listener list
  • Unique stream handle for event identification

Creating a Stream

Streams are created from a Transcriber instance:
from moonshine_voice import Transcriber, ModelArch

transcriber = Transcriber(
    model_path="/path/to/models",
    model_arch=ModelArch.BASE
)

# Create two independent streams
stream1 = transcriber.create_stream()
stream2 = transcriber.create_stream(update_interval=0.3)

Methods

start

Begin a new transcription session on this stream.
stream.start()
Resets the stream’s transcript and prepares for new audio.

stop

End the current session.
stream.stop()
Completes any active transcript line and triggers completion events.

add_audio

Add audio data to this stream.
stream.add_audio(
    audio_data: List[float],
    sample_rate: int = 16000
)
audio_data (List[float], required)
  Mono PCM audio samples as floats between -1.0 and 1.0
sample_rate (int, default: 16000)
  Sample rate of the input audio in Hz
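add_audio() expects mono float samples in the -1.0 to 1.0 range. If your capture path produces signed 16-bit PCM (for example, frames read via Python's wave module), a small conversion helper bridges the gap. This is a generic sketch, not part of the moonshine_voice API:

```python
import struct

# Hypothetical helper: convert little-endian signed 16-bit PCM bytes
# into the list of floats in [-1.0, 1.0] that add_audio() expects.
def pcm16_to_floats(raw_bytes: bytes) -> list:
    count = len(raw_bytes) // 2
    samples = struct.unpack(f"<{count}h", raw_bytes)
    return [s / 32768.0 for s in samples]

# Two samples: full negative swing and half positive swing
chunk = struct.pack("<2h", -32768, 16384)
floats = pcm16_to_floats(chunk)
print(floats)  # [-1.0, 0.5]
```

The resulting list can be passed straight to stream.add_audio(floats, 16000).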

update_transcription

Manually trigger a transcription update.
transcript = stream.update_transcription(
    flags: int = 0
) -> Transcript
flags (int, default: 0)
  Use Transcriber.MOONSHINE_FLAG_FORCE_UPDATE to bypass caching

Returns: the current Transcript for this stream
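When driving updates manually, a common pattern is to poll until the returned text stops changing. In this sketch, get_transcript stands in for reading the text of the Transcript returned by stream.update_transcription() (the exact Transcript fields are not documented on this page), which keeps the loop runnable on its own:

```python
import time

# Poll a transcript source until the text is unchanged for
# `stable_polls` consecutive reads, then return it.
def poll_until_stable(get_transcript, interval=0.1, stable_polls=3):
    last, streak = None, 0
    while streak < stable_polls:
        text = get_transcript()
        streak = streak + 1 if text == last else 1
        last = text
        time.sleep(interval)
    return last

# Simulated transcript that settles on "hello"
feed = iter(["hel", "hello", "hello", "hello", "hello"])
result = poll_until_stable(lambda: next(feed), interval=0.0)
print(result)  # hello
```

With the real API, you would pass Transcriber.MOONSHINE_FLAG_FORCE_UPDATE to update_transcription() when you need a fresh result rather than a cached one.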

add_listener

Add an event listener to this stream.
stream.add_listener(listener: TranscriptEventListener)
listener (TranscriptEventListener, required)
  Object implementing the event listener protocol
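A listener is an object implementing the protocol's callbacks; on_line_completed is the one shown on this page, and any additional callbacks the protocol defines are not covered here. The examples on this page subclass TranscriptEventListener, so whether duck typing alone suffices is an assumption. A minimal stand-in that collects completed lines, exercised with a stub event so it runs without the library:

```python
from types import SimpleNamespace

# Minimal listener sketch: records the text of each completed line.
class CollectingListener:
    def __init__(self):
        self.lines = []

    def on_line_completed(self, event):
        self.lines.append(event.line.text)

# Exercise the callback with a stub event shaped like event.line.text
listener = CollectingListener()
listener.on_line_completed(
    SimpleNamespace(line=SimpleNamespace(text="hello world"))
)
print(listener.lines)  # ['hello world']
```

In real use you would register it with stream.add_listener(listener).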

remove_listener

Remove an event listener.
stream.remove_listener(listener: TranscriptEventListener)

Example: Processing Multiple Microphones

from moonshine_voice import Transcriber, ModelArch, TranscriptEventListener
import sounddevice as sd
import threading

class StreamListener(TranscriptEventListener):
    def __init__(self, name):
        self.name = name
    
    def on_line_completed(self, event):
        print(f"[{self.name}] {event.line.text}")

# Single transcriber for both streams
transcriber = Transcriber(
    model_path="/path/to/models",
    model_arch=ModelArch.TINY_STREAMING
)

# Create two streams for two microphones
stream1 = transcriber.create_stream()
stream2 = transcriber.create_stream()

stream1.add_listener(StreamListener("Mic 1"))
stream2.add_listener(StreamListener("Mic 2"))

# Start both streams
stream1.start()
stream2.start()

# Capture from device 1
def capture_device_1():
    def callback(indata, frames, time, status):
        audio = indata[:, 0].copy()  # Get mono
        stream1.add_audio(audio.tolist(), 16000)
    
    with sd.InputStream(device=1, channels=1, samplerate=16000, callback=callback):
        input("Press Enter to stop device 1\n")

# Capture from device 2
def capture_device_2():
    def callback(indata, frames, time, status):
        audio = indata[:, 0].copy()
        stream2.add_audio(audio.tolist(), 16000)
    
    with sd.InputStream(device=2, channels=1, samplerate=16000, callback=callback):
        input("Press Enter to stop device 2\n")

# Run both captures
thread1 = threading.Thread(target=capture_device_1)
thread2 = threading.Thread(target=capture_device_2)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

stream1.stop()
stream2.stop()

Example: System Audio + Microphone

# Process both system audio and microphone simultaneously
transcriber = Transcriber(
    model_path="/path/to/models",
    model_arch=ModelArch.SMALL_STREAMING
)

# Stream 1: User's microphone
user_stream = transcriber.create_stream()
user_stream.add_listener(StreamListener("User"))
user_stream.start()

# Stream 2: Remote participant's audio
remote_stream = transcriber.create_stream()
remote_stream.add_listener(StreamListener("Remote"))
remote_stream.start()

# Feed audio from different sources
user_stream.add_audio(user_audio_chunk, 48000)
remote_stream.add_audio(remote_audio_chunk, 16000)

Stream Handle

Each stream has a unique handle that appears in events:
class MyListener(TranscriptEventListener):
    def on_line_completed(self, event):
        print(f"Stream {event.stream_handle}: {event.line.text}")
This allows a single listener to process events from multiple streams.

Resource Sharing

All streams created from the same Transcriber share:
  • Model weights (loaded once in memory)
  • ONNX Runtime session
  • Model configuration
Each stream has its own:
  • Audio buffer and VAD state
  • Transcript and line tracking
  • Event listener list
  • Speaker identification state

Performance Considerations

Processing is serialized across streams; they do not run in parallel. Each add_audio() call is fully processed before the next begins.
Creating many streams (10+) may impact latency as each needs processing time.
For best performance with multiple streams, use streaming models (TINY_STREAMING, SMALL_STREAMING, MEDIUM_STREAMING).

Thread Safety

Streams are thread-safe:
  • add_audio() can be called from different threads
  • Calls are serialized internally
  • Event listeners are invoked on the same thread that called add_audio()
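The serialization guarantee can be illustrated with a stub that mimics it using a lock; this is a model of the documented behavior, not library code:

```python
import threading

# Stub stream: a lock serializes add_audio() calls, mirroring the
# internal serialization the library documents for real streams.
class StubStream:
    def __init__(self):
        self._lock = threading.Lock()
        self.chunks = []

    def add_audio(self, samples, sample_rate=16000):
        with self._lock:
            self.chunks.append(list(samples))

stream = StubStream()
threads = [
    threading.Thread(target=stream.add_audio, args=([i / 10.0] * 4,))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(stream.chunks))  # 8: every chunk appended atomically
```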
