Skip to main content

Overview

concatAll flattens an Observable-of-Observables by subscribing to each inner Observable sequentially, one at a time. It subscribes to the next inner Observable only after the previous one completes. This ensures that values are emitted in strict order.
If the source Observable emits inner Observables quickly and they complete slowly, inner Observables will queue up in memory. This can lead to memory issues with fast-emitting sources.

Type Signature

export function concatAll<O extends ObservableInput<any>>(): OperatorFunction<O, ObservedValueOf<O>>
concatAll is equivalent to mergeAll(1) - it’s mergeAll with a concurrency limit of 1.

Parameters

This operator takes no parameters.

Returns

OperatorFunction<O, ObservedValueOf<O>> - An operator function that returns an Observable emitting values from all the inner Observables concatenated in order.

Usage Examples

Basic Example: Sequential Intervals

import { fromEvent, map, interval, take, concatAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(1000).pipe(take(4)))
);
const firstOrder = higherOrder.pipe(concatAll());

firstOrder.subscribe(x => console.log(x));

// Results (not concurrent):
// After first click: 0 -1s-> 1 -1s-> 2 -1s-> 3
// After second click: 0 -1s-> 1 -1s-> 2 -1s-> 3
// Each inner Observable completes before the next starts

Real-World Example: Sequential API Calls

import { from, map, concatAll, delay } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface User {
  id: string;
  name: string;
}

interface UserDetails {
  userId: string;
  profile: any;
  posts: any[];
}

const userIds = ['user1', 'user2', 'user3'];

function fetchUserWithDetails(userId: string) {
  return ajax.getJSON<User>(`/api/users/${userId}`).pipe(
    delay(1000), // Simulate API rate limiting
    map(user => ({
      userId: user.id,
      profile: user,
      posts: []
    }))
  );
}

// Process users sequentially to respect API rate limits
const users$ = from(
  userIds.map(id => fetchUserWithDetails(id))
);

users$.pipe(
  concatAll()
).subscribe((details: UserDetails) => {
  console.log('User details loaded:', details);
});
// Each user is fetched only after the previous completes

Sequential File Processing

import { from, map, concatAll } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface UploadResult {
  filename: string;
  status: 'success' | 'error';
  url?: string;
}

function uploadFile(file: File) {
  const formData = new FormData();
  formData.append('file', file);

  return ajax({
    url: '/api/upload',
    method: 'POST',
    body: formData
  }).pipe(
    map(response => ({
      filename: file.name,
      status: 'success' as const,
      url: response.response.url
    }))
  );
}

function uploadFilesSequentially(files: File[]) {
  return from(files.map(file => uploadFile(file))).pipe(
    concatAll()
  );
}

const filesToUpload = [
  new File(['content1'], 'file1.txt'),
  new File(['content2'], 'file2.txt'),
  new File(['content3'], 'file3.txt')
];

uploadFilesSequentially(filesToUpload).subscribe(
  (result: UploadResult) => console.log('Uploaded:', result.filename),
  err => console.error('Upload failed:', err),
  () => console.log('All files uploaded successfully')
);

Database Migrations

import { from, map, concatAll, tap } from 'rxjs';

interface Migration {
  version: number;
  description: string;
  execute: () => Observable<void>;
}

const migrations: Migration[] = [
  {
    version: 1,
    description: 'Create users table',
    execute: () => runSQL('CREATE TABLE users...')
  },
  {
    version: 2,
    description: 'Add email column',
    execute: () => runSQL('ALTER TABLE users ADD COLUMN email...')
  },
  {
    version: 3,
    description: 'Create indexes',
    execute: () => runSQL('CREATE INDEX idx_users_email...')
  }
];

function runSQL(query: string) {
  return ajax.post('/api/db/execute', { query });
}

function runMigrations(migrations: Migration[]) {
  return from(
    migrations.map(migration => 
      migration.execute().pipe(
        tap(() => console.log(`✓ Migration ${migration.version}: ${migration.description}`))
      )
    )
  ).pipe(
    concatAll()
  );
}

runMigrations(migrations).subscribe({
  complete: () => console.log('All migrations completed successfully')
});

Practical Scenarios

Use concatAll when the order of operations matters and each operation must complete before the next begins, such as sequential API calls, ordered animations, or database transactions.

Scenario 1: Ordered Animation Sequence

import { from, concatAll } from 'rxjs';

function animateElement(element: HTMLElement, animation: string) {
  return new Observable(subscriber => {
    element.style.animation = animation;
    
    const onEnd = () => {
      subscriber.next(animation);
      subscriber.complete();
      element.removeEventListener('animationend', onEnd);
    };
    
    element.addEventListener('animationend', onEnd);
  });
}

const box = document.querySelector('.box') as HTMLElement;

const animations$ = from([
  animateElement(box, 'fadeIn 1s'),
  animateElement(box, 'slideRight 0.5s'),
  animateElement(box, 'bounce 0.3s'),
  animateElement(box, 'fadeOut 1s')
]);

animations$.pipe(concatAll()).subscribe({
  next: anim => console.log('Animation completed:', anim),
  complete: () => console.log('All animations finished')
});

Scenario 2: Queue Processing

import { Subject, concatAll, delay, tap } from 'rxjs';

interface Task {
  id: string;
  execute: () => Observable<any>;
}

class TaskQueue {
  private taskSubject = new Subject<Observable<any>>();
  
  constructor() {
    this.taskSubject.pipe(
      concatAll()
    ).subscribe({
      next: result => console.log('Task completed:', result),
      error: err => console.error('Task failed:', err)
    });
  }
  
  addTask(task: Task) {
    console.log('Adding task to queue:', task.id);
    this.taskSubject.next(
      task.execute().pipe(
        tap(result => console.log(`Task ${task.id} result:`, result))
      )
    );
  }
}

const queue = new TaskQueue();

queue.addTask({
  id: 'task-1',
  execute: () => of('Result 1').pipe(delay(1000))
});

queue.addTask({
  id: 'task-2',
  execute: () => of('Result 2').pipe(delay(500))
});

queue.addTask({
  id: 'task-3',
  execute: () => of('Result 3').pipe(delay(200))
});
// Tasks execute in order: task-1, then task-2, then task-3

Behavior Details

Subscription Strategy

  • Subscribes to inner Observables one at a time, in order
  • Waits for each inner Observable to complete before subscribing to the next
  • Maintains a queue of pending inner Observables

Completion and Errors

If any inner Observable errors, concatAll immediately propagates the error and unsubscribes from the source, leaving remaining inner Observables unprocessed.
import { from, throwError, concatAll, catchError } from 'rxjs';

const source$ = from([
  of(1, 2),
  throwError(() => new Error('Failed')),
  of(3, 4) // This will never be subscribed to
]);

source$.pipe(
  concatAll(),
  catchError(err => {
    console.error('Error caught:', err.message);
    return of('recovered');
  })
).subscribe(console.log);
// Output: 1, 2, recovered

Memory Considerations

  • Inner Observables emitted before the current one completes are buffered
  • With fast-emitting sources, this buffer can grow large
  • Consider using mergeAll(n) if some concurrency is acceptable
  • mergeAll - Flattens with configurable concurrency (immediate by default)
  • switchAll - Flattens but cancels previous inner Observable
  • exhaustAll - Flattens but ignores new inner Observables while one is active
  • concatMap - Maps and flattens sequentially in one operator
  • concatWith - Concatenates specific Observables after the source completes