Overview
Branches out the source Observable values as nested Observables (“windows”) using a factory function to determine when each window closes. Opens a window immediately, then calls closingSelector to get an Observable that signals when to close that window and open a new one.
windowWhen is ideal for creating self-renewing windows where the closing condition can be dynamic or based on runtime state.
Type Signature
function windowWhen < T >(
closingSelector : () => ObservableInput < any >
) : OperatorFunction < T , Observable < T >>
Parameters
closingSelector
() => ObservableInput<any>
required
A factory function that takes no arguments and returns an Observable (or Promise, Array, etc.). This is called when each window opens. When the returned Observable emits its first value, the current window closes and a new one immediately opens.
Returns
return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window collects values until the closing selector emits.
Usage Examples
Basic Example: Random Duration Windows
Random Intervals
Count Items Per Window
import { fromEvent , windowWhen , interval , mergeAll } from 'rxjs' ;
const clicks = fromEvent ( document , 'click' );
// Windows of random duration (1-5 seconds)
const result = clicks . pipe (
windowWhen (() => interval ( 1000 + Math . random () * 4000 )),
mergeAll ()
);
result . subscribe ( x => console . log ( x ));
// Each window closes after a random interval
Dynamic Window Duration
import { interval , windowWhen , timer , mergeMap , toArray } from 'rxjs' ;
const source = interval ( 100 );
let windowDuration = 1000 ;
source . pipe (
windowWhen (() => {
const duration = windowDuration ;
console . log ( `Opening window with ${ duration } ms duration` );
// Increase duration each time
windowDuration += 500 ;
return timer ( duration );
}),
mergeMap (( window$ , i ) =>
window$ . pipe (
toArray (),
map ( values => ({
window: i ,
duration: 1000 + ( i * 500 ),
count: values . length ,
values
}))
)
),
take ( 5 )
). subscribe ( result => {
console . log ( 'Window result:' , result );
});
Adaptive Window Based on Load
import { fromEvent , windowWhen , timer , mergeMap , toArray } from 'rxjs' ;
const events$ = fromEvent ( document , 'click' );
const recentWindowSizes : number [] = [];
events$ . pipe (
windowWhen (() => {
// Calculate average window size
const avgSize = recentWindowSizes . length > 0
? recentWindowSizes . reduce (( a , b ) => a + b , 0 ) / recentWindowSizes . length
: 5 ;
// If windows are getting large, close them faster
let duration : number ;
if ( avgSize > 10 ) {
duration = 500 ;
console . log ( 'High activity detected, using 500ms windows' );
} else if ( avgSize > 5 ) {
duration = 1000 ;
console . log ( 'Medium activity, using 1s windows' );
} else {
duration = 2000 ;
console . log ( 'Low activity, using 2s windows' );
}
return timer ( duration );
}),
mergeMap ( window$ =>
window$ . pipe (
toArray (),
tap ( values => {
recentWindowSizes . push ( values . length );
// Keep only last 5 measurements
if ( recentWindowSizes . length > 5 ) {
recentWindowSizes . shift ();
}
}),
map (( values , i ) => ({
window: i ,
count: values . length ,
avgRecentSize: ( recentWindowSizes . reduce (( a , b ) => a + b , 0 ) / recentWindowSizes . length ). toFixed ( 1 )
}))
)
)
). subscribe ( result => {
console . log ( 'Adaptive window:' , result );
});
Marble Diagram
Source: --1--2--3--4--5--6--7--8--9--|
Closing: |--c1--| |--c2--| |--c3--|
Window1: --1--2--3|
Window2: --4--5--6|
Window3: --7--8--9|
Result: ----------W1------W2------W3------|
Common Use Cases
Adaptive Batching : Adjust window size based on system load or data volume
Backpressure Management : Control window duration based on processing capacity
Variable Rate Sampling : Change sampling frequency dynamically
Conditional Windowing : Use different strategies based on application state
Resource-Aware Processing : Adjust windows based on available resources
Self-Adjusting Analytics : Windows that adapt to data patterns
The closingSelector is called immediately when subscribing and then again each time a window closes, creating a continuous chain of windows.
Advanced Example: Smart Request Batching
import { Subject , windowWhen , timer , mergeMap , toArray } from 'rxjs' ;
interface ApiRequest {
id : string ;
priority : 'high' | 'normal' | 'low' ;
endpoint : string ;
}
const requestQueue$ = new Subject < ApiRequest >();
let pendingRequests = 0 ;
let lastBatchSize = 0 ;
let networkSlow = false ;
requestQueue$ . pipe (
windowWhen (() => {
// Adaptive batching logic
let delay : number ;
if ( networkSlow ) {
delay = 5000 ;
console . log ( 'Network slow, batching for 5s' );
} else if ( pendingRequests > 20 ) {
delay = 500 ;
console . log ( 'High load, quick 500ms batches' );
} else if ( lastBatchSize > 10 ) {
delay = 1000 ;
console . log ( 'Large batches, using 1s windows' );
} else {
delay = 2000 ;
console . log ( 'Normal operation, 2s windows' );
}
return timer ( delay );
}),
mergeMap ( window$ =>
window$ . pipe (
toArray (),
filter ( batch => batch . length > 0 ),
mergeMap ( async ( batch ) => {
lastBatchSize = batch . length ;
pendingRequests += batch . length ;
const startTime = Date . now ();
try {
// Separate by priority
const highPriority = batch . filter ( r => r . priority === 'high' );
const normalPriority = batch . filter ( r => r . priority === 'normal' );
const lowPriority = batch . filter ( r => r . priority === 'low' );
// Process high priority first
const results = [];
if ( highPriority . length > 0 ) {
const res = await fetch ( '/api/batch/high' , {
method: 'POST' ,
body: JSON . stringify ( highPriority )
});
results . push ( await res . json ());
}
// Then normal and low together
if ( normalPriority . length + lowPriority . length > 0 ) {
const res = await fetch ( '/api/batch' , {
method: 'POST' ,
body: JSON . stringify ([ ... normalPriority , ... lowPriority ])
});
results . push ( await res . json ());
}
const duration = Date . now () - startTime ;
networkSlow = duration > 3000 ;
pendingRequests -= batch . length ;
return {
success: true ,
batchSize: batch . length ,
duration ,
results
};
} catch ( error ) {
pendingRequests -= batch . length ;
return {
success: false ,
batchSize: batch . length ,
error: error . message
};
}
})
)
)
). subscribe ( result => {
if ( result . success ) {
console . log ( `✓ Batch of ${ result . batchSize } completed in ${ result . duration } ms` );
} else {
console . error ( `✗ Batch of ${ result . batchSize } failed:` , result . error );
}
});
// Usage
requestQueue$ . next ({
id: '1' ,
priority: 'high' ,
endpoint: '/api/critical'
});
State-Based Window Control
import { interval , windowWhen , timer , mergeMap , scan , last } from 'rxjs' ;
interface AppState {
mode : 'fast' | 'normal' | 'slow' ;
itemsProcessed : number ;
}
const appState : AppState = {
mode: 'normal' ,
itemsProcessed: 0
};
const data$ = interval ( 100 ). pipe (
map ( i => ({ id: i , value: Math . random () * 100 }))
);
data$ . pipe (
windowWhen (() => {
// Window duration based on application mode
const durations = { fast: 500 , normal: 1000 , slow: 2000 };
const duration = durations [ appState . mode ];
console . log ( `Window: ${ duration } ms (mode: ${ appState . mode } )` );
return timer ( duration );
}),
mergeMap ( window$ =>
window$ . pipe (
scan (( acc , item ) => acc + 1 , 0 ),
last (),
map ( count => {
appState . itemsProcessed += count ;
// Adjust mode based on throughput
if ( count > 8 ) {
appState . mode = 'fast' ;
} else if ( count < 3 ) {
appState . mode = 'slow' ;
} else {
appState . mode = 'normal' ;
}
return {
count ,
mode: appState . mode ,
totalProcessed: appState . itemsProcessed
};
})
)
)
). subscribe ( stats => {
console . log ( 'Window stats:' , stats );
});
Error-Based Window Adjustment
import { interval , windowWhen , timer , mergeMap , catchError } from 'rxjs' ;
let errorCount = 0 ;
let successCount = 0 ;
const riskyOperation$ = interval ( 200 ). pipe (
map ( i => {
if ( Math . random () < 0.2 ) throw new Error ( 'Random error' );
return i ;
})
);
riskyOperation$ . pipe (
windowWhen (() => {
// If error rate is high, use longer windows to reduce processing frequency
const errorRate = errorCount / ( errorCount + successCount );
let duration : number ;
if ( errorRate > 0.5 ) {
duration = 5000 ;
console . log ( 'High error rate, using long windows' );
} else if ( errorRate > 0.2 ) {
duration = 2000 ;
console . log ( 'Moderate errors, using medium windows' );
} else {
duration = 1000 ;
console . log ( 'Low errors, using short windows' );
}
return timer ( duration );
}),
mergeMap ( window$ =>
window$ . pipe (
toArray (),
map ( values => {
successCount += values . length ;
return { success: true , count: values . length };
}),
catchError ( error => {
errorCount ++ ;
return of ({ success: false , error: error . message });
})
)
)
). subscribe ( result => {
console . log ( 'Window result:' , result );
});
Memory Management
If the closing selector never emits or takes a very long time, the window will continue growing indefinitely, potentially causing memory issues. Always ensure closing selectors eventually emit.
// BAD: Window never closes
windowWhen (() => NEVER ). pipe (
mergeAll ()
)
// GOOD: Window always closes eventually
windowWhen (() => timer ( 1000 )). pipe (
mergeAll ()
)
Using Promises
import { interval , windowWhen , mergeMap , toArray } from 'rxjs' ;
interval ( 100 ). pipe (
windowWhen (() => {
// Simulate async condition check
return new Promise ( resolve => {
setTimeout (() => {
const delay = 1000 + Math . random () * 1000 ;
resolve ( delay );
}, 100 );
});
}),
mergeMap (( window$ , i ) =>
window$ . pipe (
toArray (),
map ( values => ({ window: i , count: values . length }))
)
),
take ( 5 )
). subscribe ( result => console . log ( result ));