Skip to main content

Overview

mergeAll flattens an Observable-of-Observables by subscribing to inner Observables as they arrive and merging their emissions into a single output stream. Unlike concatAll, it can subscribe to multiple inner Observables concurrently.
Use mergeAll when you want to flatten nested Observables and order doesn’t matter. Control concurrency to limit simultaneous subscriptions.

Type Signature

export function mergeAll<O extends ObservableInput<any>>(
  concurrent: number = Infinity
): OperatorFunction<O, ObservedValueOf<O>>

Parameters

concurrent
number
default:"Infinity"
Maximum number of inner Observables being subscribed to concurrently. When set to 1, it behaves like concatAll. When set to Infinity (default), all inner Observables are subscribed to immediately.

Returns

OperatorFunction<O, ObservedValueOf<O>> - An operator function that returns an Observable emitting values from all inner Observables concurrently, up to the specified concurrency limit.

Usage Examples

Basic Example: Concurrent Intervals

import { fromEvent, map, interval, mergeAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(1000))
);
const firstOrder = higherOrder.pipe(mergeAll());

firstOrder.subscribe(x => console.log(x));

// Each click starts a new interval that runs concurrently
// Output: 0, 0, 1, 0, 1, 2, 1, 0, 2, 3, ... (interleaved)

Real-World Example: Parallel API Requests

import { from, map, mergeAll, catchError, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface User {
  id: string;
  name: string;
  email: string;
}

interface UserWithDetails extends User {
  posts: any[];
  followers: any[];
}

const userIds = ['user1', 'user2', 'user3', 'user4', 'user5'];

function fetchUserWithDetails(userId: string) {
  return ajax.getJSON<User>(`/api/users/${userId}`).pipe(
    map(user => ({
      ...user,
      posts: [],
      followers: []
    })),
    catchError(err => {
      console.error(`Failed to fetch user ${userId}:`, err);
      return of(null);
    })
  );
}

// Fetch users with max 3 concurrent requests
from(userIds).pipe(
  map(id => fetchUserWithDetails(id)),
  mergeAll(3)
).subscribe({
  next: (user: UserWithDetails | null) => {
    if (user) {
      console.log('User loaded:', user.name);
    }
  },
  complete: () => console.log('All users loaded')
});

Concurrent File Downloads

import { from, map, mergeAll, tap } from 'rxjs';

interface DownloadProgress {
  filename: string;
  progress: number;
  completed: boolean;
}

function downloadFile(url: string): Observable<DownloadProgress> {
  return new Observable(subscriber => {
    const filename = url.split('/').pop() || 'file';
    console.log(`Starting download: ${filename}`);

    fetch(url)
      .then(response => {
        const reader = response.body!.getReader();
        const contentLength = parseInt(response.headers.get('Content-Length') || '0');
        let receivedLength = 0;

        const read = () => {
          reader.read().then(({ done, value }) => {
            if (done) {
              subscriber.next({ filename, progress: 100, completed: true });
              subscriber.complete();
              return;
            }

            receivedLength += value.length;
            const progress = (receivedLength / contentLength) * 100;
            subscriber.next({ filename, progress, completed: false });
            read();
          });
        };

        read();
      })
      .catch(err => subscriber.error(err));
  });
}

const fileUrls = [
  '/files/document1.pdf',
  '/files/document2.pdf',
  '/files/image1.jpg',
  '/files/image2.jpg',
  '/files/video1.mp4'
];

// Download max 2 files concurrently
from(fileUrls).pipe(
  map(url => downloadFile(url)),
  mergeAll(2)
).subscribe({
  next: (progress: DownloadProgress) => {
    if (progress.completed) {
      console.log(`✓ Downloaded: ${progress.filename}`);
    } else {
      console.log(`${progress.filename}: ${progress.progress.toFixed(1)}%`);
    }
  },
  complete: () => console.log('All downloads complete')
});

Real-Time Search Across Multiple Sources

import { fromEvent, map, debounceTime, mergeAll, distinctUntilChanged } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface SearchResult {
  source: string;
  results: any[];
}

const searchInput = document.getElementById('search') as HTMLInputElement;

function searchAPI(query: string, apiName: string, endpoint: string): Observable<SearchResult> {
  return ajax.getJSON(`${endpoint}?q=${query}`).pipe(
    map(results => ({ source: apiName, results: results as any[] })),
    catchError(err => {
      console.error(`Search failed on ${apiName}:`, err);
      return of({ source: apiName, results: [] });
    })
  );
}

fromEvent(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged(),
  map(query => {
    if (!query.trim()) return of<SearchResult[]>([]);

    // Search multiple APIs concurrently
    return from([
      searchAPI(query, 'Products', '/api/products/search'),
      searchAPI(query, 'Articles', '/api/articles/search'),
      searchAPI(query, 'Users', '/api/users/search')
    ]).pipe(
      mergeAll() // Execute all searches concurrently
    );
  }),
  mergeAll() // Flatten the outer observable
).subscribe((result: SearchResult | SearchResult[]) => {
  if (Array.isArray(result)) {
    console.log('Search cleared');
  } else {
    console.log(`Results from ${result.source}:`, result.results.length);
    updateSearchResults(result.source, result.results);
  }
});

Practical Scenarios

Setting a concurrency limit is important for rate-limiting, connection pooling, or preventing resource exhaustion when dealing with many inner Observables.

Scenario 1: Image Processing Pipeline

import { from, map, mergeAll } from 'rxjs';

interface ImageProcessingResult {
  filename: string;
  thumbnailUrl: string;
  processedUrl: string;
}

function processImage(file: File): Observable<ImageProcessingResult> {
  return new Observable(subscriber => {
    const formData = new FormData();
    formData.append('image', file);

    ajax.post('/api/images/process', formData)
      .subscribe({
        next: response => {
          subscriber.next({
            filename: file.name,
            thumbnailUrl: response.response.thumbnail,
            processedUrl: response.response.url
          });
          subscriber.complete();
        },
        error: err => subscriber.error(err)
      });
  });
}

const imageFiles: File[] = getSelectedFiles();

// Process max 3 images concurrently
from(imageFiles).pipe(
  map(file => processImage(file)),
  mergeAll(3)
).subscribe({
  next: (result: ImageProcessingResult) => {
    console.log('Image processed:', result.filename);
    displayProcessedImage(result);
  },
  error: err => console.error('Processing failed:', err),
  complete: () => console.log('All images processed')
});

Scenario 2: WebSocket Connections Pool

import { from, map, mergeAll, retry } from 'rxjs';

interface WSMessage {
  channel: string;
  data: any;
}

function connectToChannel(channel: string): Observable<WSMessage> {
  return new Observable(subscriber => {
    console.log(`Connecting to ${channel}...`);
    const ws = new WebSocket(`ws://localhost:8080/channels/${channel}`);

    ws.onopen = () => console.log(`Connected to ${channel}`);
    
    ws.onmessage = (event) => {
      subscriber.next({
        channel,
        data: JSON.parse(event.data)
      });
    };

    ws.onerror = (error) => subscriber.error(error);
    ws.onclose = () => subscriber.complete();

    return () => {
      console.log(`Disconnecting from ${channel}`);
      ws.close();
    };
  }).pipe(
    retry({ count: 3, delay: 1000 })
  );
}

const channels = ['news', 'sports', 'weather', 'stocks', 'crypto'];

// Maintain max 3 concurrent WebSocket connections
from(channels).pipe(
  map(channel => connectToChannel(channel)),
  mergeAll(3)
).subscribe({
  next: (message: WSMessage) => {
    console.log(`Message from ${message.channel}:`, message.data);
    updateChannelUI(message.channel, message.data);
  }
});

Behavior Details

Concurrency Control

  • mergeAll() or mergeAll(Infinity): Subscribe to all inner Observables immediately
  • mergeAll(1): Equivalent to concatAll(), subscribe one at a time
  • mergeAll(n): Subscribe to at most n inner Observables concurrently

Completion Behavior

The output Observable completes only when both the source Observable completes AND all inner Observables complete. If any inner Observable errors, the error is immediately propagated.
import { of, delay, mergeAll, throwError } from 'rxjs';

const source$ = of(
  of(1, 2).pipe(delay(100)),
  throwError(() => new Error('Failed')),
  of(3, 4).pipe(delay(200))
);

source$.pipe(
  mergeAll()
).subscribe({
  next: console.log,
  error: err => console.error('Error:', err.message)
});
// Output: 1, 2, Error: Failed
// The third inner observable never emits

Memory and Performance

  • Without concurrency limit, all inner Observables are kept alive simultaneously
  • With concurrency limit, pending inner Observables are queued in memory
  • Consider memory usage when dealing with many long-lived inner Observables
  • concatAll - Flattens sequentially (one at a time)
  • switchAll - Flattens but cancels previous inner Observable
  • exhaustAll - Flattens but ignores new while one is active
  • mergeMap - Maps and merges in one operator
  • mergeWith - Merges specific Observables with the source