Skip to main content

Overview

combineLatestAll takes an Observable of Observables, waits for the outer Observable to complete, then subscribes to all collected Observables and combines their values using the combineLatest strategy. Every time an inner Observable emits, the output Observable emits the latest values from all inner Observables.
This operator waits for the outer Observable to complete before subscribing to any inner Observables. If you need immediate subscription, consider using mergeAll or switchAll.

Type Signature

export function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
export function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

Parameters

project
(...values: T[]) => R
Optional function to map the most recent values from each inner Observable into a new result. Takes each of the most recent values from each collected inner Observable as arguments, in order.

Returns

OperatorFunction<ObservableInput<T>, T[] | R> - An operator function that returns an Observable that flattens Observables emitted by the source Observable, emitting arrays of the latest values from all inner Observables.

Usage Examples

Basic Example: Combining Multiple Intervals

import { fromEvent, map, interval, take, combineLatestAll } from 'rxjs';

const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(Math.random() * 2000).pipe(take(3))),
  take(2)
);
const result = higherOrder.pipe(combineLatestAll());

result.subscribe(x => console.log(x));
// Output: [2, 0], [2, 1], [2, 2]
// Each inner interval emits at different rates

Real-World Example: Aggregating API Responses

import { of, delay, combineLatestAll } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface UserData {
  profile: any;
  settings: any;
  permissions: any;
}

function loadUserData(userId: string) {
  const apiCalls$ = of(
    ajax.getJSON(`/api/users/${userId}/profile`),
    ajax.getJSON(`/api/users/${userId}/settings`),
    ajax.getJSON(`/api/users/${userId}/permissions`)
  );

  return apiCalls$.pipe(
    combineLatestAll((profile, settings, permissions) => ({
      profile,
      settings,
      permissions
    }))
  );
}

loadUserData('123').subscribe((userData: UserData) => {
  console.log('Complete user data loaded:', userData);
});

Combining Form Field Streams

import { of, fromEvent, map, combineLatestAll } from 'rxjs';

const nameInput = document.getElementById('name') as HTMLInputElement;
const emailInput = document.getElementById('email') as HTMLInputElement;
const ageInput = document.getElementById('age') as HTMLInputElement;

const formFields$ = of(
  fromEvent(nameInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value)),
  fromEvent(emailInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value)),
  fromEvent(ageInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value))
);

formFields$.pipe(
  combineLatestAll((name, email, age) => ({ name, email, age }))
).subscribe(formData => {
  console.log('Form data:', formData);
});

Practical Scenarios

Use combineLatestAll when you have a dynamic number of streams that all need to emit at least once before you start processing their combined values.

Scenario 1: Multi-Source Data Aggregation

Combine data from multiple sensors or data sources where you need the latest reading from all sources:
import { of, interval, map, take, combineLatestAll } from 'rxjs';

const sensors = ['temperature', 'humidity', 'pressure'];

const sensorStreams$ = of(
  ...sensors.map(sensor => 
    interval(1000).pipe(
      map(() => Math.random() * 100),
      take(5)
    )
  )
);

sensorStreams$.pipe(
  combineLatestAll((temp, humidity, pressure) => ({
    temperature: temp,
    humidity: humidity,
    pressure: pressure,
    timestamp: Date.now()
  }))
).subscribe(readings => {
  console.log('Latest sensor readings:', readings);
});

Scenario 2: Parallel Task Completion

Wait for multiple async tasks to complete and combine their results:
import { of, delay, combineLatestAll } from 'rxjs';

function processImages(imageUrls: string[]) {
  const processingTasks$ = of(
    ...imageUrls.map(url => 
      of({ url, processed: true }).pipe(
        delay(Math.random() * 2000)
      )
    )
  );

  return processingTasks$.pipe(
    combineLatestAll((...results) => results)
  );
}

processImages(['img1.jpg', 'img2.jpg', 'img3.jpg'])
  .subscribe(results => {
    console.log('All images processed:', results);
  });

Behavior Details

If the source Observable emits Observables that never complete, combineLatestAll will continue emitting indefinitely. Make sure your inner Observables complete if you need the combined Observable to complete.

Emission Timing

  • Waits for the outer Observable to complete
  • Only then subscribes to all collected inner Observables
  • Emits only after ALL inner Observables have emitted at least once
  • Subsequently emits whenever ANY inner Observable emits

Completion Behavior

  • Completes when all inner Observables complete
  • If any inner Observable errors, the output Observable errors immediately
  • combineLatest - Combines multiple Observables without flattening
  • combineLatestWith - Instance operator version for combining with specific streams
  • mergeAll - Flattens without waiting for all emissions
  • zipAll - Combines by index instead of latest values
  • forkJoin - Waits for all Observables to complete, then emits once