Overview
Branches out the source Observable values as nested Observables (“windows”) periodically based on time. Like bufferTime, but emits Observables instead of arrays, allowing you to apply operators to each time-based window.
windowTime suits time-series analysis, real-time monitoring, and other scenarios where data must be processed within time-based windows.
Type Signature
function windowTime<T>(
  windowTimeSpan: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>
function windowTime<T>(
  windowTimeSpan: number,
  windowCreationInterval: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>
function windowTime<T>(
  windowTimeSpan: number,
  windowCreationInterval: number | null | void,
  maxWindowSize: number,
  scheduler?: SchedulerLike
): OperatorFunction<T, Observable<T>>
Parameters
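windowTimeSpan
number
The amount of time (in milliseconds) each window should remain open before completing.
windowCreationInterval
number | null
The interval (in milliseconds) at which to start new windows. If not provided, a new window starts when the previous one completes. If provided and smaller than windowTimeSpan, windows overlap; if larger, there are gaps between windows, and source values that arrive during a gap are not delivered to any window.
maxWindowSize
number
Maximum number of values each window can emit. When reached, the window completes immediately even if windowTimeSpan hasn’t elapsed.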
scheduler
SchedulerLike
default: "asyncScheduler"
The scheduler to use for timing the windows.
Returns
return
OperatorFunction<T, Observable<T>>
A function that returns an Observable of window Observables. Each window emits values from the source for the specified time span.
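As a minimal sketch of the return shape described above, the following feeds a synchronous `of(1, 2, 3)` source through `windowTime` and checks that what arrives downstream is an Observable, not a plain value. The variable names are illustrative only.

```typescript
import { of, windowTime, isObservable } from 'rxjs';

const emittedObservable: boolean[] = [];
const values: number[] = [];

of(1, 2, 3)
  .pipe(windowTime(1000))
  .subscribe(window$ => {
    // Each notification from the outer Observable is itself an Observable.
    emittedObservable.push(isObservable(window$));
    // Subscribe right away: a window does not replay values that were
    // emitted before the subscription was made.
    window$.subscribe(v => values.push(v));
  });

console.log(emittedObservable, values); // [ true ] [ 1, 2, 3 ]
```

Because the source completes synchronously, only one window is ever opened, and it is closed by the source completing, not by the 1000 ms timer.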
Usage Examples
Basic Example: Time-Based Windows
Simple Time Windows
import { fromEvent, windowTime, map, take, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  windowTime(1000),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));
// Every second, emit at most 2 click events from that window
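Counting the items in each window follows the same pattern. The sketch below is one way to do it, under two assumptions that are not part of the example above: `interval` stands in for the click stream so it can run outside a browser, and a `VirtualTimeScheduler` is passed to both `interval` and `windowTime` so the timing is deterministic.

```typescript
import { VirtualTimeScheduler, interval, windowTime, mergeMap, count, take } from 'rxjs';

// Virtual time makes the sketch deterministic; real code would usually
// omit the scheduler argument and rely on the default.
const scheduler = new VirtualTimeScheduler();
const countsPerWindow: number[] = [];

interval(100, scheduler) // emits at 100ms, 200ms, ... of virtual time
  .pipe(
    take(5), // the source completes at 500ms
    windowTime(350, scheduler), // windows cover 0-350ms, then 350-700ms
    mergeMap(window$ => window$.pipe(count())) // one number per window
  )
  .subscribe(n => countsPerWindow.push(n));

scheduler.flush(); // run all scheduled virtual-time work synchronously
console.log(countsPerWindow); // [ 3, 2 ]
```

The first window sees the emissions at 100, 200 and 300 ms. The second sees those at 400 and 500 ms and is then closed early because the source completes.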
Windows with Gaps
import { fromEvent, windowTime, map, mergeMap, toArray } from 'rxjs';
const clicks = fromEvent(document, 'click');
// 1-second windows, starting every 5 seconds.
// Clicks that arrive while no window is open are not emitted.
const result = clicks.pipe(
  windowTime(1000, 5000),
  mergeMap((window$, i) =>
    window$.pipe(
      toArray(),
      map(events => ({ window: i, clicks: events.length }))
    )
  )
);
result.subscribe(x => console.log(x));
// At 0s: window 0 opens (duration 0-1s)
// At 1s: window 0 closes, emits count
// At 5s: window 1 opens (duration 5-6s)
// At 6s: window 1 closes, emits count
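When the creation interval is longer than the window span, values that arrive between windows are not delivered to any window. The following sketch illustrates this with virtual time. The numbers (250 ms windows opened every 520 ms) are arbitrary and chosen so that no emission coincides with a window boundary.

```typescript
import { VirtualTimeScheduler, interval, windowTime, mergeMap, toArray, take } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const windows: number[][] = [];

interval(100, scheduler) // value n is emitted at (n + 1) * 100 ms
  .pipe(
    take(10), // last value (9) at 1000ms
    // Windows open at 0ms and 520ms and stay open for 250ms each.
    windowTime(250, 520, scheduler),
    mergeMap(window$ => window$.pipe(toArray()))
  )
  .subscribe(values => windows.push(values));

scheduler.flush();
console.log(windows); // [ [ 0, 1 ], [ 5, 6 ] ]
```

Values 2 to 4 and 7 to 9 fall into the gaps and never appear in the output.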
With Max Window Size
import { interval, windowTime, mergeMap, toArray, map, take } from 'rxjs';
interval(100).pipe(
  windowTime(1000, null, 5),
  mergeMap((window$, index) =>
    window$.pipe(
      toArray(),
      map(values => ({
        window: index,
        count: values.length,
        reason: values.length === 5 ? 'max size' : 'time elapsed'
      }))
    )
  ),
  take(10)
).subscribe(x => console.log(x));
// Windows close when reaching 5 items OR 1 second
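To see the size limit in isolation, the sketch below uses a window span (1000 ms) that is never reached, so every window is closed either by `maxWindowSize` or by the source completing. The `VirtualTimeScheduler` is an addition for determinism and is not needed in application code.

```typescript
import { VirtualTimeScheduler, interval, windowTime, mergeMap, toArray, take } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const windows: number[][] = [];

interval(100, scheduler)
  .pipe(
    take(7), // values 0..6, source completes at 700ms
    // No creation interval (null): a new window opens as soon as one closes.
    windowTime(1000, null, 3, scheduler),
    mergeMap(window$ => window$.pipe(toArray()))
  )
  .subscribe(values => windows.push(values));

scheduler.flush();
console.log(windows); // [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6 ] ]
```

No value is lost here: the value that fills a window is the last one that window receives, and the next value goes to the window that replaced it.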
Marble Diagram
windowTime(50ms)
Source: --1--2--3--4--5--6--7--8--9--|
|----50ms----|
Window1: --1--2--3|
Window2: --4--5--6|
Window3: --7--8--9|
Result: -----W1------W2------W3------|
windowTime(30ms, 50ms) - Windows with Gaps
Source: --1--2--3--4--5--6--7--8--|
|--30ms--|
|----50ms----|
Window1: --1--2|
Window2: --3--4|
Window3: --5--6|
Result: -----W1--W2--W3--W4------|
Common Use Cases
Real-time Metrics: Calculate statistics over time windows
Rate Monitoring: Track events per time period
Throttled Processing: Process data in time-based batches
Analytics: Aggregate user actions within time frames
Alert Systems: Monitor thresholds over time windows
Performance Monitoring: Track system metrics periodically
When the source completes, all active windows complete immediately, and then the output Observable completes.
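The completion behavior can be observed by recording the virtual time at which each window finishes. In this sketch (illustrative names, virtual time assumed for determinism) the second window would normally stay open until 500 ms, but the source completes at 400 ms.

```typescript
import { VirtualTimeScheduler, interval, windowTime, mergeMap, toArray, map, take } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const closed: { values: number[]; completedAt: number }[] = [];

interval(100, scheduler)
  .pipe(
    take(4), // source completes at 400ms
    windowTime(250, scheduler), // windows: 0-250ms, 250-500ms
    mergeMap(window$ =>
      window$.pipe(
        toArray(), // emits only when the window completes
        map(values => ({ values, completedAt: scheduler.now() }))
      )
    )
  )
  .subscribe(entry => closed.push(entry));

scheduler.flush();
console.log(closed);
// [ { values: [ 0, 1 ], completedAt: 250 }, { values: [ 2, 3 ], completedAt: 400 } ]
```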
Advanced Example: Real-time Monitoring Dashboard
import { interval, windowTime, mergeMap, reduce, map } from 'rxjs';
interface SystemMetric {
  timestamp: number;
  cpu: number;
  memory: number;
  requests: number;
}
const metrics$ = interval(100).pipe(
  map((): SystemMetric => ({
    timestamp: Date.now(),
    cpu: Math.random() * 100,
    memory: 50 + Math.random() * 40,
    requests: Math.floor(Math.random() * 1000)
  }))
);
// 5-second windows
metrics$.pipe(
  windowTime(5000),
  mergeMap((window$, windowIndex) => {
    const startTime = Date.now();
    return window$.pipe(
      reduce((acc, metric) => {
        acc.count++;
        acc.cpuSum += metric.cpu;
        acc.memorySum += metric.memory;
        acc.requestsSum += metric.requests;
        acc.cpuMax = Math.max(acc.cpuMax, metric.cpu);
        acc.memoryMax = Math.max(acc.memoryMax, metric.memory);
        return acc;
      }, {
        count: 0,
        cpuSum: 0,
        memorySum: 0,
        requestsSum: 0,
        cpuMax: 0,
        memoryMax: 0
      }),
      map(acc => ({
        windowIndex,
        period: `${new Date(startTime).toISOString()} - ${new Date().toISOString()}`,
        sampleCount: acc.count,
        avgCPU: (acc.cpuSum / acc.count).toFixed(2),
        maxCPU: acc.cpuMax.toFixed(2),
        avgMemory: (acc.memorySum / acc.count).toFixed(2),
        maxMemory: acc.memoryMax.toFixed(2),
        totalRequests: acc.requestsSum,
        requestsPerSecond: (acc.requestsSum / 5).toFixed(0)
      }))
    );
  })
).subscribe(stats => {
  console.log('5-second window stats:', stats);
  updateMonitoringDashboard(stats);
});
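One detail worth guarding against in the aggregation above: if a window receives no samples (for example, because the source pauses), `acc.count` is 0 and the averages become `NaN`. A possible approach is to move the aggregation into a pure function that handles the empty case explicitly. `summarizeWindow` and `WindowSummary` below are hypothetical names, and the summary is trimmed to a few fields to keep the sketch short.

```typescript
interface SystemMetric {
  timestamp: number;
  cpu: number;
  memory: number;
  requests: number;
}

// Hypothetical summary shape; null marks "no data" instead of NaN.
interface WindowSummary {
  sampleCount: number;
  avgCPU: number | null;
  maxCPU: number | null;
  totalRequests: number;
}

function summarizeWindow(metrics: SystemMetric[]): WindowSummary {
  if (metrics.length === 0) {
    // An empty window has no meaningful average or maximum.
    return { sampleCount: 0, avgCPU: null, maxCPU: null, totalRequests: 0 };
  }
  const cpuSum = metrics.reduce((sum, m) => sum + m.cpu, 0);
  return {
    sampleCount: metrics.length,
    avgCPU: cpuSum / metrics.length,
    maxCPU: Math.max(...metrics.map(m => m.cpu)),
    totalRequests: metrics.reduce((sum, m) => sum + m.requests, 0),
  };
}

const sample: SystemMetric[] = [
  { timestamp: 0, cpu: 20, memory: 60, requests: 10 },
  { timestamp: 100, cpu: 40, memory: 70, requests: 30 },
];
console.log(summarizeWindow(sample)); // avgCPU 30, maxCPU 40, totalRequests 40
console.log(summarizeWindow([])); // sampleCount 0, avgCPU null
```

Inside the pipeline this could be applied as `window$.pipe(toArray(), map(summarizeWindow))`, at the cost of buffering each window's samples in an array.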
Traffic Analysis
import { Subject, windowTime, mergeMap, groupBy, reduce, toArray, map } from 'rxjs';
const pageViews$ = new Subject<{
  page: string;
  userId: string;
  timestamp: number;
}>();
// Analyze traffic every 60 seconds
pageViews$.pipe(
  windowTime(60000),
  mergeMap((window$, minute) =>
    window$.pipe(
      groupBy(view => view.page),
      mergeMap(page$ =>
        page$.pipe(
          reduce((acc, view) => {
            acc.views++;
            acc.uniqueUsers.add(view.userId);
            return acc;
          }, {
            page: page$.key,
            views: 0,
            uniqueUsers: new Set<string>()
          })
        )
      ),
      toArray(),
      map(pages => ({
        minute,
        timestamp: new Date().toISOString(),
        pages: pages.map(p => ({
          page: p.page,
          views: p.views,
          uniqueUsers: p.uniqueUsers.size
        })),
        totalViews: pages.reduce((sum, p) => sum + p.views, 0)
      }))
    )
  )
).subscribe(analysis => {
  console.log('Minute analysis:', analysis);
  sendToAnalytics(analysis);
});
Alert System with Threshold Detection
import { interval, windowTime, mergeMap, filter, toArray, map } from 'rxjs';
interface SensorReading {
  sensorId: string;
  value: number;
  timestamp: number;
}
const sensorData$ = interval(500).pipe(
  map((): SensorReading => ({
    sensorId: 'TEMP-01',
    value: 20 + Math.random() * 15,
    timestamp: Date.now()
  }))
);
// Monitor 10-second windows
sensorData$.pipe(
  windowTime(10000),
  mergeMap(window$ =>
    window$.pipe(
      toArray(),
      filter(readings => readings.length > 0),
      map(readings => {
        const avg = readings.reduce((sum, r) => sum + r.value, 0) / readings.length;
        const max = Math.max(...readings.map(r => r.value));
        const min = Math.min(...readings.map(r => r.value));
        const exceedThreshold = readings.filter(r => r.value > 30).length;
        return {
          sensorId: readings[0].sensorId,
          avg: avg.toFixed(2),
          max: max.toFixed(2),
          min: min.toFixed(2),
          readingsCount: readings.length,
          exceedThreshold,
          // Alert when more than half of the readings exceed the threshold
          alert: exceedThreshold > readings.length * 0.5
        };
      }),
      filter(stats => stats.alert)
    )
  )
).subscribe(alert => {
  console.warn('ALERT:', alert);
  sendAlert(alert);
});
Sliding Window for Trend Detection
import { interval, windowTime, mergeMap, toArray, map, filter } from 'rxjs';
const stockPrices$ = interval(1000).pipe(
  map(() => ({
    symbol: 'ACME',
    price: 100 + (Math.random() - 0.5) * 10,
    timestamp: Date.now()
  }))
);
// 5-second overlapping windows every 2 seconds
stockPrices$.pipe(
  windowTime(5000, 2000),
  mergeMap(window$ =>
    window$.pipe(
      toArray(),
      filter(prices => prices.length > 1),
      map(prices => {
        const first = prices[0].price;
        const last = prices[prices.length - 1].price;
        const change = last - first;
        const changePercent = (change / first) * 100;
        return {
          symbol: prices[0].symbol,
          windowStart: new Date(prices[0].timestamp).toISOString(),
          windowEnd: new Date(prices[prices.length - 1].timestamp).toISOString(),
          startPrice: first.toFixed(2),
          endPrice: last.toFixed(2),
          change: change.toFixed(2),
          changePercent: changePercent.toFixed(2),
          trend: change > 0 ? 'up' : change < 0 ? 'down' : 'flat'
        };
      })
    )
  )
).subscribe(analysis => {
  console.log('Trend analysis:', analysis);
  if (Math.abs(parseFloat(analysis.changePercent)) > 2) {
    console.warn('Significant price movement detected!');
  }
});
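Use maxWindowSize to bound per-window memory with high-frequency sources. A window closes early once it has emitted maxWindowSize values, which caps the size of any per-window buffer such as toArray(). This limits buffer size but is not true backpressure: the source is never slowed down.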
// Prevent unbounded growth
fastSource$.pipe(
  windowTime(5000, null, 1000),
  mergeMap(win => win.pipe(toArray()))
)
Each active window maintains its own subscription. With overlapping windows (windowCreationInterval < windowTimeSpan), multiple windows can be active simultaneously, increasing memory usage.
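As a rough planning aid for the overlap case, the number of windows open at the same moment can be estimated from the two timing parameters. The helper below is hypothetical and not part of RxJS. It assumes windows open exactly every windowCreationInterval milliseconds and stay open for windowTimeSpan, and it ignores windows that close early because of maxWindowSize, so it is best read as an upper bound.

```typescript
// Hypothetical helper: upper bound on simultaneously open windows.
function maxConcurrentWindows(
  windowTimeSpan: number,
  windowCreationInterval: number | null
): number {
  if (windowCreationInterval === null) {
    // Without a creation interval, a new window opens only when the
    // previous one closes, so at most one window is active.
    return 1;
  }
  if (windowCreationInterval <= 0) {
    throw new RangeError('windowCreationInterval must be positive for this estimate');
  }
  // A window opened at time t is still open for any later window opened
  // before t + windowTimeSpan.
  return Math.max(1, Math.ceil(windowTimeSpan / windowCreationInterval));
}

console.log(maxConcurrentWindows(5000, 2000)); // 3 (overlapping)
console.log(maxConcurrentWindows(1000, 5000)); // 1 (gaps between windows)
console.log(maxConcurrentWindows(1000, null)); // 1 (back-to-back windows)
```

For `windowTime(5000, 2000)`, every source value is therefore delivered to up to three windows, and any per-window buffering is multiplied accordingly.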