Overview

Branches out the source Observable values as nested Observables (“windows”) using a factory function to determine when each window closes. Opens a window immediately, then calls closingSelector to get an Observable that signals when to close that window and open a new one.
windowWhen is ideal for creating self-renewing windows where the closing condition can be dynamic or based on runtime state.

Type Signature

function windowWhen<T>(
  closingSelector: () => ObservableInput<any>
): OperatorFunction<T, Observable<T>>

Parameters

closingSelector
() => ObservableInput<any>
required
A factory function that takes no arguments and returns an Observable (or Promise, Array, etc.). This is called when each window opens. When the returned Observable emits its first value, the current window closes and a new one immediately opens.
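
To make the call timing concrete, here is a minimal sketch that drives the closing notifiers by hand. The names `closers` and `log` are illustrative only, and a plain `Subject` stands in for whatever Observable a real closingSelector would return.

```typescript
import { Subject, windowWhen, mergeMap, toArray, map } from 'rxjs';

const source$ = new Subject<string>();
const closers: Subject<void>[] = []; // one hand-driven notifier per window (illustrative)
const log: string[] = [];

source$.pipe(
  windowWhen(() => {
    // Invoked once on subscription, then once more each time a window closes
    const closer = new Subject<void>();
    closers.push(closer);
    log.push(`selector call ${closers.length}`);
    return closer;
  }),
  mergeMap((window$, i) =>
    window$.pipe(
      toArray(),
      map(values => `window ${i}: [${values.join(', ')}]`)
    )
  )
).subscribe(line => log.push(line));

source$.next('a');
source$.next('b');
closers[0].next();  // first emission closes window 0 and opens window 1
source$.next('c');
closers[1].next();  // closes window 1 and opens window 2
source$.complete(); // completes the still-open (empty) window 2

console.log(log);
```

In this sketch the log interleaves selector calls and window results: each window is reported as closed before the selector runs again for its successor.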

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window emits the source values that arrive until the Observable returned by closingSelector emits.
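
One consequence is worth a short illustration: a window is a live stream rather than a buffer, so values that arrive before a window is subscribed are not replayed. The sketch below assumes hand-driven Subjects; `windows`, `close$`, and `seen` are illustrative names.

```typescript
import { Observable, Subject, windowWhen } from 'rxjs';

const source$ = new Subject<number>();
const close$ = new Subject<void>(); // never fired here, so only one window opens
const windows: Observable<number>[] = [];

// Store the window Observables instead of subscribing to them right away
source$.pipe(
  windowWhen(() => close$)
).subscribe(window$ => windows.push(window$));

source$.next(1); // nobody is listening to the window yet, so this value is lost

const seen: number[] = [];
windows[0].subscribe(value => seen.push(value)); // late subscriber

source$.next(2);
source$.complete();

console.log(seen); // [2]
```

This is why the examples on this page subscribe to each window immediately, typically through mergeMap or mergeAll.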

Usage Examples

Basic Example: Random Duration Windows

import { fromEvent, windowWhen, interval, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');

// Windows of random duration (1-5 seconds)
const result = clicks.pipe(
  windowWhen(() => interval(1000 + Math.random() * 4000)),
  mergeAll()
);

result.subscribe(x => console.log(x));
// Each window closes after a random interval

Dynamic Window Duration

import { interval, windowWhen, timer, mergeMap, toArray, map, take } from 'rxjs';

const source = interval(100);
let windowDuration = 1000;

source.pipe(
  windowWhen(() => {
    const duration = windowDuration;
    console.log(`Opening window with ${duration}ms duration`);
    // Increase duration each time
    windowDuration += 500;
    return timer(duration);
  }),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(values => ({
        window: i,
        duration: 1000 + (i * 500),
        count: values.length,
        values
      }))
    )
  ),
  take(5)
).subscribe(result => {
  console.log('Window result:', result);
});

Adaptive Window Based on Load

import { fromEvent, windowWhen, timer, mergeMap, toArray, tap, map } from 'rxjs';

const events$ = fromEvent(document, 'click');
const recentWindowSizes: number[] = [];

events$.pipe(
  windowWhen(() => {
    // Calculate average window size
    const avgSize = recentWindowSizes.length > 0
      ? recentWindowSizes.reduce((a, b) => a + b, 0) / recentWindowSizes.length
      : 5;
    
    // If windows are getting large, close them faster
    let duration: number;
    if (avgSize > 10) {
      duration = 500;
      console.log('High activity detected, using 500ms windows');
    } else if (avgSize > 5) {
      duration = 1000;
      console.log('Medium activity, using 1s windows');
    } else {
      duration = 2000;
      console.log('Low activity, using 2s windows');
    }
    
    return timer(duration);
  }),
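  mergeMap((window$, i) =>
    window$.pipe(
      toArray(),
      tap(values => {
        recentWindowSizes.push(values.length);
        // Keep only last 5 measurements
        if (recentWindowSizes.length > 5) {
          recentWindowSizes.shift();
        }
      }),
      // Use mergeMap's index for the window number: toArray() emits once per
      // window, so an index taken from this inner map() would always be 0
      map(values => ({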
        window: i,
        count: values.length,
        avgRecentSize: (recentWindowSizes.reduce((a, b) => a + b, 0) / recentWindowSizes.length).toFixed(1)
      }))
    )
  )
).subscribe(result => {
  console.log('Adaptive window:', result);
});

Marble Diagram

Source:   --1--2--3--4--5--6--7--8--9--|
Closing:  |--c1--|  |--c2--|  |--c3--|
Window1:  --1--2--3|
Window2:           --4--5--6|
Window3:                    --7--8--9|
Result:   W1-------W2-------W3---------|
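
The diagram can be reproduced deterministically with hand-driven Subjects in place of timers. This is a sketch under that assumption; `close$` and `windows` are illustrative names, and each `close$.next()` plays the role of c1, c2, and c3.

```typescript
import { Subject, windowWhen, mergeMap, toArray } from 'rxjs';

const source$ = new Subject<number>();
const close$ = new Subject<void>(); // shared closing notifier, fired by hand
const windows: number[][] = [];

source$.pipe(
  windowWhen(() => close$),
  // Subscribe to every window as soon as it opens and gather its values
  mergeMap(window$ => window$.pipe(toArray()))
).subscribe(values => windows.push(values));

[1, 2, 3].forEach(v => source$.next(v));
close$.next(); // c1: closes Window1, opens Window2
[4, 5, 6].forEach(v => source$.next(v));
close$.next(); // c2: closes Window2, opens Window3
[7, 8, 9].forEach(v => source$.next(v));
close$.next(); // c3: closes Window3, opens a fourth window
source$.complete(); // completes the fourth window, which is empty

console.log(windows); // [[1, 2, 3], [4, 5, 6], [7, 8, 9], []]
```

Note the trailing empty array: because c3 fires before the source completes, a fourth window opens and closes without receiving any values. Filtering out empty batches, as the request batching example does, handles this case.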

Common Use Cases

  1. Adaptive Batching: Adjust window size based on system load or data volume
  2. Backpressure Management: Control window duration based on processing capacity
  3. Variable Rate Sampling: Change sampling frequency dynamically
  4. Conditional Windowing: Use different strategies based on application state
  5. Resource-Aware Processing: Adjust windows based on available resources
  6. Self-Adjusting Analytics: Windows that adapt to data patterns

The closingSelector is called immediately when subscribing and then again each time a window closes, creating a continuous chain of windows.

Advanced Example: Smart Request Batching

import { Subject, windowWhen, timer, mergeMap, toArray, filter } from 'rxjs';

interface ApiRequest {
  id: string;
  priority: 'high' | 'normal' | 'low';
  endpoint: string;
}

const requestQueue$ = new Subject<ApiRequest>();
let pendingRequests = 0;
let lastBatchSize = 0;
let networkSlow = false;

requestQueue$.pipe(
  windowWhen(() => {
    // Adaptive batching logic
    let delay: number;
    
    if (networkSlow) {
      delay = 5000;
      console.log('Network slow, batching for 5s');
    } else if (pendingRequests > 20) {
      delay = 500;
      console.log('High load, quick 500ms batches');
    } else if (lastBatchSize > 10) {
      delay = 1000;
      console.log('Large batches, using 1s windows');
    } else {
      delay = 2000;
      console.log('Normal operation, 2s windows');
    }
    
    return timer(delay);
  }),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      filter(batch => batch.length > 0),
      mergeMap(async (batch) => {
        lastBatchSize = batch.length;
        pendingRequests += batch.length;
        const startTime = Date.now();
        
        try {
          // Separate by priority
          const highPriority = batch.filter(r => r.priority === 'high');
          const normalPriority = batch.filter(r => r.priority === 'normal');
          const lowPriority = batch.filter(r => r.priority === 'low');
          
          // Process high priority first
          const results = [];
          if (highPriority.length > 0) {
            const res = await fetch('/api/batch/high', {
              method: 'POST',
              body: JSON.stringify(highPriority)
            });
            results.push(await res.json());
          }
          
          // Then normal and low together
          if (normalPriority.length + lowPriority.length > 0) {
            const res = await fetch('/api/batch', {
              method: 'POST',
              body: JSON.stringify([...normalPriority, ...lowPriority])
            });
            results.push(await res.json());
          }
          
          const duration = Date.now() - startTime;
          networkSlow = duration > 3000;
          pendingRequests -= batch.length;
          
          return {
            success: true,
            batchSize: batch.length,
            duration,
            results
          };
        } catch (error) {
          pendingRequests -= batch.length;
          return {
            success: false,
            batchSize: batch.length,
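            // A caught value is `unknown` in strict TypeScript, so narrow it first
            error: error instanceof Error ? error.message : String(error)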
          };
        }
      })
    )
  )
).subscribe(result => {
  if (result.success) {
    console.log(`✓ Batch of ${result.batchSize} completed in ${result.duration}ms`);
  } else {
    console.error(`✗ Batch of ${result.batchSize} failed:`, result.error);
  }
});

// Usage
requestQueue$.next({
  id: '1',
  priority: 'high',
  endpoint: '/api/critical'
});

State-Based Window Control

import { interval, windowWhen, timer, mergeMap, scan, last, map } from 'rxjs';

interface AppState {
  mode: 'fast' | 'normal' | 'slow';
  itemsProcessed: number;
}

const appState: AppState = {
  mode: 'normal',
  itemsProcessed: 0
};

const data$ = interval(100).pipe(
  map(i => ({ id: i, value: Math.random() * 100 }))
);

data$.pipe(
  windowWhen(() => {
    // Window duration based on application mode
    const durations = { fast: 500, normal: 1000, slow: 2000 };
    const duration = durations[appState.mode];
    console.log(`Window: ${duration}ms (mode: ${appState.mode})`);
    return timer(duration);
  }),
  mergeMap(window$ => 
    window$.pipe(
      scan((acc, item) => acc + 1, 0),
      // Default to 0 so an empty window does not make last() throw EmptyError
      last(null, 0),
      map(count => {
        appState.itemsProcessed += count;
        
        // Adjust mode based on throughput
        if (count > 8) {
          appState.mode = 'fast';
        } else if (count < 3) {
          appState.mode = 'slow';
        } else {
          appState.mode = 'normal';
        }
        
        return {
          count,
          mode: appState.mode,
          totalProcessed: appState.itemsProcessed
        };
      })
    )
  )
).subscribe(stats => {
  console.log('Window stats:', stats);
});

Error-Based Window Adjustment

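import { interval, windowWhen, timer, mergeMap, map, toArray, catchError, of, EMPTY } from 'rxjs';

let errorCount = 0;
let successCount = 0;

// Handle failures per item: an error that reaches windowWhen would terminate
// the whole stream, so count it here and skip the failed item instead
const riskyOperation$ = interval(200).pipe(
  mergeMap(i =>
    of(i).pipe(
      map(value => {
        if (Math.random() < 0.2) throw new Error('Random error');
        return value;
      }),
      catchError(() => {
        errorCount++;
        return EMPTY;
      })
    )
  )
);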

riskyOperation$.pipe(
  windowWhen(() => {
    // If error rate is high, use longer windows to reduce processing frequency
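    const total = errorCount + successCount;
    // Guard against 0 / 0 (NaN) before any result has been recorded
    const errorRate = total > 0 ? errorCount / total : 0;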
    let duration: number;
    
    if (errorRate > 0.5) {
      duration = 5000;
      console.log('High error rate, using long windows');
    } else if (errorRate > 0.2) {
      duration = 2000;
      console.log('Moderate errors, using medium windows');
    } else {
      duration = 1000;
      console.log('Low errors, using short windows');
    }
    
    return timer(duration);
  }),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      map(values => {
        successCount += values.length;
        return { success: true, count: values.length };
      }),
      catchError(error => {
        errorCount++;
        return of({ success: false, error: error.message });
      })
    )
  )
).subscribe(result => {
  console.log('Window result:', result);
});

Memory Management

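If the closing selector never emits, or takes a very long time to emit, the current window stays open. The window itself does not store values, but anything downstream that buffers it (toArray(), for example) keeps growing and can exhaust memory. Make sure every closing selector eventually emits.

// BAD: the window never closes, so toArray() buffers forever
// (source$ stands for any long-lived Observable)
source$.pipe(
  windowWhen(() => NEVER),
  mergeMap(window$ => window$.pipe(toArray()))
)

// GOOD: every window closes after 1 second
source$.pipe(
  windowWhen(() => timer(1000)),
  mergeMap(window$ => window$.pipe(toArray()))
)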
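
When the closing condition is not guaranteed to fire, one possible safeguard is to race it against a maximum duration. The sketch below is an assumption-laden illustration, not a built-in feature: `boundedCloser`, `flush$`, and the 5000 ms cap are hypothetical, while `race` and `timer` are standard RxJS creation functions.

```typescript
import { Observable, Subject, race, timer, windowWhen, mergeMap, toArray } from 'rxjs';

// Hypothetical helper: close on `condition$`, but never later than `maxMs`
function boundedCloser(condition$: Observable<unknown>, maxMs: number): Observable<unknown> {
  // race() mirrors whichever input emits first and unsubscribes from the other
  return race(condition$, timer(maxMs));
}

const source$ = new Subject<number>();
const flush$ = new Subject<void>(); // illustrative "close now" signal
const batches: number[][] = [];

source$.pipe(
  windowWhen(() => boundedCloser(flush$, 5000)),
  mergeMap(window$ => window$.pipe(toArray()))
).subscribe(batch => batches.push(batch));

source$.next(1);
source$.next(2);
flush$.next();      // the condition wins the race: the window closes right away
source$.next(3);
source$.complete(); // completes the open window and cancels the pending timer

console.log(batches); // [[1, 2], [3]]
```

If `flush$` never fired, the timer would win each race instead, so no window could stay open longer than the cap.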

Using Promises

import { interval, windowWhen, mergeMap, toArray, map, take } from 'rxjs';

interval(100).pipe(
  windowWhen(() => {
    // The window closes when this Promise resolves (the resolved value is
    // ignored), so resolve it after a random delay of 1-2 seconds
    return new Promise<void>(resolve => {
      const delay = 1000 + Math.random() * 1000;
      setTimeout(resolve, delay);
    });
  }),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(values => ({ window: i, count: values.length }))
    )
  ),
  take(5)
).subscribe(result => console.log(result));