Overview
Branches out the source Observable values as nested Observables (“windows”), where each window emits at most windowSize values. You can optionally control when new windows start with the startWindowEvery parameter.
windowCount is the Observable equivalent of bufferCount. Use it when you need to apply Observable operators to fixed-size groups of values.
Type Signature
function windowCount<T>(
  windowSize: number,
  startWindowEvery: number = 0
): OperatorFunction<T, Observable<T>>
Parameters
windowSize (number): The maximum number of values each window will emit before completing.
startWindowEvery (number, default 0): The interval, counted in source values, at which to start new windows. If 0 or not provided, a new window starts when the previous one completes. If less than windowSize, windows overlap. If greater than windowSize, the values that fall between windows are skipped.
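The overlap and skip rules above can be sketched with plain arrays. This is a simplified model, not the RxJS implementation: `modelWindowCount` is a hypothetical helper that only tracks which source values land in which window. It ignores timing and leaves out any window that would receive no values.

```typescript
// Simplified model of windowCount membership (hypothetical helper, not an RxJS API).
// It returns the values each window would receive, ignoring timing and empty windows.
function modelWindowCount<T>(values: T[], windowSize: number, startWindowEvery = 0): T[][] {
  // 0 means "start a new window when the previous one completes"
  const step = startWindowEvery > 0 ? startWindowEvery : windowSize;
  const windows: T[][] = [];
  for (let start = 0; start < values.length; start += step) {
    // A window takes up to windowSize values beginning at its start position
    windows.push(values.slice(start, start + windowSize));
  }
  return windows;
}

const source = [1, 2, 3, 4, 5, 6, 7];
const contiguous = modelWindowCount(source, 3);     // [[1, 2, 3], [4, 5, 6], [7]]
const overlapping = modelWindowCount(source, 3, 2); // [[1, 2, 3], [3, 4, 5], [5, 6, 7], [7]]
const skipping = modelWindowCount(source, 2, 3);    // [[1, 2], [4, 5], [7]]
```

With a step smaller than the window size, values such as 3 and 5 appear in two windows. With a larger step, values 3 and 6 appear in none.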
Returns
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window Observable emits at most windowSize values from the source.
Usage Examples
Basic Example: Fixed-Size Windows
Non-overlapping Windows
Process Each Window
import { fromEvent, windowCount, mergeAll, map } from 'rxjs';

const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  windowCount(3),
  // Label every click with the index of the window it belongs to
  map((win, i) => win.pipe(
    map(() => `Window ${i} click`)
  )),
  mergeAll() // Flatten the windows back into a single stream
);

result.subscribe(x => console.log(x));
// Click 1: "Window 0 click"
// Click 2: "Window 0 click"
// Click 3: "Window 0 click"
// Click 4: "Window 1 click" (new window)
Overlapping Windows
import { fromEvent, windowCount, mergeMap, map, skip } from 'rxjs';

const clicks = fromEvent(document, 'click');

// Start a new window on every click; each window holds up to 3 clicks
const result = clicks.pipe(
  windowCount(3, 1),
  // mergeMap already flattens the windows, so no mergeAll is needed afterwards
  mergeMap((win, i) =>
    win.pipe(
      map(() => `Window ${i}`),
      skip(1) // Skip the first click of each window
    )
  )
);

result.subscribe(x => console.log(x));
// Click 1: (nothing; it is the first click of window 0 and is skipped)
// Click 2: "Window 0" (first click of window 1 is skipped)
// Click 3: "Window 0", "Window 1"
// Click 4: "Window 1", "Window 2"
Skip Values Between Windows
import { range, windowCount, mergeMap, toArray, map } from 'rxjs';

// Window size 2, start a new window every 3 values
range(1, 10).pipe(
  windowCount(2, 3),
  mergeMap((window$, index) =>
    window$.pipe(
      toArray(),
      map(values => ({ window: index, values }))
    )
  )
).subscribe(x => console.log(x));
// { window: 0, values: [1, 2] }
// { window: 1, values: [4, 5] } (3 was skipped)
// { window: 2, values: [7, 8] } (6 was skipped)
// { window: 3, values: [10] } (9 was skipped)
Marble Diagram
windowCount(3)
Source:  --1--2--3--4--5--6--7--8--9--|
Window1: --1--2--3|
Window2: --4--5--6|
Window3: --7--8--9|
Window4: --| (empty: opened after the 9th value, completed with the source)
Result:  --W1-----W2------W3------W4--|
windowCount(2, 1) - Overlapping
Source:  --1--2--3--4--5--|
Window0: --1--2|
Window1: --2--3|
Window2: --3--4|
Window3: --4--5|
Window4: --5|
Window5: --| (empty: opened after the 5th value, completed with the source)
Result:  --W0-W1-W2-W3-W4-W5-|
Common Use Cases
Batch Processing: Process items in fixed-size batches with Observable operations
Sliding Window Analysis: Analyze trends across overlapping windows
Pagination: Create pages of results from a stream
Chunked Uploads: Split large datasets into chunks for upload
Moving Average: Calculate statistics over sliding windows
Pattern Detection: Look for patterns in fixed-size sequences
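When the source completes, every window that is still open completes normally, even if it has received fewer than windowSize items. This can include a trailing window that received no values at all, so filter out empty groups where they matter.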
Advanced Example: Batch API Requests with Retry
import { defer, from, of, windowCount, mergeMap, toArray, filter, map, retry, catchError } from 'rxjs';

interface Item {
  id: number;
  data: string;
}

const items: Item[] = Array.from({ length: 100 }, (_, i) => ({
  id: i,
  data: `item ${i}`
}));

from(items).pipe(
  windowCount(10), // Group 10 items per window
  // Collect every window as soon as it opens. Windows are hot, so a window
  // that waits behind a concurrency limit would miss its values.
  mergeMap((window$, batchIndex) =>
    window$.pipe(
      toArray(),
      map(batch => ({ batch, batchIndex }))
    )
  ),
  // Drop the empty trailing window opened after the last full batch
  filter(({ batch }) => batch.length > 0),
  mergeMap(({ batch, batchIndex }) => {
    console.log(`Processing batch ${batchIndex} with ${batch.length} items`);
    // defer creates a new request per subscription, so retry really re-sends it
    return defer(() =>
      fetch('/api/batch', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ batch, batchIndex })
      }).then(res => {
        if (!res.ok) throw new Error(`Batch ${batchIndex} failed`);
        return res.json();
      })
    ).pipe(
      retry(3),
      map(result => ({ batchIndex, success: true as const, result })),
      catchError((error: Error) => {
        console.error(`Batch ${batchIndex} failed after retries:`, error);
        return of({ batchIndex, success: false as const, error: error.message });
      })
    );
  }, 2) // Send at most 2 batches concurrently
).subscribe({
  next: result => {
    if (result.success) {
      console.log(`✓ Batch ${result.batchIndex} completed`);
    } else {
      console.log(`✗ Batch ${result.batchIndex} failed: ${result.error}`);
    }
  },
  error: error => console.error('Fatal error:', error),
  complete: () => console.log('All batches processed')
});
Moving Average Calculation
import { interval, windowCount, mergeMap, toArray, map, filter, take } from 'rxjs';

const values$ = interval(500).pipe(
  map(() => Math.floor(Math.random() * 100)),
  take(20)
);

// 3-value moving average over a sliding window
values$.pipe(
  windowCount(3, 1),
  mergeMap(window$ =>
    window$.pipe(
      toArray(),
      // Windows still open when the source completes hold fewer than 3 values; drop them
      filter(arr => arr.length === 3),
      map(values => ({
        values,
        average: values.reduce((a, b) => a + b, 0) / values.length
      }))
    )
  )
).subscribe(result => {
  console.log(`Values: [${result.values.join(', ')}], Avg: ${result.average.toFixed(2)}`);
});
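To sanity-check the stream version, the same arithmetic can be run over a plain array. The sketch below assumes a hypothetical `movingAverage` helper (not an RxJS API) that mirrors windowCount(3, 1) followed by the length filter: one average per full window, advancing one value at a time.

```typescript
// Hypothetical reference helper: averages every full window of `size` consecutive values.
function movingAverage(values: number[], size: number): number[] {
  const averages: number[] = [];
  // Stop once fewer than `size` values remain, like the length === 3 filter above
  for (let start = 0; start + size <= values.length; start++) {
    const windowValues = values.slice(start, start + size);
    averages.push(windowValues.reduce((a, b) => a + b, 0) / size);
  }
  return averages;
}

const averages = movingAverage([10, 20, 30, 40, 50], 3); // [20, 30, 40]
```

Five values produce three full windows, (10, 20, 30), (20, 30, 40) and (30, 40, 50), so three averages come out.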
Pattern Detection
import { fromEvent, windowCount, mergeMap, toArray, filter, map } from 'rxjs';

// Hypothetical UI helper, not part of RxJS; supply your own implementation
declare function showNotification(message: string): void;

const clicks$ = fromEvent(document, 'click');

// Detect a triple-click pattern
const tripleClicks$ = clicks$.pipe(
  windowCount(3, 1),
  mergeMap(window$ =>
    window$.pipe(
      map(e => e.timeStamp),
      toArray(),
      filter(timestamps => {
        if (timestamps.length !== 3) return false;
        // All 3 clicks must fall within 500ms
        return timestamps[2] - timestamps[0] < 500;
      }),
      map(() => 'Triple click detected!')
    )
  )
);

tripleClicks$.subscribe(msg => {
  console.log(msg);
  showNotification('Triple click!');
});
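The timing check inside the filter can also be pulled out into a standalone predicate, which makes it testable without a DOM. This is only a sketch: `isTripleClick` and its `maxSpanMs` parameter are illustrative names, not part of RxJS.

```typescript
// Hypothetical helper: true when exactly three timestamps span less than maxSpanMs.
function isTripleClick(timestamps: number[], maxSpanMs = 500): boolean {
  // Partial windows (fewer than 3 clicks) never count as a triple click
  if (timestamps.length !== 3) return false;
  return timestamps[2] - timestamps[0] < maxSpanMs;
}

const quick = isTripleClick([1000, 1150, 1400]); // true: span is 400ms
const slow = isTripleClick([1000, 1300, 1600]);  // false: span is 600ms
const partial = isTripleClick([1000, 1100]);     // false: only two clicks
```

The predicate could then be passed to the pipeline as filter(timestamps => isTripleClick(timestamps)).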
Data Validation in Chunks
import { from, windowCount, mergeMap, toArray, filter, map } from 'rxjs';

interface DataRecord {
  id: number;
  value: number;
  valid: boolean;
}

// Hypothetical data source, not part of RxJS; replace with your own
declare function generateRecords(count: number): DataRecord[];

const records: DataRecord[] = generateRecords(1000);
const isValid = (r: DataRecord) => r.valid && r.value > 0;

from(records).pipe(
  windowCount(50), // Validate 50 records at a time
  mergeMap((window$, chunkIndex) =>
    window$.pipe(
      toArray(),
      // Skip the empty trailing window opened after the last full chunk
      filter(chunk => chunk.length > 0),
      map(chunk => {
        // Count with the same rule that decides validity, so the two fields agree
        const invalidCount = chunk.filter(r => !isValid(r)).length;
        return {
          chunkIndex,
          size: chunk.length,
          valid: invalidCount === 0,
          invalidCount
        };
      })
    )
  )
).subscribe(validation => {
  console.log(`Chunk ${validation.chunkIndex}: ${
    validation.valid ? '✓ Valid' : `✗ ${validation.invalidCount} invalid records`
  }`);
});
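Use windowCount with mergeMap and a concurrency limit to control memory usage and processing throughput. Apply the limit to the processing step rather than to the window subscription: windows are hot, so a window that waits behind the limit misses its values.
import { from, windowCount, mergeMap, toArray, filter, Observable } from 'rxjs';

// Hypothetical helpers, not part of RxJS; replace with your own data source and processing step
declare function generateLargeDataset(): unknown[];
declare function processChunk(chunk: unknown[]): Observable<unknown>;

const largeDataset = generateLargeDataset();
from(largeDataset).pipe(
  windowCount(100),
  // Collect each window as soon as it opens
  mergeMap(window$ => window$.pipe(toArray())),
  filter(chunk => chunk.length > 0), // Ignore an empty trailing window
  // Process at most 3 chunks concurrently
  mergeMap(chunk => processChunk(chunk), 3)
).subscribe(result => console.log('Processed:', result));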
Window vs Buffer
import { range, windowCount, mergeMap, reduce } from 'rxjs';

// Each window is an Observable, so Observable operators apply to it directly
range(1, 10).pipe(
  windowCount(3),
  mergeMap(win => win.pipe(
    reduce((sum, val) => sum + val, 0)
  ))
).subscribe(x => console.log(x));
// 6, 15, 24, 10