Overview

mergeWith returns an operator whose output Observable subscribes to the source Observable and every provided Observable at the same time, emitting each value as soon as any of them produces it. The output completes only once all sources have completed.
It is well suited to combining independent event streams, such as user interactions, where events from any source should be processed as they arrive.

Type Signature

export function mergeWith<T, A extends readonly unknown[]>(
  ...otherSources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>

Parameters

otherSources
ObservableInputTuple<A>
required
One or more sources to merge with the source Observable. Each may be any ObservableInput (an Observable, Promise, array, or other iterable). All sources are subscribed to immediately and their values are emitted as they occur.

Returns

OperatorFunction<T, T | A[number]> - An operator function that returns an Observable merging all values from the source and provided Observables.
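
As a minimal sketch of the signature above (the values are illustrative only): because otherSources accepts any ObservableInput, a plain array can be passed directly, and the element type of the output is the union of all input types. The sources here are synchronous, so each one runs to completion as soon as it is subscribed to, which is why the output appears in argument order.

```typescript
import { Observable, of, mergeWith, toArray } from 'rxjs';

// The source emits numbers; the other inputs emit strings and booleans.
const merged$: Observable<number | string | boolean> = of(1, 2).pipe(
  mergeWith(of('a', 'b'), [true]) // a plain array is a valid ObservableInput
);

// Collect everything into one array once all sources have completed.
let collected: Array<number | string | boolean> = [];
merged$.pipe(toArray()).subscribe(values => (collected = values));

console.log(collected); // [1, 2, 'a', 'b', true]
```

With asynchronous sources the values would interleave by arrival time instead.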

Usage Examples

Basic Example: Merging User Input Events

import { fromEvent, map, mergeWith } from 'rxjs';

const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
const mousemoves$ = fromEvent(document, 'mousemove').pipe(map(() => 'mousemove'));
const dblclicks$ = fromEvent(document, 'dblclick').pipe(map(() => 'dblclick'));

mousemoves$
  .pipe(mergeWith(clicks$, dblclicks$))
  .subscribe(x => console.log(x));

// Output (based on user interactions):
// 'mousemove'
// 'mousemove'
// 'click'
// 'mousemove'
// 'dblclick'
// 'mousemove'

Real-World Example: Multi-Source Notification System

import { Observable, fromEvent, interval, map, mergeMap, mergeWith, scan, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { webSocket } from 'rxjs/webSocket';

interface Notification {
  id: string;
  type: 'email' | 'push' | 'sms' | 'websocket';
  message: string;
  timestamp: number;
}

// Email notifications via polling
const emailNotifications$ = interval(30000).pipe(
  switchMap(() => ajax.getJSON<any[]>('/api/notifications/email')),
  mergeMap(emails => emails),
  map(email => ({
    id: email.id,
    type: 'email' as const,
    message: email.subject,
    timestamp: Date.now()
  }))
);

// Push notifications via service worker
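const pushNotifications$ = new Observable<Notification>(subscriber => {
  const onMessage = (event: MessageEvent) => {
    subscriber.next({
      id: event.data.id,
      type: 'push',
      message: event.data.message,
      timestamp: Date.now()
    });
  };
  navigator.serviceWorker.addEventListener('message', onMessage);
  // Remove the listener on unsubscribe so it does not leak
  return () => navigator.serviceWorker.removeEventListener('message', onMessage);
});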

// SMS notifications via WebSocket
const wsNotifications$ = webSocket<any>('ws://localhost:8080/sms').pipe(
  map(sms => ({
    id: sms.id,
    type: 'sms' as const,
    message: sms.text,
    timestamp: Date.now()
  }))
);

// In-app notifications via custom events
const appNotifications$ = fromEvent<CustomEvent>(window, 'app-notification').pipe(
  map(event => ({
    id: event.detail.id,
    type: 'websocket' as const,
    message: event.detail.message,
    timestamp: Date.now()
  }))
);

// Merge all notification sources
emailNotifications$.pipe(
  mergeWith(pushNotifications$, wsNotifications$, appNotifications$),
  scan((acc, notification) => [...acc, notification], [] as Notification[])
).subscribe(notifications => {
  console.log('Total notifications:', notifications.length);
  updateNotificationBadge(notifications.length);
  displayLatestNotification(notifications[notifications.length - 1]);
});

Multi-Player Game Events

import { fromEvent, map, mergeWith, filter } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

interface GameEvent {
  player: string;
  action: 'move' | 'shoot' | 'jump' | 'chat';
  data: any;
  timestamp: number;
}

// Local player events
const keyboard$ = fromEvent<KeyboardEvent>(document, 'keydown');

const localMoves$ = keyboard$.pipe(
  filter(e => ['w', 'a', 's', 'd'].includes(e.key)),
  map(e => ({
    player: 'local',
    action: 'move' as const,
    data: { direction: e.key },
    timestamp: Date.now()
  }))
);

const localShoots$ = keyboard$.pipe(
  filter(e => e.key === ' '),
  map(() => ({
    player: 'local',
    action: 'shoot' as const,
    data: { weapon: 'primary' },
    timestamp: Date.now()
  }))
);

// Remote player events via WebSocket
const remoteEvents$ = webSocket<GameEvent>('ws://game-server.com/events');

// Chat messages
const chatMessages$ = fromEvent<CustomEvent>(window, 'chat-message').pipe(
  map(e => ({
    player: e.detail.player,
    action: 'chat' as const,
    data: { message: e.detail.message },
    timestamp: Date.now()
  }))
);

// Merge all game event sources
localMoves$.pipe(
  mergeWith(localShoots$, remoteEvents$, chatMessages$)
).subscribe((event: GameEvent) => {
  console.log(`[${event.player}] ${event.action}:`, event.data);
  processGameEvent(event);
});

Real-Time Analytics Dashboard

import { interval, map, mergeWith, scan, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface Metric {
  source: string;
  value: number;
  timestamp: number;
}

interface DashboardData {
  pageviews: number;
  activeUsers: number;
  revenue: number;
  errors: number;
}

// Different metrics from different endpoints
const pageviews$ = interval(5000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/pageviews')),
  map(value => ({ source: 'pageviews', value, timestamp: Date.now() }))
);

const activeUsers$ = interval(10000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/active-users')),
  map(value => ({ source: 'activeUsers', value, timestamp: Date.now() }))
);

const revenue$ = interval(15000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/revenue')),
  map(value => ({ source: 'revenue', value, timestamp: Date.now() }))
);

const errors$ = interval(3000).pipe(
  switchMap(() => ajax.getJSON<number>('/api/metrics/errors')),
  map(value => ({ source: 'errors', value, timestamp: Date.now() }))
);

// Merge all metrics and accumulate
pageviews$.pipe(
  mergeWith(activeUsers$, revenue$, errors$),
  scan((dashboard, metric) => ({
    ...dashboard,
    [metric.source]: metric.value
  }), { pageviews: 0, activeUsers: 0, revenue: 0, errors: 0 } as DashboardData)
).subscribe((dashboard: DashboardData) => {
  updateDashboard(dashboard);
  console.log('Dashboard updated:', dashboard);
});

Practical Scenarios

Use mergeWith when you have multiple independent streams that should all be processed, and the order of emissions across streams doesn’t matter.

Scenario 1: Form Autosave from Multiple Triggers

import { fromEvent, debounceTime, map, mergeWith } from 'rxjs';

const nameInput = document.getElementById('name') as HTMLInputElement;
const emailInput = document.getElementById('email') as HTMLInputElement;
const bioTextarea = document.getElementById('bio') as HTMLTextAreaElement;
const saveButton = document.getElementById('save') as HTMLButtonElement;

// Save on input changes (debounced)
const nameChanges$ = fromEvent(nameInput, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

const emailChanges$ = fromEvent(emailInput, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

const bioChanges$ = fromEvent(bioTextarea, 'input').pipe(
  debounceTime(1000),
  map(() => 'auto')
);

// Save on button click
const manualSave$ = fromEvent(saveButton, 'click').pipe(
  map(() => 'manual')
);

// Save on window unload
const unloadSave$ = fromEvent(window, 'beforeunload').pipe(
  map(() => 'unload')
);

// Merge all save triggers. Do not add distinctUntilChanged() here: it would
// drop every consecutive 'auto' trigger after the first and skip later autosaves.
nameChanges$.pipe(
  mergeWith(emailChanges$, bioChanges$, manualSave$, unloadSave$)
).subscribe(trigger => {
  console.log(`Saving form (trigger: ${trigger})...`);
  saveFormData({
    name: nameInput.value,
    email: emailInput.value,
    bio: bioTextarea.value
  });
});

Scenario 2: Multi-Device Sync

import { Subject, from, interval, mergeMap, mergeWith, switchMap, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { webSocket } from 'rxjs/webSocket';

interface SyncEvent {
  device: string;
  type: 'update' | 'delete';
  resource: string;
  data: any;
}

// Changes from local device
const localChanges$ = new Subject<SyncEvent>();

// Changes from other devices via WebSocket
const remoteChanges$ = webSocket<SyncEvent>('ws://sync.example.com');

// Changes from background sync
const backgroundSync$ = interval(60000).pipe(
  switchMap(() => ajax.getJSON<SyncEvent[]>('/api/sync/changes')),
  mergeMap(changes => from(changes))
);

localChanges$.pipe(
  tap(event => {
    // Send local changes to server
    ajax.post('/api/sync/push', event).subscribe();
  }),
  mergeWith(remoteChanges$, backgroundSync$)
).subscribe((event: SyncEvent) => {
  console.log(`Sync event from ${event.device}:`, event.type);
  applySyncEvent(event);
});

Behavior Details

Subscription Timing

  • All sources are subscribed to as soon as the output Observable is subscribed to
  • Values are emitted as soon as any source emits
  • No buffering or waiting occurs
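
The points above can be observed with a small sketch. The `tracked` helper and the Subject names are hypothetical, introduced only for illustration; `defer` is used so that each subscription is recorded at the moment it happens.

```typescript
import { Subject, defer, mergeWith } from 'rxjs';

const log: string[] = [];
const a = new Subject<string>();
const b = new Subject<string>();

// Wrap a Subject so that each subscription to it is recorded in the log.
const tracked = (name: string, subject: Subject<string>) =>
  defer(() => {
    log.push(`subscribed to ${name}`);
    return subject;
  });

const merged$ = tracked('a', a).pipe(mergeWith(tracked('b', b)));
log.push('pipeline built'); // nothing has been subscribed to yet

merged$.subscribe(value => log.push(`value ${value}`));

// Values pass straight through in arrival order, regardless of source.
b.next('b1');
a.next('a1');
b.next('b2');

console.log(log);
// ['pipeline built', 'subscribed to a', 'subscribed to b',
//  'value b1', 'value a1', 'value b2']
```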

Completion and Error Handling

The output Observable completes only when ALL source Observables complete. If any source errors, the output Observable errors immediately and the remaining sources are unsubscribed.

import { interval, take, mergeWith, switchMap, throwError, timer } from 'rxjs';

const source1$ = interval(1000).pipe(take(3));
// delay() does not postpone error notifications, so use timer() to fail after 2.5s
const source2$ = timer(2500).pipe(switchMap(() => throwError(() => new Error('Failed'))));
const source3$ = interval(500).pipe(take(10));

source1$.pipe(
  mergeWith(source2$, source3$)
).subscribe({
  next: console.log,
  error: err => console.error('Error:', err.message),
  complete: () => console.log('Complete')
});
// Emits values from source1 and source3, then errors after 2.5s
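
When one failing source should not terminate the whole merged stream, one common approach is to handle the error on that source before merging. The following is a sketch under that assumption, using hypothetical Subjects named `stable` and `flaky`; it also shows that completion is only signalled after the last live source completes.

```typescript
import { EMPTY, Subject, catchError, mergeWith } from 'rxjs';

const events: string[] = [];
const stable = new Subject<string>();
const flaky = new Subject<string>();

// Recover from the failure on this source alone by switching to EMPTY,
// which completes immediately without emitting.
const guardedFlaky$ = flaky.pipe(
  catchError(err => {
    events.push(`recovered: ${err.message}`);
    return EMPTY;
  })
);

stable.pipe(mergeWith(guardedFlaky$)).subscribe({
  next: value => events.push(value),
  error: err => events.push(`error: ${err.message}`),
  complete: () => events.push('complete'),
});

stable.next('s1');
flaky.next('f1');
flaky.error(new Error('boom')); // handled by catchError, merged stream stays alive
stable.next('s2');
stable.complete(); // every source is now complete

console.log(events);
// ['s1', 'f1', 'recovered: boom', 's2', 'complete']
```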

Comparison with Other Combinators

Operator          | Subscription    | Emission                        | Use Case
mergeWith         | All immediately | All values as they occur        | Independent streams
combineLatestWith | All immediately | Latest from all after each emit | Dependent calculations
concatWith        | Sequential      | All values in order             | Ordered sequences
zipWith           | All immediately | Paired by index                 | Synchronized streams
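
To make the comparison concrete, the sketch below feeds the same sequence of emissions through each of the four operators. The `record` helper is hypothetical and exists only to replay identical input against a fresh pair of Subjects each time.

```typescript
import {
  Observable, Subject,
  mergeWith, concatWith, combineLatestWith, zipWith,
} from 'rxjs';

// Run one operator against a fresh pair of Subjects and record its output.
function record<T>(
  build: (a: Observable<number>, b: Observable<number>) => Observable<T>
): T[] {
  const a = new Subject<number>();
  const b = new Subject<number>();
  const out: T[] = [];
  build(a, b).subscribe(value => out.push(value));

  a.next(1); b.next(10);
  a.next(2); b.next(20);
  a.complete();
  b.next(30);
  b.complete();
  return out;
}

const merged = record((a, b) => a.pipe(mergeWith(b)));
const concatenated = record((a, b) => a.pipe(concatWith(b)));
const combined = record((a, b) => a.pipe(combineLatestWith(b)));
const zipped = record((a, b) => a.pipe(zipWith(b)));

console.log(merged);       // [1, 10, 2, 20, 30]
console.log(concatenated); // [1, 2, 30]
console.log(combined);     // [[1, 10], [2, 10], [2, 20], [2, 30]]
console.log(zipped);       // [[1, 10], [2, 20]]
```

concatWith misses 10 and 20 because a Subject is hot and `b` is not subscribed to until `a` completes. zipWith completes as soon as `a` completes with nothing left to pair, so 30 is never emitted.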

Related Operators

  • merge - Static creation operator for merging Observables
  • mergeAll - Flattens a higher-order Observable, with an optional concurrency limit
  • mergeMap - Maps and merges in one operator
  • concatWith - Combines sequentially instead of concurrently
  • combineLatestWith - Combines latest values instead of all values
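
As a brief illustration of how the static merge relates to mergeWith, the sketch below (with made-up values) merges the same two sources both ways. The pipeable form starts from one source and merges others into it; the static form takes all sources as arguments.

```typescript
import { merge, mergeWith, of, toArray } from 'rxjs';

const first$ = of(1, 2);
const second$ = of(3);

let viaOperator: number[] = [];
let viaStatic: number[] = [];

// Pipeable form: start from one source and merge others into it.
first$.pipe(mergeWith(second$), toArray()).subscribe(v => (viaOperator = v));

// Static form: no source is singled out.
merge(first$, second$).pipe(toArray()).subscribe(v => (viaStatic = v));

console.log(viaOperator); // [1, 2, 3]
console.log(viaStatic);   // [1, 2, 3]
```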