The pacing package implements packet pacing using a token bucket algorithm to smooth out packet transmission and prevent network congestion from bursty traffic.
Overview
Pacing smooths packet transmission by:
- Rate Limiting: Controls sending rate to target bitrate
- Burst Control: Prevents large packet bursts
- Queue Management: Buffers packets when rate limit exceeded
- Adaptive: Adjusts rate based on congestion control
InterceptorFactory
Type Definition
type InterceptorFactory struct {
// contains filtered or unexported fields
}
Constructor
func NewInterceptor(opts ...Option) *InterceptorFactory
Creates a new pacing interceptor factory.
Options
InitialRate
func InitialRate(rate int) Option
Sets the initial pacing rate.
Initial bitrate in bits per second. Default: 1,000,000 (1 Mbps)
Interval
func Interval(interval time.Duration) Option
Sets the pacing interval (how often packets are sent).
Time between pacing cycles. Default: 5ms
WithLoggerFactory
func WithLoggerFactory(loggerFactory logging.LoggerFactory) Option
Sets a custom logger factory.
Methods
SetRate
func (f *InterceptorFactory) SetRate(id string, r int)
Updates the pacing rate for a specific interceptor instance.
New bitrate in bits per second
Usage Example
Basic Setup
import (
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/pacing"
)
// Create pacing interceptor
pacingFactory := pacing.NewInterceptor(
pacing.InitialRate(2_000_000), // 2 Mbps
pacing.Interval(5 * time.Millisecond),
)
// Add to registry
registry := &interceptor.Registry{}
registry.Add(pacingFactory)
// Build interceptor
i, err := registry.Build("peer-connection-1")
if err != nil {
panic(err)
}
defer i.Close()
Dynamic Rate Adjustment
import (
"github.com/pion/interceptor/pkg/pacing"
"github.com/pion/interceptor/pkg/stats"
)
// setupAdaptivePacing wires a pacing factory to a stats interceptor so the
// pacing rate can be lowered when observed packet loss exceeds 5%.
// NOTE(review): statsGetter is assigned asynchronously inside the callback,
// so the value returned here may still be nil — confirm callers handle that.
// NOTE(review): this snippet references mySSRC, time, and log without showing
// their declarations/imports; it will not compile as shown.
func setupAdaptivePacing() (*pacing.InterceptorFactory, stats.Getter, error) {
// Create pacing factory
pacingFactory := pacing.NewInterceptor(
pacing.InitialRate(1_000_000),
)
// Create stats interceptor
// NOTE(review): the error from stats.NewInterceptor is silently discarded.
statsFactory, _ := stats.NewInterceptor()
var statsGetter stats.Getter
statsFactory.OnNewPeerConnection(func(id string, getter stats.Getter) {
statsGetter = getter
// Periodically adjust pacing based on conditions
// NOTE(review): this goroutine has no stop condition — the ticker loop runs
// for the life of the process; consider tying it to a context or Close().
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
if s := getter.Get(mySSRC); s != nil {
// Reduce rate if high loss
// NOTE(review): division is unguarded — PacketsReceived == 0 yields NaN/Inf,
// which fails the > 0.05 check silently; confirm that is acceptable.
lossRate := float64(s.PacketsLost) / float64(s.PacketsReceived)
if lossRate > 0.05 {
newRate := 800_000 // 800 Kbps
pacingFactory.SetRate(id, newRate)
log.Printf("Reduced pacing rate to %d bps", newRate)
}
}
}
}()
})
// NOTE(review): statsGetter is likely still nil at this point (see above).
return pacingFactory, statsGetter, nil
}
Integration with GCC
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/interceptor/pkg/pacing"
)
// setupPacingWithGCC builds a registry containing both a pacing interceptor
// and a GCC-based congestion controller, and forwards each GCC target-bitrate
// change into the pacer via SetRate.
func setupPacingWithGCC() (*interceptor.Registry, error) {
registry := &interceptor.Registry{}
// Create pacing interceptor
pacingFactory := pacing.NewInterceptor(
pacing.InitialRate(1_000_000),
pacing.Interval(10 * time.Millisecond),
)
// Create GCC with custom pacer
gccFactory := func() (cc.BandwidthEstimator, error) {
// Note: This is conceptual - actual integration is more complex
return gcc.NewSendSideBWE(
gcc.SendSideBWEInitialBitrate(1_000_000),
)
}
// NOTE(review): the error from cc.NewInterceptor is silently discarded.
ccFactory, _ := cc.NewInterceptor(gccFactory)
// Update pacing rate when GCC changes target bitrate
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
estimator.OnTargetBitrateChange(func(bitrate int) {
pacingFactory.SetRate(id, bitrate)
log.Printf("Pacing rate updated to %d bps", bitrate)
})
})
// Registration order matters for interceptor chaining; pacing is added first.
registry.Add(pacingFactory)
registry.Add(ccFactory)
return registry, nil
}
How It Works
Token Bucket Algorithm
- Token Generation: Tokens added at target bitrate
- Token Consumption: Each packet consumes tokens (8 bits per byte)
- Queue When Empty: Packets queued if insufficient tokens
- Send When Available: Packets sent when tokens available
// Simplified concept:
type TokenBucket struct {
rate int // bits per second
burst int // maximum burst size in bits
tokens float64 // current token count
lastUpdate time.Time
}
func (tb *TokenBucket) Allow(packetSize int) bool {
now := time.Now()
elapsed := now.Sub(tb.lastUpdate)
// Add tokens based on elapsed time
tokensToAdd := float64(tb.rate) * elapsed.Seconds()
tb.tokens = min(tb.tokens + tokensToAdd, float64(tb.burst))
tb.lastUpdate = now
// Check if enough tokens
requiredTokens := float64(packetSize * 8) // bits
if tb.tokens >= requiredTokens {
tb.tokens -= requiredTokens
return true
}
return false
}
Burst Size Calculation
// Burst = bitrate / interval_frequency (rate is already in bits/s, so the result is bits per interval — no ×8 needed)
// Allows sending rate/interval_frequency bits per interval
rate := 1_000_000 // 1 Mbps
interval := 5 * time.Millisecond
frequency := float64(time.Second) / float64(interval) // 200 Hz
burst := int(float64(rate) / frequency) // 5000 bits
Latency
- Minimum: ~interval (e.g., 5ms)
- Maximum: Depends on queue size and rate
- Typical: 5-20ms added latency
Queue Behavior
// Queue size: 1,000,000 bytes (default)
// At 1 Mbps: Can buffer ~8 seconds of video
// Queue full: Returns errPacerOverflow
CPU Usage
- Background goroutine runs every interval
- Minimal CPU per packet
- Token bucket calculations: O(1)
Configuration Guidelines
Interval Selection
Fast (1-5ms):
- Lower latency
- More frequent packet sends
- Higher CPU overhead
- Better for interactive applications
Normal (5-10ms):
- Balanced approach
- Good for most use cases
- Recommended
Slow (10-20ms):
- Higher latency
- Lower CPU overhead
- Suitable for non-interactive streams
Rate Selection
// Video streaming
initialRate := 2_000_000 // 2 Mbps for HD video
// Audio streaming
initialRate := 128_000 // 128 Kbps for high-quality audio
// Screen sharing
initialRate := 500_000 // 500 Kbps
Error Handling
Queue Overflow
var errPacerOverflow = errors.New("pacer queue overflow")
// Returned when queue is full
// Application should:
// 1. Reduce encoder bitrate
// 2. Drop frames
// 3. Wait and retry
Pacer Closed
var errPacerClosed = errors.New("pacer closed")
// Returned after Close() is called
// Application should stop sending
Advanced Usage
Multi-Stream Pacing
The pacer automatically handles multiple streams:
// Pacer manages all streams for a PeerConnection
// Token bucket shared across streams
// Fair queuing between streams
Custom Pacer
import "golang.org/x/time/rate"
// CustomPacer is an example of building a pacer on golang.org/x/time/rate
// instead of the package's built-in token bucket.
type CustomPacer struct {
limiter *rate.Limiter // token-bucket limiter enforcing the send rate
queue chan Packet // buffered queue of packets awaiting transmission
}
// NewCustomPacer builds a CustomPacer with a burst of rateLimit/10.
// NOTE(review): rateLimit's unit is ambiguous — sender() charges the limiter
// len(payload) (bytes), so this is effectively bytes/second, while the
// surrounding docs use bits/second throughout. Confirm the intended unit.
func NewCustomPacer(rateLimit int) *CustomPacer {
return &CustomPacer{
limiter: rate.NewLimiter(rate.Limit(rateLimit), rateLimit/10), // burst = 1/10 second of budget
queue: make(chan Packet, 1000),
}
}
// Write enqueues an RTP packet for paced sending. It returns the number of
// bytes accepted (marshaled header size plus payload length), or an error
// when the queue is full.
func (p *CustomPacer) Write(
header *rtp.Header,
payload []byte,
attributes interceptor.Attributes,
) (int, error) {
pkt := Packet{header, payload, attributes}
// Non-blocking enqueue: report an error rather than stall the caller.
select {
case p.queue <- pkt:
return header.MarshalSize() + len(payload), nil
default:
return 0, errors.New("queue full")
}
}
// sender drains the queue, blocking on the rate limiter before each send.
// It exits when the queue channel is closed.
// NOTE(review): pkt.writer is used here, but the Packet literal in Write only
// sets header/payload/attributes — confirm Packet actually has a writer field.
// NOTE(review): WaitN is charged len(pkt.payload) (bytes, header excluded);
// see the unit question on NewCustomPacer.
func (p *CustomPacer) sender() {
for pkt := range p.queue {
// Wait for rate limiter
// NOTE(review): errors from WaitN (e.g. burst exceeded) silently drop the packet.
if err := p.limiter.WaitN(context.Background(), len(pkt.payload)); err != nil {
continue
}
// Send packet
pkt.writer.Write(pkt.header, pkt.payload, pkt.attributes)
}
}
Rate Smoothing
// RateSmoother ramps a pacing rate toward a target in fixed-size increments,
// avoiding abrupt jumps when the target changes.
type RateSmoother struct {
	target   int // rate being converged toward, bits per second
	current  int // rate currently applied
	stepSize int // maximum change applied per Step call
}

// Update sets a new target rate; the current rate is unchanged until Step.
func (s *RateSmoother) Update(newTarget int) {
	s.target = newTarget
}

// Step advances the current rate one increment toward the target and returns
// the new current rate. It never overshoots the target in either direction.
func (s *RateSmoother) Step() int {
	switch {
	case s.current < s.target:
		next := s.current + s.stepSize
		if next > s.target {
			next = s.target // clamp: don't overshoot on the way up
		}
		s.current = next
	case s.current > s.target:
		next := s.current - s.stepSize
		if next < s.target {
			next = s.target // clamp: don't undershoot on the way down
		}
		s.current = next
	}
	return s.current
}
// Use smoother with pacing
smoother := &RateSmoother{
current: 1_000_000,
stepSize: 100_000, // 100 Kbps steps
}
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
for range ticker.C {
rate := smoother.Step()
pacingFactory.SetRate(peerID, rate)
}
}()
Debugging
import "github.com/pion/logging"
loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = logging.LogLevelDebug
pacingFactory := pacing.NewInterceptor(
pacing.WithLoggerFactory(loggerFactory),
)
// Logs will show:
// - Rate updates
// - Queue status
// - Packet send timing
Comparison with Other Approaches
No Pacing
Pros:
- No added latency
- No queue to manage; simplest implementation
Cons:
- Bursty traffic
- Network congestion
- Higher packet loss
With Pacing
Pros:
- Smooth traffic
- Better network utilization
- Lower congestion
- More stable quality
Cons:
- Added latency
- Complexity
- Queue management needed
Use Cases
Live Streaming
// Smooth transmission for consistent viewer experience
pacing := pacing.NewInterceptor(
pacing.InitialRate(2_500_000), // 2.5 Mbps
pacing.Interval(10 * time.Millisecond),
)
Video Conferencing
// Low latency with moderate smoothing
pacing := pacing.NewInterceptor(
pacing.InitialRate(1_000_000), // 1 Mbps
pacing.Interval(5 * time.Millisecond),
)
File Transfer
// High throughput, latency less critical
pacing := pacing.NewInterceptor(
pacing.InitialRate(10_000_000), // 10 Mbps
pacing.Interval(20 * time.Millisecond),
)
See Also
Reference
For more details, see the pkg.go.dev documentation.