Overview

concatWith emits all values from the source Observable, and once it completes, subscribes to each provided Observable in sequence, emitting their values. It waits for each Observable to complete before moving to the next one.
concat(a$, b$, c$) is equivalent to a$.pipe(concatWith(b$, c$)). The operator version is more convenient for chaining.
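The equivalence can be checked with a small sketch. It assumes RxJS 7.2 or later (where operators are importable from 'rxjs') and uses synchronous of() sources, so both forms finish before the logging lines run. The variable names are illustrative only.

```typescript
import { concat, concatWith, of } from 'rxjs';

const a$ = of(1, 2);
const b$ = of(3, 4);
const c$ = of(5);

// Operator form: a$ is the source, b$ and c$ follow it.
const viaOperator: number[] = [];
a$.pipe(concatWith(b$, c$)).subscribe(value => viaOperator.push(value));

// Static form: all three Observables are passed as arguments.
const viaStatic: number[] = [];
concat(a$, b$, c$).subscribe(value => viaStatic.push(value));

console.log(viaOperator); // [1, 2, 3, 4, 5]
console.log(viaStatic);   // [1, 2, 3, 4, 5]
```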

Type Signature

export function concatWith<T, A extends readonly unknown[]>(
  ...otherSources: [...ObservableInputTuple<A>]
): OperatorFunction<T, T | A[number]>

Parameters

otherSources
ObservableInputTuple<A>
required
One or more ObservableInput sources (Observables, Promises, arrays, or iterables) to subscribe to sequentially after the source Observable completes. Each is subscribed to only after the previous one completes.
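Because the parameter type is built from ObservableInput, the extra sources do not have to be Observable instances. The sketch below passes an array and a Set alongside a regular Observable, assuming RxJS 7.2 or later. The values are made up for illustration.

```typescript
import { of, concatWith } from 'rxjs';

const received: string[] = [];

of('a').pipe(
  // An array, an iterable (Set), and an Observable are all accepted inputs.
  concatWith(['b', 'c'], new Set(['d']), of('e'))
).subscribe(value => received.push(value));

console.log(received); // ['a', 'b', 'c', 'd', 'e']
```

A Promise would also be accepted, but its value arrives asynchronously, so it is left out of this synchronous sketch.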

Returns

OperatorFunction<T, T | A[number]> - An operator function that returns an Observable concatenating subscriptions to the source and provided Observables, subscribing to the next only once the current subscription completes.

Usage Examples

Basic Example: Mouse Click then Mouse Moves

import { fromEvent, map, take, concatWith } from 'rxjs';

const clicks$ = fromEvent(document, 'click');
const moves$ = fromEvent(document, 'mousemove');

clicks$.pipe(
  map(() => 'click'),
  take(1),
  concatWith(
    moves$.pipe(
      map(() => 'move')
    )
  )
).subscribe(x => console.log(x));

// Output:
// 'click'
// 'move'
// 'move'
// 'move'
// ...

Real-World Example: Multi-Step Onboarding Flow

import { Observable, fromEvent, take, concatWith } from 'rxjs';

interface OnboardingStep {
  step: number;
  title: string;
  message: string;
}

function showOnboardingStep(step: OnboardingStep) {
  return new Observable<string>(subscriber => {
    const dialog = document.createElement('div');
    dialog.innerHTML = `
      <h2>${step.title}</h2>
      <p>${step.message}</p>
      <button id="next-${step.step}">Next</button>
    `;
    document.body.appendChild(dialog);

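    const button = dialog.querySelector('button')!;
    // Complete this step on the first click of its Next button.
    const clickSub = fromEvent(button, 'click').pipe(take(1)).subscribe(() => {
      subscriber.next(`Step ${step.step} completed`);
      subscriber.complete();
    });

    // Teardown runs on completion or unsubscribe, so the dialog never lingers.
    return () => {
      clickSub.unsubscribe();
      dialog.remove();
    };
  });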
}

const step1$ = showOnboardingStep({
  step: 1,
  title: 'Welcome!',
  message: 'Let\'s get you started'
});

const step2$ = showOnboardingStep({
  step: 2,
  title: 'Profile Setup',
  message: 'Tell us about yourself'
});

const step3$ = showOnboardingStep({
  step: 3,
  title: 'All Set!',
  message: 'You\'re ready to go'
});

step1$.pipe(
  concatWith(step2$, step3$)
).subscribe({
  next: msg => console.log(msg),
  complete: () => console.log('Onboarding completed!')
});

Sequential API Operations

import { defer, concatWith, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface CreateUserRequest {
  name: string;
  email: string;
}

interface User {
  id: string;
  name: string;
  email: string;
}

function createUser(userData: CreateUserRequest) {
  return ajax.post<User>('/api/users', userData).pipe(
    tap(response => console.log('User created:', response.response.id))
  );
}

function sendWelcomeEmail(userId: string) {
  return ajax.post('/api/emails/welcome', { userId }).pipe(
    tap(() => console.log('Welcome email sent'))
  );
}

function assignDefaultPermissions(userId: string) {
  return ajax.post('/api/permissions/assign-defaults', { userId }).pipe(
    tap(() => console.log('Default permissions assigned'))
  );
}

function setupNewUser(userData: CreateUserRequest) {
  let userId: string;

  return createUser(userData).pipe(
    tap(response => userId = response.response.id),
    // defer() postpones each call until its turn in the sequence. Calling
    // sendWelcomeEmail(userId) directly here would run while userId is still undefined.
    concatWith(
      defer(() => sendWelcomeEmail(userId)),
      defer(() => assignDefaultPermissions(userId))
    )
  );
}

setupNewUser({
  name: 'John Doe',
  email: 'john@example.com'
}).subscribe({
  complete: () => console.log('User setup complete')
});
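One pitfall worth illustrating with sequences like this: the arguments passed to concatWith are ordinary JavaScript expressions, so they are evaluated when the pipeline is built, not when each Observable is subscribed to. The sketch below contrasts an eagerly built follow-up step with one wrapped in defer(). The run and notify helpers and the 'u-42' id are hypothetical stand-ins for the HTTP calls above, and synchronous of() sources replace ajax so the result can be inspected immediately.

```typescript
import { defer, of, concatWith, map, tap } from 'rxjs';

// Hypothetical follow-up step that depends on an id produced by the first step.
const notify = (id: string) => of(`notify:${id}`);

function run(lazy: boolean): string[] {
  const out: string[] = [];
  let userId = 'unset';

  // Stand-in for the "create user" request; it records the id as a side effect.
  const create$ = of({ id: 'u-42' }).pipe(
    tap(user => { userId = user.id; }),
    map(user => `created:${user.id}`)
  );

  // Eager: notify(userId) is called right now, while userId is still 'unset'.
  // Lazy: defer() calls notify(userId) only when concatWith subscribes to it.
  const followUp$ = lazy ? defer(() => notify(userId)) : notify(userId);

  create$.pipe(concatWith(followUp$)).subscribe(value => out.push(value));
  return out;
}

console.log(run(false)); // ['created:u-42', 'notify:unset']
console.log(run(true));  // ['created:u-42', 'notify:u-42']
```

When a later step needs the value emitted by an earlier one, concatMap is often a more direct fit than a shared mutable variable.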

Video Player Playlist

import { Observable, concatWith } from 'rxjs';

interface Video {
  id: string;
  title: string;
  url: string;
  duration: number;
}

function playVideo(video: Video) {
  return new Observable<string>(subscriber => {
    console.log(`Playing: ${video.title}`);
    const videoElement = document.querySelector('video') as HTMLVideoElement;
    videoElement.src = video.url;
    videoElement.play();

    const onEnded = () => {
      console.log(`Finished: ${video.title}`);
      subscriber.next(`Completed: ${video.title}`);
      subscriber.complete();
      videoElement.removeEventListener('ended', onEnded);
    };

    videoElement.addEventListener('ended', onEnded);

    return () => {
      videoElement.pause();
      videoElement.removeEventListener('ended', onEnded);
    };
  });
}

const playlist: Video[] = [
  { id: '1', title: 'Intro', url: '/videos/intro.mp4', duration: 30 },
  { id: '2', title: 'Tutorial 1', url: '/videos/tutorial1.mp4', duration: 300 },
  { id: '3', title: 'Tutorial 2', url: '/videos/tutorial2.mp4', duration: 450 },
];

const [first, ...rest] = playlist;

playVideo(first).pipe(
  concatWith(...rest.map(video => playVideo(video)))
).subscribe({
  next: msg => console.log(msg),
  complete: () => console.log('Playlist finished')
});

Practical Scenarios

Use concatWith when you need to ensure a specific sequence of operations completes in order, such as initialization sequences, sequential workflows, or ordered event handlers.

Scenario 1: Application Initialization

import { Observable, delay, concatWith, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

function loadConfiguration() {
  return ajax.getJSON('/api/config').pipe(
    delay(500),
    tap(config => console.log('Configuration loaded:', config))
  );
}

function authenticateUser() {
  return ajax.post('/api/auth/validate', {}).pipe(
    delay(300),
    tap(response => console.log('User authenticated:', response))
  );
}

function loadUserPreferences() {
  return ajax.getJSON('/api/user/preferences').pipe(
    delay(200),
    tap(prefs => console.log('Preferences loaded:', prefs))
  );
}

function initializeWebSocket() {
  return new Observable<string>(subscriber => {
    console.log('WebSocket connecting...');
    const ws = new WebSocket('ws://localhost:8080');
    ws.onopen = () => {
      console.log('WebSocket connected');
      subscriber.next('connected');
      subscriber.complete();
    };
    // Surface connection failures so the sequence errors instead of hanging.
    ws.onerror = event => subscriber.error(event);
  });
}

loadConfiguration().pipe(
  concatWith(
    authenticateUser(),
    loadUserPreferences(),
    initializeWebSocket()
  )
).subscribe({
  complete: () => {
    console.log('Application initialized successfully');
    showMainUI(); // assumed to be defined elsewhere in the application
  }
});

Scenario 2: Form Submission with Confirmation

import { Observable, fromEvent, of, take, concatWith, switchMap, tap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const submitButton = document.getElementById('submit') as HTMLButtonElement;
const confirmButton = document.getElementById('confirm') as HTMLButtonElement;

function showConfirmationDialog() {
  return new Observable<boolean>(subscriber => {
    const dialog = document.getElementById('confirmation-dialog')!;
    dialog.style.display = 'block';

    const confirmClick = fromEvent(confirmButton, 'click').pipe(
      take(1),
      tap(() => dialog.style.display = 'none')
    );

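    const confirmSub = confirmClick.subscribe(() => {
      subscriber.next(true);
      subscriber.complete();
    });

    // Release the click listener if the flow is cancelled (e.g. switchMap restarts it).
    return () => confirmSub.unsubscribe();
  });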
}

function submitForm(data: any) {
  return ajax.post('/api/form/submit', data).pipe(
    tap(() => console.log('Form submitted successfully'))
  );
}

function showSuccessMessage() {
  return of('Success!').pipe(
    tap(msg => {
      const alert = document.createElement('div');
      alert.textContent = msg;
      document.body.appendChild(alert);
      setTimeout(() => document.body.removeChild(alert), 3000);
    })
  );
}

fromEvent(submitButton, 'click').pipe(
  switchMap(() => 
    showConfirmationDialog().pipe(
      concatWith(
        submitForm({ data: 'form data' }),
        showSuccessMessage()
      )
    )
  )
).subscribe();

Behavior Details

Subscription Timing

  • The source Observable runs to completion first
  • Each additional Observable is subscribed to only after the previous one completes
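  • Values are emitted in sequence order: every value from one Observable is delivered before any value from the next. Values a hot Observable emits before it is subscribed to are missed.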
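The timing rules above can be traced with a small sketch, assuming RxJS 7.2 or later. The tracked helper is hypothetical: it wraps an array in an Observable that logs when it is subscribed to, what it emits, and when it completes.

```typescript
import { defer, from, concatWith, tap } from 'rxjs';

const log: string[] = [];

// Hypothetical helper: records subscription, emissions, and completion.
function tracked(name: string, values: number[]) {
  return defer(() => {
    log.push(`subscribe ${name}`);
    return from(values);
  }).pipe(
    tap({
      next: value => { log.push(`${name} emits ${value}`); },
      complete: () => { log.push(`complete ${name}`); },
    })
  );
}

tracked('a', [1, 2]).pipe(
  concatWith(tracked('b', [3]))
).subscribe();

console.log(log);
// [
//   'subscribe a', 'a emits 1', 'a emits 2', 'complete a',
//   'subscribe b', 'b emits 3', 'complete b'
// ]
```

Note that 'subscribe b' appears only after 'complete a': the second Observable does no work until the first has finished.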

Completion and Error Handling

If any Observable in the sequence errors, the error is immediately propagated and subsequent Observables are not subscribed to.

import { of, throwError, concatWith, catchError } from 'rxjs';

const source$ = of(1, 2, 3);
const error$ = throwError(() => new Error('Failed'));
const never$ = of(4, 5, 6); // Never subscribed to

source$.pipe(
  concatWith(error$, never$),
  catchError(err => {
    console.error('Error caught:', err.message);
    return of('recovered');
  })
).subscribe(console.log);

// Output: 1, 2, 3, recovered
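If the sequence should continue past a failing step, one option is to handle the error on that step rather than on the whole pipeline. The sketch below is one possible approach, not the only one: it applies catchError to the failing Observable and swaps in EMPTY, so concatWith sees a normal completion and moves on. The names and the error message are illustrative.

```typescript
import { EMPTY, of, throwError, concatWith, catchError } from 'rxjs';

const values: number[] = [];
const errors: string[] = [];

const failing$ = throwError(() => new Error('Failed'));

of(1, 2, 3).pipe(
  concatWith(
    // Handle the error on this step only; EMPTY completes immediately.
    failing$.pipe(
      catchError((err: Error) => {
        errors.push(err.message);
        return EMPTY;
      })
    ),
    of(4, 5, 6) // Subscribed to, because the previous step now completes.
  )
).subscribe(value => values.push(value));

console.log(values); // [1, 2, 3, 4, 5, 6]
console.log(errors); // ['Failed']
```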

Comparison with Other Operators

| Operator | Behavior |
| --- | --- |
| concatWith | Subscribes sequentially, waits for each to complete |
| mergeWith | Subscribes to all immediately, emits concurrently |
| switchMap | Cancels previous when new arrives |
| race | Takes the first to emit, unsubscribes from others |
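The difference between concatWith and mergeWith is easiest to see with hot sources. The sketch below uses two Subjects and a hypothetical run helper, assuming RxJS 7.2 or later. With mergeWith, b$ is subscribed to immediately, so its early value gets through. With concatWith, b$ is not subscribed to until a$ completes, so the early value is missed.

```typescript
import { Subject, concatWith, mergeWith } from 'rxjs';

function run(mode: 'concat' | 'merge'): string[] {
  const a$ = new Subject<string>();
  const b$ = new Subject<string>();
  const out: string[] = [];

  const combined$ = mode === 'concat'
    ? a$.pipe(concatWith(b$))
    : a$.pipe(mergeWith(b$));

  combined$.subscribe(value => out.push(value));

  a$.next('a1');
  b$.next('b1'); // emitted while a$ is still active
  a$.next('a2');
  a$.complete();
  b$.next('b2'); // emitted after a$ has completed
  b$.complete();

  return out;
}

console.log(run('concat')); // ['a1', 'a2', 'b2']
console.log(run('merge'));  // ['a1', 'b1', 'a2', 'b2']
```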

Related Operators

  • concat - Static creation operator for concatenating Observables
  • concatAll - Flattens a higher-order Observable sequentially
  • concatMap - Maps and concatenates in one operator
  • mergeWith - Merges Observables concurrently instead of sequentially
  • startWith - Prepends values before source emissions
  • endWith - Appends values after source completes