Skip to main content

Overview

Building reliable interceptors requires careful attention to concurrency, error handling, resource management, and performance. This guide covers essential patterns and practices learned from production interceptor implementations.

Architecture Patterns

Embed NoOp for Partial Implementation

Always embed interceptor.NoOp and override only the methods you need. This provides forward compatibility if the interface is extended.
// MyInterceptor is a custom interceptor. It embeds interceptor.NoOp so that
// only the hooks it actually needs have to be written by hand.
type MyInterceptor struct {
    interceptor.NoOp  // Provides default implementations
    // Your fields here
}

// BindLocalStream is an example of overriding a single hook; every method
// that is not overridden falls through to the embedded interceptor.NoOp.
//
// It must return an RTPWriter. Returning writer unchanged passes outgoing
// packets through untouched; wrap it to observe or modify them.
func (m *MyInterceptor) BindLocalStream(
    info *interceptor.StreamInfo,
    writer interceptor.RTPWriter,
) interceptor.RTPWriter {
    // Your implementation

    // Pass-through default so the example compiles.
    return writer
}

Use the Factory Pattern

The factory pattern enables flexible configuration:
// MyOption is a functional option: it mutates a MyInterceptor while it is
// being built and returns an error to reject an invalid setting.
type MyOption func(*MyInterceptor) error

// WithBufferSize returns an option that sets the interceptor's buffer size.
// The check runs when the option is applied, not when it is created; a size
// of zero or less makes the option fail.
func WithBufferSize(size int) MyOption {
    return func(target *MyInterceptor) error {
        if size > 0 {
            target.bufferSize = size
            return nil
        }
        return errors.New("buffer size must be positive")
    }
}

// WithLogger returns an option that installs logger on the interceptor.
// The option itself never fails.
func WithLogger(logger logging.LeveledLogger) MyOption {
    apply := func(target *MyInterceptor) error {
        target.log = logger
        return nil
    }
    return apply
}

// MyInterceptorFactory builds MyInterceptor instances. It stores the options
// it was created with and replays them on every interceptor it creates.
type MyInterceptorFactory struct {
    opts []MyOption
}

// NewMyInterceptor returns a factory that remembers opts. The options are not
// run here, so the returned error is always nil; they are applied (and may
// fail) when the factory creates an interceptor.
func NewMyInterceptor(opts ...MyOption) (*MyInterceptorFactory, error) {
    factory := &MyInterceptorFactory{opts: opts}
    return factory, nil
}

// NewInterceptor implements the Factory interface. It creates a MyInterceptor
// with default settings, applies the stored options in order, and stops at
// the first option that returns an error. The id argument is not used.
func (f *MyInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
    // Start from the defaults; any option may overwrite them.
    built := &MyInterceptor{
        bufferSize: 512,
        interval:   time.Second,
    }

    for i := range f.opts {
        if err := f.opts[i](built); err != nil {
            return nil, err
        }
    }

    // Fall back to a default logger when no option supplied one.
    if built.log == nil {
        built.log = logging.NewDefaultLoggerFactory().NewLogger("my_interceptor")
    }

    return built, nil
}

Concurrency and Thread Safety

Protect Shared State

Interceptor methods can be called from multiple goroutines. Always protect shared state with proper synchronization.
// SafeInterceptor keeps per-SSRC stream state in a plain map. Every access to
// streams must go through streamsMu.
type SafeInterceptor struct {
    interceptor.NoOp
    // streams maps an SSRC to its state; guarded by streamsMu.
    streams   map[uint32]*StreamState
    streamsMu sync.RWMutex  // Use RWMutex for read-heavy workloads
}

// getStream looks up the state stored for ssrc while holding the read lock,
// so concurrent readers do not block each other. It returns nil when no
// state is stored for that SSRC.
func (s *SafeInterceptor) getStream(ssrc uint32) *StreamState {
    s.streamsMu.RLock()
    state := s.streams[ssrc]
    s.streamsMu.RUnlock()
    return state
}

// addStream stores state for ssrc while holding the write lock, replacing any
// state previously stored for the same SSRC.
//
// The map is created on first use so that a zero-value SafeInterceptor is
// usable; assigning into a nil map would panic.
func (s *SafeInterceptor) addStream(ssrc uint32, state *StreamState) {
    s.streamsMu.Lock()
    defer s.streamsMu.Unlock()

    if s.streams == nil {
        s.streams = make(map[uint32]*StreamState)
    }
    s.streams[ssrc] = state
}

Use sync.Map for Concurrent Access

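sync.Map is worth considering when each key is written once and read many times, or when goroutines work on disjoint sets of keys (as with per-SSRC state); for other access patterns a plain map guarded by a mutex is usually simpler and faster: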
// ConcurrentInterceptor keeps per-SSRC stream state in a sync.Map, so callers
// do not take an explicit lock around reads and writes.
type ConcurrentInterceptor struct {
    interceptor.NoOp
    streams sync.Map  // map[uint32]*StreamState
}

// BindRemoteStream registers a fresh StreamState for the stream's SSRC and
// returns a reader that looks that state up on every read before delegating
// to the wrapped reader.
//
// The SSRC is copied at bind time so the lookup key always matches the key
// the state was stored under.
func (c *ConcurrentInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    ssrc := info.SSRC
    c.streams.Store(ssrc, &StreamState{ssrc: ssrc})

    return interceptor.RTPReaderFunc(
        func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
            // Access without explicit locking
            if val, ok := c.streams.Load(ssrc); ok {
                // Checked assertion: skip the entry instead of panicking
                // if it ever holds an unexpected type.
                if state, ok := val.(*StreamState); ok {
                    // Process state
                    _ = state // placeholder so the example compiles
                }
            }
            return reader.Read(b, a)
        },
    )
}

Proper Goroutine Management

// BackgroundInterceptor runs background goroutines that are stopped through a
// shared close channel and awaited through a WaitGroup.
//
// NOTE(review): no constructor is shown in this excerpt. The close channel
// must be created with make(chan struct{}) before use; on a nil channel
// close() panics and a receive never becomes ready.
type BackgroundInterceptor struct {
    interceptor.NoOp
    // wg counts the running background goroutines.
    wg       sync.WaitGroup
    // close is closed exactly once to tell the goroutines to stop.
    close    chan struct{}
    // closeMu serializes the closed check against starting new goroutines.
    closeMu  sync.Mutex
}

// BindRTCPWriter starts one background goroutine for writer unless the
// interceptor has already been closed, and always returns writer unchanged.
//
// closeMu is held for the whole call so the closed check and wg.Add cannot
// interleave with Close.
func (b *BackgroundInterceptor) BindRTCPWriter(
    writer interceptor.RTCPWriter,
) interceptor.RTCPWriter {
    b.closeMu.Lock()
    defer b.closeMu.Unlock()

    if !b.isClosed() {
        b.wg.Add(1)
        go b.backgroundTask(writer)
    }

    return writer
}

// backgroundTask wakes up once per second until the close channel is closed.
// It marks itself done on the WaitGroup when it returns, and stops its ticker
// before that.
func (b *BackgroundInterceptor) backgroundTask(writer interceptor.RTCPWriter) {
    defer b.wg.Done()

    tick := time.NewTicker(time.Second)
    defer tick.Stop()

    for {
        select {
        case <-b.close:
            return
        case <-tick.C:
            // Do work
        }
    }
}

// isClosed reports whether the close channel has been closed. It never
// blocks: the default case is taken when the channel is still open.
func (b *BackgroundInterceptor) isClosed() bool {
    closed := false
    select {
    case <-b.close:
        closed = true
    default:
    }
    return closed
}

// Close signals shutdown and waits for every background goroutine to exit.
// It is safe to call more than once and always returns nil.
//
// Deferred calls run last-in first-out, so closeMu is released before
// wg.Wait runs; the wait therefore happens outside the lock.
func (b *BackgroundInterceptor) Close() error {
    defer b.wg.Wait()

    b.closeMu.Lock()
    defer b.closeMu.Unlock()

    select {
    case <-b.close:
        // Already closed; closing again would panic.
    default:
        close(b.close) // Signal shutdown
    }

    return nil
}
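Always use sync.WaitGroup to ensure goroutines complete before Close() returns. The close channel must be created (for example with make(chan struct{})) when the interceptor is constructed; closing a nil channel panics, and a receive on a nil channel never becomes ready.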

Error Handling

Log Errors, Don’t Drop Packets

// BindRemoteStream wraps reader so that each incoming packet is parsed and
// processed on the way through.
//
// Only errors from the wrapped reader are returned to the caller. A header
// that fails to parse, or a processing error, is logged as a warning and the
// packet is still passed on.
func (m *MyInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    read := func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
        n, attrs, readErr := reader.Read(b, a)
        if readErr != nil {
            // Don't swallow errors from upstream
            return 0, nil, readErr
        }

        if attrs == nil {
            attrs = make(interceptor.Attributes)
        }

        header, parseErr := attrs.GetRTPHeader(b[:n])
        switch {
        case parseErr != nil:
            // Log but continue - don't drop the packet
            m.log.Warnf("failed to parse RTP header: %v", parseErr)
        default:
            if procErr := m.processPacket(header); procErr != nil {
                // Log processing errors but pass the packet through
                m.log.Warnf("failed to process packet: %v", procErr)
            }
        }

        return n, attrs, nil
    }

    return interceptor.RTPReaderFunc(read)
}
Avoid dropping packets in production interceptors unless absolutely necessary. Log errors instead and pass packets through.

Handle Errors in Background Tasks

// sendReports generates the current reports and writes them to writer.
// A write failure is logged as a warning and otherwise ignored, so a failed
// send never stops the caller.
func (b *BackgroundInterceptor) sendReports(writer interceptor.RTCPWriter) {
    _, err := writer.Write(b.generateReports(), interceptor.Attributes{})
    if err == nil {
        return
    }

    // Log but don't crash
    b.log.Warnf("failed to send reports: %v", err)
    // Optionally: implement retry logic or metrics
}

Aggregate Multiple Errors

When closing multiple resources, collect all errors:
// Close closes the dumper and then the logger. Both are always attempted,
// even if the first one fails.
//
// It returns nil when both succeed. Otherwise it returns the failures joined
// into one error, each wrapped with the name of the step that produced it, so
// callers can still match the underlying causes with errors.Is / errors.As.
func (m *MyInterceptor) Close() error {
    var errs []error

    if err := m.closeDumper(); err != nil {
        errs = append(errs, fmt.Errorf("close dumper: %w", err))
    }

    if err := m.closeLogger(); err != nil {
        errs = append(errs, fmt.Errorf("close logger: %w", err))
    }

    // errors.Join returns nil for an empty list, so no special cases are
    // needed. Formatting the slice with %v instead would flatten the errors
    // into text and lose the wrapped causes.
    return errors.Join(errs...)
}

Resource Management

Always Clean Up in Unbind Methods

// BindRemoteStream allocates the per-stream resources (a buffer of
// s.bufferSize bytes and a ticker firing every s.interval) and stores them
// under the stream's SSRC. The reader is returned unchanged.
func (s *StatefulInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    // Allocate resources and register them in one step.
    s.streams.Store(info.SSRC, &StreamState{
        ssrc:   info.SSRC,
        buffer: make([]byte, s.bufferSize),
        ticker: time.NewTicker(s.interval),
    })

    return reader
}

// UnbindRemoteStream removes the state stored for the stream's SSRC and stops
// its ticker. It does nothing when no state is stored for that SSRC.
func (s *StatefulInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
    val, found := s.streams.LoadAndDelete(info.SSRC)
    if !found {
        return
    }

    // Clean up resources
    val.(*StreamState).ticker.Stop()
    // Close any channels, files, etc.
}
Unbind methods are called when streams end. Always clean up per-stream resources here to prevent leaks.

Implement Proper Close Lifecycle

// Close shuts the interceptor down in four ordered steps: signal shutdown,
// wait for goroutines, clean up per-stream state, then close the file.
// It returns the file's close error, if any.
func (m *MyInterceptor) Close() error {
    // 1. Signal shutdown. The closed check makes repeated calls safe.
    m.closeMu.Lock()
    if alreadyClosed := m.isClosed(); !alreadyClosed {
        close(m.close)
    }
    m.closeMu.Unlock()

    // 2. Wait for goroutines. The lock is released first, so it is not held
    // for the duration of the wait.
    m.wg.Wait()

    // 3. Clean up resources; entries of an unexpected type are skipped.
    m.streams.Range(func(_, value any) bool {
        state, ok := value.(*StreamState)
        if ok {
            state.cleanup()
        }
        return true
    })

    // 4. Close external resources
    if m.file != nil {
        if err := m.file.Close(); err != nil {
            return err
        }
    }
    return nil
}

Performance Optimization

Use Attributes for Caching

Attributes cache parsed RTP/RTCP data:
// BindRemoteStream wraps reader so that the RTP header of each packet is
// parsed through the attributes and the attributes are handed downstream.
//
// Errors from the wrapped reader and from header parsing are both returned
// to the caller.
func (m *MyInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    return interceptor.RTPReaderFunc(
        func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
            i, attr, err := reader.Read(b, a)
            if err != nil {
                return 0, nil, err
            }

            // Upstream may pass no attributes; create them so there is
            // somewhere to cache the parsed header.
            if attr == nil {
                attr = make(interceptor.Attributes)
            }

            // GetRTPHeader caches the result in attributes
            // Subsequent interceptors in the chain can reuse it
            header, err := attr.GetRTPHeader(b[:i])
            if err != nil {
                return 0, nil, err
            }

            // Process header...
            _ = header // placeholder so the example compiles

            return i, attr, nil  // Pass cached attributes downstream
        },
    )
}
GetRTPHeader() and GetRTCPPackets() automatically cache unmarshaled data in attributes, improving performance for interceptor chains.

Preallocate Buffers

// OptimizedInterceptor reuses packet buffers through a sync.Pool rather than
// allocating a new one each time.
type OptimizedInterceptor struct {
    interceptor.NoOp
    // bufferPool hands out *[]byte scratch buffers.
    bufferPool sync.Pool
}

// NewOptimizedInterceptor returns an interceptor whose pool creates
// 1500-byte buffers on demand. The pool stores pointers to the slices.
func NewOptimizedInterceptor() *OptimizedInterceptor {
    newBuffer := func() any {
        buf := make([]byte, 1500) // MTU size
        return &buf
    }

    return &OptimizedInterceptor{
        bufferPool: sync.Pool{New: newBuffer},
    }
}

// processPacket borrows a scratch buffer from the pool for the duration of
// the call and returns it when the function exits.
//
// The buffer must not be retained after the function returns, because the
// pool may hand it to another caller.
func (o *OptimizedInterceptor) processPacket() {
    // Get buffer from pool
    bufPtr := o.bufferPool.Get().(*[]byte)
    defer o.bufferPool.Put(bufPtr)

    buf := *bufPtr

    // Use buffer...
    _ = buf // placeholder so the example compiles
}

Avoid Allocations in Hot Path

// Bad: allocates on every packet.
// fmt.Sprintf builds the message unconditionally, before the logger gets a
// chance to decide whether debug output is wanted at all.
func (m *MyInterceptor) process(header *rtp.Header) {
    info := fmt.Sprintf("SSRC: %d, Seq: %d", header.SSRC, header.SequenceNumber)
    m.log.Debug(info)
}

// Good: hands the format string and arguments to the logger instead of
// pre-formatting the message, so this function never builds the string.
//
// NOTE(review): logging.LeveledLogger has no IsDebugEnabled method, so the
// level check is left to the logger. pion's default leveled logger appears
// to skip formatting when debug is disabled; confirm for custom loggers.
func (m *MyInterceptor) process(header *rtp.Header) {
    m.log.Debugf("SSRC: %d, Seq: %d", header.SSRC, header.SequenceNumber)
}

Stream Filtering

Filter by RTCP Feedback

// streamSupportsFeedback reports whether any RTCP feedback entry of the
// stream has exactly the given type. The comparison is case-sensitive.
func streamSupportsFeedback(info *interceptor.StreamInfo, feedbackType string) bool {
    feedback := info.RTCPFeedback
    for i := range feedback {
        if feedback[i].Type == feedbackType {
            return true
        }
    }
    return false
}

// BindRemoteStream filters streams by feedback type before doing any work.
// In this skeleton both paths return reader unchanged; the NACK-specific
// wrapping belongs after the guard.
func (n *NackInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    supportsNACK := streamSupportsFeedback(info, "nack")
    if !supportsNACK {
        // Streams without NACK support are passed through untouched.
        return reader
    }

    // Process this stream...
    return reader
}

Filter by Media Type

// isVideoStream reports whether the stream's MIME type starts with "video/".
// The match is case-sensitive.
func isVideoStream(info *interceptor.StreamInfo) bool {
    const videoPrefix = "video/"
    return strings.HasPrefix(info.MimeType, videoPrefix)
}

// BindRemoteStream only acts on video streams; every other stream gets its
// reader back unchanged.
//
// It must return an RTPReader on every path. The video path returns reader
// as a pass-through default; replace it with a wrapped reader as needed.
func (v *VideoInterceptor) BindRemoteStream(
    info *interceptor.StreamInfo,
    reader interceptor.RTPReader,
) interceptor.RTPReader {
    if !isVideoStream(info) {
        return reader  // Pass through audio streams
    }

    // Process video stream...
    return reader
}

Testing Best Practices

Make Time Testable

// TestableInterceptor reaches the clock and tickers through function fields
// so tests can replace them with deterministic fakes.
// NOTE(review): the Ticker type is not defined in this excerpt.
type TestableInterceptor struct {
    interceptor.NoOp
    now       func() time.Time  // Mockable time function
    newTicker func(time.Duration) Ticker  // Mockable ticker
}

// NewTestableInterceptor returns an interceptor wired to the real clock:
// now is time.Now and newTicker wraps time.NewTicker in a timeTicker.
func NewTestableInterceptor() *TestableInterceptor {
    realTicker := func(d time.Duration) Ticker {
        return &timeTicker{time.NewTicker(d)}
    }

    return &TestableInterceptor{
        now:       time.Now, // Default to real time
        newTicker: realTicker,
    }
}

// In tests:
// TestInterceptor shows how to pin the interceptor's clock to a fixed
// instant. The local variable is named ti rather than interceptor so it does
// not shadow the imported interceptor package inside the test.
func TestInterceptor(t *testing.T) {
    mockTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
    ti := NewTestableInterceptor()
    ti.now = func() time.Time { return mockTime }
    // Test with controlled time...
}

Provide Test Helpers

// testStreamInfo builds a StreamInfo for tests: a VP8 video stream with a
// 90 kHz clock, the given SSRC, and a single "nack" feedback entry.
func testStreamInfo(ssrc uint32) *interceptor.StreamInfo {
    info := &interceptor.StreamInfo{
        SSRC:      ssrc,
        ClockRate: 90000,
        MimeType:  "video/VP8",
    }
    info.RTCPFeedback = []interceptor.RTCPFeedback{
        {Type: "nack", Parameter: ""},
    }
    return info
}

// testRTPPacket builds a small RTP packet for tests with the given sequence
// number and SSRC. The timestamp advances by 3000 ticks per sequence number.
func testRTPPacket(seq uint16, ssrc uint32) *rtp.Packet {
    return &rtp.Packet{
        Header: rtp.Header{
            Version:        2,
            SequenceNumber: seq,
            SSRC:           ssrc,
            // Widen before multiplying: seq * 3000 evaluated in uint16
            // wraps for any seq above 21.
            Timestamp: uint32(seq) * 3000,
        },
        Payload: []byte{0x00, 0x01, 0x02},
    }
}

Documentation

Document Thread Safety

// SafeStatsCollector collects packet statistics.
// All methods are safe for concurrent use.
//
// The only state is an atomic counter, so no mutex is needed to guard it.
type SafeStatsCollector struct {
    interceptor.NoOp
    count atomic.Uint64  // atomic operations are inherently thread-safe
}

Document Resource Management

// StreamLogger logs packets to disk.
//
// Lifecycle:
//   - Close() must be called to flush buffers and close files.
//   - UnbindLocalStream() is called automatically when streams end.
type StreamLogger struct {
    interceptor.NoOp
    // file is the destination the packets are written to.
    file *os.File
}

Document Configuration Options

// WithInterval sets the report generation interval.
// Default is 1 second. Minimum is 100ms.
//
// The bound is checked when the option is applied; a shorter interval makes
// the option return an error and leaves the interceptor unchanged.
func WithInterval(d time.Duration) Option {
    const minInterval = 100 * time.Millisecond

    return func(i *Interceptor) error {
        if d >= minInterval {
            i.interval = d
            return nil
        }
        return errors.New("interval must be at least 100ms")
    }
}

Summary Checklist

  • Embed interceptor.NoOp for partial implementation
  • Use factory pattern with option functions
  • Protect shared state with appropriate locks
  • Use sync.WaitGroup for goroutine management
  • Clean up resources in Unbind and Close methods
  • Log errors instead of dropping packets
  • Cache parsed data using Attributes
  • Filter streams early when possible
  • Make time mockable for testing
  • Document thread safety and resource management

Next Steps

Custom Interceptors

Build your first custom interceptor

Examples

Study real-world implementations

Build docs developers (and LLMs) love