Skip to main content

Overview

Branches out the source Observable values as nested Observables (“windows”) periodically based on time. Like bufferTime, but emits Observables instead of arrays, allowing you to apply operators to each time-based window.
windowTime is perfect for time-series analysis, real-time monitoring, and any scenario where you need to process data within time-based windows.

Type Signature

function windowTime<T>(
  windowTimeSpan: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>

function windowTime<T>(
  windowTimeSpan: number,
  windowCreationInterval: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>

function windowTime<T>(
  windowTimeSpan: number,
  windowCreationInterval: number | null | void,
  maxWindowSize: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>

Parameters

windowTimeSpan
number
required
The amount of time (in milliseconds) each window should remain open before completing.
windowCreationInterval
number | null
The interval (in milliseconds) at which to start new windows. If not provided, a new window starts when the previous one completes. If provided, windows can overlap or have gaps.
maxWindowSize
number
Maximum number of values each window can emit. When reached, the window completes immediately even if windowTimeSpan hasn’t elapsed.
scheduler
SchedulerLike
default:"asyncScheduler"
The scheduler to use for timing the windows.

Returns

return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window emits values from the source for the specified time span.

Usage Examples

Basic Example: Time-Based Windows

import { fromEvent, windowTime, map, take, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');

const result = clicks.pipe(
  windowTime(1000),
  map(win => win.pipe(take(2))),
  mergeAll()
);

result.subscribe(x => console.log(x));
// Every second, emit at most 2 click events from that window

Overlapping Windows

import { fromEvent, windowTime, map, mergeAll, toArray } from 'rxjs';

const clicks = fromEvent(document, 'click');

// 1-second windows, starting every 5 seconds
const result = clicks.pipe(
  windowTime(1000, 5000),
  mergeMap((window$, i) => 
    window$.pipe(
      toArray(),
      map(clicks => ({ window: i, clicks: clicks.length }))
    )
  )
);

result.subscribe(x => console.log(x));
// At 0s: window 0 opens (duration 0-1s)
// At 1s: window 0 closes, emits count
// At 5s: window 1 opens (duration 5-6s)
// At 6s: window 1 closes, emits count

With Max Window Size

import { interval, windowTime, mergeMap, toArray } from 'rxjs';

interval(100).pipe(
  windowTime(1000, null, 5),
  mergeMap((window$, index) => 
    window$.pipe(
      toArray(),
      map(values => ({ 
        window: index, 
        count: values.length,
        reason: values.length === 5 ? 'max size' : 'time elapsed'
      }))
    )
  ),
  take(10)
).subscribe(x => console.log(x));
// Windows close when reaching 5 items OR 1 second

Marble Diagram

windowTime(50ms)

Source:  --1--2--3--4--5--6--7--8--9--|
         |----50ms----|
Window1: --1--2--3|
Window2:          --4--5--6|
Window3:                   --7--8--9|
Result:  -----W1------W2------W3------|

windowTime(30ms, 50ms) - Overlapping

Source:  --1--2--3--4--5--6--7--8--|
         |--30ms--|
         |----50ms----|
Window1: --1--2|
Window2:       --3--4|
Window3:             --5--6|
Result:  -----W1--W2--W3--W4------|

Common Use Cases

  1. Real-time Metrics: Calculate statistics over time windows
  2. Rate Monitoring: Track events per time period
  3. Throttled Processing: Process data in time-based batches
  4. Analytics: Aggregate user actions within time frames
  5. Alert Systems: Monitor thresholds over time windows
  6. Performance Monitoring: Track system metrics periodically
When the source completes, all active windows complete immediately and are emitted.

Advanced Example: Real-time Monitoring Dashboard

import { interval, windowTime, mergeMap, reduce, map } from 'rxjs';

interface SystemMetric {
  timestamp: number;
  cpu: number;
  memory: number;
  requests: number;
}

const metrics$ = interval(100).pipe(
  map((): SystemMetric => ({
    timestamp: Date.now(),
    cpu: Math.random() * 100,
    memory: 50 + Math.random() * 40,
    requests: Math.floor(Math.random() * 1000)
  }))
);

// 5-second windows
metrics$.pipe(
  windowTime(5000),
  mergeMap((window$, windowIndex) => {
    const startTime = Date.now();
    
    return window$.pipe(
      reduce((acc, metric) => {
        acc.count++;
        acc.cpuSum += metric.cpu;
        acc.memorySum += metric.memory;
        acc.requestsSum += metric.requests;
        acc.cpuMax = Math.max(acc.cpuMax, metric.cpu);
        acc.memoryMax = Math.max(acc.memoryMax, metric.memory);
        return acc;
      }, {
        count: 0,
        cpuSum: 0,
        memorySum: 0,
        requestsSum: 0,
        cpuMax: 0,
        memoryMax: 0
      }),
      map(acc => ({
        windowIndex,
        period: `${new Date(startTime).toISOString()} - ${new Date().toISOString()}`,
        sampleCount: acc.count,
        avgCPU: (acc.cpuSum / acc.count).toFixed(2),
        maxCPU: acc.cpuMax.toFixed(2),
        avgMemory: (acc.memorySum / acc.count).toFixed(2),
        maxMemory: acc.memoryMax.toFixed(2),
        totalRequests: acc.requestsSum,
        requestsPerSecond: (acc.requestsSum / 5).toFixed(0)
      }))
    );
  })
).subscribe(stats => {
  console.log('5-second window stats:', stats);
  updateMonitoringDashboard(stats);
});

Traffic Analysis

import { fromEvent, windowTime, mergeMap, groupBy, reduce } from 'rxjs';

const pageViews$ = new Subject<{
  page: string;
  userId: string;
  timestamp: number;
}>();

// Analyze traffic every 60 seconds
pageViews$.pipe(
  windowTime(60000),
  mergeMap((window$, minute) => 
    window$.pipe(
      groupBy(view => view.page),
      mergeMap(page$ => 
        page$.pipe(
          reduce((acc, view) => {
            acc.views++;
            acc.uniqueUsers.add(view.userId);
            return acc;
          }, {
            page: page$.key,
            views: 0,
            uniqueUsers: new Set<string>()
          })
        )
      ),
      toArray(),
      map(pages => ({
        minute,
        timestamp: new Date().toISOString(),
        pages: pages.map(p => ({
          page: p.page,
          views: p.views,
          uniqueUsers: p.uniqueUsers.size
        })),
        totalViews: pages.reduce((sum, p) => sum + p.views, 0)
      }))
    )
  )
).subscribe(analysis => {
  console.log('Minute analysis:', analysis);
  sendToAnalytics(analysis);
});

Alert System with Threshold Detection

import { interval, windowTime, mergeMap, filter, toArray } from 'rxjs';

interface SensorReading {
  sensorId: string;
  value: number;
  timestamp: number;
}

const sensorData$ = interval(500).pipe(
  map((): SensorReading => ({
    sensorId: 'TEMP-01',
    value: 20 + Math.random() * 15,
    timestamp: Date.now()
  }))
);

// Monitor 10-second windows
sensorData$.pipe(
  windowTime(10000),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      filter(readings => readings.length > 0),
      map(readings => {
        const avg = readings.reduce((sum, r) => sum + r.value, 0) / readings.length;
        const max = Math.max(...readings.map(r => r.value));
        const min = Math.min(...readings.map(r => r.value));
        const exceedThreshold = readings.filter(r => r.value > 30).length;
        
        return {
          sensorId: readings[0].sensorId,
          avg: avg.toFixed(2),
          max: max.toFixed(2),
          min: min.toFixed(2),
          readingsCount: readings.length,
          exceedThreshold,
          alert: exceedThreshold > readings.length * 0.5
        };
      }),
      filter(stats => stats.alert)
    )
  )
).subscribe(alert => {
  console.warn('ALERT:', alert);
  sendAlert(alert);
});

Sliding Window for Trend Detection

import { interval, windowTime, mergeMap, toArray, pairwise } from 'rxjs';

const stockPrices$ = interval(1000).pipe(
  map(() => ({
    symbol: 'ACME',
    price: 100 + (Math.random() - 0.5) * 10,
    timestamp: Date.now()
  }))
);

// 5-second overlapping windows every 2 seconds
stockPrices$.pipe(
  windowTime(5000, 2000),
  mergeMap(window$ => 
    window$.pipe(
      toArray(),
      filter(prices => prices.length > 1),
      map(prices => {
        const first = prices[0].price;
        const last = prices[prices.length - 1].price;
        const change = last - first;
        const changePercent = (change / first) * 100;
        
        return {
          symbol: prices[0].symbol,
          windowStart: new Date(prices[0].timestamp).toISOString(),
          windowEnd: new Date(prices[prices.length - 1].timestamp).toISOString(),
          startPrice: first.toFixed(2),
          endPrice: last.toFixed(2),
          change: change.toFixed(2),
          changePercent: changePercent.toFixed(2),
          trend: change > 0 ? 'up' : change < 0 ? 'down' : 'flat'
        };
      })
    )
  )
).subscribe(analysis => {
  console.log('Trend analysis:', analysis);
  if (Math.abs(parseFloat(analysis.changePercent)) > 2) {
    console.warn('Significant price movement detected!');
  }
});

Performance Considerations

Use maxWindowSize to prevent memory issues with high-frequency sources. Windows will close early when reaching the max size, providing backpressure.
// Prevent unbounded growth
fastSource$.pipe(
  windowTime(5000, null, 1000),
  mergeMap(win => win.pipe(toArray()))
)
Each active window maintains its own subscription. With overlapping windows (windowCreationInterval < windowTimeSpan), multiple windows can be active simultaneously, increasing memory usage.