Overview
combineLatestAll takes an Observable of Observables, waits for the outer Observable to complete, then subscribes to all collected inner Observables and combines their values using the combineLatest strategy. Once every inner Observable has emitted at least once, the output Observable emits the latest value from each inner Observable whenever any of them emits.
This operator waits for the outer Observable to complete before subscribing to any inner Observables. If you need immediate subscription, consider using mergeAll or switchAll.
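The deferred subscription described above can be observed with a small sketch. It assumes a Subject as the outer Observable and a hypothetical `tracked` helper (not part of RxJS) that records when each inner Observable is subscribed to:

```typescript
import { Subject, Observable, defer, of, combineLatestAll } from 'rxjs';

const log: string[] = [];

// Hypothetical helper: an inner Observable that records the moment it is subscribed to.
const tracked = (name: string, value: number): Observable<number> =>
  defer(() => {
    log.push(`subscribed ${name}`);
    return of(value);
  });

const outer = new Subject<Observable<number>>();

outer
  .pipe(combineLatestAll())
  .subscribe(values => log.push(`emit ${JSON.stringify(values)}`));

outer.next(tracked('a', 1));
outer.next(tracked('b', 2));
log.push('outer still open'); // no inner Observable has been subscribed to yet

outer.complete(); // inner subscriptions start only now

// log: ['outer still open', 'subscribed a', 'subscribed b', 'emit [1,2]']
```

The 'outer still open' entry lands before any 'subscribed' entry, which is the behavior that separates this operator from mergeAll or switchAll.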
Type Signature
export function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
export function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;
Parameters
project (optional) - Function that maps the most recent values from each inner Observable into a new result. It receives the most recent value from each collected inner Observable as arguments, in the order the inner Observables were emitted by the source.
Returns
OperatorFunction<ObservableInput<T>, T[] | R> - An operator function that returns an Observable that flattens the Observables emitted by the source Observable. It emits arrays of the latest values from all inner Observables, or the return value of project when one is provided.
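As a minimal sketch of the two overloads, the snippet below feeds the same synchronous source through combineLatestAll with and without a project function. The `source$`, `arrays`, and `sums` names are illustrative only:

```typescript
import { of, combineLatestAll } from 'rxjs';

// Two synchronous inner Observables; the first finishes before the second is subscribed to.
const source$ = of(of(1, 2), of(10, 20));

// Without project: each emission is an array of the latest values.
const arrays: number[][] = [];
source$.pipe(combineLatestAll()).subscribe(v => arrays.push(v));
// arrays: [[2, 10], [2, 20]]

// With project: the latest values are passed as separate arguments.
const sums: number[] = [];
source$
  .pipe(combineLatestAll((a: number, b: number) => a + b))
  .subscribe(v => sums.push(v));
// sums: [12, 22]
```

The first inner Observable has already emitted both 1 and 2 by the time the second one emits, so only its latest value (2) appears in the output.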
Usage Examples
Basic Example: Combining Multiple Intervals
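import { fromEvent, map, interval, take, combineLatestAll } from 'rxjs';

// Map each of the first two clicks to a three-value interval with a random period
const clicks = fromEvent(document, 'click');
const higherOrder = clicks.pipe(
  map(() => interval(Math.random() * 2000).pipe(take(3))),
  take(2)
);
const result = higherOrder.pipe(combineLatestAll());
result.subscribe(x => console.log(x));
// Nothing is logged until the second click completes the outer Observable.
// The exact sequence depends on the random periods, e.g. [2, 0], [2, 1], [2, 2]
// when the first interval finishes before the second starts emitting.
// The last value logged is always [2, 2].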
Real-World Example: Aggregating API Responses
import { of, combineLatestAll } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface UserData {
  profile: any;
  settings: any;
  permissions: any;
}

function loadUserData(userId: string) {
  // The outer Observable emits the three requests and completes immediately
  const apiCalls$ = of(
    ajax.getJSON(`/api/users/${userId}/profile`),
    ajax.getJSON(`/api/users/${userId}/settings`),
    ajax.getJSON(`/api/users/${userId}/permissions`)
  );
  return apiCalls$.pipe(
    // Arguments arrive in the same order as the requests above
    combineLatestAll((profile, settings, permissions) => ({
      profile,
      settings,
      permissions
    }))
  );
}

loadUserData('123').subscribe((userData: UserData) => {
  console.log('Complete user data loaded:', userData);
});
import { of, fromEvent, map, combineLatestAll } from 'rxjs';

const nameInput = document.getElementById('name') as HTMLInputElement;
const emailInput = document.getElementById('email') as HTMLInputElement;
const ageInput = document.getElementById('age') as HTMLInputElement;

// One stream of input values per form field
const formFields$ = of(
  fromEvent(nameInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value)),
  fromEvent(emailInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value)),
  fromEvent(ageInput, 'input').pipe(map(e => (e.target as HTMLInputElement).value))
);

// Nothing is logged until all three fields have fired at least one input event
formFields$.pipe(
  combineLatestAll((name, email, age) => ({ name, email, age }))
).subscribe(formData => {
  console.log('Form data:', formData);
});
Practical Scenarios
Use combineLatestAll when you have a dynamic number of streams that all need to emit at least once before you start processing their combined values.
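A minimal sketch of the "dynamic number of streams" case, assuming a hypothetical `latestOfAll` helper (not part of RxJS) that accepts however many streams the caller supplies:

```typescript
import { from, of, Observable, combineLatestAll } from 'rxjs';

// Hypothetical helper: the number of streams is only known at call time.
function latestOfAll(streams: Observable<number>[]): Observable<number[]> {
  return from(streams).pipe(combineLatestAll());
}

const three: number[][] = [];
latestOfAll([of(1), of(2, 3), of(4)]).subscribe(v => three.push(v));
// three: [[1, 3, 4]], a single emission produced once the last stream has emitted

const empty: string[] = [];
latestOfAll([]).subscribe({
  next: v => empty.push(JSON.stringify(v)),
  complete: () => empty.push('complete'),
});
// empty: ['complete'], because with no inner streams there is nothing to combine
```

The second call shows an edge case worth handling in callers: an empty list of streams completes without emitting.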
Scenario 1: Multi-Source Data Aggregation
Combine data from multiple sensors or data sources where you need the latest reading from all sources:
import { of, interval, map, take, combineLatestAll } from 'rxjs';

const sensors = ['temperature', 'humidity', 'pressure'];

// One simulated stream of five readings per sensor
const sensorStreams$ = of(
  ...sensors.map(sensor =>
    interval(1000).pipe(
      map(() => Math.random() * 100),
      take(5)
    )
  )
);

sensorStreams$.pipe(
  combineLatestAll((temp, humidity, pressure) => ({
    temperature: temp,
    humidity: humidity,
    pressure: pressure,
    timestamp: Date.now()
  }))
).subscribe(readings => {
  console.log('Latest sensor readings:', readings);
});
Scenario 2: Parallel Task Completion
Wait for multiple async tasks to complete and combine their results:
import { of, delay, combineLatestAll } from 'rxjs';

function processImages(imageUrls: string[]) {
  // Each task emits a single result after a random delay
  const processingTasks$ = of(
    ...imageUrls.map(url =>
      of({ url, processed: true }).pipe(
        delay(Math.random() * 2000)
      )
    )
  );
  return processingTasks$.pipe(
    combineLatestAll((...results) => results)
  );
}

processImages(['img1.jpg', 'img2.jpg', 'img3.jpg'])
  .subscribe(results => {
    console.log('All images processed:', results);
  });
Behavior Details
If the source Observable emits inner Observables that never complete, the output of combineLatestAll never completes either and keeps emitting whenever they emit. Make sure your inner Observables complete if you need the combined Observable to complete.
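The following sketch illustrates this with one inner Observable that completes right away and a Subject (named `ticks` here purely for illustration) that stays open until it is completed by hand:

```typescript
import { from, of, Observable, Subject, combineLatestAll } from 'rxjs';

const ticks = new Subject<number>(); // stays open until we complete it
const inners: Observable<number>[] = [of(1), ticks];

const events: string[] = [];
from(inners)
  .pipe(combineLatestAll())
  .subscribe({
    next: v => events.push(JSON.stringify(v)),
    complete: () => events.push('complete'),
  });

ticks.next(10);
ticks.next(11);
const whileOpen = [...events]; // ['[1,10]', '[1,11]'], no 'complete' yet

ticks.complete(); // the last open inner Observable finishes
// events: ['[1,10]', '[1,11]', 'complete']
```

Until `ticks` completes, the output stays open even though the other inner Observable finished long ago.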
Emission Timing
Waits for the outer Observable to complete
Only then subscribes to all collected inner Observables
Emits only after ALL inner Observables have emitted at least once
Subsequently emits whenever ANY inner Observable emits
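The last two steps above can be traced with a short sketch. It assumes two Subjects, `a` and `b`, as inner Observables so that each emission can be triggered by hand:

```typescript
import { of, Subject, combineLatestAll } from 'rxjs';

const a = new Subject<string>();
const b = new Subject<string>();
const out: string[][] = [];

// of(a, b) completes synchronously, so both Subjects are subscribed to right away.
of(a, b)
  .pipe(combineLatestAll())
  .subscribe(v => out.push(v));

a.next('a1'); // no output: b has not emitted yet
a.next('a2'); // still no output
b.next('b1'); // first output: ['a2', 'b1']
a.next('a3'); // ['a3', 'b1']
b.next('b2'); // ['a3', 'b2']

// out: [['a2', 'b1'], ['a3', 'b1'], ['a3', 'b2']]
```

Note that 'a1' never appears in the output: it was replaced by 'a2' before `b` produced its first value.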
Completion Behavior
Completes when all inner Observables complete
If any inner Observable errors, the output Observable errors immediately
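A small sketch of the error case, again using two hand-driven Subjects (`a` and `b` are illustrative names) and an `events` array that records every notification:

```typescript
import { of, Subject, combineLatestAll } from 'rxjs';

const a = new Subject<number>();
const b = new Subject<number>();
const events: string[] = [];

of(a, b)
  .pipe(combineLatestAll())
  .subscribe({
    next: v => events.push(`next ${JSON.stringify(v)}`),
    error: (e: Error) => events.push(`error ${e.message}`),
    complete: () => events.push('complete'),
  });

a.next(1);
b.next(2);                  // next [1,2]
b.error(new Error('boom')); // the output errors immediately
a.next(3);                  // ignored: the output has already terminated

// events: ['next [1,2]', 'error boom']
```

The error from `b` terminates the output even though `a` is still open, and no 'complete' notification follows.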
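Related Operators
combineLatest - Creation function that combines Observables passed directly as arguments, without flattening a higher-order Observable
combineLatestWith - Pipeable operator that combines the source with the specific streams passed as arguments
mergeAll - Flattens by subscribing to inner Observables as they arrive and forwarding their values as they are emitted
zipAll - Combines by index instead of latest values
forkJoin - Waits for all Observables to complete, then emits once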
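To make the differences concrete, this sketch runs the same two synchronous inner Observables through combineLatestAll, zipAll, and forkJoin. The `inner` factory and the result array names are illustrative only:

```typescript
import { of, forkJoin, combineLatestAll, zipAll } from 'rxjs';

// Fresh higher-order source for each operator under comparison.
const inner = () => of(of(1, 2), of(10, 20));

const combined: number[][] = [];
inner().pipe(combineLatestAll()).subscribe(v => combined.push(v));
// combined: [[2, 10], [2, 20]]  (latest value from each inner Observable)

const zipped: number[][] = [];
inner().pipe(zipAll()).subscribe(v => zipped.push(v));
// zipped: [[1, 10], [2, 20]]    (values paired by index)

const joined: number[][] = [];
forkJoin([of(1, 2), of(10, 20)]).subscribe(v => joined.push(v));
// joined: [[2, 20]]             (last values only, emitted once)
```

With asynchronous inner Observables the combineLatestAll output would depend on timing, while the zipAll and forkJoin results would keep the same shape.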