Skip to main content

Overview

Collects values from the source Observable into arrays. Uses a factory function to create an Observable that determines when to close each buffer. When the closing Observable emits, the current buffer is emitted and a new one immediately begins.
bufferWhen is ideal when you need dynamic, self-renewing buffers where the closing condition can vary or depend on runtime state.

Type Signature

function bufferWhen<T>(
  closingSelector: () => ObservableInput<any>
): OperatorFunction<T, T[]>

Parameters

closingSelector
() => ObservableInput<any>
required
A factory function that takes no arguments and returns an Observable (or Promise, Array, etc.). This is called when each buffer opens. When the returned Observable emits its first value, the current buffer closes and emits, and a new buffer immediately opens.

Returns

return
OperatorFunction<T, T[]>
A function that returns an Observable of arrays. Each array contains all values collected since the previous buffer closed.

Usage Examples

Basic Example: Random Duration Buffers

import { fromEvent, bufferWhen, interval } from 'rxjs';

const clicks = fromEvent(document, 'click');

// Emit buffers at random intervals (1-5 seconds)
const buffered = clicks.pipe(
  bufferWhen(() => interval(1000 + Math.random() * 4000))
);

buffered.subscribe(x => console.log(x));
// Random interval: [MouseEvent, MouseEvent, ...]
// Different random interval: [MouseEvent, ...]

Dynamic Buffer Duration Based on State

import { interval, bufferWhen, timer } from 'rxjs';

const source = interval(100);
let bufferDuration = 1000;

const buffered = source.pipe(
  bufferWhen(() => {
    const duration = bufferDuration;
    // Increase duration by 500ms each time
    bufferDuration += 500;
    return timer(duration);
  })
);

buffered.subscribe(x => {
  console.log(`Buffer (duration was ${bufferDuration - 500}ms):`, x);
});
// First buffer (~1s): [0, 1, 2, ..., 9]
// Second buffer (~1.5s): [10, 11, ..., 24]
// Third buffer (~2s): [25, 26, ..., 44]

Adaptive Buffering Based on Load

import { fromEvent, bufferWhen, timer, tap } from 'rxjs';

const events = fromEvent(document, 'click');
let recentBufferSizes: number[] = [];

const adaptiveBuffered = events.pipe(
  bufferWhen(() => {
    // Calculate average buffer size
    const avgSize = recentBufferSizes.length > 0
      ? recentBufferSizes.reduce((a, b) => a + b, 0) / recentBufferSizes.length
      : 5;
    
    // If buffers are getting large, close them faster
    const duration = avgSize > 10 ? 500 : avgSize > 5 ? 1000 : 2000;
    
    return timer(duration);
  }),
  tap(buffer => {
    recentBufferSizes.push(buffer.length);
    // Keep only last 5 measurements
    if (recentBufferSizes.length > 5) {
      recentBufferSizes.shift();
    }
  })
);

adaptiveBuffered.subscribe(x => {
  console.log(`Adaptive buffer (${x.length} items):`, x);
});

Using Promises as Closing Selectors

import { interval, bufferWhen } from 'rxjs';

const source = interval(100);

const buffered = source.pipe(
  bufferWhen(() => {
    // Simulate async operation
    return new Promise(resolve => {
      setTimeout(() => resolve('close'), 1000 + Math.random() * 1000);
    });
  })
);

buffered.subscribe(x => console.log('Buffer:', x));

Marble Diagram

Source:   --1--2--3--4--5--6--7--8--9--10--11--12--|
Closing:  |--c1--|  |--c2--|  |--c3--|
Result:   ------[1,2,3]------[4,5,6]------[7,8,9]------[10,11,12]--|
Each closing selector determines when its buffer emits. A new buffer starts immediately after.

Common Use Cases

  1. Variable Rate Buffering: Adjust buffer timing based on system load or conditions
  2. Backpressure Management: Dynamically control buffer size based on downstream capacity
  3. Adaptive Batching: Change batch frequency based on data volume
  4. Conditional Buffering: Use different timing strategies based on application state
  5. Self-Adjusting Windows: Create buffers that adapt to data patterns
The closingSelector is called immediately when subscribing and then again each time a buffer closes, creating a continuous chain of buffers.

Advanced Example: Network Request Batching

import { Subject, bufferWhen, timer, mergeMap } from 'rxjs';

interface ApiRequest {
  id: string;
  endpoint: string;
  data: any;
}

const requestQueue = new Subject<ApiRequest>();
let pendingRequests = 0;
let networkSlow = false;

const batchedRequests = requestQueue.pipe(
  bufferWhen(() => {
    // If network is slow or we have many pending, wait longer
    const delay = networkSlow ? 5000 : pendingRequests > 10 ? 2000 : 1000;
    return timer(delay);
  }),
  filter(requests => requests.length > 0),
  mergeMap(async (batch) => {
    pendingRequests += batch.length;
    const startTime = Date.now();
    
    try {
      const response = await fetch('/api/batch', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(batch)
      });
      
      const duration = Date.now() - startTime;
      networkSlow = duration > 3000;
      pendingRequests -= batch.length;
      
      return response.json();
    } catch (error) {
      pendingRequests -= batch.length;
      throw error;
    }
  })
);

batchedRequests.subscribe(
  results => console.log('Batch completed:', results),
  error => console.error('Batch failed:', error)
);

// Usage
requestQueue.next({ 
  id: '1', 
  endpoint: '/users', 
  data: { name: 'John' } 
});

Memory and Performance

If the closing selector never emits or takes a very long time, the buffer will continue growing indefinitely, potentially causing memory issues.
import { interval, bufferWhen, NEVER, take } from 'rxjs';

// BAD: This will cause memory issues!
const bad = interval(1).pipe(
  bufferWhen(() => NEVER), // Buffer never closes!
  take(1) // Never reached
);

// GOOD: Always ensure closing selectors eventually emit
const good = interval(1).pipe(
  bufferWhen(() => timer(1000)),
  take(5)
);
  • buffer - Buffer based on a notifier Observable
  • bufferCount - Buffer based on count
  • bufferTime - Buffer based on time intervals
  • bufferToggle - Buffer with opening and closing signals
  • windowWhen - Like bufferWhen, but emits Observables instead of arrays