Skip to main content
The Chain type allows you to combine multiple interceptors into a single interceptor that executes them in sequence. This enables you to build complex packet processing pipelines from simple, focused interceptors.

Overview

A chain executes interceptors in order, passing the output of each interceptor as the input to the next. This creates a pipeline where each interceptor can:
  • Modify packets before passing them to the next interceptor
  • Add side effects (logging, metrics, etc.)
  • Conditionally drop or generate packets
  • Maintain per-stream state

Chain type definition

From chain.go:
// Chain is an interceptor that runs all child interceptors in order.
type Chain struct {
    interceptors []Interceptor
}

// NewChain returns a new Chain interceptor.
func NewChain(interceptors []Interceptor) *Chain {
    return &Chain{interceptors: interceptors}
}

Creating a chain

Create a chain by passing a slice of interceptors to NewChain:
chain := NewChain([]Interceptor{
    &StatsInterceptor{},
    &LoggingInterceptor{},
    &RecordingInterceptor{},
})
Interceptors are executed in the order they appear in the slice. The first interceptor receives packets first for local/outgoing streams, and last for remote/incoming streams due to the wrapping pattern.

Execution order

The execution order differs between bind and packet processing:

Bind methods

Bind methods are called in forward order (first to last):
func (i *Chain) BindLocalStream(ctx *StreamInfo, writer RTPWriter) RTPWriter {
    for _, interceptor := range i.interceptors {
        writer = interceptor.BindLocalStream(ctx, writer)
    }
    return writer
}

Packet processing

Because each interceptor wraps the previous writer/reader, packets flow in reverse order for bind:
Outgoing (Local) RTP:
  Application → Interceptor 1 → Interceptor 2 → Interceptor 3 → Network

Incoming (Remote) RTP:
  Network → Interceptor 3 → Interceptor 2 → Interceptor 1 → Application
For outgoing streams, interceptors process packets in the order they appear in the chain. For incoming streams, they process packets in reverse order due to the reader/writer wrapping pattern.

Implementation details

Here’s how the chain implements each interceptor method:
func (i *Chain) BindLocalStream(ctx *StreamInfo, writer RTPWriter) RTPWriter {
    for _, interceptor := range i.interceptors {
        writer = interceptor.BindLocalStream(ctx, writer)
    }
    return writer
}

func (i *Chain) UnbindLocalStream(ctx *StreamInfo) {
    for _, interceptor := range i.interceptors {
        interceptor.UnbindLocalStream(ctx)
    }
}

func (i *Chain) BindRemoteStream(ctx *StreamInfo, reader RTPReader) RTPReader {
    for _, interceptor := range i.interceptors {
        reader = interceptor.BindRemoteStream(ctx, reader)
    }
    return reader
}

func (i *Chain) UnbindRemoteStream(ctx *StreamInfo) {
    for _, interceptor := range i.interceptors {
        interceptor.UnbindRemoteStream(ctx)
    }
}
The Close() method collects errors from all interceptors, ensuring that cleanup happens even if some interceptors fail to close.

Usage examples

Basic chain

Combine interceptors for different purposes:
package main

import (
    "github.com/pion/interceptor"
)

func main() {
    // Create individual interceptors
    stats := &StatsInterceptor{}
    logger := &LoggingInterceptor{Level: "debug"}
    recorder := &RecordingInterceptor{Path: "/tmp/recordings"}
    
    // Combine into a chain
    chain := interceptor.NewChain([]interceptor.Interceptor{
        stats,
        logger,
        recorder,
    })
    
    // Use with a peer connection
    // ...
}

Conditional interceptor inclusion

Build chains dynamically based on configuration:
func createInterceptorChain(config *Config) interceptor.Interceptor {
    var interceptors []interceptor.Interceptor
    
    if config.EnableStats {
        interceptors = append(interceptors, &StatsInterceptor{})
    }
    
    if config.EnableLogging {
        interceptors = append(interceptors, &LoggingInterceptor{
            Level: config.LogLevel,
        })
    }
    
    if config.RecordingPath != "" {
        interceptors = append(interceptors, &RecordingInterceptor{
            Path: config.RecordingPath,
        })
    }
    
    return interceptor.NewChain(interceptors)
}

Nesting chains

Chains can contain other chains:
// Create sub-chains for related functionality
monitoringChain := interceptor.NewChain([]interceptor.Interceptor{
    &StatsInterceptor{},
    &MetricsInterceptor{},
})

processingChain := interceptor.NewChain([]interceptor.Interceptor{
    &FECInterceptor{},
    &RetransmissionInterceptor{},
})

// Combine into a master chain
masterChain := interceptor.NewChain([]interceptor.Interceptor{
    monitoringChain,
    processingChain,
    &LoggingInterceptor{},
})

Ordering considerations

Statistics before modification

Place statistics interceptors before interceptors that modify packets:
// Good: Stats see unmodified packets
chain := interceptor.NewChain([]interceptor.Interceptor{
    &StatsInterceptor{},
    &HeaderModificationInterceptor{},
})

// Bad: Stats see modified packets
chain := interceptor.NewChain([]interceptor.Interceptor{
    &HeaderModificationInterceptor{},
    &StatsInterceptor{},
})

Error handling order

Place validation interceptors early to reject invalid packets:
chain := interceptor.NewChain([]interceptor.Interceptor{
    &ValidationInterceptor{},     // Drop invalid packets early
    &ExpensiveProcessing{},       // Only process valid packets
    &NetworkTransmission{},
})

Recording after processing

Place recording interceptors after processing to record final output:
chain := interceptor.NewChain([]interceptor.Interceptor{
    &FECEncoder{},               // Add redundancy
    &RTXPacketizer{},           // Add retransmission support
    &RecordingInterceptor{},    // Record final output
})

Performance considerations

Each interceptor in the chain adds overhead. Keep chains as short as possible by combining related functionality into single interceptors.
// Less efficient: Many small interceptors
chain := interceptor.NewChain([]interceptor.Interceptor{
    &PacketCounterInterceptor{},
    &ByteCounterInterceptor{},
    &BitrateCalculatorInterceptor{},
})

// More efficient: Combined interceptor
chain := interceptor.NewChain([]interceptor.Interceptor{
    &ComprehensiveStatsInterceptor{}, // Does all three
})
Reuse buffers and objects where possible to reduce GC pressure.
type OptimizedInterceptor struct {
    NoOp
    bufferPool sync.Pool
}

func (i *OptimizedInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        buf := i.bufferPool.Get().([]byte)
        defer i.bufferPool.Put(buf)
        
        // Process with pooled buffer
        return writer.Write(header, payload, attributes)
    })
}
Initialize expensive resources only when needed.
type LazyInterceptor struct {
    NoOp
    once      sync.Once
    processor *ExpensiveProcessor
}

func (i *LazyInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        i.once.Do(func() {
            i.processor = NewExpensiveProcessor()
        })
        return writer.Write(header, payload, attributes)
    })
}

Testing chains

Test chains by verifying packet flow through all interceptors:
func TestChainExecution(t *testing.T) {
    var order []string
    
    chain := interceptor.NewChain([]interceptor.Interceptor{
        &TestInterceptor{Name: "first", Order: &order},
        &TestInterceptor{Name: "second", Order: &order},
        &TestInterceptor{Name: "third", Order: &order},
    })
    
    // Create a mock writer
    mockWriter := &MockRTPWriter{}
    
    // Bind and write a packet
    info := &interceptor.StreamInfo{SSRC: 12345}
    writer := chain.BindLocalStream(info, mockWriter)
    
    header := &rtp.Header{}
    payload := []byte{0x01, 0x02, 0x03}
    writer.Write(header, payload, interceptor.Attributes{})
    
    // Verify execution order
    expected := []string{"first", "second", "third"}
    if !reflect.DeepEqual(order, expected) {
        t.Errorf("Expected order %v, got %v", expected, order)
    }
}

Common patterns

Monitoring pipeline

monitoringChain := interceptor.NewChain([]interceptor.Interceptor{
    &PacketCounterInterceptor{},
    &BitrateCalculatorInterceptor{},
    &QualityMetricsInterceptor{},
    &AlertingInterceptor{},
})

Quality of service pipeline

qosChain := interceptor.NewChain([]interceptor.Interceptor{
    &CongestionControlInterceptor{},
    &BitrateControlInterceptor{},
    &PacketPacingInterceptor{},
})

Recording pipeline

recordingChain := interceptor.NewChain([]interceptor.Interceptor{
    &TimestampInterceptor{},
    &PacketBufferInterceptor{},
    &FileWriterInterceptor{Path: "/tmp/recording.rtp"},
})

Testing pipeline

testingChain := interceptor.NewChain([]interceptor.Interceptor{
    &PacketLossSimulatorInterceptor{LossRate: 0.05},
    &LatencySimulatorInterceptor{Delay: 50 * time.Millisecond},
    &JitterSimulatorInterceptor{MaxJitter: 10 * time.Millisecond},
})

Interceptor interface

Core interface details

Registry

Managing interceptor factories

Architecture

Overall design principles

Build docs developers (and LLMs) love