The cc package provides a framework for integrating bandwidth estimation algorithms into the interceptor pipeline. It serves as an adapter between congestion control algorithms and the RTP/RTCP streams.
Overview
The CC package:
- Framework: Generic interface for bandwidth estimators
- Integration: Connects estimators with RTP/RTCP streams
- Callbacks: Notifies on new peer connections and bitrate changes
- Default: Uses GCC (Google Congestion Control) by default
BandwidthEstimator Interface
type BandwidthEstimator interface {
AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter
WriteRTCP([]rtcp.Packet, interceptor.Attributes) error
GetTargetBitrate() int
OnTargetBitrateChange(f func(bitrate int))
GetStats() map[string]any
Close() error
}
Methods
AddStream
AddStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter
Adds a new RTP stream to the bandwidth estimator.
RTP writer for the stream
Returns: interceptor.RTPWriter - Wrapped writer that tracks packets
WriteRTCP
WriteRTCP(pkts []rtcp.Packet, attr interceptor.Attributes) error
Processes RTCP feedback packets.
RTCP packets (typically TWCC or RFC 8888 feedback)
GetTargetBitrate
Returns: int - Current target bitrate in bits per second
OnTargetBitrateChange
OnTargetBitrateChange(f func(bitrate int))
Sets a callback invoked when target bitrate changes.
Callback function receiving new bitrate in bps
GetStats
GetStats() map[string]any
Returns: map[string]any - Estimator-specific statistics
Close
Closes the bandwidth estimator.
InterceptorFactory
Type Definition
type InterceptorFactory struct {
// contains filtered or unexported fields
}
Constructor
func NewInterceptor(
factory BandwidthEstimatorFactory,
opts ...Option,
) (*InterceptorFactory, error)
Creates a new CC interceptor factory.
factory
BandwidthEstimatorFactory
Factory function to create bandwidth estimators. If nil, uses GCC by default.
Additional options (currently none defined)
BandwidthEstimatorFactory
type BandwidthEstimatorFactory func() (BandwidthEstimator, error)
Callbacks
OnNewPeerConnection
func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback)
Sets a callback invoked when a new bandwidth estimator is created.
cb
NewPeerConnectionCallback
Callback: func(id string, estimator BandwidthEstimator)
Usage Example
Using Default GCC
import (
"fmt"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
)
// Create CC interceptor with default GCC
ccFactory, err := cc.NewInterceptor(nil) // nil = use GCC
if err != nil {
panic(err)
}
// Get notified of new connections
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
fmt.Printf("New peer: %s\n", id)
// Monitor bitrate changes
estimator.OnTargetBitrateChange(func(bitrate int) {
fmt.Printf("Peer %s bitrate: %d bps\n", id, bitrate)
})
})
// Add to registry
registry := &interceptor.Registry{}
registry.Add(ccFactory)
Using GCC with Custom Config
import (
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
)
// Create GCC factory with custom settings
gccFactory := func() (cc.BandwidthEstimator, error) {
return gcc.NewSendSideBWE(
gcc.SendSideBWEInitialBitrate(2_000_000), // 2 Mbps
gcc.SendSideBWEMinBitrate(500_000), // 500 Kbps
gcc.SendSideBWEMaxBitrate(10_000_000), // 10 Mbps
)
}
// Create CC interceptor
ccFactory, _ := cc.NewInterceptor(gccFactory)
Complete WebRTC Setup
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/interceptor/pkg/twcc"
"github.com/pion/webrtc/v4"
)
func setupCongestionControl() (*webrtc.PeerConnection, error) {
m := &webrtc.MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
// Register TWCC extension
const twccURI = "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01"
if err := m.RegisterHeaderExtension(
webrtc.RTPHeaderExtensionCapability{URI: twccURI},
webrtc.RTPCodecTypeVideo,
); err != nil {
return nil, err
}
ir := &interceptor.Registry{}
// 1. Add TWCC header extension (sender)
twccHdr, _ := twcc.NewHeaderExtensionInterceptor()
ir.Add(twccHdr)
// 2. Add TWCC feedback (receiver)
twccSender, _ := twcc.NewSenderInterceptor()
ir.Add(twccSender)
// 3. Add CC with GCC (sender)
gccFactory := func() (cc.BandwidthEstimator, error) {
return gcc.NewSendSideBWE(
gcc.SendSideBWEInitialBitrate(1_000_000),
)
}
ccFactory, _ := cc.NewInterceptor(gccFactory)
// Monitor all peer connections
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
log.Printf("CC enabled for peer %s", id)
estimator.OnTargetBitrateChange(func(bitrate int) {
log.Printf("Target bitrate: %d bps (%.2f Mbps)",
bitrate, float64(bitrate)/1e6)
})
})
ir.Add(ccFactory)
api := webrtc.NewAPI(
webrtc.WithMediaEngine(m),
webrtc.WithInterceptorRegistry(ir),
)
return api.NewPeerConnection(webrtc.Configuration{})
}
Custom Bandwidth Estimator
import (
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
)
type SimpleBWE struct {
mu sync.Mutex
targetBitrate int
onChange func(int)
}
func NewSimpleBWE(initialBitrate int) (*SimpleBWE, error) {
return &SimpleBWE{
targetBitrate: initialBitrate,
}, nil
}
func (b *SimpleBWE) AddStream(
info *interceptor.StreamInfo,
writer interceptor.RTPWriter,
) interceptor.RTPWriter {
// Just pass through, no packet tracking
return writer
}
func (b *SimpleBWE) WriteRTCP(
pkts []rtcp.Packet,
attr interceptor.Attributes,
) error {
b.mu.Lock()
defer b.mu.Unlock()
// Simple logic: reduce on any feedback
// (In reality, you'd analyze the feedback)
newBitrate := int(float64(b.targetBitrate) * 0.95)
if newBitrate != b.targetBitrate {
b.targetBitrate = newBitrate
if b.onChange != nil {
b.onChange(newBitrate)
}
}
return nil
}
func (b *SimpleBWE) GetTargetBitrate() int {
b.mu.Lock()
defer b.mu.Unlock()
return b.targetBitrate
}
func (b *SimpleBWE) OnTargetBitrateChange(f func(bitrate int)) {
b.mu.Lock()
defer b.mu.Unlock()
b.onChange = f
}
func (b *SimpleBWE) GetStats() map[string]any {
return map[string]any{
"targetBitrate": b.GetTargetBitrate(),
}
}
func (b *SimpleBWE) Close() error {
return nil
}
// Use custom estimator
simpleFactory := func() (cc.BandwidthEstimator, error) {
return NewSimpleBWE(1_000_000)
}
ccFactory, _ := cc.NewInterceptor(simpleFactory)
Multi-Peer Management
type PeerManager struct {
peers map[string]cc.BandwidthEstimator
mu sync.Mutex
}
func (pm *PeerManager) OnNewPeer(id string, estimator cc.BandwidthEstimator) {
pm.mu.Lock()
pm.peers[id] = estimator
pm.mu.Unlock()
estimator.OnTargetBitrateChange(func(bitrate int) {
log.Printf("Peer %s: %d bps", id, bitrate)
pm.updateGlobalPolicy()
})
}
func (pm *PeerManager) updateGlobalPolicy() {
pm.mu.Lock()
defer pm.mu.Unlock()
// Example: Limit total bandwidth across all peers
totalBandwidth := 5_000_000 // 5 Mbps total
perPeerBandwidth := totalBandwidth / len(pm.peers)
for id, estimator := range pm.peers {
current := estimator.GetTargetBitrate()
if current > perPeerBandwidth {
log.Printf("Limiting peer %s from %d to %d bps",
id, current, perPeerBandwidth)
// Note: Can't directly set bitrate, would need custom estimator
}
}
}
// Setup
manager := &PeerManager{
peers: make(map[string]cc.BandwidthEstimator),
}
ccFactory, _ := cc.NewInterceptor(nil)
ccFactory.OnNewPeerConnection(manager.OnNewPeer)
Statistics Monitoring
func monitorCC(estimator cc.BandwidthEstimator) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
bitrate := estimator.GetTargetBitrate()
stats := estimator.GetStats()
log.Printf("CC Stats:")
log.Printf(" Target bitrate: %d bps (%.2f Mbps)",
bitrate, float64(bitrate)/1e6)
for key, value := range stats {
log.Printf(" %s: %v", key, value)
}
}
}
// Use with callback
ccFactory.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
go monitorCC(estimator)
})
Integration Points
With Encoder
type VideoEncoder interface {
SetBitrate(bps int) error
}
func connectEncoderToCC(encoder VideoEncoder, estimator cc.BandwidthEstimator) {
estimator.OnTargetBitrateChange(func(bitrate int) {
if err := encoder.SetBitrate(bitrate); err != nil {
log.Printf("Failed to set encoder bitrate: %v", err)
}
})
}
With Pacing
See Pacing Package for integration examples.
With Stats
import "github.com/pion/interceptor/pkg/stats"
func integrateStatsWithCC(
statsGetter stats.Getter,
estimator cc.BandwidthEstimator,
ssrc uint32,
) {
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
s := statsGetter.Get(ssrc)
if s == nil {
continue
}
bitrate := estimator.GetTargetBitrate()
// Compare target vs actual
actualBitrate := float64(s.BytesSent * 8) // Last second
utilization := actualBitrate / float64(bitrate)
log.Printf("Bandwidth utilization: %.1f%%", utilization*100)
}
}()
}
See Also
Reference
For more details, see the pkg.go.dev documentation.