
Pipeline Overview

The audio pipeline transforms platform-specific audio formats into network-ready PCM data:
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Audio Device │ --> │ Format       │ --> │ Ring Buffer  │ --> │ Network      │
│ (F32/I16/U16)│     │ Normalization│     │ (F32 samples)│     │ (I16 LE PCM) │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
     cpal                Producer              HeapRb              Consumer
  (callback)          (audio thread)        (lock-free)        (network thread)

Stage 1: Audio Capture (cpal)

Device Selection

Device enumeration (audio.rs:1209-1300) scans all available host APIs:
for host_id in cpal::available_hosts() {
    let host = cpal::host_from_id(host_id)?;
    
    // Standard input devices
    if let Ok(devices) = host.input_devices() {
        // Filter and collect device names
    }
    
    // Windows: Loopback mode (output devices)
    #[cfg(target_os = "windows")]
    if include_loopback {
        if let Ok(devices) = host.output_devices() {
            // Prefix with "[Loopback] "
        }
    }
}
Platform-Specific Filtering:
  • Linux: Excludes the ALSA entries default, sysdefault, null, and hw: devices (unreliable)
  • Windows: Output devices available as loopback sources
  • macOS: All input devices are listed; capture requires microphone permission
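The Linux filtering rule and the Windows loopback label could be expressed as two small helpers. This is a minimal sketch, not the code in audio.rs. The names `is_excluded_linux_device` and `loopback_label` are hypothetical, and the sketch assumes ALSA names arrive either bare (`default`) or with a `:` suffix (`sysdefault:CARD=PCH`).

```rust
/// Returns true if an ALSA device name should be hidden from the device list.
/// Assumption: names look like "default", "sysdefault:CARD=PCH" or "hw:CARD=PCH,DEV=0".
fn is_excluded_linux_device(name: &str) -> bool {
    const EXCLUDED: [&str; 3] = ["default", "sysdefault", "null"];
    if name.starts_with("hw:") {
        return true; // raw hardware devices
    }
    // Match the bare name, or the name followed by ':' (e.g. "sysdefault:CARD=PCH")
    EXCLUDED.iter().any(|&ex| {
        name == ex || name.strip_prefix(ex).map_or(false, |rest| rest.starts_with(':'))
    })
}

/// Label used for Windows output devices offered as loopback capture sources.
fn loopback_label(name: &str) -> String {
    format!("[Loopback] {name}")
}
```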

Format Detection

The pipeline dynamically detects the best supported format (audio.rs:1015-1092):
// Priority order: F32 > I16 > U16 (first match wins)
let mut best_config_range = None;
'search: for format in [SampleFormat::F32, SampleFormat::I16, SampleFormat::U16] {
    for range in &supported_configs {
        if range.sample_format() == format 
           && range.channels() == 2 
           && range.min_sample_rate().0 <= sample_rate 
           && range.max_sample_rate().0 >= sample_rate {
            best_config_range = Some(range.clone());
            break 'search;  // a plain `break` would only exit the inner loop
        }
    }
}
Why F32 is preferred:
  • Native format for PipeWire (Linux)
  • Eliminates clipping on Linux (v1.8.0 improvement)
  • Internal processing uses F32 (no conversion needed)
  • Higher precision for RMS calculations
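The same priority search can be written without nested-loop control flow by using `Iterator::find_map`, which stops at the first format that has a matching range. The sketch below uses simplified stand-ins (`SampleFormat`, `ConfigRange`, `pick_config`) rather than cpal's real types, so the selection logic can be exercised without an audio device.

```rust
/// Simplified stand-in for cpal's SampleFormat (hypothetical, for illustration).
#[derive(Clone, Copy, Debug, PartialEq)]
enum SampleFormat {
    F32,
    I16,
    U16,
}

/// Simplified stand-in for cpal's SupportedStreamConfigRange (hypothetical).
#[derive(Clone, Debug, PartialEq)]
struct ConfigRange {
    format: SampleFormat,
    channels: u16,
    min_rate: u32,
    max_rate: u32,
}

/// Returns the first stereo range supporting `sample_rate`, trying F32, then I16, then U16.
fn pick_config(ranges: &[ConfigRange], sample_rate: u32) -> Option<ConfigRange> {
    const PRIORITY: [SampleFormat; 3] = [SampleFormat::F32, SampleFormat::I16, SampleFormat::U16];
    PRIORITY.iter().find_map(|&format| {
        ranges
            .iter()
            .find(|r| {
                r.format == format
                    && r.channels == 2
                    && (r.min_rate..=r.max_rate).contains(&sample_rate)
            })
            .cloned()
    })
}
```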

Stream Configuration

let stream_config = cpal::StreamConfig {
    channels: 2,              // Stereo only
    sample_rate: cpal::SampleRate(sample_rate), // 44100 or 48000
    buffer_size: if is_loopback {
        cpal::BufferSize::Default  // WASAPI compatibility
    } else {
        cpal::BufferSize::Fixed(buffer_size)  // frames per callback: 256, 512, 1024, 2048
    },
};
Buffer Size Impact:
  • Smaller = Lower latency, higher CPU usage
  • Larger = Higher latency, more stable
  • Loopback uses Default (typically 10ms on Windows)
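cpal's `BufferSize::Fixed` counts frames (one sample per channel), so the latency added by each callback buffer is frames / sample_rate. A small sketch of that arithmetic; `buffer_latency_ms` and `samples_per_callback` are hypothetical helper names.

```rust
/// Latency contributed by one callback buffer of `frames` frames, in milliseconds.
fn buffer_latency_ms(frames: u32, sample_rate: u32) -> f64 {
    frames as f64 * 1000.0 / sample_rate as f64
}

/// Number of interleaved samples delivered per callback.
fn samples_per_callback(frames: u32, channels: u16) -> usize {
    frames as usize * channels as usize
}
```

At 48 kHz this gives roughly 5.3 ms (256), 10.7 ms (512), 21.3 ms (1024) and 42.7 ms (2048); 480 frames corresponds to the 10 ms WASAPI default period.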

Stage 2: Format Normalization (Producer)

AudioProcessor Design

The AudioProcessor struct (audio.rs:1108-1142) handles format-agnostic processing:
struct AudioProcessor {
    prod: Arc<Mutex<ringbuf::Producer<f32, Arc<ringbuf::HeapRb<f32>>>>>,
    app_handle: AppHandle,
}

impl AudioProcessor {
    fn process(&mut self, data: &[f32]) {
        if let Ok(mut guard) = self.prod.lock() {
            let pushed = guard.push_slice(data);
            if pushed < data.len() {
                // Buffer full: the samples that did not fit (the newest) are discarded
                // (the 5-second rate limit on this warning is omitted here)
                emit_log(&self.app_handle, "warning", 
                    format!("Buffer overflow: Dropped {} samples", 
                            data.len() - pushed));
            }
        }
    }
}
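To make the overflow semantics concrete, the sketch below mimics what `push_slice` does when the ring is nearly full: it writes as many samples as fit and returns that count. `BoundedFifo` is a hypothetical std-only stand-in built on `VecDeque`, not the ringbuf crate.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded FIFO illustrating push_slice semantics (not ringbuf).
struct BoundedFifo {
    buf: VecDeque<f32>,
    capacity: usize,
}

impl BoundedFifo {
    fn new(capacity: usize) -> Self {
        Self { buf: VecDeque::with_capacity(capacity), capacity }
    }

    /// Writes as many samples as fit and returns how many were written.
    /// Samples beyond the free space (the tail of `data`) are not stored.
    fn push_slice(&mut self, data: &[f32]) -> usize {
        let free = self.capacity - self.buf.len();
        let n = free.min(data.len());
        self.buf.extend(&data[..n]);
        n
    }
}
```

In the test, the second push stores only 4.0; 5.0 and 6.0 are what `process()` would report as dropped, while the older 1.0 to 3.0 stay queued.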

Format-Specific Conversion

F32 Input (Passthrough)

cpal::SampleFormat::F32 => {
    let mut processor = AudioProcessor::new(prod.clone(), app_handle.clone());
    device.build_input_stream(&stream_config, 
        move |data: &[f32], _: &_| {
            processor.process(data);  // Direct write
        }, 
        err_fn, None)
}

I16 Input (Normalization)

cpal::SampleFormat::I16 => {
    let mut processor = AudioProcessor::new(prod.clone(), app_handle.clone());
    device.build_input_stream(&stream_config, 
        move |data: &[i16], _: &_| {
            // Normalize to [-1.0, 1.0): i16::MIN maps to exactly -1.0
            let converted: Vec<f32> = data.iter()
                .map(|&sample| sample as f32 / 32768.0)
                .collect();
            processor.process(&converted);
        }, 
        err_fn, None)
}

U16 Input (Unsigned to Signed)

cpal::SampleFormat::U16 => {
    let mut processor = AudioProcessor::new(prod.clone(), app_handle.clone());
    device.build_input_stream(&stream_config, 
        move |data: &[u16], _: &_| {
            // U16: 0..=65535 with silence at 32768; shift and scale to [-1.0, 1.0)
            let converted: Vec<f32> = data.iter()
                .map(|&sample| (sample as f32 - 32768.0) / 32768.0)
                .collect();
            processor.process(&converted);
        }, 
        err_fn, None)
}
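Both conversion arms allocate a fresh `Vec` on every callback. A common alternative is to convert into a scratch buffer that the closure owns and reuses, so allocation only happens while it grows to the callback size. A sketch with hypothetical function names:

```rust
/// Converts i16 samples to f32 in [-1.0, 1.0), reusing `out`'s allocation.
fn i16_to_f32_into(data: &[i16], out: &mut Vec<f32>) {
    out.clear();
    out.extend(data.iter().map(|&s| s as f32 / 32768.0));
}

/// Converts unsigned u16 samples (silence at 32768) to f32 in [-1.0, 1.0).
fn u16_to_f32_into(data: &[u16], out: &mut Vec<f32>) {
    out.clear();
    out.extend(data.iter().map(|&s| (s as f32 - 32768.0) / 32768.0));
}
```

In a callback, the `move` closure would capture something like `let mut scratch = Vec::with_capacity(4096);` (4096 is an arbitrary example) and call `i16_to_f32_into(data, &mut scratch)` before `processor.process(&scratch)`.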

Overflow Handling

When the ring buffer is full (audio.rs:1125-1138):
  1. push_slice() returns the number of samples actually written
  2. If pushed < data.len(), an overflow is detected
  3. A rate-limited warning is emitted (at most 1 per 5 seconds)
  4. The samples that did not fit are discarded: these are the newest samples from the current callback, while audio already queued is kept
This is intentional: dropping samples keeps the buffer, and therefore latency, bounded instead of letting it grow.
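The source states the warning is limited to one per 5 seconds but does not show the mechanism. One possible approach: accumulate the dropped count and emit a single summarized warning per interval. `OverflowWarner` and its methods are hypothetical names; passing `now` explicitly keeps the logic testable.

```rust
use std::time::{Duration, Instant};

/// Hypothetical rate limiter: at most one overflow warning per `interval`.
struct OverflowWarner {
    interval: Duration,
    last_emit: Option<Instant>,
    dropped_since_last: usize,
}

impl OverflowWarner {
    fn new(interval: Duration) -> Self {
        Self { interval, last_emit: None, dropped_since_last: 0 }
    }

    /// Records `dropped` samples; returns Some(total dropped since the last
    /// warning) when a warning should be emitted now, otherwise None.
    fn record(&mut self, dropped: usize, now: Instant) -> Option<usize> {
        self.dropped_since_last += dropped;
        let due = match self.last_emit {
            None => true,
            Some(t) => now.duration_since(t) >= self.interval,
        };
        if due {
            self.last_emit = Some(now);
            Some(std::mem::take(&mut self.dropped_since_last))
        } else {
            None
        }
    }
}
```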

Stage 3: Ring Buffer (Lock-Free FIFO)

See Buffering System for detailed analysis. Key points:
  • Type: HeapRb<f32>
  • Size: Adaptive (2000-15000ms worth of samples)
  • Thread-safe with no locks inside the ring itself (AudioProcessor does wrap the producer half in a Mutex)
  • Producer and consumer operate independently

Stage 4: Prefill Gate (v1.8.1)

Before transmission begins, the system waits for the buffer to fill (audio.rs:569-591):
let prefill_samples = sample_rate as usize * 2;  // 1 second of interleaved stereo (2 samples per frame)

while cons.len() < prefill_samples && is_running_clone.load(Ordering::Relaxed) {
    thread::sleep(Duration::from_millis(10));
}

emit_log(&app_handle, "success", "Buffer prefilled! Starting transmission.");
Purpose: Eliminates startup stutter by ensuring a cushion of data before TCP transmission begins.
Why 1000ms?
  • Absorbs initial network handshake latency
  • Prevents “cold start” dropouts
  • Small enough to avoid noticeable delay
  • Works uniformly across Windows, Linux, macOS
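The prefill wait can be factored into a testable helper. The ring buffer counts interleaved samples, so one second of stereo at 48 kHz is 96,000 samples. This sketch replaces `cons.len()` with a generic `buffered` closure; `prefill_target` and `wait_for_prefill` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Number of interleaved samples in `ms` milliseconds of audio.
fn prefill_target(sample_rate: u32, channels: u16, ms: u32) -> usize {
    sample_rate as usize * channels as usize * ms as usize / 1000
}

/// Polls `buffered()` until it reaches `target` or `running` is cleared.
/// Returns true if prefilled, false if the stream was stopped first.
fn wait_for_prefill<F: FnMut() -> usize>(
    mut buffered: F,
    target: usize,
    running: &AtomicBool,
    poll: Duration,
) -> bool {
    while running.load(Ordering::Relaxed) {
        if buffered() >= target {
            return true;
        }
        thread::sleep(poll);
    }
    false
}
```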

Stage 5: Network Transmission (Consumer)

Strict Pacing Algorithm

The consumer thread uses a tick-based pacer (audio.rs:593-638):
let tick_duration = Duration::from_micros(
    (chunk_size as u64 * 1_000_000) / sample_rate as u64
);
let mut next_tick = Instant::now();

loop {
    let now = Instant::now();
    
    // Check buffer health
    let current_buffered = cons.len();
    let high_water_mark = prefill_samples + (sample_rate as usize / 10);  // prefill + ~50 ms of stereo headroom
    
    if current_buffered > high_water_mark {
        // DRAIN MODE: Process immediately to catch up
        next_tick = now + tick_duration;
    } else {
        // STRICT MODE: Respect the clock
        if now < next_tick {
            thread::sleep(next_tick - now);
            next_tick += tick_duration;
        } else {
            // Massive lag (>200ms)? Reset clock
            if now.duration_since(next_tick) > Duration::from_millis(200) {
                next_tick = Instant::now() + tick_duration;
            } else {
                next_tick += tick_duration;
            }
        }
    }
    
    // Read and transmit chunk
    // ...
}
Adaptive Behavior:
  • Normal: Sleeps until next scheduled tick
  • High buffer: Drains immediately to prevent overflow
  • Massive lag: Resets clock to prevent drift accumulation
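The pacing decision can be isolated as a pure function of the current time, the scheduled tick, and the buffer level, which makes the three modes easy to test. `Pace`, `pace_step`, and `tick_duration` are hypothetical names. `tick_duration` is a variation on the formula above that works in nanoseconds to reduce truncation: with a 1024-sample chunk at 44.1 kHz, the microsecond version rounds 23,219.95 µs down to 23,219 µs.

```rust
use std::time::{Duration, Instant};

/// What the consumer loop should do before sending the next chunk.
#[derive(Debug, PartialEq)]
enum Pace {
    SendNow,
    SleepFor(Duration),
}

/// One pacer step: returns the action and the updated `next_tick`.
fn pace_step(
    now: Instant,
    next_tick: Instant,
    buffered: usize,
    high_water_mark: usize,
    tick: Duration,
    max_lag: Duration,
) -> (Pace, Instant) {
    if buffered > high_water_mark {
        // Drain mode: send immediately and restart the schedule from now
        return (Pace::SendNow, now + tick);
    }
    if now < next_tick {
        // Strict mode: wait for the scheduled tick
        (Pace::SleepFor(next_tick - now), next_tick + tick)
    } else if now.duration_since(next_tick) > max_lag {
        // Far behind: reset the clock instead of bursting to catch up
        (Pace::SendNow, now + tick)
    } else {
        // Slightly late: send now and keep the original schedule
        (Pace::SendNow, next_tick + tick)
    }
}

/// Tick length for `chunk_size` samples at `sample_rate`, in nanosecond precision.
fn tick_duration(chunk_size: u64, sample_rate: u64) -> Duration {
    Duration::from_nanos(chunk_size * 1_000_000_000 / sample_rate)
}
```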

F32 → I16 LE Conversion

At the network edge, F32 samples are converted to 16-bit little-endian PCM (audio.rs:772-779):
let mut payload = Vec::with_capacity(data_len as usize);
for &s in &temp_buffer[..count] {
    // Clamp to valid range
    let sample = s.clamp(-1.0, 1.0);
    
    // Scale to I16; the cast truncates toward zero, so -1.0 maps to -32767 (not -32768)
    let sample_i16 = (sample * 32767.0) as i16;
    
    // Write as little-endian bytes
    payload.extend_from_slice(&sample_i16.to_le_bytes());
}
PCM Format Specification:
  • Sample Size: 16-bit signed integer
  • Byte Order: Little-endian
  • Channels: 2 (interleaved L-R-L-R…)
  • Sample Rate: 44100 or 48000 Hz
  • Bitrate: sample_rate * 2 channels * 16 bits = 1,536,000 bit/s (~1.54 Mbps) @ 48kHz, 1,411,200 bit/s @ 44.1kHz
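The conversion and the bitrate figure can both be checked with a short sketch. `encode_pcm_i16_le` and `pcm_bitrate_bps` are hypothetical names; the encoder follows the clamp, scale by 32767, and truncate steps shown above, so full scale maps to ±32767.

```rust
/// Encodes f32 samples as 16-bit signed little-endian PCM.
fn encode_pcm_i16_le(samples: &[f32]) -> Vec<u8> {
    let mut payload = Vec::with_capacity(samples.len() * 2);
    for &s in samples {
        // Clamp, scale, and truncate toward zero (same steps as above)
        let v = (s.clamp(-1.0, 1.0) * 32767.0) as i16;
        payload.extend_from_slice(&v.to_le_bytes());
    }
    payload
}

/// Raw PCM bitrate in bits per second.
fn pcm_bitrate_bps(sample_rate: u32, channels: u32, bits: u32) -> u64 {
    sample_rate as u64 * channels as u64 * bits as u64
}
```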

RMS Calculation (Removed in v1.6.0+)

Previous versions calculated RMS for silence detection in the audio thread. This has been removed to simplify the pipeline. The current implementation focuses on reliable transmission without silence detection in the producer.

Error Handling

Stream Errors

The error callback (audio.rs:980-990) logs cpal errors:
let err_fn = move |err: cpal::StreamError| {
    emit_log(&app_handle_err, "error", 
        format!("⚠️ AUDIO STREAM ERROR: {} - This may cause stream to stop!", err));
};
Common errors:
  • Device disconnected
  • Sample rate change
  • Buffer underrun (cpal level)
  • Permission denied (microphone access)

Recovery Strategy

  1. Graceful degradation: Send silence on buffer underrun
  2. No auto-restart: Frontend must manually restart stream
  3. Network errors: Handled separately by consumer thread
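Sending silence on underrun amounts to padding whatever the buffer holds with zeros up to the chunk size. A minimal sketch, where a `VecDeque<f32>` stands in for the ring buffer consumer and `fill_chunk_or_silence` is a hypothetical name:

```rust
use std::collections::VecDeque;

/// Fills `out` with buffered samples, pads the remainder with silence (0.0),
/// and returns the number of real samples used.
fn fill_chunk_or_silence(source: &mut VecDeque<f32>, out: &mut [f32]) -> usize {
    let n = source.len().min(out.len());
    // Move the available samples into the front of the chunk
    for (dst, src) in out.iter_mut().zip(source.drain(..n)) {
        *dst = src;
    }
    // Underrun: the rest of the chunk becomes silence
    out[n..].fill(0.0);
    n
}
```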

Performance Optimization

Why Lock-Free Matters

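// HeapRb has no internal mutex
let (prod, mut cons) = rb.split();
let prod = Arc::new(Mutex::new(prod));  // shared with AudioProcessor

// Producer (audio thread): the ring never blocks; the Mutex is uncontended
prod.lock().unwrap().push_slice(data);  // O(n) copy, no allocation

// Consumer (network thread): never waits on the producer
let count = cons.pop_slice(&mut temp_buffer);  // O(n) copy, returns samples read
Benefits:
  • No priority inversion: in this design only the audio callback locks the producer Mutex, so the lock is never contended
  • Bounded, predictable latency
  • No syscalls or allocations inside the ring buffer
  • Wait-free push for the producer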

Memory Layout

HeapRb<f32>:
  Capacity: sample_rate * 2 * duration_ms / 1000
  
  Example (48kHz, 4000ms):
    48000 * 2 * 4000 / 1000 = 384,000 samples
    384,000 * 4 bytes (f32) = 1,536,000 bytes ≈ 1.46 MiB
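The capacity formula as a sketch, including the upper end of the adaptive range (15,000 ms). `ring_capacity` and `ring_bytes` are hypothetical helpers.

```rust
/// Ring buffer capacity in interleaved f32 samples for `duration_ms` of audio.
fn ring_capacity(sample_rate: usize, channels: usize, duration_ms: usize) -> usize {
    sample_rate * channels * duration_ms / 1000
}

/// Heap size of the sample storage in bytes.
fn ring_bytes(capacity: usize) -> usize {
    capacity * std::mem::size_of::<f32>()
}
```

At 15,000 ms the buffer holds 1,440,000 samples, or 5,760,000 bytes (about 5.49 MiB).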

CPU Impact

  • Audio thread: ~1-2% (format conversion + ring write)
  • Network thread: ~2-3% (ring read + TCP write + stats)
  • Total: <5% on modern CPUs

Future Enhancements

  • Codec support (Opus, FLAC) for bandwidth reduction
  • Adaptive sample rate switching
  • Multi-channel support (5.1, 7.1)
  • Hardware-accelerated resampling
