Overview

Projects each source value to an Observable (the “inner” Observable) and merges the values of all inner Observables into a single output Observable as they are emitted. Formerly also exported under the name flatMap (now a deprecated alias), it is one of the most widely used operators for handling asynchronous operations.
mergeMap is ideal when you need to execute multiple asynchronous operations in parallel and you care about all their results.

Type Signature

function mergeMap<T, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
  concurrent: number = Infinity
): OperatorFunction<T, ObservedValueOf<O>>

Parameters

project
(value: T, index: number) => O
required
A function that maps each source value to an Observable (or Promise, Array, etc.). The function receives:
  • value: The emitted value from the source
  • index: The zero-based index of the emission
Must return an ObservableInput that will be merged into the output.
concurrent
number
default:"Infinity"
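Maximum number of inner Observables being subscribed to concurrently. If omitted or Infinity, every inner Observable is subscribed to as soon as its source value arrives. With a finite limit, source values that arrive while all slots are busy are buffered until an active inner Observable completes. If set to 1, behaves like concatMap.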
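
The two arguments passed to project can be illustrated with a minimal sketch. It assumes RxJS 7 and returns a plain array from project, which is allowed because arrays are a valid ObservableInput. The variable name `seen` is illustrative only.

```typescript
import { of, mergeMap } from 'rxjs';

const seen: string[] = [];

of('a', 'b', 'c').pipe(
  // project receives the value and its zero-based index; the returned
  // array is treated as an inner Observable that emits each element
  mergeMap((value, index) => [`${index}:${value}`, `${index}:${value.toUpperCase()}`])
).subscribe(v => seen.push(v));

console.log(seen);
// ['0:a', '0:A', '1:b', '1:B', '2:c', '2:C']
```

Everything here is synchronous, so each inner array is fully drained before the next source value is projected. With asynchronous inner Observables the values would interleave.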

Returns

return
OperatorFunction<T, ObservedValueOf<O>>
A function that returns an Observable that emits values from all projected inner Observables merged together.
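
Because the return value is an ordinary OperatorFunction, it can be stored in a variable and reused across pipelines. The sketch below assumes RxJS 7; `describeParity`, `first` and `second` are hypothetical names chosen for illustration.

```typescript
import { of, mergeMap } from 'rxjs';
import type { OperatorFunction } from 'rxjs';

// Here T = number and O = string[], so ObservedValueOf<O> resolves to string
const describeParity: OperatorFunction<number, string> =
  mergeMap((n: number) => [`${n} is ${n % 2 === 0 ? 'even' : 'odd'}`]);

const first: string[] = [];
const second: string[] = [];

// The same operator instance is applied to two independent sources
of(1, 2).pipe(describeParity).subscribe(v => first.push(v));
of(10).pipe(describeParity).subscribe(v => second.push(v));

console.log(first);  // ['1 is odd', '2 is even']
console.log(second); // ['10 is even']
```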

Usage Examples

Basic Example: Concurrent Inner Observables

import { of, mergeMap, interval, map, take } from 'rxjs';

const letters = of('a', 'b', 'c');
const result = letters.pipe(
  mergeMap(x => interval(1000).pipe(
    map(i => x + i),
    take(3)
  ))
);

result.subscribe(x => console.log(x));
// All intervals run concurrently:
// a0, b0, c0 (at ~1s)
// a1, b1, c1 (at ~2s)
// a2, b2, c2 (at ~3s)

Controlled Concurrency

import { from, mergeMap } from 'rxjs';

const urls = [
  '/api/data/1',
  '/api/data/2', 
  '/api/data/3',
  '/api/data/4',
  '/api/data/5',
  '/api/data/6'
];

// Only 2 concurrent requests at a time
from(urls).pipe(
  mergeMap(
    url => {
      console.log('Fetching:', url);
      return from(
        fetch(url).then(res => res.json())
      );
    },
    2 // Concurrency limit
  )
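).subscribe({
  // Observer object; the multi-callback subscribe signature is deprecated in RxJS 7
  next: data => console.log('Data received:', data),
  error: err => console.error('Error:', err),
  complete: () => console.log('All requests complete')
});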

// At t=0: Start request 1 and 2
// When 1 completes: Start request 3
// When 2 completes: Start request 4
// etc.

With Promises

import { fromEvent, mergeMap, from, debounceTime, map, filter } from 'rxjs';

interface SearchResult {
  query: string;
  results: any[];
}

const searchInput = document.getElementById('search') as HTMLInputElement;
const searches = fromEvent(searchInput, 'input');

searches.pipe(
  debounceTime(300),
  map(event => (event.target as HTMLInputElement).value),
  filter(query => query.length > 2),
  mergeMap(query => 
    from(
      fetch(`/api/search?q=${encodeURIComponent(query)}`)
        .then(res => res.json())
    ).pipe(
      map(results => ({ query, results }))
    )
  )
).subscribe(({ query, results }) => {
  console.log(`Results for "${query}":`, results);
  // displayResults is an app-specific rendering function, assumed to be defined elsewhere
  displayResults(results);
});

Marble Diagram

Source:     --1-------2-------3-------|
Project(1):   a-b-c|
Project(2):           d-e-f|
Project(3):                   g-h-i|
Result:     --a-b-c---d-e-f---g-h-i---|
             (all merged concurrently)
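
The merging behavior can also be observed directly by driving the inner Observables by hand. The following is a minimal sketch, assuming RxJS 7 and using Subjects as stand-ins for asynchronous work; `source`, `inner`, `merged` and `done` are illustrative names.

```typescript
import { Subject, mergeMap } from 'rxjs';

const source = new Subject<'x' | 'y'>();
const inner = { x: new Subject<string>(), y: new Subject<string>() };
const merged: string[] = [];
let done = false;

source.pipe(mergeMap(key => inner[key])).subscribe({
  next: v => merged.push(v),
  complete: () => { done = true; }
});

source.next('x');   // subscribes to inner.x
source.next('y');   // subscribes to inner.y while inner.x is still active
inner.x.next('x1');
inner.y.next('y1');
inner.x.next('x2');
inner.y.next('y2');

source.complete();  // the output stays open: inner Observables are still active
const doneAfterSourceOnly = done;
inner.x.complete();
inner.y.complete(); // now the output completes

console.log(merged); // ['x1', 'y1', 'x2', 'y2']
console.log(doneAfterSourceOnly, done); // false true
```

Values appear in the output in the order the inner Observables emit them, and the output completes only after the source and every inner Observable have completed.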

With Concurrency = 2

Source:     --1-2-3-4-|
Limit: 2 concurrent at a time

Project(1):   a---b|
Project(2):     c---d|
Project(3):       .e---f|
Project(4):         .g---h|
Result:     --a-c-bedg-f-h|
             (. = value buffered until one of the 2 slots frees up)
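
The buffering implied by a concurrency limit can be checked with hand-driven inner Observables. This is a sketch under the assumption of RxJS 7, where project is only called once a slot is free; `jobA`, `jobB`, `jobC`, `started` and `output` are hypothetical names.

```typescript
import { Subject, from, mergeMap } from 'rxjs';

const jobA = new Subject<string>();
const jobB = new Subject<string>();
const jobC = new Subject<string>();
const started: number[] = [];
const output: string[] = [];

from([jobA, jobB, jobC]).pipe(
  mergeMap((job, index) => {
    started.push(index); // records when each inner subscription begins
    return job;
  }, 2)                  // at most 2 inner subscriptions at a time
).subscribe(v => output.push(v));

const startedInitially = [...started];     // third job is still buffered
jobA.next('a');
jobA.complete();                           // frees a slot
const startedAfterACompletes = [...started];
jobC.next('c');
jobB.next('b');

console.log(startedInitially);       // [0, 1]
console.log(startedAfterACompletes); // [0, 1, 2]
console.log(output);                 // ['a', 'c', 'b']
```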

Common Use Cases

  1. Parallel API Calls: Fetch multiple resources concurrently
  2. Database Queries: Execute multiple queries in parallel
  3. File Operations: Read/write multiple files simultaneously
  4. Search Autocomplete: Handle multiple rapid search requests
  5. Batch Processing: Process multiple items with async operations
  6. Real-time Updates: Subscribe to multiple real-time data streams
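
Without a concurrency limit, mergeMap subscribes to a new inner Observable for every source value as soon as it arrives. With a fast source this can create a large number of concurrent subscriptions and potentially overwhelm the system. Use the concurrent parameter to cap the number of active inner subscriptions.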
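
To make the effect of the limit concrete, the sketch below counts the peak number of simultaneously active inner subscriptions for a synchronous source of 100 values. It assumes RxJS 7; `peakActive` and `hang` are hypothetical helpers, and the inner Observable deliberately never completes so that no slot is ever released.

```typescript
import { Observable, range, mergeMap } from 'rxjs';

// Returns the peak number of simultaneously active inner subscriptions
function peakActive(concurrent?: number): number {
  let active = 0;
  let peak = 0;
  // Inner Observable that never emits or completes; it only tracks
  // how many subscriptions are currently open
  const hang = new Observable<never>(() => {
    active++;
    peak = Math.max(peak, active);
    return () => { active--; };
  });
  const sub = range(1, 100).pipe(mergeMap(() => hang, concurrent)).subscribe();
  sub.unsubscribe(); // tears down every inner subscription
  return peak;
}

console.log(peakActive());  // 100
console.log(peakActive(5)); // 5
```

With a limit of 5, the remaining 95 values sit in mergeMap's internal buffer, so a limit trades subscription pressure for buffered source values.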

Advanced Example: Parallel File Processing

import { from, mergeMap, map, catchError, of, tap, toArray } from 'rxjs';

interface FileMetadata {
  path: string;
  size: number;
  type: string;
}

interface ProcessedFile {
  path: string;
  success: boolean;
  result?: any;
  error?: string;
  duration: number;
}

const files: FileMetadata[] = [
  { path: '/data/file1.json', size: 1024, type: 'json' },
  { path: '/data/file2.csv', size: 2048, type: 'csv' },
  { path: '/data/file3.xml', size: 512, type: 'xml' },
  // ... many more files
];

// parseCSV and parseXML are application-specific parsers, assumed to be defined elsewhere
function processFile(file: FileMetadata): Promise<any> {
  return fetch(file.path)
    .then(res => res.text())
    .then(content => {
      // Process based on type
      switch (file.type) {
        case 'json': return JSON.parse(content);
        case 'csv': return parseCSV(content);
        case 'xml': return parseXML(content);
        default: return content;
      }
    });
}

from(files).pipe(
  mergeMap(
    file => {
      const startTime = Date.now();
      console.log(`Processing: ${file.path}`);
      
      return from(processFile(file)).pipe(
        map(result => ({
          path: file.path,
          success: true,
          result,
          duration: Date.now() - startTime
        } as ProcessedFile)),
        catchError(error => {
          console.error(`Failed: ${file.path}`, error);
          return of({
            path: file.path,
            success: false,
            error: error.message,
            duration: Date.now() - startTime
          } as ProcessedFile);
        })
      );
    },
    5 // Process 5 files concurrently
  ),
  tap(result => {
    if (result.success) {
      console.log(`✓ ${result.path} (${result.duration}ms)`);
    } else {
      console.log(`✗ ${result.path}: ${result.error}`);
    }
  }),
  // Collect all results
  toArray()
).subscribe(
  results => {
    const successful = results.filter(r => r.success).length;
    const failed = results.filter(r => !r.success).length;
    console.log(`Complete: ${successful} succeeded, ${failed} failed`);
  }
);

Choosing Concurrency Based on System Load

import { from, mergeMap, defer, finalize } from 'rxjs';

// tasks, processTask and getCurrentCPUUsage are assumed to be defined elsewhere

let currentLoad = 0;
const maxLoad = 10;

function getConcurrency(): number {
  const cpuUsage = getCurrentCPUUsage();
  if (cpuUsage > 80) return 2;
  if (cpuUsage > 60) return 4;
  return 8;
}

from(tasks).pipe(
  mergeMap(
    task => defer(() => {
      currentLoad++;
      console.log(`Current load: ${currentLoad}`);
      
      return from(processTask(task)).pipe(
        finalize(() => {
          currentLoad--;
        })
      );
    }),
    getConcurrency() // Evaluated once, when this pipeline is built; the limit stays fixed afterwards
  )
).subscribe(result => {
  console.log('Task complete:', result);
});
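
Note that the concurrent argument is an ordinary function argument, so an expression passed there is evaluated when the operator is created, not per source value. A minimal sketch of this, assuming RxJS 7 and using a hypothetical `pickLimit` as a stand-in for a load-based function:

```typescript
import { range, mergeMap, of } from 'rxjs';

let calls = 0;

// Hypothetical stand-in for a load-based limit function
function pickLimit(): number {
  calls++;
  return 2;
}

// pickLimit() runs here, once, before any value flows through the pipeline
const limited = mergeMap((n: number) => of(n * 2), pickLimit());

const doubled: number[] = [];
range(1, 5).pipe(limited).subscribe(v => doubled.push(v));

console.log(calls);   // 1
console.log(doubled); // [2, 4, 6, 8, 10]
```

To pick up a fresh limit, the pipeline has to be rebuilt, for example by wrapping its construction in defer so the limit is read again on each subscription.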

Comparison with Other Flattening Operators

import { of, mergeMap, delay } from 'rxjs';

of(1, 2, 3).pipe(
  mergeMap(x => of(x).pipe(delay(1000)))
).subscribe(x => console.log(x));
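// All 3 inner Observables run concurrently
// After ~1s: 1, 2, 3 (the delays are equal here; in general the output order
// follows when each inner Observable emits, not the order of the source values)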
Use mergeMap when you want all operations to execute and you need all results. Use concatMap when order matters. Use switchMap when only the latest matters. Use exhaustMap when you want to ignore rapid triggers.
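
The differences between the four flattening operators can be sketched by replaying the same scripted sequence through each one. The example assumes RxJS 7; `run` and `Flatten` are hypothetical helpers. The inner Observables are Subjects, so any value emitted while an operator is not subscribed to that inner Observable is simply lost, which is what makes the differences visible.

```typescript
import { Subject, mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs';
import type { Observable, OperatorFunction } from 'rxjs';

type Key = 'A' | 'B';
type Flatten = (project: (key: Key) => Observable<string>) => OperatorFunction<Key, string>;

// Replays one fixed script of emissions through the given flattening operator
function run(flatten: Flatten): string[] {
  const source = new Subject<Key>();
  const inner = { A: new Subject<string>(), B: new Subject<string>() };
  const out: string[] = [];
  source.pipe(flatten(key => inner[key])).subscribe(v => out.push(v));

  source.next('A');
  source.next('B');     // arrives while A's inner Observable is still active
  inner.A.next('a1');
  inner.B.next('b1');
  inner.A.complete();
  inner.B.next('b2');
  inner.B.complete();
  return out;
}

console.log(run(p => mergeMap(p)));   // ['a1', 'b1', 'b2']  both inners active
console.log(run(p => concatMap(p)));  // ['a1', 'b2']        B subscribed only after A completes
console.log(run(p => switchMap(p)));  // ['b1', 'b2']        A dropped when B arrives
console.log(run(p => exhaustMap(p))); // ['a1']              B ignored while A is active
```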

Related Operators

  • concatMap - Sequential flattening (concurrency = 1)
  • switchMap - Cancel previous on new emission
  • exhaustMap - Ignore new while inner is active
  • mergeAll - Flatten higher-order Observable
  • merge - Merge multiple Observables
  • forkJoin - Wait for all to complete, emit last values