Skip to main content

Overview

zipWith subscribes to the source Observable and all provided Observables, combining their values by index into arrays. It waits for all sources to emit at each index before emitting the combined array. The resulting Observable completes when any source completes.
Values from faster-emitting Observables are buffered while waiting for slower ones. This can cause memory issues if emission rates differ significantly.

Type Signature

export function zipWith<T, A extends readonly unknown[]>(
  ...otherInputs: [...ObservableInputTuple<A>]
): OperatorFunction<T, Cons<T, A>>

Parameters

otherInputs
ObservableInputTuple<A>
required
One or more Observable inputs to zip with the source Observable. Values are combined by their emission index.

Returns

OperatorFunction<T, Cons<T, A>> - An operator function that returns an Observable emitting arrays where each array contains values from all sources at the same index.

Usage Examples

Basic Example: Pairing Timer Values

import { interval, take, zipWith } from 'rxjs';

const source$ = interval(1000).pipe(take(3));
const other1$ = interval(1500).pipe(take(3));
const other2$ = interval(500).pipe(take(3));

source$.pipe(
  zipWith(other1$, other2$)
).subscribe(console.log);

// Output (waits for all at each index):
// [0, 0, 0] (after ~1500ms)
// [1, 1, 1] (after ~3000ms)
// [2, 2, 2] (after ~4500ms)

Real-World Example: Synchronized Data Loading

import { from, zipWith, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface User {
  id: string;
  name: string;
}

interface Post {
  id: string;
  userId: string;
  title: string;
}

interface Comment {
  id: string;
  postId: string;
  text: string;
}

interface EnrichedPost {
  post: Post;
  author: User;
  commentCount: number;
}

const userIds = ['user1', 'user2', 'user3'];
const postIds = ['post1', 'post2', 'post3'];
const commentCounts = [5, 12, 3];

function loadPosts(): Observable<Post> {
  return from(postIds).pipe(
    map(id => ajax.getJSON<Post>(`/api/posts/${id}`)),
    concatAll()
  );
}

function loadUsers(): Observable<User> {
  return from(userIds).pipe(
    map(id => ajax.getJSON<User>(`/api/users/${id}`)),
    concatAll()
  );
}

function getCommentCounts(): Observable<number> {
  return from(commentCounts);
}

loadPosts().pipe(
  zipWith(loadUsers(), getCommentCounts()),
  map(([post, author, comments]) => ({
    post,
    author,
    commentCount: comments
  }))
).subscribe((enrichedPost: EnrichedPost) => {
  console.log('Post with metadata:', enrichedPost);
  displayEnrichedPost(enrichedPost);
});

CSV Column Alignment

import { from, zipWith, map } from 'rxjs';

interface CSVRow {
  name: string;
  age: number;
  email: string;
}

const names$ = from(['Alice', 'Bob', 'Charlie', 'Diana']);
const ages$ = from([30, 25, 35, 28]);
const emails$ = from(['alice@example.com', 'bob@example.com', 'charlie@example.com', 'diana@example.com']);

names$.pipe(
  zipWith(ages$, emails$),
  map(([name, age, email]) => ({ name, age, email }))
).subscribe((row: CSVRow) => {
  console.log('CSV Row:', row);
  appendToTable(row);
});

// Output:
// CSV Row: { name: 'Alice', age: 30, email: 'alice@example.com' }
// CSV Row: { name: 'Bob', age: 25, email: 'bob@example.com' }
// CSV Row: { name: 'Charlie', age: 35, email: 'charlie@example.com' }
// CSV Row: { name: 'Diana', age: 28, email: 'diana@example.com' }

Synchronized Video Subtitles

import { interval, zipWith, map, take } from 'rxjs';

interface VideoFrame {
  frame: number;
  timestamp: number;
}

interface Subtitle {
  text: string;
  startTime: number;
}

interface SyncedFrame {
  frame: VideoFrame;
  subtitle: Subtitle;
}

const videoFrames$ = interval(1000 / 30).pipe( // 30 FPS
  take(90), // 3 seconds
  map(frame => ({
    frame,
    timestamp: frame * (1000 / 30)
  }))
);

const subtitles$ = interval(1000).pipe(
  take(3),
  map(index => ({
    text: `Subtitle ${index + 1}`,
    startTime: index * 1000
  }))
);

// Pair each second of video with its subtitle
subtitles$.pipe(
  zipWith(videoFrames$),
  map(([subtitle, frame]) => ({ frame, subtitle }))
).subscribe((syncedFrame: SyncedFrame) => {
  displaySubtitle(syncedFrame.subtitle.text, syncedFrame.frame.timestamp);
});

Practical Scenarios

In many cases, combineLatestWith is more appropriate than zipWith. Use zipWith only when you specifically need to pair values by index, not by time.

Scenario 1: Test Results Pairing

import { from, zipWith, map } from 'rxjs';

interface TestResult {
  testName: string;
  expected: any;
  actual: any;
  passed: boolean;
}

const testNames$ = from([
  'User Login',
  'Data Fetch',
  'Form Validation',
  'API Response'
]);

const expectedResults$ = from([true, { data: 'ok' }, true, 200]);
const actualResults$ = from([true, { data: 'ok' }, false, 200]);

testNames$.pipe(
  zipWith(expectedResults$, actualResults$),
  map(([name, expected, actual]) => ({
    testName: name,
    expected,
    actual,
    passed: JSON.stringify(expected) === JSON.stringify(actual)
  }))
).subscribe((result: TestResult) => {
  console.log(
    `${result.passed ? '✓' : '✗'} ${result.testName}:`,
    `Expected: ${JSON.stringify(result.expected)},`,
    `Got: ${JSON.stringify(result.actual)}`
  );
});

Scenario 2: Multi-Language Content Assembly

import { from, zipWith, map } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface MultilingualPage {
  id: string;
  title: { en: string; es: string; fr: string };
  content: { en: string; es: string; fr: string };
}

const pageIds = ['page1', 'page2', 'page3'];

const englishContent$ = from(pageIds).pipe(
  map(id => ajax.getJSON(`/api/content/${id}/en`)),
  concatAll()
);

const spanishContent$ = from(pageIds).pipe(
  map(id => ajax.getJSON(`/api/content/${id}/es`)),
  concatAll()
);

const frenchContent$ = from(pageIds).pipe(
  map(id => ajax.getJSON(`/api/content/${id}/fr`)),
  concatAll()
);

englishContent$.pipe(
  zipWith(spanishContent$, frenchContent$),
  map(([en, es, fr], index) => ({
    id: pageIds[index],
    title: {
      en: en.title,
      es: es.title,
      fr: fr.title
    },
    content: {
      en: en.content,
      es: es.content,
      fr: fr.content
    }
  }))
).subscribe((page: MultilingualPage) => {
  console.log('Multilingual page ready:', page.id);
  storePage(page);
});

Scenario 3: Race Results with Timestamps

import { from, zipWith, map } from 'rxjs';

interface RaceResult {
  runner: string;
  finishTime: number;
  position: number;
}

const runners$ = from(['Alice', 'Bob', 'Charlie', 'Diana', 'Eve']);
const finishTimes$ = from([125.3, 128.7, 126.1, 130.2, 127.5]);
const positions$ = from([1, 4, 2, 5, 3]);

runners$.pipe(
  zipWith(finishTimes$, positions$),
  map(([runner, time, position]) => ({
    runner,
    finishTime: time,
    position
  }))
).subscribe((result: RaceResult) => {
  console.log(`Position ${result.position}: ${result.runner} - ${result.finishTime}s`);
});

// Output:
// Position 1: Alice - 125.3s
// Position 4: Bob - 128.7s
// Position 2: Charlie - 126.1s
// Position 5: Diana - 130.2s
// Position 3: Eve - 127.5s

Scenario 4: Batch Data Processing Pipeline

import { from, zipWith, map } from 'rxjs';

interface RawData {
  id: string;
  value: number;
}

interface ProcessedData {
  id: string;
  rawValue: number;
  normalized: number;
  validated: boolean;
}

function normalizeData(data: RawData[]): Observable<number> {
  return from(data).pipe(
    map(item => item.value / 100)
  );
}

function validateData(data: RawData[]): Observable<boolean> {
  return from(data).pipe(
    map(item => item.value > 0 && item.value < 1000)
  );
}

const rawData: RawData[] = [
  { id: 'A', value: 150 },
  { id: 'B', value: 250 },
  { id: 'C', value: 350 }
];

from(rawData).pipe(
  zipWith(normalizeData(rawData), validateData(rawData)),
  map(([raw, normalized, valid]) => ({
    id: raw.id,
    rawValue: raw.value,
    normalized,
    validated: valid
  }))
).subscribe((processed: ProcessedData) => {
  console.log('Processed:', processed);
  if (processed.validated) {
    saveToDatabase(processed);
  }
});

Behavior Details

Buffering Behavior

Fast-emitting Observables buffer values while waiting for slow ones. With significantly different emission rates, this can cause high memory usage.
import { interval, take, zipWith } from 'rxjs';

const fast$ = interval(100).pipe(take(100)); // Emits 100 values quickly
const slow$ = interval(5000).pipe(take(3));  // Emits 3 values slowly

fast$.pipe(
  zipWith(slow$)
).subscribe(console.log);

// fast$ emits 0-99 quickly, but they're buffered
// Output only when slow$ emits:
// [0, 0] after 5s
// [1, 1] after 10s
// [2, 2] after 15s
// Completes (because slow$ completes)
// Values 3-99 from fast$ are discarded

Completion Behavior

  • Completes when any source Observable completes
  • Remaining buffered values from other sources are discarded
  • If any source errors, the error propagates immediately
import { of, zipWith, delay } from 'rxjs';

const short$ = of(1, 2, 3);
const long$ = of('a', 'b', 'c', 'd', 'e', 'f').pipe(delay(100));

short$.pipe(
  zipWith(long$)
).subscribe({
  next: console.log,
  complete: () => console.log('Complete!')
});

// Output:
// [1, 'a']
// [2, 'b']
// [3, 'c']
// Complete!
// 'd', 'e', 'f' are never emitted

Comparison with Other Operators

OperatorCombination StrategyCompletion
zipWithBy index (paired)When any completes
combineLatestWithLatest valuesWhen all complete
withLatestFromLatest (source-driven)When source completes
mergeWithNo combinationWhen all complete
  • zip - Static creation operator for zipping Observables
  • zipAll - Zips inner Observables after source completes
  • combineLatestWith - Combines latest values instead of by index
  • withLatestFrom - Samples other streams when source emits
  • forkJoin - Waits for all to complete, emits final values