Overview
Projects each source value to an Observable (the “inner” Observable) and merges all resulting Observables concurrently. Also known as flatMap, it’s one of the most powerful operators for handling asynchronous operations.
mergeMap is ideal when you need to execute multiple asynchronous operations in parallel and you care about all their results.
Type Signature
function mergeMap < T , O extends ObservableInput < any >>(
project : ( value : T , index : number ) => O ,
concurrent : number = Infinity
) : OperatorFunction < T , ObservedValueOf < O >>
Parameters
project
(value: T, index: number) => O
required
A function that maps each source value to an Observable (or Promise, Array, etc.). The function receives:
value: The emitted value from the source
index: The zero-based index of the emission
Must return an ObservableInput that will be merged into the output.
Maximum number of inner Observables being subscribed to concurrently. If omitted or Infinity, all inner Observables are subscribed to simultaneously. If set to 1, behaves like concatMap.
Returns
return
OperatorFunction<T, ObservedValueOf<O>>
A function that returns an Observable that emits values from all projected inner Observables merged together.
Usage Examples
Basic Example: Parallel API Calls
Simple Merge
HTTP Requests
import { of , mergeMap , interval , map } from 'rxjs' ;
const letters = of ( 'a' , 'b' , 'c' );
const result = letters . pipe (
mergeMap ( x => interval ( 1000 ). pipe (
map ( i => x + i ),
take ( 3 )
))
);
result . subscribe ( x => console . log ( x ));
// All intervals run concurrently:
// a0, b0, c0 (at ~1s)
// a1, b1, c1 (at ~2s)
// a2, b2, c2 (at ~3s)
Controlled Concurrency
import { from , mergeMap , delay , of } from 'rxjs' ;
const urls = [
'/api/data/1' ,
'/api/data/2' ,
'/api/data/3' ,
'/api/data/4' ,
'/api/data/5' ,
'/api/data/6'
];
// Only 2 concurrent requests at a time
from ( urls ). pipe (
mergeMap (
url => {
console . log ( 'Fetching:' , url );
return from (
fetch ( url ). then ( res => res . json ())
);
},
2 // Concurrency limit
)
). subscribe (
data => console . log ( 'Data received:' , data ),
err => console . error ( 'Error:' , err ),
() => console . log ( 'All requests complete' )
);
// At t=0: Start request 1 and 2
// When 1 completes: Start request 3
// When 2 completes: Start request 4
// etc.
With Promises
import { fromEvent , mergeMap , from } from 'rxjs' ;
interface SearchResult {
query : string ;
results : any [];
}
const searchInput = document . getElementById ( 'search' ) as HTMLInputElement ;
const searches = fromEvent ( searchInput , 'input' );
searches . pipe (
debounceTime ( 300 ),
map ( event => ( event . target as HTMLInputElement ). value ),
filter ( query => query . length > 2 ),
mergeMap ( query =>
from (
fetch ( `/api/search?q= ${ encodeURIComponent ( query ) } ` )
. then ( res => res . json ())
). pipe (
map ( results => ({ query , results }))
)
)
). subscribe (({ query , results }) => {
console . log ( `Results for " ${ query } ":` , results );
displayResults ( results );
});
Marble Diagram
Source: --1-------2-------3-------|
Project(1): a-b-c|
Project(2): d-e-f|
Project(3): g-h-i|
Result: --a-b-c---d-e-f---g-h-i---|
(all merged concurrently)
With Concurrency = 2
Source: --1--2--3--4--5--|
Limit: 2 concurrent at a time
Project(1): a-b|
Project(2): c-d|
Project(3): (waits)e-f|
Project(4): (waits)g-h|
Project(5): (waits)i-j|
Result: --a-bc-de-fg-hi-j--|
Common Use Cases
Parallel API Calls : Fetch multiple resources concurrently
Database Queries : Execute multiple queries in parallel
File Operations : Read/write multiple files simultaneously
Search Autocomplete : Handle multiple rapid search requests
Batch Processing : Process multiple items with async operations
Real-time Updates : Subscribe to multiple real-time data streams
Without a concurrency limit, mergeMap can create many concurrent subscriptions with fast sources, potentially overwhelming the system. Use the concurrent parameter to control this.
Advanced Example: Parallel File Processing
import { from , mergeMap , map , catchError , of , tap } from 'rxjs' ;
interface FileMetadata {
path : string ;
size : number ;
type : string ;
}
interface ProcessedFile {
path : string ;
success : boolean ;
result ?: any ;
error ?: string ;
duration : number ;
}
const files : FileMetadata [] = [
{ path: '/data/file1.json' , size: 1024 , type: 'json' },
{ path: '/data/file2.csv' , size: 2048 , type: 'csv' },
{ path: '/data/file3.xml' , size: 512 , type: 'xml' },
// ... many more files
];
function processFile ( file : FileMetadata ) : Promise < any > {
return fetch ( file . path )
. then ( res => res . text ())
. then ( content => {
// Process based on type
switch ( file . type ) {
case 'json' : return JSON . parse ( content );
case 'csv' : return parseCSV ( content );
case 'xml' : return parseXML ( content );
default : return content ;
}
});
}
from ( files ). pipe (
mergeMap (
file => {
const startTime = Date . now ();
console . log ( `Processing: ${ file . path } ` );
return from ( processFile ( file )). pipe (
map ( result => ({
path: file . path ,
success: true ,
result ,
duration: Date . now () - startTime
} as ProcessedFile )),
catchError ( error => {
console . error ( `Failed: ${ file . path } ` , error );
return of ({
path: file . path ,
success: false ,
error: error . message ,
duration: Date . now () - startTime
} as ProcessedFile );
})
);
},
5 // Process 5 files concurrently
),
tap ( result => {
if ( result . success ) {
console . log ( `✓ ${ result . path } ( ${ result . duration } ms)` );
} else {
console . log ( `✗ ${ result . path } : ${ result . error } ` );
}
}),
// Collect all results
toArray ()
). subscribe (
results => {
const successful = results . filter ( r => r . success ). length ;
const failed = results . filter ( r => ! r . success ). length ;
console . log ( `Complete: ${ successful } succeeded, ${ failed } failed` );
}
);
Dynamic Concurrency Based on System Load
import { from , mergeMap , defer } from 'rxjs' ;
let currentLoad = 0 ;
const maxLoad = 10 ;
function getConcurrency () : number {
const cpuUsage = getCurrentCPUUsage ();
if ( cpuUsage > 80 ) return 2 ;
if ( cpuUsage > 60 ) return 4 ;
return 8 ;
}
from ( tasks ). pipe (
mergeMap (
task => defer (() => {
currentLoad ++ ;
console . log ( `Current load: ${ currentLoad } ` );
return from ( processTask ( task )). pipe (
finalize (() => {
currentLoad -- ;
})
);
}),
getConcurrency ()
)
). subscribe ( result => {
console . log ( 'Task complete:' , result );
});
Comparison with Other Flattening Operators
mergeMap (Parallel)
concatMap (Sequential)
switchMap (Cancel Previous)
exhaustMap (Ignore New)
import { of , mergeMap , delay } from 'rxjs' ;
of ( 1 , 2 , 3 ). pipe (
mergeMap ( x => of ( x ). pipe ( delay ( 1000 )))
). subscribe ( x => console . log ( x ));
// All 3 execute in parallel
// After ~1s: 1, 2, 3 (order may vary)
Use mergeMap when you want all operations to execute and you need all results. Use concatMap when order matters. Use switchMap when only the latest matters. Use exhaustMap when you want to ignore rapid triggers.
concatMap - Sequential flattening (concurrency = 1)
switchMap - Cancel previous on new emission
exhaustMap - Ignore new while inner is active
mergeAll - Flatten higher-order Observable
merge - Merge multiple Observables
forkJoin - Wait for all to complete, emit last values