An Observer is a consumer of values delivered by an Observable. It’s a collection of callbacks that know how to handle the different types of notifications sent by an Observable.
What is an Observer?
Observers are simply objects with three callbacks - one for each type of notification that an Observable can deliver:
next: Called when the Observable emits a value
error: Called when the Observable encounters an error
complete: Called when the Observable completes successfully
Think of an Observer as a listener with three different event handlers - one for data, one for errors, and one for completion.
Type Signature
interface Observer < T > {
next : ( value : T ) => void ;
error : ( err : any ) => void ;
complete : () => void ;
}
// Partial observers are also valid
type PartialObserver < T > =
| { next : ( value : T ) => void ; error ?: ( err : any ) => void ; complete ?: () => void ; }
| { next ?: ( value : T ) => void ; error : ( err : any ) => void ; complete ?: () => void ; }
| { next ?: ( value : T ) => void ; error ?: ( err : any ) => void ; complete : () => void ; }
Creating Observers
Full Observer Object
import { of } from 'rxjs' ;
const observer = {
next : ( x : number ) => console . log ( 'Observer got a next value: ' + x ),
error : ( err : any ) => console . error ( 'Observer got an error: ' + err ),
complete : () => console . log ( 'Observer got a complete notification' ),
};
of ( 1 , 2 , 3 ). subscribe ( observer );
// Output:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification
Partial Observers
Next Only
Next and Error
All Three
import { interval } from 'rxjs' ;
import { take } from 'rxjs/operators' ;
// Only handle next values
interval ( 1000 )
. pipe ( take ( 3 ))
. subscribe ({
next : x => console . log ( 'Value:' , x )
});
// Output:
// Value: 0
// Value: 1
// Value: 2
import { throwError , of } from 'rxjs' ;
import { catchError } from 'rxjs/operators' ;
// Handle values and errors, ignore completion
throwError (() => new Error ( 'Oops!' ))
. subscribe ({
next : x => console . log ( 'Value:' , x ),
error : err => console . error ( 'Error:' , err . message )
});
// Output:
// Error: Oops!
import { of } from 'rxjs' ;
of ( 1 , 2 , 3 ). subscribe ({
next : x => console . log ( 'Value:' , x ),
error : err => console . error ( 'Error:' , err ),
complete : () => console . log ( 'Complete!' )
});
// Output:
// Value: 1
// Value: 2
// Value: 3
// Complete!
If you don’t provide a callback for a notification type, the execution will still happen normally, but that notification will be ignored.
Function Shorthand
For simple cases where you only care about values, you can pass a function directly:
import { of } from 'rxjs' ;
// Instead of: subscribe({ next: x => console.log(x) })
of ( 1 , 2 , 3 ). subscribe ( x => console . log ( x ));
// Internally, RxJS creates an Observer object:
// { next: x => console.log(x) }
Observer Callbacks
next(value)
Called whenever the Observable emits a value.
import { interval } from 'rxjs' ;
import { take } from 'rxjs/operators' ;
interval ( 1000 )
. pipe ( take ( 3 ))
. subscribe ({
next : value => {
console . log ( 'Received:' , value );
// Process the value
}
});
The next callback can be called zero to infinite times, according to the Observable contract.
error(err)
Called when the Observable encounters an error. This terminates the Observable execution.
import { throwError } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
import { catchError } from 'rxjs/operators' ;
ajax ( '/api/data' )
. pipe (
catchError ( err => throwError (() => new Error ( 'Failed to load data' )))
)
. subscribe ({
next : data => console . log ( 'Data:' , data ),
error : err => {
console . error ( 'Error occurred:' , err . message );
// Handle error (show notification, retry, etc.)
}
});
After error is called, no more notifications will be delivered. The complete callback will NOT be called.
complete()
Called when the Observable completes successfully. This terminates the Observable execution.
import { of } from 'rxjs' ;
of ( 1 , 2 , 3 ). subscribe ({
next : x => console . log ( 'Value:' , x ),
complete : () => {
console . log ( 'Stream completed' );
// Perform cleanup or trigger next action
}
});
// Output:
// Value: 1
// Value: 2
// Value: 3
// Stream completed
After complete is called, no more notifications will be delivered. The error callback will NOT be called.
Observer Execution Rules
Observers follow the Observable Contract :
This means:
next can be called 0 to infinite times
Either error or complete can be called once (but not both)
After error or complete, no further notifications are delivered
Valid
Valid with Error
Invalid
Invalid
subscriber . next ( 1 );
subscriber . next ( 2 );
subscriber . next ( 3 );
subscriber . complete ();
// ✓ Valid: multiple next, then complete
Practical Examples
HTTP Request Observer
import { ajax } from 'rxjs/ajax' ;
ajax ( '/api/users' ). subscribe ({
next : response => {
console . log ( 'Users loaded:' , response . response );
// Update UI with user data
},
error : err => {
console . error ( 'Failed to load users:' , err );
// Show error message to user
},
complete : () => {
console . log ( 'Request completed' );
// Hide loading indicator
}
});
import { fromEvent } from 'rxjs' ;
import { map , debounceTime } from 'rxjs/operators' ;
const input = document . querySelector ( '#search-input' );
const search$ = fromEvent ( input , 'input' ). pipe (
map ( event => ( event . target as HTMLInputElement ). value ),
debounceTime ( 300 )
);
search$ . subscribe ({
next : searchTerm => {
console . log ( 'Search for:' , searchTerm );
// Perform search with debounced input
},
error : err => {
console . error ( 'Search error:' , err );
}
});
WebSocket Observer
import { webSocket } from 'rxjs/webSocket' ;
const socket$ = webSocket ( 'ws://localhost:8080' );
socket$ . subscribe ({
next : msg => {
console . log ( 'Message received:' , msg );
// Handle incoming message
},
error : err => {
console . error ( 'WebSocket error:' , err );
// Attempt reconnection
},
complete : () => {
console . log ( 'WebSocket connection closed' );
// Clean up resources
}
});
State Management Observer
import { BehaviorSubject } from 'rxjs' ;
interface AppState {
user : { name : string ; id : number } | null ;
isLoading : boolean ;
}
const state$ = new BehaviorSubject < AppState >({
user: null ,
isLoading: false
});
// Component subscribes to state
state$ . subscribe ({
next : state => {
console . log ( 'State updated:' , state );
// Re-render UI with new state
}
});
// Update state
state$ . next ({ user: { name: 'Alice' , id: 1 }, isLoading: false });
Best Practices
1. Always Handle Errors
Unhandled errors in Observables will crash your application. Always provide an error handler.
// ✗ Bad: No error handling
http . get ( '/api/data' ). subscribe ( data => console . log ( data ));
// ✓ Good: Error handling
http . get ( '/api/data' ). subscribe ({
next : data => console . log ( data ),
error : err => console . error ( 'Failed:' , err )
});
2. Keep Observers Simple
// ✗ Bad: Complex logic in observer
observable$ . subscribe ({
next : data => {
// 100 lines of transformation and business logic
}
});
// ✓ Good: Use operators for transformations
observable$ . pipe (
map ( transform ),
filter ( validate ),
tap ( logData )
). subscribe ({
next : data => updateUI ( data )
});
3. Avoid Side Effects in next
Use the tap operator for side effects instead of putting them in the observer.
// ✗ Discouraged: Side effects in subscribe
data$ . subscribe ({
next : data => {
console . log ( 'Data:' , data );
saveToLocalStorage ( data );
updateMetrics ();
}
});
// ✓ Better: Side effects in pipeline
data$ . pipe (
tap ( data => console . log ( 'Data:' , data )),
tap ( saveToLocalStorage ),
tap ( updateMetrics )
). subscribe ();
4. Use TypeScript Types
import { Observer } from 'rxjs' ;
interface User {
id : number ;
name : string ;
}
const userObserver : Observer < User > = {
next : ( user : User ) => console . log ( 'User:' , user . name ),
error : ( err : Error ) => console . error ( 'Error:' , err . message ),
complete : () => console . log ( 'Done' )
};
users$ . subscribe ( userObserver );
Common Patterns
Conditional Completion Handling
let completedNormally = false ;
observable$ . subscribe ({
next : data => processData ( data ),
error : err => handleError ( err ),
complete : () => {
completedNormally = true ;
finalizeOperation ();
}
});
Multiple Observers
import { share } from 'rxjs/operators' ;
const shared$ = expensive$ . pipe ( share ());
// Observer 1: Update UI
shared$ . subscribe ({
next : data => updateUI ( data )
});
// Observer 2: Log to analytics
shared$ . subscribe ({
next : data => trackAnalytics ( data )
});
// Observer 3: Cache data
shared$ . subscribe ({
next : data => cacheData ( data )
});
When to Use Different Observer Styles
Style Use When Full Object You need to handle multiple notification types Partial Object You only care about specific notifications Function Shorthand You only need to process values (next)
Observable - The data source that Observers consume
Subscription - Manages the connection between Observable and Observer
Operators - Transform data before it reaches the Observer
Subject - Acts as both Observable and Observer