Skip to main content
The pacing package implements packet pacing using a token bucket algorithm to smooth out packet transmission and prevent network congestion from bursty traffic.

Overview

Pacing smooths packet transmission by:
  • Rate Limiting: Controls sending rate to target bitrate
  • Burst Control: Prevents large packet bursts
  • Queue Management: Buffers packets when rate limit exceeded
  • Adaptive Rate: Lets the pacing rate be adjusted at runtime from congestion control feedback

InterceptorFactory

Type Definition

type InterceptorFactory struct {
    // contains filtered or unexported fields
}

Constructor

func NewInterceptor(opts ...Option) *InterceptorFactory
Creates a new pacing interceptor factory.

Options

InitialRate

func InitialRate(rate int) Option
Sets the initial pacing rate.
rate
int
Initial bitrate in bits per second. Default: 1,000,000 (1 Mbps)

Interval

func Interval(interval time.Duration) Option
Sets the pacing interval (how often packets are sent).
interval
time.Duration
Time between pacing cycles. Default: 5ms

WithLoggerFactory

func WithLoggerFactory(loggerFactory logging.LoggerFactory) Option
Sets a custom logger factory.

Methods

SetRate

func (f *InterceptorFactory) SetRate(id string, r int)
Updates the pacing rate for a specific interceptor instance.
id
string
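PeerConnection ID identifying the interceptor instance (in the examples below, the id passed to registry.Build and handed to the OnNewPeerConnection callbacks)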
r
int
New bitrate in bits per second

Usage Example

Basic Setup

import (
    "time"
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/pacing"
)

// Create the pacing interceptor factory with an explicit initial rate and
// pacing interval.
pacingFactory := pacing.NewInterceptor(
    pacing.InitialRate(2_000_000),      // 2 Mbps
    pacing.Interval(5 * time.Millisecond),
)

// Register the factory so it takes part in every interceptor chain built
// from this registry.
registry := &interceptor.Registry{}
registry.Add(pacingFactory)

// Build the interceptor chain for one peer connection. The id given here is
// the one used in this page's SetRate examples to address this instance.
i, err := registry.Build("peer-connection-1")
if err != nil {
    // Example only: real code should return or handle the error.
    panic(err)
}
// Close the chain when the surrounding function returns.
defer i.Close()

Dynamic Rate Adjustment

import (
    "github.com/pion/interceptor/pkg/pacing"
    "github.com/pion/interceptor/pkg/stats"
)

// setupAdaptivePacing creates a pacing factory and a stats interceptor
// factory, and lowers the pacing rate to 800 Kbps whenever the loss rate
// observed for mySSRC exceeds 5%.
//
// It returns the pacing factory, the most recently captured stats.Getter and
// any error from creating the stats interceptor.
//
// NOTE(review): the returned stats.Getter is read right after the callback is
// registered, so it is presumably still nil when this function returns;
// callers that need the getter should capture it inside the callback.
// NOTE(review): statsFactory is never returned or added to a registry here,
// so the callback will only fire if the caller registers it elsewhere.
// NOTE(review): the polling goroutine has no stop signal and runs for the
// lifetime of the process.
func setupAdaptivePacing() (*pacing.InterceptorFactory, stats.Getter, error) {
    // Create pacing factory
    pacingFactory := pacing.NewInterceptor(
        pacing.InitialRate(1_000_000),
    )

    // Create stats interceptor; surface the error instead of discarding it.
    statsFactory, err := stats.NewInterceptor()
    if err != nil {
        return nil, nil, err
    }

    var statsGetter stats.Getter
    statsFactory.OnNewPeerConnection(func(id string, getter stats.Getter) {
        statsGetter = getter

        // Periodically adjust pacing based on conditions
        go func() {
            ticker := time.NewTicker(1 * time.Second)
            defer ticker.Stop()

            for range ticker.C {
                s := getter.Get(mySSRC)
                // Skip when there are no stats yet or nothing was received:
                // dividing by zero would yield NaN/Inf and a bogus decision.
                if s == nil || s.PacketsReceived == 0 {
                    continue
                }

                // Reduce rate if high loss
                lossRate := float64(s.PacketsLost) / float64(s.PacketsReceived)
                if lossRate > 0.05 {
                    newRate := 800_000 // 800 Kbps
                    pacingFactory.SetRate(id, newRate)
                    log.Printf("Reduced pacing rate to %d bps", newRate)
                }
            }
        }()
    })

    return pacingFactory, statsGetter, nil
}

Integration with GCC

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/cc"
    "github.com/pion/interceptor/pkg/gcc"
    "github.com/pion/interceptor/pkg/pacing"
)

// setupPacingWithGCC builds a registry containing a pacing interceptor and a
// congestion-control interceptor backed by GCC, and forwards every target
// bitrate change reported by the estimator to the pacing factory.
//
// It returns the populated registry, or the error from creating the
// congestion-control interceptor.
func setupPacingWithGCC() (*interceptor.Registry, error) {
    registry := &interceptor.Registry{}

    // Create pacing interceptor
    pacingFactory := pacing.NewInterceptor(
        pacing.InitialRate(1_000_000),
        pacing.Interval(10 * time.Millisecond),
    )

    // Create the GCC bandwidth estimator factory, starting at the same
    // bitrate as the pacer.
    gccFactory := func() (cc.BandwidthEstimator, error) {
        // Note: This is conceptual - actual integration is more complex
        return gcc.NewSendSideBWE(
            gcc.SendSideBWEInitialBitrate(1_000_000),
        )
    }

    // Surface the construction error instead of discarding it.
    ccFactory, err := cc.NewInterceptor(gccFactory)
    if err != nil {
        return nil, err
    }

    // Update pacing rate when GCC changes target bitrate
    ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
        estimator.OnTargetBitrateChange(func(bitrate int) {
            pacingFactory.SetRate(id, bitrate)
            log.Printf("Pacing rate updated to %d bps", bitrate)
        })
    })

    registry.Add(pacingFactory)
    registry.Add(ccFactory)

    return registry, nil
}

How It Works

Token Bucket Algorithm

  1. Token Generation: Tokens added at target bitrate
  2. Token Consumption: Each packet consumes one token per bit (8 tokens per byte)
  3. Queue When Empty: Packets queued if insufficient tokens
  4. Send When Available: Packets sent when tokens available
// Simplified concept:
// TokenBucket is a minimal token bucket in which one token stands for one bit.
type TokenBucket struct {
    rate       int       // refill rate in bits per second
    burst      int       // upper bound on accumulated tokens, in bits
    tokens     float64   // tokens currently available
    lastUpdate time.Time // moment of the previous refill
}

// Allow refills the bucket for the time elapsed since the previous call and
// reports whether a packet of packetSize bytes may be sent now. When it
// returns true the packet's cost (packetSize * 8 tokens) has been deducted;
// when it returns false only the refill has been applied.
func (tb *TokenBucket) Allow(packetSize int) bool {
    // Refill: credit rate * elapsed seconds, never exceeding burst.
    current := time.Now()
    sinceLast := current.Sub(tb.lastUpdate).Seconds()
    tb.lastUpdate = current

    tb.tokens += float64(tb.rate) * sinceLast
    if ceiling := float64(tb.burst); tb.tokens > ceiling {
        tb.tokens = ceiling
    }

    // Spend: a packet costs eight tokens per byte.
    cost := float64(packetSize * 8)
    affordable := tb.tokens >= cost
    if affordable {
        tb.tokens -= cost
    }
    return affordable
}

Burst Size Calculation

// Burst (bits) = bitrate / interval_frequency
// The rate is already in bits per second, so no byte-to-bit conversion is needed.
// Allows sending bitrate/interval_frequency bits per interval

rate := 1_000_000 // 1 Mbps
interval := 5 * time.Millisecond
frequency := float64(time.Second) / float64(interval) // 200 Hz
burst := int(float64(rate) / frequency) // 5000 bits

Performance Characteristics

Latency

  • Minimum: ~interval (e.g., 5ms)
  • Maximum: Depends on queue size and rate
  • Typical: 5-20ms added latency

Queue Behavior

// Queue size: 1,000,000 bytes (default)
// At 1 Mbps: Can buffer ~8 seconds of video
// Queue full: Returns errPacerOverflow

CPU Usage

  • Background goroutine runs every interval
  • Minimal CPU per packet
  • Token bucket calculations: O(1)

Configuration Guidelines

Interval Selection

Fast (1-5ms):
  • Lower latency
  • More frequent packet sends
  • Higher CPU overhead
  • Better for interactive applications
Normal (5-10ms):
  • Balanced approach
  • Good for most use cases
  • Recommended
Slow (10-20ms):
  • Higher latency
  • Lower CPU overhead
  • Suitable for non-interactive streams

Rate Selection

// Video streaming
initialRate := 2_000_000 // 2 Mbps for HD video

// Audio streaming
initialRate := 128_000 // 128 Kbps for high-quality audio

// Screen sharing
initialRate := 500_000 // 500 Kbps

Error Handling

Queue Overflow

var errPacerOverflow = errors.New("pacer queue overflow")

// Returned when queue is full
// Application should:
// 1. Reduce encoder bitrate
// 2. Drop frames
// 3. Wait and retry

Pacer Closed

var errPacerClosed = errors.New("pacer closed")

// Returned after Close() is called
// Application should stop sending

Advanced Usage

Multi-Stream Pacing

The pacer automatically handles multiple streams:
// Pacer manages all streams for a PeerConnection
// Token bucket shared across streams
// Fair queuing between streams

Custom Pacer

import "golang.org/x/time/rate"

// CustomPacer is an example pacer that queues packets and releases them
// through a golang.org/x/time/rate limiter.
type CustomPacer struct {
    // limiter is waited on by sender before each packet is written.
    limiter *rate.Limiter
    // queue holds packets accepted by Write until sender drains them.
    queue   chan Packet
}

// NewCustomPacer returns a CustomPacer whose limiter refills at rateLimit
// tokens per second with a burst of rateLimit/10 tokens, and whose queue
// buffers up to 1000 packets.
//
// NOTE(review): sender charges the limiter len(payload) tokens per packet, so
// rateLimit is effectively bytes per second here, unlike the bits-per-second
// rates used by the pacing package options — confirm the intended unit.
func NewCustomPacer(rateLimit int) *CustomPacer {
    burst := rateLimit / 10
    limiter := rate.NewLimiter(rate.Limit(rateLimit), burst)

    pacer := new(CustomPacer)
    pacer.limiter = limiter
    pacer.queue = make(chan Packet, 1000)
    return pacer
}

// Write hands the packet to the pacing queue without blocking. On success it
// returns the marshalled header size plus the payload length; if the queue is
// full it returns 0 and a "queue full" error.
//
// NOTE(review): Packet is not declared in this snippet. sender reads a writer
// field from it, which this three-value literal does not set — confirm the
// Packet definition.
// NOTE(review): header and payload are queued without copying; verify the
// caller does not reuse them before sender has written the packet.
func (p *CustomPacer) Write(
    header *rtp.Header,
    payload []byte,
    attributes interceptor.Attributes,
) (int, error) {
    queued := Packet{header, payload, attributes}

    // Non-blocking send: reject the packet rather than stall the caller.
    select {
    case p.queue <- queued:
    default:
        return 0, errors.New("queue full")
    }

    return header.MarshalSize() + len(payload), nil
}

// sender drains the queue until it is closed, waiting on the rate limiter for
// each packet's payload size before writing the packet.
//
// NOTE(review): nothing in this snippet starts sender or closes the queue;
// the owner is expected to run it in a goroutine and close the queue to stop.
func (p *CustomPacer) sender() {
    for pkt := range p.queue {
        // Wait for rate limiter
        if err := p.waitForBytes(len(pkt.payload)); err != nil {
            continue
        }

        // Send packet. There is no caller left to report a write error to,
        // so the result is deliberately discarded.
        _, _ = pkt.writer.Write(pkt.header, pkt.payload, pkt.attributes)
    }
}

// waitForBytes blocks until the limiter has granted n tokens, requesting at
// most one burst at a time. A single WaitN call fails immediately when n
// exceeds the burst, which would silently drop every packet larger than the
// burst; chunking charges the full size without that failure.
func (p *CustomPacer) waitForBytes(n int) error {
    ctx := context.Background()

    burst := p.limiter.Burst()
    if burst <= 0 {
        // Chunking cannot make progress without a positive burst; fall back
        // to a single request and let the limiter decide.
        return p.limiter.WaitN(ctx, n)
    }

    for n > burst {
        if err := p.limiter.WaitN(ctx, burst); err != nil {
            return err
        }
        n -= burst
    }
    return p.limiter.WaitN(ctx, n)
}

Rate Smoothing

// RateSmoother moves a rate toward a target in bounded steps.
type RateSmoother struct {
    target   int // value Step moves toward
    current  int // value returned by the most recent Step
    stepSize int // largest change applied by a single Step
}

// Update sets the target that subsequent Step calls move toward.
func (s *RateSmoother) Update(newTarget int) {
    s.target = newTarget
}

// Step moves current toward target by at most stepSize, never overshooting
// the target, and returns the new current value.
func (s *RateSmoother) Step() int {
    switch {
    case s.current < s.target:
        // Ramp up, stopping at the target.
        next := s.current + s.stepSize
        if next > s.target {
            next = s.target
        }
        s.current = next
    case s.current > s.target:
        // Ramp down, stopping at the target.
        next := s.current - s.stepSize
        if next < s.target {
            next = s.target
        }
        s.current = next
    }
    return s.current
}

// Use smoother with pacing.
// target starts equal to current so the rate holds steady until Update is
// called; leaving target at its zero value would ramp the rate down to 0.
smoother := &RateSmoother{
    target:   1_000_000,
    current:  1_000_000,
    stepSize: 100_000, // 100 Kbps steps
}

// Apply one smoothing step every 500ms.
// NOTE(review): Step runs on this goroutine while Update is presumably called
// from another; RateSmoother has no locking, so confirm access is serialized.
go func() {
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    for range ticker.C {
        // Named next rather than rate to avoid shadowing an imported rate package.
        next := smoother.Step()
        pacingFactory.SetRate(peerID, next)
    }
}()

Debugging

import "github.com/pion/logging"

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = logging.LogLevelDebug

pacingFactory := pacing.NewInterceptor(
    pacing.WithLoggerFactory(loggerFactory),
)

// Logs will show:
// - Rate updates
// - Queue status
// - Packet send timing

Comparison with Other Approaches

No Pacing

Pros:
  • Lowest latency
  • Simple
Cons:
  • Bursty traffic
  • Network congestion
  • Higher packet loss

With Pacing

Pros:
  • Smooth traffic
  • Better network utilization
  • Lower congestion
  • More stable quality
Cons:
  • Added latency
  • Complexity
  • Queue management needed

Use Cases

Live Streaming

// Smooth transmission for consistent viewer experience
pacing := pacing.NewInterceptor(
    pacing.InitialRate(2_500_000), // 2.5 Mbps
    pacing.Interval(10 * time.Millisecond),
)

Video Conferencing

// Low latency with moderate smoothing
pacing := pacing.NewInterceptor(
    pacing.InitialRate(1_000_000), // 1 Mbps
    pacing.Interval(5 * time.Millisecond),
)

File Transfer

// High throughput, latency less critical
pacing := pacing.NewInterceptor(
    pacing.InitialRate(10_000_000), // 10 Mbps
    pacing.Interval(20 * time.Millisecond),
)

See Also

Reference

For more details, see the pkg.go.dev documentation for github.com/pion/interceptor/pkg/pacing.

Build docs developers (and LLMs) love