The Pacing interceptor controls the rate at which RTP packets are sent, smoothing out bursts and ensuring consistent bandwidth usage. This helps prevent network congestion and improves overall stream quality.

Overview

The Pacing interceptor provides:
  • Rate limiting: Controls packet transmission rate in bits per second
  • Queue management: Buffers packets when rate limit is reached
  • Burst handling: Smooths out bursty traffic patterns
  • Dynamic adjustment: Rate can be updated based on network conditions
Pacing is particularly useful when combined with congestion control algorithms like GCC that dynamically adjust target bitrates.

Basic Usage

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/pacing"
)

// Create pacing interceptor factory
pacingFactory := pacing.NewInterceptor(
    pacing.InitialRate(1_000_000), // 1 Mbps
)

// Register with interceptor registry
m := &interceptor.Registry{}
m.Add(pacingFactory)

Configuration Options

InitialRate

Sets the initial pacing rate in bits per second:
pacingFactory := pacing.NewInterceptor(
    pacing.InitialRate(2_000_000), // 2 Mbps, default: 1 Mbps
)

Interval

Sets how often packets are sent:
pacingFactory := pacing.NewInterceptor(
    pacing.Interval(5 * time.Millisecond), // Default: 5ms
)
Shorter intervals provide smoother pacing but use more CPU. 5ms is a good balance for most use cases.

WithLoggerFactory

Configure logging for debugging:
pacingFactory := pacing.NewInterceptor(
    pacing.WithLoggerFactory(loggerFactory),
)

Dynamic Rate Adjustment

Update pacing rate based on network conditions:
// Create factory with ID tracking
pacingFactory := pacing.NewInterceptor(
    pacing.InitialRate(1_000_000),
)

m := &interceptor.Registry{}
m.Add(pacingFactory)

// Later, adjust rate for a specific connection
connectionID := "peer-123"
newRate := 2_000_000 // 2 Mbps

pacingFactory.SetRate(connectionID, newRate)

Complete Example

package main

import (
    "log"
    "time"
    
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/pacing"
    "github.com/pion/webrtc/v4"
)

func main() {
    // Create pacing interceptor
    pacingFactory := pacing.NewInterceptor(
        pacing.InitialRate(1_000_000),        // Start at 1 Mbps
        pacing.Interval(5 * time.Millisecond), // Send every 5ms
    )
    
    // Create interceptor registry
    i := &interceptor.Registry{}
    i.Add(pacingFactory)
    
    // Create media engine and API
    m := &webrtc.MediaEngine{}
    if err := m.RegisterDefaultCodecs(); err != nil {
        panic(err)
    }
    
    api := webrtc.NewAPI(
        webrtc.WithMediaEngine(m),
        webrtc.WithInterceptorRegistry(i),
    )
    
    // Create peer connection
    pc, err := api.NewPeerConnection(webrtc.Configuration{})
    if err != nil {
        panic(err)
    }
    defer pc.Close()
    
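    // ID used for rate updates. ConnectionState().String() returns a state
    // name such as "new", not an identifier, so the placeholder below stands
    // in for the real connection ID (as in Dynamic Rate Adjustment).
    connectionID := "peer-123"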
    
    // Simulate rate adjustments based on network conditions
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        
        rates := []int{1_000_000, 2_000_000, 1_500_000, 3_000_000}
        idx := 0
        
        for range ticker.C {
            newRate := rates[idx%len(rates)]
            pacingFactory.SetRate(connectionID, newRate)
            log.Printf("Updated pacing rate to %d bps (%.2f Mbps)",
                newRate, float64(newRate)/1_000_000)
            idx++
        }
    }()
    
    log.Println("Pacing configured")
    
    select {}
}

Integration with GCC

The pacing rate can follow the target bitrate estimated by GCC:
import (
    "log"

    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/gcc"
    "github.com/pion/interceptor/pkg/pacing"
)

// Create pacing interceptor
pacingFactory := pacing.NewInterceptor(
    pacing.InitialRate(1_000_000),
)

i := &interceptor.Registry{}
i.Add(pacingFactory)

// Create GCC bandwidth estimator
bwe, err := gcc.NewSendSideBWE(
    gcc.SendSideBWEInitialBitrate(1_000_000),
    gcc.SendSideBWEMinBitrate(100_000),
    gcc.SendSideBWEMaxBitrate(5_000_000),
)
if err != nil {
    panic(err)
}

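// Update pacing rate when GCC estimates change.
// connectionID must identify the connection whose pacer should change,
// as in the SetRate call under Dynamic Rate Adjustment.
bwe.OnTargetBitrateChange(func(bitrate int) {
    log.Printf("GCC target bitrate: %d bps", bitrate)
    pacingFactory.SetRate(connectionID, bitrate)
})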
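Bandwidth estimates can change often and by small amounts. One possible way to avoid passing every estimate to the pacer is to clamp it and skip insignificant changes first. The sketch below uses only the standard library. The `rateUpdater` type, its fields, and the `set` callback are hypothetical names for illustration, with `set` standing in for a call such as `SetRate`.

```go
package main

import "fmt"

// rateUpdater forwards bitrate estimates to a setter after clamping them
// to [min, max] and skipping changes smaller than minDelta.
// All names here are hypothetical and not part of any library.
type rateUpdater struct {
	min, max, minDelta int
	last               int            // last rate that was applied (0 = none yet)
	set                func(rate int) // stand-in for the real rate setter
}

func (u *rateUpdater) update(estimate int) {
	rate := estimate
	if rate < u.min {
		rate = u.min
	}
	if rate > u.max {
		rate = u.max
	}

	diff := rate - u.last
	if diff < 0 {
		diff = -diff
	}
	// Always apply the first estimate; afterwards ignore small changes.
	if u.last != 0 && diff < u.minDelta {
		return
	}
	u.last = rate
	u.set(rate)
}

func main() {
	var applied []int
	u := &rateUpdater{
		min:      100_000,
		max:      5_000_000,
		minDelta: 50_000,
		set:      func(rate int) { applied = append(applied, rate) },
	}

	for _, estimate := range []int{1_000_000, 1_020_000, 6_000_000, 50_000} {
		u.update(estimate)
	}
	fmt.Println(applied) // [1000000 5000000 100000]
}
```

The second estimate is dropped because it differs from the applied rate by less than `minDelta`. The last two are clamped to the configured bounds.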

How It Works

Token Bucket Algorithm

The pacer uses a token bucket filter:
  1. Tokens: Represent available bandwidth budget
  2. Rate: Tokens added at configured rate (bits per second)
  3. Bucket: Holds accumulated tokens (burst capacity)
  4. Transmission: Packets sent when enough tokens available
  5. Depletion: Tokens consumed when packets sent
// Simplified token bucket
type TokenBucket struct {
    rate   int       // bits per second
    tokens float64   // current token count, in bits
    burst  int       // maximum tokens, in bits
    last   time.Time // time of the previous refill
}

func (tb *TokenBucket) AllowN(now time.Time, bits int) bool {
    // Add tokens based on time elapsed since the previous call
    elapsed := now.Sub(tb.last)
    tb.last = now
    tb.tokens += float64(tb.rate) * elapsed.Seconds()
    tb.tokens = min(tb.tokens, float64(tb.burst))
    
    // Check if enough tokens
    if tb.tokens >= float64(bits) {
        tb.tokens -= float64(bits)
        return true
    }
    
    return false
}

Packet Queue

Packets are queued when rate limit is reached:
// Queue capacity (default: 1,000,000 packets)
const defaultQueueSize = 1_000_000

// Packet is queued if:
// 1. Bucket doesn't have enough tokens
// 2. Previous packets still in queue

// Packet is sent when:
// 1. Interval timer fires
// 2. Bucket has enough tokens for packet size

Burst Handling

Burst capacity allows short bursts above the rate:
// Burst is calculated based on rate and interval
func burst(rate int, interval time.Duration) int {
    if interval == 0 {
        interval = time.Millisecond
    }
    
    // How many intervals per second
    intervalsPerSecond := float64(time.Second) / float64(interval)
    
    // Burst is one interval's worth of bits
    return int(float64(rate) / intervalsPerSecond)
}

// Example: 1 Mbps rate, 5ms interval
// burst = 1,000,000 / (1000/5) = 5,000 bits
Burst capacity allows handling momentary traffic spikes without dropping or delaying packets excessively.

Error Handling

The pacer can return errors:
var (
    // Returned when pacer is closed
    errPacerClosed = errors.New("pacer closed")
    
    // Returned when queue is full
    errPacerOverflow = errors.New("pacer queue overflow")
)

// Handle write errors
_, err := rtpWriter.Write(header, payload, attributes)
if err != nil {
    // errors.Is also matches sentinel errors that have been wrapped
    switch {
    case errors.Is(err, errPacerClosed):
        log.Println("Pacer closed, stopping transmission")
        return
    case errors.Is(err, errPacerOverflow):
        log.Println("Queue overflow, dropping packet")
        // Consider reducing encoding rate
    }
}

Performance Tuning

Interval Selection

// Real-time gaming - more frequent sends
pacing.Interval(1 * time.Millisecond)

// Video conferencing - balanced
pacing.Interval(5 * time.Millisecond)

// Streaming - less frequent OK
pacing.Interval(10 * time.Millisecond)

Queue Size

Queue size determines how many packets can be buffered:
// Default queue size
const queueSize = 1_000_000 // packets

// Memory usage estimate
packetSize := 1200 // bytes
memory := queueSize * packetSize // ~1.2 GB (worst case)

// In practice, queue rarely fills completely
// Typical usage: < 100 packets = ~120 KB

CPU Impact

// CPU usage depends on interval
interval := 5 * time.Millisecond
wakeupsPerSecond := float64(time.Second) / float64(interval)
log.Printf("Wakeups per second: %.0f", wakeupsPerSecond) // 200

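// Illustrative only: if each wakeup per second cost ~0.05% CPU,
// 200 wakeups per second would total ~10% CPU (200 * 0.05%).
// Actual cost depends on hardware and load, so profile your own setup.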

Monitoring

Track pacing effectiveness:
type PacingMonitor struct {
    queued    int64
    sent      int64
    dropped   int64
    mu        sync.Mutex
}

func (pm *PacingMonitor) OnPacketQueued() {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    pm.queued++
}

func (pm *PacingMonitor) OnPacketSent() {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    pm.sent++
}

func (pm *PacingMonitor) OnPacketDropped() {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    pm.dropped++
}

func (pm *PacingMonitor) Stats() map[string]int64 {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    
    return map[string]int64{
        "queued":  pm.queued,
        "sent":    pm.sent,
        "dropped": pm.dropped,
        "pending": pm.queued - pm.sent - pm.dropped,
    }
}
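If the counters are updated on a hot path, a lock-free variant may be worth considering. The sketch below uses `atomic.Int64` from the standard library (Go 1.19 or later). The `counters` type is a hypothetical alternative to the mutex-based monitor, and `pending` reads three counters separately, so it is only approximate while packets are in flight.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters tracks pacing events without a mutex.
type counters struct {
	queued, sent, dropped atomic.Int64
}

// pending returns queued - sent - dropped. The three loads are not a
// single atomic snapshot, so the value is approximate under concurrency.
func (c *counters) pending() int64 {
	return c.queued.Load() - c.sent.Load() - c.dropped.Load()
}

func main() {
	var c counters
	var wg sync.WaitGroup

	// 100 goroutines each queue and send one packet.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.queued.Add(1)
			c.sent.Add(1)
		}()
	}
	wg.Wait()

	// Three more packets are queued and one of them is dropped.
	c.queued.Add(3)
	c.dropped.Add(1)

	fmt.Println(c.pending()) // 2
}
```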

Best Practices

  1. Set Appropriate Rate: Match initial rate to expected bandwidth
  2. Combine with BWE: Use with bandwidth estimation (GCC)
  3. Monitor Queue: Watch for queue buildup indicating rate too low
  4. Handle Overflows: Implement fallback when queue fills
  5. Smooth Adjustments: Change rates gradually, not abruptly
Setting the pacing rate too low makes the queue grow, which adds latency and eventually causes packet loss when the queue overflows. Ensure the rate matches or exceeds the encoder’s output bitrate.
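One way to change rates gradually is to cap how far the rate may move per update. The helper below is a minimal sketch of that idea. `stepToward` and the 500 kbps step size are illustrative choices, not library functions or recommended values.

```go
package main

import "fmt"

// stepToward moves current toward target by at most maxStep and
// returns the new rate.
func stepToward(current, target, maxStep int) int {
	switch {
	case target > current+maxStep:
		return current + maxStep
	case target < current-maxStep:
		return current - maxStep
	default:
		return target
	}
}

func main() {
	rate, target := 1_000_000, 3_000_000

	// Each iteration stands for one periodic update of the pacing rate.
	for rate != target {
		rate = stepToward(rate, target, 500_000)
		fmt.Println(rate)
	}
}
```

Starting at 1 Mbps with a 3 Mbps target, the loop applies 1.5, 2, 2.5, and 3 Mbps in turn instead of jumping straight to the target.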

Use Cases

Smooth Bursty Traffic

// Encoder outputs frames in bursts
// Pacer smooths to constant rate
pacing.InitialRate(encoderBitrate)
pacing.Interval(5 * time.Millisecond)

Prevent Congestion

// Limit sending rate to prevent overwhelming network
maxNetworkRate := 2_000_000 // 2 Mbps
pacing.InitialRate(maxNetworkRate)

Fair Bandwidth Sharing

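// Multiple streams share bandwidth
totalBandwidth := 5_000_000 // 5 Mbps
streams := 3
ratePerStream := totalBandwidth / streams

for i := 0; i < streams; i++ {
    // Use one registry per stream: every factory added to a registry is
    // chained, so adding all three to one registry would stack the pacers.
    registry := &interceptor.Registry{}
    registry.Add(pacing.NewInterceptor(
        pacing.InitialRate(ratePerStream),
    ))
    // Build the API and PeerConnection for this stream from registry.
}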
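An equal split is the simplest policy, but streams often deserve different shares, for example video versus audio. The sketch below divides a total rate by integer weights and hands the rounding remainder to the first stream so that the shares add up exactly. `splitRate` and the weights are hypothetical and only illustrate the arithmetic.

```go
package main

import "fmt"

// splitRate divides total (bits per second) among streams in proportion
// to their weights. Bits lost to integer division go to the first stream,
// so the result always sums to total when the weights sum to more than 0.
func splitRate(total int, weights []int) []int {
	rates := make([]int, len(weights))

	sum := 0
	for _, w := range weights {
		sum += w
	}
	if sum <= 0 {
		return rates // nothing to distribute without positive weights
	}

	assigned := 0
	for i, w := range weights {
		rates[i] = total * w / sum
		assigned += rates[i]
	}
	rates[0] += total - assigned
	return rates
}

func main() {
	fmt.Println(splitRate(5_000_000, []int{1, 1, 1})) // [1666668 1666666 1666666]
	fmt.Println(splitRate(5_000_000, []int{3, 1, 1})) // [3000000 1000000 1000000]
}
```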

Troubleshooting

Causes:
  • Pacing rate too low for encoder output
  • Burst too large for queue
  • Rate not updated when conditions change
Solutions:
  • Increase pacing rate
  • Reduce encoder bitrate
  • Implement dynamic rate adjustment
  • Monitor queue depth
Causes:
  • Queue buildup due to rate mismatch
  • Interval too long
Solutions:
  • Match pacing rate to encoder output
  • Reduce interval
  • Clear queue on congestion
Causes:
  • Interval too long
  • Rate too low
  • Excessive queuing delay
Solutions:
  • Decrease interval (5ms or less)
  • Increase pacing rate
  • Monitor queue depth

Comparison with Alternatives

Pacing Interceptor

// Pros:
// - Simple rate limiting
// - Low overhead
// - Easy configuration

// Cons:
// - Fixed interval
// - Simple token bucket
// - No priority queuing

GCC’s Built-in Pacer

// Pros:
// - Integrated with bandwidth estimation
// - Automatic rate adjustment
// - Production-tested

// Cons:
// - Tied to GCC algorithm
// - Less control

No Pacing

// Pros:
// - Lowest latency
// - No queuing delay
// - Simplest

// Cons:
// - Bursty traffic
// - Can cause congestion
// - May trigger packet loss

Related

  • GCC - Bandwidth estimation that benefits from pacing
  • Stats - Monitor sending rate and queue depth
  • Report - Track packet statistics
