Observables most commonly emit ordinary values like strings and numbers, but surprisingly often, it is necessary to handle Observables of Observables, so-called higher-order Observables.

What are Higher-Order Observables?

A higher-order Observable is an Observable that emits other Observables as its values. This pattern commonly occurs when working with asynchronous operations that themselves return Observables.
Just as a higher-order function is a function that returns a function, a higher-order Observable is an Observable that emits Observables.

Common Scenario

Imagine you have an Observable emitting URLs that you want to fetch:
import { of, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const urls$ = of(
  'https://api.example.com/user/1',
  'https://api.example.com/user/2',
  'https://api.example.com/user/3'
);

const fileObservable$ = urls$.pipe(
  map(url => ajax.getJSON(url))
);

// fileObservable$ is now Observable<Observable<unknown>>
// A higher-order Observable!
In this example, ajax.getJSON() returns an Observable for each URL. The result is an Observable that emits Observables - a higher-order Observable.

The Flattening Concept

The solution to working with higher-order Observables is flattening: converting a higher-order Observable into an ordinary Observable.
Flattening operators subscribe to the inner Observables and emit their values in the outer Observable stream. The key difference between flattening operators is how and when they subscribe to inner Observables.
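To make the idea concrete, here is a minimal sketch that builds a higher-order Observable from synchronous inner Observables and flattens it with mergeAll. The fakeFetch helper and its `user-N` strings are stand-ins invented for this illustration; they take the place of real HTTP requests so the output is deterministic.

```typescript
import { Observable, map, mergeAll, mergeMap, of } from 'rxjs';

// Hypothetical stand-in for an HTTP call: emits one string synchronously, then completes.
const fakeFetch = (id: number): Observable<string> => of(`user-${id}`);

// map alone yields a higher-order Observable: each emission is itself an Observable.
const higherOrder$: Observable<Observable<string>> = of(1, 2, 3).pipe(
  map(id => fakeFetch(id))
);

const emittedObservables: boolean[] = [];
higherOrder$.subscribe(inner => emittedObservables.push(inner instanceof Observable));

// mergeAll subscribes to every inner Observable and forwards its values.
const flattened: string[] = [];
higherOrder$.pipe(mergeAll()).subscribe(value => flattened.push(value));

// mergeMap(fn) combines the two steps: map(fn) followed by mergeAll().
const viaMergeMap: string[] = [];
of(1, 2, 3)
  .pipe(mergeMap(id => fakeFetch(id)))
  .subscribe(value => viaMergeMap.push(value));

console.log(emittedObservables); // [true, true, true]
console.log(flattened);          // ['user-1', 'user-2', 'user-3']
console.log(viaMergeMap);        // ['user-1', 'user-2', 'user-3']
```

Each of the four operators described next pairs map with a different flattening strategy in the same way.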

Flattening Operators

concatMap - Sequential Processing

concatMap subscribes to each inner Observable one at a time, waiting for each to complete before moving to the next.
import { of, concatMap, delay } from 'rxjs';

const source$ = of(1, 2, 3);

const result$ = source$.pipe(
  concatMap(value => 
    of(`Request ${value}`).pipe(delay(1000))
  )
);

result$.subscribe(console.log);
// After 1s: "Request 1"
// After 2s: "Request 2"
// After 3s: "Request 3"
When to use concatMap:
  • Order matters
  • You need sequential execution
  • Each request depends on the previous one completing
  • Example: Multi-step form submissions, sequential file uploads
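The sequencing can also be observed without timers by controlling when each inner Observable completes. In this sketch, the upload helper and the file names are hypothetical; each "upload" is a Subject that finishes only when the code completes it by hand.

```typescript
import { Subject, concatMap, defer, from } from 'rxjs';

const started: string[] = [];
const uploads = new Map<string, Subject<string>>();

// Hypothetical upload: records when it starts and stays pending until its Subject completes.
const upload = (file: string) =>
  defer(() => {
    started.push(file);
    const result$ = new Subject<string>();
    uploads.set(file, result$);
    return result$;
  });

const finished: string[] = [];
from(['a.txt', 'b.txt', 'c.txt'])
  .pipe(concatMap(file => upload(file)))
  .subscribe(result => finished.push(result));

// Only the first upload has started; 'b.txt' and 'c.txt' wait in concatMap's queue.
const startedBeforeCompletion = [...started];

const first = uploads.get('a.txt');
first?.next('a.txt done');
first?.complete();

console.log(startedBeforeCompletion); // ['a.txt']
console.log(started);                 // ['a.txt', 'b.txt']
console.log(finished);                // ['a.txt done']
```

Completing the first inner Observable is what triggers the second subscription, which is why a never-completing inner Observable stalls everything queued behind it.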

mergeMap - Concurrent Processing

mergeMap (formerly also exported as flatMap, an alias that is now deprecated) subscribes to every inner Observable as soon as it is created and emits their values as they arrive.
import { of, mergeMap, delay } from 'rxjs';

const source$ = of(1, 2, 3);

const result$ = source$.pipe(
  mergeMap(value => 
    of(`Request ${value}`).pipe(delay(1000 * value))
  )
);

result$.subscribe(console.log);
// After 1s: "Request 1"
// After 2s: "Request 2"
// After 3s: "Request 3"
// All started simultaneously!
When to use mergeMap:
  • Order doesn’t matter
  • You want maximum concurrency (with optional limit)
  • Independent parallel operations
  • Example: Loading multiple resources, batch API calls, real-time data streams

switchMap - Cancellation Strategy

switchMap subscribes to the latest inner Observable and cancels the previous one when a new inner Observable arrives.
import { fromEvent, map, switchMap, debounceTime, distinctUntilChanged } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const searchBox = document.getElementById('search');
const search$ = fromEvent(searchBox, 'input');

search$.pipe(
  // Extract the text first so distinctUntilChanged compares strings, not Event objects
  map(event => event.target.value),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => 
    ajax.getJSON(`https://api.example.com/search?q=${encodeURIComponent(query)}`)
  )
).subscribe(results => displayResults(results));

// Only the latest search request is kept
// Previous requests are automatically cancelled
When to use switchMap:
  • Only the latest result matters
  • You want to cancel previous operations
  • Prevents race conditions
  • Example: Search autocomplete, navigation, real-time updates, cancellable requests
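The cancellation behavior can be seen in isolation with a small sketch. Here fakeSearch is a hypothetical request whose response is delivered by hand through a Subject, which makes it possible to have a stale response arrive late on purpose.

```typescript
import { Observable, Subject, switchMap } from 'rxjs';

const responses = new Map<string, Subject<string>>();

// Hypothetical search request: its response is pushed manually through a Subject.
const fakeSearch = (query: string): Observable<string> => {
  const response$ = new Subject<string>();
  responses.set(query, response$);
  return response$;
};

const query$ = new Subject<string>();
const shown: string[] = [];
query$.pipe(switchMap(query => fakeSearch(query))).subscribe(result => shown.push(result));

query$.next('rx');   // the first request is now in flight
query$.next('rxjs'); // switchMap unsubscribes from the 'rx' request

// The stale response arrives late and is dropped because nothing is subscribed to it.
responses.get('rx')?.next('results for rx');
responses.get('rxjs')?.next('results for rxjs');

console.log(shown); // ['results for rxjs']
```

With mergeMap in place of switchMap, both responses would be shown, and the older one could overwrite the newer one if it arrived last.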

exhaustMap - Ignore While Busy

exhaustMap subscribes to the first inner Observable and ignores new values until the current inner Observable completes.
import { fromEvent, exhaustMap, delay } from 'rxjs';

const button = document.getElementById('save-button');
const clicks$ = fromEvent(button, 'click');

clicks$.pipe(
  exhaustMap(() => 
    // saveData() is assumed to return an Observable that completes when the save finishes
    saveData().pipe(
      delay(2000) // Simulate slow network
    )
  )
).subscribe(() => console.log('Saved!'));

// Rapid clicks are ignored while saving
// Prevents duplicate submissions
When to use exhaustMap:
  • Prevent duplicate operations
  • Ignore rapid triggers during processing
  • Rate limiting user actions
  • Example: Form submissions, login buttons, save operations, refresh buttons
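A timer-free sketch of the same behavior follows. The saveData function here is a hypothetical stand-in that stays pending until its Subject is completed by hand, and clicks are simulated with a Subject instead of a DOM event.

```typescript
import { Subject, defer, exhaustMap } from 'rxjs';

const saves: Subject<void>[] = [];

// Hypothetical save operation: stays pending until we complete its Subject.
const saveData = () =>
  defer(() => {
    const save$ = new Subject<void>();
    saves.push(save$);
    return save$;
  });

const clicks$ = new Subject<void>();
clicks$.pipe(exhaustMap(() => saveData())).subscribe();

clicks$.next(); // starts save #1
clicks$.next(); // ignored: save #1 is still running
clicks$.next(); // ignored for the same reason
const startedWhileBusy = saves.length;

saves[0]?.complete(); // save #1 finishes
clicks$.next();       // accepted: starts save #2
const startedAfterCompletion = saves.length;

console.log(startedWhileBusy, startedAfterCompletion); // 1 2
```

Note that the ignored clicks are discarded, not queued: nothing runs for them after the first save completes.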

Comparison Matrix

Choose the right flattening operator based on your concurrency and cancellation needs.
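| Operator   | Concurrency              | Order Preserved | Cancellation        | Use Case                   |
| ---------- | ------------------------ | --------------- | ------------------- | -------------------------- |
| concatMap  | Sequential (1 at a time) | ✅ Yes          | ❌ No               | Sequential operations      |
| mergeMap   | Parallel (configurable)  | ❌ No           | ❌ No               | Independent parallel tasks |
| switchMap  | Latest only              | ❌ No           | ✅ Yes (previous)   | Autocomplete, navigation   |
| exhaustMap | First until complete     | ✅ Yes          | ❌ No (ignores new) | Prevent duplicate actions  |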

Advanced Patterns

Nested API Calls

import { of, switchMap, mergeMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

// Get user, then get their posts
const userId$ = of(1);

userId$.pipe(
  // Switch to latest user
  switchMap(userId => 
    ajax.getJSON(`https://api.example.com/user/${userId}`)
  ),
  // Then load the posts for that user
  mergeMap(user => 
    ajax.getJSON(`https://api.example.com/posts?userId=${user.id}`)
  )
).subscribe(posts => console.log(posts));

Error Handling in Higher-Order Observables

Errors in inner Observables will propagate to the outer Observable and terminate the stream unless handled.
import { of, mergeMap, catchError } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const userIds$ = of(1, 2, 999, 4); // 999 might fail

userIds$.pipe(
  mergeMap(id => 
    ajax.getJSON(`https://api.example.com/user/${id}`).pipe(
      catchError(error => {
        console.error(`Failed to load user ${id}:`, error);
        return of(null); // Return fallback value
      })
    )
  )
).subscribe(user => {
  if (user) {
    console.log('User loaded:', user);
  }
});

// Stream continues even if some requests fail
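The difference between handling the error inside and outside the inner pipe is easy to miss, so here is a synchronous sketch comparing the two. The loadUser function is a hypothetical loader that fails for id 999 and stands in for the ajax call above.

```typescript
import { Observable, catchError, mergeMap, of, throwError } from 'rxjs';

interface User {
  id: number;
}

// Hypothetical loader: id 999 fails, every other id succeeds synchronously.
const loadUser = (id: number): Observable<User> =>
  id === 999 ? throwError(() => new Error(`user ${id} not found`)) : of({ id });

// Without an inner catchError, the failure ends the whole stream before id 4 is processed.
const unprotected: string[] = [];
of(1, 999, 4)
  .pipe(mergeMap(id => loadUser(id)))
  .subscribe({
    next: user => unprotected.push(`user ${user.id}`),
    error: (err: Error) => unprotected.push(`error: ${err.message}`),
  });

// With catchError inside the inner pipe, only the failing request is replaced.
const guarded: string[] = [];
of(1, 999, 4)
  .pipe(mergeMap(id => loadUser(id).pipe(catchError(() => of(null)))))
  .subscribe(user => guarded.push(user ? `user ${user.id}` : 'skipped'));

console.log(unprotected); // ['user 1', 'error: user 999 not found']
console.log(guarded);     // ['user 1', 'skipped', 'user 4']
```

Placing catchError after mergeMap on the outer pipe would also catch the error, but the outer subscription would still end at that point.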

Combining Multiple Higher-Order Streams

import { merge, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

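// getSearchQueryStream() and getFilterChangeStream() are assumed to return
// Observables of query-parameter objects such as { q: 'rxjs' }
const searchQuery$ = getSearchQueryStream();
const filterChange$ = getFilterChangeStream();

// React to either search or filter changes
merge(searchQuery$, filterChange$).pipe(
  switchMap(params => 
    // The second argument of ajax.getJSON is request headers, so encode params into the URL
    ajax.getJSON(`https://api.example.com/search?${new URLSearchParams(params)}`)
  )
).subscribe(results => displayResults(results));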

Visual Guide

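// Marble diagrams: each character is one time frame, | marks completion

// Source Observable
source$    = ---1---2---3---|

// Each source value x starts an inner Observable that emits x after a delay
inner(x)   = ----x----|

// concatMap: start each inner Observable only after the previous one completes
concatMap  = -------1--------2--------3----|

// mergeMap: subscribe to every inner Observable as soon as its value arrives
mergeMap   = -------1---2---3----|

// switchMap: unsubscribe from the previous inner Observable, keep only the latest
switchMap  = ---------------3----|

// exhaustMap: ignore source values while an inner Observable is active
exhaustMap = -------1-------|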
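Timelines like these can be checked with a marble test using TestScheduler from rxjs/testing, which runs on virtual time. The sketch below uses the same source and inner timings, with one character per virtual millisecond; the expected strings are what those timings produce frame by frame. The comparison callback is a simple JSON-based assumption for this example; a real test suite would normally delegate to its assertion library.

```typescript
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';

// The callback throws when recorded and expected notifications differ.
const scheduler = new TestScheduler((actual, expected) => {
  const a = JSON.stringify(actual);
  const e = JSON.stringify(expected);
  if (a !== e) {
    throw new Error(`Marble mismatch\n  actual:   ${a}\n  expected: ${e}`);
  }
});

let marbleChecksPassed = false;

scheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('---1---2---3---|');
  // Each inner Observable emits its value 4 frames after subscription, completes at frame 9.
  const inner = (x: string) => cold('----x----|', { x });

  expectObservable(source$.pipe(concatMap(inner))).toBe('-------1--------2--------3----|');
  expectObservable(source$.pipe(mergeMap(inner))).toBe('-------1---2---3----|');
  expectObservable(source$.pipe(switchMap(inner))).toBe('---------------3----|');
  expectObservable(source$.pipe(exhaustMap(inner))).toBe('-------1-------|');
});

// run() flushes the scheduler and would have thrown on any mismatch.
marbleChecksPassed = true;
console.log(marbleChecksPassed); // true
```

In the switchMap case, each new source value arrives in the same frame in which the previous inner Observable was about to emit, so only the last inner Observable ever delivers a value.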

Best Practices

1. Choose the Right Operator

Understand the behavior differences and select based on your requirements:
  • Need order? Use concatMap
  • Need speed? Use mergeMap
  • Need latest? Use switchMap
  • Prevent duplicates? Use exhaustMap

2. Handle Errors Properly

Always handle errors in inner Observables to prevent stream termination:
source$.pipe(
  mergeMap(value => 
    innerObservable(value).pipe(
      catchError(err => of(defaultValue))
    )
  )
)

3. Limit Concurrency

When using mergeMap, consider limiting concurrent subscriptions:
source$.pipe(
  mergeMap(value => httpRequest(value), 3) // Max 3 concurrent
)
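
The effect of the limit can be checked with a small sketch. The httpRequest function below is a hypothetical request that stays pending until its Subject is completed by hand, so the number of started requests can be inspected at each step.

```typescript
import { Subject, defer, from, mergeMap } from 'rxjs';

const inFlight = new Map<number, Subject<string>>();
const started: number[] = [];

// Hypothetical request that stays pending until we complete its Subject.
const httpRequest = (id: number) =>
  defer(() => {
    started.push(id);
    const response$ = new Subject<string>();
    inFlight.set(id, response$);
    return response$;
  });

from([1, 2, 3, 4, 5])
  .pipe(mergeMap(id => httpRequest(id), 3)) // at most 3 inner subscriptions at a time
  .subscribe();

// Values 4 and 5 are buffered until a slot frees up.
const startedInitially = [...started];

// Finishing one request frees a slot, and the next buffered value starts.
inFlight.get(2)?.complete();

console.log(startedInitially); // [1, 2, 3]
console.log(started);          // [1, 2, 3, 4]
```

A limit of 1 gives the same behavior as concatMap.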

4. Consider Memory

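Be mindful of buffering in concatMap: source values that arrive while an inner Observable is still running are queued without limit, so a fast source combined with slow inner Observables can consume a lot of memory. The same applies to mergeMap when a concurrency limit is set.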