Overview

Branches out the source Observable values as nested Observables (“windows”), where each window emits at most windowSize values. You can optionally control when new windows start with the startWindowEvery parameter.
windowCount is the Observable equivalent of bufferCount. Use it when you need to apply Observable operators to fixed-size groups of values.

Type Signature

function windowCount<T>(
  windowSize: number,
  startWindowEvery: number = 0
): OperatorFunction<T, Observable<T>>

Parameters

windowSize
number
required
The maximum number of values each window will emit before completing.
startWindowEvery
number
default: 0
The interval at which to start new windows. If 0 or not provided, a new window starts when the previous one completes. If less than windowSize, windows will overlap. If greater, some values will be skipped between windows.
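
The interplay of the two parameters can be traced without RxJS. The following is a plain-TypeScript model, not the library's code: modelWindows is a hypothetical helper that mimics the open/close rule as observed in RxJS 7 (a window opens at subscription and again after every startWindowEvery values; the oldest open window closes once it has received windowSize values) and returns the values each window would receive.

```typescript
// Hypothetical helper: models which source values land in which window.
function modelWindows<T>(values: T[], windowSize: number, startWindowEvery = 0): T[][] {
  // 0 (or omitted) means "start the next window when the previous one is full".
  const startEvery = startWindowEvery > 0 ? startWindowEvery : windowSize;
  const all: T[][] = [[]];       // every window ever opened, in order
  const open: T[][] = [all[0]];  // windows that are still receiving values
  let count = 0;

  for (const value of values) {
    for (const win of open) win.push(value);

    // Close the oldest window once it has received windowSize values.
    const c = count - windowSize + 1;
    if (c >= 0 && c % startEvery === 0) open.shift();

    // Open a new window after every startEvery values.
    if (++count % startEvery === 0) {
      const win: T[] = [];
      all.push(win);
      open.push(win);
    }
  }
  return all;
}

console.log(modelWindows([1, 2, 3, 4, 5], 2, 1));
// [[1, 2], [2, 3], [3, 4], [4, 5], [5], []]   (overlapping)
console.log(modelWindows([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2, 3));
// [[1, 2], [4, 5], [7, 8], [10]]              (3, 6 and 9 are skipped)
```

Under this model, a window that opens right after the last source value stays empty, which is why the first call ends with an empty array.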

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window Observable emits at most windowSize values from the source.

Usage Examples

Basic Example: Fixed-Size Windows

import { fromEvent, windowCount, mergeAll, map } from 'rxjs';

const clicks = fromEvent(document, 'click');

const result = clicks.pipe(
  windowCount(3),
  map((win, i) => win.pipe(
    map(event => `Window ${i} click`)
  )),
  mergeAll()
);

result.subscribe(x => console.log(x));
// Click 1: "Window 0 click"
// Click 2: "Window 0 click"  
// Click 3: "Window 0 click"
// Click 4: "Window 1 click" (new window)

Overlapping Windows

import { fromEvent, windowCount, mergeMap, map, skip } from 'rxjs';

const clicks = fromEvent(document, 'click');

// Start a new window on every click; each window receives up to 3 clicks
const result = clicks.pipe(
  windowCount(3, 1),
  // mergeMap already flattens the windows, so no trailing mergeAll() is needed
  mergeMap((win, i) =>
    win.pipe(
      map(() => `Window ${i}`),
      skip(1) // Skip the first click of each window
    )
  )
);

result.subscribe(x => console.log(x));
// Click 1: (nothing - it is the first click of window 0, so it is skipped)
// Click 2: "Window 0"
// Click 3: "Window 0", "Window 1"
// Click 4: "Window 1", "Window 2"

Skip Values Between Windows

import { range, windowCount, mergeMap, toArray, map } from 'rxjs';

// Window size 2, start new window every 3 values
range(1, 10).pipe(
  windowCount(2, 3),
  mergeMap((window$, index) => 
    window$.pipe(
      toArray(),
      map(values => ({ window: index, values }))
    )
  )
).subscribe(x => console.log(x));
// { window: 0, values: [1, 2] }
// { window: 1, values: [4, 5] } (3 was skipped)
// { window: 2, values: [7, 8] } (6 was skipped)
// { window: 3, values: [10] }   (9 was skipped)

Marble Diagram

windowCount(3)

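Source:  --1--2--3--4--5--6--7--8--9--|
Window1: --1--2--3|
Window2:          --4--5--6|
Window3:                   --7--8--9|
Window4:                            --|
Result:  W1-------W2-------W3-------W4|

Window4 opens right after the 9th value and completes empty when the source completes.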

windowCount(2, 1) - Overlapping

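Source:  --1--2--3--4--5--|
Window0: --1--2|
Window1:    --2--3|
Window2:       --3--4|
Window3:          --4--5|
Window4:             --5--|
Window5:                --|
Result:  W0-W1-W2-W3-W4-W5|

Window4 and Window5 are still open when the source completes, so they complete with it; Window5 is empty.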

Common Use Cases

  1. Batch Processing: Process items in fixed-size batches with Observable operations
  2. Sliding Window Analysis: Analyze trends across overlapping windows
  3. Pagination: Create pages of results from a stream
  4. Chunked Uploads: Split large datasets into chunks for upload
  5. Moving Average: Calculate statistics over sliding windows
  6. Pattern Detection: Look for patterns in fixed-size sequences
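
When the source completes, every open window completes with it, so a final window with fewer than windowSize values is still emitted and completes normally. If a new window opened right after the last source value, that final window is empty; filter out empty batches when collecting windows with toArray.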
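
A short sketch of how the final window behaves, assuming RxJS 7 and a source whose length is an exact multiple of windowSize. The variable names are illustrative only.

```typescript
import { of, windowCount, mergeMap, toArray, filter } from 'rxjs';

const all: number[][] = [];
const nonEmpty: number[][] = [];

// Collect every window into an array.
const batches$ = of(1, 2, 3, 4).pipe(
  windowCount(2),
  mergeMap(win => win.pipe(toArray()))
);

batches$.subscribe(batch => all.push(batch));

// Same pipeline, with empty batches dropped.
batches$.pipe(
  filter(batch => batch.length > 0)
).subscribe(batch => nonEmpty.push(batch));

console.log(all);      // [[1, 2], [3, 4], []]
console.log(nonEmpty); // [[1, 2], [3, 4]]
```

The empty array comes from the window that opened after the fourth value and completed when the source did.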

Advanced Example: Batch API Requests with Retry

import { from, of, defer, windowCount, mergeMap, toArray, filter, map, retry, catchError } from 'rxjs';

interface Item {
  id: number;
  data: string;
}

const items: Item[] = Array.from({ length: 100 }, (_, i) => ({
  id: i,
  data: `item${i}`
}));

from(items).pipe(
  windowCount(10), // Process 10 items at a time
  // Windows are hot, so collect each one as soon as it opens (no concurrency limit here)
  mergeMap(window$ => window$.pipe(toArray())),
  // 100 is a multiple of 10, so windowCount also emits a trailing empty window
  filter(batch => batch.length > 0),
  mergeMap((batch, batchIndex) => {
    console.log(`Processing batch ${batchIndex} with ${batch.length} items`);

    // defer() issues a new fetch per subscription, so retry() actually re-sends the request
    return defer(() =>
      fetch('/api/batch', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ batch, batchIndex })
      }).then(res => {
        if (!res.ok) throw new Error(`Batch ${batchIndex} failed`);
        return res.json();
      })
    ).pipe(
      retry(3),
      map(result => ({ batchIndex, success: true as const, result })),
      catchError((error: Error) => {
        console.error(`Batch ${batchIndex} failed after retries:`, error);
        return of({ batchIndex, success: false as const, error: error.message });
      })
    );
  }, 2) // Process 2 batches concurrently
).subscribe({
  next: result => {
    if (result.success) {
      console.log(`✓ Batch ${result.batchIndex} completed`);
    } else {
      console.log(`✗ Batch ${result.batchIndex} failed: ${result.error}`);
    }
  },
  error: error => console.error('Fatal error:', error),
  complete: () => console.log('All batches processed')
});

Moving Average Calculation

import { interval, windowCount, mergeMap, toArray, map, take, filter } from 'rxjs';

const values$ = interval(500).pipe(
  map(() => Math.floor(Math.random() * 100)),
  take(20)
);

// 3-value moving average with sliding window
values$.pipe(
  windowCount(3, 1),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      filter(arr => arr.length === 3),
      map(values => ({
        values,
        average: values.reduce((a, b) => a + b, 0) / values.length
      }))
    )
  )
).subscribe(result => {
  console.log(`Values: [${result.values.join(', ')}], Avg: ${result.average.toFixed(2)}`);
});

Pattern Detection

import { fromEvent, windowCount, mergeMap, toArray, filter, map } from 'rxjs';

const clicks$ = fromEvent(document, 'click');

// Detect triple-click pattern
const tripleClicks$ = clicks$.pipe(
  windowCount(3, 1),
  mergeMap(window$ => 
    window$.pipe(
      map(e => (e as MouseEvent).timeStamp),
      toArray(),
      filter(timestamps => {
        if (timestamps.length !== 3) return false;
        // Check if all 3 clicks happened within 500ms
        return timestamps[2] - timestamps[0] < 500;
      }),
      map(() => 'Triple click detected!')
    )
  )
);

tripleClicks$.subscribe(msg => {
  console.log(msg);
  showNotification('Triple click!'); // showNotification is an app-defined helper, not part of RxJS
});

Data Validation in Chunks

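import { from, windowCount, mergeMap, toArray, filter, map } from 'rxjs';

interface DataRecord {
  id: number;
  value: number;
  valid: boolean;
}

// generateRecords is an application-specific helper, not part of RxJS
const records: DataRecord[] = generateRecords(1000);

// A record passes only if it is flagged valid and has a positive value
const isValid = (r: DataRecord) => r.valid && r.value > 0;

from(records).pipe(
  windowCount(50), // Validate 50 records at a time
  mergeMap(window$ => window$.pipe(toArray())),
  // 1000 is a multiple of 50, so drop the trailing empty window
  filter(chunk => chunk.length > 0),
  map((chunk, chunkIndex) => {
    // Count with the same predicate that decides validity
    const invalidCount = chunk.filter(r => !isValid(r)).length;

    return {
      chunkIndex,
      size: chunk.length,
      valid: invalidCount === 0,
      invalidCount
    };
  })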
).subscribe(validation => {
  console.log(`Chunk ${validation.chunkIndex}: ${
    validation.valid ? '✓ Valid' : `✗ ${validation.invalidCount} invalid records`
  }`);
});

Performance Optimization

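Use windowCount with mergeMap and a concurrency limit to cap how many chunks are processed at once. Apply the limit to the processing step, not to the windows themselves: a window that is subscribed to late misses the values already pushed into it.

import { from, windowCount, mergeMap, toArray, filter } from 'rxjs';

// generateLargeDataset and processChunk are application-specific helpers
const largeDataset = generateLargeDataset();

from(largeDataset).pipe(
  windowCount(100),
  mergeMap(window$ => window$.pipe(toArray())), // Subscribe to every window immediately
  filter(chunk => chunk.length > 0),            // Drop a trailing empty window, if any
  mergeMap(chunk => processChunk(chunk), 3)     // Process max 3 chunks concurrently
).subscribe(result => console.log('Processed:', result));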
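
One caveat when combining windows with a concurrency limit: a window does not replay values to late subscribers. The sketch below, which assumes RxJS 7 behavior, subscribes to the windows only after a synchronous source has finished and finds each of them empty. A concurrency-limited mergeMap delays its subscription to queued windows in a similar way, so it is safer to collect each window immediately and limit the processing step instead.

```typescript
import { Observable, range, windowCount, toArray } from 'rxjs';

// Collect the window Observables without subscribing to them yet.
const windows: Observable<number>[] = [];
range(1, 6).pipe(windowCount(3)).subscribe(win => windows.push(win));

// range() is synchronous, so by now every window has already completed.
const late: number[][] = [];
for (const win of windows) {
  win.pipe(toArray()).subscribe(values => late.push(values));
}

console.log(windows.length); // 3 (two full windows plus a trailing empty one)
console.log(late);           // [[], [], []] - the values 1..6 were never seen
```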

Window vs Buffer

import { range, windowCount, mergeMap, reduce } from 'rxjs';

// Can apply Observable operators to each window
range(1, 10).pipe(
  windowCount(3),
  mergeMap(win => win.pipe(
    reduce((sum, val) => sum + val, 0)
  ))
).subscribe(x => console.log(x));
// 6, 15, 24, 10
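
For comparison, here is a sketch of the same sums computed with bufferCount, which delivers each group as a plain array, so the reduction becomes an ordinary array operation and not an Observable operator. The variable names are illustrative; both pipelines are expected to produce the same numbers for this input.

```typescript
import { range, bufferCount, windowCount, mergeMap, reduce, map } from 'rxjs';

const viaWindow: number[] = [];
const viaBuffer: number[] = [];

// windowCount: each group is an Observable, reduced with the reduce operator.
range(1, 10).pipe(
  windowCount(3),
  mergeMap(win => win.pipe(reduce((sum, val) => sum + val, 0)))
).subscribe(x => viaWindow.push(x));

// bufferCount: each group is an array, reduced with Array.prototype.reduce.
range(1, 10).pipe(
  bufferCount(3),
  map(values => values.reduce((sum, val) => sum + val, 0))
).subscribe(x => viaBuffer.push(x));

console.log(viaWindow); // [6, 15, 24, 10]
console.log(viaBuffer); // [6, 15, 24, 10]
```

bufferCount is usually the simpler choice when only the collected values are needed; windowCount pays off when each group should be handled with Observable operators as its values arrive.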