A Subject is a special type of Observable that allows values to be multicasted to many Observers. While plain Observables are unicast (each subscriber gets independent execution), Subjects are multicast (all subscribers share the same execution).
What is a Subject?
A Subject is both an Observable and an Observer:
- As an Observable: You can subscribe to it
- As an Observer: You can call
next(), error(), and complete() to emit values
Think of a Subject like an event emitter that can have multiple listeners, combined with the power of Observable operators.
Type Signature
class Subject<T> extends Observable<T> implements SubscriptionLike {
observers: Observer<T>[];
closed: boolean;
next(value: T): void;
error(err: any): void;
complete(): void;
asObservable(): Observable<T>;
unsubscribe(): void;
}
Unicast vs Multicast
Unicast (Observable)
Multicast (Subject)
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
console.log('Execution started');
subscriber.next(Math.random());
});
observable.subscribe(x => console.log('A:', x));
observable.subscribe(x => console.log('B:', x));
// Output:
// Execution started
// A: 0.123
// Execution started
// B: 0.456
// Two separate executions!
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(x => console.log('A:', x));
subject.subscribe(x => console.log('B:', x));
subject.next(Math.random());
// Output:
// A: 0.123
// B: 0.123
// Same value for both!
Creating and Using Subjects
Basic Subject
import { Subject } from 'rxjs';
const subject = new Subject<number>();
// Subscribe observers
subject.subscribe({
next: v => console.log(`Observer A: ${v}`)
});
subject.subscribe({
next: v => console.log(`Observer B: ${v}`)
});
// Emit values
subject.next(1);
subject.next(2);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
Subject as Observer
You can pass a Subject as an observer to subscribe to an Observable:
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: v => console.log(`Observer A: ${v}`)
});
subject.subscribe({
next: v => console.log(`Observer B: ${v}`)
});
const observable = from([1, 2, 3]);
// Subject subscribes to Observable
observable.subscribe(subject);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
// Observer A: 3
// Observer B: 3
Subject Variants
BehaviorSubject
Stores the latest value and emits it immediately to new subscribers.
import { BehaviorSubject } from 'rxjs';
// Requires an initial value
const subject = new BehaviorSubject<number>(0);
subject.subscribe(v => console.log(`Observer A: ${v}`));
// Output: Observer A: 0 (initial value)
subject.next(1);
subject.next(2);
// New subscriber gets latest value immediately
subject.subscribe(v => console.log(`Observer B: ${v}`));
// Output: Observer B: 2
subject.next(3);
// Output:
// Observer A: 3
// Observer B: 3
Type Signature:
class BehaviorSubject<T> extends Subject<T> {
constructor(private _value: T);
get value(): T;
getValue(): T;
}
Use BehaviorSubject for state management - It always has a current value and is perfect for representing application state.
ReplaySubject
Records multiple values and replays them to new subscribers.
import { ReplaySubject } from 'rxjs';
// Buffer last 3 values
const subject = new ReplaySubject<number>(3);
subject.subscribe(v => console.log(`Observer A: ${v}`));
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
// New subscriber gets last 3 values
subject.subscribe(v => console.log(`Observer B: ${v}`));
// Output:
// Observer B: 2
// Observer B: 3
// Observer B: 4
subject.next(5);
// Output:
// Observer A: 5
// Observer B: 5
Type Signature:
class ReplaySubject<T> extends Subject<T> {
constructor(
bufferSize?: number,
windowTime?: number,
timestampProvider?: TimestampProvider
);
}
With time window:
import { ReplaySubject } from 'rxjs';
// Buffer 100 values, but only for 500ms
const subject = new ReplaySubject<number>(100, 500);
subject.subscribe(v => console.log(`Observer A: ${v}`));
let i = 1;
const interval = setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe(v => console.log(`Observer B: ${v}`));
clearInterval(interval);
}, 1000);
// Observer B only gets values from last 500ms
Use ReplaySubject for caching - Perfect for caching HTTP responses or recent events.
AsyncSubject
Only emits the last value when the sequence completes.
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject<number>();
subject.subscribe(v => console.log(`Observer A: ${v}`));
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe(v => console.log(`Observer B: ${v}`));
subject.next(5);
subject.complete();
// Output (only after complete):
// Observer A: 5
// Observer B: 5
Type Signature:
class AsyncSubject<T> extends Subject<T> {
// Only emits last value on complete()
}
AsyncSubject is similar to Promise - It only emits one final value when complete.
Void Subject
When the value doesn’t matter, only the event:
import { Subject } from 'rxjs';
const subject = new Subject<void>();
subject.subscribe(() => console.log('Event occurred!'));
setTimeout(() => subject.next(), 1000);
// Output (after 1 second): Event occurred!
Practical Examples
Event Bus
import { Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
interface AppEvent {
type: string;
payload: any;
}
class EventBus {
private subject = new Subject<AppEvent>();
emit(event: AppEvent): void {
this.subject.next(event);
}
on(eventType: string) {
return this.subject.pipe(
filter(event => event.type === eventType)
);
}
}
const bus = new EventBus();
// Subscribe to specific events
bus.on('user:login').subscribe(event => {
console.log('User logged in:', event.payload);
});
bus.on('user:logout').subscribe(event => {
console.log('User logged out:', event.payload);
});
// Emit events
bus.emit({ type: 'user:login', payload: { id: 1, name: 'Alice' } });
bus.emit({ type: 'user:logout', payload: { id: 1 } });
State Store
import { BehaviorSubject } from 'rxjs';
import { map } from 'rxjs/operators';
interface AppState {
user: { name: string; id: number } | null;
items: string[];
loading: boolean;
}
class Store {
private state$ = new BehaviorSubject<AppState>({
user: null,
items: [],
loading: false
});
// Expose as Observable (read-only)
readonly state = this.state$.asObservable();
// Selectors
readonly user$ = this.state.pipe(map(state => state.user));
readonly items$ = this.state.pipe(map(state => state.items));
readonly loading$ = this.state.pipe(map(state => state.loading));
// Update state
setState(partial: Partial<AppState>): void {
this.state$.next({ ...this.state$.value, ...partial });
}
// Get current state
getState(): AppState {
return this.state$.value;
}
}
const store = new Store();
// Subscribe to state changes
store.user$.subscribe(user => console.log('User:', user));
store.items$.subscribe(items => console.log('Items:', items));
// Update state
store.setState({ user: { name: 'Alice', id: 1 } });
store.setState({ items: ['item1', 'item2'] });
Component Communication
import { Subject } from 'rxjs';
// Service for component communication
class MessageService {
private messageSubject = new Subject<string>();
message$ = this.messageSubject.asObservable();
sendMessage(message: string): void {
this.messageSubject.next(message);
}
}
const messageService = new MessageService();
// Component A listens
messageService.message$.subscribe(msg => {
console.log('Component A received:', msg);
});
// Component B listens
messageService.message$.subscribe(msg => {
console.log('Component B received:', msg);
});
// Component C sends
messageService.sendMessage('Hello from C!');
// Output:
// Component A received: Hello from C!
// Component B received: Hello from C!
Caching HTTP Requests
import { ReplaySubject, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
class DataCache {
private cache = new Map<string, ReplaySubject<any>>();
get(url: string): Observable<any> {
if (!this.cache.has(url)) {
const subject = new ReplaySubject(1);
this.cache.set(url, subject);
ajax(url).subscribe(
response => {
subject.next(response);
subject.complete();
},
error => subject.error(error)
);
}
return this.cache.get(url)!.asObservable();
}
clear(url?: string): void {
if (url) {
this.cache.delete(url);
} else {
this.cache.clear();
}
}
}
const cache = new DataCache();
// First call - fetches from API
cache.get('/api/users').subscribe(data => console.log('Call 1:', data));
// Second call - returns cached value
cache.get('/api/users').subscribe(data => console.log('Call 2:', data));
// Only one HTTP request made!
Real-time Data Stream
import { Subject, interval } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
class SensorData {
private dataStream$ = new Subject<number>();
// Throttled stream for UI updates
readonly throttled$ = this.dataStream$.pipe(
throttleTime(1000) // Max 1 update per second
);
// Raw stream for logging
readonly raw$ = this.dataStream$.asObservable();
pushData(value: number): void {
this.dataStream$.next(value);
}
}
const sensor = new SensorData();
// UI subscribes to throttled stream
sensor.throttled$.subscribe(value => {
updateChart(value);
});
// Logger subscribes to raw stream
sensor.raw$.subscribe(value => {
logToDatabase(value);
});
// Simulate sensor readings
interval(100).subscribe(i => {
sensor.pushData(Math.random() * 100);
});
Best Practices
1. Use asObservable() for Encapsulation
import { Subject } from 'rxjs';
// ✗ BAD: Exposing Subject directly
class Service {
data$ = new Subject<string>();
}
const service = new Service();
service.data$.next('hack'); // Anyone can emit!
// ✓ GOOD: Expose as Observable
class BetterService {
private dataSubject = new Subject<string>();
// Read-only Observable
readonly data$ = this.dataSubject.asObservable();
// Controlled emission
updateData(value: string): void {
this.dataSubject.next(value);
}
}
const betterService = new BetterService();
// betterService.data$.next('hack'); // Error: not assignable
betterService.updateData('safe'); // ✓ OK
2. Complete Subjects When Done
import { Subject } from 'rxjs';
class Component {
private destroy$ = new Subject<void>();
ngOnInit() {
// Use subject
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete(); // Don't forget to complete!
}
}
3. Choose the Right Subject Type
Subject Selection Guide:
Subject - Events, no initial value needed
BehaviorSubject - State, always has current value
ReplaySubject - History, caching, late subscribers need past values
AsyncSubject - Final result, Promise-like behavior
4. Avoid Memory Leaks
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
class Component {
private destroy$ = new Subject<void>();
private data$ = new Subject<string>();
ngOnInit() {
this.data$
.pipe(takeUntil(this.destroy$))
.subscribe(data => console.log(data));
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
5. Type Safety
import { Subject, BehaviorSubject } from 'rxjs';
interface User {
id: number;
name: string;
}
const userSubject = new BehaviorSubject<User | null>(null);
// TypeScript ensures type safety
userSubject.next({ id: 1, name: 'Alice' }); // ✓ OK
userSubject.next({ id: 2 }); // ✗ Error: missing 'name'
Common Patterns
Request/Response Pattern
import { Subject, ReplaySubject } from 'rxjs';
import { filter, first } from 'rxjs/operators';
interface Request {
id: string;
action: string;
}
interface Response {
id: string;
result: any;
}
class RequestHandler {
private requests$ = new Subject<Request>();
private responses$ = new Subject<Response>();
request(action: string): Promise<any> {
const id = Math.random().toString();
const response = this.responses$.pipe(
filter(r => r.id === id),
first()
).toPromise();
this.requests$.next({ id, action });
return response.then(r => r.result);
}
// Simulate processing
constructor() {
this.requests$.subscribe(req => {
setTimeout(() => {
this.responses$.next({
id: req.id,
result: `Result for ${req.action}`
});
}, 1000);
});
}
}
Loading State Management
import { BehaviorSubject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';
class LoadingService {
private loaders = new Map<string, BehaviorSubject<boolean>>();
// Overall loading state
readonly loading$ = new BehaviorSubject<boolean>(false);
startLoading(key: string): void {
if (!this.loaders.has(key)) {
this.loaders.set(key, new BehaviorSubject<boolean>(false));
}
this.loaders.get(key)!.next(true);
this.updateGlobalState();
}
stopLoading(key: string): void {
this.loaders.get(key)?.next(false);
this.updateGlobalState();
}
private updateGlobalState(): void {
const isLoading = Array.from(this.loaders.values())
.some(loader => loader.value);
this.loading$.next(isLoading);
}
}
Subject vs Observable
| Feature | Observable | Subject |
|---|
| Unicast/Multicast | Unicast | Multicast |
| Can emit values | No | Yes (via next) |
| Hot/Cold | Cold | Hot |
| Execution | Per subscriber | Shared |
| Use case | Data streams | Event bus, state |
When to Use Subjects
Use Subjects when you need:
- Event bus pattern
- State management
- Multicasting to multiple subscribers
- Manual control over emissions
- Component communication
Avoid Subjects when:
- Regular Observable would work
- You don’t need multicasting
- Data source is already an Observable