Overview
zipWith subscribes to the source Observable and all provided Observables, combining their values by index into arrays. It waits for all sources to emit at each index before emitting the combined array. The resulting Observable completes when any source completes.
Values from faster-emitting Observables are buffered while waiting for slower ones. This can cause memory issues if emission rates differ significantly.
Type Signature
export function zipWith < T , A extends readonly unknown []>(
... otherInputs : [ ... ObservableInputTuple < A >]
) : OperatorFunction < T , Cons < T , A >>
Parameters
otherInputs
ObservableInputTuple<A>
required
One or more Observable inputs to zip with the source Observable. Values are combined by their emission index.
Returns
OperatorFunction<T, Cons<T, A>> - An operator function that returns an Observable emitting arrays where each array contains values from all sources at the same index.
Usage Examples
Basic Example: Pairing Timer Values
Basic Usage
With Transformation
import { interval , take , zipWith } from 'rxjs' ;
const source$ = interval ( 1000 ). pipe ( take ( 3 ));
const other1$ = interval ( 1500 ). pipe ( take ( 3 ));
const other2$ = interval ( 500 ). pipe ( take ( 3 ));
source$ . pipe (
zipWith ( other1$ , other2$ )
). subscribe ( console . log );
// Output (waits for all at each index):
// [0, 0, 0] (after ~1500ms)
// [1, 1, 1] (after ~3000ms)
// [2, 2, 2] (after ~4500ms)
Real-World Example: Synchronized Data Loading
import { from , zipWith , map } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
interface User {
id : string ;
name : string ;
}
interface Post {
id : string ;
userId : string ;
title : string ;
}
interface Comment {
id : string ;
postId : string ;
text : string ;
}
interface EnrichedPost {
post : Post ;
author : User ;
commentCount : number ;
}
const userIds = [ 'user1' , 'user2' , 'user3' ];
const postIds = [ 'post1' , 'post2' , 'post3' ];
const commentCounts = [ 5 , 12 , 3 ];
function loadPosts () : Observable < Post > {
return from ( postIds ). pipe (
map ( id => ajax . getJSON < Post >( `/api/posts/ ${ id } ` )),
concatAll ()
);
}
function loadUsers () : Observable < User > {
return from ( userIds ). pipe (
map ( id => ajax . getJSON < User >( `/api/users/ ${ id } ` )),
concatAll ()
);
}
function getCommentCounts () : Observable < number > {
return from ( commentCounts );
}
loadPosts (). pipe (
zipWith ( loadUsers (), getCommentCounts ()),
map (([ post , author , comments ]) => ({
post ,
author ,
commentCount: comments
}))
). subscribe (( enrichedPost : EnrichedPost ) => {
console . log ( 'Post with metadata:' , enrichedPost );
displayEnrichedPost ( enrichedPost );
});
CSV Column Alignment
import { from , zipWith , map } from 'rxjs' ;
interface CSVRow {
name : string ;
age : number ;
email : string ;
}
const names$ = from ([ 'Alice' , 'Bob' , 'Charlie' , 'Diana' ]);
const ages$ = from ([ 30 , 25 , 35 , 28 ]);
const emails$ = from ([ 'alice@example.com' , 'bob@example.com' , 'charlie@example.com' , 'diana@example.com' ]);
names$ . pipe (
zipWith ( ages$ , emails$ ),
map (([ name , age , email ]) => ({ name , age , email }))
). subscribe (( row : CSVRow ) => {
console . log ( 'CSV Row:' , row );
appendToTable ( row );
});
// Output:
// CSV Row: { name: 'Alice', age: 30, email: 'alice@example.com' }
// CSV Row: { name: 'Bob', age: 25, email: 'bob@example.com' }
// CSV Row: { name: 'Charlie', age: 35, email: 'charlie@example.com' }
// CSV Row: { name: 'Diana', age: 28, email: 'diana@example.com' }
Synchronized Video Subtitles
import { interval , zipWith , map , take } from 'rxjs' ;
interface VideoFrame {
frame : number ;
timestamp : number ;
}
interface Subtitle {
text : string ;
startTime : number ;
}
interface SyncedFrame {
frame : VideoFrame ;
subtitle : Subtitle ;
}
const videoFrames$ = interval ( 1000 / 30 ). pipe ( // 30 FPS
take ( 90 ), // 3 seconds
map ( frame => ({
frame ,
timestamp: frame * ( 1000 / 30 )
}))
);
const subtitles$ = interval ( 1000 ). pipe (
take ( 3 ),
map ( index => ({
text: `Subtitle ${ index + 1 } ` ,
startTime: index * 1000
}))
);
// Pair each second of video with its subtitle
subtitles$ . pipe (
zipWith ( videoFrames$ ),
map (([ subtitle , frame ]) => ({ frame , subtitle }))
). subscribe (( syncedFrame : SyncedFrame ) => {
displaySubtitle ( syncedFrame . subtitle . text , syncedFrame . frame . timestamp );
});
Practical Scenarios
In many cases, combineLatestWith is more appropriate than zipWith. Use zipWith only when you specifically need to pair values by index, not by time.
Scenario 1: Test Results Pairing
import { from , zipWith , map } from 'rxjs' ;
interface TestResult {
testName : string ;
expected : any ;
actual : any ;
passed : boolean ;
}
const testNames$ = from ([
'User Login' ,
'Data Fetch' ,
'Form Validation' ,
'API Response'
]);
const expectedResults$ = from ([ true , { data: 'ok' }, true , 200 ]);
const actualResults$ = from ([ true , { data: 'ok' }, false , 200 ]);
testNames$ . pipe (
zipWith ( expectedResults$ , actualResults$ ),
map (([ name , expected , actual ]) => ({
testName: name ,
expected ,
actual ,
passed: JSON . stringify ( expected ) === JSON . stringify ( actual )
}))
). subscribe (( result : TestResult ) => {
console . log (
` ${ result . passed ? '✓' : '✗' } ${ result . testName } :` ,
`Expected: ${ JSON . stringify ( result . expected ) } ,` ,
`Got: ${ JSON . stringify ( result . actual ) } `
);
});
Scenario 2: Multi-Language Content Assembly
import { from , zipWith , map } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
interface MultilingualPage {
id : string ;
title : { en : string ; es : string ; fr : string };
content : { en : string ; es : string ; fr : string };
}
const pageIds = [ 'page1' , 'page2' , 'page3' ];
const englishContent$ = from ( pageIds ). pipe (
map ( id => ajax . getJSON ( `/api/content/ ${ id } /en` )),
concatAll ()
);
const spanishContent$ = from ( pageIds ). pipe (
map ( id => ajax . getJSON ( `/api/content/ ${ id } /es` )),
concatAll ()
);
const frenchContent$ = from ( pageIds ). pipe (
map ( id => ajax . getJSON ( `/api/content/ ${ id } /fr` )),
concatAll ()
);
englishContent$ . pipe (
zipWith ( spanishContent$ , frenchContent$ ),
map (([ en , es , fr ], index ) => ({
id: pageIds [ index ],
title: {
en: en . title ,
es: es . title ,
fr: fr . title
},
content: {
en: en . content ,
es: es . content ,
fr: fr . content
}
}))
). subscribe (( page : MultilingualPage ) => {
console . log ( 'Multilingual page ready:' , page . id );
storePage ( page );
});
Scenario 3: Race Results with Timestamps
import { from , zipWith , map } from 'rxjs' ;
interface RaceResult {
runner : string ;
finishTime : number ;
position : number ;
}
const runners$ = from ([ 'Alice' , 'Bob' , 'Charlie' , 'Diana' , 'Eve' ]);
const finishTimes$ = from ([ 125.3 , 128.7 , 126.1 , 130.2 , 127.5 ]);
const positions$ = from ([ 1 , 4 , 2 , 5 , 3 ]);
runners$ . pipe (
zipWith ( finishTimes$ , positions$ ),
map (([ runner , time , position ]) => ({
runner ,
finishTime: time ,
position
}))
). subscribe (( result : RaceResult ) => {
console . log ( `Position ${ result . position } : ${ result . runner } - ${ result . finishTime } s` );
});
// Output:
// Position 1: Alice - 125.3s
// Position 4: Bob - 128.7s
// Position 2: Charlie - 126.1s
// Position 5: Diana - 130.2s
// Position 3: Eve - 127.5s
Scenario 4: Batch Data Processing Pipeline
import { from , zipWith , map } from 'rxjs' ;
interface RawData {
id : string ;
value : number ;
}
interface ProcessedData {
id : string ;
rawValue : number ;
normalized : number ;
validated : boolean ;
}
function normalizeData ( data : RawData []) : Observable < number > {
return from ( data ). pipe (
map ( item => item . value / 100 )
);
}
function validateData ( data : RawData []) : Observable < boolean > {
return from ( data ). pipe (
map ( item => item . value > 0 && item . value < 1000 )
);
}
const rawData : RawData [] = [
{ id: 'A' , value: 150 },
{ id: 'B' , value: 250 },
{ id: 'C' , value: 350 }
];
from ( rawData ). pipe (
zipWith ( normalizeData ( rawData ), validateData ( rawData )),
map (([ raw , normalized , valid ]) => ({
id: raw . id ,
rawValue: raw . value ,
normalized ,
validated: valid
}))
). subscribe (( processed : ProcessedData ) => {
console . log ( 'Processed:' , processed );
if ( processed . validated ) {
saveToDatabase ( processed );
}
});
Behavior Details
Buffering Behavior
Fast-emitting Observables buffer values while waiting for slow ones. With significantly different emission rates, this can cause high memory usage.
import { interval , take , zipWith } from 'rxjs' ;
const fast$ = interval ( 100 ). pipe ( take ( 100 )); // Emits 100 values quickly
const slow$ = interval ( 5000 ). pipe ( take ( 3 )); // Emits 3 values slowly
fast$ . pipe (
zipWith ( slow$ )
). subscribe ( console . log );
// fast$ emits 0-99 quickly, but they're buffered
// Output only when slow$ emits:
// [0, 0] after 5s
// [1, 1] after 10s
// [2, 2] after 15s
// Completes (because slow$ completes)
// Values 3-99 from fast$ are discarded
Completion Behavior
Completes when any source Observable completes
Remaining buffered values from other sources are discarded
If any source errors, the error propagates immediately
import { of , zipWith , delay } from 'rxjs' ;
const short$ = of ( 1 , 2 , 3 );
const long$ = of ( 'a' , 'b' , 'c' , 'd' , 'e' , 'f' ). pipe ( delay ( 100 ));
short$ . pipe (
zipWith ( long$ )
). subscribe ({
next: console . log ,
complete : () => console . log ( 'Complete!' )
});
// Output:
// [1, 'a']
// [2, 'b']
// [3, 'c']
// Complete!
// 'd', 'e', 'f' are never emitted
Comparison with Other Operators
Operator Combination Strategy Completion zipWithBy index (paired) When any completes combineLatestWithLatest values When all complete withLatestFromLatest (source-driven) When source completes mergeWithNo combination When all complete
zip - Static creation operator for zipping Observables
zipAll - Zips inner Observables after source completes
combineLatestWith - Combines latest values instead of by index
withLatestFrom - Samples other streams when source emits
forkJoin - Waits for all to complete, emits final values