Overview
Branches out the source Observable values as nested Observables (“windows”) based on opening and closing signals. When openings emits, a new window starts. For each opening, closingSelector is called to get an Observable that determines when that specific window closes.
windowToggle provides fine-grained control over window lifecycles, allowing multiple concurrent windows with independent durations.
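As a minimal sketch of the open/close cycle described above, the snippet below drives windowToggle with plain Subjects so that every step happens synchronously. The names source$, open$, close$ and collected are made up for this illustration, and it assumes an RxJS version that exports operators from 'rxjs', as the other examples on this page do.

```typescript
import { Subject, windowToggle, mergeMap, toArray, map } from 'rxjs';

// Illustrative names only: source$, open$, close$ and collected are assumptions of this sketch.
const source$ = new Subject<number>();
const open$ = new Subject<string>();
const close$ = new Subject<string>();

const collected: Array<{ window: number; values: number[] }> = [];

source$
  .pipe(
    // Each open$ emission starts a window; that window ends on the next close$ emission.
    windowToggle(open$, () => close$),
    // Gather each window into an array that is emitted when the window completes.
    mergeMap((window$, index) =>
      window$.pipe(
        toArray(),
        map((values) => ({ window: index, values }))
      )
    )
  )
  .subscribe((summary) => collected.push(summary));

source$.next(1);   // no window is open, so this value is dropped
open$.next('A');   // window 0 opens
source$.next(2);
source$.next(3);
close$.next('x');  // window 0 closes with [2, 3]
source$.next(4);   // dropped again
open$.next('B');   // window 1 opens
source$.next(5);
close$.next('y');  // window 1 closes with [5]

console.log(collected);
// [ { window: 0, values: [ 2, 3 ] }, { window: 1, values: [ 5 ] } ]
```

Values that arrive while no window is open are not buffered; they simply never appear in any window.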
Type Signature
function windowToggle<T, O>(
  openings: ObservableInput<O>,
  closingSelector: (openValue: O) => ObservableInput<any>
): OperatorFunction<T, Observable<T>>
Parameters
openings
ObservableInput<O>
required
An Observable (or Promise, Array, etc.) that signals when to start new windows. Each emission triggers the creation of a new window.
closingSelector
(openValue: O) => ObservableInput<any>
required
A function that receives the value emitted by openings and returns an Observable (or other ObservableInput). When this returned Observable emits its first value, the associated window completes.
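The following sketch shows one way closingSelector can use the opening value: each opening carries an id, and the returned notifier only fires for a matching id, so windows can close out of order. The Subjects and the id scheme are hypothetical and exist only for this illustration.

```typescript
import { Subject, windowToggle, mergeMap, toArray, map, filter } from 'rxjs';

// Illustrative names only: source$, open$, close$ and closed are assumptions of this sketch.
const source$ = new Subject<string>();
const open$ = new Subject<string>();  // emits the id of the window to open
const close$ = new Subject<string>(); // emits the id of the window to close

const closed: string[] = [];

source$
  .pipe(
    // closingSelector receives the opening value and returns a notifier for that window only.
    windowToggle(open$, (id) => close$.pipe(filter((closeId) => closeId === id))),
    // Summarize each window as "<window index>:<values>" once it completes.
    mergeMap((window$, index) =>
      window$.pipe(
        toArray(),
        map((values) => `${index}:${values.join('')}`)
      )
    )
  )
  .subscribe((entry) => closed.push(entry));

open$.next('a');   // window 0 opens
source$.next('x');
open$.next('b');   // window 1 opens while window 0 is still open
source$.next('y');
close$.next('b');  // closes window 1 only
source$.next('z');
close$.next('a');  // closes window 0

console.log(closed); // [ '1:y', '0:xyz' ]
```

Window 1 closes first even though it opened second, because each window listens only to its own closing notifier.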
Returns
return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window collects values between its opening and closing signals.
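To make the shape of the output concrete, here is a small sketch that subscribes to the outer Observable and to each window directly, recording the order of events. The log array and Subject names are assumptions of the example, not part of the operator's API.

```typescript
import { Subject, windowToggle } from 'rxjs';

// Illustrative names only: source$, open$, close$ and log are assumptions of this sketch.
const source$ = new Subject<number>();
const open$ = new Subject<void>();
const close$ = new Subject<void>();

const log: string[] = [];

source$.pipe(windowToggle(open$, () => close$)).subscribe({
  next: (window$) => {
    // The window Observable is emitted as soon as its opening signal fires.
    log.push('window emitted');
    window$.subscribe({
      next: (value) => log.push(`value ${value}`),
      complete: () => log.push('window completed'),
    });
  },
  complete: () => log.push('output completed'),
});

open$.next();        // first window opens
source$.next(1);
close$.next();       // first window completes
open$.next();        // second window opens
source$.next(2);
source$.complete();  // completes the open window, then the output

console.log(log);
// [ 'window emitted', 'value 1', 'window completed',
//   'window emitted', 'value 2', 'window completed', 'output completed' ]
```

In this run the second window never receives a closing signal; it completes because the source completes.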
Usage Examples
Basic Example: Toggle Windows with Intervals
Simple Toggle
import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);

// Every other second, capture clicks from the next 500ms
const result = clicks.pipe(
  windowToggle(
    openings,
    i => i % 2 ? interval(500) : EMPTY
  ),
  mergeAll()
);

result.subscribe(x => console.log(x));

// At 1s (i=0): EMPTY closes immediately
// At 2s (i=1): window open for 500ms, captures clicks
// At 3s (i=2): EMPTY closes immediately
// At 4s (i=3): window open for 500ms, captures clicks
Dynamic Window Duration
import { fromEvent, windowToggle, timer, mergeMap, toArray, map } from 'rxjs';

const mouseMoves = fromEvent<MouseEvent>(document, 'mousemove');
const clicks = fromEvent<MouseEvent>(document, 'click');

let clickCount = 0;

mouseMoves.pipe(
  windowToggle(
    clicks,
    () => {
      clickCount++;
      // First click: 1s window, second: 2s, third: 3s, etc.
      const duration = clickCount * 1000;
      console.log(`Window ${clickCount} will last ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap((window$, i) =>
    window$.pipe(
      toArray(),
      map(moves => ({
        window: i,
        moveCount: moves.length,
        // Window i was opened by click i + 1; clickCount may have advanced by the time it closes
        duration: (i + 1) * 1000
      }))
    )
  )
).subscribe(result => {
  console.log('Window result:', result);
});
Multiple Concurrent Windows
import { interval, windowToggle, timer, mergeMap, map, toArray, take } from 'rxjs';

const source = interval(100).pipe(
  map(i => ({ value: i, timestamp: Date.now() }))
);

// Start a new window every 500ms, each lasts 1 second
const openings = interval(500);

source.pipe(
  windowToggle(
    openings,
    (openIndex) => {
      console.log(`Window ${openIndex} opened`);
      return timer(1000);
    }
  ),
  mergeMap((window$, windowId) =>
    window$.pipe(
      toArray(),
      map(items => ({
        windowId,
        itemCount: items.length,
        values: items.map(i => i.value)
      }))
    )
  ),
  // Stop after the first five windows have closed
  take(5)
).subscribe(result => {
  console.log('Window closed:', result);
});

// At ~0.5s: window 0 opens
// At ~1.0s: window 1 opens (window 0 still active)
// At ~1.5s: window 0 closes (1000ms elapsed), window 2 opens
// Multiple windows active simultaneously!
Marble Diagram
Source:   --a--b--c--d--e--f--g--h--i--j--|
Openings: -----O-----------O-----------O--|
Closing:       |--300ms--| |--300ms--|
Window1:  -----b--c--d|
Window2:                    --g--h--i|
Window3:  (closes at end)
Result:   ----------W1----------W2----------W3--|
Common Use Cases
User-Controlled Recording: Start/stop recording based on user actions
Conditional Monitoring: Monitor different durations based on conditions
Overlapping Time Windows: Analyze data with multiple active windows
Event Correlation: Collect related events within dynamic windows
Sampling Strategies: Sample data differently based on triggers
A/B Testing Windows: Different window durations for different scenarios
Multiple windows can be active at the same time. All active windows receive all emitted values from the source Observable.
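One practical consequence is easy to miss: flattening overlapping windows with mergeAll yields each source value once per open window. The sketch below illustrates this under the assumption that no window ever closes (the closing notifier is NEVER); the Subject names are hypothetical.

```typescript
import { Subject, NEVER, windowToggle, mergeAll } from 'rxjs';

// Illustrative names only: source$, open$ and flattened are assumptions of this sketch.
const source$ = new Subject<string>();
const open$ = new Subject<void>();

const flattened: string[] = [];

source$
  .pipe(
    // NEVER never emits, so every window stays open for the rest of this sketch.
    windowToggle(open$, () => NEVER),
    mergeAll()
  )
  .subscribe((value) => flattened.push(value));

open$.next();       // one window open
source$.next('a');  // delivered once
open$.next();       // two windows open
source$.next('b');  // delivered twice, once per open window

console.log(flattened); // [ 'a', 'b', 'b' ]
```

If duplicates are unwanted, it may be simpler to process each window separately (for example with mergeMap and toArray) rather than merging them back into one stream.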
import { Subject, windowToggle, mergeMap, reduce, map, timer } from 'rxjs';

interface PerformanceEvent {
  type: string;
  duration: number;
  timestamp: number;
}

const performanceEvents$ = new Subject<PerformanceEvent>();
const startProfiling$ = new Subject<string>();
const profileDuration = 10000; // 10 seconds

performanceEvents$.pipe(
  windowToggle(
    startProfiling$,
    (profileId) => {
      console.log(`Started profiling: ${profileId}`);
      return timer(profileDuration);
    }
  ),
  mergeMap((window$, index) => {
    const startTime = Date.now();
    return window$.pipe(
      // Accumulate totals and per-type counters for this window
      reduce((acc, event) => {
        acc.totalEvents++;
        acc.totalDuration += event.duration;
        if (!acc.byType[event.type]) {
          acc.byType[event.type] = { count: 0, totalDuration: 0 };
        }
        acc.byType[event.type].count++;
        acc.byType[event.type].totalDuration += event.duration;
        acc.maxDuration = Math.max(acc.maxDuration, event.duration);
        return acc;
      }, {
        profileIndex: index,
        totalEvents: 0,
        totalDuration: 0,
        maxDuration: 0,
        byType: {} as Record<string, { count: number; totalDuration: number }>
      }),
      map(stats => ({
        ...stats,
        // Guard against windows that saw no events
        avgDuration: stats.totalEvents > 0 ? stats.totalDuration / stats.totalEvents : 0,
        profileDuration: Date.now() - startTime,
        byType: Object.entries(stats.byType).map(([type, data]) => ({
          type,
          count: data.count,
          avgDuration: data.totalDuration / data.count
        }))
      }))
    );
  })
).subscribe(report => {
  console.log('Performance report:', report);
  if (report.avgDuration > 100) {
    console.warn('Performance degradation detected!');
  }
});

// Start profiling sessions
startProfiling$.next('session-1');
setTimeout(() => startProfiling$.next('session-2'), 5000);
Traffic Monitoring with Peak Detection
import { interval, windowToggle, timer, mergeMap, map, scan, last } from 'rxjs';

interface Request {
  endpoint: string;
  duration: number;
  statusCode: number;
}

// Simulated traffic: one request every 100ms, roughly 10% of them failing
const requests$ = interval(100).pipe(
  map((): Request => ({
    endpoint: ['/api/users', '/api/posts', '/api/comments'][Math.floor(Math.random() * 3)],
    duration: Math.random() * 500,
    statusCode: Math.random() > 0.9 ? 500 : 200
  }))
);

// Monitor peaks: when error rate is high, open longer monitoring window
const checkInterval$ = interval(5000);
let consecutiveErrors = 0;

requests$.pipe(
  windowToggle(
    checkInterval$,
    () => {
      // If we've seen many errors, monitor for longer
      const duration = consecutiveErrors > 3 ? 10000 : 3000;
      console.log(`Monitoring window: ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap(window$ =>
    window$.pipe(
      scan((acc, req) => {
        acc.total++;
        if (req.statusCode >= 500) acc.errors++;
        acc.totalDuration += req.duration;
        return acc;
      }, { total: 0, errors: 0, totalDuration: 0 }),
      // Keep only the final running total for the window
      last(),
      map(stats => {
        const errorRate = stats.errors / stats.total;
        consecutiveErrors = errorRate > 0.1 ? consecutiveErrors + 1 : 0;
        return {
          total: stats.total,
          errors: stats.errors,
          errorRate: (errorRate * 100).toFixed(2) + '%',
          avgDuration: (stats.totalDuration / stats.total).toFixed(2),
          alert: errorRate > 0.1
        };
      })
    )
  )
).subscribe(stats => {
  console.log('Window stats:', stats);
  if (stats.alert) {
    console.error('HIGH ERROR RATE DETECTED!');
  }
});
User Session Analysis
import { Subject, windowToggle, mergeMap, filter, take, tap, reduce } from 'rxjs';

const sessionStarts$ = new Subject<string>(); // userId
const sessionEnds$ = new Subject<string>(); // userId
const userActions$ = new Subject<{ userId: string; action: string }>();

// userId for each window, in opening order (the same order as the mergeMap index below)
const windowUserIds: string[] = [];

userActions$.pipe(
  windowToggle(
    sessionStarts$,
    userId => {
      console.log(`Session started: ${userId}`);
      windowUserIds.push(userId);
      // Window closes when that specific user's session ends
      return sessionEnds$.pipe(
        filter(endUserId => endUserId === userId),
        take(1),
        tap(() => console.log(`Session ended: ${userId}`))
      );
    }
  ),
  mergeMap((window$, index) => {
    const userId = windowUserIds[index];
    return window$.pipe(
      // Every open window sees every action, so keep only this session's user
      filter(action => action.userId === userId),
      reduce((acc, action) => {
        acc.actions.push(action.action);
        acc.count++;
        return acc;
      }, {
        userId,
        actions: [] as string[],
        count: 0
      })
    );
  })
).subscribe(sessionSummary => {
  console.log('Session summary:', sessionSummary);
});

// Usage
sessionStarts$.next('user1');
sessionStarts$.next('user2');
userActions$.next({ userId: 'user1', action: 'view-page' });
userActions$.next({ userId: 'user2', action: 'click-button' });
sessionEnds$.next('user1');
Error Handling
If either the openings Observable or any closing Observable errors, the error is propagated to all active windows and the output Observable.
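The closing-notifier case can be sketched synchronously with Subjects, as below. This is an illustration under assumed names (source$, open$, closer$, events) and covers only an error raised by a closing Observable while one window is open.

```typescript
import { Subject, windowToggle } from 'rxjs';

// Illustrative names only: source$, open$, closer$ and events are assumptions of this sketch.
const source$ = new Subject<number>();
const open$ = new Subject<void>();
const closer$ = new Subject<void>();

const events: string[] = [];

source$.pipe(windowToggle(open$, () => closer$)).subscribe({
  next: (window$) =>
    window$.subscribe({
      next: (value) => events.push(`value ${value}`),
      error: (err: Error) => events.push(`window error: ${err.message}`),
    }),
  error: (err: Error) => events.push(`output error: ${err.message}`),
});

open$.next();                                 // a window opens
source$.next(1);                              // the open window receives the value
closer$.error(new Error('closing failed'));   // the closing notifier fails

console.log(events);
// [ 'value 1', 'window error: closing failed', 'output error: closing failed' ]
```

Because the error reaches both the window and the output, a catchError placed on the window alone does not keep the outer stream alive; the outer subscription needs its own error handling.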
import { interval, timer, of, windowToggle, throwError, catchError, mergeMap, toArray, map } from 'rxjs';

const source = interval(100);
const openings = interval(1000);

source.pipe(
  windowToggle(
    openings,
    // The third opening (i === 2) gets a closing Observable that errors immediately
    i => i === 2 ? throwError(() => new Error('Closing error')) : timer(300)
  ),
  mergeMap((window$, i) =>
    window$.pipe(
      toArray(),
      map(values => ({ window: i, values })),
      catchError(error => {
        console.error(`Error in window ${i}:`, error.message);
        return of({ window: i, values: [] as number[], error: error.message });
      })
    )
  )
).subscribe({
  next: result => console.log(result),
  // The error also reaches the output Observable, so handle it here as well
  error: error => console.error('Stream error:', error.message)
});