Moonshine Voice includes built-in speaker identification (diarization) to distinguish between different speakers in audio. This feature automatically assigns speaker IDs to transcript lines.
How It Works
Speaker identification analyzes voice characteristics to determine:
- When a new speaker starts talking
- Which speaker is currently speaking
- A unique ID for each speaker
- A sequential index for easy labeling (“Speaker 1”, “Speaker 2”, etc.)
Speaker identification is enabled by default. It runs automatically during transcription and adds minimal overhead to processing time.
Basic Usage
Access speaker information from transcript lines:
from moonshine_voice import (
Transcriber,
TranscriptEventListener,
LineCompleted,
ModelArch
)
class SpeakerListener(TranscriptEventListener):
    """Print each completed transcript line with a speaker prefix."""

    def on_line_completed(self, event: LineCompleted):
        completed = event.line
        if not completed.has_speaker_id:
            print(f"Unknown: {completed.text}")
            return
        print(f"Speaker {completed.speaker_index}: {completed.text}")
# Create a transcriber with the tiny streaming model. Speaker
# identification is enabled by default (per the docs above), so no
# extra options are required here.
transcriber = Transcriber(
    model_path=model_path,
    model_arch=ModelArch.TINY_STREAMING
)
# Register the listener so it receives line-completed events.
transcriber.add_listener(SpeakerListener())
Each TranscriptLine contains speaker information:
class TranscriptLine:
    """One transcript line with optional speaker attribution.

    Fields (annotation-only; values are filled in by the library):
    """

    has_speaker_id: bool   # Whether a speaker was identified for this line
    # FIX: was annotated `str`, contradicting both the inline comment and
    # the "Speaker ID Persistence" section, which describe speaker_id as a
    # unique 64-bit integer.
    speaker_id: int        # Unique 64-bit speaker identifier
    speaker_index: int     # Sequential number (0, 1, 2, ...)
    text: str              # Transcribed text
Example: Accessing speaker data
class DetailedSpeakerListener(TranscriptEventListener):
    """Dump every speaker-related field of each completed line."""

    def on_line_completed(self, event):
        completed = event.line
        print(f"Has speaker ID: {completed.has_speaker_id}")
        if completed.has_speaker_id:
            print(f"Speaker ID: {completed.speaker_id}")
            print(f"Speaker index: {completed.speaker_index}")
        print(f"Text: {completed.text}")
Basic Speaker Prefix
def format_with_speaker(line):
    """Return the line's text, prefixed with "Speaker N: " when the
    speaker was identified; otherwise return the bare text."""
    if not line.has_speaker_id:
        return line.text
    return f"Speaker {line.speaker_index}: {line.text}"
class FormattingListener(TranscriptEventListener):
    """Print each completed line through format_with_speaker()."""

    def on_line_completed(self, event):
        formatted = format_with_speaker(event.line)
        print(formatted)
Named Speakers
class NamedSpeakerListener(TranscriptEventListener):
    """Label speakers with human-friendly names instead of bare indices.

    The first few speakers draw from a fixed roster; speakers beyond the
    roster fall back to "Speaker N" (1-based). Assignments are cached so
    each speaker keeps the same name for the whole session.
    """

    def __init__(self):
        self.speaker_names = {}
        self.default_names = ["Alice", "Bob", "Carol", "Dave"]

    def get_speaker_name(self, speaker_index):
        """Return (and memoize) the display name for speaker_index."""
        cached = self.speaker_names.get(speaker_index)
        if cached is not None:
            return cached
        if speaker_index < len(self.default_names):
            name = self.default_names[speaker_index]
        else:
            name = f"Speaker {speaker_index + 1}"
        self.speaker_names[speaker_index] = name
        return name

    def on_line_completed(self, event):
        line = event.line
        if not line.has_speaker_id:
            print(line.text)
            return
        name = self.get_speaker_name(line.speaker_index)
        print(f"{name}: {line.text}")
Terminal Display with Speaker Colors
import sys
class ColoredSpeakerListener(TranscriptEventListener):
    """Live terminal display that colors each speaker's prefix.

    Redraws the in-progress line in place using '\r', and pads with
    spaces to erase leftovers when the new draw is shorter than the
    previous one.
    """

    # ANSI escape codes, cycled per speaker index.
    COLORS = [
        '\033[91m',  # Red
        '\033[92m',  # Green
        '\033[93m',  # Yellow
        '\033[94m',  # Blue
        '\033[95m',  # Magenta
        '\033[96m',  # Cyan
    ]
    RESET = '\033[0m'

    def __init__(self):
        # Length of the previous draw (including '\r' and escape codes),
        # used to know how many trailing characters to blank out.
        self.last_line_length = 0

    def get_color(self, speaker_index):
        """Pick a color, wrapping around when speakers outnumber colors."""
        return self.COLORS[speaker_index % len(self.COLORS)]

    def on_line_text_changed(self, event):
        line = event.line
        prefix = ""
        if line.has_speaker_id:
            tint = self.get_color(line.speaker_index)
            prefix = f"{tint}Speaker {line.speaker_index}{self.RESET}: "
        output = f"\r{prefix}{line.text}"
        print(output, end="", flush=True)
        # Erase any characters remaining from a longer previous draw.
        shortfall = self.last_line_length - len(output)
        if shortfall > 0:
            print(" " * shortfall, end="", flush=True)
        self.last_line_length = len(output)

    def on_line_completed(self, event):
        # Draw the final text, then advance to a fresh terminal line.
        self.on_line_text_changed(event)
        print()  # New line
        self.last_line_length = 0
Disabling Speaker Identification
Disable speaker ID if you don’t need it:
# Speaker identification is on by default; pass the string "false"
# (not a bool — options values are strings) to turn it off.
options = {
    "identify_speakers": "false"
}
transcriber = Transcriber(
    model_path=model_path,
    model_arch=model_arch,
    options=options
)
Disabling speaker identification yields only a minor performance improvement, since the feature adds minimal overhead to begin with.
Speaker ID Persistence
The speaker_id field is a unique 64-bit integer designed for storage:
import json
class SpeakerDatabase:
    """Accumulate per-speaker utterance history, keyed by speaker_id.

    speaker_id is documented as a unique 64-bit integer, which makes it
    suitable for persistence — but JSON object keys are always strings,
    so load() must restore the integer keys (see below).
    """

    def __init__(self):
        self.speakers = {}  # speaker_id -> speaker_info

    def register_speaker(self, line):
        """Record one completed transcript line under its speaker.

        Lines without an identified speaker are ignored.
        """
        if not line.has_speaker_id:
            return
        speaker_id = line.speaker_id
        if speaker_id not in self.speakers:
            self.speakers[speaker_id] = {
                "id": speaker_id,
                "first_seen": line.start_time,
                "utterances": []
            }
        self.speakers[speaker_id]["utterances"].append({
            "text": line.text,
            "time": line.start_time,
            "duration": line.duration
        })

    def save(self, filepath):
        """Write the speaker table to filepath as JSON."""
        with open(filepath, 'w') as f:
            json.dump(self.speakers, f, indent=2)

    def load(self, filepath):
        """Read a speaker table previously written by save().

        FIX: json.dump coerces integer dict keys to strings, so a plain
        json.load left the table keyed by str — register_speaker() would
        then fail its `speaker_id not in self.speakers` check and create
        duplicate entries for known speakers. Restore integer keys where
        possible.
        """
        with open(filepath, 'r') as f:
            raw = json.load(f)
        self.speakers = {}
        for key, info in raw.items():
            try:
                key = int(key)
            except ValueError:
                pass  # key was never numeric; keep it as-is
            self.speakers[key] = info
class StorageListener(TranscriptEventListener):
    """Forward every completed transcript line to a SpeakerDatabase."""

    def __init__(self, database):
        # The database that accumulates speaker/utterance records.
        self.db = database

    def on_line_completed(self, event):
        # Database ignores lines without an identified speaker.
        self.db.register_speaker(event.line)
# Usage: collect speaker data while the transcriber runs.
db = SpeakerDatabase()
transcriber.add_listener(StorageListener(db))
# After transcription finishes, persist everything collected.
db.save("speakers.json")
Speaker Transitions
Detect when speakers change:
class TransitionListener(TranscriptEventListener):
    """Announce whenever the active speaker changes between lines."""

    def __init__(self):
        # Index of the most recently identified speaker; None until
        # anyone has been identified.
        self.current_speaker = None

    def on_line_completed(self, event):
        line = event.line
        if not line.has_speaker_id:
            print(line.text)
            return
        speaker_index = line.speaker_index
        previous = self.current_speaker
        # Announce the first speaker, or a hand-off between speakers.
        if previous is None:
            print(f"\n[Speaker {speaker_index} begins]")
        elif previous != speaker_index:
            print(f"\n[Speaker {previous} -> Speaker {speaker_index}]")
        self.current_speaker = speaker_index
        print(f"Speaker {speaker_index}: {line.text}")
Multi-Speaker Statistics
class SpeakerStats(TranscriptEventListener):
    """Aggregate utterance counts, talk time, and word counts per speaker."""

    def __init__(self):
        # speaker_index -> {"utterances", "total_duration", "word_count"}
        self.stats = {}

    def on_line_completed(self, event):
        line = event.line
        if not line.has_speaker_id:
            return
        entry = self.stats.setdefault(line.speaker_index, {
            "utterances": 0,
            "total_duration": 0.0,
            "word_count": 0
        })
        entry["utterances"] += 1
        entry["total_duration"] += line.duration
        entry["word_count"] += len(line.text.split())

    def print_summary(self):
        """Print a per-speaker report; call after transcription ends."""
        print("\n=== Speaker Statistics ===")
        for idx in sorted(self.stats):
            entry = self.stats[idx]
            avg_duration = entry["total_duration"] / entry["utterances"]
            print(f"\nSpeaker {idx}:")
            print(f" Utterances: {entry['utterances']}")
            print(f" Total time: {entry['total_duration']:.1f}s")
            print(f" Avg utterance: {avg_duration:.2f}s")
            print(f" Words spoken: {entry['word_count']}")
# Usage: attach the stats collector like any other listener.
stats = SpeakerStats()
transcriber.add_listener(stats)
# After transcription finishes, print the per-speaker report.
stats.print_summary()
Command Line Usage
# Default behavior (speaker IDs enabled)
python -m moonshine_voice.transcriber --language en --wav-path meeting.wav
# Hide speaker IDs from output
python -m moonshine_voice.transcriber --language en --no-speaker-ids
# Process with microphone
python -m moonshine_voice.mic_transcriber --language en
Limitations and Accuracy
Speaker identification is experimental and may not be reliable enough for all applications. Accuracy depends on:
- Audio quality
- Number of speakers
- Speaker voice similarity
- Background noise
Best Practices
- Use high-quality audio (16kHz or higher sample rate)
- Minimize background noise
- Allow clear speaker transitions
- Test with your specific use case
Combining with Intent Recognition
from moonshine_voice import IntentRecognizer
class MultiSpeakerIntentListener(TranscriptEventListener):
    """Label transcript lines by speaker while an IntentRecognizer
    (registered separately as its own listener) processes the same lines.

    NOTE(review): speaker_intents entries are created but never filled in
    this example; populating them from recognizer results is left to the
    application.
    """

    def __init__(self, intent_recognizer):
        # Kept for application code that wants to query the recognizer.
        self.recognizer = intent_recognizer
        self.speaker_intents = {}  # speaker_index -> list of intents

    def on_line_completed(self, event):
        line = event.line
        # Display with speaker label
        if line.has_speaker_id:
            speaker = line.speaker_index
            print(f"Speaker {speaker}: {line.text}")
            # Track which speakers trigger which intents
            # (IntentRecognizer already processes the line as a listener)
            if speaker not in self.speaker_intents:
                self.speaker_intents[speaker] = []
        else:
            print(line.text)
# Setup: create a recognizer and register one intent with its callback.
intent_recognizer = IntentRecognizer(embedding_path, embedding_arch)
intent_recognizer.register_intent("turn on lights", lambda t, u, s: print("💡 ON"))
listener = MultiSpeakerIntentListener(intent_recognizer)
# Both listeners receive every line: one prints speaker labels,
# the other runs intent recognition.
mic_transcriber.add_listener(listener)
mic_transcriber.add_listener(intent_recognizer)
See Also