Overview
Branches out the source Observable values as a nested Observable (a “window”) whenever the windowBoundaries Observable emits. Like buffer, but emits Observables instead of arrays, allowing you to apply operators to each window.
window is the higher-order version of buffer. Use it when you need to apply Observable operators to each group of values rather than working with arrays.
Type Signature
function window<T>(
  windowBoundaries: ObservableInput<any>
): OperatorFunction<T, Observable<T>>
Parameters
windowBoundaries: ObservableInput<any> (required)
An Observable (or Promise, Array, etc.) that signals when to close the current window and open a new one. Each emission from this notifier completes the current window Observable and starts a new one.
Returns
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each inner Observable emits values from the source until the windowBoundaries emits.
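The open/close behaviour described above can be observed deterministically by driving both streams by hand. The following is a minimal sketch, assuming RxJS 7.2 or later (where operators are exported from 'rxjs'); the Subjects `source$` and `boundary$` and the `closedWindows` array are illustrative names, not part of the API.

```typescript
import { Subject, window, mergeMap, toArray, map } from 'rxjs';

// Hand-driven streams (illustrative names) so the timing is fully deterministic
const source$ = new Subject<string>();
const boundary$ = new Subject<void>();
const closedWindows: { index: number; values: string[] }[] = [];

source$.pipe(
  window(boundary$),
  // Collect each window into an array once that window completes
  mergeMap((win$, index) =>
    win$.pipe(toArray(), map(values => ({ index, values })))
  )
).subscribe(w => closedWindows.push(w));

source$.next('a');
source$.next('b');
boundary$.next();   // completes window 0 and opens window 1
source$.next('c');
boundary$.next();   // completes window 1 and opens window 2
source$.complete(); // completes window 2, which received no values

console.log(closedWindows);
```

The first window opens as soon as the result is subscribed to, so values that arrive before the first boundary emission are not skipped. The final window is empty here because the source completed right after a boundary.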
Usage Examples
Basic Example: Window on Interval
Simple Windowing
import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const sec = interval(1000);
const result = clicks.pipe(
  window(sec),
  map(win => win.pipe(take(2))), // take at most 2 clicks from each window
  mergeAll()                     // flatten the windows back into one stream
);
result.subscribe(x => console.log(x));
// In each one-second window, emit at most 2 click events
Process Each Window Differently
import { interval, window, mergeMap, map, reduce, take, toArray } from 'rxjs';

const source = interval(100).pipe(take(50));
const windowBoundary = interval(1000);

source.pipe(
  window(windowBoundary),
  mergeMap((win, windowIndex) => {
    if (windowIndex % 2 === 0) {
      // Even windows: get sum
      return win.pipe(
        reduce((sum, val) => sum + val, 0),
        map(sum => ({ window: windowIndex, type: 'sum', value: sum }))
      );
    } else {
      // Odd windows: get array
      return win.pipe(
        toArray(),
        map(arr => ({ window: windowIndex, type: 'array', value: arr }))
      );
    }
  })
).subscribe(result => console.log(result));
Window with Click Boundaries
import { interval, fromEvent, window, mergeAll, tap } from 'rxjs';

const numbers = interval(1000);
const clicks = fromEvent(document, 'click');

// Each click closes the current window and opens a new one
const windowedNumbers = numbers.pipe(
  window(clicks),
  // Runs once per window, including the first one opened on subscription
  tap(() => console.log('New window started')),
  mergeAll()
);

windowedNumbers.subscribe(x => {
  console.log('Number:', x);
});
Marble Diagram
Source:     --a--b--c--d--e--f--g--h--i--j--|
Boundaries: --------1--------2--------3-----|
Window 1:   --a--b--c|
Window 2:   --d--e--f|
Window 3:   --g--h--i|
Window 4:   --j--|
Result:     W1------W2-------W3-------W4----|
(emits Observable windows; each window is emitted when it opens)
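The diagram can be restated as a small pure function. This is only a model of the grouping, not how RxJS implements `window`; the `Timed` type, the `modelWindows` helper, and the frame numbers are all hypothetical. It also assumes that when a value and a boundary fall on the same frame, the value is delivered first, which is how the diagram above groups c, f, and i.

```typescript
// Hypothetical model type: a value tagged with the frame at which it is emitted
interface Timed<T> {
  time: number;
  value: T;
}

// Groups values into consecutive windows. A boundary strictly earlier than a
// value closes the current window before that value is delivered (assumption:
// on a tie, the value is delivered first, as in the diagram).
function modelWindows<T>(
  events: Timed<T>[],
  boundaries: number[],
  completeAt: number
): T[][] {
  const windows: T[][] = [[]]; // the first window opens immediately
  let b = 0;
  for (const e of events) {
    if (e.time > completeAt) break;
    while (b < boundaries.length && boundaries[b] < e.time) {
      windows.push([]);
      b++;
    }
    windows[windows.length - 1].push(e.value);
  }
  // Boundaries after the last value but before completion still open windows
  while (b < boundaries.length && boundaries[b] <= completeAt) {
    windows.push([]);
    b++;
  }
  return windows;
}

// Frame positions read off the diagram: values every 3 frames starting at 2
const letters = 'abcdefghij'.split('');
const events = letters.map((value, i) => ({ time: 2 + 3 * i, value }));
const modelled = modelWindows(events, [8, 17, 26], 32);
console.log(modelled);
```

With the positions taken from the diagram, the model yields four groups matching Window 1 through Window 4.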
Common Use Cases
Batched Processing: Apply different operations to groups of values
Time-Based Analysis: Analyze metrics within time windows
Event Grouping: Group events and process each group
Streaming Analytics: Calculate statistics per window
Rate Limiting: Process items in windows with delays between windows
Real-time Aggregation: Aggregate data over consecutive, non-overlapping (tumbling) windows
Each window Observable must be subscribed to (typically via mergeAll, mergeMap, or switchAll), otherwise values will be lost.
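A short sketch of what "lost" means in practice: a window does not replay earlier values to a late subscriber. The example assumes RxJS 7.2 or later; `source$`, `boundary$`, `eager`, and `late` are illustrative names.

```typescript
import { Subject, Observable, window } from 'rxjs';

const source$ = new Subject<number>();
const boundary$ = new Subject<void>();
const windows: Observable<number>[] = [];
const eager: number[] = []; // subscribed as soon as the window is emitted
const late: number[] = [];  // subscribed after two values have already passed

source$.pipe(window(boundary$)).subscribe(win$ => {
  windows.push(win$);
  // Only the first window gets an immediate subscriber
  if (windows.length === 1) {
    win$.subscribe(v => eager.push(v));
  }
});

source$.next(1);
source$.next(2);
// Subscribing now misses 1 and 2: the window does not replay them
windows[0].subscribe(v => late.push(v));
source$.next(3);

boundary$.next(); // opens a second window that nobody subscribes to
source$.next(4);  // delivered to that window and dropped

console.log({ eager, late });
```

This is why flattening operators such as mergeAll or mergeMap are the usual choice: they subscribe to each window synchronously as it is emitted.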
Advanced Example: Analytics Dashboard
import { interval, window, mergeMap, map, reduce, timer } from 'rxjs';

interface Metric {
  timestamp: number;
  value: number;
  source: string;
}

// Placeholder for your own rendering logic (not part of RxJS)
function updateDashboard(stats: unknown): void {
  /* render stats */
}

const metrics$ = interval(100).pipe(
  map((i): Metric => ({
    timestamp: Date.now(),
    value: Math.random() * 100,
    source: ['server1', 'server2', 'server3'][i % 3]
  }))
);

// Create windows every 5 seconds
const windowBoundary$ = timer(5000, 5000);

metrics$.pipe(
  window(windowBoundary$),
  mergeMap((window$, windowIndex) => {
    return window$.pipe(
      reduce((acc, metric) => {
        // Accumulate statistics per window
        acc.count++;
        acc.sum += metric.value;
        acc.max = Math.max(acc.max, metric.value);
        acc.min = Math.min(acc.min, metric.value);
        if (!acc.bySource[metric.source]) {
          acc.bySource[metric.source] = { count: 0, sum: 0 };
        }
        acc.bySource[metric.source].count++;
        acc.bySource[metric.source].sum += metric.value;
        return acc;
      }, {
        windowIndex,
        count: 0,
        sum: 0,
        max: -Infinity,
        min: Infinity,
        bySource: {} as Record<string, { count: number; sum: number }>
      }),
      map(stats => ({
        ...stats,
        avg: stats.sum / stats.count,
        bySource: Object.entries(stats.bySource).map(([source, data]) => ({
          source,
          count: data.count,
          avg: data.sum / data.count
        }))
      }))
    );
  })
).subscribe(windowStats => {
  console.log('Window statistics:', windowStats);
  updateDashboard(windowStats);
});
Per-Window Trend Analytics
import { interval, window, mergeMap, map, filter, toArray } from 'rxjs';

const data$ = interval(500).pipe(
  map(() => Math.floor(Math.random() * 100))
);
const windowTrigger$ = interval(2000);

// Calculate trend for each window
data$.pipe(
  window(windowTrigger$),
  mergeMap(window$ =>
    window$.pipe(
      toArray(),
      // A trend needs at least two values
      filter(arr => arr.length > 1),
      map(values => {
        const first = values[0];
        const last = values[values.length - 1];
        const avg = values.reduce((a, b) => a + b, 0) / values.length;
        const trend = last > first ? 'up' : last < first ? 'down' : 'stable';
        return {
          values,
          count: values.length,
          avg: Math.round(avg),
          first,
          last,
          trend,
          change: last - first
        };
      })
    )
  )
).subscribe(analysis => {
  console.log('Window analysis:', analysis);
});
Error Handling in Windows
import { interval, window, mergeMap, map, toArray, catchError, of } from 'rxjs';

const source$ = interval(100).pipe(
  map(i => {
    if (i === 15) throw new Error('Error at 15');
    return i;
  })
);
const boundary$ = interval(1000);

source$.pipe(
  window(boundary$),
  mergeMap((window$, index) =>
    window$.pipe(
      toArray(),
      map(values => ({ window: index, values })),
      // A source error is forwarded to the open window, so it can be caught here
      catchError(error => {
        console.error(`Error in window ${index}:`, error.message);
        return of({ window: index, values: [], error: error.message });
      })
    )
  )
).subscribe({
  next: result => console.log('Window result:', result),
  // The same source error also terminates the outer stream
  error: error => console.error('Stream failed:', error.message)
});
Window vs Buffer
window (Observables)
import { interval, window, mergeMap, take } from 'rxjs';

interval(100).pipe(
  window(interval(1000)),
  mergeMap(win => win.pipe(take(3)))
).subscribe(x => console.log(x));
// Can apply any Observable operator to window
Use window when you need to apply Observable operators (like debounceTime, distinctUntilChanged, etc.) to each group. Use buffer when you just need the values as an array.
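To make the contrast concrete, the sketch below feeds the same values through `buffer` and through `window` with a per-window `distinctUntilChanged`. It assumes RxJS 7.2 or later; the Subjects and result arrays are illustrative names, and hand-driven Subjects stand in for timers so the outcome is deterministic.

```typescript
import {
  Subject,
  buffer,
  window,
  mergeMap,
  distinctUntilChanged,
  toArray
} from 'rxjs';

const source$ = new Subject<number>();
const boundary$ = new Subject<void>();
const buffered: number[][] = [];
const windowed: number[][] = [];

// buffer: each boundary emission yields the collected values as a plain array
source$.pipe(buffer(boundary$)).subscribe(arr => buffered.push(arr));

// window: operators run inside each window; duplicates are dropped per window
source$.pipe(
  window(boundary$),
  mergeMap(win$ => win$.pipe(distinctUntilChanged(), toArray()))
).subscribe(arr => windowed.push(arr));

[1, 1, 2].forEach(v => source$.next(v));
boundary$.next();
[2, 2, 3].forEach(v => source$.next(v));
boundary$.next();

console.log({ buffered, windowed });
```

Because `distinctUntilChanged` restarts with every window, the 2 that opens the second group is kept even though the previous group ended with 2. Getting the same result from `buffer` would mean post-processing each array by hand.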
Memory Considerations
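Window Observables are backed by a Subject and do not replay. If you don't subscribe to each window as soon as it is emitted (via mergeAll, switchAll, etc.), its values are dropped rather than buffered. Memory grows only when an operator inside the window accumulates values (such as toArray or reduce) while windowBoundaries rarely or never emits, because that window then stays open for as long as the source does.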
// source$ and trigger$ are placeholders for your own Observables

// BAD: windows are never subscribed to
source$.pipe(
  window(trigger$)
).subscribe(win => {
  console.log('Got window:', win); // Just logs the Observable, doesn't subscribe
});

// GOOD: windows are flattened
source$.pipe(
  window(trigger$),
  mergeAll() // or mergeMap, switchAll, etc.
).subscribe(value => {
  console.log('Value:', value);
});