Skip to main content
The rtpfb package implements feedback aggregation for CCFB (Congestion Control Feedback) and TWCC (Transport-Wide Congestion Control) packets, providing detailed packet reports for congestion control algorithms.

Overview

The rtpfb package:
  • Tracks Outgoing Packets: Records packet departure times and sequence numbers
  • Processes Feedback: Handles TWCC and RFC 8888 CCFB feedback
  • Generates Reports: Creates detailed packet reports with RTT and loss information
  • Attributes Integration: Adds reports to interceptor attributes for easy access

Interceptor

Tracks packets and generates feedback reports.

Factory

type InterceptorFactory struct {
    // contains filtered or unexported fields
}

Constructor

func NewInterceptor(opts ...Option) (*InterceptorFactory, error)
Creates a new CCFB/TWCC feedback interceptor factory.

Options

WithLoggerFactory

func WithLoggerFactory(lf logging.LoggerFactory) Option
Sets a custom logger factory.

Report Type

The report structure added to interceptor attributes:
type Report struct {
    Arrival       time.Time
    RTT           time.Duration
    PacketReports []PacketReport
}
Arrival
time.Time
When the feedback report arrived
RTT
time.Duration
Calculated round-trip time
PacketReports
[]PacketReport
Individual packet reports

PacketReport Type

type PacketReport struct {
    SSRC              uint32
    SequenceNumber    uint16
    TransportSequence uint16
    Departure         time.Time
    Arrival           time.Time
    Size              int
}
SSRC
uint32
Synchronization source identifier
SequenceNumber
uint16
RTP sequence number
TransportSequence
uint16
Transport-wide sequence number (TWCC)
Departure
time.Time
When packet was sent
Arrival
time.Time
When packet was received (from feedback)
Size
int
Packet size in bytes

Attributes Key

const CCFBAttributesKey ccfbAttributesKeyType = iota
Use this key to retrieve reports from interceptor attributes:
if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
    // Use report
}

Usage Example

Basic Setup

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/rtpfb"
)

// Create RTPFB interceptor
rtpfbInt, err := rtpfb.NewInterceptor()
if err != nil {
    panic(err)
}

// Add to registry
registry := &interceptor.Registry{}
registry.Add(rtpfbInt)

// Build interceptor
i, err := registry.Build("peer-connection-1")
if err != nil {
    panic(err)
}
defer i.Close()

Reading Reports

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/rtpfb"
)

func processRTCPWithReports(reader interceptor.RTCPReader) {
    buf := make([]byte, 1500)
    
    for {
        n, attr, err := reader.Read(buf, nil)
        if err != nil {
            break
        }
        
        // Check for CCFB report in attributes
        if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
            fmt.Printf("RTT: %v\n", report.RTT)
            fmt.Printf("Packets in report: %d\n", len(report.PacketReports))
            
            for _, pkt := range report.PacketReports {
                delay := pkt.Arrival.Sub(pkt.Departure)
                fmt.Printf("Packet %d: delay=%v, size=%d\n",
                    pkt.SequenceNumber, delay, pkt.Size)
            }
        }
    }
}

Integration with Congestion Control

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/cc"
    "github.com/pion/interceptor/pkg/gcc"
    "github.com/pion/interceptor/pkg/rtpfb"
    "github.com/pion/interceptor/pkg/twcc"
)

func setupCompleteCC() (*interceptor.Registry, error) {
    registry := &interceptor.Registry{}
    
    // 1. Track packets and generate reports
    rtpfbInt, _ := rtpfb.NewInterceptor()
    registry.Add(rtpfbInt)
    
    // 2. Add TWCC header extensions (sender)
    twccHdr, _ := twcc.NewHeaderExtensionInterceptor()
    registry.Add(twccHdr)
    
    // 3. Generate TWCC feedback (receiver)
    twccSender, _ := twcc.NewSenderInterceptor()
    registry.Add(twccSender)
    
    // 4. Bandwidth estimation (sender)
    gccFactory := func() (cc.BandwidthEstimator, error) {
        return gcc.NewSendSideBWE()
    }
    ccFactory, _ := cc.NewInterceptor(gccFactory)
    registry.Add(ccFactory)
    
    return registry, nil
}

How It Works

Sender Side

  1. Bind Local Stream: Intercepts outgoing RTP packets
  2. Record Departure: Stores:
    • SSRC
    • RTP sequence number
    • Transport sequence number (if TWCC)
    • Packet size
    • Departure time
  3. Maintain History: Keeps recent packet info for matching with feedback

Feedback Reception

  1. Bind RTCP Reader: Intercepts incoming RTCP
  2. Detect Feedback: Identifies TWCC or RFC 8888 CCFB packets
  3. Match Packets: Correlates feedback with sent packets
  4. Calculate Metrics:
    • One-way delay (arrival - departure)
    • RTT (feedback arrival - departure - pending time)
  5. Generate Report: Creates PacketReport for each acknowledged packet
  6. Add to Attributes: Attaches report using CCFBAttributesKey

Supported Feedback Types

TWCC (TransportLayerCC)

type TransportLayerCC struct {
    SenderSSRC         uint32
    MediaSSRC          uint32
    BaseSequenceNumber uint16
    PacketStatusCount  uint16
    ReferenceTime      uint32
    FbPktCount         uint8
    PacketChunks       []PacketStatusChunk
    RecvDeltas         []*RecvDelta
}

RFC 8888 CCFB

type CCFeedbackReport struct {
    SenderSSRC      uint32
    MediaSSRC       uint32
    ReportTimestamp uint32
    EcnCounts       ECNCounts
    PacketReports   []PacketReport
}

Metric Calculations

Round-Trip Time (RTT)

// Shortest RTT from all acknowledged packets
RTT = min(feedback_arrival - packet_departure - feedback_delay)

// Where:
// feedback_arrival: When feedback RTCP was received
// packet_departure: When RTP packet was sent
// feedback_delay: Time between packet arrival and feedback sent

One-Way Delay

// For each packet
OWD = packet_arrival - packet_departure

// Where:
// packet_arrival: From feedback report
// packet_departure: Recorded when sent

Delay Variation (Jitter)

// Application can calculate from PacketReports
func calculateJitter(reports []PacketReport) time.Duration {
    if len(reports) < 2 {
        return 0
    }
    
    var deltas []time.Duration
    for i := 1; i < len(reports); i++ {
        prev := reports[i-1].Arrival.Sub(reports[i-1].Departure)
        curr := reports[i].Arrival.Sub(reports[i].Departure)
        delta := curr - prev
        if delta < 0 {
            delta = -delta
        }
        deltas = append(deltas, delta)
    }
    
    // Calculate mean absolute deviation
    var sum time.Duration
    for _, d := range deltas {
        sum += d
    }
    return sum / time.Duration(len(deltas))
}

Advanced Usage

Custom Congestion Controller

import "github.com/pion/interceptor/pkg/rtpfb"

type CustomCC struct {
    targetBitrate int
}

func (c *CustomCC) ProcessFeedback(reader interceptor.RTCPReader) interceptor.RTCPReader {
    return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
        n, attr, err := reader.Read(b, a)
        if err != nil {
            return 0, nil, err
        }
        
        // Get RTPFB report
        if report, ok := attr.Get(rtpfb.CCFBAttributesKey).(rtpfb.Report); ok {
            c.updateBitrate(report)
        }
        
        return n, attr, err
    })
}

func (c *CustomCC) updateBitrate(report rtpfb.Report) {
    // Analyze packet reports
    var totalDelay time.Duration
    var lostPackets int
    
    for i, pkt := range report.PacketReports {
        delay := pkt.Arrival.Sub(pkt.Departure)
        totalDelay += delay
        
        // Check for loss (sequence number gap)
        if i > 0 {
            expectedSeq := report.PacketReports[i-1].SequenceNumber + 1
            if pkt.SequenceNumber != expectedSeq {
                lostPackets += int(pkt.SequenceNumber - expectedSeq)
            }
        }
    }
    
    avgDelay := totalDelay / time.Duration(len(report.PacketReports))
    lossRate := float64(lostPackets) / float64(len(report.PacketReports))
    
    // Simple adaptation logic
    if lossRate > 0.05 || avgDelay > 200*time.Millisecond {
        c.targetBitrate = int(float64(c.targetBitrate) * 0.85) // Reduce 15%
    } else if lossRate < 0.01 && avgDelay < 100*time.Millisecond {
        c.targetBitrate = int(float64(c.targetBitrate) * 1.08) // Increase 8%
    }
    
    log.Printf("Updated bitrate: %d bps (loss=%.1f%%, delay=%v)",
        c.targetBitrate, lossRate*100, avgDelay)
}

Exporting Metrics

import "encoding/csv"

type MetricsExporter struct {
    file   *os.File
    writer *csv.Writer
}

func NewMetricsExporter(path string) (*MetricsExporter, error) {
    f, err := os.Create(path)
    if err != nil {
        return nil, err
    }
    
    w := csv.NewWriter(f)
    // Write header
    w.Write([]string{"Time", "SSRC", "SeqNum", "Departure", "Arrival", "Delay", "Size"})
    
    return &MetricsExporter{file: f, writer: w}, nil
}

func (e *MetricsExporter) ExportReport(report rtpfb.Report) error {
    for _, pkt := range report.PacketReports {
        delay := pkt.Arrival.Sub(pkt.Departure)
        record := []string{
            report.Arrival.Format(time.RFC3339Nano),
            fmt.Sprintf("%d", pkt.SSRC),
            fmt.Sprintf("%d", pkt.SequenceNumber),
            pkt.Departure.Format(time.RFC3339Nano),
            pkt.Arrival.Format(time.RFC3339Nano),
            fmt.Sprintf("%d", delay.Microseconds()),
            fmt.Sprintf("%d", pkt.Size),
        }
        if err := e.writer.Write(record); err != nil {
            return err
        }
    }
    e.writer.Flush()
    return e.writer.Error()
}

func (e *MetricsExporter) Close() error {
    e.writer.Flush()
    return e.file.Close()
}

Performance Characteristics

Memory

  • Stores recent packet history
  • ~50 bytes per packet
  • History size depends on feedback interval and packet rate
  • Typical: 100-500 packets = 5-25 KB per stream

CPU

  • Packet recording: O(1)
  • Feedback matching: O(n) where n = packets in feedback
  • Minimal overhead

Debugging

import "github.com/pion/logging"

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel = logging.LogLevelTrace

rtpfbInt, _ := rtpfb.NewInterceptor(
    rtpfb.WithLoggerFactory(loggerFactory),
)

// Logs will show:
// - Packets tracked
// - Feedback received
// - RTT calculations
// - Report generation

Limitations

  • Requires feedback (TWCC or RFC 8888)
  • History size limited by memory
  • Clock skew can affect delay measurements
  • Doesn’t handle SSRC changes

See Also

Reference

For more details, see the pkg.go.dev documentation.

Build docs developers (and LLMs) love