Overview

ReplaySubject is a variant of Subject that records multiple values and replays them to new subscribers. You can configure how many values to buffer and for how long to keep them.
Key Feature: ReplaySubject buffers a specified number of values and replays them to new subscribers in FIFO (First-In-First-Out) order.

Class Signature

class ReplaySubject<T> extends Subject<T> {
  constructor(
    bufferSize?: number,
    windowTime?: number,
    timestampProvider?: TimestampProvider
  );
  
  next(value: T): void;
}

Constructor

const subject = new ReplaySubject<T>(
  bufferSize?: number,
  windowTime?: number,
  timestampProvider?: TimestampProvider
);
  • bufferSize (number, default: Infinity): The maximum number of values to buffer and replay to new subscribers. If not specified, all values are buffered.
  • windowTime (number, default: Infinity): The amount of time (in milliseconds) to keep a value in the buffer before removing it. If not specified, values are kept indefinitely.
  • timestampProvider (TimestampProvider, default: dateTimestampProvider): An object with a now() method that provides the current timestamp for calculating buffer window time.
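
As a minimal sketch of how the third argument can be used, the example below passes a hand-rolled clock so that window expiry can be exercised without real timers. The names `manualClock`, `currentTime`, and `clocked` are illustrative and not part of RxJS; the sketch assumes RxJS 7, where the constructor accepts any object with a `now()` method.

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical manual clock: any object with a now() method can act as the provider.
let currentTime = 0;
const manualClock = { now: () => currentTime };

// Unlimited buffer size, 1000ms window, timestamps taken from manualClock.
const clocked = new ReplaySubject<string>(Infinity, 1000, manualClock);

clocked.next('a'); // stamped at t=0
currentTime = 600;
clocked.next('b'); // stamped at t=600

// At t=900 both values are younger than 1000ms.
currentTime = 900;
const withinWindow: string[] = [];
clocked.subscribe(v => withinWindow.push(v)).unsubscribe();

// At t=1200 'a' is 1200ms old and has expired; 'b' is 600ms old.
currentTime = 1200;
const afterExpiry: string[] = [];
clocked.subscribe(v => afterExpiry.push(v)).unsubscribe();

console.log(withinWindow); // ['a', 'b']
console.log(afterExpiry);  // ['b']
```

A manual clock like this is mainly useful in tests, where waiting on `setTimeout` would make the window behavior slow and flaky to verify.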

Usage Examples

Basic Replay

Replay last N values to new subscribers:
import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(3); // Buffer last 3 values

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

// New subscriber gets last 3 values
subject.subscribe({
  next: (v) => console.log(`Observer A: ${v}`)
});
// Observer A: 2
// Observer A: 3
// Observer A: 4

subject.next(5);
// Observer A: 5

// Another new subscriber
subject.subscribe({
  next: (v) => console.log(`Observer B: ${v}`)
});
// Observer B: 3
// Observer B: 4
// Observer B: 5

Time Window Buffer

Replay values within a time window:
import { ReplaySubject } from 'rxjs';

// Keep last 100 values, but only for 500ms
const subject = new ReplaySubject<number>(100, 500);

subject.next(1);
subject.next(2);
subject.next(3);

setTimeout(() => {
  // This subscriber is still inside the 500ms window
  subject.subscribe({
    next: (v) => console.log(`Early subscriber: ${v}`)
  });
  // Gets: 1, 2, 3
}, 200);

setTimeout(() => {
  // This subscriber is past the 500ms window
  subject.subscribe({
    next: (v) => console.log(`Late subscriber: ${v}`)
  });
  // Gets: (nothing - values expired)
}, 600);

Event History

Keep history of recent events:
import { ReplaySubject } from 'rxjs';

interface UserAction {
  type: string;
  timestamp: number;
  details: any;
}

class ActivityTracker {
  // Keep last 50 actions, for last 5 minutes
  private actions = new ReplaySubject<UserAction>(50, 5 * 60 * 1000);
  
  actions$ = this.actions.asObservable();
  
  trackAction(type: string, details: any): void {
    this.actions.next({
      type,
      timestamp: Date.now(),
      details
    });
  }
  
  getRecentActions() {
    const history: UserAction[] = [];
    
    this.actions.subscribe(action => {
      history.push(action);
    }).unsubscribe();
    
    return history;
  }
}

const tracker = new ActivityTracker();

tracker.trackAction('page-view', { url: '/home' });
tracker.trackAction('click', { element: 'button1' });
tracker.trackAction('scroll', { position: 500 });

// New analytics component gets recent history
tracker.actions$.subscribe(action => {
  console.log('Action:', action.type, 'at', new Date(action.timestamp));
});

WebSocket Message Cache

Cache recent WebSocket messages:
import { ReplaySubject } from 'rxjs';

class WebSocketService {
  // Keep last 10 messages
  private messages = new ReplaySubject<any>(10);
  
  messages$ = this.messages.asObservable();
  
  // Optional because the socket is only created in connect()
  private ws?: WebSocket;
  
  connect(url: string): void {
    this.ws = new WebSocket(url);
    
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.messages.next(data);
    };
    
    this.ws.onerror = (error) => {
      this.messages.error(error);
    };
    
    this.ws.onclose = () => {
      this.messages.complete();
    };
  }
  
  disconnect(): void {
    this.ws?.close();
  }
}

const wsService = new WebSocketService();
wsService.connect('ws://localhost:8080');

// Component subscribes later and gets last 10 messages
setTimeout(() => {
  wsService.messages$.subscribe(message => {
    console.log('Cached message:', message);
  });
}, 5000);

Notification System

Keep recent notifications:
import { ReplaySubject } from 'rxjs';

interface Notification {
  id: string;
  message: string;
  type: 'info' | 'warning' | 'error';
  timestamp: number;
}

class NotificationService {
  // Keep last 5 notifications for 30 seconds
  private notifications = new ReplaySubject<Notification>(5, 30000);
  
  notifications$ = this.notifications.asObservable();
  
  notify(message: string, type: Notification['type'] = 'info'): void {
    this.notifications.next({
      id: Math.random().toString(36),
      message,
      type,
      timestamp: Date.now()
    });
  }
  
  info(message: string): void {
    this.notify(message, 'info');
  }
  
  warning(message: string): void {
    this.notify(message, 'warning');
  }
  
  error(message: string): void {
    this.notify(message, 'error');
  }
}

const notificationService = new NotificationService();

notificationService.info('Application started');
notificationService.warning('Low memory');
notificationService.error('Connection failed');

// New component gets recent notifications
notificationService.notifications$.subscribe(notification => {
  // showNotificationBanner is an app-defined UI function (not part of RxJS)
  showNotificationBanner(notification);
});

Chat Message History

Store recent chat messages:
import { ReplaySubject } from 'rxjs';

interface ChatMessage {
  id: string;
  user: string;
  text: string;
  timestamp: number;
}

class ChatRoom {
  // Keep last 100 messages for 1 hour
  private messages = new ReplaySubject<ChatMessage>(100, 60 * 60 * 1000);
  
  messages$ = this.messages.asObservable();
  
  sendMessage(user: string, text: string): void {
    this.messages.next({
      id: crypto.randomUUID(),
      user,
      text,
      timestamp: Date.now()
    });
  }
}

const chatRoom = new ChatRoom();

chatRoom.sendMessage('Alice', 'Hello!');
chatRoom.sendMessage('Bob', 'Hi there!');
chatRoom.sendMessage('Charlie', 'Hey everyone!');

// User joins later and sees recent messages
setTimeout(() => {
  chatRoom.messages$.subscribe(message => {
    // displayMessage is an app-defined UI function (not part of RxJS)
    displayMessage(message);
  });
}, 10000);

Buffer Behavior

Buffer Size and Time Window work together:
  • The oldest values are removed once the buffer holds more than bufferSize values
  • Values are removed once they are older than windowTime
  • Both limits can apply simultaneously; a value is replayed only if it satisfies both

Buffer Size Only

const subject = new ReplaySubject<number>(3);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); // 1 is removed (buffer size = 3)
subject.next(5); // 2 is removed

subject.subscribe(x => console.log(x));
// Output: 3, 4, 5

Time Window Only

const subject = new ReplaySubject<number>(Infinity, 1000);

subject.next(1);
setTimeout(() => subject.next(2), 500);
setTimeout(() => subject.next(3), 1500);

setTimeout(() => {
  subject.subscribe(x => console.log(x));
  // Output: 3 (1 expired at 1000ms, 2 expired at 1500ms)
}, 2000);

Both Limits

const subject = new ReplaySubject<number>(3, 1000);

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4); // 1 removed (buffer size limit)

setTimeout(() => {
  subject.subscribe(x => console.log(x));
  // Output: 2, 3, 4 (the last 3 values, all younger than 1000ms)
}, 500);

Differences from BehaviorSubject

ReplaySubject vs BehaviorSubject:
  • BehaviorSubject: Replays only 1 value (current), requires initial value
  • ReplaySubject: Replays N values, no initial value required
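  • ReplaySubject: Still replays its buffered values to late subscribers after it has errored or completed; a BehaviorSubject in the same state delivers only the error or completion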
import { BehaviorSubject, ReplaySubject } from 'rxjs';

// BehaviorSubject - need initial value, replays 1
const behavior = new BehaviorSubject<number>(0);
behavior.next(1);
behavior.next(2);
behavior.subscribe(x => console.log('Behavior:', x));
// Output: Behavior: 2 (only latest)

// ReplaySubject - no initial value, replays all
const replay = new ReplaySubject<number>();
replay.next(1);
replay.next(2);
replay.subscribe(x => console.log('Replay:', x));
// Output:
// Replay: 1
// Replay: 2
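
The difference in behavior after an error can be checked with a short sketch. It assumes RxJS 7 behavior, where a ReplaySubject that has errored still delivers its buffered values to a late subscriber before the error notification, while a BehaviorSubject that has errored delivers only the error. The `log` array and the variable names are illustrative.

```typescript
import { BehaviorSubject, ReplaySubject } from 'rxjs';

const log: string[] = [];

const replayed = new ReplaySubject<number>(2);
replayed.next(1);
replayed.next(2);
replayed.error(new Error('boom'));

// Late subscriber: buffered values arrive first, then the error.
replayed.subscribe({
  next: v => log.push(`replay next ${v}`),
  error: (e: Error) => log.push(`replay error ${e.message}`)
});

const current = new BehaviorSubject<number>(0);
current.next(1);
current.error(new Error('boom'));

// Late subscriber: only the error, the last value is not delivered.
current.subscribe({
  next: v => log.push(`behavior next ${v}`),
  error: (e: Error) => log.push(`behavior error ${e.message}`)
});

console.log(log);
// ['replay next 1', 'replay next 2', 'replay error boom', 'behavior error boom']
```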

Memory Considerations

ReplaySubject with large bufferSize or windowTime can consume significant memory. Choose appropriate limits based on your use case.
// ❌ Potentially problematic
const unlimited = new ReplaySubject<LargeObject>(); // Unbounded

// ✅ Better - bounded
const bounded = new ReplaySubject<LargeObject>(10, 5000);

Common Patterns

Snapshot Multiple Values

import { ReplaySubject } from 'rxjs';

const subject = new ReplaySubject<number>(5);

subject.next(1);
subject.next(2);
subject.next(3);

const snapshot: number[] = [];
subject.subscribe(x => snapshot.push(x)).unsubscribe();

console.log('Snapshot:', snapshot); // [1, 2, 3]
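
The subscribe-then-unsubscribe idiom above can be wrapped in a small helper. The function name `snapshotOf` is hypothetical; the sketch relies on the replay being delivered synchronously during `subscribe`, which is the case in the example above.

```typescript
import { ReplaySubject } from 'rxjs';

// Hypothetical helper: collect whatever the subject currently replays.
function snapshotOf<T>(source: ReplaySubject<T>): T[] {
  const values: T[] = [];
  // The replay runs synchronously inside subscribe(), so the array is
  // already filled by the time unsubscribe() is called.
  source.subscribe(v => values.push(v)).unsubscribe();
  return values;
}

const readings = new ReplaySubject<number>(2);
readings.next(10);
readings.next(20);
readings.next(30);

const latest = snapshotOf(readings);
console.log(latest); // [20, 30]
```

If the subject may already have errored, the helper would need an `error` handler as well, since the error notification is delivered right after the replayed values.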

Combine with startWith

import { ReplaySubject } from 'rxjs';
import { startWith } from 'rxjs/operators';

const subject = new ReplaySubject<number>(3);

subject.next(1);
subject.next(2);

// startWith always emits its value first, then the source's values
const withDefault$ = subject.pipe(
  startWith(0)
);

withDefault$.subscribe(console.log);
// Output: 0, 1, 2 (0 from startWith, then the replayed 1 and 2)

When to Use ReplaySubject

  1. Message History: Chat, notifications, activity logs
  2. Caching: Recent API responses, WebSocket messages
  3. Late Subscribers: Components that mount after events occur
  4. Event Replay: Testing, debugging, analytics
  5. State History: Undo/redo functionality
  6. Real-time Data: Stock prices, sensor readings with history

Implementation Details

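The buffer is a plain array that is trimmed on each next() call. When a finite windowTime is set, each value is followed in the same array by its expiry timestamp, which is why the element type is T | number: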
class ReplaySubject<T> extends Subject<T> {
  private _buffer: (T | number)[] = [];
  
  next(value: T): void {
    const { _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
    
    if (!this._closed) {
      _buffer.push(value);
      !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
    }
    
    this._trimBuffer();
    super.next(value);
  }
}
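
The excerpt calls `_trimBuffer()` without showing it. The following is a simplified, standalone model of what such trimming could look like, assuming the interleaved layout implied above (each value followed by its expiry timestamp when a finite window is set). The function name `trimInterleaved` and its parameters are illustrative; this is a sketch, not the library's source.

```typescript
// Buffer layout assumed here: [value, expiresAt, value, expiresAt, ...]
type Interleaved<T> = (T | number)[];

function trimInterleaved<T>(buffer: Interleaved<T>, bufferSize: number, now: number): void {
  // Size limit: each logical entry occupies two slots.
  const maxSlots = 2 * bufferSize;
  if (bufferSize < Infinity && buffer.length > maxSlots) {
    buffer.splice(0, buffer.length - maxSlots);
  }

  // Time limit: entries are in insertion order, so expired ones form a prefix.
  let expiredSlots = 0;
  while (expiredSlots + 1 < buffer.length && (buffer[expiredSlots + 1] as number) <= now) {
    expiredSlots += 2;
  }
  buffer.splice(0, expiredSlots);
}

// Three entries pushed at t=0, t=400 and t=800 with a 1000ms window.
const slots: Interleaved<string> = ['a', 1000, 'b', 1400, 'c', 1800];

// bufferSize 2 drops 'a'; at t=1500 'b' (expires at 1400) is dropped as well.
trimInterleaved(slots, 2, 1500);
console.log(slots); // ['c', 1800]
```

Storing the expiry next to each value keeps the buffer a single flat array, at the cost of stepping through it two slots at a time.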
