
Overview

The Stream Interceptor allows you to tap into audio streams and consume the data without affecting the actual playback. This is useful for real-time audio analysis, recording, transcription, or monitoring.
The interceptor uses a “man-in-the-middle” approach where data flows through transform streams to both the audio player and your custom consumers.

Basic Concept

An InterceptedStream is a Transform stream that can have multiple consumers (interceptors) attached to it. Audio data flows through the stream to both the player and all registered interceptors:
Source → InterceptedStream → Audio Player
                 ├─→ Interceptor 1
                 ├─→ Interceptor 2
                 └─→ Interceptor N
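Conceptually, the fan-out can be modeled as a Transform stream that copies each chunk to every registered consumer before passing it through to the player. The following is a minimal, self-contained sketch using only Node's stream module — `FanOutStream` is an illustrative name, not the library's class; the real InterceptedStream adds format handling and lifecycle management on top of this idea:

```typescript
import { Transform, Writable } from 'stream';

// Illustrative stand-in for InterceptedStream: a pass-through
// Transform that also copies every chunk to a set of consumers.
class FanOutStream extends Transform {
  readonly interceptors = new Set<Writable>();

  _transform(
    chunk: Buffer,
    _encoding: BufferEncoding,
    callback: (error?: Error | null, data?: Buffer) => void,
  ) {
    // Copy the chunk to every interceptor without consuming it
    for (const consumer of this.interceptors) {
      consumer.write(chunk);
    }
    // Pass the original chunk through to the downstream "player"
    callback(null, chunk);
  }
}

// Usage: data written to the stream reaches both the downstream
// reader (the player) and every registered interceptor.
const stream = new FanOutStream();
const received: Buffer[] = [];
stream.interceptors.add(
  new Writable({
    write(chunk, _enc, cb) {
      received.push(chunk);
      cb();
    },
  }),
);

stream.on('data', () => {}); // downstream "player"
stream.write(Buffer.from('audio chunk'));
```

The key property to notice is that interceptors only receive copies; the chunk handed to the player is untouched, which is why playback is unaffected.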

Setting Up

Create Stream Interceptor

import { Player, PlayerStreamInterceptor } from 'discord-player';

const player = new Player(client, {
  // ... other options
});

const interceptor = new PlayerStreamInterceptor(player, {
  shouldIntercept: async (queue, track, format, stream) => {
    // Return true to intercept this stream
    return true;
  }
});

Configuration Options

shouldIntercept (ShouldInterceptFunction, required)
Function that determines whether a stream should be intercepted:
type ShouldInterceptFunction = <T = any>(
  queue: GuildQueue<T>,
  track: Track,
  format: StreamType,
  stream: InterceptedStream,
) => Promise<boolean> | boolean;

Registering Interceptors

Basic Interceptor

Register a handler to receive stream data:
import { Writable } from 'stream';

const removeListener = interceptor.onStream(async (queue, track, format, stream) => {
  console.log('Intercepting stream for:', track.title);
  console.log('Format:', format); // 'ogg/opus', 'webm/opus', 'opus', etc.
  
  // Create a consumer to receive data
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      // Process audio chunk
      console.log('Received chunk:', chunk.length, 'bytes');
      callback();
    }
  });
  
  // Add consumer to interceptors
  stream.interceptors.add(consumer);
});

// Later: remove the listener
removeListener();

InterceptedStream API

Properties

interceptors: Set<Writable>
Set of writable streams that receive copies of the data

Methods

startIntercepting(): void
Start intercepting the stream (default state)

stopIntercepting(): void
Temporarily stop sending data to interceptors

isIntercepting(): boolean
Check if the stream is currently being intercepted
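The toggle semantics can be sketched with a self-contained mock (`ToggleableStream` is a hypothetical name; the real class also manages the player-facing side): stopping interception mutes the consumers without touching the pass-through to the player.

```typescript
import { Transform, Writable } from 'stream';

// Hypothetical mock of the start/stop/isIntercepting contract:
// pausing interception mutes the consumers, not the playback.
class ToggleableStream extends Transform {
  readonly interceptors = new Set<Writable>();
  private intercepting = true;

  startIntercepting(): void { this.intercepting = true; }
  stopIntercepting(): void { this.intercepting = false; }
  isIntercepting(): boolean { return this.intercepting; }

  _transform(
    chunk: Buffer,
    _enc: BufferEncoding,
    cb: (error?: Error | null, data?: Buffer) => void,
  ) {
    if (this.intercepting) {
      for (const consumer of this.interceptors) consumer.write(chunk);
    }
    cb(null, chunk); // playback continues either way
  }
}

const stream = new ToggleableStream();
let interceptedBytes = 0;
stream.interceptors.add(new Writable({
  write(chunk, _enc, cb) { interceptedBytes += chunk.length; cb(); },
}));
stream.on('data', () => {}); // downstream "player"

stream.write(Buffer.alloc(10)); // intercepted
stream.stopIntercepting();
stream.write(Buffer.alloc(10)); // skipped by interceptors
stream.startIntercepting();
stream.write(Buffer.alloc(10)); // intercepted again
// interceptedBytes is now 20, not 30
```

The point of the contract: `stopIntercepting` is a mute switch for consumers, not a pause button for the audio.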

Use Cases

Audio Recording

Record audio to a file while it plays:
import fs from 'fs';
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  const filename = `recordings/${track.id}-${Date.now()}.opus`;
  const fileStream = fs.createWriteStream(filename);
  
  stream.interceptors.add(fileStream);
  
  console.log(`Recording to ${filename}`);
  
  // Cleanup when done
  fileStream.on('finish', () => {
    console.log(`Recording saved: ${filename}`);
  });
});

Audio Analysis

Analyze audio in real-time:
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  let totalBytes = 0;
  let chunks = 0;
  
  const analyzer = new Writable({
    write(chunk, encoding, callback) {
      totalBytes += chunk.length;
      chunks++;
      
      // Log stats every 100 chunks
      if (chunks % 100 === 0) {
        console.log(`Stats: ${chunks} chunks, ${totalBytes} bytes total`);
      }
      
      callback();
    }
  });
  
  stream.interceptors.add(analyzer);
});

Conditional Interception

Only intercept specific tracks:
const interceptor = new PlayerStreamInterceptor(player, {
  shouldIntercept: async (queue, track, format, stream) => {
    // Only intercept tracks from specific user
    if (track.requestedBy?.id === 'SPECIFIC_USER_ID') {
      return true;
    }
    
    // Only intercept long tracks
    if (track.durationMS > 300000) { // 5 minutes
      return true;
    }
    
    // Only intercept opus streams
    if (format === 'opus' || format.includes('opus')) {
      return true;
    }
    
    return false;
  }
});

Multiple Consumers

Attach multiple consumers to a single stream:
import fs from 'fs';
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  // Consumer 1: Record to file
  const recorder = fs.createWriteStream(`recordings/${track.id}.opus`);
  stream.interceptors.add(recorder);
  
  // Consumer 2: Analyze volume
  const volumeAnalyzer = new Writable({
    write(chunk, encoding, callback) {
      // calculateVolume and threshold are placeholders for your own logic
      const volume = calculateVolume(chunk);
      if (volume > threshold) {
        console.log('High volume detected!');
      }
      callback();
    }
  });
  stream.interceptors.add(volumeAnalyzer);
  
  // Consumer 3: Send to external service
  const uploader = new Writable({
    write(chunk, encoding, callback) {
      sendToExternalService(chunk)
        .then(() => callback())
        .catch(callback);
    }
  });
  stream.interceptors.add(uploader);
});

Pause and Resume Interception

interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      console.log('Processing chunk:', chunk.length);
      callback();
    }
  });
  
  stream.interceptors.add(consumer);
  
  // Stop intercepting after 30 seconds
  setTimeout(() => {
    console.log('Pausing interception...');
    stream.stopIntercepting();
  }, 30000);
  
  // Resume after another 10 seconds
  setTimeout(() => {
    console.log('Resuming interception...');
    stream.startIntercepting();
  }, 40000);
});

Advanced Example: Live Transcription

import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  const buffer: Buffer[] = [];
  let isProcessing = false;
  
  const transcriber = new Writable({
    write(chunk, encoding, callback) {
      buffer.push(chunk);
      
      // Process buffer when we have enough data
      if (buffer.length >= 100 && !isProcessing) {
        isProcessing = true;
        
        const audioData = Buffer.concat(buffer);
        buffer.length = 0; // Clear buffer
        
        // Send to transcription service
        transcribeAudio(audioData)
          .then(text => {
            if (text) {
              queue.metadata.channel.send(`🎵 Lyrics: ${text}`);
            }
            isProcessing = false;
          })
          .catch(error => {
            console.error('Transcription error:', error);
            isProcessing = false;
          });
      }
      
      callback();
    },
    
    final(callback) {
      // Process remaining buffer
      if (buffer.length > 0) {
        const audioData = Buffer.concat(buffer);
        transcribeAudio(audioData)
          .then(() => callback())
          .catch(callback);
      } else {
        callback();
      }
    }
  });
  
  stream.interceptors.add(transcriber);
});

async function transcribeAudio(audioBuffer: Buffer): Promise<string> {
  // Your transcription logic here
  // This could call services like Google Speech-to-Text, AWS Transcribe, etc.
  return 'transcribed text';
}

Stream Lifecycle

The intercepted stream follows this lifecycle:
  1. Creation: InterceptedStream is created when playback starts
  2. Interception Check: shouldIntercept is called to determine if interception should happen
  3. Handler Execution: All registered onStream handlers are called
  4. Data Flow: Audio chunks flow to both the player and all interceptors
  5. Cleanup: When the stream ends, all interceptors are cleaned up

Listen to your consumer's events to hook into the end of this lifecycle:
interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({ /* ... */ });
  
  stream.interceptors.add(consumer);
  
  // Cleanup event
  consumer.on('finish', () => {
    console.log('Stream finished');
  });
  
  consumer.on('error', (error) => {
    console.error('Consumer error:', error);
  });
});

Performance Considerations

Each interceptor adds processing overhead. Be mindful of:
  • CPU Usage: Processing each chunk can be CPU-intensive
  • Memory: Buffering data increases memory usage
  • Backpressure: Slow interceptors can cause backpressure
  • Error Handling: Always handle errors in your consumers

Best Practices

  • Don’t block the stream: process data asynchronously and use callbacks properly.
  • Handle backpressure: if your consumer can’t keep up, implement proper backpressure handling.
  • Clean up resources: always release file streams, buffers, and external connections when done.
  • Intercept selectively: only intercept when necessary to minimize overhead.
  • Bound your buffers: if buffering data, implement size limits to prevent memory leaks.
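The backpressure advice can be made concrete with a plain Node Writable: defer the write callback until asynchronous work completes, and use a small highWaterMark so the producer is told to slow down early. This is a sketch — `processChunkAsync` stands in for your own processing (upload, analysis, etc.) and is not part of discord-player:

```typescript
import { Writable } from 'stream';

// Placeholder for your own async processing step.
async function processChunkAsync(chunk: Buffer): Promise<void> {
  await new Promise((resolve) => setImmediate(resolve));
}

const consumer = new Writable({
  // A small highWaterMark signals backpressure sooner, bounding
  // how much data queues inside this consumer.
  highWaterMark: 64 * 1024,
  write(chunk, _encoding, callback) {
    // Defer the callback until the async work completes; Node holds
    // further chunks in the (bounded) internal buffer meanwhile.
    processChunkAsync(chunk)
      .then(() => callback())
      .catch((error) => callback(error as Error));
  },
});

// A write that exceeds the highWaterMark returns false, telling a
// well-behaved producer to pause until 'drain' fires.
const ok = consumer.write(Buffer.alloc(128 * 1024));
// ok === false
```

Checking the boolean return of `write` (and waiting for `'drain'`) is the standard Node backpressure protocol; consumers that ignore it are the ones that leak memory under load.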

Removing Interceptors

Remove an interceptor from a stream:
interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({ /* ... */ });
  
  stream.interceptors.add(consumer);
  
  // Remove after 30 seconds
  setTimeout(() => {
    stream.interceptors.delete(consumer);
    consumer.destroy();
    console.log('Interceptor removed');
  }, 30000);
});

Error Handling

interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      try {
        // Your processing logic
        processChunk(chunk);
        callback();
      } catch (error) {
        console.error('Processing error:', error);
        callback(error);
      }
    }
  });
  
  consumer.on('error', (error) => {
    console.error('Consumer error:', error);
    // Optionally remove the consumer
    stream.interceptors.delete(consumer);
  });
  
  stream.interceptors.add(consumer);
});

Queue Events

Learn about queue events

Audio Filters

Apply real-time audio effects
