Skip to main content

Overview

Branches out the source Observable values as a nested Observable (a “window”) whenever the windowBoundaries Observable emits. Like buffer, but emits Observables instead of arrays, allowing you to apply operators to each window.
window is the higher-order version of buffer. Use it when you need to apply Observable operators to each group of values rather than working with arrays.

Type Signature

function window<T>(
  windowBoundaries: ObservableInput<any>
): OperatorFunction<T, Observable<T>>

Parameters

windowBoundaries
ObservableInput<any>
required
An Observable (or Promise, Array, etc.) that signals when to close the current window and open a new one. Each emission from this notifier completes the current window Observable and starts a new one.

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each inner Observable emits values from the source until the windowBoundaries emits.

Usage Examples

Basic Example: Window on Interval

import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const sec = interval(1000);

const result = clicks.pipe(
  window(sec),
  map(win => win.pipe(take(2))),
  mergeAll()
);

result.subscribe(x => console.log(x));
// Every second, emit at most 2 click events from that window

Process Each Window Differently

import { interval, window, mergeMap, map, toArray } from 'rxjs';

const source = interval(100).pipe(take(50));
const windowBoundary = interval(1000);

source.pipe(
  window(windowBoundary),
  mergeMap((win, windowIndex) => {
    if (windowIndex % 2 === 0) {
      // Even windows: get sum
      return win.pipe(
        reduce((sum, val) => sum + val, 0),
        map(sum => ({ window: windowIndex, type: 'sum', value: sum }))
      );
    } else {
      // Odd windows: get array
      return win.pipe(
        toArray(),
        map(arr => ({ window: windowIndex, type: 'array', value: arr }))
      );
    }
  })
).subscribe(result => console.log(result));

Window with Click Boundaries

import { interval, fromEvent, window, mergeAll } from 'rxjs';

const numbers = interval(1000);
const clicks = fromEvent(document, 'click');

// Each click creates a new window
const windowedNumbers = numbers.pipe(
  window(clicks),
  tap(() => console.log('New window started')),
  mergeAll()
);

windowedNumbers.subscribe(x => {
  console.log('Number:', x);
});

Marble Diagram

Source:     --a--b--c--d--e--f--g--h--i--j--|
Boundaries: --------1--------2--------3-----|
Window 1:   --a--b--c|
Window 2:             --d--e--f|
Window 3:                      --g--h--i|
Window 4:                               --j--|
Result:     --------W1-------W2-------W3-----W4--|
            (emits Observable windows)

Common Use Cases

  1. Batched Processing: Apply different operations to groups of values
  2. Time-Based Analysis: Analyze metrics within time windows
  3. Event Grouping: Group events and process each group
  4. Streaming Analytics: Calculate statistics per window
  5. Rate Limiting: Process items in windows with delays between windows
  6. Real-time Aggregation: Aggregate data over sliding or tumbling windows
Each window Observable must be subscribed to (typically via mergeAll, mergeMap, or switchAll), otherwise values will be lost.

Advanced Example: Analytics Dashboard

import { interval, window, mergeMap, reduce, timer } from 'rxjs';

interface Metric {
  timestamp: number;
  value: number;
  source: string;
}

const metrics$ = interval(100).pipe(
  map(i => ({
    timestamp: Date.now(),
    value: Math.random() * 100,
    source: ['server1', 'server2', 'server3'][i % 3]
  }))
);

// Create windows every 5 seconds
const windowBoundary$ = timer(5000, 5000);

metrics$.pipe(
  window(windowBoundary$),
  mergeMap((window$, windowIndex) => {
    return window$.pipe(
      reduce((acc, metric) => {
        // Accumulate statistics per window
        acc.count++;
        acc.sum += metric.value;
        acc.max = Math.max(acc.max, metric.value);
        acc.min = Math.min(acc.min, metric.value);
        
        if (!acc.bySour[metric.source]) {
          acc.bySource[metric.source] = { count: 0, sum: 0 };
        }
        acc.bySource[metric.source].count++;
        acc.bySource[metric.source].sum += metric.value;
        
        return acc;
      }, {
        windowIndex,
        count: 0,
        sum: 0,
        max: -Infinity,
        min: Infinity,
        bySource: {} as Record<string, { count: number; sum: number }>
      }),
      map(stats => ({
        ...stats,
        avg: stats.sum / stats.count,
        bySource: Object.entries(stats.bySource).map(([source, data]) => ({
          source,
          count: data.count,
          avg: data.sum / data.count
        }))
      }))
    );
  })
).subscribe(windowStats => {
  console.log('Window statistics:', windowStats);
  updateDashboard(windowStats);
});

Sliding Window Analytics

import { interval, window, mergeMap, toArray, pairwise } from 'rxjs';

const data$ = interval(500).pipe(
  map(() => Math.floor(Math.random() * 100))
);

const windowTrigger$ = interval(2000);

// Calculate trend for each window
data$.pipe(
  window(windowTrigger$),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      filter(arr => arr.length > 1),
      map(values => {
        const first = values[0];
        const last = values[values.length - 1];
        const avg = values.reduce((a, b) => a + b, 0) / values.length;
        const trend = last > first ? 'up' : last < first ? 'down' : 'stable';
        
        return {
          values,
          count: values.length,
          avg: Math.round(avg),
          first,
          last,
          trend,
          change: last - first
        };
      })
    )
  )
).subscribe(analysis => {
  console.log('Window analysis:', analysis);
});

Error Handling in Windows

import { interval, window, mergeMap, catchError, of } from 'rxjs';

const source$ = interval(100).pipe(
  map(i => {
    if (i === 15) throw new Error('Error at 15');
    return i;
  })
);

const boundary$ = interval(1000);

source$.pipe(
  window(boundary$),
  mergeMap((window$, index) => 
    window$.pipe(
      toArray(),
      map(values => ({ window: index, values })),
      catchError(error => {
        console.error(`Error in window ${index}:`, error.message);
        return of({ window: index, values: [], error: error.message });
      })
    )
  )
).subscribe(result => {
  console.log('Window result:', result);
});

Window vs Buffer

import { interval, window, mergeMap, take } from 'rxjs';

interval(100).pipe(
  window(interval(1000)),
  mergeMap(win => win.pipe(take(3)))
).subscribe(x => console.log(x));
// Can apply any Observable operator to window
Use window when you need to apply Observable operators (like debounceTime, distinctUntilChanged, etc.) to each group. Use buffer when you just need the values as an array.

Memory Considerations

If you don’t subscribe to the window Observables (via mergeAll, switchAll, etc.), they will still buffer values internally, potentially causing memory leaks.
// BAD: Windows not subscribed
window(trigger$).subscribe(win => {
  console.log('Got window:', win); // Just logs the Observable, doesn't subscribe
});

// GOOD: Windows are flattened
window(trigger$).pipe(
  mergeAll() // or mergeMap, switchAll, etc.
).subscribe(value => {
  console.log('Value:', value);
});