Overview
Building reliable interceptors requires careful attention to concurrency, error handling, resource management, and performance. This guide covers essential patterns and practices learned from production interceptor implementations.
Architecture Patterns
Embed NoOp for Partial Implementation
Always embed
interceptor.NoOp and override only the methods you need. This provides forward compatibility if the interface is extended.
// MyInterceptor is an example interceptor. It embeds interceptor.NoOp, so it
// only has to declare the methods it actually wants to override.
type MyInterceptor struct {
	interceptor.NoOp // Provides default implementations
	// Your fields here
}
// Only override what you need.

// BindLocalStream overrides the embedded NoOp default for local streams.
// This skeleton hands writer back unchanged so that it compiles and forwards
// packets untouched; wrap writer here to inspect or modify outgoing RTP.
func (m *MyInterceptor) BindLocalStream(
	info *interceptor.StreamInfo,
	writer interceptor.RTPWriter,
) interceptor.RTPWriter {
	// Your implementation
	return writer
}
Use the Factory Pattern
The factory pattern enables flexible configuration:
// Define option functions
// MyOption mutates a MyInterceptor during construction. Returning a non-nil
// error reports that the requested setting was rejected.
type MyOption func(*MyInterceptor) error
// WithBufferSize returns an option that sets the interceptor's buffer size.
// The option fails, leaving the interceptor untouched, when size is zero or
// negative.
func WithBufferSize(size int) MyOption {
	return func(target *MyInterceptor) error {
		if size > 0 {
			target.bufferSize = size
			return nil
		}
		return errors.New("buffer size must be positive")
	}
}
// WithLogger returns an option that installs logger as the interceptor's
// logger. The option itself never fails.
func WithLogger(logger logging.LeveledLogger) MyOption {
	apply := func(target *MyInterceptor) error {
		target.log = logger
		return nil
	}
	return apply
}
// MyInterceptorFactory is the factory struct: it stores the options that
// NewInterceptor applies to every MyInterceptor it builds.
type MyInterceptorFactory struct {
	opts []MyOption // applied in slice order by NewInterceptor
}
// NewMyInterceptor is the constructor for the factory. It only records opts;
// they are not run until NewInterceptor is called. The returned error is
// always nil.
func NewMyInterceptor(opts ...MyOption) (*MyInterceptorFactory, error) {
	factory := new(MyInterceptorFactory)
	factory.opts = opts
	return factory, nil
}
// NewInterceptor implements the Factory interface. It starts from the default
// configuration, applies the factory's options in order and stops at the first
// one that fails, then falls back to a default logger if no option set one.
func (f *MyInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	built := &MyInterceptor{}
	// Defaults; the options below may override them.
	built.bufferSize = 512
	built.interval = time.Second

	// Apply options
	for i := range f.opts {
		if err := f.opts[i](built); err != nil {
			return nil, err
		}
	}

	// Initialize logger if not provided
	if built.log == nil {
		loggerFactory := logging.NewDefaultLoggerFactory()
		built.log = loggerFactory.NewLogger("my_interceptor")
	}

	return built, nil
}
Concurrency and Thread Safety
Protect Shared State
Interceptor methods can be called from multiple goroutines. Always protect shared state with proper synchronization.
// SafeInterceptor keeps per-stream state in a plain map and guards every
// access to it with streamsMu.
type SafeInterceptor struct {
	interceptor.NoOp
	// streams is indexed by SSRC and must only be touched while holding streamsMu.
	streams map[uint32]*StreamState
	streamsMu sync.RWMutex // Use RWMutex for read-heavy workloads
}
// getStream returns the state stored for ssrc, or nil when there is none.
// Use read lock for read-only access: only the shared (read) lock is taken,
// so concurrent lookups do not block each other.
func (s *SafeInterceptor) getStream(ssrc uint32) *StreamState {
	s.streamsMu.RLock()
	found := s.streams[ssrc]
	s.streamsMu.RUnlock()
	return found
}
// addStream stores state under ssrc, replacing any previous entry.
// Use write lock for modifications: the exclusive lock is held for the whole
// update. The map is created on first use, so a SafeInterceptor that was
// built without a constructor does not panic on its first insert.
func (s *SafeInterceptor) addStream(ssrc uint32, state *StreamState) {
	s.streamsMu.Lock()
	defer s.streamsMu.Unlock()

	// Assigning into a nil map panics; allocate lazily under the write lock.
	if s.streams == nil {
		s.streams = make(map[uint32]*StreamState)
	}
	s.streams[ssrc] = state
}
Use sync.Map for Concurrent Access
For highly concurrent scenarios, consider sync.Map:
// ConcurrentInterceptor stores per-stream state in a sync.Map, so no explicit
// lock is needed around individual loads and stores.
type ConcurrentInterceptor struct {
	interceptor.NoOp
	streams sync.Map // map[uint32]*StreamState
}
// BindRemoteStream stores a fresh StreamState under the stream's SSRC and
// returns a reader that looks that state up for every packet before
// delegating to the wrapped reader.
func (c *ConcurrentInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	state := &StreamState{ssrc: info.SSRC}
	c.streams.Store(info.SSRC, state)

	return interceptor.RTPReaderFunc(
		func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
			// Access without explicit locking
			if val, ok := c.streams.Load(info.SSRC); ok {
				// Checked assertion: an entry of an unexpected type is skipped
				// rather than panicking on the packet path.
				if current, ok := val.(*StreamState); ok {
					// The blank assignment keeps this skeleton compiling until
					// real processing uses the value.
					_ = current // Process state
				}
			}
			return reader.Read(b, a)
		},
	)
}
Proper Goroutine Management
// BackgroundInterceptor starts background goroutines and tracks them so that
// Close can signal them to stop and wait until they have exited.
type BackgroundInterceptor struct {
	interceptor.NoOp
	// wg counts the goroutines started by BindRTCPWriter.
	wg sync.WaitGroup
	// close is closed by Close to signal shutdown.
	// NOTE(review): no constructor is shown; the channel has to be created
	// with make before Close runs, because closing a nil channel panics.
	close chan struct{}
	// closeMu is held by BindRTCPWriter and Close around the closed check.
	closeMu sync.Mutex
}
// BindRTCPWriter starts one background goroutine for writer unless the
// interceptor has already been closed, and always returns writer unchanged.
// closeMu is held throughout so the closed check and wg.Add cannot interleave
// with Close.
func (b *BackgroundInterceptor) BindRTCPWriter(
	writer interceptor.RTCPWriter,
) interceptor.RTCPWriter {
	b.closeMu.Lock()
	defer b.closeMu.Unlock()

	if !b.isClosed() {
		b.wg.Add(1)
		go b.backgroundTask(writer)
	}
	return writer
}
// backgroundTask wakes up once per second until the close channel is closed.
// It marks itself done on wg when it returns and stops its ticker on exit.
func (b *BackgroundInterceptor) backgroundTask(writer interceptor.RTCPWriter) {
	defer b.wg.Done()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-b.close:
			return
		case <-tick.C:
			// Do work
		}
	}
}
// isClosed reports whether the close channel has been closed. It never
// blocks: when nothing can be received it falls through and reports false.
func (b *BackgroundInterceptor) isClosed() bool {
	closed := false
	select {
	case <-b.close:
		closed = true
	default:
	}
	return closed
}
// Close signals shutdown by closing the close channel (at most once, guarded
// by closeMu) and then blocks until every goroutine counted in wg has
// finished. It always returns nil.
func (b *BackgroundInterceptor) Close() error {
	// Deferred first so it runs last: the wait happens only after the
	// deferred Unlock below has released closeMu.
	defer b.wg.Wait() // Wait for all goroutines
	b.closeMu.Lock()
	defer b.closeMu.Unlock()
	// Closing an already-closed channel panics, hence the check.
	if !b.isClosed() {
		close(b.close) // Signal shutdown
	}
	return nil
}
Always use sync.WaitGroup to ensure goroutines complete before Close() returns.
Error Handling
Log Errors, Don’t Drop Packets
// BindRemoteStream wraps reader so that every incoming packet is inspected
// without ever being dropped by this interceptor: read errors from upstream
// are returned as-is, while header-parse and processing failures are only
// logged and the packet is still passed on.
func (m *MyInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	read := func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
		n, attrs, readErr := reader.Read(b, a)
		if readErr != nil {
			// Don't swallow errors from upstream
			return 0, nil, readErr
		}
		if attrs == nil {
			attrs = make(interceptor.Attributes)
		}

		header, parseErr := attrs.GetRTPHeader(b[:n])
		if parseErr != nil {
			// Log but continue - don't drop the packet
			m.log.Warnf("failed to parse RTP header: %v", parseErr)
		} else if procErr := m.processPacket(header); procErr != nil {
			// Log processing errors but pass the packet through
			m.log.Warnf("failed to process packet: %v", procErr)
		}

		return n, attrs, nil
	}

	return interceptor.RTPReaderFunc(read)
}
Avoid dropping packets in production interceptors unless absolutely necessary. Log errors instead and pass packets through.
Handle Errors in Background Tasks
// sendReports generates the current reports and writes them to writer. A
// failed write is logged as a warning and otherwise ignored, so the caller
// keeps running.
func (b *BackgroundInterceptor) sendReports(writer interceptor.RTCPWriter) {
	reports := b.generateReports()
	_, err := writer.Write(reports, interceptor.Attributes{})
	if err == nil {
		return
	}
	// Log but don't crash
	b.log.Warnf("failed to send reports: %v", err)
	// Optionally: implement retry logic or metrics
}
Aggregate Multiple Errors
When closing multiple resources, collect all errors:
// Close closes the dumper and then the logger, attempting both even if the
// first one fails. It returns nil when both succeed, the single wrapped error
// when only one fails, and otherwise an error that wraps both failures so
// callers can still match either cause with errors.Is / errors.As.
func (m *MyInterceptor) Close() error {
	var errs []error
	if err := m.closeDumper(); err != nil {
		errs = append(errs, fmt.Errorf("close dumper: %w", err))
	}
	if err := m.closeLogger(); err != nil {
		errs = append(errs, fmt.Errorf("close logger: %w", err))
	}

	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	}
	// Return a joined error. %w over errors.Join keeps every cause in the
	// wrap chain; formatting the slice with %v would flatten it to text.
	return fmt.Errorf("multiple errors: %w", errors.Join(errs...))
}
Resource Management
Always Clean Up in Unbind Methods
// BindRemoteStream allocates the per-stream resources (a buffer of
// s.bufferSize bytes and a ticker firing every s.interval), stores them under
// the stream's SSRC, and returns reader unchanged.
func (s *StatefulInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	// Allocate resources
	ssrc := info.SSRC
	buf := make([]byte, s.bufferSize)
	tick := time.NewTicker(s.interval)

	s.streams.Store(ssrc, &StreamState{ssrc: ssrc, buffer: buf, ticker: tick})
	return reader
}
// UnbindRemoteStream removes the state stored for the stream's SSRC and stops
// its ticker. It does nothing when no state is stored for that SSRC.
func (s *StatefulInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
	removed, found := s.streams.LoadAndDelete(info.SSRC)
	if !found {
		return
	}
	// Clean up resources
	removed.(*StreamState).ticker.Stop() // Stop ticker
	// Close any channels, files, etc.
}
Unbind methods are called when streams end. Always clean up per-stream resources here to prevent leaks.
Implement Proper Close Lifecycle
// Close shuts the interceptor down in four ordered steps: signal shutdown,
// wait for background goroutines, clean up every stored stream state, and
// finally close the file if one is open. It returns the file's close error,
// or nil.
func (m *MyInterceptor) Close() error {
	// 1. Signal shutdown
	m.closeMu.Lock()
	if !m.isClosed() {
		close(m.close)
	}
	m.closeMu.Unlock()

	// 2. Wait for goroutines (outside lock)
	m.wg.Wait()

	// 3. Clean up resources
	cleanupEntry := func(_, value any) bool {
		state, ok := value.(*StreamState)
		if ok {
			state.cleanup()
		}
		return true // keep iterating over every entry
	}
	m.streams.Range(cleanupEntry)

	// 4. Close external resources
	if m.file == nil {
		return nil
	}
	if err := m.file.Close(); err != nil {
		return err
	}
	return nil
}
Performance Optimization
Use Attributes for Caching
Attributes cache parsed RTP/RTCP data:
// BindRemoteStream wraps reader so that each packet's RTP header is parsed
// through the attributes and the same attributes are handed downstream.
// Read errors and header-parse errors are both returned to the caller.
func (m *MyInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	return interceptor.RTPReaderFunc(
		func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
			i, attr, err := reader.Read(b, a)
			if err != nil {
				return 0, nil, err
			}
			if attr == nil {
				attr = make(interceptor.Attributes)
			}
			// GetRTPHeader caches the result in attributes
			// Subsequent interceptors in the chain can reuse it
			header, err := attr.GetRTPHeader(b[:i])
			if err != nil {
				return 0, nil, err
			}
			// The blank assignment keeps this skeleton compiling until real
			// processing uses the header.
			_ = header // Process header...
			return i, attr, nil // Pass cached attributes downstream
		},
	)
}
GetRTPHeader() and GetRTCPPackets() automatically cache unmarshaled data in attributes, improving performance for interceptor chains.
Preallocate Buffers
// OptimizedInterceptor reuses packet buffers through a sync.Pool instead of
// allocating a new one for every packet.
type OptimizedInterceptor struct {
	interceptor.NoOp
	// bufferPool hands out *[]byte values; see NewOptimizedInterceptor for
	// how new buffers are created.
	bufferPool sync.Pool
}
// NewOptimizedInterceptor returns an interceptor whose pool creates 1500-byte
// buffers on demand. The pool stores pointers to slices rather than the
// slices themselves.
func NewOptimizedInterceptor() *OptimizedInterceptor {
	newBuffer := func() any {
		buf := make([]byte, 1500) // MTU size
		return &buf
	}
	return &OptimizedInterceptor{bufferPool: sync.Pool{New: newBuffer}}
}
// processPacket borrows a buffer from the pool for the duration of the call
// and returns it to the pool on exit. The buffer may still hold bytes from a
// previous use, so treat its contents as scratch space.
func (o *OptimizedInterceptor) processPacket() {
	// Get buffer from pool
	bufPtr := o.bufferPool.Get().(*[]byte)
	defer o.bufferPool.Put(bufPtr)

	buf := *bufPtr
	// The blank assignment keeps this skeleton compiling until real
	// processing uses the buffer.
	_ = buf // Use buffer...
}
Avoid Allocations in Hot Path
// Bad: allocates on every packet
//
// process is the anti-pattern shown for contrast: fmt.Sprintf builds the
// message before the logger is ever consulted, so the formatting cost is paid
// on every call regardless of what the logger then does with it.
func (m *MyInterceptor) process(header *rtp.Header) {
	info := fmt.Sprintf("SSRC: %d, Seq: %d", header.SSRC, header.SequenceNumber)
	m.log.Debug(info)
}
// Good: string formatting is left to the logger
//
// process logs the packet's SSRC and sequence number at debug level. The
// format string and arguments are passed straight to Debugf, so building the
// message is the logger's job; pion's default leveled logger checks the level
// before formatting. logging.LeveledLogger does not declare an
// IsDebugEnabled method, so no explicit level guard is used here.
func (m *MyInterceptor) process(header *rtp.Header) {
	m.log.Debugf("SSRC: %d, Seq: %d", header.SSRC, header.SequenceNumber)
}
Stream Filtering
Filter by RTCP Feedback
// streamSupportsFeedback reports whether any RTCP feedback entry on info has
// a Type equal to feedbackType. The comparison is exact.
func streamSupportsFeedback(info *interceptor.StreamInfo, feedbackType string) bool {
	feedback := info.RTCPFeedback
	for i := range feedback {
		if feedback[i].Type == feedbackType {
			return true
		}
	}
	return false
}
// BindRemoteStream decides up front whether the stream is of interest:
// streams that advertise "nack" feedback reach the processing path, all
// others are passed through. In this skeleton both paths return reader
// unchanged.
func (n *NackInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	// Only process streams that support NACK
	if streamSupportsFeedback(info, "nack") {
		// Process this stream...
		return reader
	}
	return reader
}
Filter by Media Type
// isVideoStream reports whether the stream's MIME type begins with "video/".
func isVideoStream(info *interceptor.StreamInfo) bool {
	mime := info.MimeType
	return strings.HasPrefix(mime, "video/")
}
// BindRemoteStream passes non-video streams through untouched and reserves
// the remainder of the function for video handling. This skeleton returns
// reader on the video path too, so it compiles; wrap reader there to add
// processing.
func (v *VideoInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo,
	reader interceptor.RTPReader,
) interceptor.RTPReader {
	if !isVideoStream(info) {
		return reader // Pass through audio streams
	}
	// Process video stream...
	return reader
}
Testing Best Practices
Make Time Testable
// TestableInterceptor obtains the current time and its tickers through
// function fields, so tests can replace them with deterministic versions.
type TestableInterceptor struct {
	interceptor.NoOp
	now func() time.Time // Mockable time function
	newTicker func(time.Duration) Ticker // Mockable ticker
}
// NewTestableInterceptor returns an interceptor wired to the real clock:
// now is time.Now and newTicker wraps time.NewTicker in a timeTicker.
func NewTestableInterceptor() *TestableInterceptor {
	ti := &TestableInterceptor{}
	ti.now = time.Now // Default to real time
	ti.newTicker = func(d time.Duration) Ticker {
		return &timeTicker{time.NewTicker(d)}
	}
	return ti
}
// In tests:

// TestInterceptor shows how to pin the interceptor's clock to a fixed instant.
func TestInterceptor(t *testing.T) {
	mockTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	// Named ti rather than interceptor so the local variable does not shadow
	// the interceptor package for the rest of the function.
	ti := NewTestableInterceptor()
	ti.now = func() time.Time { return mockTime }
	// Test with controlled time...
}
Provide Test Helpers
// testStreamInfo is a helper to create test stream info: a VP8 video stream
// with a 90 kHz clock, the given SSRC, and a single "nack" feedback entry.
func testStreamInfo(ssrc uint32) *interceptor.StreamInfo {
	info := &interceptor.StreamInfo{
		SSRC:      ssrc,
		ClockRate: 90000,
		MimeType:  "video/VP8",
	}
	info.RTCPFeedback = []interceptor.RTCPFeedback{
		{Type: "nack", Parameter: ""},
	}
	return info
}
// testRTPPacket is a helper to create a test RTP packet: version 2, the given
// sequence number and SSRC, a fixed 3-byte payload, and a timestamp that
// advances by 3000 per sequence number.
func testRTPPacket(seq uint16, ssrc uint32) *rtp.Packet {
	// Widen before multiplying: seq * 3000 evaluated in uint16 wraps around
	// as soon as seq reaches 22, producing a non-monotonic timestamp.
	timestamp := uint32(seq) * 3000

	return &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			SequenceNumber: seq,
			SSRC:           ssrc,
			Timestamp:      timestamp,
		},
		Payload: []byte{0x00, 0x01, 0x02},
	}
}
Documentation
Document Thread Safety
// SafeStatsCollector collects packet statistics.
// All methods are safe for concurrent use.
type SafeStatsCollector struct {
	interceptor.NoOp
	// count is an atomic counter, so it needs no separate lock.
	count atomic.Uint64 // atomic operations are inherently thread-safe
}
Document Resource Management
// StreamLogger logs packets to disk.
// Close() must be called to flush buffers and close files.
// UnbindLocalStream() is called automatically when streams end.
type StreamLogger struct {
	interceptor.NoOp
	// file is the log file handle.
	// NOTE(review): presumably released by Close; its implementation is not shown here.
	file *os.File
}
Document Configuration Options
// WithInterval sets the report generation interval.
// Default is 1 second. Minimum is 100ms.
// The returned option fails, leaving the interceptor unchanged, when d is
// below the minimum.
func WithInterval(d time.Duration) Option {
	const minInterval = 100 * time.Millisecond
	return func(i *Interceptor) error {
		if d >= minInterval {
			i.interval = d
			return nil
		}
		return errors.New("interval must be at least 100ms")
	}
}
Summary Checklist
- Embed interceptor.NoOp for partial implementation
- Use factory pattern with option functions
- Protect shared state with appropriate locks
- Use sync.WaitGroup for goroutine management
- Clean up resources in Unbind and Close methods
- Log errors instead of dropping packets
- Cache parsed data using Attributes
- Filter streams early when possible
- Make time mockable for testing
- Document thread safety and resource management
Next Steps
Custom Interceptors
Build your first custom interceptor
Examples
Study real-world implementations