Overview
This page showcases real-world interceptor implementations from the Pion Interceptor library. Each example demonstrates different patterns and techniques you can use in your own interceptors.NACK Generator/Responder Example
The NACK (Negative Acknowledgment) system is a complete example of sender/receiver coordination. This example fromexamples/nack/main.go:1 shows how to use NACK interceptors for packet loss recovery.
Setting Up NACK Receiver
package main
import (
"fmt"
"log"
"net"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
const (
listenPort = 6420
mtu = 1500
ssrc = 5000
)
func receiveRoutine() {
serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
if err != nil {
panic(err)
}
conn, err := net.ListenUDP("udp4", serverAddr)
if err != nil {
panic(err)
}
// Create NACK Generator
generatorFactory, err := nack.NewGeneratorInterceptor()
if err != nil {
panic(err)
}
generator, err := generatorFactory.NewInterceptor("")
if err != nil {
panic(err)
}
// Create interceptor chain
chain := interceptor.NewChain([]interceptor.Interceptor{generator})
// Bind stream reader
streamReader := chain.BindRemoteStream(
&interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
},
interceptor.RTPReaderFunc(
func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
return len(b), nil, nil
},
),
)
// Bind RTCP writer for sending NACKs
chain.BindRTCPWriter(interceptor.RTCPWriterFunc(
func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
buf, err := rtcp.Marshal(pkts)
if err != nil {
return 0, err
}
return conn.WriteTo(buf, addr)
},
))
// Read incoming packets
for buffer := make([]byte, mtu); ; {
i, _, err := conn.ReadFrom(buffer)
if err != nil {
panic(err)
}
log.Println("Received RTP")
if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
panic(err)
}
}
}
Setting Up NACK Sender
func sendRoutine() {
serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
if err != nil {
panic(err)
}
conn, err := net.DialUDP("udp4", nil, serverAddr)
if err != nil {
panic(err)
}
// Create NACK Responder
responderFactory, err := nack.NewResponderInterceptor()
if err != nil {
panic(err)
}
responder, err := responderFactory.NewInterceptor("")
if err != nil {
panic(err)
}
chain := interceptor.NewChain([]interceptor.Interceptor{responder})
// Bind RTCP reader to receive NACKs
rtcpReader := chain.BindRTCPReader(
interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
return len(in), nil, nil
}),
)
// Bind stream writer
streamWriter := chain.BindLocalStream(
&interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
},
interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
headerBuf, err := header.Marshal()
if err != nil {
return 0, err
}
return conn.Write(append(headerBuf, payload...))
}),
)
// Read RTCP NACKs in background
go func() {
for rtcpBuf := make([]byte, mtu); ; {
i, err := conn.Read(rtcpBuf)
if err != nil {
panic(err)
}
log.Println("Received NACK")
if _, _, err = rtcpReader.Read(rtcpBuf[:i], nil); err != nil {
panic(err)
}
}
}()
// Send RTP packets
for sequenceNumber := uint16(0); ; sequenceNumber++ {
if _, err := streamWriter.Write(&rtp.Header{
Version: 2,
SSRC: ssrc,
SequenceNumber: sequenceNumber,
}, []byte{0x0, 0x1, 0x2}, nil); err != nil {
fmt.Println(err)
}
time.Sleep(time.Millisecond * 200)
}
}
The NACK example demonstrates the complete lifecycle: detecting packet loss, generating NACKs, receiving NACKs, and retransmitting packets.
Sender Report Generator
The sender report interceptor frompkg/report/sender_interceptor.go:1 shows how to maintain per-stream state and generate periodic RTCP reports.
Key Patterns
type SenderInterceptor struct {
interceptor.NoOp
interval time.Duration
now func() time.Time
newTicker TickerFactory
streams sync.Map
log logging.LeveledLogger
loggerFactory logging.LoggerFactory
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
}
// BindRTCPWriter starts the report generation loop
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
s.m.Lock()
defer s.m.Unlock()
if s.isClosed() {
return writer
}
s.wg.Add(1)
go s.loop(writer)
return writer
}
// loop generates reports on an interval
func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer s.wg.Done()
ticker := s.newTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ticker.Ch():
now := s.now()
s.streams.Range(func(_, value any) bool {
if stream, ok := value.(*senderStream); ok {
if _, err := rtcpWriter.Write(
[]rtcp.Packet{stream.generateReport(now)},
interceptor.Attributes{},
); err != nil {
s.log.Warnf("failed sending: %+v", err)
}
}
return true
})
case <-s.close:
return
}
}
}
// BindLocalStream tracks per-stream statistics
func (s *SenderInterceptor) BindLocalStream(
info *interceptor.StreamInfo,
writer interceptor.RTPWriter,
) interceptor.RTPWriter {
stream := newSenderStream(info.SSRC, info.ClockRate, s.useLatestPacket)
s.streams.Store(info.SSRC, stream)
return interceptor.RTPWriterFunc(
func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) {
stream.processRTP(s.now(), header, payload)
return writer.Write(header, payload, a)
},
)
}
// UnbindLocalStream cleans up when stream ends
func (s *SenderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
s.streams.Delete(info.SSRC)
}
// Close ensures clean shutdown
func (s *SenderInterceptor) Close() error {
defer s.wg.Wait()
s.m.Lock()
defer s.m.Unlock()
if !s.isClosed() {
close(s.close)
}
return nil
}
func (s *SenderInterceptor) isClosed() bool {
select {
case <-s.close:
return true
default:
return false
}
}
This pattern is excellent for any interceptor that needs to:
- Maintain per-stream state
- Run background tasks
- Generate periodic reports
- Clean up resources gracefully
Packet Dump Interceptor
The packet dump interceptor frompkg/packetdump/sender_interceptor.go:1 demonstrates simple packet inspection:
type SenderInterceptor struct {
interceptor.NoOp
*PacketDumper
}
// BindRTCPWriter logs outgoing RTCP packets
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
return interceptor.RTCPWriterFunc(
func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
s.logRTCPPackets(pkts, attributes)
return writer.Write(pkts, attributes)
},
)
}
// BindLocalStream logs outgoing RTP packets
func (s *SenderInterceptor) BindLocalStream(
_ *interceptor.StreamInfo,
writer interceptor.RTPWriter,
) interceptor.RTPWriter {
return interceptor.RTPWriterFunc(
func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
s.logRTPPacket(header, payload, attributes)
return writer.Write(header, payload, attributes)
},
)
}
// Close cleans up resources
func (s *SenderInterceptor) Close() error {
return s.PacketDumper.Close()
}
This pattern is perfect for debugging, monitoring, or analyzing packet flows without modifying them.
NACK Generator Implementation Details
The NACK generator frompkg/nack/generator_interceptor.go:1 shows advanced state management:
type GeneratorInterceptor struct {
interceptor.NoOp
streamsFilter func(info *interceptor.StreamInfo) bool
size uint16
skipLastN uint16
maxNacksPerPacket uint16
interval time.Duration
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
loggerFactory logging.LoggerFactory
nackCountLogs map[uint32]map[uint16]uint16
receiveLogs map[uint32]*receiveLog
receiveLogsMu sync.Mutex
}
// BindRemoteStream tracks incoming packets
func (n *GeneratorInterceptor) BindRemoteStream(
info *interceptor.StreamInfo,
reader interceptor.RTPReader,
) interceptor.RTPReader {
// Filter streams based on RTCP feedback
if !n.streamsFilter(info) {
return reader
}
// Create receive log for this stream
receiveLog, _ := newReceiveLog(n.size)
n.receiveLogsMu.Lock()
n.receiveLogs[info.SSRC] = receiveLog
n.receiveLogsMu.Unlock()
return interceptor.RTPReaderFunc(
func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(b, a)
if err != nil {
return 0, nil, err
}
if attr == nil {
attr = make(interceptor.Attributes)
}
header, err := attr.GetRTPHeader(b[:i])
if err != nil {
return 0, nil, err
}
// Track sequence number
receiveLog.add(header.SequenceNumber)
return i, attr, nil
},
)
}
// UnbindRemoteStream cleans up state
func (n *GeneratorInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
n.receiveLogsMu.Lock()
delete(n.receiveLogs, info.SSRC)
delete(n.nackCountLogs, info.SSRC)
n.receiveLogsMu.Unlock()
}
Always protect shared state with proper locking, especially when accessed from multiple goroutines and lifecycle methods.
Chaining Interceptors
TheChain implementation from chain.go:1 shows how to compose multiple interceptors:
type Chain struct {
interceptors []Interceptor
}
func NewChain(interceptors []Interceptor) *Chain {
return &Chain{interceptors: interceptors}
}
// BindLocalStream chains all interceptors
func (i *Chain) BindLocalStream(ctx *StreamInfo, writer RTPWriter) RTPWriter {
for _, interceptor := range i.interceptors {
writer = interceptor.BindLocalStream(ctx, writer)
}
return writer
}
// Close closes all interceptors and collects errors
func (i *Chain) Close() error {
var errs []error
for _, interceptor := range i.interceptors {
errs = append(errs, interceptor.Close())
}
return flattenErrs(errs)
}
Using Multiple Interceptors
// Create multiple interceptors
statsFactory, _ := stats.NewReceiverInterceptor()
statsInterceptor, _ := statsFactory.NewInterceptor("")
nackFactory, _ := nack.NewGeneratorInterceptor()
nackInterceptor, _ := nackFactory.NewInterceptor("")
reportFactory, _ := report.NewReceiverInterceptor()
reportInterceptor, _ := reportFactory.NewInterceptor("")
// Chain them together
chain := interceptor.NewChain([]interceptor.Interceptor{
statsInterceptor,
nackInterceptor,
reportInterceptor,
})
// Use the chain like a single interceptor
streamReader := chain.BindRemoteStream(streamInfo, reader)
Interceptors in a chain are executed in order. Place packet modifiers before statistics collectors to ensure accurate counts.
Next Steps
Custom Interceptors
Learn how to build your own interceptors
Best Practices
Patterns for robust interceptor development