Overview
A Subject is a special type of Observable that multicasts values to many Observers. Unlike regular Observables (which are unicast), a Subject maintains a list of Observers and notifies all of them when a value is emitted.
Every Subject is both an Observable (you can subscribe to it) and an Observer (you can call next, error, and complete on it).
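To make this dual role concrete, the following is a minimal from-scratch sketch of the idea. `MiniSubject` and `MiniObserver` are hypothetical names used only for illustration; this is not the RxJS implementation, which also handles errors, completion, and teardown.

```typescript
// Hypothetical, simplified types for illustration only (not part of RxJS).
interface MiniObserver<T> {
  next: (value: T) => void;
}

class MiniSubject<T> implements MiniObserver<T> {
  // The list of observers is what makes the subject multicast.
  private observers: MiniObserver<T>[] = [];

  // Observable side: register an observer and return an unsubscribe function.
  subscribe(observer: MiniObserver<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Observer side: forward the value to every registered observer.
  next(value: T): void {
    // Iterate over a copy so unsubscribing during delivery is safe.
    this.observers.slice().forEach((observer) => observer.next(value));
  }
}

const received: string[] = [];
const mini = new MiniSubject<number>();

const stopA = mini.subscribe({ next: (v) => received.push(`A:${v}`) });
mini.subscribe({ next: (v) => received.push(`B:${v}`) });

mini.next(1); // delivered to A and B
stopA();      // remove observer A
mini.next(2); // delivered to B only

console.log(received); // [ 'A:1', 'B:1', 'B:2' ]
```

The sketch keeps only the two halves that matter here: `subscribe` (the Observable side) and `next` (the Observer side), both operating on one shared observer list.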
Class Signature
class Subject<T> extends Observable<T> implements SubscriptionLike {
  closed: boolean;
  observers: Observer<T>[];
  next(value: T): void;
  error(err: any): void;
  complete(): void;
  unsubscribe(): void;
  asObservable(): Observable<T>;
}
Constructor
const subject = new Subject<T>();
No parameters required. Creates a new Subject that can emit values of type T.
Properties
closed : Returns true if this Subject has been closed and is no longer accepting new values.
observers : Array of current observers subscribed to this Subject.
observed : Returns true if the Subject has at least one Observer.
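A short sketch of how these flags change over a Subject's life may help. It assumes an RxJS 7 release recent enough to provide the `observed` getter; the names `flags$`, `snapshot`, and `snapshots` are made up for this example.

```typescript
import { Subject } from 'rxjs';

const flags$ = new Subject<number>();
const snapshots: Array<{ step: string; closed: boolean; observed: boolean }> = [];

// Record both flags at a named step (hypothetical helper for this example).
const snapshot = (step: string): void => {
  snapshots.push({ step, closed: flags$.closed, observed: flags$.observed });
};

snapshot('created');              // closed: false, observed: false

const sub = flags$.subscribe();
snapshot('subscribed');           // closed: false, observed: true

sub.unsubscribe();
snapshot('observer removed');     // closed: false, observed: false

flags$.unsubscribe();
snapshot('subject unsubscribed'); // closed: true,  observed: false

console.log(snapshots);
```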
Methods
next(value) : Emit a new value to all subscribers.
error(err) : Emit an error notification to all subscribers and close the Subject.
complete() : Emit a complete notification to all subscribers and close the Subject.
unsubscribe() : Close the Subject and remove all observers.
asObservable() : Create a new Observable that subscribes to this Subject. Useful for hiding Subject methods from consumers.
Usage Examples
Basic Subject
Multicasting values to multiple subscribers:
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// Subscribe first observer
subject.subscribe({
  next: (v) => console.log(`Observer A: ${v}`)
});

// Subscribe second observer
subject.subscribe({
  next: (v) => console.log(`Observer B: ${v}`)
});

// Emit values
subject.next(1);
subject.next(2);

// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
Event Bus Pattern
Implement a simple event bus:
import { Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface AppEvent {
  type: string;
  payload: any;
}

class EventBus {
  private eventBus = new Subject<AppEvent>();

  emit(event: AppEvent): void {
    this.eventBus.next(event);
  }

  on<T>(eventType: string) {
    return this.eventBus.pipe(
      filter(event => event.type === eventType),
      map(event => event.payload as T)
    );
  }
}

const bus = new EventBus();

// Subscribe to specific events
bus.on<{ username: string }>('user-login').subscribe(payload => {
  console.log(`User logged in: ${payload.username}`);
});

bus.on<{ id: number }>('data-updated').subscribe(payload => {
  console.log(`Data updated: ${payload.id}`);
});

// Emit events
bus.emit({ type: 'user-login', payload: { username: 'Alice' } });
bus.emit({ type: 'data-updated', payload: { id: 123 } });
Convert Unicast to Multicast
Share a single subscription among multiple observers:
import { Subject, interval } from 'rxjs';
import { take } from 'rxjs/operators';

const source$ = interval(1000).pipe(take(3));
const subject = new Subject<number>();

// Multiple observers on Subject
subject.subscribe(x => console.log(`A: ${x}`));
subject.subscribe(x => console.log(`B: ${x}`));

// Single subscription to source
source$.subscribe(subject);

// Output:
// A: 0
// B: 0
// A: 1
// B: 1
// A: 2
// B: 2
Hide Subject Methods
Expose only the Observable interface using asObservable():
import { Subject, Observable } from 'rxjs';

class DataService {
  private dataSubject = new Subject<string>();

  // Expose as Observable (read-only)
  public data$: Observable<string> = this.dataSubject.asObservable();

  // Internal method to update data
  updateData(newData: string): void {
    this.dataSubject.next(newData);
  }
}

const service = new DataService();

// Consumers can subscribe
service.data$.subscribe(data => console.log('Data:', data));

// But cannot emit
// service.data$.next('foo'); // Error: Property 'next' does not exist

// Only service can emit
service.updateData('Hello'); // Works
Manage form state with Subjects:
import { Subject, combineLatest } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

interface FormState {
  username: string;
  email: string;
  isValid: boolean;
}

class FormController {
  private usernameSubject = new Subject<string>();
  private emailSubject = new Subject<string>();

  username$ = this.usernameSubject.asObservable();
  email$ = this.emailSubject.asObservable();

  // Recomputed whenever either field changes; startWith('') seeds both inputs
  formState$ = combineLatest([
    this.username$.pipe(startWith('')),
    this.email$.pipe(startWith(''))
  ]).pipe(
    map(([username, email]) => ({
      username,
      email,
      isValid: username.length > 0 && email.includes('@')
    }))
  );

  setUsername(username: string): void {
    this.usernameSubject.next(username);
  }

  setEmail(email: string): void {
    this.emailSubject.next(email);
  }
}

// Placeholder for the application's own UI hook (not part of RxJS),
// stubbed here so the example runs on its own
const updateSubmitButton = (enabled: boolean): void => {
  console.log('Submit enabled:', enabled);
};

const form = new FormController();

form.formState$.subscribe((state: FormState) => {
  console.log('Form valid:', state.isValid);
  updateSubmitButton(state.isValid);
});

form.setUsername('alice');
form.setEmail('alice@example.com');
Key Characteristics
Multicast : One execution shared among all subscribers
Hot : Emissions happen regardless of subscribers
Stateless : New subscribers don’t receive past values
Multicast vs Unicast
import { Subject, Observable } from 'rxjs';

// Unicast (regular Observable)
const unicast$ = new Observable(subscriber => {
  const value = Math.random();
  subscriber.next(value);
});

unicast$.subscribe(x => console.log('A:', x)); // A: 0.123
unicast$.subscribe(x => console.log('B:', x)); // B: 0.456 (different!)

// Multicast (Subject)
const multicast = new Subject<number>();

multicast.subscribe(x => console.log('A:', x));
multicast.subscribe(x => console.log('B:', x));

multicast.next(Math.random());
// A: 0.789
// B: 0.789 (same value!)
Subject Lifecycle
Once a Subject has completed or errored, it cannot be reused. New subscriptions will immediately receive the terminal notification.
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: v => console.log('Observer 1:', v),
  complete: () => console.log('Observer 1 complete')
});

subject.next(1);
subject.complete();

// Subscribe after completion
subject.subscribe({
  next: v => console.log('Observer 2:', v), // Never called
  complete: () => console.log('Observer 2 complete') // Called immediately
});

// Output:
// Observer 1: 1
// Observer 1 complete
// Observer 2 complete
Late Subscribers Miss Values
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.next(1); // No subscribers yet - value is lost

subject.subscribe(x => console.log('Observer:', x));

subject.next(2); // Observer receives this

// Output:
// Observer: 2
// (1 was missed)
Common Patterns
Manual Cleanup
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const sub1 = subject.subscribe(x => console.log('A:', x));
const sub2 = subject.subscribe(x => console.log('B:', x));

subject.next(1);

// Unsubscribe individual observer
sub1.unsubscribe();
subject.next(2); // Only B receives this

// Unsubscribe all and close Subject
subject.unsubscribe();

// The Subject is now closed. In RxJS 6 and 7, calling next() on an
// unsubscribed Subject throws ObjectUnsubscribedError rather than being ignored.
try {
  subject.next(3);
} catch (err) {
  console.log('next() after unsubscribe() threw:', (err as Error).name);
}
Error Handling
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: v => console.log('Received:', v),
  error: err => console.error('Error:', err.message)
});

subject.next(1);
subject.error(new Error('Something went wrong'));
subject.next(2); // Never delivered - Subject has already errored

// Output:
// Received: 1
// Error: Something went wrong
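The lifecycle rule stated earlier (new subscriptions immediately receive the terminal notification) applies to errors as well as completion. The sketch below illustrates this; the names `failed$` and `log` are made up for the example.

```typescript
import { Subject } from 'rxjs';

const failed$ = new Subject<number>();
const log: string[] = [];

failed$.subscribe({
  next: (v) => log.push(`early next ${v}`),
  error: (err: Error) => log.push(`early error ${err.message}`),
});

failed$.next(1);
failed$.error(new Error('boom'));

// Subscribing after the error: the stored error is delivered immediately,
// and no earlier values are replayed.
failed$.subscribe({
  next: (v) => log.push(`late next ${v}`),
  error: (err: Error) => log.push(`late error ${err.message}`),
});

console.log(log);
// [ 'early next 1', 'early error boom', 'late error boom' ]
```

Giving every subscriber an error handler matters here: a late subscriber without one would have the stored error reported as unhandled.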
When to Use Subject
Event Buses : Application-wide event system
State Management : Manual state updates
Bridging : Connect callback APIs to RxJS
Multicasting : Share one source among many subscribers
Imperative Emissions : Need to call next() manually
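The bridging case can be sketched as follows. The callback API (`onTemperature`, `simulateReading`) is hypothetical and defined inline only so the example is self-contained; in practice it would be whatever callback-based library is being wrapped.

```typescript
import { Subject, Observable } from 'rxjs';

// Hypothetical callback-based API, defined here only for the example.
type Listener = (celsius: number) => void;
const listeners: Listener[] = [];

function onTemperature(listener: Listener): void {
  listeners.push(listener);
}

function simulateReading(celsius: number): void {
  listeners.forEach((listener) => listener(celsius));
}

// Bridge: the callback pushes into a private Subject,
// and consumers only see the read-only Observable.
const temperatureSubject = new Subject<number>();
const temperature$: Observable<number> = temperatureSubject.asObservable();

onTemperature((celsius) => temperatureSubject.next(celsius));

const readings: number[] = [];
temperature$.subscribe((celsius) => readings.push(celsius));

simulateReading(21.5);
simulateReading(22);

console.log(readings); // [ 21.5, 22 ]
```

Exposing `temperature$` through asObservable() keeps `next()` private to the bridge, so only the callback can emit.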
Subject Variants
Choose the right Subject variant for your use case:
Subject : Basic multicast, no replay
BehaviorSubject : Stores current value, emits to new subscribers
ReplaySubject : Buffers N values, replays to new subscribers
AsyncSubject : Emits only last value on completion
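The difference between the variants shows most clearly with a late subscriber. In this sketch each variant receives 1, 2, 3 before anyone subscribes; the variable names and the `seen` record are made up for the example.

```typescript
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

const seen = {
  subject: [] as number[],
  behavior: [] as number[],
  replay: [] as number[],
  async: [] as number[],
};

const plain = new Subject<number>();
const behavior = new BehaviorSubject<number>(0); // requires an initial value
const replay = new ReplaySubject<number>(2);     // buffers the last 2 values
const asyncSubject = new AsyncSubject<number>();

const all = [plain, behavior, replay, asyncSubject];

// Emit 1, 2, 3 before anyone subscribes.
all.forEach((s) => [1, 2, 3].forEach((v) => s.next(v)));

// Late subscribers.
plain.subscribe((v) => seen.subject.push(v));        // nothing replayed
behavior.subscribe((v) => seen.behavior.push(v));    // current value: 3
replay.subscribe((v) => seen.replay.push(v));        // last two values: 2, 3
asyncSubject.subscribe((v) => seen.async.push(v));   // nothing until complete

// Completing releases the AsyncSubject's last value.
all.forEach((s) => s.complete());

console.log(seen);
// { subject: [], behavior: [ 3 ], replay: [ 2, 3 ], async: [ 3 ] }
```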
See Also