The Pion Interceptor library uses two key types for passing metadata: Attributes for generic key-value storage, and StreamInfo for media stream context. These types enable communication between interceptors and provide essential stream information.

Attributes

Attributes provide a generic key-value store for passing metadata along with packets.

Type definition

From attributes.go:
// Attributes are a generic key/value store used by interceptors.
type Attributes map[any]any

// Get returns the attribute associated with key.
func (a Attributes) Get(key any) any {
    return a[key]
}

// Set sets the attribute associated with key to the given value.
func (a Attributes) Set(key any, val any) {
    a[key] = val
}

Basic usage

Store and retrieve arbitrary values:
attributes := make(interceptor.Attributes)

// Set values
attributes.Set("timestamp", time.Now())
attributes.Set("sequence", 12345)
attributes.Set("metadata", &CustomMetadata{/* ... */})

// Get values
if ts, ok := attributes.Get("timestamp").(time.Time); ok {
    fmt.Println("Timestamp:", ts)
}
Get returns nil for a missing key, and a stored value can be of any type. Retrieve values with the two-value (comma-ok) type assertion: the single-value form panics on nil or on a mismatched type.
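The retrieval pattern can be wrapped in a small generic helper. The sketch below is only an illustration: getAs is a hypothetical function that is not part of the library, and Attributes is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"time"
)

// Attributes is a local stand-in that mirrors the map type shown above.
type Attributes map[any]any

// getAs is a hypothetical helper (not a library function). It returns the
// value stored under key as type T, or ok == false if the key is missing
// or the stored value has a different type.
func getAs[T any](a Attributes, key any) (T, bool) {
	v, ok := a[key].(T)
	return v, ok
}

func main() {
	a := Attributes{"timestamp": time.Unix(0, 0), "sequence": 12345}

	if seq, ok := getAs[int](a, "sequence"); ok {
		fmt.Println("sequence:", seq)
	}

	// Wrong type: "sequence" holds an int, not a string.
	_, ok := getAs[string](a, "sequence")
	fmt.Println("as string ok:", ok)

	// Missing key: the lookup yields nil, so the assertion fails cleanly.
	_, ok = getAs[time.Time](a, "missing")
	fmt.Println("missing ok:", ok)
}
```

Neither failing lookup panics; both simply report ok == false.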

Specialized helper methods

Attributes provides helper methods for common RTP/RTCP operations:
// GetRTPHeader gets the RTP header if present. If it is not present, it will be
// unmarshalled from the raw byte slice and stored in the attributes.
func (a Attributes) GetRTPHeader(raw []byte) (*rtp.Header, error) {
    if val, ok := a[rtpHeaderKey]; ok {
        if header, ok := val.(*rtp.Header); ok {
            return header, nil
        }
        return nil, errInvalidType
    }
    header := &rtp.Header{}
    if _, err := header.Unmarshal(raw); err != nil {
        return nil, err
    }
    a[rtpHeaderKey] = header
    return header, nil
}
These helper methods cache the unmarshalled data in the attributes map, avoiding redundant parsing if multiple interceptors need to access the same headers or packets.
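The caching behaviour can be reproduced in isolation. The sketch below follows the same parse-once-then-store shape as GetRTPHeader, but every name in it (headerKey, fakeHeader, parse, getHeader) is a hypothetical stand-in so that it runs without the rtp package. Note that the cache is tied to the Attributes value, so this pattern assumes one Attributes map per packet.

```go
package main

import (
	"errors"
	"fmt"
)

// Attributes is a local stand-in that mirrors the map type shown above.
type Attributes map[any]any

// headerKey is an unexported key type, so other packages cannot collide with it.
type headerKey struct{}

// fakeHeader stands in for a parsed RTP header in this sketch.
type fakeHeader struct{ Seq uint16 }

var errInvalidType = errors.New("unexpected type in attributes")

// parseCalls counts how often the parser actually runs.
var parseCalls int

// parse reads the sequence number (bytes 2-3, big endian) from a raw packet.
func parse(raw []byte) (*fakeHeader, error) {
	parseCalls++
	if len(raw) < 4 {
		return nil, errors.New("short packet")
	}
	return &fakeHeader{Seq: uint16(raw[2])<<8 | uint16(raw[3])}, nil
}

// getHeader returns the cached header if present; otherwise it parses raw
// and stores the result for later callers.
func getHeader(a Attributes, raw []byte) (*fakeHeader, error) {
	if val, ok := a[headerKey{}]; ok {
		if h, ok := val.(*fakeHeader); ok {
			return h, nil
		}
		return nil, errInvalidType
	}
	h, err := parse(raw)
	if err != nil {
		return nil, err
	}
	a[headerKey{}] = h
	return h, nil
}

func main() {
	a := make(Attributes)
	raw := []byte{0x80, 96, 0x01, 0x02}

	h1, _ := getHeader(a, raw)
	h2, _ := getHeader(a, raw) // served from the cache
	fmt.Println(h1.Seq, h1 == h2, parseCalls)
}
```

Both calls return the same pointer and the parser runs only once.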

Using RTP headers in interceptors

func (i *MyInterceptor) BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader {
    return RTPReaderFunc(func(b []byte, a Attributes) (int, Attributes, error) {
        // Read first: b only holds packet data after the next reader returns.
        n, attr, err := reader.Read(b, a)
        if err != nil {
            return n, attr, err
        }
        if attr == nil {
            attr = make(Attributes)
        }

        // Get the parsed RTP header (cached after first access)
        header, err := attr.GetRTPHeader(b[:n])
        if err != nil {
            return n, attr, err
        }

        // Use header information
        fmt.Printf("SSRC: %d, Sequence: %d, Timestamp: %d\n",
            header.SSRC, header.SequenceNumber, header.Timestamp)

        return n, attr, nil
    })
}

Using RTCP packets in interceptors

func (i *MyInterceptor) BindRTCPReader(reader RTCPReader) RTCPReader {
    return RTCPReaderFunc(func(b []byte, a Attributes) (int, Attributes, error) {
        // Read first: b only holds RTCP data after the next reader returns.
        n, attr, err := reader.Read(b, a)
        if err != nil {
            return n, attr, err
        }
        if attr == nil {
            attr = make(Attributes)
        }

        // Get the parsed RTCP packets (cached after first access)
        packets, err := attr.GetRTCPPackets(b[:n])
        if err != nil {
            return n, attr, err
        }

        // Process each packet
        for _, pkt := range packets {
            switch p := pkt.(type) {
            case *rtcp.SenderReport:
                fmt.Printf("Sender Report: SSRC=%d, PacketCount=%d\n",
                    p.SSRC, p.PacketCount)
            case *rtcp.ReceiverReport:
                fmt.Printf("Receiver Report: SSRC=%d\n", p.SSRC)
            }
        }

        return n, attr, nil
    })
}

Custom attribute keys

Use typed keys to avoid conflicts between interceptors:
type myKeyType int

const (
    timestampKey myKeyType = iota
    sequenceKey
    metadataKey
)

// In one interceptor
attributes.Set(timestampKey, time.Now())

// In another interceptor
if val := attributes.Get(timestampKey); val != nil {
    if ts, ok := val.(time.Time); ok {
        // Use the timestamp
    }
}
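Map keys compare by both dynamic type and value, so a key of a custom type (ideally unexported) cannot collide with a key defined by another interceptor, even if the underlying values are equal. The stored values are still of type any, so retrieval still needs a type assertion.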
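The following sketch shows why typed keys avoid collisions while plain strings do not. The key types nackKey and statsKey are hypothetical names for two unrelated interceptors, and Attributes is redeclared locally so the example runs on its own.

```go
package main

import "fmt"

// Attributes is a local stand-in that mirrors the map type shown above.
type Attributes map[any]any

// Two hypothetical interceptors each define their own key type.
type nackKey int
type statsKey int

const (
	nackState  nackKey  = 0
	statsState statsKey = 0
)

func main() {
	a := make(Attributes)

	// Same underlying value (0), different types: two separate entries.
	a[nackState] = "nack data"
	a[statsState] = "stats data"
	fmt.Println(len(a), a[nackState], a[statsState])

	// Plain string keys share one namespace, so the second write
	// silently replaces the first.
	a["state"] = "first"
	a["state"] = "second"
	fmt.Println(a["state"])

	// An untyped 0 is an int key, which matches neither typed key.
	_, found := a[0]
	fmt.Println(found)
}
```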

StreamInfo

StreamInfo provides context about a media stream when it is bound to or unbound from an interceptor.

Type definition

From streaminfo.go:
// StreamInfo is the Context passed when a StreamLocal or StreamRemote has been Binded or Unbinded.
type StreamInfo struct {
    ID                                string
    Attributes                        Attributes
    SSRC                              uint32
    SSRCRetransmission                uint32
    SSRCForwardErrorCorrection        uint32
    PayloadType                       uint8
    PayloadTypeRetransmission         uint8
    PayloadTypeForwardErrorCorrection uint8
    RTPHeaderExtensions               []RTPHeaderExtension
    MimeType                          string
    ClockRate                         uint32
    Channels                          uint16
    SDPFmtpLine                       string
    RTCPFeedback                      []RTCPFeedback
}

RTPHeaderExtension type

// RTPHeaderExtension represents a negotiated RFC5285 RTP header extension.
type RTPHeaderExtension struct {
    URI string
    ID  int
}

RTCPFeedback type

// RTCPFeedback signals the connection to use additional RTCP packet types.
// https://draft.ortc.org/#dom-rtcrtcpfeedback
type RTCPFeedback struct {
    // Type is the type of feedback.
    // see: https://draft.ortc.org/#dom-rtcrtcpfeedback
    // valid: ack, ccm, nack, goog-remb, transport-cc
    Type string

    // The parameter value depends on the type.
    // For example, type="nack" parameter="pli" will send Picture Loss Indicator packets.
    Parameter string
}

StreamInfo fields

ID: Unique identifier for the stream
Attributes: Stream-specific attributes that persist across the stream’s lifetime
func (i *MyInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    fmt.Printf("Stream ID: %s\n", info.ID)
    // Set writes to the underlying map, which panics if the map is nil.
    if info.Attributes == nil {
        info.Attributes = make(Attributes)
    }
    info.Attributes.Set("startTime", time.Now())
    return writer
}

func (i *MyInterceptor) UnbindLocalStream(info *StreamInfo) {
    // The comma-ok assertion also covers a missing key (Get returns nil).
    if start, ok := info.Attributes.Get("startTime").(time.Time); ok {
        fmt.Printf("Stream duration: %v\n", time.Since(start))
    }
}
SSRC: Synchronization source identifier for the main stream
SSRCRetransmission: SSRC for retransmitted packets (RFC 4588)
SSRCForwardErrorCorrection: SSRC for FEC packets
func (i *MyInterceptor) BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader {
    fmt.Printf("Main SSRC: %d\n", info.SSRC)
    if info.SSRCRetransmission != 0 {
        fmt.Printf("RTX SSRC: %d\n", info.SSRCRetransmission)
    }
    if info.SSRCForwardErrorCorrection != 0 {
        fmt.Printf("FEC SSRC: %d\n", info.SSRCForwardErrorCorrection)
    }
    return reader
}
PayloadType: RTP payload type for the main stream
PayloadTypeRetransmission: Payload type for retransmitted packets
PayloadTypeForwardErrorCorrection: Payload type for FEC packets
func (i *MyInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        // Check payload type
        switch header.PayloadType {
        case info.PayloadType:
            // Handle main stream
        case info.PayloadTypeRetransmission:
            // Handle retransmission
        case info.PayloadTypeForwardErrorCorrection:
            // Handle FEC
        }
        return writer.Write(header, payload, attributes)
    })
}
MimeType: Media codec type (e.g., “video/VP8”, “audio/opus”)
ClockRate: RTP timestamp clock rate
Channels: Number of audio channels (0 for video)
SDPFmtpLine: Format-specific parameters from SDP
func (i *MyInterceptor) BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader {
    if strings.HasPrefix(info.MimeType, "video/") {
        fmt.Printf("Video codec: %s at %d Hz\n", info.MimeType, info.ClockRate)
    } else if strings.HasPrefix(info.MimeType, "audio/") {
        fmt.Printf("Audio codec: %s at %d Hz, %d channels\n",
            info.MimeType, info.ClockRate, info.Channels)
    }
    
    if info.SDPFmtpLine != "" {
        fmt.Printf("Format params: %s\n", info.SDPFmtpLine)
    }
    
    return reader
}
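ClockRate is what turns RTP timestamps into real time: the timestamp advances by ClockRate ticks per second. As a minimal sketch, the hypothetical helper rtpDelta below (not a library function) converts the difference between two RTP timestamps into a time.Duration, relying on unsigned arithmetic to handle 32-bit wraparound.

```go
package main

import (
	"fmt"
	"time"
)

// rtpDelta converts the distance from one RTP timestamp to a later one into
// a duration. Unsigned subtraction gives the correct forward distance even
// when the 32-bit timestamp has wrapped around.
func rtpDelta(from, to, clockRate uint32) time.Duration {
	if clockRate == 0 {
		return 0 // clock rate unknown; avoid dividing by zero
	}
	ticks := to - from
	return time.Duration(ticks) * time.Second / time.Duration(clockRate)
}

func main() {
	// Video at 90 kHz: 3000 ticks is one frame at 30 fps.
	fmt.Println(rtpDelta(0, 3000, 90000))

	// Audio at 48 kHz: 960 ticks is one 20 ms frame.
	fmt.Println(rtpDelta(0, 960, 48000))

	// Wraparound: the timestamp passes 2^32 between the two packets.
	fmt.Println(rtpDelta(4294967000, 664, 48000))
}
```

The last call still yields 20 ms, because 664 - 4294967000 wraps to 960 in uint32 arithmetic.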
RTPHeaderExtensions: Negotiated RTP header extensions
func (i *MyInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    // Find audio level extension
    var audioLevelID int
    for _, ext := range info.RTPHeaderExtensions {
        if ext.URI == "urn:ietf:params:rtp-hdrext:ssrc-audio-level" {
            audioLevelID = ext.ID
            break
        }
    }
    
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        if audioLevelID != 0 {
            // Add audio level extension (RFC 6464): voice flag set, level 31 (-31 dBov)
            if err := header.SetExtension(uint8(audioLevelID), []byte{0x80 | 0x1F}); err != nil {
                return 0, err
            }
        }
        return writer.Write(header, payload, attributes)
    })
}
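The byte 0x80 | 0x1F in the example above is an RFC 6464 audio level payload: the top bit is the voice-activity flag and the low seven bits are the level in -dBov (0 is loudest, 127 is quietest). A minimal sketch of building and reading that byte follows; audioLevelByte and parseAudioLevel are hypothetical helpers, not library functions.

```go
package main

import "fmt"

// audioLevelByte packs an RFC 6464 one-byte payload: the voice-activity flag
// goes in the top bit and the level (0..127, in -dBov) in the low seven bits.
func audioLevelByte(voice bool, level uint8) byte {
	if level > 127 {
		level = 127 // clamp to the 7-bit range
	}
	b := level
	if voice {
		b |= 0x80
	}
	return b
}

// parseAudioLevel is the inverse of audioLevelByte.
func parseAudioLevel(b byte) (voice bool, level uint8) {
	return b&0x80 != 0, b & 0x7F
}

func main() {
	b := audioLevelByte(true, 31)
	fmt.Printf("0x%02X\n", b)

	voice, level := parseAudioLevel(b)
	fmt.Println(voice, level)
}
```

With voice activity set and a level of 31, the result is 0x9F, the same value as 0x80 | 0x1F.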
RTCPFeedback: Negotiated RTCP feedback mechanisms
func (i *MyInterceptor) BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader {
    // Check supported feedback
    supportsPLI := false
    supportsFIR := false
    
    for _, fb := range info.RTCPFeedback {
        if fb.Type == "nack" && fb.Parameter == "pli" {
            supportsPLI = true
        }
        if fb.Type == "ccm" && fb.Parameter == "fir" {
            supportsFIR = true
        }
    }
    
    fmt.Printf("Supports PLI: %v, FIR: %v\n", supportsPLI, supportsFIR)
    return reader
}
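If several interceptors need the same check, the loop can be pulled out into a small lookup function. In this sketch hasFeedback is a hypothetical helper, RTCPFeedback is redeclared locally to mirror the struct shown earlier, and the negotiated list contains illustrative values only.

```go
package main

import "fmt"

// RTCPFeedback is a local stand-in that mirrors the struct shown above.
type RTCPFeedback struct {
	Type      string
	Parameter string
}

// hasFeedback reports whether the given type/parameter pair was negotiated.
func hasFeedback(fbs []RTCPFeedback, typ, param string) bool {
	for _, fb := range fbs {
		if fb.Type == typ && fb.Parameter == param {
			return true
		}
	}
	return false
}

func main() {
	// Illustrative values, not taken from a real session.
	negotiated := []RTCPFeedback{
		{Type: "nack"},
		{Type: "nack", Parameter: "pli"},
		{Type: "transport-cc"},
	}

	fmt.Println("PLI:", hasFeedback(negotiated, "nack", "pli"))
	fmt.Println("FIR:", hasFeedback(negotiated, "ccm", "fir"))
}
```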

Common usage patterns

Per-stream state management

type StatefulInterceptor struct {
    NoOp
    mu      sync.Mutex
    streams map[uint32]*StreamState
}

type StreamState struct {
    Info           *StreamInfo
    PacketCount    uint64
    ByteCount      uint64
    StartTime      time.Time
    LastPacketTime time.Time
}

func (i *StatefulInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    state := &StreamState{
        Info:      info,
        StartTime: time.Now(),
    }

    i.mu.Lock()
    if i.streams == nil {
        // Writing to a nil map panics, so allocate it on first use.
        i.streams = make(map[uint32]*StreamState)
    }
    i.streams[info.SSRC] = state
    i.mu.Unlock()

    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        // Use the captured state: the map entry is gone once the stream is unbound.
        i.mu.Lock()
        state.PacketCount++
        state.ByteCount += uint64(len(payload))
        state.LastPacketTime = time.Now()
        i.mu.Unlock()

        return writer.Write(header, payload, attributes)
    })
}

func (i *StatefulInterceptor) UnbindLocalStream(info *StreamInfo) {
    i.mu.Lock()
    defer i.mu.Unlock()
    
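    if state, ok := i.streams[info.SSRC]; ok {
        // LastPacketTime is zero if no packet was written, so only divide
        // when the measured duration is positive.
        var avgBitrate float64
        if duration := state.LastPacketTime.Sub(state.StartTime); duration > 0 {
            avgBitrate = float64(state.ByteCount*8) / duration.Seconds()
        }

        fmt.Printf("Stream %d stats: %d packets, %.2f kbps\n",
            info.SSRC, state.PacketCount, avgBitrate/1000)

        delete(i.streams, info.SSRC)
    }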
}
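The bitrate arithmetic used at unbind time is bytes × 8 ÷ seconds ÷ 1000. As a minimal sketch, the hypothetical helper avgKbps below isolates that calculation and guards against a zero or negative duration, which can occur when no packet was ever written.

```go
package main

import (
	"fmt"
	"time"
)

// avgKbps returns the average bitrate in kilobits per second for a byte
// count observed over duration d. It returns 0 for a non-positive duration.
func avgKbps(bytes uint64, d time.Duration) float64 {
	if d <= 0 {
		return 0
	}
	return float64(bytes) * 8 / d.Seconds() / 1000
}

func main() {
	// 125,000 bytes in 1 s = 1,000,000 bit/s = 1000 kbps.
	fmt.Printf("%.2f kbps\n", avgKbps(125_000, time.Second))

	// 1,500,000 bytes in 10 s = 1,200,000 bit/s = 1200 kbps.
	fmt.Printf("%.2f kbps\n", avgKbps(1_500_000, 10*time.Second))

	// No elapsed time: report 0 instead of +Inf.
	fmt.Printf("%.2f kbps\n", avgKbps(1000, 0))
}
```

Note that the interceptor above counts payload bytes only, so the figure excludes RTP header overhead.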

Codec-specific processing

func (i *CodecInterceptor) BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader {
    switch info.MimeType {
    case "video/VP8":
        return i.createVP8Reader(info, reader)
    case "video/VP9":
        return i.createVP9Reader(info, reader)
    case "video/H264":
        return i.createH264Reader(info, reader)
    case "audio/opus":
        return i.createOpusReader(info, reader)
    default:
        return reader
    }
}
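The switch above matches MimeType strings exactly. Media type names are case-insensitive, so if a stream's MimeType could arrive with different casing (an assumption here; the caller may already normalize it), lowercasing before the switch makes the dispatch more tolerant. The helper codecName below is hypothetical and only illustrates the idea.

```go
package main

import (
	"fmt"
	"strings"
)

// codecName maps a MIME type to a short codec name, ignoring case.
// It returns "" for codecs this sketch does not know about.
func codecName(mimeType string) string {
	switch strings.ToLower(mimeType) {
	case "video/vp8":
		return "vp8"
	case "video/vp9":
		return "vp9"
	case "video/h264":
		return "h264"
	case "audio/opus":
		return "opus"
	default:
		return ""
	}
}

func main() {
	for _, m := range []string{"video/VP8", "video/vp8", "audio/OPUS", "video/AV1"} {
		fmt.Printf("%q -> %q\n", m, codecName(m))
	}
}
```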

Sharing data between bind and packet handlers

func (i *MyInterceptor) BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter {
    // Store configuration in StreamInfo.Attributes
    clockRate := info.ClockRate
    isVideo := strings.HasPrefix(info.MimeType, "video/")
    
    info.Attributes.Set("clockRate", clockRate)
    info.Attributes.Set("isVideo", isVideo)
    
    return RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes Attributes) (int, error) {
        // Retrieve from StreamInfo.Attributes in packet handler.
        // The comma-ok form avoids a panic if the value is missing or not a bool.
        if video, ok := info.Attributes.Get("isVideo").(bool); ok && video {
            // Video-specific processing
        }

        return writer.Write(header, payload, attributes)
    })
}

Best practices

Use typed keys

Define custom types for attribute keys to avoid conflicts between interceptors.

Check for nil

Use the two-value (comma-ok) type assertion when reading attributes. It handles both a missing key (nil) and an unexpected type without panicking.

Cache parsed data

Use GetRTPHeader/GetRTCPPackets to avoid redundant parsing across interceptors.

Clean up on unbind

Remove per-stream data in UnbindLocalStream and UnbindRemoteStream to prevent memory leaks.

