Skip to main content

Overview

This page showcases real-world interceptor implementations from the Pion Interceptor library. Each example demonstrates different patterns and techniques you can use in your own interceptors.

NACK Generator/Responder Example

The NACK (Negative Acknowledgment) system is a complete example of sender/receiver coordination. This example from examples/nack/main.go:1 shows how to use NACK interceptors for packet loss recovery.

Setting Up NACK Receiver

package main

import (
    "fmt"
    "log"
    "net"
    "time"

    "github.com/pion/interceptor"
    "github.com/pion/interceptor/pkg/nack"
    "github.com/pion/rtcp"
    "github.com/pion/rtp"
)

const (
    listenPort = 6420
    mtu        = 1500
    ssrc       = 5000
)

func receiveRoutine() {
    serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
    if err != nil {
        panic(err)
    }

    conn, err := net.ListenUDP("udp4", serverAddr)
    if err != nil {
        panic(err)
    }

    // Create NACK Generator
    generatorFactory, err := nack.NewGeneratorInterceptor()
    if err != nil {
        panic(err)
    }

    generator, err := generatorFactory.NewInterceptor("")
    if err != nil {
        panic(err)
    }

    // Create interceptor chain
    chain := interceptor.NewChain([]interceptor.Interceptor{generator})

    // Bind stream reader
    streamReader := chain.BindRemoteStream(
        &interceptor.StreamInfo{
            SSRC:         ssrc,
            RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
        },
        interceptor.RTPReaderFunc(
            func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
                return len(b), nil, nil
            },
        ),
    )

    // Bind RTCP writer for sending NACKs
    chain.BindRTCPWriter(interceptor.RTCPWriterFunc(
        func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
            buf, err := rtcp.Marshal(pkts)
            if err != nil {
                return 0, err
            }
            return conn.WriteTo(buf, addr)
        },
    ))

    // Read incoming packets
    for buffer := make([]byte, mtu); ; {
        i, _, err := conn.ReadFrom(buffer)
        if err != nil {
            panic(err)
        }

        log.Println("Received RTP")

        if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {
            panic(err)
        }
    }
}

Setting Up NACK Sender

func sendRoutine() {
    serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
    if err != nil {
        panic(err)
    }

    conn, err := net.DialUDP("udp4", nil, serverAddr)
    if err != nil {
        panic(err)
    }

    // Create NACK Responder
    responderFactory, err := nack.NewResponderInterceptor()
    if err != nil {
        panic(err)
    }

    responder, err := responderFactory.NewInterceptor("")
    if err != nil {
        panic(err)
    }

    chain := interceptor.NewChain([]interceptor.Interceptor{responder})

    // Bind RTCP reader to receive NACKs
    rtcpReader := chain.BindRTCPReader(
        interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
            return len(in), nil, nil
        }),
    )

    // Bind stream writer
    streamWriter := chain.BindLocalStream(
        &interceptor.StreamInfo{
            SSRC:         ssrc,
            RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
        },
        interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
            headerBuf, err := header.Marshal()
            if err != nil {
                return 0, err
            }
            return conn.Write(append(headerBuf, payload...))
        }),
    )

    // Read RTCP NACKs in background
    go func() {
        for rtcpBuf := make([]byte, mtu); ; {
            i, err := conn.Read(rtcpBuf)
            if err != nil {
                panic(err)
            }

            log.Println("Received NACK")

            if _, _, err = rtcpReader.Read(rtcpBuf[:i], nil); err != nil {
                panic(err)
            }
        }
    }()

    // Send RTP packets
    for sequenceNumber := uint16(0); ; sequenceNumber++ {
        if _, err := streamWriter.Write(&rtp.Header{
            Version:        2,
            SSRC:           ssrc,
            SequenceNumber: sequenceNumber,
        }, []byte{0x0, 0x1, 0x2}, nil); err != nil {
            fmt.Println(err)
        }

        time.Sleep(time.Millisecond * 200)
    }
}
The NACK example demonstrates the complete lifecycle: detecting packet loss, generating NACKs, receiving NACKs, and retransmitting packets.

Sender Report Generator

The sender report interceptor from pkg/report/sender_interceptor.go:1 shows how to maintain per-stream state and generate periodic RTCP reports.

Key Patterns

type SenderInterceptor struct {
    interceptor.NoOp
    interval      time.Duration
    now           func() time.Time
    newTicker     TickerFactory
    streams       sync.Map
    log           logging.LeveledLogger
    loggerFactory logging.LoggerFactory
    m             sync.Mutex
    wg            sync.WaitGroup
    close         chan struct{}
}

// BindRTCPWriter starts the report generation loop
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
    s.m.Lock()
    defer s.m.Unlock()

    if s.isClosed() {
        return writer
    }

    s.wg.Add(1)
    go s.loop(writer)

    return writer
}

// loop generates reports on an interval
func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
    defer s.wg.Done()

    ticker := s.newTicker(s.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.Ch():
            now := s.now()
            s.streams.Range(func(_, value any) bool {
                if stream, ok := value.(*senderStream); ok {
                    if _, err := rtcpWriter.Write(
                        []rtcp.Packet{stream.generateReport(now)},
                        interceptor.Attributes{},
                    ); err != nil {
                        s.log.Warnf("failed sending: %+v", err)
                    }
                }
                return true
            })

        case <-s.close:
            return
        }
    }
}

// BindLocalStream tracks per-stream statistics
func (s *SenderInterceptor) BindLocalStream(
    info *interceptor.StreamInfo,
    writer interceptor.RTPWriter,
) interceptor.RTPWriter {
    stream := newSenderStream(info.SSRC, info.ClockRate, s.useLatestPacket)
    s.streams.Store(info.SSRC, stream)

    return interceptor.RTPWriterFunc(
        func(header *rtp.Header, payload []byte, a interceptor.Attributes) (int, error) {
            stream.processRTP(s.now(), header, payload)
            return writer.Write(header, payload, a)
        },
    )
}

// UnbindLocalStream cleans up when stream ends
func (s *SenderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
    s.streams.Delete(info.SSRC)
}

// Close ensures clean shutdown
func (s *SenderInterceptor) Close() error {
    defer s.wg.Wait()
    s.m.Lock()
    defer s.m.Unlock()

    if !s.isClosed() {
        close(s.close)
    }

    return nil
}

func (s *SenderInterceptor) isClosed() bool {
    select {
    case <-s.close:
        return true
    default:
        return false
    }
}
This pattern is excellent for any interceptor that needs to:
  • Maintain per-stream state
  • Run background tasks
  • Generate periodic reports
  • Clean up resources gracefully

Packet Dump Interceptor

The packet dump interceptor from pkg/packetdump/sender_interceptor.go:1 demonstrates simple packet inspection:
type SenderInterceptor struct {
    interceptor.NoOp
    *PacketDumper
}

// BindRTCPWriter logs outgoing RTCP packets
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
    return interceptor.RTCPWriterFunc(
        func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
            s.logRTCPPackets(pkts, attributes)
            return writer.Write(pkts, attributes)
        },
    )
}

// BindLocalStream logs outgoing RTP packets
func (s *SenderInterceptor) BindLocalStream(
    _ *interceptor.StreamInfo,
    writer interceptor.RTPWriter,
) interceptor.RTPWriter {
    return interceptor.RTPWriterFunc(
        func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
            s.logRTPPacket(header, payload, attributes)
            return writer.Write(header, payload, attributes)
        },
    )
}

// Close cleans up resources
func (s *SenderInterceptor) Close() error {
    return s.PacketDumper.Close()
}
This pattern is perfect for debugging, monitoring, or analyzing packet flows without modifying them.

NACK Generator Implementation Details

The NACK generator from pkg/nack/generator_interceptor.go:1 shows advanced state management:
type GeneratorInterceptor struct {
    interceptor.NoOp
    streamsFilter     func(info *interceptor.StreamInfo) bool
    size              uint16
    skipLastN         uint16
    maxNacksPerPacket uint16
    interval          time.Duration
    m                 sync.Mutex
    wg                sync.WaitGroup
    close             chan struct{}
    log               logging.LeveledLogger
    loggerFactory     logging.LoggerFactory
    nackCountLogs     map[uint32]map[uint16]uint16
    receiveLogs       map[uint32]*receiveLog
    receiveLogsMu     sync.Mutex
}

// BindRemoteStream tracks incoming packets
func (n *GeneratorInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    // Filter streams based on RTCP feedback
    if !n.streamsFilter(info) {
        return reader
    }

    // Create receive log for this stream
    receiveLog, _ := newReceiveLog(n.size)
    n.receiveLogsMu.Lock()
    n.receiveLogs[info.SSRC] = receiveLog
    n.receiveLogsMu.Unlock()

    return interceptor.RTPReaderFunc(
        func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
            i, attr, err := reader.Read(b, a)
            if err != nil {
                return 0, nil, err
            }

            if attr == nil {
                attr = make(interceptor.Attributes)
            }

            header, err := attr.GetRTPHeader(b[:i])
            if err != nil {
                return 0, nil, err
            }

            // Track sequence number
            receiveLog.add(header.SequenceNumber)

            return i, attr, nil
        },
    )
}

// UnbindRemoteStream cleans up state
func (n *GeneratorInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
    n.receiveLogsMu.Lock()
    delete(n.receiveLogs, info.SSRC)
    delete(n.nackCountLogs, info.SSRC)
    n.receiveLogsMu.Unlock()
}
Always protect shared state with proper locking, especially when accessed from multiple goroutines and lifecycle methods.

Chaining Interceptors

The Chain implementation from chain.go:1 shows how to compose multiple interceptors:
type Chain struct {
    interceptors []Interceptor
}

func NewChain(interceptors []Interceptor) *Chain {
    return &Chain{interceptors: interceptors}
}

// BindLocalStream chains all interceptors
func (i *Chain) BindLocalStream(ctx *StreamInfo, writer RTPWriter) RTPWriter {
    for _, interceptor := range i.interceptors {
        writer = interceptor.BindLocalStream(ctx, writer)
    }
    return writer
}

// Close closes all interceptors and collects errors
func (i *Chain) Close() error {
    var errs []error
    for _, interceptor := range i.interceptors {
        errs = append(errs, interceptor.Close())
    }
    return flattenErrs(errs)
}

Using Multiple Interceptors

// Create multiple interceptors
statsFactory, _ := stats.NewReceiverInterceptor()
statsInterceptor, _ := statsFactory.NewInterceptor("")

nackFactory, _ := nack.NewGeneratorInterceptor()
nackInterceptor, _ := nackFactory.NewInterceptor("")

reportFactory, _ := report.NewReceiverInterceptor()
reportInterceptor, _ := reportFactory.NewInterceptor("")

// Chain them together
chain := interceptor.NewChain([]interceptor.Interceptor{
    statsInterceptor,
    nackInterceptor,
    reportInterceptor,
})

// Use the chain like a single interceptor
streamReader := chain.BindRemoteStream(streamInfo, reader)
Interceptors in a chain are executed in order. Place packet modifiers before statistics collectors to ensure accurate counts.

Next Steps

Custom Interceptors

Learn how to build your own interceptors

Best Practices

Patterns for robust interceptor development

Build docs developers (and LLMs) love