Overview

Branches out the source Observable values as nested Observables (“windows”) based on opening and closing signals. When openings emits, a new window starts. For each opening, closingSelector is called to get an Observable that determines when that specific window closes.
windowToggle provides fine-grained control over window lifecycles, allowing multiple concurrent windows with independent durations.
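
To make the overlap concrete, here is a minimal, library-free sketch of the grouping rule described above. It is a simplified model, not RxJS's implementation: `TimedValue`, `WindowSpec`, and `assignToWindows` are hypothetical names, times are plain millisecond numbers, and a window is assumed to cover the half-open range from its opening up to (but not including) its closing.

```typescript
// Simplified model of windowToggle's grouping rule (not the RxJS implementation).
interface TimedValue<T> {
  at: number;   // time the source emitted the value, in ms
  value: T;
}

interface WindowSpec {
  openAt: number;   // time the openings signal fired
  closeAt: number;  // time the closing notifier first emitted
}

// Returns, for each window, the source values emitted while it was open.
// Assumption: a window covers the half-open range [openAt, closeAt).
function assignToWindows<T>(source: TimedValue<T>[], specs: WindowSpec[]): T[][] {
  return specs.map(spec =>
    source
      .filter(item => item.at >= spec.openAt && item.at < spec.closeAt)
      .map(item => item.value)
  );
}

// Source emits 0..9, one value every 100ms starting at 100ms.
const modelSource: TimedValue<number>[] = [];
for (let i = 0; i < 10; i++) {
  modelSource.push({ at: (i + 1) * 100, value: i });
}

// Two windows that overlap between 450ms and 550ms.
const modelWindows = assignToWindows(modelSource, [
  { openAt: 250, closeAt: 550 },
  { openAt: 450, closeAt: 750 },
]);

console.log(modelWindows); // [[2, 3, 4], [4, 5, 6]]
```

The value 4 (emitted at 500ms) lands in both windows because both are open at that moment.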

Type Signature

function windowToggle<T, O>(
  openings: ObservableInput<O>,
  closingSelector: (openValue: O) => ObservableInput<any>
): OperatorFunction<T, Observable<T>>

Parameters

openings
ObservableInput<O>
required
An Observable (or Promise, Array, etc.) that signals when to start new windows. Each emission triggers the creation of a new window.
closingSelector
(openValue: O) => ObservableInput<any>
required
A function that receives the value emitted by openings and returns an Observable (or other ObservableInput). When this returned Observable emits its first value, the associated window completes.
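
Because closingSelector receives the opening value, each window can pick its own lifetime. The sketch below models that idea without RxJS. `OpeningSignal`, `PlannedWindow`, and `planWindows` are hypothetical stand-ins: a real closingSelector returns an ObservableInput, whereas here the selector returns a number that represents the delay until that notifier's first emission.

```typescript
// Library-free model: the selector maps an opening value to the delay (ms)
// before the closing notifier's first emission. Hypothetical names throughout.
interface OpeningSignal<O> {
  at: number;     // when openings emitted
  openValue: O;   // the value passed to the selector
}

interface PlannedWindow<O> {
  openValue: O;
  openAt: number;
  closeAt: number;
}

function planWindows<O>(
  openings: OpeningSignal<O>[],
  closingDelay: (openValue: O) => number
): PlannedWindow<O>[] {
  // The selector is called once per opening, so every window gets its own close time.
  return openings.map(signal => ({
    openValue: signal.openValue,
    openAt: signal.at,
    closeAt: signal.at + closingDelay(signal.openValue),
  }));
}

// Assumed policy for illustration: 'long' openings stay open for 1000ms, others for 200ms.
const plannedWindows = planWindows(
  [
    { at: 0, openValue: 'short' },
    { at: 100, openValue: 'long' },
    { at: 300, openValue: 'short' },
  ],
  openValue => (openValue === 'long' ? 1000 : 200)
);

console.log(plannedWindows.map(w => w.closeAt)); // [200, 1100, 500]
```

Note that the second window outlives the third: closing order is independent of opening order.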

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window emits the source values that arrive between its opening and closing signals.

Usage Examples

Basic Example: Toggle Windows with Intervals

import { fromEvent, interval, windowToggle, of, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const openings = interval(1000);

// Every other second, capture clicks from the next 500ms
const result = clicks.pipe(
  windowToggle(
    openings,
    // A window closes on its notifier's first emission, so odd openings get a
    // 500ms window and even openings get a notifier that emits right away.
    i => i % 2 ? interval(500) : of(null)
  ),
  mergeAll()
);

result.subscribe(x => console.log(x));
// At 1s (i=0): of(null) emits at once, so the window closes immediately
// At 2s (i=1): window open for 500ms, captures clicks
// At 3s (i=2): of(null) emits at once, so the window closes immediately
// At 4s (i=3): window open for 500ms, captures clicks

Dynamic Window Duration

import { fromEvent, windowToggle, timer, mergeMap, toArray, map } from 'rxjs';

const mouseMoves = fromEvent<MouseEvent>(document, 'mousemove');
const clicks = fromEvent<MouseEvent>(document, 'click');

let clickCount = 0;

mouseMoves.pipe(
  windowToggle(
    clicks,
    () => {
      clickCount++;
      // First click: 1s window, second: 2s, third: 3s, etc.
      const duration = clickCount * 1000;
      console.log(`Window ${clickCount} will last ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(moves => ({
        window: i,
        moveCount: moves.length,
        // Derived from the window index: clickCount may have advanced since this window opened
        duration: (i + 1) * 1000
      }))
    )
  )
).subscribe(result => {
  console.log('Window result:', result);
});

Multiple Concurrent Windows

import { interval, windowToggle, timer, mergeMap, map, toArray, take } from 'rxjs';

const source = interval(100).pipe(
  map(i => ({ value: i, timestamp: Date.now() }))
);

// Start a new window every 500ms, each lasts 1 second
const openings = interval(500);

source.pipe(
  windowToggle(
    openings,
    (openIndex) => {
      console.log(`Window ${openIndex} opened`);
      return timer(1000);
    }
  ),
  mergeMap((window$, windowId) => 
    window$.pipe(
      toArray(),
      map(items => ({
        windowId,
        itemCount: items.length,
        values: items.map(i => i.value)
      }))
    )
  ),
  take(5)
).subscribe(result => {
  console.log('Window closed:', result);
});

// At ~0.5s: window 0 opens
// At ~1.0s: window 1 opens (window 0 still active)
// At ~1.5s: window 0 closes (1000ms elapsed), window 2 opens
// Multiple windows active simultaneously!

Marble Diagram

Source:   --a--b--c--d--e--f--g--h--i--j--|
Openings: ----O-----------O-----------O---|
Closing:      |-------C   |-------C   |---   (C = closing notifier emits, 300ms after its opening)
Window1:      -b--c--d|
Window2:                  -f--g--h|
Window3:                              -j--|  (completes with the source)
Result:   ----W1----------W2----------W3--|  (each window is emitted when it opens)
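
A diagram like this can be checked mechanically. The following sketch parses marble-style strings (one character per time frame) and computes each window's contents. It uses its own strings, and `parseMarbles`, `windowsFromMarbles`, and the fixed `closeAfterFrames` delay are assumptions made for illustration; this is not RxJS's marble-testing API.

```typescript
// One character = one time frame; '-' is silence, '|' is completion,
// any other character is an emitted value. Hypothetical helper names.
interface MarbleEvent {
  frame: number;
  symbol: string;
}

function parseMarbles(marbles: string): MarbleEvent[] {
  const events: MarbleEvent[] = [];
  for (let frame = 0; frame < marbles.length; frame++) {
    const symbol = marbles.charAt(frame);
    if (symbol !== '-' && symbol !== '|') {
      events.push({ frame: frame, symbol: symbol });
    }
  }
  return events;
}

// Each opening starts a window that closes `closeAfterFrames` later,
// or when the source completes, whichever comes first.
// Assumption: values on the exact opening or closing frame are excluded.
function windowsFromMarbles(source: string, openings: string, closeAfterFrames: number): string[] {
  const completion = source.indexOf('|');
  const sourceEnd = completion === -1 ? source.length : completion;
  const values = parseMarbles(source);
  return parseMarbles(openings).map(opening => {
    const closeFrame = Math.min(opening.frame + closeAfterFrames, sourceEnd);
    return values
      .filter(v => v.frame > opening.frame && v.frame < closeFrame)
      .map(v => v.symbol)
      .join('');
  });
}

const marbleWindows = windowsFromMarbles(
  '--a--b--c--d--e--f--g--h--i--j--|',
  '----O-----------O-----------O---|',
  8
);

console.log(marbleWindows); // ['bcd', 'fgh', 'j']
```

The last window is cut short by the source completing, which is why it holds only `j`.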

Common Use Cases

  1. User-Controlled Recording: Start/stop recording based on user actions
  2. Conditional Monitoring: Monitor different durations based on conditions
  3. Overlapping Time Windows: Analyze data with multiple active windows
  4. Event Correlation: Collect related events within dynamic windows
  5. Sampling Strategies: Sample data differently based on triggers
  6. A/B Testing Windows: Different window durations for different scenarios

Multiple windows can be active at the same time. Every open window receives each value the source Observable emits while that window is open.
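
One consequence worth spelling out: when overlapping windows are flattened (for example with mergeAll), a source value that falls inside two open windows is delivered twice. A small library-free sketch of that effect follows; `OverlapWindow`, `valuesSeenBy`, and the sample timings are hypothetical.

```typescript
// Model: each window is a [openAt, closeAt) range over a ticking source.
interface OverlapWindow {
  openAt: number;
  closeAt: number;
}

// Values a single window receives while it is open.
function valuesSeenBy(win: OverlapWindow, ticks: number[]): number[] {
  return ticks.filter(t => t >= win.openAt && t < win.closeAt);
}

// Source ticks at 0, 100, ..., 900 (the tick time doubles as the value).
const sourceTicks: number[] = [];
for (let t = 0; t < 1000; t += 100) {
  sourceTicks.push(t);
}

// Windows open 300ms apart and last 500ms, so they overlap by 200ms.
const overlapping: OverlapWindow[] = [
  { openAt: 50, closeAt: 550 },
  { openAt: 350, closeAt: 850 },
];

const perWindow = overlapping.map(win => valuesSeenBy(win, sourceTicks));

// Flattening combines what every window saw. Shown here in window order;
// a live merge would interleave by time but contain the same values.
const flattened = perWindow.reduce((all, seen) => all.concat(seen), [] as number[]);

// Values that appear more than once after flattening.
const duplicated = flattened.filter((v, i) => flattened.indexOf(v) !== i);

console.log(perWindow);  // [[100, 200, 300, 400, 500], [400, 500, 600, 700, 800]]
console.log(duplicated); // [400, 500]
```

If each value should be counted once per analysis, aggregating inside each window (as the examples here do with toArray or reduce) keeps the per-window results separate.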

Advanced Example: Performance Profiling

import { Subject, windowToggle, mergeMap, reduce, map, timer } from 'rxjs';

interface PerformanceEvent {
  type: string;
  duration: number;
  timestamp: number;
}

const performanceEvents$ = new Subject<PerformanceEvent>();
const startProfiling$ = new Subject<string>();
const profileDuration = 10000; // 10 seconds

performanceEvents$.pipe(
  windowToggle(
    startProfiling$,
    (profileId) => {
      console.log(`Started profiling: ${profileId}`);
      return timer(profileDuration);
    }
  ),
  mergeMap((window$, index) => {
    const startTime = Date.now();
    
    return window$.pipe(
      reduce((acc, event) => {
        acc.totalEvents++;
        acc.totalDuration += event.duration;
        
        if (!acc.byType[event.type]) {
          acc.byType[event.type] = { count: 0, totalDuration: 0 };
        }
        acc.byType[event.type].count++;
        acc.byType[event.type].totalDuration += event.duration;
        
        acc.maxDuration = Math.max(acc.maxDuration, event.duration);
        
        return acc;
      }, {
        profileIndex: index,
        totalEvents: 0,
        totalDuration: 0,
        maxDuration: 0,
        byType: {} as Record<string, { count: number; totalDuration: number }>
      }),
      map(stats => ({
        ...stats,
        // Avoid NaN (0 / 0) when a profiling window saw no events
        avgDuration: stats.totalEvents > 0 ? stats.totalDuration / stats.totalEvents : 0,
        profileDuration: Date.now() - startTime,
        byType: Object.entries(stats.byType).map(([type, data]) => ({
          type,
          count: data.count,
          avgDuration: data.totalDuration / data.count
        }))
      }))
    );
  })
).subscribe(report => {
  console.log('Performance report:', report);
  if (report.avgDuration > 100) {
    console.warn('Performance degradation detected!');
  }
});

// Start profiling sessions
startProfiling$.next('session-1');
setTimeout(() => startProfiling$.next('session-2'), 5000);
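
The aggregation performed inside reduce is plain data manipulation, so it can be sketched and checked without any Observables. In this sketch `ProfiledEvent`, `ProfileSummary`, `summarizeProfile`, and the sample events are hypothetical; it also shows one way to keep the average well defined when a window contains no events.

```typescript
// Hypothetical event shape mirroring the fields used in the example above.
interface ProfiledEvent {
  type: string;
  duration: number;
}

interface ProfileSummary {
  totalEvents: number;
  avgDuration: number;
  maxDuration: number;
  countByType: Record<string, number>;
}

// Folds one window's events into a summary; an empty window yields zeros
// instead of NaN (0 / 0).
function summarizeProfile(events: ProfiledEvent[]): ProfileSummary {
  let totalDuration = 0;
  let maxDuration = 0;
  const countByType: Record<string, number> = {};

  events.forEach(e => {
    totalDuration += e.duration;
    maxDuration = Math.max(maxDuration, e.duration);
    countByType[e.type] = (countByType[e.type] || 0) + 1;
  });

  return {
    totalEvents: events.length,
    avgDuration: events.length > 0 ? totalDuration / events.length : 0,
    maxDuration: maxDuration,
    countByType: countByType,
  };
}

const sampleSummary = summarizeProfile([
  { type: 'render', duration: 40 },
  { type: 'fetch', duration: 200 },
  { type: 'render', duration: 60 },
]);
const emptySummary = summarizeProfile([]);

console.log(sampleSummary.avgDuration, sampleSummary.maxDuration); // 100 200
console.log(emptySummary.avgDuration); // 0
```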

Traffic Monitoring with Peak Detection

import { interval, windowToggle, timer, mergeMap, scan, last, map } from 'rxjs';

interface Request {
  endpoint: string;
  duration: number;
  statusCode: number;
}

const requests$ = interval(100).pipe(
  map((): Request => ({
    endpoint: ['/api/users', '/api/posts', '/api/comments'][Math.floor(Math.random() * 3)],
    duration: Math.random() * 500,
    statusCode: Math.random() > 0.9 ? 500 : 200
  }))
);

// Monitor peaks: when error rate is high, open longer monitoring window
const checkInterval$ = interval(5000);
let consecutiveErrors = 0;

requests$.pipe(
  windowToggle(
    checkInterval$,
    () => {
      // If we've seen many errors, monitor for longer
      const duration = consecutiveErrors > 3 ? 10000 : 3000;
      console.log(`Monitoring window: ${duration}ms`);
      return timer(duration);
    }
  ),
  mergeMap(window$ => 
    window$.pipe(
      scan((acc, req) => {
        acc.total++;
        if (req.statusCode >= 500) acc.errors++;
        acc.totalDuration += req.duration;
        return acc;
      }, { total: 0, errors: 0, totalDuration: 0 }),
      last(),
      map(stats => {
        const errorRate = stats.errors / stats.total;
        consecutiveErrors = errorRate > 0.1 ? consecutiveErrors + 1 : 0;
        
        return {
          total: stats.total,
          errors: stats.errors,
          errorRate: (errorRate * 100).toFixed(2) + '%',
          avgDuration: (stats.totalDuration / stats.total).toFixed(2),
          alert: errorRate > 0.1
        };
      })
    )
  )
).subscribe(stats => {
  console.log('Window stats:', stats);
  if (stats.alert) {
    console.error('HIGH ERROR RATE DETECTED!');
  }
});
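
The feedback between a window's error rate and the next window's duration is easier to follow in isolation. The sketch below replays the threshold logic over a list of per-window error rates. `planMonitoring` is a hypothetical helper, and it makes a simplifying assumption that the pipeline above does not guarantee: every window closes, and updates the counter, before the next one opens.

```typescript
// Hypothetical, simplified model of the feedback loop: it assumes every window
// has closed (and updated the counter) before the next one opens.
function planMonitoring(errorRates: number[]): number[] {
  let consecutiveBad = 0;
  const durations: number[] = [];

  errorRates.forEach(rate => {
    // Decided when the window opens, from the windows seen so far.
    durations.push(consecutiveBad > 3 ? 10000 : 3000);
    // Updated when the window closes.
    consecutiveBad = rate > 0.1 ? consecutiveBad + 1 : 0;
  });

  return durations;
}

// Five bad windows in a row, then a healthy one, then another bad one.
const plannedDurations = planMonitoring([0.2, 0.3, 0.15, 0.4, 0.25, 0.05, 0.2]);

console.log(plannedDurations); // [3000, 3000, 3000, 3000, 10000, 10000, 3000]
```

The long window only starts after four consecutive bad windows, because the condition is `> 3`. In the real pipeline a 10-second window is still open when the next 5-second check fires, so that check sees the counter as earlier windows left it.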

User Session Analysis

import { Subject, windowToggle, mergeMap, groupBy, reduce, filter, take, tap } from 'rxjs';

const sessionStarts$ = new Subject<string>(); // userId
const sessionEnds$ = new Subject<string>();   // userId
const userActions$ = new Subject<{ userId: string; action: string }>();

const activeSessions = new Map<string, Subject<any>>();

userActions$.pipe(
  windowToggle(
    sessionStarts$,
    userId => {
      console.log(`Session started: ${userId}`);
      activeSessions.set(userId, new Subject());
      
      // Window closes when that specific user's session ends
      return sessionEnds$.pipe(
        filter(endUserId => endUserId === userId),
        take(1),
        tap(() => {
          console.log(`Session ended: ${userId}`);
          activeSessions.delete(userId);
        })
      );
    }
  ),
  mergeMap((window$) => 
    window$.pipe(
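      // Every open window receives actions from all users; this filter only
      // drops actions from users who have no active session.
      filter(action => activeSessions.has(action.userId)),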
      groupBy(action => action.userId),
      mergeMap(userGroup$ => 
        userGroup$.pipe(
          reduce((acc, action) => {
            acc.actions.push(action.action);
            acc.count++;
            return acc;
          }, {
            userId: userGroup$.key,
            actions: [] as string[],
            count: 0
          })
        )
      )
    )
  )
).subscribe(sessionSummary => {
  console.log('Session summary:', sessionSummary);
});

// Usage
sessionStarts$.next('user1');
sessionStarts$.next('user2');
userActions$.next({ userId: 'user1', action: 'view-page' });
userActions$.next({ userId: 'user2', action: 'click-button' });
sessionEnds$.next('user1');

Error Handling

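If either the openings Observable or any closing Observable errors, the error is propagated to all active windows and the output Observable. Catching the error inside a window therefore does not keep the output Observable alive; handle it at the subscription as well.

import { interval, timer, of, throwError, windowToggle, mergeMap, toArray, map, catchError } from 'rxjs';

const source = interval(100);
const openings = interval(1000);

source.pipe(
  windowToggle(
    openings,
    // The third window's closing notifier errors as soon as it is subscribed
    i => i === 2 ? throwError(() => new Error('Closing error')) : timer(300)
  ),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(values => ({ window: i, values })),
      // Recovers a result for this window, but not the outer stream
      catchError(error => {
        console.error(`Error in window ${i}:`, error.message);
        return of({ window: i, values: [], error: error.message });
      })
    )
  )
).subscribe({
  next: result => console.log(result),
  // The output Observable still errors, so handle that here
  error: error => console.error('windowToggle failed:', error.message)
});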