Overview
Collects values from the source Observable into arrays. Uses a factory function to create an Observable that determines when to close each buffer. When the closing Observable emits, the current buffer is emitted and a new one immediately begins.
bufferWhen is ideal when you need dynamic, self-renewing buffers where the closing condition can vary or depend on runtime state.
Type Signature
function bufferWhen<T>(
  closingSelector: () => ObservableInput<any>
): OperatorFunction<T, T[]>
Parameters
closingSelector
() => ObservableInput<any>
required
A factory function that takes no arguments and returns an Observable (or Promise, Array, etc.). This is called when each buffer opens. When the returned Observable emits its first value, the current buffer closes and emits, and a new buffer immediately opens.
Returns
A function that returns an Observable of arrays. Each array contains all values collected since the previous buffer closed.
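The behavior described above can be checked with a small deterministic sketch. It assumes RxJS 7.2 or later (where operators can be imported from 'rxjs') and uses a hypothetical `closer` Subject as a manual closing signal in place of a timer. The names `selectorCalls` and `buffers` are illustrative only.

```typescript
import { Subject, bufferWhen } from 'rxjs';

const source = new Subject<number>();
// Assumption for illustration: a manually driven closing signal instead of a timer
const closer = new Subject<void>();

let selectorCalls = 0;
const buffers: number[][] = [];

source
  .pipe(
    bufferWhen(() => {
      selectorCalls += 1; // invoked once for every buffer that opens
      return closer;
    })
  )
  .subscribe(buffer => buffers.push(buffer));

source.next(1);
source.next(2);
closer.next(); // closes the first buffer: [1, 2]
source.next(3);
closer.next(); // closes the second buffer: [3]
source.complete(); // completion flushes the open (empty) buffer: []

console.log(buffers); // [[1, 2], [3], []]
console.log(selectorCalls); // 3
```

The trailing empty array shows that a buffer is emitted even when nothing arrived while it was open, which is why pipelines often filter out empty buffers downstream.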
Usage Examples
Basic Example: Random Duration Buffers
import { fromEvent, bufferWhen, interval } from 'rxjs';

const clicks = fromEvent(document, 'click');

// Emit buffers at random intervals (1-5 seconds)
const buffered = clicks.pipe(
  bufferWhen(() => interval(1000 + Math.random() * 4000))
);

buffered.subscribe(x => console.log(x));
// Random interval: [MouseEvent, MouseEvent, ...]
// Different random interval: [MouseEvent, ...]
Dynamic Buffer Duration Based on State
import { interval, bufferWhen, timer } from 'rxjs';

const source = interval(100);
let bufferDuration = 1000;

const buffered = source.pipe(
  bufferWhen(() => {
    const duration = bufferDuration;
    // Increase duration by 500ms each time
    bufferDuration += 500;
    return timer(duration);
  })
);

buffered.subscribe(x => {
  // bufferDuration was already incremented when this buffer opened,
  // so subtract the step to report the duration that actually applied
  console.log(`Buffer (duration was ${bufferDuration - 500} ms):`, x);
});
// First buffer (~1s): [0, 1, 2, ..., 9]
// Second buffer (~1.5s): [10, 11, ..., 24]
// Third buffer (~2s): [25, 26, ..., 44]
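The buffer contents listed above follow from simple arithmetic: each buffer stays open for 1000 ms plus 500 ms per earlier buffer, and the source emits every 100 ms. A minimal sketch of that calculation follows, using a hypothetical helper `expectedBufferSizes` that is not part of RxJS. Real timers drift, so observed sizes may differ by one.

```typescript
// Hypothetical helper: approximate number of source values collected per buffer,
// assuming the buffer duration grows by a fixed step each time.
function expectedBufferSizes(
  sourcePeriodMs: number,
  firstDurationMs: number,
  stepMs: number,
  bufferCount: number
): number[] {
  const sizes: number[] = [];
  for (let n = 0; n < bufferCount; n++) {
    const durationMs = firstDurationMs + n * stepMs;
    sizes.push(Math.floor(durationMs / sourcePeriodMs));
  }
  return sizes;
}

const sizes = expectedBufferSizes(100, 1000, 500, 3);
console.log(sizes); // [10, 15, 20]
```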
Adaptive Buffering Based on Load
import { fromEvent, bufferWhen, timer, tap } from 'rxjs';

const events = fromEvent(document, 'click');
let recentBufferSizes: number[] = [];

const adaptiveBuffered = events.pipe(
  bufferWhen(() => {
    // Calculate average buffer size
    const avgSize = recentBufferSizes.length > 0
      ? recentBufferSizes.reduce((a, b) => a + b, 0) / recentBufferSizes.length
      : 5;
    // If buffers are getting large, close them faster
    const duration = avgSize > 10 ? 500 : avgSize > 5 ? 1000 : 2000;
    return timer(duration);
  }),
  tap(buffer => {
    recentBufferSizes.push(buffer.length);
    // Keep only last 5 measurements
    if (recentBufferSizes.length > 5) {
      recentBufferSizes.shift();
    }
  })
);

adaptiveBuffered.subscribe(x => {
  console.log(`Adaptive buffer (${x.length} items):`, x);
});
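One possible refinement of the adaptive example is to pull the duration policy out into a pure function so it can be tested without timers or DOM events. The sketch below uses a hypothetical name, `chooseDuration`, and keeps the same thresholds and default as the example above.

```typescript
// Hypothetical helper: maps recent buffer sizes to the next buffer duration (ms).
// Thresholds and the default average of 5 are copied from the example above.
function chooseDuration(recentSizes: readonly number[]): number {
  const avgSize = recentSizes.length > 0
    ? recentSizes.reduce((a, b) => a + b, 0) / recentSizes.length
    : 5;
  if (avgSize > 10) return 500;  // large buffers: close faster
  if (avgSize > 5) return 1000;  // moderate buffers
  return 2000;                   // small buffers: wait longer
}

const durationsForSamples = [
  chooseDuration([]),        // no history yet, default average of 5
  chooseDuration([6, 8]),    // average 7
  chooseDuration([12, 14]),  // average 13
];
console.log(durationsForSamples); // [2000, 1000, 500]
```

With such a helper, the closing selector could shrink to `() => timer(chooseDuration(recentBufferSizes))`.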
Using Promises as Closing Selectors
import { interval, bufferWhen } from 'rxjs';

const source = interval(100);

const buffered = source.pipe(
  bufferWhen(() => {
    // Simulate async operation
    return new Promise(resolve => {
      setTimeout(() => resolve('close'), 1000 + Math.random() * 1000);
    });
  })
);

buffered.subscribe(x => console.log('Buffer:', x));
Marble Diagram
Source: --1--2--3--4--5--6--7--8--9--10--11--12--|
Closing: |--c1--| |--c2--| |--c3--|
Result: ------[1,2,3]------[4,5,6]------[7,8,9]------[10,11,12]--|
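Each closing Observable determines when its buffer emits, and a new buffer starts immediately after. When the source completes, the buffer collected so far is emitted as well, which is why the final [10,11,12] appears without a closing notification.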
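The marble diagram can also be reproduced with a dependency-free model. This is a simplified sketch for reasoning about timing, not the RxJS implementation. The names `TimedValue` and `simulateBufferWhen` are hypothetical, time is an abstract unit, and the model assumes a value arriving exactly at a closing time belongs to the next buffer.

```typescript
interface TimedValue<T> {
  time: number;
  value: T;
}

// Simplified model: each buffer stays open for closingDelay(bufferIndex) time units;
// source completion at completeAt flushes whatever is still buffered.
function simulateBufferWhen<T>(
  source: readonly TimedValue<T>[], // must be sorted by time
  closingDelay: (bufferIndex: number) => number,
  completeAt: number
): T[][] {
  const result: T[][] = [];
  let next = 0;
  let bufferIndex = 0;
  let closeAt = 0;

  while (true) {
    const delay = closingDelay(bufferIndex);
    if (!(delay > 0)) {
      throw new Error('closingDelay must return a positive number');
    }
    closeAt += delay;
    if (closeAt >= completeAt) break; // source completes before this buffer closes

    const buffer: T[] = [];
    while (next < source.length && source[next].time < closeAt) {
      buffer.push(source[next].value);
      next++;
    }
    result.push(buffer);
    bufferIndex++;
  }

  // Completion emits the values collected since the last close
  const last: T[] = [];
  while (next < source.length && source[next].time <= completeAt) {
    last.push(source[next].value);
    next++;
  }
  result.push(last);
  return result;
}

// Values 1..12 at times 2, 5, 8, ..., 35; every buffer closes after 9 units
const marbleSource: TimedValue<number>[] = Array.from({ length: 12 }, (_, k) => ({
  time: 2 + 3 * k,
  value: k + 1,
}));
const marbleResult = simulateBufferWhen(marbleSource, () => 9, 36);
console.log(marbleResult); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
```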
Common Use Cases
Variable Rate Buffering: Adjust buffer timing based on system load or conditions
Backpressure Management: Dynamically control buffer size based on downstream capacity
Adaptive Batching: Change batch frequency based on data volume
Conditional Buffering: Use different timing strategies based on application state
Self-Adjusting Windows: Create buffers that adapt to data patterns
The closingSelector is called immediately when subscribing and then again each time a buffer closes, creating a continuous chain of buffers.
Advanced Example: Network Request Batching
import { Subject, bufferWhen, timer, filter, mergeMap } from 'rxjs';

interface ApiRequest {
  id: string;
  endpoint: string;
  data: any;
}

const requestQueue = new Subject<ApiRequest>();
let pendingRequests = 0;
let networkSlow = false;

const batchedRequests = requestQueue.pipe(
  bufferWhen(() => {
    // If network is slow or we have many pending, wait longer
    const delay = networkSlow ? 5000 : pendingRequests > 10 ? 2000 : 1000;
    return timer(delay);
  }),
  // Skip empty buffers so no request is sent when nothing was queued
  filter(requests => requests.length > 0),
  mergeMap(async (batch) => {
    pendingRequests += batch.length;
    const startTime = Date.now();
    try {
      const response = await fetch('/api/batch', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(batch)
      });
      const duration = Date.now() - startTime;
      networkSlow = duration > 3000;
      pendingRequests -= batch.length;
      return response.json();
    } catch (error) {
      pendingRequests -= batch.length;
      throw error;
    }
  })
);

// Observer object form; passing separate callbacks is deprecated in RxJS 7
batchedRequests.subscribe({
  next: results => console.log('Batch completed:', results),
  error: error => console.error('Batch failed:', error)
});

// Usage
requestQueue.next({
  id: '1',
  endpoint: '/users',
  data: { name: 'John' }
});
If the closing selector never emits or takes a very long time, the buffer will continue growing indefinitely, potentially causing memory issues.
import { interval, bufferWhen, NEVER, take, timer } from 'rxjs';

// BAD: This will cause memory issues!
const bad = interval(1).pipe(
  bufferWhen(() => NEVER), // Buffer never closes!
  take(1) // Never reached
);

// GOOD: Always ensure closing selectors eventually emit
const good = interval(1).pipe(
  bufferWhen(() => timer(1000)),
  take(5)
);
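When the closing signal comes from a source that cannot be trusted to emit, one defensive option is to race it against a fallback. The sketch below assumes RxJS 7.2 or later. `withFallback` is a hypothetical helper, not part of RxJS, and `manualFallback` is a stand-in Subject used only to keep the demonstration deterministic.

```typescript
import { NEVER, Subject, bufferWhen, interval, race, take, timer } from 'rxjs';
import type { Observable, ObservableInput } from 'rxjs';

// Hypothetical helper: the buffer closes on whichever signal emits first,
// so a closing selector that stays silent cannot hold a buffer open forever.
function withFallback(
  closingSelector: () => ObservableInput<unknown>,
  fallback: () => ObservableInput<unknown>
): () => Observable<unknown> {
  return () => race(closingSelector(), fallback());
}

// Typical use: cap every buffer at one second (defined here, not subscribed)
const guarded = interval(1).pipe(
  bufferWhen(withFallback(() => NEVER, () => timer(1000))),
  take(5)
);

// Deterministic demonstration with a manually driven fallback
const source = new Subject<number>();
const manualFallback = new Subject<void>();
const seen: number[][] = [];

source
  .pipe(bufferWhen(withFallback(() => NEVER, () => manualFallback)))
  .subscribe(buffer => seen.push(buffer));

source.next(1);
source.next(2);
manualFallback.next(); // the fallback wins the race and closes the buffer

console.log(seen); // [[1, 2]]
```

Because `race` unsubscribes from the losing input, the unused signal is cleaned up each time a buffer closes.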
Related Operators
buffer: Buffer based on a notifier Observable
bufferCount: Buffer based on count
bufferTime: Buffer based on time intervals
bufferToggle: Buffer with opening and closing signals
windowWhen: Like bufferWhen, but emits Observables instead of arrays