The rtpfb package implements feedback aggregation for CCFB (Congestion Control Feedback) and TWCC (Transport-Wide Congestion Control) packets, providing detailed packet reports for congestion control algorithms.
Overview
The rtpfb package:
- Tracks Outgoing Packets: Records packet departure times and sequence numbers
- Processes Feedback: Handles TWCC and RFC 8888 CCFB feedback
- Generates Reports: Creates detailed packet reports with RTT and loss information
- Attributes Integration: Adds reports to interceptor attributes for easy access
Interceptor
Tracks packets and generates feedback reports.
Factory
type InterceptorFactory struct {
// contains filtered or unexported fields
}
Constructor
func NewInterceptor(opts ...Option) (*InterceptorFactory, error)
Creates a new CCFB/TWCC feedback interceptor factory.
Options
WithLoggerFactory
func WithLoggerFactory(lf logging.LoggerFactory) Option
Sets a custom logger factory.
Report Type
The report structure added to interceptor attributes:
type Report struct {
Arrival time.Time
RTT time.Duration
PacketReports []PacketReport
}
- Arrival: When the feedback report arrived
- RTT: Calculated round-trip time
- PacketReports: Individual packet reports
PacketReport Type
type PacketReport struct {
SSRC uint32
SequenceNumber uint16
TransportSequence uint16
Departure time.Time
Arrival time.Time
Size int
}
- SSRC: Synchronization source identifier
- TransportSequence: Transport-wide sequence number (TWCC)
- Arrival: When the packet was received (from feedback)
Attributes Key
const CCFBAttributesKey ccfbAttributesKeyType = iota
Use this key to retrieve reports from interceptor attributes:
if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
// Use report
}
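The key has its own unexported type so that it cannot collide with keys set by other interceptors. The sketch below illustrates that pattern with local stand-in types; `attributes`, `reportKey`, `report`, and `reportFrom` are hypothetical names for illustration, not part of the rtpfb API.

```go
package main

// attributes is a local stand-in for an interceptor attribute map.
type attributes map[interface{}]interface{}

// get returns the value stored under key, or nil if there is none.
// Reading from a nil map is safe in Go.
func (a attributes) get(key interface{}) interface{} { return a[key] }

// keyType is a dedicated key type: a value of this type never equals
// a plain int or string key, even if the underlying value is the same.
type keyType int

const reportKey keyType = iota

// report is a hypothetical payload stored under reportKey.
type report struct{ packets int }

// reportFrom extracts the report, reporting false when the key is absent
// or holds a value of a different type.
func reportFrom(a attributes) (report, bool) {
	r, ok := a.get(reportKey).(report)
	return r, ok
}
```

With this layout, a value stored under the plain integer key `0` is not visible through `reportKey`, and a nil attribute map yields `ok == false` rather than a panic.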
Usage Example
Basic Setup
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/rtpfb"
)
// Create RTPFB interceptor
rtpfbInt, err := rtpfb.NewInterceptor()
if err != nil {
panic(err)
}
// Add to registry
registry := &interceptor.Registry{}
registry.Add(rtpfbInt)
// Build interceptor
i, err := registry.Build("peer-connection-1")
if err != nil {
panic(err)
}
defer i.Close()
Reading Reports
import (
	"fmt"

	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/rtpfb"
)
func processRTCPWithReports(reader interceptor.RTCPReader) {
buf := make([]byte, 1500)
for {
_, attr, err := reader.Read(buf, nil)
if err != nil {
break
}
// Check for CCFB report in attributes
if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
fmt.Printf("RTT: %v\n", report.RTT)
fmt.Printf("Packets in report: %d\n", len(report.PacketReports))
for _, pkt := range report.PacketReports {
delay := pkt.Arrival.Sub(pkt.Departure)
fmt.Printf("Packet %d: delay=%v, size=%d\n",
pkt.SequenceNumber, delay, pkt.Size)
}
}
}
}
Integration with Congestion Control
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/cc"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/interceptor/pkg/rtpfb"
"github.com/pion/interceptor/pkg/twcc"
)
func setupCompleteCC() (*interceptor.Registry, error) {
	registry := &interceptor.Registry{}
	// 1. Track packets and generate reports
	rtpfbInt, err := rtpfb.NewInterceptor()
	if err != nil {
		return nil, err
	}
	registry.Add(rtpfbInt)
	// 2. Add TWCC header extensions (sender)
	twccHdr, err := twcc.NewHeaderExtensionInterceptor()
	if err != nil {
		return nil, err
	}
	registry.Add(twccHdr)
	// 3. Generate TWCC feedback (receiver)
	twccSender, err := twcc.NewSenderInterceptor()
	if err != nil {
		return nil, err
	}
	registry.Add(twccSender)
	// 4. Bandwidth estimation (sender)
	gccFactory := func() (cc.BandwidthEstimator, error) {
		return gcc.NewSendSideBWE()
	}
	ccFactory, err := cc.NewInterceptor(gccFactory)
	if err != nil {
		return nil, err
	}
	registry.Add(ccFactory)
	return registry, nil
}
How It Works
Sender Side
- Bind Local Stream: Intercepts outgoing RTP packets
- Record Departure: Stores:
- SSRC
- RTP sequence number
- Transport sequence number (if TWCC)
- Packet size
- Departure time
- Maintain History: Keeps recent packet info for matching with feedback
Feedback Reception
- Bind RTCP Reader: Intercepts incoming RTCP
- Detect Feedback: Identifies TWCC or RFC 8888 CCFB packets
- Match Packets: Correlates feedback with sent packets
- Calculate Metrics:
- One-way delay (arrival - departure)
- RTT (feedback arrival - departure - feedback delay)
- Generate Report: Creates PacketReport for each acknowledged packet
- Add to Attributes: Attaches report using CCFBAttributesKey
Supported Feedback Types
TWCC (TransportLayerCC)
type TransportLayerCC struct {
SenderSSRC uint32
MediaSSRC uint32
BaseSequenceNumber uint16
PacketStatusCount uint16
ReferenceTime uint32
FbPktCount uint8
PacketChunks []PacketStatusChunk
RecvDeltas []*RecvDelta
}
RFC 8888 CCFB
type CCFeedbackReport struct {
SenderSSRC uint32
MediaSSRC uint32
ReportTimestamp uint32
EcnCounts ECNCounts
PacketReports []PacketReport
}
Metric Calculations
Round-Trip Time (RTT)
// Shortest RTT from all acknowledged packets
RTT = min(feedback_arrival - packet_departure - feedback_delay)
// Where:
// feedback_arrival: When feedback RTCP was received
// packet_departure: When RTP packet was sent
// feedback_delay: Time between packet arrival and feedback sent
One-Way Delay
// For each packet
OWD = packet_arrival - packet_departure
// Where:
// packet_arrival: From feedback report
// packet_departure: Recorded when sent
Delay Variation (Jitter)
// Application can calculate from PacketReports
func calculateJitter(reports []PacketReport) time.Duration {
if len(reports) < 2 {
return 0
}
var deltas []time.Duration
for i := 1; i < len(reports); i++ {
prev := reports[i-1].Arrival.Sub(reports[i-1].Departure)
curr := reports[i].Arrival.Sub(reports[i].Departure)
delta := curr - prev
if delta < 0 {
delta = -delta
}
deltas = append(deltas, delta)
}
// Calculate the mean absolute difference between consecutive one-way delays
var sum time.Duration
for _, d := range deltas {
sum += d
}
return sum / time.Duration(len(deltas))
}
Advanced Usage
Custom Congestion Controller
import (
	"log"
	"time"

	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/rtpfb"
)
type CustomCC struct {
targetBitrate int
}
func (c *CustomCC) ProcessFeedback(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
n, attr, err := reader.Read(b, a)
if err != nil {
return 0, nil, err
}
// Get RTPFB report
if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
c.updateBitrate(report)
}
return n, attr, err
})
}
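func (c *CustomCC) updateBitrate(report rtpfb.Report) {
// Nothing to analyze; this also avoids dividing by zero below
if len(report.PacketReports) == 0 {
return
}
// Analyze packet reports
var totalDelay time.Duration
var lostPackets int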
for i, pkt := range report.PacketReports {
delay := pkt.Arrival.Sub(pkt.Departure)
totalDelay += delay
// Check for loss (sequence number gap)
if i > 0 {
expectedSeq := report.PacketReports[i-1].SequenceNumber + 1
if pkt.SequenceNumber != expectedSeq {
lostPackets += int(pkt.SequenceNumber - expectedSeq)
}
}
}
avgDelay := totalDelay / time.Duration(len(report.PacketReports))
lossRate := float64(lostPackets) / float64(len(report.PacketReports))
// Simple adaptation logic
if lossRate > 0.05 || avgDelay > 200*time.Millisecond {
c.targetBitrate = int(float64(c.targetBitrate) * 0.85) // Reduce 15%
} else if lossRate < 0.01 && avgDelay < 100*time.Millisecond {
c.targetBitrate = int(float64(c.targetBitrate) * 1.08) // Increase 8%
}
log.Printf("Updated bitrate: %d bps (loss=%.1f%%, delay=%v)",
c.targetBitrate, lossRate*100, avgDelay)
}
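The gap check in the example above subtracts 16-bit sequence numbers directly, which overcounts heavily if packets are reported out of order. A wraparound-aware gap calculation might look like the sketch below; `seqGap` is a hypothetical helper, not part of the package.

```go
package main

// seqGap returns how many sequence numbers were skipped between prev and cur.
// The subtraction is done in uint16 and reinterpreted as int16, which gives a
// signed distance that stays correct across the 65535 -> 0 wraparound.
// Duplicates and reordered (older) packets yield 0 instead of a huge count.
func seqGap(prev, cur uint16) int {
	diff := int16(cur - prev)
	if diff <= 1 {
		return 0 // consecutive, duplicate, or reordered
	}
	return int(diff) - 1
}
```

For example, going from 65534 to 1 skips two numbers (65535 and 0), while going from 10 back to 8 counts as reordering and contributes no loss.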
Exporting Metrics
import (
	"encoding/csv"
	"fmt"
	"os"
	"time"

	"github.com/pion/interceptor/pkg/rtpfb"
)
type MetricsExporter struct {
file *os.File
writer *csv.Writer
}
func NewMetricsExporter(path string) (*MetricsExporter, error) {
f, err := os.Create(path)
if err != nil {
return nil, err
}
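w := csv.NewWriter(f)
// Write header; close the file on failure so the handle does not leak
header := []string{"Time", "SSRC", "SeqNum", "Departure", "Arrival", "Delay", "Size"}
if err := w.Write(header); err != nil {
f.Close()
return nil, err
}
return &MetricsExporter{file: f, writer: w}, nil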
}
func (e *MetricsExporter) ExportReport(report rtpfb.Report) error {
for _, pkt := range report.PacketReports {
delay := pkt.Arrival.Sub(pkt.Departure)
record := []string{
report.Arrival.Format(time.RFC3339Nano),
fmt.Sprintf("%d", pkt.SSRC),
fmt.Sprintf("%d", pkt.SequenceNumber),
pkt.Departure.Format(time.RFC3339Nano),
pkt.Arrival.Format(time.RFC3339Nano),
fmt.Sprintf("%d", delay.Microseconds()),
fmt.Sprintf("%d", pkt.Size),
}
if err := e.writer.Write(record); err != nil {
return err
}
}
e.writer.Flush()
return e.writer.Error()
}
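func (e *MetricsExporter) Close() error {
e.writer.Flush()
// Surface any buffered write error before closing the file
if err := e.writer.Error(); err != nil {
e.file.Close()
return err
}
return e.file.Close()
}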
Memory
- Stores recent packet history
- ~50 bytes per packet
- History size depends on feedback interval and packet rate
- Typical: 100-500 packets = 5-25 KB per stream
CPU
- Packet recording: O(1)
- Feedback matching: O(n) where n = packets in feedback
- Minimal overhead
Debugging
import (
	"github.com/pion/interceptor/pkg/rtpfb"
	"github.com/pion/logging"
)
loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = logging.LogLevelTrace
rtpfbInt, _ := rtpfb.NewInterceptor(
rtpfb.WithLoggerFactory(loggerFactory),
)
// Logs will show:
// - Packets tracked
// - Feedback received
// - RTT calculations
// - Report generation
Limitations
- Requires feedback (TWCC or RFC 8888)
- History size limited by memory
- Clock skew can affect delay measurements
- Doesn’t handle SSRC changes
Reference
For more details, see the pkg.go.dev documentation.