Skip to main content
The cc package provides a framework for integrating bandwidth estimation algorithms into the interceptor pipeline. It serves as an adapter between congestion control algorithms and the RTP/RTCP streams.

Overview

The CC package:
  • Framework: Generic interface for bandwidth estimators
  • Integration: Connects estimators with RTP/RTCP streams
  • Callbacks: Notifies on new peer connections and bitrate changes
  • Default: Uses GCC (Google Congestion Control) by default

BandwidthEstimator Interface

type BandwidthEstimator interface {
    AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter
    WriteRTCP([]rtcp.Packet, interceptor.Attributes) error
    GetTargetBitrate() int
    OnTargetBitrateChange(f func(bitrate int))
    GetStats() map[string]any
    Close() error
}

Methods

AddStream

AddStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter
Adds a new RTP stream to the bandwidth estimator.
info
*interceptor.StreamInfo
Stream metadata
writer
interceptor.RTPWriter
RTP writer for the stream
Returns: interceptor.RTPWriter - Wrapped writer that tracks packets

WriteRTCP

WriteRTCP(pkts []rtcp.Packet, attr interceptor.Attributes) error
Processes RTCP feedback packets.
pkts
[]rtcp.Packet
RTCP packets (typically TWCC or RFC 8888 feedback)
attr
interceptor.Attributes
Packet attributes

GetTargetBitrate

GetTargetBitrate() int
Returns: int - Current target bitrate in bits per second

OnTargetBitrateChange

OnTargetBitrateChange(f func(bitrate int))
Sets a callback invoked when target bitrate changes.
f
func(bitrate int)
Callback function receiving new bitrate in bps

GetStats

GetStats() map[string]any
Returns: map[string]any - Estimator-specific statistics

Close

Close() error
Closes the bandwidth estimator.

InterceptorFactory

Type Definition

type InterceptorFactory struct {
    // contains filtered or unexported fields
}

Constructor

func NewInterceptor(
    factory BandwidthEstimatorFactory,
    opts ...Option,
) (*InterceptorFactory, error)
Creates a new CC interceptor factory.
factory
BandwidthEstimatorFactory
Factory function to create bandwidth estimators. If nil, uses GCC by default.
opts
...Option
Additional options (currently none defined)

BandwidthEstimatorFactory

type BandwidthEstimatorFactory func() (BandwidthEstimator, error)

Callbacks

OnNewPeerConnection

func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback)
Sets a callback invoked when a new bandwidth estimator is created.
cb
NewPeerConnectionCallback
Callback: func(id string, estimator BandwidthEstimator)

Usage Example

Using Default GCC

import (
    "fmt"
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/cc"
)

// Create CC interceptor with default GCC
ccFactory, err := cc.NewInterceptor(nil) // nil = use GCC
if err != nil {
    panic(err)
}

// Get notified of new connections
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
    fmt.Printf("New peer: %s\n", id)
    
    // Monitor bitrate changes
    estimator.OnTargetBitrateChange(func(bitrate int) {
        fmt.Printf("Peer %s bitrate: %d bps\n", id, bitrate)
    })
})

// Add to registry
registry := &interceptor.Registry{}
registry.Add(ccFactory)

Using GCC with Custom Config

import (
    "github.com/pion/interceptor/pkg/cc"
    "github.com/pion/interceptor/pkg/gcc"
)

// Create GCC factory with custom settings
gccFactory := func() (cc.BandwidthEstimator, error) {
    return gcc.NewSendSideBWE(
        gcc.SendSideBWEInitialBitrate(2_000_000),  // 2 Mbps
        gcc.SendSideBWEMinBitrate(500_000),        // 500 Kbps
        gcc.SendSideBWEMaxBitrate(10_000_000),     // 10 Mbps
    )
}

// Create CC interceptor
ccFactory, _ := cc.NewInterceptor(gccFactory)

Complete WebRTC Setup

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/cc"
    "github.com/pion/interceptor/pkg/gcc"
    "github.com/pion/interceptor/pkg/twcc"
    "github.com/pion/webrtc/v4"
)

func setupCongestionControl() (*webrtc.PeerConnection, error) {
    m := &webrtc.MediaEngine{}
    if err := m.RegisterDefaultCodecs(); err != nil {
        return nil, err
    }
    
    // Register TWCC extension
    const twccURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
    if err := m.RegisterHeaderExtension(
        webrtc.RTPHeaderExtensionCapability{URI: twccURI},
        webrtc.RTPCodecTypeVideo,
    ); err != nil {
        return nil, err
    }
    
    ir := &interceptor.Registry{}
    
    // 1. Add TWCC header extension (sender)
    twccHdr, _ := twcc.NewHeaderExtensionInterceptor()
    ir.Add(twccHdr)
    
    // 2. Add TWCC feedback (receiver)
    twccSender, _ := twcc.NewSenderInterceptor()
    ir.Add(twccSender)
    
    // 3. Add CC with GCC (sender)
    gccFactory := func() (cc.BandwidthEstimator, error) {
        return gcc.NewSendSideBWE(
            gcc.SendSideBWEInitialBitrate(1_000_000),
        )
    }
    
    ccFactory, _ := cc.NewInterceptor(gccFactory)
    
    // Monitor all peer connections
    ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
        log.Printf("CC enabled for peer %s", id)
        
        estimator.OnTargetBitrateChange(func(bitrate int) {
            log.Printf("Target bitrate: %d bps (%.2f Mbps)",
                bitrate, float64(bitrate)/1e6)
        })
    })
    
    ir.Add(ccFactory)
    
    api := webrtc.NewAPI(
        webrtc.WithMediaEngine(m),
        webrtc.WithInterceptorRegistry(ir),
    )
    
    return api.NewPeerConnection(webrtc.Configuration{})
}

Custom Bandwidth Estimator

import (
    "sync"
    "time"
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/cc"
)

type SimpleBWE struct {
    mu             sync.Mutex
    targetBitrate  int
    onChange       func(int)
}

func NewSimpleBWE(initialBitrate int) (*SimpleBWE, error) {
    return &SimpleBWE{
        targetBitrate: initialBitrate,
    }, nil
}

func (b *SimpleBWE) AddStream(
    info *interceptor.StreamInfo,
    writer interceptor.RTPWriter,
) interceptor.RTPWriter {
    // Just pass through, no packet tracking
    return writer
}

func (b *SimpleBWE) WriteRTCP(
    pkts []rtcp.Packet,
    attr interceptor.Attributes,
) error {
    b.mu.Lock()
    defer b.mu.Unlock()
    
    // Simple logic: reduce on any feedback
    // (In reality, you'd analyze the feedback)
    newBitrate := int(float64(b.targetBitrate) * 0.95)
    if newBitrate != b.targetBitrate {
        b.targetBitrate = newBitrate
        if b.onChange != nil {
            b.onChange(newBitrate)
        }
    }
    
    return nil
}

func (b *SimpleBWE) GetTargetBitrate() int {
    b.mu.Lock()
    defer b.mu.Unlock()
    return b.targetBitrate
}

func (b *SimpleBWE) OnTargetBitrateChange(f func(bitrate int)) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.onChange = f
}

func (b *SimpleBWE) GetStats() map[string]any {
    return map[string]any{
        "targetBitrate": b.GetTargetBitrate(),
    }
}

func (b *SimpleBWE) Close() error {
    return nil
}

// Use custom estimator
simpleFactory := func() (cc.BandwidthEstimator, error) {
    return NewSimpleBWE(1_000_000)
}

ccFactory, _ := cc.NewInterceptor(simpleFactory)

Multi-Peer Management

type PeerManager struct {
    peers map[string]cc.BandwidthEstimator
    mu    sync.Mutex
}

func (pm *PeerManager) OnNewPeer(id string, estimator cc.BandwidthEstimator) {
    pm.mu.Lock()
    pm.peers[id] = estimator
    pm.mu.Unlock()
    
    estimator.OnTargetBitrateChange(func(bitrate int) {
        log.Printf("Peer %s: %d bps", id, bitrate)
        pm.updateGlobalPolicy()
    })
}

func (pm *PeerManager) updateGlobalPolicy() {
    pm.mu.Lock()
    defer pm.mu.Unlock()
    
    // Example: Limit total bandwidth across all peers
    totalBandwidth := 5_000_000 // 5 Mbps total
    perPeerBandwidth := totalBandwidth / len(pm.peers)
    
    for id, estimator := range pm.peers {
        current := estimator.GetTargetBitrate()
        if current > perPeerBandwidth {
            log.Printf("Limiting peer %s from %d to %d bps",
                id, current, perPeerBandwidth)
            // Note: Can't directly set bitrate, would need custom estimator
        }
    }
}

// Setup
manager := &PeerManager{
    peers: make(map[string]cc.BandwidthEstimator),
}

ccFactory, _ := cc.NewInterceptor(nil)
ccFactory.OnNewPeerConnection(manager.OnNewPeer)

Statistics Monitoring

func monitorCC(estimator cc.BandwidthEstimator) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        bitrate := estimator.GetTargetBitrate()
        stats := estimator.GetStats()
        
        log.Printf("CC Stats:")
        log.Printf("  Target bitrate: %d bps (%.2f Mbps)",
            bitrate, float64(bitrate)/1e6)
        
        for key, value := range stats {
            log.Printf("  %s: %v", key, value)
        }
    }
}

// Use with callback
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
    go monitorCC(estimator)
})

Integration Points

With Encoder

type VideoEncoder interface {
    SetBitrate(bps int) error
}

func connectEncoderToCC(encoder VideoEncoder, estimator cc.BandwidthEstimator) {
    estimator.OnTargetBitrateChange(func(bitrate int) {
        if err := encoder.SetBitrate(bitrate); err != nil {
            log.Printf("Failed to set encoder bitrate: %v", err)
        }
    })
}

With Pacing

See Pacing Package for integration examples.

With Stats

import "github.com/pion/interceptor/pkg/stats"

func integrateStatsWithCC(
    statsGetter stats.Getter,
    estimator cc.BandwidthEstimator,
    ssrc uint32,
) {
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            s := statsGetter.Get(ssrc)
            if s == nil {
                continue
            }
            
            bitrate := estimator.GetTargetBitrate()
            
            // Compare target vs actual
            actualBitrate := float64(s.BytesSent * 8) // Last second
            utilization := actualBitrate / float64(bitrate)
            
            log.Printf("Bandwidth utilization: %.1f%%", utilization*100)
        }
    }()
}

See Also

Reference

For more details, see the pkg.go.dev documentation.

Build docs developers (and LLMs) love