
TCP Streaming Architecture

TCP Streamer uses a dedicated network thread to transmit audio data:
┌────────────────────────────────────────────────────┐
│            Network Consumer Thread                 │
│  ┌──────────────────────────────────────────────┐  │
│  │  1. Prefill Wait (1000ms)                    │  │
│  │  2. Strict Pacing Loop                       │  │
│  │     - Calculate tick duration                │  │
│  │     - Adaptive drain on high buffer          │  │
│  │     - Sleep until next tick                  │  │
│  │  3. Connection Management                    │  │
│  │     - Exponential backoff                    │  │
│  │     - Socket2 advanced setup                 │  │
│  │  4. Data Transmission                        │  │
│  │     - F32 → I16 LE conversion                │  │
│  │     - TCP write with timeout                 │  │
│  │  5. Metrics Collection                       │  │
│  │     - Jitter (EWMA)                          │  │
│  │     - Latency samples                        │  │
│  │     - Quality score                          │  │
│  └──────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────┘
          │                              │
          ▼                              ▼
  ┌───────────────┐            ┌──────────────────┐
  │ Ring Buffer   │            │ Frontend Events  │
  │ (Consumer)    │            │ (stats, quality) │
  └───────────────┘            └──────────────────┘
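The pacing loop's tick length follows directly from the chunk size and sample rate: each tick lasts exactly as long as the audio it carries. A minimal sketch (the helper name is hypothetical; the real loop lives in audio.rs):

```rust
use std::time::Duration;

/// Duration of one pacing tick: the playback time the chunk represents.
/// (Illustrative helper, not the app's actual code.)
fn tick_duration(chunk_samples: usize, sample_rate: u32) -> Duration {
    Duration::from_secs_f64(chunk_samples as f64 / sample_rate as f64)
}

fn main() {
    // 1024 samples at 48 kHz is roughly 21.3 ms per tick
    let tick = tick_duration(1024, 48_000);
    println!("tick = {:?}", tick);
}
```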

Socket Configuration (socket2)

The application uses the socket2 crate for advanced socket control (audio.rs:706-740):

Socket Creation

use socket2::{Domain, Protocol, Socket, TcpKeepalive, Type};

let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;

Send Buffer Size

socket.set_send_buffer_size(1024 * 1024)?;  // 1 MB
Purpose: A large send buffer prevents blocking when the network is temporarily slow.
Trade-off: Higher memory usage, but critical for audio streaming, where writes cannot afford to wait.

TCP Keepalive

let keepalive = TcpKeepalive::new()
    .with_time(Duration::from_secs(5))      // Send probe after 5s idle
    .with_interval(Duration::from_secs(2)); // Retry every 2s
    
socket.set_tcp_keepalive(&keepalive)?;
Benefits:
  • Detects dead connections (network cable unplugged)
  • Prevents firewall timeout (keeps NAT entries alive)
  • Faster failure detection than default OS settings (usually 2 hours)
How it works:
  1. After 5 seconds of no data, send TCP keepalive probe
  2. If no ACK, retry every 2 seconds
  3. After ~9 probes (OS-dependent), connection marked dead
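The worst-case detection time follows from these settings: the idle time before the first probe, plus one interval per failed probe. A small sketch (the probe count is OS-dependent and the helper is illustrative, not part of the app):

```rust
use std::time::Duration;

/// Worst-case time to declare a connection dead under TCP keepalive:
/// idle time before the first probe, plus one interval per failed probe.
/// (Illustrative only; the probe count varies by OS.)
fn detection_time(idle: Duration, interval: Duration, probes: u32) -> Duration {
    idle + interval * probes
}

fn main() {
    // 5s idle + 9 probes at 2s intervals: a dead peer is detected in ~23s
    let worst = detection_time(Duration::from_secs(5), Duration::from_secs(2), 9);
    println!("worst-case detection: {:?}", worst);
}
```

Compare this with the OS defaults (often 2 hours of idle time before the first probe).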

Type of Service (TOS) / DSCP

let tos_value = match dscp_strategy.as_str() {
    "voip" => 0xB8,           // Expedited Forwarding (EF) - DSCP 46
    "lowdelay" => 0x10,       // IPTOS_LOWDELAY - Minimize delay
    "throughput" => 0x08,     // IPTOS_THROUGHPUT - Maximize throughput
    "besteffort" => 0x00,     // Default (no priority)
    _ => 0xB8,                // Default to VoIP
};

let _ = socket.set_tos(tos_value);  // Best effort (may fail on some platforms)
DSCP Classes:

  Strategy     DSCP  Hex   Use Case
  VoIP         46    0xB8  Real-time audio (recommended)
  Low Delay    16    0x10  Interactive traffic
  Throughput   8     0x08  Bulk data transfer
  Best Effort  0     0x00  Normal traffic
Router Support: QoS must be enabled on network equipment to honor DSCP markings.
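The relation between the DSCP code point and the TOS byte is fixed: DSCP occupies the upper 6 bits of the byte, so TOS = DSCP << 2 (the low 2 bits are ECN). The helper below is illustrative, not part of the app:

```rust
/// The DSCP code point occupies the upper 6 bits of the IPv4 TOS byte,
/// so TOS = DSCP << 2 (the low 2 bits are reserved for ECN).
/// (Illustrative helper, not the app's code.)
fn dscp_to_tos(dscp: u8) -> u8 {
    dscp << 2
}

fn main() {
    // Expedited Forwarding: DSCP 46 maps to TOS byte 0xB8
    println!("EF TOS byte: 0x{:02X}", dscp_to_tos(46));
}
```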

TCP Nodelay (Nagle’s Algorithm)

let stream: TcpStream = socket.into();
stream.set_nodelay(true)?;  // Disable Nagle's algorithm
Why disable Nagle?
  • Nagle buffers small packets to reduce network overhead
  • Audio requires immediate transmission (latency-sensitive)
  • We already batch data into chunks (128-4096 samples)
  • Disabling reduces latency by 40-200ms

Write Timeout (Critical)

stream.set_write_timeout(Some(Duration::from_secs(5)))?;
Purpose: Prevents indefinite blocking if the server stops reading.
Behavior:
  • write_all() returns an error after 5 seconds
  • Triggers connection cleanup and reconnection
  • Prevents “zombie” connections
Why 5 seconds?
  • Long enough for network jitter
  • Short enough to detect server crashes
  • Balances responsiveness and stability

Connection Lifecycle

Initial Connection

let addr: SocketAddr = server_addr.parse()?;
socket.connect(&addr.into())?;
Blocking operation: Waits for TCP 3-way handshake (SYN, SYN-ACK, ACK).

Reconnection Strategy (Exponential Backoff)

let mut retry_delay = Duration::from_secs(2);  // Start at 2s
const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

// On failure:
retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);

// Add jitter to prevent thundering herd
fn add_jitter(base: Duration) -> Duration {
    // Cheap pseudo-randomness: the clock's sub-second nanoseconds, mapped to ±500ms
    let jitter_ms = (SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos() % 1000) as i64 - 500;  // ±500ms
    let ms = base.as_millis() as i64 + jitter_ms;
    Duration::from_millis(ms.max(2000) as u64)  // Never drop below the 2s floor
}
Backoff Progression:
Attempt 1: 2s   (+ jitter)
Attempt 2: 4s   (+ jitter)
Attempt 3: 8s   (+ jitter)
Attempt 4: 16s  (+ jitter)
Attempt 5: 32s  (+ jitter)
Attempt 6+: 60s (+ jitter, capped)
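The progression above can be reproduced with the doubling-and-cap rule (jitter omitted; this is a sketch, not the app's code):

```rust
use std::time::Duration;

const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);

/// Delays produced by doubling from 2s and capping at 60s (jitter omitted).
/// (Illustrative helper, not the app's actual reconnect loop.)
fn backoff_schedule(attempts: usize) -> Vec<Duration> {
    let mut delay = Duration::from_secs(2);
    let mut out = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        out.push(delay);
        delay = std::cmp::min(delay * 2, MAX_RETRY_DELAY);
    }
    out
}

fn main() {
    let secs: Vec<u64> = backoff_schedule(7).iter().map(|d| d.as_secs()).collect();
    // Capped at 60s from the 6th attempt onward
    println!("{:?}", secs);
}
```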
Why jitter?
  • If 100 clients disconnect simultaneously (server crash)
  • Without jitter: All reconnect at same time (thundering herd)
  • With jitter: Reconnections spread over ±500ms window

Graceful Shutdown

fn close_tcp_stream(stream: TcpStream, context: &str, app_handle: &AppHandle) {
    use std::net::Shutdown;
    
    // Send TCP FIN to server
    if let Err(e) = stream.shutdown(Shutdown::Both) {
        emit_log(app_handle, "debug", 
            format!("TCP shutdown {} ({}): socket may already be closed", context, e));
    } else {
        emit_log(app_handle, "debug", 
            format!("TCP connection closed gracefully ({})", context));
    }
    
    // stream drops here, releasing the socket
}
When called:
  • Write error detected (audio.rs:804)
  • Network thread exits (audio.rs:960)
  • User stops stream
Why Shutdown::Both?
  • Sends FIN in both directions
  • Signals server to close its end
  • Prevents half-open connections

Write Error Handling

if let Err(e) = s.write_all(&payload) {
    emit_log(&app_handle_net, "error", format!("Write error: {}", e));
    
    // v1.9.0: Explicit socket destruction
    if let Some(stream) = current_stream.take() {
        close_tcp_stream(stream, "write error", &app_handle_net);
    }
    
    // Allow kernel to process FIN before reconnect
    thread::sleep(Duration::from_millis(100));
}
Common write errors:
  • WouldBlock: Timeout (5s expired)
  • BrokenPipe: Server closed connection
  • ConnectionReset: Server sent RST
  • NetworkUnreachable: Network failure
Recovery:
  1. Log error
  2. Gracefully close socket (send FIN)
  3. Wait 100ms for kernel cleanup
  4. Loop continues, triggers reconnection logic

Disconnected Buffer Drain (v1.9.0)

When disconnected, the system drains stale audio to send fresh data on reconnect (audio.rs:613-623):
if current_stream.is_none() && current_buffered > 0 {
    // DISCONNECTED: Clear stale audio
    let drain_amount = (sample_rate as usize / 10).min(current_buffered);  // 100ms chunks
    let mut drain_buffer = vec![0.0f32; drain_amount];
    let _ = cons.pop_slice(&mut drain_buffer);
    
    // Use strict pacing to avoid CPU spin
    if now < next_tick {
        thread::sleep(next_tick - now);
    }
    next_tick = Instant::now() + tick_duration;
}
Why drain?
  • During a 60-second reconnection delay, up to 60 seconds of stale audio accumulates
  • Sending old audio on reconnect causes noticeable delay
  • Better to send fresh audio immediately
Why 100ms chunks?
  • Avoids blocking network thread for too long
  • Balances drain speed and responsiveness
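The chunk size works out as follows: one tenth of the sample rate is exactly 100 ms of audio. A minimal sketch with a hypothetical helper:

```rust
/// Samples drained per tick while disconnected: one tenth of a second
/// of audio, never more than is actually buffered.
/// (Illustrative helper mirroring the drain logic above.)
fn drain_amount(sample_rate: u32, buffered: usize) -> usize {
    (sample_rate as usize / 10).min(buffered)
}

fn main() {
    // At 48 kHz, each drain pass discards up to 4800 samples (100 ms)
    println!("{}", drain_amount(48_000, 1_000_000));
}
```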

Network Quality Metrics

Jitter Measurement

let send_time = Instant::now();
let target = next_tick - tick_duration;
let deviation_ms = if send_time > target {
    send_time.duration_since(target).as_secs_f32() * 1000.0
} else {
    0.0
};

// Exponential Weighted Moving Average (EWMA)
if jitter_avg == 0.0 {
    jitter_avg = deviation_ms;
} else {
    jitter_avg = 0.9 * jitter_avg + 0.1 * deviation_ms;
}
Interpretation:
  • < 5ms: Excellent (wired network)
  • 5-15ms: Good (WiFi)
  • 15-50ms: Fair (congested WiFi)
  • > 50ms: Poor (investigate network)
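The EWMA update can be isolated into a small helper to show how slowly a settled average reacts to a single spike (a sketch, not the app's code):

```rust
/// EWMA jitter update as described above: 90% history, 10% new sample.
/// (Illustrative helper, not the app's code.)
fn update_jitter(jitter_avg: f32, deviation_ms: f32) -> f32 {
    if jitter_avg == 0.0 {
        deviation_ms // first sample seeds the average
    } else {
        0.9 * jitter_avg + 0.1 * deviation_ms
    }
}

fn main() {
    let mut j = 0.0;
    // A single 20 ms spike only nudges a settled 2 ms average
    for d in [2.0, 2.0, 2.0, 20.0] {
        j = update_jitter(j, d);
    }
    println!("jitter_avg = {:.2} ms", j);
}
```

The 0.9/0.1 split smooths out one-off scheduling hiccups while still tracking sustained congestion.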

Latency Tracking

let write_start = Instant::now();
if let Err(e) = s.write_all(&payload) {
    // Error handling
} else {
    let write_duration_ms = write_start.elapsed().as_secs_f32() * 1000.0;
    if latency_samples.len() >= 100 {
        latency_samples.remove(0);  // Keep last 100
    }
    latency_samples.push(write_duration_ms);
}
Metrics:
  • Tracks time spent in write_all()
  • Includes kernel buffering and network transmission
  • Rolling window of 100 samples
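The rolling window behaves like a bounded FIFO. A sketch with hypothetical helpers (`push_sample` and `mean_latency` are not the app's names):

```rust
/// Keep only the most recent 100 latency samples (bounded FIFO).
/// (Illustrative helper mirroring the window logic above.)
fn push_sample(samples: &mut Vec<f32>, v: f32) {
    if samples.len() >= 100 {
        samples.remove(0); // drop the oldest; window stays at 100
    }
    samples.push(v);
}

/// Mean write latency over the window, in milliseconds.
fn mean_latency(samples: &[f32]) -> f32 {
    if samples.is_empty() {
        return 0.0;
    }
    samples.iter().sum::<f32>() / samples.len() as f32
}

fn main() {
    let mut window = Vec::new();
    for i in 0..150 {
        push_sample(&mut window, i as f32);
    }
    // Only the most recent 100 samples remain
    println!("len = {}, mean = {}", window.len(), mean_latency(&window));
}
```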

Quality Score (0-100)

let jitter_penalty = ((jitter_avg / 20.0).min(1.0) * 50.0) as u8;  // Max 50 points
let buffer_penalty = if buffer_usage > 0.8 {
    ((buffer_usage - 0.8) / 0.2 * 30.0) as u8  // Max 30 points
} else { 0 };
let error_penalty = ((consecutive_errors.min(5) as f32 / 5.0) * 20.0) as u8;  // Max 20 points

let score = 100u8.saturating_sub(jitter_penalty + buffer_penalty + error_penalty);
Score Interpretation:
  • 90-100: Excellent
  • 70-89: Good
  • 50-69: Fair
  • < 50: Poor

Thread Priority

use thread_priority::{ThreadBuilder, ThreadPriority};

let priority = if high_priority {
    ThreadPriority::Max
} else {
    ThreadPriority::Min  // Actually normal/default
};

let thread_builder = ThreadBuilder::default()
    .name("NetworkThread")
    .priority(priority);
    
let _ = thread_builder.spawn(move |result| {
    if let Err(e) = result {
        emit_log(&app_handle_net, "warning", 
            format!("Failed to set thread priority: {:?}", e));
    }
    // Network thread logic...
});
Impact:
  • High priority: Reduces jitter under CPU load
  • Trade-off: Can starve other processes
  • Default: Normal priority (recommended for most users)

Heartbeat Monitoring

let mut last_heartbeat = Instant::now();
const HEARTBEAT_INTERVAL_SECS: u64 = 30;

if last_heartbeat.elapsed() >= Duration::from_secs(HEARTBEAT_INTERVAL_SECS) {
    let occupied = cons.len();
    let capacity = cons.capacity();
    let buffer_pct = occupied as f32 / capacity as f32 * 100.0;
    
    emit_log(&app_handle_net, "debug", 
        format!("Network thread heartbeat: ✓ Active | Buffer: {:.1}% | Connection: {}",
                buffer_pct, 
                if current_stream.is_some() { "Connected" } else { "Disconnected" }));
    
    last_heartbeat = Instant::now();
}
Purpose:
  • Proves thread is alive
  • Provides buffer health visibility
  • Helps debug “silent” failures

Network Presets

Three presets optimize for different network conditions:

Ethernet

  • Ring Buffer: 2000ms
  • Chunk Size: 512 samples
  • Adaptive Range: 2000-6000ms
  • Target: Lowest latency, assumes stable connection

WiFi

  • Ring Buffer: 4000ms
  • Chunk Size: 1024 samples
  • Adaptive Range: 3000-10000ms
  • Target: Balanced stability for typical WiFi

WiFi (Poor Signal)

  • Ring Buffer: 8000ms
  • Chunk Size: 2048 samples
  • Adaptive Range: 5000-15000ms
  • Target: Maximum stability, prevents dropouts
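The presets can be captured in a small table-driven sketch (the struct, field names, and preset keys below are illustrative, not the app's actual config types):

```rust
use std::time::Duration;

/// One entry per preset. Field names are illustrative, not the app's config.
struct NetworkPreset {
    ring_buffer: Duration,
    chunk_samples: usize,
    adaptive_range: (Duration, Duration),
}

/// Look up a preset by a hypothetical key; unknown keys fall back to the
/// most conservative ("WiFi poor signal") settings.
fn preset(name: &str) -> NetworkPreset {
    match name {
        "ethernet" => NetworkPreset {
            ring_buffer: Duration::from_millis(2000),
            chunk_samples: 512,
            adaptive_range: (Duration::from_millis(2000), Duration::from_millis(6000)),
        },
        "wifi" => NetworkPreset {
            ring_buffer: Duration::from_millis(4000),
            chunk_samples: 1024,
            adaptive_range: (Duration::from_millis(3000), Duration::from_millis(10000)),
        },
        _ => NetworkPreset {
            ring_buffer: Duration::from_millis(8000),
            chunk_samples: 2048,
            adaptive_range: (Duration::from_millis(5000), Duration::from_millis(15000)),
        },
    }
}

fn main() {
    let p = preset("wifi");
    println!("wifi chunk = {} samples", p.chunk_samples);
}
```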

Future Enhancements

  • UDP mode: Lower latency, at the cost of possible packet loss
  • QUIC protocol: Modern alternative to TCP
  • Connection pooling: Multiple parallel streams
  • SSL/TLS: Encrypted transmission
  • IPv6 support: Currently IPv4 only
