Observables most commonly emit ordinary values like strings and numbers, but surprisingly often, it is necessary to handle Observables of Observables, so-called higher-order Observables.
What are Higher-Order Observables?
A higher-order Observable is an Observable that emits other Observables as its values. This pattern commonly occurs when working with asynchronous operations that themselves return Observables.
Just as a higher-order function is a function that returns a function, a higher-order Observable is an Observable that emits Observables.
Common Scenario
Imagine you have an Observable emitting URLs that you want to fetch:
import { of, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const urls$ = of(
  'https://api.example.com/user/1',
  'https://api.example.com/user/2',
  'https://api.example.com/user/3'
);

const fileObservable$ = urls$.pipe(
  map(url => ajax.getJSON(url))
);
// fileObservable$ is now Observable<Observable<any>>
// A higher-order Observable!
In this example, ajax.getJSON() returns an Observable for each URL. The result is an Observable that emits Observables - a higher-order Observable.
The Flattening Concept
The solution to working with higher-order Observables is flattening: converting a higher-order Observable into an ordinary Observable.
Flattening operators subscribe to the inner Observables and emit their values in the outer Observable stream. The key difference between flattening operators is how and when they subscribe to inner Observables.
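As a minimal sketch of flattening, the example below builds a higher-order Observable and flattens it with concatAll, which subscribes to each inner Observable in turn. The fetchUser helper is a hypothetical stand-in for a real request. It emits synchronously, so the collected values can be inspected right after subscribing.

```typescript
import { Observable, concatAll, map, of } from 'rxjs';

// Hypothetical stand-in for a request: one inner Observable per id.
const fetchUser = (id: number): Observable<string> => of(`user-${id}`);

// Higher-order: every emitted value is itself an Observable.
const higherOrder$: Observable<Observable<string>> = of(1, 2, 3).pipe(
  map(id => fetchUser(id))
);

// Flattened: concatAll subscribes to each inner Observable in order
// and re-emits its values on a single ordinary stream.
const flattened: string[] = [];
higherOrder$.pipe(concatAll()).subscribe(value => flattened.push(value));

console.log(flattened); // ["user-1", "user-2", "user-3"]
```

Operators such as concatMap combine the map step and the flattening step into one call.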
Flattening Operators
concatMap - Sequential Processing
concatMap subscribes to each inner Observable one at a time, waiting for each to complete before moving to the next.
import { of, concatMap, delay } from 'rxjs';

const source$ = of(1, 2, 3);

const result$ = source$.pipe(
  concatMap(value =>
    of(`Request ${value}`).pipe(delay(1000))
  )
);

result$.subscribe(console.log);
// After 1s: "Request 1"
// After 2s: "Request 2"
// After 3s: "Request 3"
When to use concatMap:
Order matters
You need sequential execution
Each request depends on the previous one completing
Example: Multi-step form submissions, sequential file uploads
mergeMap - Concurrent Processing
mergeMap (previously also exported as flatMap, an alias that is now deprecated) subscribes to all inner Observables concurrently and emits values as they arrive.
Basic Usage
import { of, mergeMap, delay } from 'rxjs';

const source$ = of(1, 2, 3);

const result$ = source$.pipe(
  mergeMap(value =>
    of(`Request ${value}`).pipe(delay(1000 * value))
  )
);

result$.subscribe(console.log);
// After 1s: "Request 1"
// After 2s: "Request 2"
// After 3s: "Request 3"
// All started simultaneously!
When to use mergeMap:
Order doesn’t matter
You want maximum concurrency (with optional limit)
Independent parallel operations
Example: Loading multiple resources, batch API calls, real-time data streams
switchMap - Cancellation Strategy
switchMap subscribes to the latest inner Observable and cancels the previous one when a new inner Observable arrives.
Search Autocomplete
import { fromEvent, map, switchMap, debounceTime, distinctUntilChanged } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Assumes an <input id="search"> exists and displayResults() is defined elsewhere
const searchBox = document.getElementById('search');
const search$ = fromEvent(searchBox, 'input');

search$.pipe(
  // Work with the query string: every Event object is distinct,
  // so distinctUntilChanged() could never filter raw events
  map(() => searchBox.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query =>
    ajax.getJSON(`https://api.example.com/search?q=${encodeURIComponent(query)}`)
  )
).subscribe(results => displayResults(results));
// Only the latest search request is kept
// Previous requests are automatically cancelled
When to use switchMap:
Only the latest result matters
You want to cancel previous operations
Prevents race conditions
Example: Search autocomplete, navigation, real-time updates, cancellable requests
exhaustMap - Ignore While Busy
exhaustMap subscribes to the first inner Observable and ignores new values until the current inner Observable completes.
Button Clicks
import { fromEvent, exhaustMap, delay } from 'rxjs';

// Assumes a #save-button element exists and saveData() returns an Observable
const button = document.getElementById('save-button');
const clicks$ = fromEvent(button, 'click');

clicks$.pipe(
  exhaustMap(() =>
    saveData().pipe(
      delay(2000) // Simulate slow network
    )
  )
).subscribe(() => console.log('Saved!'));
// Rapid clicks are ignored while saving
// Prevents duplicate submissions
When to use exhaustMap:
Prevent duplicate operations
Ignore rapid triggers during processing
Rate limiting user actions
Example: Form submissions, login buttons, save operations, refresh buttons
Comparison Matrix
Choose the right flattening operator based on your concurrency and cancellation needs.
| Operator | Concurrency | Order Preserved | Cancellation | Use Case |
| --- | --- | --- | --- | --- |
| concatMap | Sequential (1 at a time) | ✅ Yes | ❌ No | Sequential operations |
| mergeMap | Parallel (configurable) | ❌ No | ❌ No | Independent parallel tasks |
| switchMap | Latest only | ❌ No | ✅ Yes (previous) | Autocomplete, navigation |
| exhaustMap | First until complete | ✅ Yes | ❌ No (ignores new) | Prevent duplicate actions |
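The differences between the four operators can be observed by running one fixed scenario through each of them. The sketch below is illustrative only: the run helper and the two inner Subjects named a and b are hypothetical, and Subjects are used so that every emission happens at a known point without timers. Because Subjects are hot, an inner value emitted before an operator has subscribed to that inner Observable is simply missed.

```typescript
import {
  Observable, OperatorFunction, Subject,
  concatMap, exhaustMap, mergeMap, switchMap,
} from 'rxjs';

type Id = 'a' | 'b';
type Project = (id: Id) => Observable<string>;

// Runs the same scenario against the given flattening operator and
// returns everything the flattened stream emitted.
function run(flatten: (project: Project) => OperatorFunction<Id, string>): string[] {
  const outer = new Subject<Id>();
  const inners: Record<Id, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const seen: string[] = [];

  outer.pipe(flatten(id => inners[id])).subscribe(value => seen.push(value));

  outer.next('a');      // inner "a" starts
  outer.next('b');      // arrives while "a" is still active
  inners.b.next('b1');
  inners.a.next('a1');
  inners.a.complete();  // "a" finishes
  inners.b.next('b2');
  inners.b.complete();
  outer.complete();
  return seen;
}

const results = {
  concatMap: run(project => concatMap(project)),   // ["a1", "b2"]: "b" waits for "a", so b1 is missed
  mergeMap: run(project => mergeMap(project)),     // ["b1", "a1", "b2"]: both active, interleaved
  switchMap: run(project => switchMap(project)),   // ["b1", "b2"]: "a" is dropped when "b" arrives
  exhaustMap: run(project => exhaustMap(project)), // ["a1"]: "b" is ignored while "a" is active
};
console.log(results);
```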
Advanced Patterns
Nested API Calls
import { of, switchMap, mergeMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Get user, then get their posts
const userId$ = of(1);

userId$.pipe(
  // Switch to latest user
  switchMap(userId =>
    ajax.getJSON(`https://api.example.com/user/${userId}`)
  ),
  // Load all posts for that user concurrently
  mergeMap(user =>
    ajax.getJSON(`https://api.example.com/posts?userId=${user.id}`)
  )
).subscribe(posts => console.log(posts));
Error Handling in Higher-Order Observables
Errors in inner Observables will propagate to the outer Observable and terminate the stream unless handled.
import { of, mergeMap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const userIds$ = of(1, 2, 999, 4); // 999 might fail

userIds$.pipe(
  mergeMap(id =>
    ajax.getJSON(`https://api.example.com/user/${id}`).pipe(
      catchError(error => {
        console.error(`Failed to load user ${id}:`, error);
        return of(null); // Return fallback value
      })
    )
  )
).subscribe(user => {
  if (user) {
    console.log('User loaded:', user);
  }
});
// Stream continues even if some requests fail
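To make the effect of where catchError is placed concrete, the following sketch runs the same ids through mergeMap twice: once without inner error handling and once with it. The loadUser helper is hypothetical and stands in for a request that fails for id 999. It emits synchronously so that both result arrays are complete as soon as subscribe returns.

```typescript
import { Observable, catchError, mergeMap, of, throwError } from 'rxjs';

// Hypothetical loader: id 999 fails, every other id succeeds.
const loadUser = (id: number): Observable<string> =>
  id === 999
    ? throwError(() => new Error(`user ${id} not found`))
    : of(`user-${id}`);

const ids$ = of(1, 999, 4);

// No inner handling: the inner error reaches the outer stream and ends it,
// so id 4 is never processed.
const unhandled: string[] = [];
ids$.pipe(mergeMap(id => loadUser(id))).subscribe({
  next: value => unhandled.push(value),
  error: err => unhandled.push(`error: ${err.message}`),
});

// catchError on the inner Observable: only the failing request is replaced.
const handled: string[] = [];
ids$.pipe(
  mergeMap(id => loadUser(id).pipe(catchError(() => of(`fallback-${id}`))))
).subscribe(value => handled.push(value));

console.log(unhandled); // ["user-1", "error: user 999 not found"]
console.log(handled);   // ["user-1", "fallback-999", "user-4"]
```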
Combining Multiple Higher-Order Streams
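import { merge, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// getSearchQueryStream(), getFilterChangeStream() and displayResults() are assumed
// to be defined elsewhere; each stream is assumed to emit an object of string parameters
const searchQuery$ = getSearchQueryStream();
const filterChange$ = getFilterChangeStream();

// React to either search or filter changes
merge(searchQuery$, filterChange$).pipe(
  switchMap(params =>
    // The second argument of ajax.getJSON is request headers,
    // so the parameters are encoded into the query string instead
    ajax.getJSON(`https://api.example.com/search?${new URLSearchParams(params)}`)
  )
).subscribe(results => displayResults(results));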
Visual Guide
// Schematic marble diagrams, not runnable code
// Source Observable
const source$ = ---1---2---3---|
// Each number triggers an inner Observable that takes time
const innerObs = (x) => ----x----|
// concatMap: Wait for each to complete
result$ = ---1---------2---------3---------|
// mergeMap: Subscribe to all immediately
result$ = -------1---2---3---|
// switchMap: Cancel previous, keep latest
result$ = ---------------3---|
// exhaustMap: Ignore while busy
result$ = -------1---------------|
Best Practices
Choose the Right Operator
Understand the behavior differences and select based on your requirements:
Need order? Use concatMap
Need speed? Use mergeMap
Need latest? Use switchMap
Prevent duplicates? Use exhaustMap
Handle Errors Properly
Always handle errors in inner Observables to prevent stream termination:
source$.pipe(
  mergeMap(value =>
    innerObservable(value).pipe(
      catchError(err => of(defaultValue))
    )
  )
)
Limit Concurrency
When using mergeMap, consider limiting concurrent subscriptions:
source$.pipe(
  mergeMap(value => httpRequest(value), 3) // Max 3 concurrent
)
Consider Memory
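Be mindful of buffering in concatMap: source values that arrive while an inner Observable is still running are queued until it completes, so a fast source paired with slow inner Observables can grow that queue without bound.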
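The concurrency limit and the buffering it implies can be seen in a small sketch. The task helper, the gates array, and the counters below are hypothetical and exist only for this illustration. Each task starts when mergeMap subscribes to it and finishes when its gate is fired by hand, which keeps the run deterministic without timers.

```typescript
import { Observable, Subject, from, mergeMap } from 'rxjs';

let active = 0;                    // inner Observables currently running
let maxActive = 0;                 // highest value `active` ever reached
const gates: Subject<void>[] = []; // one manual "finish" trigger per started task

// Hypothetical task: starts on subscription, finishes when its gate fires.
const task = (id: number): Observable<number> =>
  new Observable<number>(subscriber => {
    active++;
    maxActive = Math.max(maxActive, active);
    const gate = new Subject<void>();
    gates.push(gate);
    return gate.subscribe(() => {
      active--;
      subscriber.next(id);
      subscriber.complete();
    });
  });

const finished: number[] = [];
from([1, 2, 3, 4, 5])
  .pipe(mergeMap(id => task(id), 2)) // at most 2 tasks run at once
  .subscribe(id => finished.push(id));

// Only two tasks have started; the other three source values are buffered.
const startedBeforeRelease = gates.length;

// Finish tasks one by one; each completion lets one buffered value start,
// which appends a new gate that this loop also reaches.
for (let i = 0; i < gates.length; i++) {
  gates[i].next();
}

console.log(startedBeforeRelease, maxActive, finished); // 2 2 [1, 2, 3, 4, 5]
```

concatMap behaves like mergeMap with a limit of 1, so every source value beyond the first waits in the same kind of buffer.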