Overview
ReplaySubject is a variant of Subject that records multiple values and replays them to new subscribers. You can configure how many values to buffer and for how long to keep them.
Key Feature: ReplaySubject buffers up to a specified number of values and replays them to new subscribers in FIFO (first-in, first-out) order.
Class Signature
class ReplaySubject<T> extends Subject<T> {
  constructor(
    bufferSize?: number,
    windowTime?: number,
    timestampProvider?: TimestampProvider
  );
  next(value: T): void;
}
Constructor
const subject = new ReplaySubject<T>(
  bufferSize?: number,
  windowTime?: number,
  timestampProvider?: TimestampProvider
);
bufferSize
number
default: "Infinity"
The maximum number of values to buffer and replay to new subscribers. If not specified, all values are buffered.
windowTime
number
default: "Infinity"
The amount of time (in milliseconds) to keep a value in the buffer before removing it. If not specified, values are kept indefinitely.
timestampProvider
TimestampProvider
default: "dateTimestampProvider"
An object with a now() method that provides the current timestamp for calculating buffer window time.
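Because expiry is computed from the provider's now() method, a hand-rolled clock can make windowTime behavior deterministic in tests. The following is a minimal sketch, assuming RxJS 7 or later, where the third constructor argument is a TimestampProvider. The names fakeNow, fakeClock, and replayed are invented for this illustration.

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical fake clock: any object with a now() method can act as the provider.
let fakeNow = 0;
const fakeClock = { now: () => fakeNow };

// Unlimited buffer size, 100ms window, driven by the fake clock.
const subject = new ReplaySubject<number>(Infinity, 100, fakeClock);

subject.next(1); // emitted at t = 0
fakeNow = 60;
subject.next(2); // emitted at t = 60

fakeNow = 120; // value 1 is now older than 100ms, value 2 is not
const replayed: number[] = [];
subject.subscribe((v) => replayed.push(v));

console.log(replayed); // [2]
```

Advancing the clock by hand avoids real timers, so the test does not depend on setTimeout accuracy.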
Usage Examples
Basic Replay
Replay last N values to new subscribers:
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(3); // Buffer last 3 values

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

// New subscriber gets last 3 values
subject.subscribe({
  next: (v) => console.log(`Observer A: ${v}`)
});
// Observer A: 2
// Observer A: 3
// Observer A: 4

subject.next(5);
// Observer A: 5

// Another new subscriber
subject.subscribe({
  next: (v) => console.log(`Observer B: ${v}`)
});
// Observer B: 3
// Observer B: 4
// Observer B: 5
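For comparison, a ReplaySubject constructed without a bufferSize keeps every value. The sketch below also shows that a subscriber arriving after complete() still receives the buffered values before the completion notification. The names all and log are made up for this example.

```typescript
import { ReplaySubject } from 'rxjs';

// No bufferSize argument: every value is kept for replay.
const all = new ReplaySubject<string>();

all.next('a');
all.next('b');
all.next('c');
all.complete();

// Subscribing after completion still replays the buffer, then completes.
const log: string[] = [];
all.subscribe({
  next: (v) => log.push(v),
  complete: () => log.push('complete'),
});

console.log(log); // ['a', 'b', 'c', 'complete']
```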
Time Window Buffer
Replay values within a time window:
import { ReplaySubject } from 'rxjs';

// Keep last 100 values, but only for 500ms
const subject = new ReplaySubject<number>(100, 500);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  // 200ms after emission: the values are still inside the 500ms window
  subject.subscribe({
    next: (v) => console.log(`Early subscriber: ${v}`)
  });
  // Gets: 1, 2, 3
}, 200);

setTimeout(() => {
  // 600ms after emission: the values are past the 500ms window
  subject.subscribe({
    next: (v) => console.log(`Late subscriber: ${v}`)
  });
  // Gets: (nothing - values expired)
}, 600);
Event History
Keep history of recent events:
import { ReplaySubject } from 'rxjs';

interface UserAction {
  type: string;
  timestamp: number;
  details: any;
}

class ActivityTracker {
  // Keep last 50 actions, for last 5 minutes
  private actions = new ReplaySubject<UserAction>(50, 5 * 60 * 1000);
  actions$ = this.actions.asObservable();

  trackAction(type: string, details: any): void {
    this.actions.next({
      type,
      timestamp: Date.now(),
      details
    });
  }

  getRecentActions(): UserAction[] {
    const history: UserAction[] = [];
    // Replay is synchronous, so the buffer is fully delivered before unsubscribe() runs
    this.actions.subscribe((action) => {
      history.push(action);
    }).unsubscribe();
    return history;
  }
}

const tracker = new ActivityTracker();

tracker.trackAction('page-view', { url: '/home' });
tracker.trackAction('click', { element: 'button1' });
tracker.trackAction('scroll', { position: 500 });

// New analytics component gets recent history
tracker.actions$.subscribe((action) => {
  console.log('Action:', action.type, 'at', new Date(action.timestamp));
});
WebSocket Message Cache
Cache recent WebSocket messages:
import { ReplaySubject } from 'rxjs';

class WebSocketService {
  // Keep last 10 messages
  private messages = new ReplaySubject<any>(10);
  messages$ = this.messages.asObservable();

  // Undefined until connect() is called
  private ws?: WebSocket;

  connect(url: string): void {
    const ws = new WebSocket(url);
    this.ws = ws;

    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.messages.next(data);
    };

    ws.onerror = (error) => {
      this.messages.error(error);
    };

    ws.onclose = () => {
      this.messages.complete();
    };
  }

  disconnect(): void {
    this.ws?.close();
  }
}

const wsService = new WebSocketService();
wsService.connect('ws://localhost:8080');

// Component subscribes later and gets last 10 messages
setTimeout(() => {
  wsService.messages$.subscribe((message) => {
    console.log('Cached message:', message);
  });
}, 5000);
Notification System
Keep recent notifications:
import { ReplaySubject } from 'rxjs';

interface Notification {
  id: string;
  message: string;
  type: 'info' | 'warning' | 'error';
  timestamp: number;
}

class NotificationService {
  // Keep last 5 notifications for 30 seconds
  private notifications = new ReplaySubject<Notification>(5, 30000);
  notifications$ = this.notifications.asObservable();

  notify(message: string, type: Notification['type'] = 'info'): void {
    this.notifications.next({
      id: Math.random().toString(36),
      message,
      type,
      timestamp: Date.now()
    });
  }

  info(message: string): void {
    this.notify(message, 'info');
  }

  warning(message: string): void {
    this.notify(message, 'warning');
  }

  error(message: string): void {
    this.notify(message, 'error');
  }
}

const notificationService = new NotificationService();

notificationService.info('Application started');
notificationService.warning('Low memory');
notificationService.error('Connection failed');

// New component gets recent notifications
// (showNotificationBanner is assumed to be defined elsewhere in the application)
notificationService.notifications$.subscribe((notification) => {
  showNotificationBanner(notification);
});
Chat Message History
Store recent chat messages:
import { ReplaySubject } from 'rxjs';

interface ChatMessage {
  id: string;
  user: string;
  text: string;
  timestamp: number;
}

class ChatRoom {
  // Keep last 100 messages for 1 hour
  private messages = new ReplaySubject<ChatMessage>(100, 60 * 60 * 1000);
  messages$ = this.messages.asObservable();

  sendMessage(user: string, text: string): void {
    this.messages.next({
      id: crypto.randomUUID(),
      user,
      text,
      timestamp: Date.now()
    });
  }
}

const chatRoom = new ChatRoom();

chatRoom.sendMessage('Alice', 'Hello!');
chatRoom.sendMessage('Bob', 'Hi there!');
chatRoom.sendMessage('Charlie', 'Hey everyone!');

// User joins later and sees recent messages
// (displayMessage is assumed to be defined elsewhere in the application)
setTimeout(() => {
  chatRoom.messages$.subscribe((message) => {
    displayMessage(message);
  });
}, 10000);
Buffer Behavior
bufferSize and windowTime work together:
The oldest values are removed once the buffer holds more than bufferSize values
Values are removed once they are older than windowTime
When both are set, a value is replayed only if it satisfies both limits
Buffer Size Only
const subject = new ReplaySubject<number>(3);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); // 1 is removed (buffer size = 3)
subject.next(5); // 2 is removed

subject.subscribe((x) => console.log(x));
// Output: 3, 4, 5
Time Window Only
const subject = new ReplaySubject<number>(Infinity, 1000);

subject.next(1);
setTimeout(() => subject.next(2), 500);
setTimeout(() => subject.next(3), 1500);

setTimeout(() => {
  subject.subscribe((x) => console.log(x));
  // Output: 3 (1 expired at 1000ms, 2 expired at 1500ms)
}, 2000);
Both Limits
const subject = new ReplaySubject<number>(3, 1000);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); // 1 removed (buffer size limit)

setTimeout(() => {
  subject.subscribe((x) => console.log(x));
  // Output: 2, 3, 4 (the last 3 values, all younger than 1000ms)
}, 500);
Differences from BehaviorSubject
ReplaySubject vs BehaviorSubject:
BehaviorSubject: Replays only 1 value (current), requires initial value
ReplaySubject: Replays N values, no initial value required
ReplaySubject: Still replays its buffered values to new subscribers after an error, then delivers the error (BehaviorSubject delivers only the error)
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// BehaviorSubject - need initial value, replays 1
const behavior = new BehaviorSubject<number>(0);
behavior.next(1);
behavior.next(2);

behavior.subscribe((x) => console.log('Behavior:', x));
// Output: Behavior: 2 (only latest)

// ReplaySubject - no initial value, replays all
const replay = new ReplaySubject<number>();
replay.next(1);
replay.next(2);

replay.subscribe((x) => console.log('Replay:', x));
// Output:
// Replay: 1
// Replay: 2
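The difference in behavior after an error can be checked with a short experiment. This is a sketch rather than a definitive test. The record helper and the replayLog and behaviorLog arrays are hypothetical names introduced here.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// Hypothetical helper: builds an observer that records what it receives.
function record(log: string[]) {
  return {
    next: (v: number) => { log.push(`next ${v}`); },
    error: (e: Error) => { log.push(`error ${e.message}`); },
  };
}

// ReplaySubject: errored before anyone subscribed.
const erroredReplay = new ReplaySubject<number>(2);
erroredReplay.next(1);
erroredReplay.next(2);
erroredReplay.error(new Error('boom'));

const replayLog: string[] = [];
erroredReplay.subscribe(record(replayLog));

// BehaviorSubject: same sequence of events.
const erroredBehavior = new BehaviorSubject<number>(0);
erroredBehavior.next(1);
erroredBehavior.error(new Error('boom'));

const behaviorLog: string[] = [];
erroredBehavior.subscribe(record(behaviorLog));

console.log(replayLog);   // ['next 1', 'next 2', 'error boom']
console.log(behaviorLog); // ['error boom']
```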
Memory Considerations
ReplaySubject with large bufferSize or windowTime can consume significant memory. Choose appropriate limits based on your use case.
// ❌ Potentially problematic
const unlimited = new ReplaySubject<LargeObject>(); // Unbounded

// ✅ Better - bounded
const bounded = new ReplaySubject<LargeObject>(10, 5000);
Common Patterns
Snapshot Multiple Values
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(5);

subject.next(1);
subject.next(2);
subject.next(3);

const snapshot: number[] = [];
subject.subscribe((x) => snapshot.push(x)).unsubscribe();

console.log('Snapshot:', snapshot); // [1, 2, 3]
Combine with startWith
import { ReplaySubject } from 'rxjs';
import { startWith } from 'rxjs/operators';

const subject = new ReplaySubject<number>(3);

subject.next(1);
subject.next(2);

// Emit a default value before the replayed values
const withDefault$ = subject.pipe(
  startWith(0)
);

withDefault$.subscribe(console.log);
// Output: 0, 1, 2 (startWith always emits its value first, even when the buffer has values)
When to Use ReplaySubject
Message History: Chat, notifications, activity logs
Caching: Recent API responses, WebSocket messages
Late Subscribers: Components that mount after events occur
Event Replay: Testing, debugging, analytics
State History: Undo/redo functionality
Real-time Data: Stock prices, sensor readings with history
Implementation Details
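The buffer is managed as a flat array that is trimmed on every next() call. When a time window is set, each value is followed in the array by its expiry timestamp: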
class ReplaySubject<T> extends Subject<T> {
  private _buffer: (T | number)[] = [];

  next(value: T): void {
    const { _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
    if (!this._closed) {
      _buffer.push(value);
      !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
    }
    this._trimBuffer();
    super.next(value);
  }
}
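To make the trimming step concrete, here is a simplified standalone model of the layout shown above, where each value is followed by its expiry timestamp. It is a sketch written for this explanation, not the library's source. trimBuffer, Entry, and the sample data are hypothetical.

```typescript
// Buffer layout with a time window: [value0, expiry0, value1, expiry1, ...]
type Entry<T> = T | number;

function trimBuffer<T>(buffer: Entry<T>[], bufferSize: number, now: number): void {
  // Rule 1: keep at most bufferSize (value, expiry) pairs, dropping the oldest.
  const maxLength = bufferSize * 2;
  if (bufferSize < Infinity && buffer.length > maxLength) {
    buffer.splice(0, buffer.length - maxLength);
  }

  // Rule 2: drop leading pairs whose expiry is at or before `now`.
  let expired = 0;
  while (expired + 1 < buffer.length && (buffer[expired + 1] as number) <= now) {
    expired += 2;
  }
  buffer.splice(0, expired);
}

// Three values pushed at t = 0, 400, and 900 with a 1000ms window.
const buffer: Entry<string>[] = ['a', 1000, 'b', 1400, 'c', 1900];

trimBuffer(buffer, 2, 1200); // size limit drops 'a'; 'b' and 'c' have not expired
const afterSizeTrim = [...buffer];
console.log(afterSizeTrim); // ['b', 1400, 'c', 1900]

trimBuffer(buffer, 2, 1500); // 'b' expired at 1400
const afterTimeTrim = [...buffer];
console.log(afterTimeTrim); // ['c', 1900]
```

Because values are appended in time order, expired entries always sit at the front of the array, which is why a single scan from the start is enough.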
See Also