Skip to main content
The stats package provides an interceptor that records comprehensive statistics for RTP and RTCP streams, including packet counts, byte counts, jitter, loss, and round-trip time measurements.

Overview

The stats interceptor collects statistics for:
  • Inbound RTP: Received packets, bytes, jitter, loss
  • Outbound RTP: Sent packets, bytes
  • Remote Inbound: Receiver’s view of sent streams (from RTCP RR)
  • Remote Outbound: Sender’s view of received streams (from RTCP SR)

InterceptorFactory

Constructor

func NewInterceptor(opts ...Option) (*InterceptorFactory, error)
Creates a new stats interceptor factory.

Options

SetRecorderFactory

func SetRecorderFactory(f RecorderFactory) Option
Sets a custom recorder factory.
f
RecorderFactory
Factory function: func(ssrc uint32, clockRate float64) Recorder

SetNowFunc

func SetNowFunc(now func() time.Time) Option
Sets a custom time function (primarily for testing).

WithLoggerFactory

func WithLoggerFactory(loggerFactory logging.LoggerFactory) Option
Sets a custom logger factory.

Callback

OnNewPeerConnection

func (r *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback)
Sets a callback invoked when a new PeerConnection interceptor is created.
cb
NewPeerConnectionCallback
Callback: func(id string, getter Getter)

Stats Type

// Stats bundles the four per-stream statistics groups by embedding them.
//
// Several field names (for example PacketsReceived, PacketsLost, Jitter,
// PacketsSent, BytesSent and RoundTripTime) are listed for more than one of
// the embedded groups, so select them through the group name, e.g.
// s.InboundRTPStreamStats.PacketsReceived.
type Stats struct {
    InboundRTPStreamStats        // received packets, bytes, jitter, loss
    OutboundRTPStreamStats       // sent packets, bytes
    RemoteInboundRTPStreamStats  // receiver's view of sent streams (from RTCP RR)
    RemoteOutboundRTPStreamStats // sender's view of received streams (from RTCP SR)
}

InboundRTPStreamStats

PacketsReceived
uint64
Total packets received
PacketsLost
int64
Total packets lost
Jitter
float64
Interarrival jitter in seconds
LastPacketReceivedTimestamp
time.Time
When the last packet was received
HeaderBytesReceived
uint64
Total header bytes received
BytesReceived
uint64
Total bytes received (headers + payload)
FIRCount
uint32
Full Intra Request count
PLICount
uint32
Picture Loss Indication count
NACKCount
uint32
NACK feedback count

OutboundRTPStreamStats

PacketsSent
uint64
Total packets sent
BytesSent
uint64
Total bytes sent (headers + payload)
HeaderBytesSent
uint64
Total header bytes sent
FIRCount
uint32
Full Intra Request count received
PLICount
uint32
Picture Loss Indication count received
NACKCount
uint32
NACK feedback count received

RemoteInboundRTPStreamStats

PacketsLost
int64
Packets lost at remote receiver
PacketsReceived
uint64
Packets received at remote receiver
Jitter
float64
Jitter at remote receiver in seconds
RoundTripTime
time.Duration
Current round-trip time
TotalRoundTripTime
time.Duration
Sum of all RTT measurements
RoundTripTimeMeasurements
uint64
Number of RTT measurements taken
FractionLost
float64
Fraction of packets lost (0.0-1.0)

RemoteOutboundRTPStreamStats

PacketsSent
uint64
Packets sent by remote sender
BytesSent
uint64
Bytes sent by remote sender
RemoteTimeStamp
time.Time
Timestamp from remote sender report
ReportsSent
uint64
Number of sender reports received
RoundTripTime
time.Duration
Current round-trip time
TotalRoundTripTime
time.Duration
Sum of all RTT measurements
RoundTripTimeMeasurements
uint64
Number of RTT measurements taken

Getter Interface

// Getter gives access to the statistics recorded per stream.
type Getter interface {
    // Get returns the statistics for the given SSRC, or nil if not found.
    Get(ssrc uint32) *Stats
}

Get

Retrieves statistics for a specific SSRC.
ssrc
uint32
Synchronization source identifier
Returns: *Stats - Statistics for the SSRC, or nil if not found

Usage Example

Basic Setup

import (
    "fmt"
    "time"
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/stats"
)

// Create stats interceptor
statsFactory, err := stats.NewInterceptor()
if err != nil {
    panic(err)
}

// Get notified of new peer connections; keep the getter so stats can be
// queried once streams are flowing.
var statsGetter stats.Getter
statsFactory.OnNewPeerConnection(func(id string, getter stats.Getter) {
    fmt.Printf("New peer connection: %s\n", id)
    statsGetter = getter
})

// Add to registry
registry := &interceptor.Registry{}
registry.Add(statsFactory)

// Build interceptor
i, err := registry.Build("peer-connection-1")
if err != nil {
    panic(err)
}
defer i.Close()

// Query stats later (mySSRC is the SSRC of the stream of interest)
if statsGetter != nil {
    s := statsGetter.Get(mySSRC)
    if s != nil {
        // PacketsReceived, PacketsLost and Jitter exist in both the inbound
        // and the remote-inbound group, so an unqualified s.PacketsReceived
        // is ambiguous; select the group explicitly.
        in := s.InboundRTPStreamStats
        fmt.Printf("Packets received: %d\n", in.PacketsReceived)
        fmt.Printf("Packets lost: %d\n", in.PacketsLost)
        fmt.Printf("Jitter: %.2fms\n", in.Jitter*1000) // Jitter is in seconds
    }
}

Periodic Monitoring

import (
    "fmt"
    "time"
    "github.com/pion/interceptor/pkg/stats"
)

// monitorStats prints inbound, outbound and remote-inbound statistics for
// ssrc every five seconds. It never returns, so run it in its own goroutine.
//
// getter is queried on every tick; ticks for which it returns nil are skipped.
func monitorStats(getter stats.Getter, ssrc uint32) {
    const interval = 5 * time.Second

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    // The byte counters are running totals, so throughput must be derived
    // from the growth between two consecutive samples, not from the total.
    mbps := func(cur, prev uint64) float64 {
        if cur < prev { // counter went backwards; report nothing for this sample
            return 0
        }
        return float64(cur-prev) * 8 / interval.Seconds() / 1e6
    }

    // Take a baseline so the first report does not count bytes transferred
    // before monitoring started.
    var prevBytesReceived, prevBytesSent uint64
    if s := getter.Get(ssrc); s != nil {
        prevBytesReceived = s.InboundRTPStreamStats.BytesReceived
        prevBytesSent = s.OutboundRTPStreamStats.BytesSent
    }

    for range ticker.C {
        s := getter.Get(ssrc)
        if s == nil {
            continue
        }

        // Field names such as PacketsLost, Jitter, BytesSent and
        // RoundTripTime occur in several embedded groups; always go through
        // the group to keep the selector unambiguous.
        in := s.InboundRTPStreamStats
        out := s.OutboundRTPStreamStats
        remoteIn := s.RemoteInboundRTPStreamStats

        // PacketsLost is signed; clamp it before mixing it with the unsigned
        // received count, and avoid dividing by zero before any packet is seen.
        lossPercent := 0.0
        lost := max(in.PacketsLost, 0)
        if expected := in.PacketsReceived + uint64(lost); expected > 0 {
            lossPercent = float64(lost) / float64(expected) * 100
        }

        // Inbound stats
        fmt.Printf("Inbound:\n")
        fmt.Printf("  Packets: %d received, %d lost\n",
            in.PacketsReceived, in.PacketsLost)
        fmt.Printf("  Loss rate: %.2f%%\n", lossPercent)
        fmt.Printf("  Jitter: %.2fms\n", in.Jitter*1000)
        fmt.Printf("  Bytes: %d (%.2f Mbps)\n",
            in.BytesReceived,
            mbps(in.BytesReceived, prevBytesReceived))

        // Outbound stats
        fmt.Printf("Outbound:\n")
        fmt.Printf("  Packets: %d sent\n", out.PacketsSent)
        fmt.Printf("  Bytes: %d (%.2f Mbps)\n",
            out.BytesSent,
            mbps(out.BytesSent, prevBytesSent))

        // Remote inbound (receiver feedback); only shown once an RTT is known
        if remoteIn.RoundTripTime > 0 {
            fmt.Printf("Remote Inbound:\n")
            fmt.Printf("  RTT: %v\n", remoteIn.RoundTripTime)
            fmt.Printf("  Remote jitter: %.2fms\n",
                remoteIn.Jitter*1000)
            fmt.Printf("  Remote loss: %d packets (%.2f%%)\n",
                remoteIn.PacketsLost,
                remoteIn.FractionLost*100)
        }

        prevBytesReceived = in.BytesReceived
        prevBytesSent = out.BytesSent
    }
}

WebRTC Integration

import (
    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/stats"
    "github.com/pion/webrtc/v4"
)

// createPeerConnection builds a PeerConnection whose interceptor registry
// contains the stats interceptor.
//
// It returns the connection together with the stats.Getter captured by the
// OnNewPeerConnection callback; the getter is nil if that callback had not
// run by the time NewPeerConnection returned. On any error both the
// connection and the getter are nil.
func createPeerConnection() (*webrtc.PeerConnection, stats.Getter, error) {
    m := &webrtc.MediaEngine{}
    if err := m.RegisterDefaultCodecs(); err != nil {
        return nil, nil, err
    }

    ir := &interceptor.Registry{}

    // Add stats interceptor
    statsFactory, err := stats.NewInterceptor()
    if err != nil {
        return nil, nil, err
    }
    var statsGetter stats.Getter
    statsFactory.OnNewPeerConnection(func(_ string, getter stats.Getter) {
        statsGetter = getter
    })
    ir.Add(statsFactory)

    // Add other interceptors (they will feed stats)
    // ...

    api := webrtc.NewAPI(
        webrtc.WithMediaEngine(m),
        webrtc.WithInterceptorRegistry(ir),
    )

    pc, err := api.NewPeerConnection(webrtc.Configuration{})
    if err != nil {
        // Do not hand out a getter for a connection that was never created.
        return nil, nil, err
    }
    return pc, statsGetter, nil
}

Quality Monitoring

// QualityMetrics is a compact, unit-normalised summary of a stream's quality.
type QualityMetrics struct {
    PacketLossRate float64 // packet loss as a fraction
    JitterMs       float64 // jitter in milliseconds
    RTTMs          float64 // round-trip time in milliseconds
    BitrateKbps    float64 // bitrate in kilobits per second
}

// calculateQuality condenses s into QualityMetrics.
//
// interval is the period over which s.InboundRTPStreamStats.BytesReceived was
// accumulated; it is only used to turn the byte count into a bitrate. A nil s
// yields the zero QualityMetrics, and a non-positive interval yields a zero
// bitrate instead of Inf/NaN.
func calculateQuality(s *stats.Stats, interval time.Duration) QualityMetrics {
    if s == nil {
        return QualityMetrics{}
    }

    // PacketsReceived, PacketsLost and Jitter exist in more than one embedded
    // group, so they must be read through InboundRTPStreamStats.
    in := s.InboundRTPStreamStats

    // PacketsLost is signed; clamp it once so numerator and denominator agree
    // and the rate can never be negative.
    lost := max(in.PacketsLost, 0)
    lossRate := 0.0
    if expected := in.PacketsReceived + uint64(lost); expected > 0 {
        lossRate = float64(lost) / float64(expected)
    }

    rttMs := 0.0
    if rtt := s.RemoteInboundRTPStreamStats.RoundTripTime; rtt > 0 {
        // Divide as floats to keep sub-millisecond precision.
        rttMs = float64(rtt) / float64(time.Millisecond)
    }

    bitrateKbps := 0.0
    if interval > 0 {
        bitrateKbps = float64(in.BytesReceived) * 8 / interval.Seconds() / 1000
    }

    return QualityMetrics{
        PacketLossRate: lossRate,
        JitterMs:       in.Jitter * 1000, // Jitter is reported in seconds
        RTTMs:          rttMs,
        BitrateKbps:    bitrateKbps,
    }
}

// assessQuality grades metrics as "Poor", "Fair" or "Good".
//
// A stream is "Poor" when loss exceeds 5%, jitter exceeds 50 ms or RTT exceeds
// 200 ms; "Fair" when loss exceeds 2%, jitter exceeds 30 ms or RTT exceeds
// 100 ms; and "Good" otherwise.
func assessQuality(metrics QualityMetrics) string {
    loss, jitter, rtt := metrics.PacketLossRate, metrics.JitterMs, metrics.RTTMs

    switch {
    case loss > 0.05 || jitter > 50 || rtt > 200:
        return "Poor"
    case loss > 0.02 || jitter > 30 || rtt > 100:
        return "Fair"
    default:
        return "Good"
    }
}

Exporting Statistics

import (
    "encoding/json"
    "time"
)

// StatsSnapshot is the JSON shape used to export the statistics of one SSRC
// together with the time at which they were sampled.
type StatsSnapshot struct {
    Timestamp time.Time    `json:"timestamp"` // when the snapshot was taken
    SSRC      uint32       `json:"ssrc"`      // stream the stats belong to
    Stats     *stats.Stats `json:"stats"`     // may be nil
}

// exportStats samples the statistics for ssrc from getter and returns them as
// a two-space-indented JSON document, stamped with the current time.
func exportStats(getter stats.Getter, ssrc uint32) ([]byte, error) {
    var snap StatsSnapshot
    snap.Timestamp = time.Now()
    snap.SSRC = ssrc
    snap.Stats = getter.Get(ssrc)

    return json.MarshalIndent(snap, "", "  ")
}

Recorder Interface

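The Recorder interface is what the interceptor uses internally to record statistics; to customize it, supply your own implementation through SetRecorderFactory: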
// Recorder is the recorder interface used internally; it can be customized.
// NOTE(review): the per-method notes below are inferred from the signatures
// only - confirm against the package source.
type Recorder interface {
    // The Queue* methods each take a timestamp, the packet data and the
    // interceptor attributes for one incoming or outgoing RTP/RTCP packet.
    QueueIncomingRTP(ts time.Time, buf []byte, attr interceptor.Attributes)
    QueueIncomingRTCP(ts time.Time, buf []byte, attr interceptor.Attributes)
    QueueOutgoingRTP(ts time.Time, header *rtp.Header, payload []byte, attr interceptor.Attributes)
    QueueOutgoingRTCP(ts time.Time, pkts []rtcp.Packet, attr interceptor.Attributes)
    // GetStats returns a Stats value.
    GetStats() Stats
    // Stop and Start presumably control the recorder's lifecycle - TODO confirm.
    Stop()
    Start()
}

Performance Considerations

  • Memory: Maintains per-SSRC state
  • CPU: Minimal overhead per packet
  • Thread-safe: All operations are mutex-protected
  • Accuracy: Jitter calculated according to RFC 3550

Use Cases

Adaptive Bitrate

// Get returns nil when no stats exist for the SSRC, so guard before use.
if s := getter.Get(ssrc); s != nil {
    // Reduce bitrate if loss is high
    // FractionLost is a fraction in the range 0.0-1.0, so 0.05 means 5%.
    if s.RemoteInboundRTPStreamStats.FractionLost > 0.05 {
        // reduceEncoderBitrate is presumably an application-defined hook.
        reduceEncoderBitrate()
    }
}

Network Diagnostics

// Get returns nil when no stats exist for the SSRC, so guard before use.
if s := getter.Get(ssrc); s != nil {
    // Warn when the round-trip time exceeds 200ms.
    if s.RemoteInboundRTPStreamStats.RoundTripTime > 200*time.Millisecond {
        // NOTE(review): the standard library log package has no Warn function;
        // log is presumably an application-provided leveled logger.
        log.Warn("High RTT detected")
    }
}

Quality of Experience

// estimateQoE derives a rough MOS-like score in the range 1-5 from the
// inbound statistics in s: it starts at 4.5, subtracts 10x the packet loss
// rate, and subtracts 20 points per second of jitter above 30 ms.
//
// s must not be nil.
func estimateQoE(s *stats.Stats) float64 {
    // MOS-like score (1-5)
    score := 4.5

    // PacketsLost, PacketsReceived and Jitter exist in more than one embedded
    // group, so they must be read through InboundRTPStreamStats.
    in := s.InboundRTPStreamStats

    // Penalize loss
    if in.PacketsLost > 0 {
        // Loss rate is lost/expected, where expected = received + lost. The
        // guard above keeps the conversion safe and the denominator non-zero.
        expected := in.PacketsReceived + uint64(in.PacketsLost)
        lossRate := float64(in.PacketsLost) / float64(expected)
        score -= lossRate * 10
    }

    // Penalize jitter
    if in.Jitter > 0.030 { // 30ms
        score -= (in.Jitter - 0.030) * 20
    }

    return max(1.0, min(5.0, score))
}

See Also

Reference

For more details, see the pkg.go.dev documentation.

Build docs developers (and LLMs) love