Overview
mergeWith creates an Observable that subscribes to the source Observable and all provided Observables simultaneously, emitting all values from all sources as they occur. It completes when all sources complete.
It is well suited to combining independent event streams, such as user interactions, where events from any source should be processed as they occur.
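As a minimal sketch of the behavior described above, the following uses two hand-driven Subjects (the names `a$`, `b$`, and `seen` are illustrative, not part of the API) to show values being interleaved in arrival order and the output completing only after both sources have completed.

```typescript
import { Subject, mergeWith } from 'rxjs';

// Two hand-driven sources so the emission order is fully controlled.
const a$ = new Subject<string>();
const b$ = new Subject<string>();

const seen: string[] = [];

a$.pipe(mergeWith(b$)).subscribe({
  next: value => seen.push(value),
  complete: () => seen.push('complete'),
});

a$.next('a1');
b$.next('b1');
a$.next('a2');
a$.complete(); // the merged stream stays open because b$ is still active
b$.next('b2');
b$.complete(); // every source is done, so the merged stream completes

console.log(seen);
// ['a1', 'b1', 'a2', 'b2', 'complete']
```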
Type Signature
export function mergeWith<T, A extends readonly unknown[]>(
  ...otherSources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>
Parameters
otherSources: ObservableInputTuple<A> (required)
One or more Observable sources to merge with the source Observable. All sources are subscribed to immediately and their values are emitted as they occur.
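Because the parameter type is built on `ObservableInput`, the extra sources do not have to be Observable instances. The sketch below assumes an array and a `Set` as inputs, both of which emit synchronously. Promises are accepted as well, but they emit asynchronously and are left out here to keep the output order simple.

```typescript
import { of, mergeWith } from 'rxjs';

const values: number[] = [];

of(1)
  .pipe(
    // An array and a Set (any iterable) are both valid ObservableInput values.
    mergeWith([2, 3], new Set([4]))
  )
  .subscribe(value => values.push(value));

console.log(values);
// [1, 2, 3, 4]
```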
Returns
OperatorFunction<T, T | A[number]> - An operator function that returns an Observable merging all values from the source and provided Observables.
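To illustrate the `T | A[number]` return type, here is a small sketch that merges sources of three different element types. The variable names are illustrative; the explicit annotation on `merged$` is only there to make the resulting union visible.

```typescript
import { Observable, of, mergeWith } from 'rxjs';

const numbers$ = of(1, 2);
const labels$ = of('a');
const flags$ = of(true);

// T is number and A is [string, boolean], so the element type of the
// result is number | string | boolean.
const merged$: Observable<number | string | boolean> = numbers$.pipe(
  mergeWith(labels$, flags$)
);

const output: Array<number | string | boolean> = [];
merged$.subscribe(value => output.push(value));

console.log(output);
// [1, 2, 'a', true]
```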
Usage Examples
Basic Usage
import { fromEvent, map, mergeWith } from 'rxjs';

const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
const mousemoves$ = fromEvent(document, 'mousemove').pipe(map(() => 'mousemove'));
const dblclicks$ = fromEvent(document, 'dblclick').pipe(map(() => 'dblclick'));

mousemoves$
  .pipe(mergeWith(clicks$, dblclicks$))
  .subscribe(x => console.log(x));
// Output (based on user interactions):
// 'mousemove'
// 'mousemove'
// 'click'
// 'mousemove'
// 'dblclick'
// 'mousemove'
Real-World Example: Multi-Source Notification System
import { Observable, fromEvent, interval, map, mergeMap, mergeWith, scan, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { webSocket } from 'rxjs/webSocket';

interface Notification {
  id: string;
  type: 'email' | 'push' | 'sms' | 'websocket';
  message: string;
  timestamp: number;
}

// Email notifications via polling
const emailNotifications$ = interval(30000).pipe(
  switchMap(() => ajax.getJSON<any[]>('/api/notifications/email')),
  mergeMap(emails => emails),
  map(email => ({
    id: email.id,
    type: 'email' as const,
    message: email.subject,
    timestamp: Date.now()
  }))
);

// Push notifications via service worker
const pushNotifications$ = new Observable<Notification>(subscriber => {
  const handler = (event: MessageEvent) => {
    subscriber.next({
      id: event.data.id,
      type: 'push',
      message: event.data.message,
      timestamp: Date.now()
    });
  };
  navigator.serviceWorker.addEventListener('message', handler);
  // Remove the listener on unsubscribe so it does not leak
  return () => navigator.serviceWorker.removeEventListener('message', handler);
});

// SMS notifications via WebSocket
const wsNotifications$ = webSocket<any>('ws://localhost:8080/sms').pipe(
  map(sms => ({
    id: sms.id,
    type: 'sms' as const,
    message: sms.text,
    timestamp: Date.now()
  }))
);

// In-app notifications via custom events
const appNotifications$ = fromEvent<CustomEvent>(window, 'app-notification').pipe(
  map(event => ({
    id: event.detail.id,
    type: 'websocket' as const,
    message: event.detail.message,
    timestamp: Date.now()
  }))
);

// Merge all notification sources
emailNotifications$.pipe(
  mergeWith(pushNotifications$, wsNotifications$, appNotifications$),
  scan((acc, notification) => [...acc, notification], [] as Notification[])
).subscribe(notifications => {
  console.log('Total notifications:', notifications.length);
  // updateNotificationBadge and displayLatestNotification are application-defined helpers
  updateNotificationBadge(notifications.length);
  displayLatestNotification(notifications[notifications.length - 1]);
});
Multi-Player Game Events
import { fromEvent, map, mergeWith, filter } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

interface GameEvent {
  player: string;
  action: 'move' | 'shoot' | 'jump' | 'chat';
  data: any;
  timestamp: number;
}

// Local player events
const keyboard$ = fromEvent<KeyboardEvent>(document, 'keydown');

const localMoves$ = keyboard$.pipe(
  filter(e => ['w', 'a', 's', 'd'].includes(e.key)),
  map(e => ({
    player: 'local',
    action: 'move' as const,
    data: { direction: e.key },
    timestamp: Date.now()
  }))
);

const localShoots$ = keyboard$.pipe(
  filter(e => e.key === ' '),
  map(() => ({
    player: 'local',
    action: 'shoot' as const,
    data: { weapon: 'primary' },
    timestamp: Date.now()
  }))
);

// Remote player events via WebSocket
const remoteEvents$ = webSocket<GameEvent>('ws://game-server.com/events');

// Chat messages
const chatMessages$ = fromEvent<CustomEvent>(window, 'chat-message').pipe(
  map(e => ({
    player: e.detail.player,
    action: 'chat' as const,
    data: { message: e.detail.message },
    timestamp: Date.now()
  }))
);

// Merge all game event sources
localMoves$.pipe(
  mergeWith(localShoots$, remoteEvents$, chatMessages$)
).subscribe((event: GameEvent) => {
  console.log(`[${event.player}] ${event.action}:`, event.data);
  processGameEvent(event);
});
Real-Time Analytics Dashboard
import { interval, map, mergeWith, scan, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface Metric {
  source: string;
  value: number;
  timestamp: number;
}

interface DashboardData {
  pageviews: number;
  activeUsers: number;
  revenue: number;
  errors: number;
}

// Different metrics from different endpoints
const pageviews$ = interval(5000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/pageviews')),
  map((value): Metric => ({ source: 'pageviews', value, timestamp: Date.now() }))
);

const activeUsers$ = interval(10000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/active-users')),
  map((value): Metric => ({ source: 'activeUsers', value, timestamp: Date.now() }))
);

const revenue$ = interval(15000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/revenue')),
  map((value): Metric => ({ source: 'revenue', value, timestamp: Date.now() }))
);

const errors$ = interval(3000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/errors')),
  map((value): Metric => ({ source: 'errors', value, timestamp: Date.now() }))
);

// Merge all metrics and accumulate
pageviews$.pipe(
  mergeWith(activeUsers$, revenue$, errors$),
  scan((dashboard, metric) => ({
    ...dashboard,
    [metric.source]: metric.value
  }), { pageviews: 0, activeUsers: 0, revenue: 0, errors: 0 } as DashboardData)
).subscribe((dashboard: DashboardData) => {
  updateDashboard(dashboard);
  console.log('Dashboard updated:', dashboard);
});
Practical Scenarios
Use mergeWith when you have multiple independent streams that should all be processed, and the order of emissions across streams doesn’t matter.
Scenario 1: Form Auto-Save
import { fromEvent, debounceTime, map, mergeWith } from 'rxjs';

const nameInput = document.getElementById('name') as HTMLInputElement;
const emailInput = document.getElementById('email') as HTMLInputElement;
const bioTextarea = document.getElementById('bio') as HTMLTextAreaElement;
const saveButton = document.getElementById('save') as HTMLButtonElement;

// Save on input changes (debounced)
const nameChanges$ = fromEvent(nameInput, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

const emailChanges$ = fromEvent(emailInput, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

const bioChanges$ = fromEvent(bioTextarea, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

// Save on button click
const manualSave$ = fromEvent(saveButton, 'click').pipe(
  map(() => 'manual')
);

// Save on window unload
const unloadSave$ = fromEvent(window, 'beforeunload').pipe(
  map(() => 'unload')
);

// Merge all save triggers. Every trigger saves, including consecutive
// triggers of the same kind (for example two 'auto' saves in a row).
nameChanges$.pipe(
  mergeWith(emailChanges$, bioChanges$, manualSave$, unloadSave$)
).subscribe(trigger => {
  console.log(`Saving form (trigger: ${trigger})...`);
  saveFormData({
    name: nameInput.value,
    email: emailInput.value,
    bio: bioTextarea.value
  });
});
Scenario 2: Multi-Device Sync
import { Subject, from, interval, mergeMap, mergeWith, switchMap, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { webSocket } from 'rxjs/webSocket';

interface SyncEvent {
  device: string;
  type: 'update' | 'delete';
  resource: string;
  data: any;
}

// Changes from local device
const localChanges$ = new Subject<SyncEvent>();

// Changes from other devices via WebSocket
const remoteChanges$ = webSocket<SyncEvent>('ws://sync.example.com');

// Changes from background sync
const backgroundSync$ = interval(60000).pipe(
  switchMap(() => ajax.getJSON<SyncEvent[]>('/api/sync/changes')),
  mergeMap(changes => from(changes))
);

localChanges$.pipe(
  tap(event => {
    // Send local changes to server
    ajax.post('/api/sync/push', event).subscribe();
  }),
  mergeWith(remoteChanges$, backgroundSync$)
).subscribe((event: SyncEvent) => {
  console.log(`Sync event from ${event.device}:`, event.type);
  applySyncEvent(event);
});
Behavior Details
Subscription Timing
All sources are subscribed to immediately when the output Observable is subscribed
Values are emitted as soon as any source emits
No buffering or waiting occurs
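The timing rules above can be observed with a small sketch. It assumes a hypothetical `tracked` helper, built on `defer`, that records the moment each source is subscribed to; the Subjects and the `log` array are likewise illustrative.

```typescript
import { Observable, Subject, defer, mergeWith } from 'rxjs';

const log: string[] = [];

// Hypothetical helper: records when the wrapped source is subscribed to.
function tracked<T>(name: string, source$: Observable<T>): Observable<T> {
  return defer(() => {
    log.push(`subscribed to ${name}`);
    return source$;
  });
}

const a$ = new Subject<string>();
const b$ = new Subject<string>();
const c$ = new Subject<string>();

const merged$ = tracked('a', a$).pipe(
  mergeWith(tracked('b', b$), tracked('c', c$))
);
log.push('pipeline built'); // nothing has been subscribed to yet

merged$.subscribe(value => log.push(`value ${value}`));
log.push('output subscribed'); // all three sources are now active

c$.next('c1'); // forwarded right away, no waiting on a$ or b$
a$.next('a1');

console.log(log);
// ['pipeline built', 'subscribed to a', 'subscribed to b', 'subscribed to c',
//  'output subscribed', 'value c1', 'value a1']
```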
Completion and Error Handling
The output Observable completes only when ALL source Observables complete. If any source errors, the output Observable errors immediately.
import { interval, mergeMap, mergeWith, take, throwError, timer } from 'rxjs';

const source1$ = interval(1000).pipe(take(3));
// delay() does not postpone error notifications, so a timer triggers the error instead
const source2$ = timer(2500).pipe(
  mergeMap(() => throwError(() => new Error('Failed')))
);
const source3$ = interval(500).pipe(take(10));

source1$.pipe(
  mergeWith(source2$, source3$)
).subscribe({
  next: console.log,
  error: err => console.error('Error:', err.message),
  complete: () => console.log('Complete')
});
// Emits values from source1 and source3, then errors after 2.5s
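If one failing source should not terminate the whole merged stream, one possible approach is to handle the error on that source before merging. The sketch below is an assumption about how an application might do this, not a feature of mergeWith itself: it applies `catchError` to a hypothetical `flaky$` source and replaces the error with `EMPTY`, so the remaining source keeps flowing.

```typescript
import { EMPTY, Subject, catchError, mergeWith } from 'rxjs';

const stable$ = new Subject<string>();
const flaky$ = new Subject<string>();

const events: string[] = [];

stable$
  .pipe(
    mergeWith(
      flaky$.pipe(
        // Swallow the error on this source only and end it quietly.
        catchError(err => {
          events.push(`recovered: ${(err as Error).message}`);
          return EMPTY;
        })
      )
    )
  )
  .subscribe({
    next: value => events.push(value),
    error: err => events.push(`error: ${(err as Error).message}`),
    complete: () => events.push('complete'),
  });

stable$.next('s1');
flaky$.next('f1');
flaky$.error(new Error('Failed')); // handled inside the flaky$ pipeline
stable$.next('s2');                // the merged stream is still alive
stable$.complete();

console.log(events);
// ['s1', 'f1', 'recovered: Failed', 's2', 'complete']
```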
Comparison with Other Combinators
| Operator | Subscription | Emission | Use Case |
| --- | --- | --- | --- |
| mergeWith | All immediately | All values as they occur | Independent streams |
| combineLatestWith | All immediately | Latest from all after each emit | Dependent calculations |
| concatWith | Sequential | All values in order | Ordered sequences |
| zipWith | All immediately | Paired by index | Synchronized streams |
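To make the differences between these combinators concrete, the following sketch feeds the same emission sequence through all four. The Subjects and result arrays are illustrative names. Because Subjects are hot, `concatWith` misses `b1`, which is emitted before `a$` completes and therefore before `b$` is subscribed to.

```typescript
import { Subject, combineLatestWith, concatWith, mergeWith, zipWith } from 'rxjs';

const a$ = new Subject<string>();
const b$ = new Subject<string>();

const merged: string[] = [];
const concatenated: string[] = [];
const combined: string[] = [];
const zipped: string[] = [];

a$.pipe(mergeWith(b$)).subscribe(v => merged.push(v));
a$.pipe(concatWith(b$)).subscribe(v => concatenated.push(v));
a$.pipe(combineLatestWith(b$)).subscribe(([a, b]) => combined.push(`${a}+${b}`));
a$.pipe(zipWith(b$)).subscribe(([a, b]) => zipped.push(`${a}+${b}`));

a$.next('a1');
b$.next('b1');
a$.next('a2');
a$.complete();
b$.next('b2');
b$.complete();

console.log(merged);       // ['a1', 'b1', 'a2', 'b2']
console.log(concatenated); // ['a1', 'a2', 'b2']
console.log(combined);     // ['a1+b1', 'a2+b1', 'a2+b2']
console.log(zipped);       // ['a1+b1', 'a2+b2']
```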
Related Operators
merge - Static creation operator for merging Observables
mergeAll - Flattens a higher-order Observable with concurrency
mergeMap - Maps and merges in one operator
concatWith - Combines sequentially instead of concurrently
combineLatestWith - Combines latest values instead of all values
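As a brief illustration of how the pipeable operator relates to the static `merge` function listed above, this sketch runs both forms over the same two synchronous sources (the variable names are illustrative) and collects the results.

```typescript
import { merge, mergeWith, of } from 'rxjs';

const first$ = of(1, 2);
const second$ = of(3, 4);

const viaOperator: number[] = [];
const viaStatic: number[] = [];

// Pipeable form: the source is merged with the extra inputs.
first$.pipe(mergeWith(second$)).subscribe(v => viaOperator.push(v));

// Static form: all inputs are passed as arguments.
merge(first$, second$).subscribe(v => viaStatic.push(v));

console.log(viaOperator); // [1, 2, 3, 4]
console.log(viaStatic);   // [1, 2, 3, 4]
```

The pipeable form tends to read better when one stream is the natural "main" source in an existing pipe chain, while the static form treats all inputs symmetrically.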