Overview
mergeAll flattens an Observable-of-Observables by subscribing to inner Observables as they arrive and merging their emissions into a single output stream. Unlike concatAll, it can subscribe to multiple inner Observables concurrently.
Use mergeAll when you want to flatten nested Observables and the relative order of emissions from different inner Observables doesn't matter. Pass a concurrency limit to cap the number of simultaneous inner subscriptions.
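The contrast with concatAll is easiest to see when inner emissions are pushed by hand. The following is a minimal sketch, not taken from the RxJS docs: the `run` helper, the Subjects `a` and `b`, and their values are made up for illustration, and Subjects are used only so the timing is deterministic.

```typescript
import { Observable, Subject, concatAll, mergeAll } from 'rxjs';

// Hypothetical helper: drives the same hand-pushed scenario through either
// operator and returns everything the flattened stream emitted.
function run(flatten: 'merge' | 'concat'): string[] {
  const a = new Subject<string>();
  const b = new Subject<string>();
  const outer = new Subject<Observable<string>>();
  const seen: string[] = [];

  const flattened = flatten === 'merge'
    ? outer.pipe(mergeAll())
    : outer.pipe(concatAll());
  flattened.subscribe(v => seen.push(v));

  outer.next(a);
  outer.next(b);   // mergeAll subscribes to b now; concatAll waits for a to complete
  a.next('a1');
  b.next('b1');    // missed by concatAll: b is not subscribed yet
  a.next('a2');
  a.complete();    // concatAll subscribes to b only at this point
  b.next('b2');
  return seen;
}

const merged = run('merge');        // ['a1', 'b1', 'a2', 'b2']
const concatenated = run('concat'); // ['a1', 'a2', 'b2']
```

Because the inner Observables here are hot, concatAll misses `b1` entirely. With cold inner Observables nothing would be lost, but the emissions of `b` would still start only after `a` completes.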
Type Signature
export function mergeAll<O extends ObservableInput<any>>(
  concurrent: number = Infinity
): OperatorFunction<O, ObservedValueOf<O>>
Parameters
concurrent (number, default Infinity): Maximum number of inner Observables subscribed to at the same time. When set to 1, it behaves like concatAll. When set to Infinity (the default), each inner Observable is subscribed to as soon as it arrives.
Returns
OperatorFunction<O, ObservedValueOf<O>> - An operator function that returns an Observable emitting values from all inner Observables concurrently, up to the specified concurrency limit.
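Because the type parameter is constrained to ObservableInput rather than Observable, the inner values may be anything RxJS can convert to an Observable, such as arrays or other Observables. A small sketch of how the output type is inferred, with made-up values and variable names:

```typescript
import { Observable, of, mergeAll } from 'rxjs';

// The outer Observable emits two different kinds of ObservableInput<number>:
// a plain array and an Observable.
const mixed: Observable<number[] | Observable<number>> = of([1, 2], of(3));

// ObservedValueOf<number[] | Observable<number>> resolves to number.
const flat: Observable<number> = mixed.pipe(mergeAll());

const values: number[] = [];
flat.subscribe(v => values.push(v));
// values is now [1, 2, 3]; both inner inputs emit synchronously
```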
Usage Examples
Basic Example: Concurrent Intervals
Unlimited Concurrency
import { fromEvent, map, interval, mergeAll } from 'rxjs';

// Each click is mapped to a new 1-second interval (an Observable of Observables)
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(1000))
);

// mergeAll() subscribes to every interval as soon as it is emitted
const firstOrder = higherOrder.pipe(mergeAll());
firstOrder.subscribe(x => console.log(x));

// Each click starts a new interval that runs concurrently
// Example output (exact interleaving depends on click timing): 0, 0, 1, 0, 1, 2, 1, 0, 2, 3, ...
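With a concurrency limit, inner Observables beyond the limit are held back until a slot frees up. The sketch below makes that visible without timers or DOM events. It is only an illustration: the `tracked` helper and the Subjects `a`, `b`, and `c` are hypothetical names introduced here to record when each subscription actually happens.

```typescript
import { Observable, Subject, from, mergeAll } from 'rxjs';

const log: string[] = [];

// Hypothetical helper: wraps a Subject so that the moment of subscription is logged.
function tracked(name: string, subject: Subject<string>): Observable<string> {
  return new Observable<string>(subscriber => {
    log.push(`subscribe ${name}`);
    return subject.subscribe(subscriber);
  });
}

const a = new Subject<string>();
const b = new Subject<string>();
const c = new Subject<string>();

from([tracked('a', a), tracked('b', b), tracked('c', c)])
  .pipe(mergeAll(2)) // at most two inner subscriptions at a time
  .subscribe({
    next: v => log.push(v),
    complete: () => log.push('done'),
  });

a.next('a1');
b.next('b1');
a.complete(); // frees a slot, so c is subscribed here
c.next('c1');
b.complete();
c.complete(); // source and all inner Observables are complete

// log: ['subscribe a', 'subscribe b', 'a1', 'b1', 'subscribe c', 'c1', 'done']
```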
Real-World Example: Parallel API Requests
import { from, map, mergeAll, catchError, of, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface User {
  id: string;
  name: string;
  email: string;
}

interface UserWithDetails extends User {
  posts: any[];
  followers: any[];
}

const userIds = ['user1', 'user2', 'user3', 'user4', 'user5'];

// Emits the user with empty detail arrays, or null if the request fails
function fetchUserWithDetails(userId: string): Observable<UserWithDetails | null> {
  return ajax.getJSON<User>(`/api/users/${userId}`).pipe(
    map((user): UserWithDetails => ({
      ...user,
      posts: [],
      followers: []
    })),
    // Recover per request so one failure does not terminate the merged stream
    catchError(err => {
      console.error(`Failed to fetch user ${userId}:`, err);
      return of(null);
    })
  );
}

// Fetch users with max 3 concurrent requests
from(userIds).pipe(
  map(id => fetchUserWithDetails(id)),
  mergeAll(3)
).subscribe({
  next: (user: UserWithDetails | null) => {
    if (user) {
      console.log('User loaded:', user.name);
    }
  },
  complete: () => console.log('All users loaded')
});
Concurrent File Downloads
import { from, map, mergeAll, Observable } from 'rxjs';

interface DownloadProgress {
  filename: string;
  progress: number;
  completed: boolean;
}

function downloadFile(url: string): Observable<DownloadProgress> {
  return new Observable<DownloadProgress>(subscriber => {
    const filename = url.split('/').pop() || 'file';
    const controller = new AbortController();
    console.log(`Starting download: ${filename}`);

    fetch(url, { signal: controller.signal })
      .then(async response => {
        const reader = response.body!.getReader();
        // Content-Length may be missing; progress is then reported as 0 until completion
        const contentLength = parseInt(response.headers.get('Content-Length') || '0', 10);
        let receivedLength = 0;

        // Read the body chunk by chunk, emitting progress after each chunk
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          receivedLength += value.length;
          const progress = contentLength > 0 ? (receivedLength / contentLength) * 100 : 0;
          subscriber.next({ filename, progress, completed: false });
        }

        subscriber.next({ filename, progress: 100, completed: true });
        subscriber.complete();
      })
      // Covers both fetch failures and read errors
      .catch(err => subscriber.error(err));

    // Abort the request if the subscriber unsubscribes early
    return () => controller.abort();
  });
}

const fileUrls = [
  '/files/document1.pdf',
  '/files/document2.pdf',
  '/files/image1.jpg',
  '/files/image2.jpg',
  '/files/video1.mp4'
];

// Download max 2 files concurrently
from(fileUrls).pipe(
  map(url => downloadFile(url)),
  mergeAll(2)
).subscribe({
  next: (progress: DownloadProgress) => {
    if (progress.completed) {
      console.log(`✓ Downloaded: ${progress.filename}`);
    } else {
      console.log(`${progress.filename}: ${progress.progress.toFixed(1)}%`);
    }
  },
  complete: () => console.log('All downloads complete')
});
Real-Time Search Across Multiple Sources
import { fromEvent, from, of, map, debounceTime, mergeAll, distinctUntilChanged, catchError, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface SearchResult {
  source: string;
  results: any[];
}

const searchInput = document.getElementById('search') as HTMLInputElement;

function searchAPI(query: string, apiName: string, endpoint: string): Observable<SearchResult> {
  // Encode the query so spaces and special characters survive in the URL
  return ajax.getJSON(`${endpoint}?q=${encodeURIComponent(query)}`).pipe(
    map(results => ({ source: apiName, results: results as any[] })),
    catchError(err => {
      console.error(`Search failed on ${apiName}:`, err);
      return of({ source: apiName, results: [] });
    })
  );
}

fromEvent(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged(),
  map(query => {
    if (!query.trim()) return of<SearchResult[]>([]);
    // Search multiple APIs concurrently
    return from([
      searchAPI(query, 'Products', '/api/products/search'),
      searchAPI(query, 'Articles', '/api/articles/search'),
      searchAPI(query, 'Users', '/api/users/search')
    ]).pipe(
      mergeAll() // Execute all searches concurrently
    );
  }),
  // Flatten the outer observable. mergeAll does not cancel earlier queries,
  // so results for an older query can still arrive after a newer one.
  mergeAll()
).subscribe((result: SearchResult | SearchResult[]) => {
  if (Array.isArray(result)) {
    console.log('Search cleared');
  } else {
    console.log(`Results from ${result.source}:`, result.results.length);
    // updateSearchResults is an application-specific helper defined elsewhere
    updateSearchResults(result.source, result.results);
  }
});
Practical Scenarios
Setting a concurrency limit is important for rate-limiting, connection pooling, or preventing resource exhaustion when dealing with many inner Observables.
Scenario 1: Image Processing Pipeline
import { from, map, mergeAll, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface ImageProcessingResult {
  filename: string;
  thumbnailUrl: string;
  processedUrl: string;
}

// Shape of the server response, as implied by the fields read below
interface ProcessResponse {
  thumbnail: string;
  url: string;
}

function processImage(file: File): Observable<ImageProcessingResult> {
  const formData = new FormData();
  formData.append('image', file);

  // Returning the ajax Observable directly lets unsubscription cancel the request
  return ajax.post<ProcessResponse>('/api/images/process', formData).pipe(
    map(({ response }) => ({
      filename: file.name,
      thumbnailUrl: response.thumbnail,
      processedUrl: response.url
    }))
  );
}

// getSelectedFiles and displayProcessedImage are application-specific helpers defined elsewhere
const imageFiles: File[] = getSelectedFiles();

// Process max 3 images concurrently
from(imageFiles).pipe(
  map(file => processImage(file)),
  mergeAll(3)
).subscribe({
  next: (result: ImageProcessingResult) => {
    console.log('Image processed:', result.filename);
    displayProcessedImage(result);
  },
  // A single failed upload errors the whole stream and stops the remaining work
  error: err => console.error('Processing failed:', err),
  complete: () => console.log('All images processed')
});
Scenario 2: WebSocket Connections Pool
import { from, map, mergeAll, retry, Observable } from 'rxjs';

interface WSMessage {
  channel: string;
  data: any;
}

function connectToChannel(channel: string): Observable<WSMessage> {
  return new Observable<WSMessage>(subscriber => {
    console.log(`Connecting to ${channel}...`);
    const ws = new WebSocket(`ws://localhost:8080/channels/${channel}`);

    ws.onopen = () => console.log(`Connected to ${channel}`);
    ws.onmessage = (event) => {
      subscriber.next({
        channel,
        data: JSON.parse(event.data)
      });
    };
    ws.onerror = (error) => subscriber.error(error);
    ws.onclose = () => subscriber.complete();

    // Teardown: close the socket on unsubscribe, error, or completion
    return () => {
      console.log(`Disconnecting from ${channel}`);
      ws.close();
    };
  }).pipe(
    // Reconnect up to 3 times, waiting 1 second between attempts
    retry({ count: 3, delay: 1000 })
  );
}

const channels = ['news', 'sports', 'weather', 'stocks', 'crypto'];

// Maintain max 3 concurrent WebSocket connections.
// The remaining channels connect only after one of the active connections closes.
from(channels).pipe(
  map(channel => connectToChannel(channel)),
  mergeAll(3)
).subscribe({
  next: (message: WSMessage) => {
    console.log(`Message from ${message.channel}:`, message.data);
    // updateChannelUI is an application-specific helper defined elsewhere
    updateChannelUI(message.channel, message.data);
  }
});
Behavior Details
Concurrency Control
mergeAll() or mergeAll(Infinity): Subscribe to each inner Observable as soon as it arrives
mergeAll(1): Equivalent to concatAll(), subscribe one at a time
mergeAll(n): Subscribe to at most n inner Observables concurrently; later arrivals are buffered until an active one completes
Completion Behavior
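The output Observable completes only when both the source Observable completes AND all inner Observables complete. If any inner Observable errors, the error is immediately propagated to the output, the remaining inner subscriptions are unsubscribed, and buffered inner Observables are never subscribed to.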
import { of, delay, mergeAll, throwError } from 'rxjs';

const source$ = of(
  of(1, 2).pipe(delay(100)),
  throwError(() => new Error('Failed')),
  of(3, 4).pipe(delay(200))
);

source$.pipe(
  mergeAll()
).subscribe({
  next: console.log,
  error: err => console.error('Error:', err.message)
});

// Output: Error: Failed
// The second inner Observable errors synchronously, before the first one's
// 100 ms delay elapses, so 1 and 2 are never emitted.
// The third inner Observable is never subscribed to.
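The completion rule can be checked the same way. This is a small sketch with hand-driven Subjects; the names `outer`, `inner`, and `events` are illustrative only. It shows that completing the source alone does not complete the output while an inner Observable is still active.

```typescript
import { Observable, Subject, mergeAll } from 'rxjs';

const outer = new Subject<Observable<number>>();
const inner = new Subject<number>();
const events: string[] = [];

outer.pipe(mergeAll()).subscribe({
  next: v => events.push(`next ${v}`),
  complete: () => events.push('complete'),
});

outer.next(inner);
outer.complete(); // the source is done, but inner is still active
inner.next(1);    // still delivered to the output

// No 'complete' yet: the output is waiting for inner
const completedEarly = events.indexOf('complete') !== -1; // false

inner.complete(); // now the output completes
// events: ['next 1', 'complete']
```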
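Memory Considerations
Without a concurrency limit, all inner Observables are subscribed and kept alive simultaneously
With a concurrency limit, inner Observables beyond the limit are held in an unbounded in-memory buffer, unsubscribed, until an active one completes
Consider memory usage when dealing with many long-lived inner Observables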
Related Operators
concatAll - Flattens sequentially (one inner Observable at a time)
switchAll - Flattens but unsubscribes from the previous inner Observable when a new one arrives
exhaustAll - Flattens but ignores new inner Observables while one is active
mergeMap - Maps and merges in one operator
mergeWith - Merges the given Observables with the source
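The relationship to mergeMap can be sketched as follows: mapping each value to an inner Observable and then applying mergeAll gives the same result as a single mergeMap with the same projection. The `project` function and the input values are made up for the example.

```typescript
import { from, of, map, mergeAll, mergeMap } from 'rxjs';

// Illustrative projection: each number becomes an inner Observable of two values
const project = (n: number) => of(n, n * 10);

const viaMergeAll: number[] = [];
from([1, 2, 3])
  .pipe(map(project), mergeAll())
  .subscribe(v => viaMergeAll.push(v));

const viaMergeMap: number[] = [];
from([1, 2, 3])
  .pipe(mergeMap(project))
  .subscribe(v => viaMergeMap.push(v));

// Both arrays are [1, 10, 2, 20, 3, 30]; the inner Observables are synchronous,
// so each one finishes before the next source value is processed.
```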