The stats package provides an interceptor that records comprehensive statistics for RTP and RTCP streams, including packet counts, byte counts, jitter, loss, and round-trip time measurements.
Overview
The stats interceptor collects statistics for:
- Inbound RTP: Received packets, bytes, jitter, loss
- Outbound RTP: Sent packets, bytes
- Remote Inbound: Receiver’s view of sent streams (from RTCP RR)
- Remote Outbound: Sender’s view of received streams (from RTCP SR)
InterceptorFactory
Constructor
func NewInterceptor(opts ...Option) (*InterceptorFactory, error)
Creates a new stats interceptor factory.
Options
SetRecorderFactory
func SetRecorderFactory(f RecorderFactory) Option
Sets a custom recorder factory.
Factory function: func(ssrc uint32, clockRate float64) Recorder
SetNowFunc
func SetNowFunc(now func() time.Time) Option
Sets a custom time function (primarily for testing).
WithLoggerFactory
func WithLoggerFactory(loggerFactory logging.LoggerFactory) Option
Sets a custom logger factory.
Callback
OnNewPeerConnection
func (r *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback)
Sets a callback invoked when a new PeerConnection interceptor is created.
Parameter cb (type NewPeerConnectionCallback): the callback to invoke.
NewPeerConnectionCallback has the form: func(id string, getter Getter)
Stats Type
// Stats groups the four sets of per-stream statistics collected by the
// interceptor. Each set is an embedded struct.
type Stats struct {
// Inbound RTP: received packets, bytes, jitter, loss.
InboundRTPStreamStats
// Outbound RTP: sent packets, bytes.
OutboundRTPStreamStats
// Remote inbound: the receiver's view of sent streams (from RTCP RR).
RemoteInboundRTPStreamStats
// Remote outbound: the sender's view of received streams (from RTCP SR).
RemoteOutboundRTPStreamStats
}
InboundRTPStreamStats
Jitter: Interarrival jitter in seconds
LastPacketReceivedTimestamp: When the last packet was received
HeaderBytesReceived: Total header bytes received
BytesReceived: Total bytes received (headers + payload)
PLICount: Picture Loss Indication count
OutboundRTPStreamStats
BytesSent: Total bytes sent (headers + payload)
FIRCount: Full Intra Request count received
PLICount: Picture Loss Indication count received
NACKCount: NACK feedback count received
RemoteInboundRTPStreamStats
PacketsLost: Packets lost at remote receiver
PacketsReceived: Packets received at remote receiver
Jitter: Jitter at remote receiver in seconds
TotalRoundTripTime: Sum of all RTT measurements
RoundTripTimeMeasurements: Number of RTT measurements taken
FractionLost: Fraction of packets lost (0.0-1.0)
RemoteOutboundRTPStreamStats
PacketsSent: Packets sent by remote sender
BytesSent: Bytes sent by remote sender
RemoteTimeStamp: Timestamp from remote sender report
ReportsSent: Number of sender reports received
TotalRoundTripTime: Sum of all RTT measurements
RoundTripTimeMeasurements: Number of RTT measurements taken
Getter Interface
// Getter gives read access to the statistics recorded for individual streams.
type Getter interface {
// Get retrieves the statistics for the stream identified by ssrc
// (its synchronization source identifier). It returns nil if no
// statistics are found for that SSRC.
Get(ssrc uint32) *Stats
}
Get
Retrieves statistics for a specific SSRC.
Parameter ssrc (uint32): Synchronization source identifier
Returns: *Stats - Statistics for the SSRC, or nil if not found
Usage Example
Basic Setup
import (
"fmt"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/stats"
)
// Create stats interceptor
statsFactory, err := stats.NewInterceptor()
if err != nil {
	panic(err)
}

// Get notified of new peer connections
var statsGetter stats.Getter
statsFactory.OnNewPeerConnection(func(id string, getter stats.Getter) {
	fmt.Printf("New peer connection: %s\n", id)
	statsGetter = getter
})

// Add to registry
registry := &interceptor.Registry{}
registry.Add(statsFactory)

// Build interceptor
i, err := registry.Build("peer-connection-1")
if err != nil {
	panic(err)
}
defer i.Close()

// Query stats later. mySSRC is a placeholder: substitute the SSRC of the
// stream you are interested in.
if statsGetter != nil {
	// Get returns nil when there are no statistics for the SSRC.
	if s := statsGetter.Get(mySSRC); s != nil {
		// Stats embeds several stream-stat structs that expose the same
		// field names (packet counts, loss, jitter), so select the inbound
		// group explicitly instead of relying on field promotion.
		in := s.InboundRTPStreamStats
		fmt.Printf("Packets received: %d\n", in.PacketsReceived)
		fmt.Printf("Packets lost: %d\n", in.PacketsLost)
		fmt.Printf("Jitter: %.2fms\n", in.Jitter*1000)
	}
}
Periodic Monitoring
import (
"fmt"
"time"
"github.com/pion/interceptor/pkg/stats"
)
// monitorStats prints a summary of the statistics recorded for ssrc every
// five seconds.
//
// The function never returns on its own (the ticker keeps firing), so callers
// normally run it in a dedicated goroutine. Ticks for which getter has no
// statistics for ssrc are skipped.
func monitorStats(getter stats.Getter, ssrc uint32) {
	const interval = 5 * time.Second

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// The byte counters are running totals, so the bitrate of an interval
	// must be computed from the difference between two consecutive samples.
	var (
		lastSample         time.Time // zero until the first sample is taken
		prevRecv, prevSent uint64
	)
	mbps := func(cur, prev uint64, elapsed time.Duration) float64 {
		if cur < prev || elapsed <= 0 {
			// Counter went backwards or no time elapsed: no meaningful rate.
			return 0
		}
		return float64(cur-prev) * 8 / elapsed.Seconds() / 1e6
	}

	for now := range ticker.C {
		s := getter.Get(ssrc)
		if s == nil {
			continue
		}

		// Until a previous sample exists, fall back to the nominal interval.
		elapsed := interval
		if !lastSample.IsZero() {
			elapsed = now.Sub(lastSample)
		}

		// Stats embeds several structs that share field names, so each group
		// is selected explicitly rather than through promoted fields.
		in := s.InboundRTPStreamStats
		out := s.OutboundRTPStreamStats
		remote := s.RemoteInboundRTPStreamStats

		// PacketsLost is signed; clamp it before converting to unsigned and
		// avoid dividing by zero when nothing has been received yet.
		lost := max(in.PacketsLost, 0)
		lossPct := 0.0
		if total := in.PacketsReceived + uint64(lost); total > 0 {
			lossPct = float64(lost) / float64(total) * 100
		}

		// Inbound stats
		fmt.Printf("Inbound:\n")
		fmt.Printf(" Packets: %d received, %d lost\n",
			in.PacketsReceived, in.PacketsLost)
		fmt.Printf(" Loss rate: %.2f%%\n", lossPct)
		fmt.Printf(" Jitter: %.2fms\n", in.Jitter*1000)
		fmt.Printf(" Bytes: %d (%.2f Mbps)\n",
			in.BytesReceived,
			mbps(in.BytesReceived, prevRecv, elapsed))

		// Outbound stats
		fmt.Printf("Outbound:\n")
		fmt.Printf(" Packets: %d sent\n", out.PacketsSent)
		fmt.Printf(" Bytes: %d (%.2f Mbps)\n",
			out.BytesSent,
			mbps(out.BytesSent, prevSent, elapsed))

		// Remote inbound (receiver feedback); only shown once an RTT exists.
		if remote.RoundTripTime > 0 {
			fmt.Printf("Remote Inbound:\n")
			fmt.Printf(" RTT: %v\n", remote.RoundTripTime)
			fmt.Printf(" Remote jitter: %.2fms\n",
				remote.Jitter*1000)
			fmt.Printf(" Remote loss: %d packets (%.2f%%)\n",
				remote.PacketsLost,
				remote.FractionLost*100)
		}

		prevRecv, prevSent = in.BytesReceived, out.BytesSent
		lastSample = now
	}
}
WebRTC Integration
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/stats"
"github.com/pion/webrtc/v4"
)
// createPeerConnection builds a PeerConnection whose interceptor registry
// contains the stats interceptor and returns it together with the Getter for
// that connection's statistics.
//
// The Getter is the one handed to the OnNewPeerConnection callback; it is nil
// if that callback was not invoked. On any error both the PeerConnection and
// the Getter are nil.
func createPeerConnection() (*webrtc.PeerConnection, stats.Getter, error) {
	m := &webrtc.MediaEngine{}
	if err := m.RegisterDefaultCodecs(); err != nil {
		return nil, nil, err
	}

	ir := &interceptor.Registry{}

	// Add stats interceptor
	statsFactory, err := stats.NewInterceptor()
	if err != nil {
		return nil, nil, err
	}
	var statsGetter stats.Getter
	statsFactory.OnNewPeerConnection(func(id string, getter stats.Getter) {
		statsGetter = getter
	})
	ir.Add(statsFactory)

	// Add other interceptors (they will feed stats)
	// ...

	api := webrtc.NewAPI(
		webrtc.WithMediaEngine(m),
		webrtc.WithInterceptorRegistry(ir),
	)

	pc, err := api.NewPeerConnection(webrtc.Configuration{})
	if err != nil {
		// Do not hand back partially initialised results with an error.
		return nil, nil, err
	}
	return pc, statsGetter, nil
}
Quality Monitoring
// QualityMetrics is a compact, unit-normalised summary of stream quality.
type QualityMetrics struct {
// PacketLossRate is the fraction of packets lost, lost / (received + lost).
PacketLossRate float64
// JitterMs is the jitter in milliseconds.
JitterMs float64
// RTTMs is the round-trip time in milliseconds.
RTTMs float64
// BitrateKbps is the bitrate in kilobits per second.
BitrateKbps float64
}
// calculateQuality condenses s into a QualityMetrics value.
//
// interval is the period over which s.BytesReceived accumulated; it is used
// to turn the byte total into a bitrate. A nil s yields the zero
// QualityMetrics, and a non-positive interval yields a bitrate of 0.
func calculateQuality(s *stats.Stats, interval time.Duration) QualityMetrics {
	if s == nil {
		return QualityMetrics{}
	}

	// Stats embeds several structs that share field names, so the inbound
	// group is selected explicitly rather than through promoted fields.
	in := s.InboundRTPStreamStats

	// PacketsLost is signed and may be negative; clamp it once and use the
	// clamped value for both the total and the rate so the rate stays in [0, 1].
	lost := max(in.PacketsLost, 0)
	totalPackets := in.PacketsReceived + uint64(lost)
	lossRate := 0.0
	if totalPackets > 0 {
		lossRate = float64(lost) / float64(totalPackets)
	}

	rttMs := 0.0
	if rtt := s.RemoteInboundRTPStreamStats.RoundTripTime; rtt > 0 {
		rttMs = float64(rtt.Milliseconds())
	}

	// Guard against a zero or negative interval, which would otherwise
	// produce Inf, NaN or a negative bitrate.
	bitrateKbps := 0.0
	if secs := interval.Seconds(); secs > 0 {
		bitrateKbps = float64(in.BytesReceived) * 8 / secs / 1000
	}

	return QualityMetrics{
		PacketLossRate: lossRate,
		JitterMs:       in.Jitter * 1000,
		RTTMs:          rttMs,
		BitrateKbps:    bitrateKbps,
	}
}
// assessQuality maps metrics onto a coarse rating.
//
// It returns "Poor" when loss exceeds 5%, jitter exceeds 50 ms or RTT exceeds
// 200 ms; "Fair" when loss exceeds 2%, jitter exceeds 30 ms or RTT exceeds
// 100 ms; and "Good" otherwise. All thresholds are strict (a value equal to a
// threshold does not trip it).
func assessQuality(metrics QualityMetrics) string {
	loss, jitter, rtt := metrics.PacketLossRate, metrics.JitterMs, metrics.RTTMs

	switch {
	case loss > 0.05, jitter > 50, rtt > 200:
		return "Poor"
	case loss > 0.02, jitter > 30, rtt > 100:
		return "Fair"
	default:
		return "Good"
	}
}
Exporting Statistics
import (
	"encoding/json"
	"time"

	"github.com/pion/interceptor/pkg/stats"
)
// StatsSnapshot is the JSON export format for the statistics of one stream
// captured at a point in time.
type StatsSnapshot struct {
// Timestamp is when the snapshot was taken.
Timestamp time.Time `json:"timestamp"`
// SSRC identifies the stream the statistics belong to.
SSRC uint32 `json:"ssrc"`
// Stats holds the statistics returned by the Getter for SSRC; it is nil
// (encoded as JSON null) when the Getter has none.
Stats *stats.Stats `json:"stats"`
}
// exportStats captures the current statistics for ssrc and serialises them,
// together with the capture time and the SSRC, as indented JSON.
//
// It returns the encoded bytes, or the error reported by the JSON encoder.
func exportStats(getter stats.Getter, ssrc uint32) ([]byte, error) {
	var snap StatsSnapshot
	snap.Timestamp = time.Now()
	snap.SSRC = ssrc
	snap.Stats = getter.Get(ssrc)

	return json.MarshalIndent(snap, "", " ")
}
Recorder Interface
The internal recorder interface that can be customized:
// Recorder is the internal recorder interface that can be customized; a
// custom implementation is supplied through SetRecorderFactory.
// NOTE(review): ts is presumably the time each packet was observed — confirm.
type Recorder interface {
// QueueIncomingRTP queues an incoming RTP packet, given as raw bytes.
QueueIncomingRTP(ts time.Time, buf []byte, attr interceptor.Attributes)
// QueueIncomingRTCP queues incoming RTCP data, given as raw bytes.
QueueIncomingRTCP(ts time.Time, buf []byte, attr interceptor.Attributes)
// QueueOutgoingRTP queues an outgoing RTP packet, given as header and payload.
QueueOutgoingRTP(ts time.Time, header *rtp.Header, payload []byte, attr interceptor.Attributes)
// QueueOutgoingRTCP queues outgoing RTCP packets.
QueueOutgoingRTCP(ts time.Time, pkts []rtcp.Packet, attr interceptor.Attributes)
// GetStats returns the recorded statistics by value.
GetStats() Stats
// Stop presumably halts processing of queued packets — TODO confirm.
Stop()
// Start presumably begins processing of queued packets — TODO confirm.
Start()
}
- Memory: Maintains per-SSRC state
- CPU: Minimal overhead per packet
- Thread-safe: All operations are mutex-protected
- Accuracy: Jitter calculated according to RFC 3550
Use Cases
Adaptive Bitrate
// Back off the encoder when the remote receiver reports more than 5% loss.
// The nil check comes first, so FractionLost is only read when stats exist.
if s := getter.Get(ssrc); s != nil && s.RemoteInboundRTPStreamStats.FractionLost > 0.05 {
	reduceEncoderBitrate()
}
Network Diagnostics
// Warn when the measured round-trip time exceeds 200 ms.
// The nil check comes first, so RoundTripTime is only read when stats exist.
if s := getter.Get(ssrc); s != nil && s.RemoteInboundRTPStreamStats.RoundTripTime > 200*time.Millisecond {
	log.Warn("High RTT detected")
}
Quality of Experience
// estimateQoE derives a rough MOS-like quality score from the inbound
// statistics in s.
//
// The score starts at 4.5, is reduced by ten times the packet-loss fraction
// and by 20 per second of jitter above 30 ms, and is finally clamped to the
// range [1, 5]. s must be non-nil; check the result of Getter.Get first.
func estimateQoE(s *stats.Stats) float64 {
	// Stats embeds several structs that share field names, so the inbound
	// group is selected explicitly rather than through promoted fields.
	in := s.InboundRTPStreamStats

	// MOS-like score (1-5)
	score := 4.5

	// Penalize loss. The loss fraction is lost / (received + lost): lost
	// packets were never received, so they are not part of PacketsReceived.
	if in.PacketsLost > 0 {
		total := in.PacketsReceived + uint64(in.PacketsLost)
		lossRate := float64(in.PacketsLost) / float64(total)
		score -= lossRate * 10
	}

	// Penalize jitter
	if in.Jitter > 0.030 { // 30ms
		score -= (in.Jitter - 0.030) * 20
	}

	return max(1.0, min(5.0, score))
}
See Also
Reference
For more details, see the pkg.go.dev documentation.