Skip to main content
The Registry provides a centralized way to manage interceptor factories and build complete interceptor chains. It collects factories and constructs chains automatically when you create a new peer connection.

Overview

The Registry pattern solves several problems:
  • Centralized configuration: Register all interceptors in one place
  • Lazy instantiation: Interceptors are created only when needed
  • Per-connection instances: Each peer connection gets its own interceptor instances
  • Automatic chaining: Registered interceptors are automatically combined into a chain

Registry type definition

From registry.go:
// Registry is a collector for interceptors.
type Registry struct {
    factories []Factory
}

// Add adds a new Interceptor to the registry.
func (r *Registry) Add(f Factory) {
    r.factories = append(r.factories, f)
}

// Build constructs a single Interceptor from a InterceptorRegistry.
func (r *Registry) Build(id string) (Interceptor, error) {
    if len(r.factories) == 0 {
        return &NoOp{}, nil
    }

    interceptors := make([]Interceptor, 0, len(r.factories))
    for _, f := range r.factories {
        i, err := f.NewInterceptor(id)
        if err != nil {
            return nil, err
        }

        interceptors = append(interceptors, i)
    }

    return NewChain(interceptors), nil
}

Factory interface

Interceptors are added to the registry through factories:
// Factory provides an interface for constructing interceptors.
type Factory interface {
    NewInterceptor(id string) (Interceptor, error)
}
The id parameter typically corresponds to a peer connection identifier, allowing interceptors to maintain per-connection state.

Basic usage

Create a registry, add factories, and build interceptors:
package main

import (
    "log"
    "github.com/pion/interceptor"
)

func main() {
    // Create a registry
    registry := &interceptor.Registry{}
    
    // Add interceptor factories
    registry.Add(&StatsInterceptorFactory{})
    registry.Add(&LoggingInterceptorFactory{})
    registry.Add(&RecordingInterceptorFactory{Path: "/tmp/recordings"})
    
    // Build interceptors for a peer connection
    chain, err := registry.Build("peer-connection-1")
    if err != nil {
        log.Fatal(err)
    }
    defer chain.Close()
    
    // Use the chain with your peer connection
    // ...
}

Implementing a factory

A factory typically holds configuration and creates interceptor instances:
type StatsInterceptorFactory struct{}

func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
    return &StatsInterceptor{
        connectionID: id,
        metrics:     make(map[uint32]*StreamMetrics),
    }, nil
}

Building interceptors

The Build method creates a new interceptor chain:
chain, err := registry.Build("peer-connection-1")
if err != nil {
    log.Fatal(err)
}
defer chain.Close()
If no factories are registered, Build returns a NoOp interceptor that passes all packets through unchanged.

Error handling

The Build method returns an error if any factory fails:
chain, err := registry.Build("peer-connection-1")
if err != nil {
    // One of the factories failed to create an interceptor
    log.Printf("Failed to build interceptor chain: %v", err)
    return err
}
defer chain.Close()
Always defer Close() on the built interceptor to ensure proper cleanup, even if an error occurs later.

Per-connection state

The id parameter allows interceptors to maintain separate state for each connection:
type StatsInterceptor struct {
    NoOp
    connectionID string
    mu           sync.Mutex
    streams      map[uint32]*StreamStats
}

func (i *StatsInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    i.mu.Lock()
    i.streams[info.SSRC] = &StreamStats{
        ConnectionID: i.connectionID,
        SSRC:        info.SSRC,
        StartTime:   time.Now(),
    }
    i.mu.Unlock()
    
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        // Update stats for this connection and stream
        i.recordPacket(header.SSRC, len(payload))
        return writer.Write(header, payload, attributes)
    })
}

Dynamic configuration

Build registries based on runtime configuration:
func createRegistry(config *Config) (*interceptor.Registry, error) {
    registry := &interceptor.Registry{}
    
    if config.Stats.Enabled {
        factory := &StatsInterceptorFactory{
            Interval: config.Stats.Interval,
            Exporter: config.Stats.Exporter,
        }
        registry.Add(factory)
    }
    
    if config.Logging.Enabled {
        factory := &LoggingInterceptorFactory{
            Level:  config.Logging.Level,
            Output: config.Logging.Output,
        }
        registry.Add(factory)
    }
    
    if config.Recording.Enabled {
        factory := &RecordingInterceptorFactory{
            Path:      config.Recording.Path,
            MaxSize:   config.Recording.MaxSize,
            Compress:  config.Recording.Compress,
        }
        if err := factory.Validate(); err != nil {
            return nil, fmt.Errorf("invalid recording config: %w", err)
        }
        registry.Add(factory)
    }
    
    return registry, nil
}

Shared resources

Factories can share resources across interceptor instances:
type SharedStatsCollector struct {
    mu      sync.Mutex
    metrics map[string]*ConnectionMetrics // keyed by connection ID
}

type StatsInterceptorFactory struct {
    collector *SharedStatsCollector
}

func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
    f.collector.mu.Lock()
    f.collector.metrics[id] = &ConnectionMetrics{}
    f.collector.mu.Unlock()
    
    return &StatsInterceptor{
        connectionID: id,
        collector:   f.collector,
    }, nil
}

// Usage
collector := &SharedStatsCollector{
    metrics: make(map[string]*ConnectionMetrics),
}

registry := &interceptor.Registry{}
registry.Add(&StatsInterceptorFactory{collector: collector})

// Build multiple connections sharing the same collector
for i := 0; i < 10; i++ {
    id := fmt.Sprintf("conn-%d", i)
    chain, _ := registry.Build(id)
    // All chains share the same collector
}

Testing with registry

The registry makes it easy to test with different interceptor configurations:
func TestWithInterceptors(t *testing.T) {
    tests := []struct {
        name     string
        registry *interceptor.Registry
    }{
        {
            name:     "no interceptors",
            registry: &interceptor.Registry{},
        },
        {
            name: "with stats",
            registry: func() *interceptor.Registry {
                r := &interceptor.Registry{}
                r.Add(&StatsInterceptorFactory{})
                return r
            }(),
        },
        {
            name: "full pipeline",
            registry: func() *interceptor.Registry {
                r := &interceptor.Registry{}
                r.Add(&StatsInterceptorFactory{})
                r.Add(&LoggingInterceptorFactory{Output: io.Discard})
                r.Add(&RecordingInterceptorFactory{Path: t.TempDir()})
                return r
            }(),
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            chain, err := tt.registry.Build("test-connection")
            if err != nil {
                t.Fatalf("Failed to build chain: %v", err)
            }
            defer chain.Close()
            
            // Test with the chain
            testPeerConnection(t, chain)
        })
    }
}

Best practices

Validate factory configuration in NewInterceptor to fail fast with clear errors.
func (f *RecordingInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
    if f.Path == "" {
        return nil, errors.New("recording path is required")
    }
    
    if f.MaxSize < 0 {
        return nil, errors.New("max size must be non-negative")
    }
    
    // Create interceptor...
}
Factories are executed in the order they’re added. Add factories in the desired execution order.
registry := &interceptor.Registry{}
registry.Add(&StatsInterceptorFactory{})      // First: collect stats
registry.Add(&ProcessingInterceptorFactory{}) // Second: process
registry.Add(&RecordingInterceptorFactory{})  // Third: record processed output
Create one registry and reuse it for all peer connections.
// Setup phase
globalRegistry := createGlobalRegistry(config)

// For each peer connection
func createPeerConnection(id string) (*PeerConnection, error) {
    chain, err := globalRegistry.Build(id)
    if err != nil {
        return nil, err
    }
    
    return &PeerConnection{
        ID:          id,
        Interceptor: chain,
    }, nil
}
Ensure connection IDs are unique to avoid conflicts in shared resources.
import "github.com/google/uuid"

func newConnection() (*Connection, error) {
    id := uuid.New().String()
    chain, err := registry.Build(id)
    if err != nil {
        return nil, err
    }
    return &Connection{ID: id, Interceptor: chain}, nil
}
If a factory fails during Build, previously created interceptors may leak. Consider wrapping Build:
func (r *Registry) BuildWithCleanup(id string) (Interceptor, error) {
    if len(r.factories) == 0 {
        return &NoOp{}, nil
    }
    
    var created []Interceptor
    defer func() {
        if r := recover(); r != nil {
            for _, i := range created {
                i.Close()
            }
            panic(r)
        }
    }()
    
    for _, f := range r.factories {
        i, err := f.NewInterceptor(id)
        if err != nil {
            // Clean up already created interceptors
            for _, prev := range created {
                prev.Close()
            }
            return nil, err
        }
        created = append(created, i)
    }
    
    return NewChain(created), nil
}

Common patterns

Global registry

var defaultRegistry *interceptor.Registry

func init() {
    defaultRegistry = &interceptor.Registry{}
    defaultRegistry.Add(&StatsInterceptorFactory{})
    defaultRegistry.Add(&LoggingInterceptorFactory{Output: os.Stdout})
}

func NewPeerConnection() (*PeerConnection, error) {
    chain, err := defaultRegistry.Build(generateID())
    if err != nil {
        return nil, err
    }
    return &PeerConnection{Interceptor: chain}, nil
}

Configuration-based registry

type InterceptorConfig struct {
    Stats     *StatsConfig
    Logging   *LoggingConfig
    Recording *RecordingConfig
}

func NewRegistry(config *InterceptorConfig) *interceptor.Registry {
    registry := &interceptor.Registry{}
    
    if config.Stats != nil {
        registry.Add(&StatsInterceptorFactory{Config: config.Stats})
    }
    
    if config.Logging != nil {
        registry.Add(&LoggingInterceptorFactory{Config: config.Logging})
    }
    
    if config.Recording != nil {
        registry.Add(&RecordingInterceptorFactory{Config: config.Recording})
    }
    
    return registry
}

Environment-specific registries

func NewDevelopmentRegistry() *interceptor.Registry {
    registry := &interceptor.Registry{}
    registry.Add(&VerboseLoggingInterceptorFactory{})
    registry.Add(&DebugStatsInterceptorFactory{})
    return registry
}

func NewProductionRegistry() *interceptor.Registry {
    registry := &interceptor.Registry{}
    registry.Add(&OptimizedStatsInterceptorFactory{})
    registry.Add(&ErrorLoggingInterceptorFactory{})
    registry.Add(&MetricsExportInterceptorFactory{})
    return registry
}

func NewRegistryForEnvironment(env string) *interceptor.Registry {
    switch env {
    case "production":
        return NewProductionRegistry()
    case "development":
        return NewDevelopmentRegistry()
    default:
        return &interceptor.Registry{}
    }
}

Chaining

How chains work internally

Interceptor interface

Implementing interceptors

Architecture

Overall design principles

Build docs developers (and LLMs) love