
Lock-Free Ring Buffer (HeapRb)

The buffering system uses ringbuf::HeapRb<f32> as a lock-free FIFO queue between audio and network threads.

Design Overview

┌─────────────────────────────────────────────────────────────┐
│                    Ring Buffer (HeapRb<f32>)                │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │  [  f32  |  f32  |  f32  |  ...  |  f32  |  f32  ]  │  │
│  └──────────────────────────────────────────────────────┘  │
│       ▲                                        ▲            │
│       │                                        │            │
│   Write Head                               Read Head       │
│   (Producer)                              (Consumer)        │
│                                                             │
│  Capacity: sample_rate * 2 * duration_ms / 1000             │
│  Type: f32 (32-bit floating point, normalized to ±1.0)     │
│  Thread Safety: Lock-free (split into prod/cons)           │
└─────────────────────────────────────────────────────────────┘
         │                                    │
         │                                    │
    Audio Thread                        Network Thread
   (Producer)                          (Consumer)
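The split above can be sketched as a minimal single-threaded ring (an illustrative toy, not the ringbuf crate's API; the real HeapRb uses atomic head/tail indices so the two halves are safe on separate threads):

```rust
// Toy fixed-capacity ring of f32 samples. Head/tail are monotonic counters
// reduced modulo capacity on access; `head - tail` is the occupancy.
struct Ring {
    buf: Vec<f32>,
    head: usize, // total samples written (producer side)
    tail: usize, // total samples read (consumer side)
}

impl Ring {
    fn new(cap: usize) -> Self {
        Ring { buf: vec![0.0; cap], head: 0, tail: 0 }
    }

    fn len(&self) -> usize {
        self.head - self.tail
    }

    /// Write as many samples as fit; returns the number actually written.
    fn push_slice(&mut self, data: &[f32]) -> usize {
        let cap = self.buf.len();
        let n = data.len().min(cap - self.len());
        for &s in &data[..n] {
            self.buf[self.head % cap] = s;
            self.head += 1;
        }
        n
    }

    /// Read as many samples as available; returns the number actually read.
    fn pop_slice(&mut self, out: &mut [f32]) -> usize {
        let cap = self.buf.len();
        let n = out.len().min(self.len());
        for o in out[..n].iter_mut() {
            *o = self.buf[self.tail % cap];
            self.tail += 1;
        }
        n
    }
}
```

On overflow, samples that do not fit are rejected and the caller is told how many were written, matching the push_slice contract the producer relies on.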

Initialization

use ringbuf::HeapRb;

// Calculate size based on duration
let ring_buffer_size = (sample_rate as usize) * 2 * (duration_ms as usize) / 1000;

// Create heap-allocated ring buffer
let rb = HeapRb::<f32>::new(ring_buffer_size);

// Split into producer and consumer (lock-free)
let (prod, cons) = rb.split();
Size Calculation:
For 48kHz, 4000ms:
  48000 samples/sec * 2 channels * 4000ms / 1000 = 384,000 samples
  384,000 samples * 4 bytes/sample (f32) = 1.46 MB
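The same arithmetic expressed as helpers (a sketch; the shipped code inlines these expressions):

```rust
/// Interleaved-stereo ring-buffer size in samples for a rate and duration.
fn ring_buffer_samples(sample_rate: u32, duration_ms: u32) -> usize {
    (sample_rate as usize) * 2 * (duration_ms as usize) / 1000
}

/// Memory footprint in bytes: each sample is a 4-byte f32.
fn ring_buffer_bytes(samples: usize) -> usize {
    samples * std::mem::size_of::<f32>()
}
```

For 48 kHz and 4000 ms this gives 384,000 samples and 1,536,000 bytes, i.e. the 1.46 MB quoted above.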

Smart Device-Aware Sizing

The buffer size is automatically adjusted based on device type (audio.rs:437-466):
let adjusted_ring_buffer_duration_ms = if is_loopback {
    // WASAPI Loopback: Larger default (8000ms)
    // Handles: WiFi jitter + CPU throttling + WASAPI timing variability
    8000.max(ring_buffer_duration_ms)
} else {
    // Standard Input/VB Cable: 5000ms default for WiFi tolerance
    5000.max(ring_buffer_duration_ms)
};

let buffer_size_mb = (ring_buffer_size * 4) as f32 / (1024.0 * 1024.0);  // 4 bytes per f32 sample
emit_log(&app_handle, "info", 
    format!("Ring buffer: {}ms ({:.2}MB) - Device type: {}",
            adjusted_ring_buffer_duration_ms, buffer_size_mb,
            if is_loopback { "WASAPI Loopback" } else { "Standard Input" }));
Why different sizes?
  • WASAPI Loopback (8000ms):
    • More timing unpredictability
    • Affected by CPU power management
    • WiFi jitter on laptops
    • Adaptive range: 4000-12000ms
  • Standard Input (5000ms):
    • More predictable timing
    • Direct hardware access
    • WiFi tolerance
    • Adaptive range: 2000-6000ms
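The floor selection boils down to a small function (a sketch of the logic shown above; the function name is illustrative):

```rust
/// Device-aware buffer floor: loopback capture gets a larger default
/// than standard input, but a larger user request always wins.
fn default_buffer_ms(is_loopback: bool, requested_ms: u32) -> u32 {
    if is_loopback {
        8000.max(requested_ms) // WASAPI loopback: absorb timing variability
    } else {
        5000.max(requested_ms) // standard input: WiFi tolerance
    }
}
```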

Lock-Free Operations

Producer (Audio Thread)

impl AudioProcessor {
    fn process(&mut self, data: &[f32]) {
        if let Ok(mut guard) = self.prod.lock() {  // Uncontended mutex (safe)
            let pushed = guard.push_slice(data);
            if pushed < data.len() {
                // Overflow: samples that did not fit are dropped
                emit_log(&self.app_handle, "warning", 
                    format!("Buffer overflow: Dropped {} samples", data.len() - pushed));
            }
        }
    }
}
Characteristics:
  • Non-blocking: push_slice() never waits
  • Overflow behavior: Returns number of samples written
  • Performance: O(n) in the slice length (a bounded copy); no allocation or locking
  • Thread: Runs in cpal’s audio callback (real-time constraints)

Consumer (Network Thread)

let required_samples = temp_buffer.len();
let count = if cons.len() >= required_samples {
    cons.pop_slice(&mut temp_buffer)  // Read actual data
} else {
    // Starvation: Generate silence
    temp_buffer.fill(0.0);
    emit_log(&app_handle_net, "debug", 
        format!("Starvation: Generating silence chunk (Buffer < {})", required_samples));
    required_samples
};
Characteristics:
  • Non-blocking: Returns immediately
  • Underflow behavior: Fills buffer with silence (zeros)
  • Performance: O(n) in the chunk length (a bounded copy); no allocation or locking
  • Thread: Network thread (lower priority)

Why Mutex on Producer?

The producer is wrapped in Arc<Mutex<...>> (audio.rs:1095):
let prod = Arc::new(Mutex::new(prod));
Reason: cpal requires 'static callbacks, and the same producer must be shared across the format-specific closures (F32/I16/U16). The mutex:
  • Is uncontended (single audio thread)
  • Allows sharing producer across different closures
  • Doesn’t block (lock is always available)
  • Alternative would be separate producer instances per format (less clean)

Adaptive Buffer Sizing

Jitter-Based Resizing

Every 10 seconds, the system checks jitter and adjusts target buffer size (audio.rs:892-955):
const BUFFER_CHECK_INTERVAL_SECS: u64 = 10;

if enable_adaptive_buffer && last_buffer_check.elapsed() >= Duration::from_secs(BUFFER_CHECK_INTERVAL_SECS) {
    // Determine target based on jitter
    let target_buffer_ms = if jitter_avg < 5.0 {
        // Low jitter - can use smaller buffer
        adaptive_min_ms
    } else if jitter_avg > 15.0 {
        // High jitter - need larger buffer
        adaptive_max_ms
    } else {
        // Medium jitter - scale linearly
        let jitter_ratio = (jitter_avg - 5.0) / 10.0;  // 0.0 to 1.0
        let buffer_range = adaptive_max_ms - adaptive_min_ms;
        adaptive_min_ms + (buffer_range as f32 * jitter_ratio) as u32
    };
    
    // Only resize if change is significant (>10%)
    let size_diff_pct = ((target_buffer_ms as f32 - current_buffer_ms as f32).abs() 
                        / current_buffer_ms as f32) * 100.0;
    
    if size_diff_pct > 10.0 {
        // Emit resize event
        emit_log(&app_handle_net, "info", 
            format!("Adaptive buffer: {}ms → {}ms (jitter: {:.1}ms)",
                    current_buffer_ms, target_buffer_ms, jitter_avg));
        current_buffer_ms = target_buffer_ms;
    }
}

Adaptive Ranges by Device Type

let (adaptive_min_ms, adaptive_max_ms) = if is_loopback {
    // WASAPI Loopback: 4000-12000ms range
    (4000.max(min_buffer_ms), 12000.min(max_buffer_ms))
} else {
    // Standard Input: 2000-6000ms range
    (2000.max(min_buffer_ms), 6000.min(max_buffer_ms))
};

Jitter-to-Buffer Mapping

Jitter (ms)  →  Buffer Size
───────────────────────────────
< 5          →  adaptive_min_ms  (Wired network: 2s or 4s)
5-15         →  Linear interpolation
> 15         →  adaptive_max_ms  (Poor WiFi: 6s or 12s)
Example (Standard Input, min=2000, max=6000):
Jitter 0ms   → 2000ms buffer
Jitter 5ms   → 2000ms buffer
Jitter 10ms  → 4000ms buffer (midpoint)
Jitter 15ms  → 6000ms buffer
Jitter 20ms+ → 6000ms buffer
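The mapping can be written as a pure function (a sketch mirroring the adaptive logic above; the function name is illustrative):

```rust
/// Map average jitter (ms) to a target buffer duration (ms), linearly
/// interpolated between the device's adaptive bounds.
fn target_buffer_ms(jitter_avg: f32, adaptive_min_ms: u32, adaptive_max_ms: u32) -> u32 {
    if jitter_avg < 5.0 {
        adaptive_min_ms // low jitter: smallest buffer
    } else if jitter_avg > 15.0 {
        adaptive_max_ms // high jitter: largest buffer
    } else {
        let jitter_ratio = (jitter_avg - 5.0) / 10.0; // 0.0 to 1.0
        let buffer_range = adaptive_max_ms - adaptive_min_ms;
        adaptive_min_ms + (buffer_range as f32 * jitter_ratio) as u32
    }
}
```

This reproduces the worked example: 10 ms of jitter lands exactly on the 4000 ms midpoint.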

Current Limitation

The adaptive logic tracks target size but doesn’t physically resize the ring buffer:
// Note: Actual ring buffer resizing would require complex synchronization
// to avoid audio dropouts. For now, we just track the target size and
// emit events. Full implementation would need to:
// 1. Create new ring buffer with new size
// 2. Copy existing data to new buffer
// 3. Atomically swap prod/cons references
// This is marked as future enhancement.
Current behavior: Emits resize events for monitoring, but the buffer remains at its initial size.
Future enhancement: Hot-swappable ring buffer with atomic pointer swap.

Jitter Handling

Sources of Jitter

  1. Network Jitter:
    • WiFi interference
    • Router bufferbloat
    • Competing traffic
  2. OS Scheduler:
    • Thread preemption
    • CPU frequency scaling
    • Background processes
  3. Hardware:
    • USB audio interfaces (variable latency)
    • Laptop power management
    • Thermal throttling

Buffer as Jitter Absorber

The ring buffer acts as a time-based cushion:
Audio arrives at variable intervals:
  ▂▄▆█▅▃▁ (jitter)

Ring buffer smooths it:
  ▅▅▅▅▅▅▅ (steady output)
Trade-off: Larger buffer = more jitter tolerance, but higher latency.

High Water Mark (Drain Mode)

When buffer fills beyond threshold, the system enters drain mode (audio.rs:606-612):
let high_water_mark = prefill_samples + (sample_rate as usize / 10);  // Prefill + 100ms

if current_buffered > high_water_mark && current_stream.is_some() {
    // DRAIN MODE: We are lagging. Process immediately.
    next_tick = now + tick_duration;  // Skip sleep
}
Purpose: If audio is arriving faster than network can send (e.g., temporary congestion), catch up by processing immediately until buffer drains to normal level.
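The threshold arithmetic as standalone helpers (a sketch; the `next_tick` pacing is omitted). For 48 kHz with a 1-second prefill, the mark sits at 52,800 samples:

```rust
/// High water mark: the prefill level plus roughly 100 ms of samples.
fn high_water_mark(prefill_samples: usize, sample_rate: u32) -> usize {
    prefill_samples + sample_rate as usize / 10
}

/// Drain mode triggers when buffered audio exceeds the mark while a
/// stream is active.
fn in_drain_mode(buffered: usize, mark: usize, stream_active: bool) -> bool {
    buffered > mark && stream_active
}
```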

Prefill Gate (v1.8.1)

Cold Start Problem

Without prefill:
Time:  0ms   10ms  20ms  30ms  40ms  50ms
Buffer: [  ]  [▂ ]  [▄ ]  [▆ ]  [█ ]  [██]
Send:   ✗     ✗     ✗     ✗     ✗     ✓
Result: Stutter (sending silence initially)
With 1000ms prefill:
Time:  0ms   ...   1000ms  1010ms  1020ms
Buffer: [  ]  ...   [████]   [████]   [████]
Send:   Wait  ...    ✓        ✓        ✓
Result: Smooth start (buffer cushion ready)

Implementation

let prefill_samples = sample_rate as usize * 1;  // 1 second

emit_log(&app_handle_net, "info", 
    format!("Buffering... waiting for {} samples (1000ms)", prefill_samples));

while cons.len() < prefill_samples && is_running_clone.load(Ordering::Relaxed) {
    thread::sleep(Duration::from_millis(10));  // Poll every 10ms
}

emit_log(&app_handle_net, "success", "Buffer prefilled! Starting transmission.");
Why 1000ms?
  • Balances startup delay and stability
  • Absorbs TCP handshake latency
  • Small enough to be imperceptible to user
  • Large enough to prevent initial stutter
  • Works equally well on Windows, Linux, macOS

Prefill + High Water Mark Relationship

┌───────────────────────────────────────────────────┐
│              Ring Buffer Capacity                 │
│  (e.g., 4000ms @ 48kHz = 384,000 samples)        │
└───────────────────────────────────────────────────┘

0        Prefill (1s)    High Water (1.1s)      4s
├───────────┼────────────────┼──────────────────┤
│  Wait     │   Normal Mode  │   Drain Mode     │
│  (fill)   │ (strict pacing)│ (catch up)       │
└───────────┴────────────────┴──────────────────┘

Buffer Health Metrics

Calculation

let occupied = cons.len();
let capacity = cons.capacity();
let buffer_health = 1.0 - (occupied as f32 / capacity as f32);
Interpretation:
  • 1.0: Empty (consuming faster than producing)
  • 0.5: Half full (balanced)
  • 0.0: Full (producing faster than consuming)
Ideal range: 0.3 - 0.7 (30-70% full)

Buffer Usage in Quality Score

let buffer_usage = occupied as f32 / capacity as f32;
let buffer_penalty = if buffer_usage > 0.8 {
    ((buffer_usage - 0.8) / 0.2 * 30.0) as u8  // Max 30 points penalty
} else {
    0
};
High usage (>80%) indicates:
  • Network is slower than audio production
  • Risk of overflow (dropped samples)
  • Should increase buffer size or reduce chunk size
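Both metrics expressed as helpers (a sketch of the formulas above; note that the `as u8` cast truncates toward zero, as in the original):

```rust
/// Buffer health: 1.0 = empty, 0.0 = full.
fn buffer_health(occupied: usize, capacity: usize) -> f32 {
    1.0 - occupied as f32 / capacity as f32
}

/// Quality-score penalty: zero below 80% usage, scaling toward 30
/// points as the buffer approaches full.
fn buffer_penalty(occupied: usize, capacity: usize) -> u8 {
    let usage = occupied as f32 / capacity as f32;
    if usage > 0.8 {
        ((usage - 0.8) / 0.2 * 30.0) as u8
    } else {
        0
    }
}
```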

Overflow vs. Underflow

Overflow (Buffer Full)

Cause: Audio arrives faster than the network can send
Symptom: “Buffer overflow: Dropped X samples” warnings
Effect: Audio gaps (dropped samples)
Solutions:
  1. Increase ring buffer size
  2. Increase chunk size (send larger batches)
  3. Check network congestion
  4. Enable adaptive buffering

Underflow (Buffer Empty)

Cause: Network sends faster than audio arrives
Symptom: “Starvation: Generating silence chunk” debug logs
Effect: Silence inserted (zeros)
Solutions:
  1. Increase prefill duration
  2. Decrease chunk size (smaller batches)
  3. Check audio device configuration
  4. Verify sample rate matches device

Memory Characteristics

Allocation

HeapRb<f32>::new(ring_buffer_size)  // Heap-allocated
  • Location: Heap (not stack)
  • Type: Contiguous array of f32
  • Lifetime: Owned by stream, dropped when stream stops

Size Examples

Sample Rate   Duration    Samples      Memory
──────────────────────────────────────────────
44.1 kHz      2000ms        176,400    0.67 MB
48 kHz        2000ms        192,000    0.73 MB
48 kHz        4000ms        384,000    1.46 MB
48 kHz        8000ms        768,000    2.93 MB
48 kHz        12000ms     1,152,000    4.39 MB
Calculation: samples * 4 bytes (f32) / 1024 / 1024

Cache Efficiency

  • Sequential writes: Producer writes forward (cache-friendly)
  • Sequential reads: Consumer reads forward (cache-friendly)
  • Wraparound: HeapRb handles circular logic internally
  • False sharing: Avoided (separate head/tail cache lines)

Comparison: Lock-Free vs. Mutex-Based

HeapRb (Lock-Free)

let (prod, cons) = rb.split();

// Producer thread
prod.push_slice(data);  // No blocking, no syscalls

// Consumer thread
cons.pop_slice(buffer);  // No blocking, no syscalls
Advantages:
  • Wait-free for producer (critical for audio)
  • No priority inversion
  • Deterministic latency
  • No kernel involvement

Hypothetical Mutex-Based

use std::sync::{Arc, Mutex};

let buffer: Arc<Mutex<Vec<f32>>> = Arc::new(Mutex::new(Vec::new()));

// Producer thread
let mut guard = buffer.lock().unwrap();  // May block!
guard.extend_from_slice(data);

// Consumer thread
let mut guard = buffer.lock().unwrap();  // May block!
let data: Vec<f32> = guard.drain(..chunk_size).collect();
Disadvantages:
  • Producer may block (audio dropouts!)
  • Priority inversion risk
  • Variable latency
  • Kernel involvement (futex syscalls)

Future Enhancements

  • Hot-swappable resizing: Atomic buffer replacement
  • Metrics: Overflow/underflow counters exposed to frontend
  • Variable chunk size: Dynamic adjustment based on jitter
  • Multiple buffers: Separate buffers per network connection
