Overview

A Subject is a special type of Observable that multicasts values to many Observers. Unlike regular Observables (which are unicast), a Subject maintains a list of Observers and notifies all of them whenever a value is emitted.
Every Subject is both an Observable (you can subscribe to it) and an Observer (you can call next, error, and complete on it).

Class Signature

class Subject<T> extends Observable<T> implements SubscriptionLike {
  closed: boolean;
  observers: Observer<T>[];
  readonly observed: boolean;
  
  next(value: T): void;
  error(err: any): void;
  complete(): void;
  unsubscribe(): void;
  asObservable(): Observable<T>;
}

Constructor

const subject = new Subject<T>();
No parameters required. Creates a new Subject that can emit values of type T.

Properties

closed
boolean
Returns true if this Subject has been closed and is no longer accepting new values.
observers
Observer<T>[]
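Array of current observers subscribed to this Subject. Deprecated in RxJS 7 as an internal implementation detail; use observed to check whether anyone is subscribed.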
observed
boolean
Returns true if the Subject has at least one Observer.
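
The following is a minimal sketch of how observed tracks subscriptions, assuming an RxJS version that provides the observed getter (it is not available in older releases). The observedStates array is only an illustration helper.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const observedStates: boolean[] = [];

// Nobody has subscribed yet
observedStates.push(subject.observed); // false

// One Observer is now registered
const subscription = subject.subscribe();
observedStates.push(subject.observed); // true

// The only Observer has left again
subscription.unsubscribe();
observedStates.push(subject.observed); // false

console.log(observedStates);
```

A check like this can be handy for skipping expensive work when nothing is listening.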

Methods

next
(value: T) => void
Emit a new value to all subscribers.
error
(err: any) => void
Emit an error notification to all subscribers and close the Subject.
complete
() => void
Emit a complete notification to all subscribers and close the Subject.
unsubscribe
() => void
Close the Subject and remove all observers. The removed observers receive no complete or error notification.
asObservable
() => Observable<T>
Create a new Observable with this Subject as its source. Useful for hiding the Subject's next, error, and complete methods from consumers.

Usage Examples

Basic Subject

Multicasting values to multiple subscribers:
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// Subscribe first observer
subject.subscribe({
  next: (v) => console.log(`Observer A: ${v}`)
});

// Subscribe second observer
subject.subscribe({
  next: (v) => console.log(`Observer B: ${v}`)
});

// Emit values
subject.next(1);
subject.next(2);

// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2

Event Bus Pattern

Implement a simple event bus:
// Operators are exported from 'rxjs' directly since RxJS 7.2
import { Subject, filter, map } from 'rxjs';

interface AppEvent {
  type: string;
  payload: any;
}

class EventBus {
  private eventBus = new Subject<AppEvent>();
  
  emit(event: AppEvent): void {
    this.eventBus.next(event);
  }
  
  on<T>(eventType: string) {
    return this.eventBus.pipe(
      filter(event => event.type === eventType),
      map(event => event.payload as T)
    );
  }
}

const bus = new EventBus();

// Subscribe to specific events
bus.on<{ username: string }>('user-login').subscribe(payload => {
  console.log(`User logged in: ${payload.username}`);
});

bus.on<{ id: number }>('data-updated').subscribe(payload => {
  console.log(`Data updated: ${payload.id}`);
});

// Emit events
bus.emit({ type: 'user-login', payload: { username: 'Alice' } });
bus.emit({ type: 'data-updated', payload: { id: 123 } });

Convert Unicast to Multicast

Share a single subscription among multiple observers:
// Operators are exported from 'rxjs' directly since RxJS 7.2
import { Subject, interval, take } from 'rxjs';

const source$ = interval(1000).pipe(take(3));
const subject = new Subject<number>();

// Multiple observers on Subject
subject.subscribe(x => console.log(`A: ${x}`));
subject.subscribe(x => console.log(`B: ${x}`));

// Single subscription to source
source$.subscribe(subject);

// Output:
// A: 0
// B: 0
// A: 1
// B: 1
// A: 2
// B: 2

Hide Subject Methods

Expose only Observable interface using asObservable():
import { Subject, Observable } from 'rxjs';

class DataService {
  private dataSubject = new Subject<string>();
  
  // Expose as Observable (read-only)
  public data$: Observable<string> = this.dataSubject.asObservable();
  
  // Internal method to update data
  updateData(newData: string): void {
    this.dataSubject.next(newData);
  }
}

const service = new DataService();

// Consumers can subscribe
service.data$.subscribe(data => console.log('Data:', data));

// But cannot emit
// service.data$.next('foo'); // Compile error: Property 'next' does not exist on type 'Observable<string>'

// Only service can emit
service.updateData('Hello'); // Works

Form State Management

Manage form state with Subjects:
// Operators are exported from 'rxjs' directly since RxJS 7.2
import { Subject, combineLatest, map, startWith } from 'rxjs';

interface FormState {
  username: string;
  email: string;
  isValid: boolean;
}

class FormController {
  private usernameSubject = new Subject<string>();
  private emailSubject = new Subject<string>();
  
  username$ = this.usernameSubject.asObservable();
  email$ = this.emailSubject.asObservable();
  
  formState$ = combineLatest([
    this.username$.pipe(startWith('')),
    this.email$.pipe(startWith(''))
  ]).pipe(
    map(([username, email]) => ({
      username,
      email,
      isValid: username.length > 0 && email.includes('@')
    }))
  );
  
  setUsername(username: string): void {
    this.usernameSubject.next(username);
  }
  
  setEmail(email: string): void {
    this.emailSubject.next(email);
  }
}

const form = new FormController();

form.formState$.subscribe((state: FormState) => {
  console.log('Form valid:', state.isValid);
  // updateSubmitButton(state.isValid); // app-specific UI helper, not defined in this example
});

form.setUsername('alice');
form.setEmail('alice@example.com');

Key Characteristics

  • Multicast: One execution shared among all subscribers
  • Hot: Emissions happen regardless of subscribers
  • Stateless: New subscribers don’t receive past values

Multicast vs Unicast

import { Subject, Observable } from 'rxjs';

// Unicast (regular Observable)
const unicast$ = new Observable(subscriber => {
  const value = Math.random();
  subscriber.next(value);
});

unicast$.subscribe(x => console.log('A:', x)); // e.g. A: 0.123
unicast$.subscribe(x => console.log('B:', x)); // e.g. B: 0.456 (different!)

// Multicast (Subject)
const multicast = new Subject<number>();
multicast.subscribe(x => console.log('A:', x));
multicast.subscribe(x => console.log('B:', x));

multicast.next(Math.random()); 
// e.g. A: 0.789
// e.g. B: 0.789 (same value!)

Subject Lifecycle

Once a Subject has completed or errored, it cannot be reused. New subscriptions will immediately receive the terminal notification.
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: v => console.log('Observer 1:', v),
  complete: () => console.log('Observer 1 complete')
});

subject.next(1);
subject.complete();

// Subscribe after completion
subject.subscribe({
  next: v => console.log('Observer 2:', v), // Never called
  complete: () => console.log('Observer 2 complete') // Called immediately
});

// Output:
// Observer 1: 1
// Observer 1 complete
// Observer 2 complete
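
The same holds for an errored Subject. As a small sketch (the log array and the 'boom' message are illustration-only names), a subscriber that arrives after error() receives the stored error immediately:

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const log: string[] = [];

subject.subscribe({
  next: v => log.push(`Observer 1: ${v}`),
  error: err => log.push(`Observer 1 error: ${err.message}`)
});

subject.next(1);
subject.error(new Error('boom'));

// Subscribe after the error
subject.subscribe({
  next: v => log.push(`Observer 2: ${v}`), // Never called
  error: err => log.push(`Observer 2 error: ${err.message}`) // Called immediately
});

log.forEach(line => console.log(line));

// Output:
// Observer 1: 1
// Observer 1 error: boom
// Observer 2 error: boom
```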

Late Subscribers Miss Values

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.next(1); // No subscribers yet - value is lost

subject.subscribe(x => console.log('Observer:', x));

subject.next(2); // Observer receives this

// Output:
// Observer: 2
// (1 was missed)

Common Patterns

Manual Cleanup

import { Subject } from 'rxjs';

const subject = new Subject<number>();
const sub1 = subject.subscribe(x => console.log('A:', x));
const sub2 = subject.subscribe(x => console.log('B:', x));

subject.next(1);

// Unsubscribe individual observer
sub1.unsubscribe();

subject.next(2); // Only B receives this

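// Close the Subject and drop all remaining observers (they are not notified)
subject.unsubscribe();

// Do not emit after unsubscribe(): in RxJS 6 and 7 this call throws
// ObjectUnsubscribedError instead of being silently ignored
// subject.next(3);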

Error Handling

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: v => console.log('Received:', v),
  error: err => console.error('Error:', err.message)
});

subject.next(1);
subject.error(new Error('Something went wrong'));
subject.next(2); // Ignored - the Subject stopped when it errored

// Output:
// Received: 1
// Error: Something went wrong

When to Use Subject

  1. Event Buses: Application-wide event system
  2. State Management: Manual state updates
  3. Bridging: Connect callback APIs to RxJS
  4. Multicasting: Share one source among many subscribers
  5. Imperative Emissions: Need to call next() manually
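
The bridging case from the list above can be sketched as follows. The Sensor class and its onReading and push methods are hypothetical, defined here only to stand in for a callback-style API; they are not part of RxJS.

```typescript
import { Subject, Observable } from 'rxjs';

// Hypothetical callback-style API, defined only for this illustration
type Listener = (reading: number) => void;

class Sensor {
  private listeners: Listener[] = [];

  onReading(listener: Listener): void {
    this.listeners.push(listener);
  }

  // Simulates the device producing a value
  push(reading: number): void {
    this.listeners.forEach(listener => listener(reading));
  }
}

const sensor = new Sensor();
const readingSubject = new Subject<number>();

// Bridge: every callback invocation becomes a next() call on the Subject
sensor.onReading(reading => readingSubject.next(reading));

// Consumers only see an Observable
const readings$: Observable<number> = readingSubject.asObservable();

const received: number[] = [];
readings$.subscribe(reading => received.push(reading));

sensor.push(21);
sensor.push(22);

console.log(received); // [21, 22]
```

For callback APIs that also offer a way to remove the listener, wrapping the registration in new Observable(...) with a teardown function may be a better fit than a Subject, since it cleans up when the last consumer unsubscribes.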

Subject Variants

Choose the right Subject variant for your use case:
  • Subject: Basic multicast, no replay
  • BehaviorSubject: Stores the current value and emits it immediately to new subscribers
  • ReplaySubject: Buffers up to the last N values and replays them to new subscribers
  • AsyncSubject: Emits only last value on completion
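
To make the differences concrete, here is a small comparison sketch. Each variant gets the same emissions, with one subscriber joining after the third value. The received record, the initial value 0, and the buffer size 2 are illustration choices, not recommendations.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

const subjects: [string, Subject<number>][] = [
  ['Subject', new Subject<number>()],
  ['BehaviorSubject', new BehaviorSubject<number>(0)], // 0 is the initial value
  ['ReplaySubject', new ReplaySubject<number>(2)],     // buffer the last 2 values
  ['AsyncSubject', new AsyncSubject<number>()]
];

const received: Record<string, number[]> = {};

for (const [name, subject] of subjects) {
  const values: number[] = [];
  received[name] = values;

  // Emitted before anyone subscribes
  subject.next(1);
  subject.next(2);
  subject.next(3);

  // Late subscriber
  subject.subscribe(v => values.push(v));

  // Emitted after the subscription
  subject.next(4);
  subject.next(5);
  subject.complete();
}

console.log(received);
// Subject:         [4, 5]        (nothing replayed)
// BehaviorSubject: [3, 4, 5]     (current value 3, then new values)
// ReplaySubject:   [2, 3, 4, 5]  (last 2 buffered values, then new values)
// AsyncSubject:    [5]           (only the last value, delivered on complete)
```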

See Also