The Registry provides a centralized way to manage interceptor factories and build complete interceptor chains. It collects factories and constructs chains automatically when you create a new peer connection.
Overview
The Registry pattern solves several problems:
Centralized configuration: Register all interceptors in one place
Lazy instantiation: Interceptors are created only when Build is called, not when a factory is registered
Per-connection instances: Each peer connection gets its own interceptor instances
Automatic chaining: Registered interceptors are combined into a single chain in registration order
Registry type definition
From registry.go:
// Registry is a collector for interceptors.
type Registry struct {
	factories []Factory
}

// Add adds a new Interceptor to the registry.
func (r *Registry) Add(f Factory) {
	r.factories = append(r.factories, f)
}

// Build constructs a single Interceptor from a InterceptorRegistry.
func (r *Registry) Build(id string) (Interceptor, error) {
	if len(r.factories) == 0 {
		return &NoOp{}, nil
	}

	interceptors := make([]Interceptor, 0, len(r.factories))
	for _, f := range r.factories {
		i, err := f.NewInterceptor(id)
		if err != nil {
			return nil, err
		}

		interceptors = append(interceptors, i)
	}

	return NewChain(interceptors), nil
}
Factory interface
Interceptors are added to the registry through factories:
// Factory provides an interface for constructing interceptors.
type Factory interface {
	NewInterceptor(id string) (Interceptor, error)
}
The id parameter typically corresponds to a peer connection identifier, allowing interceptors to maintain per-connection state.
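To make the per-connection behavior concrete, here is a minimal sketch. It uses local stand-in types rather than the pion module (only Close is modeled on Interceptor), and counter, counterFactory, and mustCounter are hypothetical names introduced for illustration. It shows that each NewInterceptor call yields an independent instance tagged with its own id.

```go
package main

import "fmt"

// Interceptor is a local stand-in for interceptor.Interceptor (only Close is modeled).
type Interceptor interface{ Close() error }

// Factory mirrors the interface shown above.
type Factory interface {
	NewInterceptor(id string) (Interceptor, error)
}

// counter is a hypothetical interceptor holding per-connection state.
type counter struct {
	connectionID string
	packets      int
}

func (c *counter) Close() error { return nil }

// counterFactory is a hypothetical factory; it returns a fresh counter on every call.
type counterFactory struct{}

func (counterFactory) NewInterceptor(id string) (Interceptor, error) {
	return &counter{connectionID: id}, nil
}

// mustCounter builds an interceptor and panics on error, to keep the demo short.
func mustCounter(f Factory, id string) *counter {
	i, err := f.NewInterceptor(id)
	if err != nil {
		panic(err)
	}
	return i.(*counter)
}

func main() {
	var f Factory = counterFactory{}
	a := mustCounter(f, "peer-connection-1")
	b := mustCounter(f, "peer-connection-2")

	a.packets++ // only connection 1 sees this packet

	fmt.Println(a.connectionID, a.packets) // peer-connection-1 1
	fmt.Println(b.connectionID, b.packets) // peer-connection-2 0
}
```

State that must be visible across connections belongs on the factory (or on something the factory points to), not on the interceptor instance.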
Basic usage
Create a registry, add factories, and build interceptors:
package main

import (
	"log"

	"github.com/pion/interceptor"
)

func main() {
	// Create a registry
	registry := &interceptor.Registry{}

	// Add interceptor factories
	registry.Add(&StatsInterceptorFactory{})
	registry.Add(&LoggingInterceptorFactory{})
	registry.Add(&RecordingInterceptorFactory{Path: "/tmp/recordings"})

	// Build interceptors for a peer connection
	chain, err := registry.Build("peer-connection-1")
	if err != nil {
		log.Fatal(err)
	}
	defer chain.Close()

	// Use the chain with your peer connection
	// ...
}
Implementing a factory
A factory typically holds configuration and creates interceptor instances:
Simple factory
type StatsInterceptorFactory struct{}

// NewInterceptor returns a fresh StatsInterceptor for the given connection.
func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	return &StatsInterceptor{
		connectionID: id,
		streams:      make(map[uint32]*StreamStats),
	}, nil
}
Building interceptors
The Build method creates a new interceptor chain:
chain, err := registry.Build("peer-connection-1")
if err != nil {
	log.Fatal(err)
}
defer chain.Close()
If no factories are registered, Build returns a NoOp interceptor that passes all packets through unchanged.
Error handling
The Build method returns an error if any factory fails:
chain, err := registry.Build("peer-connection-1")
if err != nil {
	// One of the factories failed to create an interceptor
	log.Printf("Failed to build interceptor chain: %v", err)
	return err
}
defer chain.Close()
Always defer Close() on the built interceptor to ensure proper cleanup, even if an error occurs later.
Per-connection state
The id parameter allows interceptors to maintain separate state for each connection:
type StatsInterceptor struct {
	interceptor.NoOp // supplies pass-through defaults for the methods not overridden here
	connectionID string
	mu           sync.Mutex
	streams      map[uint32]*StreamStats
}

func (i *StatsInterceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
	i.mu.Lock()
	i.streams[info.SSRC] = &StreamStats{
		ConnectionID: i.connectionID,
		SSRC:         info.SSRC,
		StartTime:    time.Now(),
	}
	i.mu.Unlock()

	return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
		// Update stats for this connection and stream
		i.recordPacket(header.SSRC, len(payload))
		return writer.Write(header, payload, attributes)
	})
}
Dynamic configuration
Build registries based on runtime configuration:
func createRegistry(config *Config) (*interceptor.Registry, error) {
	registry := &interceptor.Registry{}

	if config.Stats.Enabled {
		factory := &StatsInterceptorFactory{
			Interval: config.Stats.Interval,
			Exporter: config.Stats.Exporter,
		}
		registry.Add(factory)
	}

	if config.Logging.Enabled {
		factory := &LoggingInterceptorFactory{
			Level:  config.Logging.Level,
			Output: config.Logging.Output,
		}
		registry.Add(factory)
	}

	if config.Recording.Enabled {
		factory := &RecordingInterceptorFactory{
			Path:     config.Recording.Path,
			MaxSize:  config.Recording.MaxSize,
			Compress: config.Recording.Compress,
		}
		if err := factory.Validate(); err != nil {
			return nil, fmt.Errorf("invalid recording config: %w", err)
		}
		registry.Add(factory)
	}

	return registry, nil
}
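The example above calls factory.Validate() without showing it. One possible shape is sketched below. The field types (string, int64, bool) are assumptions, since the page does not define RecordingInterceptorFactory, and the checks simply mirror the ones shown under best practices. The sketch collects every problem with errors.Join (Go 1.20 or later) so a misconfigured deployment reports all issues at once.

```go
package main

import (
	"errors"
	"fmt"
)

// RecordingInterceptorFactory repeats the fields used above; the field types are assumed.
type RecordingInterceptorFactory struct {
	Path     string
	MaxSize  int64
	Compress bool
}

// Validate reports every configuration problem it finds, or nil if there are none.
func (f *RecordingInterceptorFactory) Validate() error {
	var errs []error
	if f.Path == "" {
		errs = append(errs, errors.New("recording path is required"))
	}
	if f.MaxSize < 0 {
		errs = append(errs, errors.New("max size must be non-negative"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	factory := &RecordingInterceptorFactory{MaxSize: -1}
	if err := factory.Validate(); err != nil {
		fmt.Println(fmt.Errorf("invalid recording config: %w", err))
	}
}
```

Validating before registry.Add surfaces bad configuration at startup instead of on the first peer connection.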
Shared resources
Factories can share resources across interceptor instances:
type SharedStatsCollector struct {
	mu      sync.Mutex
	metrics map[string]*ConnectionMetrics // keyed by connection ID
}

type StatsInterceptorFactory struct {
	collector *SharedStatsCollector
}

func (f *StatsInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	f.collector.mu.Lock()
	f.collector.metrics[id] = &ConnectionMetrics{}
	f.collector.mu.Unlock()

	return &StatsInterceptor{
		connectionID: id,
		collector:    f.collector,
	}, nil
}

// Usage
collector := &SharedStatsCollector{
	metrics: make(map[string]*ConnectionMetrics),
}
registry := &interceptor.Registry{}
registry.Add(&StatsInterceptorFactory{collector: collector})

// Build multiple connections sharing the same collector
chains := make([]interceptor.Interceptor, 0, 10)
for i := 0; i < 10; i++ {
	id := fmt.Sprintf("conn-%d", i)
	chain, err := registry.Build(id)
	if err != nil {
		log.Fatal(err)
	}
	chains = append(chains, chain) // all chains share the same collector
}
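The factory above adds an entry to the shared map for every connection but nothing removes it. A minimal sketch of one way to handle that follows, using simplified local types instead of the pion module. The register, unregister, and size helpers and the newStatsInterceptor constructor are hypothetical names. The idea is that the interceptor's Close removes its own entry from the collector.

```go
package main

import (
	"fmt"
	"sync"
)

// ConnectionMetrics and SharedStatsCollector are simplified versions of the types above.
type ConnectionMetrics struct{ Packets int }

type SharedStatsCollector struct {
	mu      sync.Mutex
	metrics map[string]*ConnectionMetrics // keyed by connection ID
}

// register, unregister, and size are hypothetical helpers that keep locking in one place.
func (c *SharedStatsCollector) register(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metrics[id] = &ConnectionMetrics{}
}

func (c *SharedStatsCollector) unregister(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.metrics, id)
}

func (c *SharedStatsCollector) size() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.metrics)
}

// statsInterceptor stands in for StatsInterceptor; only Close is modeled.
type statsInterceptor struct {
	connectionID string
	collector    *SharedStatsCollector
}

// Close removes this connection's entry so the shared map does not grow without bound.
func (i *statsInterceptor) Close() error {
	i.collector.unregister(i.connectionID)
	return nil
}

// newStatsInterceptor plays the role of the factory's NewInterceptor method.
func newStatsInterceptor(c *SharedStatsCollector, id string) *statsInterceptor {
	c.register(id)
	return &statsInterceptor{connectionID: id, collector: c}
}

func main() {
	collector := &SharedStatsCollector{metrics: make(map[string]*ConnectionMetrics)}
	a := newStatsInterceptor(collector, "conn-0")
	b := newStatsInterceptor(collector, "conn-1")
	fmt.Println(collector.size()) // 2

	_ = a.Close()
	_ = b.Close()
	fmt.Println(collector.size()) // 0
}
```

If metrics must outlive the connection (for example, for a final export), Close could mark the entry as finished instead of deleting it.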
Testing with registry
The registry makes it easy to test with different interceptor configurations:
func TestWithInterceptors(t *testing.T) {
	tests := []struct {
		name     string
		registry *interceptor.Registry
	}{
		{
			name:     "no interceptors",
			registry: &interceptor.Registry{},
		},
		{
			name: "with stats",
			registry: func() *interceptor.Registry {
				r := &interceptor.Registry{}
				r.Add(&StatsInterceptorFactory{})
				return r
			}(),
		},
		{
			name: "full pipeline",
			registry: func() *interceptor.Registry {
				r := &interceptor.Registry{}
				r.Add(&StatsInterceptorFactory{})
				r.Add(&LoggingInterceptorFactory{Output: io.Discard})
				r.Add(&RecordingInterceptorFactory{Path: t.TempDir()})
				return r
			}(),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			chain, err := tt.registry.Build("test-connection")
			if err != nil {
				t.Fatalf("Failed to build chain: %v", err)
			}
			defer chain.Close()

			// Test with the chain
			testPeerConnection(t, chain)
		})
	}
}
Best practices
Validate configuration in factories
Validate factory configuration in NewInterceptor to fail fast with clear errors.
func (f *RecordingInterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	if f.Path == "" {
		return nil, errors.New("recording path is required")
	}
	if f.MaxSize < 0 {
		return nil, errors.New("max size must be non-negative")
	}
	// Create interceptor...
}
Add factories in execution order
Build calls factories in the order they were added, and the resulting chain keeps that order. Add factories in the order you want them to run.
registry := &interceptor.Registry{}
registry.Add(&StatsInterceptorFactory{})      // First: collect stats
registry.Add(&ProcessingInterceptorFactory{}) // Second: process
registry.Add(&RecordingInterceptorFactory{})  // Third: record processed output
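The ordering follows directly from the loop in Build, which ranges over the factories slice. A small sketch with stand-in types (not the pion module) makes this observable. Here named, namedFactory, and build are hypothetical names, and build copies only the loop from Registry.Build.

```go
package main

import "fmt"

// Interceptor is a local stand-in for interceptor.Interceptor (only Close is modeled).
type Interceptor interface{ Close() error }

// Factory mirrors the Factory interface from registry.go.
type Factory interface {
	NewInterceptor(id string) (Interceptor, error)
}

// named is a hypothetical interceptor that only remembers its name.
type named struct{ name string }

func (named) Close() error { return nil }

// namedFactory is a hypothetical factory that records when it is invoked.
type namedFactory struct {
	name string
	log  *[]string
}

func (f namedFactory) NewInterceptor(id string) (Interceptor, error) {
	*f.log = append(*f.log, f.name)
	return named{f.name}, nil
}

// build follows the loop in Registry.Build: factories are called in slice order.
func build(factories []Factory, id string) ([]Interceptor, error) {
	out := make([]Interceptor, 0, len(factories))
	for _, f := range factories {
		i, err := f.NewInterceptor(id)
		if err != nil {
			return nil, err
		}
		out = append(out, i)
	}
	return out, nil
}

func main() {
	var calls []string
	factories := []Factory{
		namedFactory{"stats", &calls},
		namedFactory{"processing", &calls},
		namedFactory{"recording", &calls},
	}
	if _, err := build(factories, "peer-connection-1"); err != nil {
		panic(err)
	}
	fmt.Println(calls) // [stats processing recording]
}
```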
Reuse a single registry
Create one registry and reuse it for all peer connections.
// Setup phase
globalRegistry := createGlobalRegistry(config)

// For each peer connection
func createPeerConnection(id string) (*PeerConnection, error) {
	chain, err := globalRegistry.Build(id)
	if err != nil {
		return nil, err
	}

	return &PeerConnection{
		ID:          id,
		Interceptor: chain,
	}, nil
}
Use unique connection IDs
Ensure connection IDs are unique to avoid conflicts in shared resources.
import "github.com/google/uuid"

func newConnection() (*Connection, error) {
	id := uuid.New().String()
	chain, err := registry.Build(id)
	if err != nil {
		return nil, err
	}
	return &Connection{ID: id, Interceptor: chain}, nil
}
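If you would rather not add a UUID dependency, a random identifier from the standard library serves the same purpose. The sketch below is one option, not something the library prescribes. newConnectionID is a hypothetical helper, and the 16-byte length is an arbitrary choice that gives the same 128 bits as a UUID.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newConnectionID returns 16 random bytes encoded as 32 hex characters.
func newConnectionID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", fmt.Errorf("generate connection id: %w", err)
	}
	return hex.EncodeToString(b), nil
}

func main() {
	id, err := newConnectionID()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(id)) // 32
}
```

The returned string can be passed to registry.Build in place of uuid.New().String().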
Clean up on factory errors
If a factory fails during Build, interceptors created by earlier factories are not closed and may leak. The following variant closes them before returning the error. It reads the unexported factories field, so it would have to be added inside the interceptor package (for example in a fork) rather than in application code:
func (r *Registry) BuildWithCleanup(id string) (Interceptor, error) {
	if len(r.factories) == 0 {
		return &NoOp{}, nil
	}

	var created []Interceptor
	defer func() {
		// If a factory panics, close what was created so far, then re-panic.
		if p := recover(); p != nil {
			for _, i := range created {
				i.Close()
			}
			panic(p)
		}
	}()

	for _, f := range r.factories {
		i, err := f.NewInterceptor(id)
		if err != nil {
			// Clean up already created interceptors
			for _, prev := range created {
				prev.Close()
			}
			return nil, err
		}
		created = append(created, i)
	}

	return NewChain(created), nil
}
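From application code, which cannot range over the registry's unexported factories field, a similar effect is possible by keeping your own slice of factories and building from that. The sketch below uses stand-in types so it runs without the pion module. The names tracked, okFactory, failingFactory, and buildWithCleanup are hypothetical, and the function returns a plain slice where real code would pass the result to interceptor.NewChain. It uses errors.Join (Go 1.20 or later) so that Close errors are reported alongside the factory error.

```go
package main

import (
	"errors"
	"fmt"
)

// Interceptor is a local stand-in for interceptor.Interceptor (only Close is modeled).
type Interceptor interface{ Close() error }

// Factory mirrors the Factory interface from registry.go.
type Factory interface {
	NewInterceptor(id string) (Interceptor, error)
}

// tracked is a hypothetical interceptor that records whether Close ran.
type tracked struct{ closed bool }

func (t *tracked) Close() error { t.closed = true; return nil }

// okFactory hands out a pre-made interceptor so the caller can inspect it later.
type okFactory struct{ i *tracked }

func (f okFactory) NewInterceptor(string) (Interceptor, error) { return f.i, nil }

// failingFactory always fails, simulating a bad configuration.
type failingFactory struct{}

func (failingFactory) NewInterceptor(string) (Interceptor, error) {
	return nil, errors.New("factory failed")
}

// buildWithCleanup closes every interceptor created so far if a later factory fails.
func buildWithCleanup(factories []Factory, id string) ([]Interceptor, error) {
	created := make([]Interceptor, 0, len(factories))
	for _, f := range factories {
		i, err := f.NewInterceptor(id)
		if err != nil {
			for _, prev := range created {
				err = errors.Join(err, prev.Close()) // nil Close results are dropped
			}
			return nil, err
		}
		created = append(created, i)
	}
	return created, nil
}

func main() {
	first := &tracked{}
	_, err := buildWithCleanup([]Factory{okFactory{first}, failingFactory{}}, "peer-connection-1")
	fmt.Println(err != nil, first.closed) // true true
}
```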
Common patterns
Global registry
var defaultRegistry *interceptor.Registry

func init() {
	defaultRegistry = &interceptor.Registry{}
	defaultRegistry.Add(&StatsInterceptorFactory{})
	defaultRegistry.Add(&LoggingInterceptorFactory{Output: os.Stdout})
}

func NewPeerConnection() (*PeerConnection, error) {
	chain, err := defaultRegistry.Build(generateID())
	if err != nil {
		return nil, err
	}
	return &PeerConnection{Interceptor: chain}, nil
}
Configuration-based registry
type InterceptorConfig struct {
	Stats     *StatsConfig
	Logging   *LoggingConfig
	Recording *RecordingConfig
}

func NewRegistry(config *InterceptorConfig) *interceptor.Registry {
	registry := &interceptor.Registry{}

	if config.Stats != nil {
		registry.Add(&StatsInterceptorFactory{Config: config.Stats})
	}
	if config.Logging != nil {
		registry.Add(&LoggingInterceptorFactory{Config: config.Logging})
	}
	if config.Recording != nil {
		registry.Add(&RecordingInterceptorFactory{Config: config.Recording})
	}

	return registry
}
Environment-specific registries
func NewDevelopmentRegistry() *interceptor.Registry {
	registry := &interceptor.Registry{}
	registry.Add(&VerboseLoggingInterceptorFactory{})
	registry.Add(&DebugStatsInterceptorFactory{})
	return registry
}

func NewProductionRegistry() *interceptor.Registry {
	registry := &interceptor.Registry{}
	registry.Add(&OptimizedStatsInterceptorFactory{})
	registry.Add(&ErrorLoggingInterceptorFactory{})
	registry.Add(&MetricsExportInterceptorFactory{})
	return registry
}

func NewRegistryForEnvironment(env string) *interceptor.Registry {
	switch env {
	case "production":
		return NewProductionRegistry()
	case "development":
		return NewDevelopmentRegistry()
	default:
		return &interceptor.Registry{}
	}
}