Skip to main content

Overview

zipAll waits for the source Observable to complete, then subscribes to all collected inner Observables and combines their values by index, emitting arrays. It completes when any inner Observable completes.
Use with caution on infinite or very long Observables. zipAll buffers all values until corresponding indices are available, which can cause memory issues if streams emit at very different rates.

Type Signature

export function zipAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
export function zipAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

Parameters

project
(...values: T[]) => R
Optional projection function to transform arrays of zipped values. Receives values from all inner Observables at the same index.

Returns

OperatorFunction<ObservableInput<T>, T[] | R> - An operator function that returns an Observable emitting arrays (or projected values) of values from all inner Observables combined by index.

Usage Examples

Basic Example: Zipping Multiple Intervals

import { of, interval, take, zipAll } from 'rxjs';

const source$ = of(
  interval(1000).pipe(take(3)),  // 0, 1, 2
  interval(700).pipe(take(3)),   // 0, 1, 2
  interval(500).pipe(take(3))    // 0, 1, 2
);

source$.pipe(zipAll()).subscribe(console.log);

// Output (pairing by index):
// [0, 0, 0] (after ~1000ms - waiting for slowest)
// [1, 1, 1] (after ~2000ms)
// [2, 2, 2] (after ~3000ms)
// Then completes

Real-World Example: Parallel API Calls with Ordered Results

import { of, zipAll, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface UserData {
  profile: any;
  posts: any[];
  followers: any[];
}

function fetchUserData(userId: string): Observable<UserData> {
  const requests$ = of(
    ajax.getJSON(`/api/users/${userId}/profile`),
    ajax.getJSON(`/api/users/${userId}/posts`),
    ajax.getJSON(`/api/users/${userId}/followers`)
  );

  return requests$.pipe(
    zipAll((profile, posts, followers) => ({
      profile,
      posts,
      followers
    }))
  );
}

fetchUserData('user123').subscribe((userData: UserData) => {
  console.log('User data loaded:', userData);
  displayUserProfile(userData);
});

Synchronized Animation Sequences

import { of, interval, take, map, zipAll } from 'rxjs';

interface AnimationFrame {
  element: string;
  frame: number;
  transform: string;
}

function animateElement(elementId: string, duration: number, frames: number) {
  return interval(duration / frames).pipe(
    take(frames),
    map(frame => ({
      element: elementId,
      frame,
      transform: `translateX(${(frame / frames) * 100}px)`
    }))
  );
}

const animations$ = of(
  animateElement('box1', 2000, 10),
  animateElement('box2', 2000, 10),
  animateElement('box3', 2000, 10)
);

animations$.pipe(
  zipAll()
).subscribe((frames: AnimationFrame[]) => {
  // All elements animate in sync
  frames.forEach(frame => {
    const element = document.getElementById(frame.element);
    if (element) {
      element.style.transform = frame.transform;
    }
  });
});

Multi-Step Form Wizard

import { of, zipAll, map } from 'rxjs';

interface FormStep {
  stepNumber: number;
  data: any;
  isValid: boolean;
}

interface CompleteFormData {
  personalInfo: any;
  address: any;
  payment: any;
}

function submitFormStep(stepNumber: number): Observable<FormStep> {
  return new Observable(subscriber => {
    const stepData = getFormStepData(stepNumber);
    
    ajax.post(`/api/form/step/${stepNumber}`, stepData).subscribe(
      response => {
        subscriber.next({
          stepNumber,
          data: response.response,
          isValid: true
        });
        subscriber.complete();
      },
      err => subscriber.error(err)
    );
  });
}

function submitCompleteForm() {
  const steps$ = of(
    submitFormStep(1),
    submitFormStep(2),
    submitFormStep(3)
  );

  return steps$.pipe(
    zipAll((personal, address, payment) => ({
      personalInfo: personal.data,
      address: address.data,
      payment: payment.data
    }))
  );
}

submitCompleteForm().subscribe({
  next: (formData: CompleteFormData) => {
    console.log('All form steps completed:', formData);
    showSuccessMessage();
  },
  error: err => {
    console.error('Form submission failed:', err);
    showErrorMessage(err.message);
  }
});

Practical Scenarios

zipAll completes when the shortest inner Observable completes. If you need all values from all streams, ensure they emit the same number of values.

Scenario 1: Synchronized Data Processing

import { of, from, map, zipAll, delay } from 'rxjs';

interface ProcessedData {
  stage1: string;
  stage2: string;
  stage3: string;
}

function processInStages(data: string[]) {
  const stage1$ = from(data).pipe(
    map(item => `Stage1: ${item}`),
    delay(100)
  );

  const stage2$ = from(data).pipe(
    map(item => `Stage2: ${item}`),
    delay(200)
  );

  const stage3$ = from(data).pipe(
    map(item => `Stage3: ${item}`),
    delay(300)
  );

  return of(stage1$, stage2$, stage3$).pipe(
    zipAll((s1, s2, s3) => ({
      stage1: s1,
      stage2: s2,
      stage3: s3
    }))
  );
}

processInStages(['item1', 'item2', 'item3']).subscribe(
  (processed: ProcessedData) => {
    console.log('Processed:', processed);
  }
);

Scenario 2: Multi-Language Translation

import { of, zipAll, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface Translation {
  language: string;
  text: string;
}

interface MultilingualContent {
  en: string;
  es: string;
  fr: string;
  de: string;
}

function translateText(text: string, languages: string[]): Observable<MultilingualContent> {
  const translations$ = of(
    ...languages.map(lang => 
      ajax.post<Translation>('/api/translate', { text, targetLang: lang }).pipe(
        map(response => response.response.text)
      )
    )
  );

  return translations$.pipe(
    zipAll((...translations) => {
      const result: any = {};
      languages.forEach((lang, index) => {
        result[lang] = translations[index];
      });
      return result as MultilingualContent;
    })
  );
}

translateText('Hello, World!', ['en', 'es', 'fr', 'de']).subscribe(
  (translations: MultilingualContent) => {
    console.log('Translations:', translations);
  }
);

Scenario 3: Batch Image Processing

import { of, zipAll, map } from 'rxjs';

interface ImageProcessingResult {
  original: string;
  thumbnail: string;
  compressed: string;
}

function processImage(file: File): Observable<ImageProcessingResult> {
  const operations$ = of(
    uploadOriginal(file),
    generateThumbnail(file),
    compressImage(file)
  );

  return operations$.pipe(
    zipAll((original, thumbnail, compressed) => ({
      original,
      thumbnail,
      compressed
    }))
  );
}

function uploadOriginal(file: File): Observable<string> {
  return ajax.post('/api/images/upload', file).pipe(
    map(response => response.response.url)
  );
}

function generateThumbnail(file: File): Observable<string> {
  return ajax.post('/api/images/thumbnail', file).pipe(
    map(response => response.response.url)
  );
}

function compressImage(file: File): Observable<string> {
  return ajax.post('/api/images/compress', file).pipe(
    map(response => response.response.url)
  );
}

const fileInput = document.getElementById('file') as HTMLInputElement;
const selectedFile = fileInput.files![0];

processImage(selectedFile).subscribe({
  next: (result: ImageProcessingResult) => {
    console.log('Image processing complete:', result);
    displayProcessedImages(result);
  },
  error: err => console.error('Processing failed:', err)
});

Behavior Details

Buffering and Memory

If inner Observables emit at very different rates, zipAll will buffer faster emissions waiting for slower ones. This can cause memory issues with long-running or infinite streams.
import { of, interval, take, zipAll } from 'rxjs';

const source$ = of(
  interval(100).pipe(take(100)),  // Fast: emits 100 values quickly
  interval(5000).pipe(take(3))    // Slow: emits 3 values slowly
);

source$.pipe(zipAll()).subscribe(console.log);

// The fast stream's values are buffered waiting for the slow stream
// Memory usage grows as unmatched values accumulate

Completion Behavior

  • Waits for source Observable to complete before subscribing to inner Observables
  • Emits arrays of values paired by index
  • Completes when any inner Observable completes
  • If any inner Observable errors, the error propagates immediately

Comparison with Similar Operators

OperatorPairing StrategyCompletion
zipAllBy indexWhen shortest completes
combineLatestAllLatest valuesWhen all complete
forkJoinFinal values onlyWhen all complete
mergeAllNo pairingWhen all complete
  • zip - Static creation operator for zipping Observables
  • zipWith - Instance operator for zipping with specific streams
  • combineLatestAll - Combines latest values instead of by index
  • forkJoin - Waits for all to complete, emits final values once
  • mergeAll - Flattens without pairing values