Skip to main content

Overview

Buffers values from the source Observable based on opening and closing signals. When the openings Observable emits, a new buffer starts collecting values. For each opening, a closing Observable is created using closingSelector, and when it emits, that buffer is emitted and closed.
bufferToggle allows you to create multiple concurrent buffers with independent lifecycles, making it powerful for complex buffering scenarios.

Type Signature

function bufferToggle<T, O>(
  openings: ObservableInput<O>,
  closingSelector: (value: O) => ObservableInput<any>
): OperatorFunction<T, T[]>

Parameters

openings
ObservableInput<O>
required
An Observable (or Promise, Array, etc.) that signals when to start new buffers. Each emission triggers the creation of a new buffer.
closingSelector
(value: O) => ObservableInput<any>
required
A function that receives the value emitted by openings and returns an Observable (or other ObservableInput). When this returned Observable emits its first value, the associated buffer is emitted and closed.

Returns

return
OperatorFunction<T, T[]>
A function that returns an Observable of arrays. Each array contains the values collected between an opening signal and its corresponding closing signal.

Usage Examples

Basic Example: Toggle Buffers with Intervals

import { fromEvent, interval, bufferToggle, EMPTY } from 'rxjs';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);

// Every other second, collect clicks from the next 500ms
const buffered = clicks.pipe(
  bufferToggle(openings, i => i % 2 ? interval(500) : EMPTY)
);

buffered.subscribe(x => console.log(x));
// At 1s (i=0): EMPTY closes immediately, emits []
// At 2s (i=1): buffer opens, closes after 500ms
// At 3s (i=2): EMPTY closes immediately, emits []
// At 4s (i=3): buffer opens, closes after 500ms

Dynamic Buffer Duration

import { fromEvent, bufferToggle, interval, timer } from 'rxjs';

const mouseMoves = fromEvent<MouseEvent>(document, 'mousemove');
const clicks = fromEvent<MouseEvent>(document, 'click');

// Each click starts a buffer, duration depends on click count
let clickCount = 0;
const buffered = mouseMoves.pipe(
  bufferToggle(
    clicks,
    () => {
      clickCount++;
      // First click: 1s buffer, second: 2s, third: 3s, etc.
      return timer(clickCount * 1000);
    }
  )
);

buffered.subscribe(moves => {
  console.log(`Captured ${moves.length} mouse movements`);
});

Multiple Concurrent Buffers

import { interval, bufferToggle, timer, map } from 'rxjs';

const source = interval(100).pipe(
  map(i => ({ value: i, timestamp: Date.now() }))
);

// Start a buffer every 500ms, each lasts for 1 second
const openings = interval(500);
const buffered = source.pipe(
  bufferToggle(openings, () => timer(1000))
);

buffered.subscribe(items => {
  console.log('Buffer emitted with', items.length, 'items');
  console.log('Values:', items.map(i => i.value));
});

// At ~1.0s: buffer started at 0s closes (10 items)
// At ~1.5s: buffer started at 0.5s closes (10 items)
// At ~2.0s: buffer started at 1.0s closes (10 items)
// Multiple buffers can be active simultaneously!

Marble Diagram

Source:   --a--b--c--d--e--f--g--h--i--j--k--l--m--n--|
Openings: -----O-----------O-----------O-----------|
Closing:      |--300ms--|  |--300ms--|  |--300ms--|
Result:   ----------[b,c,d]-----------[g,h,i]-----------[k,l,m]--|
Each O starts a new buffer that runs for 300ms.

Common Use Cases

  1. Time-Windowed Sampling: Capture data during specific time windows
  2. User-Controlled Recording: Start/stop recording based on user actions
  3. Conditional Buffering: Buffer different durations based on conditions
  4. Overlapping Time Windows: Analyze data with multiple concurrent buffers
  5. Event Correlation: Collect related events within dynamically-determined windows
Multiple buffers can be active at the same time. All active buffers receive all emitted values from the source.

Advanced Example: Performance Monitoring

import { fromEvent, bufferToggle, interval, timer, map, filter } from 'rxjs';

interface PerformanceMetric {
  type: string;
  duration: number;
  timestamp: number;
}

const performanceEvents = new Subject<PerformanceMetric>();

// Monitor performance every 10 seconds for a 5-second window
const monitoringPeriods = interval(10000);

const performanceReports = performanceEvents.pipe(
  bufferToggle(
    monitoringPeriods,
    () => timer(5000)
  ),
  filter(metrics => metrics.length > 0),
  map(metrics => ({
    count: metrics.length,
    avgDuration: metrics.reduce((sum, m) => sum + m.duration, 0) / metrics.length,
    maxDuration: Math.max(...metrics.map(m => m.duration)),
    types: [...new Set(metrics.map(m => m.type))],
    timestamp: Date.now()
  }))
);

performanceReports.subscribe(report => {
  console.log('Performance report:', report);
  if (report.avgDuration > 1000) {
    console.warn('Performance degradation detected!');
  }
});

Error Handling

If either the openings Observable or any closing Observable errors, the error is propagated to all active buffers and the output Observable.
import { interval, bufferToggle, throwError, catchError } from 'rxjs';

const source = interval(100);
const openings = interval(1000);

const buffered = source.pipe(
  bufferToggle(
    openings,
    i => i === 2 ? throwError(() => new Error('Buffer error')) : timer(300)
  ),
  catchError(err => {
    console.error('Error caught:', err.message);
    return EMPTY;
  })
);

buffered.subscribe(x => console.log(x));