Overview
The Stream Interceptor allows you to tap into audio streams and consume the data without affecting the actual playback. This is useful for real-time audio analysis, recording, transcription, or monitoring.
The interceptor uses a “man-in-the-middle” approach where data flows through transform streams to both the audio player and your custom consumers.
Basic Concept
An InterceptedStream is a Transform stream that can have multiple consumers (interceptors) attached to it. Audio data flows through the stream to both the player and all registered interceptors:
Source → InterceptedStream → Audio Player
                ↓
          Interceptor 1
          Interceptor 2
          Interceptor N
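As a rough illustration of the flow above, here is a minimal sketch of a Transform that copies each chunk to a set of consumers before passing it downstream. `TeeStream` and `collector` are hypothetical names made up for this example. This is not discord-player's implementation of InterceptedStream, only the general pattern built on Node's stream module.

```typescript
import { Transform, Writable, type TransformCallback } from 'stream';

// Hypothetical stand-in for InterceptedStream (illustration only).
class TeeStream extends Transform {
  public readonly consumers = new Set<Writable>();

  _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
    // Hand a copy of the chunk to every registered consumer...
    for (const consumer of this.consumers) {
      consumer.write(chunk);
    }
    // ...and pass the same chunk through unchanged to the downstream reader.
    callback(null, chunk);
  }
}

// Builds a Writable that appends every chunk it receives to `sink`.
function collector(sink: Buffer[]): Writable {
  return new Writable({
    write(chunk: Buffer, _encoding, callback) {
      sink.push(chunk);
      callback();
    },
  });
}
```

The downstream reader (the player, in the diagram) sees exactly the bytes that went in, while each consumer receives its own call to `write()` for the same chunk.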
Setting Up
Create Stream Interceptor
import { Player, PlayerStreamInterceptor } from 'discord-player';

const player = new Player(client, {
  // ... other options
});

const interceptor = new PlayerStreamInterceptor(player, {
  shouldIntercept: async (queue, track, format, stream) => {
    // Return true to intercept this stream
    return true;
  },
});
Configuration Options
shouldIntercept (ShouldInterceptFunction, required)
Function that determines whether a stream should be intercepted.

type ShouldInterceptFunction = <T = any>(
  queue: GuildQueue<T>,
  track: Track,
  format: StreamType,
  stream: InterceptedStream,
) => Promise<boolean> | boolean;
Registering Interceptors
Basic Interceptor
Register a handler to receive stream data:
import { Writable } from 'stream';

const removeListener = interceptor.onStream(async (queue, track, format, stream) => {
  console.log('Intercepting stream for:', track.title);
  console.log('Format:', format); // 'ogg/opus', 'webm/opus', 'opus', etc.

  // Create a consumer to receive data
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      // Process audio chunk
      console.log('Received chunk:', chunk.length, 'bytes');
      callback();
    },
  });

  // Add consumer to interceptors
  stream.interceptors.add(consumer);
});

// Later: remove the listener
removeListener();
InterceptedStream API
Properties
interceptors: Set of writable streams that receive copies of the data
Methods
startIntercepting(): Start intercepting the stream (default state)
stopIntercepting(): Temporarily stop sending data to interceptors
Check if the stream is currently being intercepted
Use Cases
Audio Recording
Record audio to a file while it plays:
import fs from 'fs';
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  const filename = `recordings/${track.id}-${Date.now()}.opus`;
  const fileStream = fs.createWriteStream(filename);

  stream.interceptors.add(fileStream);
  console.log(`Recording to ${filename}`);

  // Cleanup when done
  fileStream.on('finish', () => {
    console.log(`Recording saved: ${filename}`);
  });
});
Audio Analysis
Analyze audio in real-time:
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  let totalBytes = 0;
  let chunks = 0;

  const analyzer = new Writable({
    write(chunk, encoding, callback) {
      totalBytes += chunk.length;
      chunks++;

      // Log stats every 100 chunks
      if (chunks % 100 === 0) {
        console.log(`Stats: ${chunks} chunks, ${totalBytes} bytes total`);
      }

      callback();
    },
  });

  stream.interceptors.add(analyzer);
});
Conditional Interception
Only intercept specific tracks:
const interceptor = new PlayerStreamInterceptor(player, {
  shouldIntercept: async (queue, track, format, stream) => {
    // The checks below are alternatives: a stream is intercepted if ANY of them matches.

    // Tracks requested by a specific user
    if (track.requestedBy?.id === 'SPECIFIC_USER_ID') {
      return true;
    }

    // Long tracks (over 5 minutes)
    if (track.durationMS > 300000) {
      return true;
    }

    // Opus streams ('opus', 'ogg/opus', 'webm/opus', ...)
    if (format.includes('opus')) {
      return true;
    }

    return false;
  },
});
Multiple Consumers
Attach multiple consumers to a single stream:
import fs from 'fs';
import { Writable } from 'stream';

// calculateVolume, threshold, and sendToExternalService are placeholders
// that you need to provide yourself.
interceptor.onStream(async (queue, track, format, stream) => {
  // Consumer 1: Record to file
  const recorder = fs.createWriteStream(`recordings/${track.id}.opus`);
  stream.interceptors.add(recorder);

  // Consumer 2: Analyze volume
  const volumeAnalyzer = new Writable({
    write(chunk, encoding, callback) {
      // Calculate volume from chunk
      const volume = calculateVolume(chunk);
      if (volume > threshold) {
        console.log('High volume detected!');
      }
      callback();
    },
  });
  stream.interceptors.add(volumeAnalyzer);

  // Consumer 3: Send to external service
  const uploader = new Writable({
    write(chunk, encoding, callback) {
      sendToExternalService(chunk)
        .then(() => callback())
        .catch(callback);
    },
  });
  stream.interceptors.add(uploader);
});
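The volume consumer above calls a `calculateVolume` helper that is left undefined. One possible sketch is a root-mean-square level, shown below. It assumes the chunk holds decoded signed 16-bit little-endian PCM samples, which is an assumption of this example: the formats mentioned earlier ('opus', 'ogg/opus', 'webm/opus') are encoded, so a decoding step would be needed before this calculation means anything.

```typescript
// Hypothetical helper: RMS level of signed 16-bit little-endian PCM,
// normalised so that 0 is silence and 1 is full scale.
function calculateVolume(pcm: Buffer): number {
  const sampleCount = Math.floor(pcm.length / 2); // 2 bytes per sample
  if (sampleCount === 0) return 0;

  let sumOfSquares = 0;
  for (let i = 0; i < sampleCount; i++) {
    // Scale each sample from [-32768, 32767] to roughly [-1, 1).
    const sample = pcm.readInt16LE(i * 2) / 32768;
    sumOfSquares += sample * sample;
  }

  return Math.sqrt(sumOfSquares / sampleCount);
}
```

With this scaling, a `threshold` somewhere between 0 and 1 would be compared against the result. The right value depends on your material.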
Pause and Resume Interception
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      console.log('Processing chunk:', chunk.length);
      callback();
    },
  });

  stream.interceptors.add(consumer);

  // Stop intercepting after 30 seconds
  setTimeout(() => {
    console.log('Pausing interception...');
    stream.stopIntercepting();
  }, 30000);

  // Resume 10 seconds later (40 seconds after the stream started)
  setTimeout(() => {
    console.log('Resuming interception...');
    stream.startIntercepting();
  }, 40000);
});
Advanced Example: Live Transcription
import { Writable } from 'stream';

interceptor.onStream(async (queue, track, format, stream) => {
  const buffer: Buffer[] = [];
  let isProcessing = false;

  const transcriber = new Writable({
    write(chunk, encoding, callback) {
      buffer.push(chunk);

      // Process buffer when we have enough data
      if (buffer.length >= 100 && !isProcessing) {
        isProcessing = true;
        const audioData = Buffer.concat(buffer);
        buffer.length = 0; // Clear buffer

        // Send to transcription service
        transcribeAudio(audioData)
          .then((text) => {
            if (text) {
              queue.metadata.channel.send(`🎵 Lyrics: ${text}`);
            }
            isProcessing = false;
          })
          .catch((error) => {
            console.error('Transcription error:', error);
            isProcessing = false;
          });
      }

      callback();
    },
    final(callback) {
      // Process remaining buffer
      if (buffer.length > 0) {
        const audioData = Buffer.concat(buffer);
        transcribeAudio(audioData)
          .then(() => callback())
          .catch(callback);
      } else {
        callback();
      }
    },
  });

  stream.interceptors.add(transcriber);
});

async function transcribeAudio(audioBuffer: Buffer): Promise<string> {
  // Your transcription logic here
  // This could call services like Google Speech-to-Text, AWS Transcribe, etc.
  return 'transcribed text';
}
Stream Lifecycle
The intercepted stream follows this lifecycle:
1. Creation: InterceptedStream is created when playback starts
2. Interception Check: shouldIntercept is called to determine if interception should happen
3. Handler Execution: All registered onStream handlers are called
4. Data Flow: Audio chunks flow to both the player and all interceptors
5. Cleanup: When the stream ends, all interceptors are cleaned up
interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({ /* ... */ });
  stream.interceptors.add(consumer);

  // Cleanup event
  consumer.on('finish', () => {
    console.log('Stream finished');
  });

  consumer.on('error', (error) => {
    console.error('Consumer error:', error);
  });
});
Each interceptor adds processing overhead. Be mindful of:
CPU Usage: Processing each chunk can be CPU-intensive
Memory: Buffering data increases memory usage
Backpressure: Slow interceptors can cause backpressure
Error Handling: Always handle errors in your consumers
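One way to notice a slow consumer is to compare the number of bytes queued inside the Writable with its high-water mark. The helper below is a sketch under that idea. `isFallingBehind` is a hypothetical name, and it relies only on Node's standard `writableLength` and `writableHighWaterMark` properties. How the intercepted stream itself reacts to a full consumer is not covered here, so treat this as monitoring rather than flow control.

```typescript
import { Writable } from 'stream';

// Hypothetical helper: true once the consumer's internal queue has reached
// its high-water mark, i.e. write() would be returning false.
function isFallingBehind(consumer: Writable): boolean {
  return consumer.writableLength >= consumer.writableHighWaterMark;
}
```

A handler could poll this periodically and, for example, log a warning or remove the consumer when it stays true for too long.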
Best Practices
Don’t block the stream: Process data asynchronously and always invoke the write callback, including on error paths.
Implement backpressure handling: If your consumer can’t keep up, decide what should happen to the excess data (for example, drop or batch chunks) instead of letting it queue without bound.
Clean up resources: Always clean up file streams, buffers, and external connections when done.
Use shouldIntercept wisely: Only intercept when necessary to minimize overhead.
Limit buffer sizes: If buffering data, implement size limits to prevent unbounded memory growth.
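The advice about size limits can be sketched as a small buffer that discards its oldest chunks once a byte budget is exceeded. `BoundedChunkBuffer` is a hypothetical helper written for this example, not part of discord-player, and dropping the oldest data is only one possible policy.

```typescript
// Hypothetical helper: keeps at most roughly `maxBytes` of recent audio data.
class BoundedChunkBuffer {
  private chunks: Buffer[] = [];
  private size = 0;
  private readonly maxBytes: number;
  public droppedBytes = 0;

  constructor(maxBytes: number) {
    this.maxBytes = maxBytes;
  }

  get byteLength(): number {
    return this.size;
  }

  push(chunk: Buffer): void {
    this.chunks.push(chunk);
    this.size += chunk.length;

    // Evict oldest chunks while over budget. The newest chunk is always kept,
    // even if it alone is larger than maxBytes.
    while (this.size > this.maxBytes && this.chunks.length > 1) {
      const oldest = this.chunks.shift()!;
      this.size -= oldest.length;
      this.droppedBytes += oldest.length;
    }
  }

  // Returns everything currently buffered and resets the buffer.
  drain(): Buffer {
    const data = Buffer.concat(this.chunks);
    this.chunks = [];
    this.size = 0;
    return data;
  }
}
```

Inside a consumer's `write` method, `push(chunk)` would take the place of pushing onto a plain array, and `drain()` would supply the data for the next processing batch.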
Removing Interceptors
Remove an interceptor from a stream:
interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({ /* ... */ });
  stream.interceptors.add(consumer);

  // Remove after 30 seconds
  setTimeout(() => {
    stream.interceptors.delete(consumer);
    consumer.destroy();
    console.log('Interceptor removed');
  }, 30000);
});
Error Handling
interceptor.onStream(async (queue, track, format, stream) => {
  const consumer = new Writable({
    write(chunk, encoding, callback) {
      try {
        // Your processing logic (processChunk is a placeholder)
        processChunk(chunk);
        callback();
      } catch (error) {
        console.error('Processing error:', error);
        // Passing the error to the callback emits 'error' on the consumer
        callback(error);
      }
    },
  });

  consumer.on('error', (error) => {
    console.error('Consumer error:', error);
    // Optionally remove the consumer
    stream.interceptors.delete(consumer);
  });

  stream.interceptors.add(consumer);
});