forkJoin
Accepts an Array or dictionary Object of Observables and returns an Observable that, once all input Observables have completed, emits a single array or dictionary containing the last value from each input.
Import
import { forkJoin } from 'rxjs' ;
Type Signature
// Array form: takes an array (tuple) of observable inputs and emits a single
// array holding the last value of each input, in the same positions.
function forkJoin < A extends readonly unknown []>(
sources : readonly [ ... ObservableInputTuple < A >]
) : Observable < A >;
// Dictionary form: takes an object whose values are observable inputs and
// emits a single object with the same keys, each mapped to the last value
// of the corresponding input.
function forkJoin < T extends Record < string , ObservableInput < any >>>(
sourcesObject : T
) : Observable <{ [ K in keyof T ] : ObservedValueOf < T [ K ]> }>;
// Array form with a result selector: the last values are passed to
// `resultSelector` as positional arguments, and whatever it returns is
// emitted in place of the array.
function forkJoin < A extends readonly unknown [], R >(
sources : readonly [ ... ObservableInputTuple < A >],
resultSelector : ( ... values : A ) => R
) : Observable < R >;
Parameters
sources
Array<ObservableInput<any>> | Record<string, ObservableInput<any>>
required
Either an array of observable inputs (Observables, Promises, and other ObservableInput values) or an object whose values are observable inputs.
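resultSelector (optional): Function that receives the last value from each source as positional arguments and returns the value to emit in place of the array. Per the type signature above, it is available only with the array form.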
Returns
An Observable that emits:
An array of last values (when given an array)
An object of last values (when given an object)
The result of resultSelector (when provided)
Emits only once, then completes
Completes immediately if given an empty array
Description
forkJoin is similar to Promise.all() - it waits for all input Observables to complete, then emits a single value containing the last value from each Observable.
Key characteristics:
Waits for ALL Observables to complete
Emits ONLY the last value from each Observable
Emits only ONCE, then completes
Errors immediately if ANY Observable errors
Completes immediately if given empty array
Completes without emitting if any Observable completes without emitting
Examples
Parallel HTTP Requests (Dictionary)
import { forkJoin } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
// Start three independent requests together and wait until all of them have
// completed. `ajax.getJSON` yields `unknown` when no type argument is given,
// so the list endpoints are typed explicitly to make `.length` type-check.
// NOTE(review): assumes /api/posts and /api/comments return JSON arrays — confirm.
const requests$ = forkJoin({
  user: ajax.getJSON('/api/user'),
  posts: ajax.getJSON<unknown[]>('/api/posts'),
  comments: ajax.getJSON<unknown[]>('/api/comments'),
});

requests$.subscribe({
  // Called exactly once, with one entry per key of the input object.
  next: ({ user, posts, comments }) => {
    console.log('User:', user);
    console.log('Posts:', posts.length);
    console.log('Comments:', comments.length);
  },
  // A failure of any single request fails the whole group.
  error: (err) => console.error('Request failed:', err),
  complete: () => console.log('All requests completed!'),
});
Parallel HTTP Requests (Array)
import { forkJoin } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
// Array form: the same three requests passed positionally. The emitted array
// lists the results in the order the requests were supplied.
const requests$ = forkJoin([
  ajax.getJSON('/api/user'),
  ajax.getJSON('/api/posts'),
  ajax.getJSON('/api/comments'),
]);

requests$.subscribe((results) => {
  const [user, posts, comments] = results;
  console.log('Got all data:', { user, posts, comments });
});
Wait for Multiple Timers
import { forkJoin , of , timer } from 'rxjs' ;
// forkJoin accepts a mix of input kinds; each entry contributes only the
// last value it produced before completing.
const observable = forkJoin({
  // Emits 1, 2, 3, 4 and completes — only the 4 is kept.
  foo: of(1, 2, 3, 4),
  // A Promise is a valid input; it resolves to 8.
  bar: Promise.resolve(8),
  // Emits 0 once 4 seconds have elapsed.
  baz: timer(4000),
});

observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('Done!'),
});

// Logged after 4 seconds, once the timer (the slowest input) has completed:
// { foo: 4, bar: 8, baz: 0 }
// Done!
Common Use Cases
Load Multiple Resources
import { forkJoin } from 'rxjs' ;
import { ajax } from 'rxjs/ajax' ;
import { map } from 'rxjs/operators' ;
/**
 * Loads a user together with their posts, followers and following lists in
 * parallel, then merges them into one profile object.
 *
 * @param userId Identifier interpolated into the `/api/users/...` paths.
 * @returns An Observable that emits once with the user's own fields plus a
 *   `stats` object of collection sizes, then completes. It errors if any of
 *   the four requests fails.
 */
function loadUserProfile(userId: string) {
  // Built once so the four URLs cannot drift apart; no whitespace may appear
  // around the interpolation or it would end up inside the request path.
  const userUrl = `/api/users/${userId}`;

  // `ajax.getJSON` yields `unknown` without a type argument, which would make
  // both the spread and the `.length` reads below fail to type-check.
  // NOTE(review): assumes the list endpoints return JSON arrays — confirm.
  return forkJoin({
    user: ajax.getJSON<Record<string, unknown>>(userUrl),
    posts: ajax.getJSON<unknown[]>(`${userUrl}/posts`),
    followers: ajax.getJSON<unknown[]>(`${userUrl}/followers`),
    following: ajax.getJSON<unknown[]>(`${userUrl}/following`),
  }).pipe(
    map(({ user, posts, followers, following }) => ({
      ...user,
      stats: {
        postsCount: posts.length,
        followersCount: followers.length,
        followingCount: following.length,
      },
    }))
  );
}

loadUserProfile('123').subscribe((profile) => {
  console.log('Profile loaded:', profile);
});
import { forkJoin, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, delay } from 'rxjs/operators';

// Form validation: run independent asynchronous validators in parallel and
// combine their verdicts into a single result.

/**
 * Asks the server whether `username` is still available.
 *
 * @returns An Observable emitting the `available` flag from the response.
 */
function validateUsername(username: string) {
  // The value comes from a form field, so it is percent-encoded before being
  // placed in the query string (characters such as `&`, `+` or spaces would
  // otherwise alter the query).
  const url = `/api/check-username?name=${encodeURIComponent(username)}`;
  return ajax
    .getJSON<{ available: boolean }>(url)
    .pipe(map((response) => response.available));
}

/**
 * Asks the server whether `email` is valid.
 *
 * @returns An Observable emitting the `valid` flag from the response.
 */
function validateEmail(email: string) {
  // Encoding matters here in particular: an unencoded `+` in an address is
  // commonly decoded as a space on the server side.
  const url = `/api/check-email?email=${encodeURIComponent(email)}`;
  return ajax
    .getJSON<{ valid: boolean }>(url)
    .pipe(map((response) => response.valid));
}

/**
 * Runs both validators at once and reports the combined outcome.
 *
 * @returns An Observable emitting one object: `valid` is true only when both
 *   checks pass, and `errors` holds a message (or `null`) per field.
 */
function validateForm(username: string, email: string) {
  return forkJoin({
    usernameValid: validateUsername(username),
    emailValid: validateEmail(email),
  }).pipe(
    map(({ usernameValid, emailValid }) => ({
      valid: usernameValid && emailValid,
      errors: {
        username: !usernameValid ? 'Username already taken' : null,
        email: !emailValid ? 'Invalid email' : null,
      },
    }))
  );
}

validateForm('john', 'john@example.com').subscribe((result) => {
  if (result.valid) {
    console.log('Form is valid!');
  } else {
    console.log('Validation errors:', result.errors);
  }
});
Batch File Upload
import { forkJoin, from, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, mergeMap } from 'rxjs/operators';

/**
 * Uploads one file as multipart form data.
 *
 * @returns An Observable emitting the `url` field of the response body.
 */
function uploadFile(file: File): Observable<string> {
  const formData = new FormData();
  formData.append('file', file);

  // The response body type is stated explicitly so that `.response.url`
  // type-checks.
  // NOTE(review): assumes the endpoint answers with `{ url: string }` — confirm.
  return ajax
    .post<{ url: string }>('/api/upload', formData)
    .pipe(map((response) => response.response.url));
}

/**
 * Uploads every file in parallel.
 *
 * @returns An Observable emitting one array of URLs, in the same order as
 *   `files`, once all uploads have completed. If any upload fails, the whole
 *   Observable errors.
 */
function uploadFiles(files: File[]) {
  // For an empty `files` array forkJoin completes immediately without
  // emitting, so a subscriber's `next` handler is not called in that case.
  const uploads$ = files.map((file) => uploadFile(file));
  return forkJoin(uploads$);
}

const files = [file1, file2, file3];
uploadFiles(files).subscribe({
  next: (urls) => console.log('All files uploaded:', urls),
  error: (err) => console.error('Upload failed:', err),
});
Database Migrations
import { forkJoin, defer, from } from 'rxjs';
import { tap } from 'rxjs/operators';

/**
 * Wraps a promise-returning migration in an Observable that logs when the
 * migration starts and when it finishes.
 *
 * @param name Label used in the log messages.
 * @param fn Function that performs the migration.
 * @returns An Observable that calls `fn` when subscribed to.
 */
function runMigration(name: string, fn: () => Promise<void>) {
  // `defer` postpones the call to `fn()` until subscription, so merely
  // building the list below does not start any migration.
  return defer(() => {
    console.log(`Running migration: ${name}`);
    return from(fn()).pipe(
      tap(() => console.log(`Completed migration: ${name}`))
    );
  });
}

// forkJoin does not run its inputs one after another, so the migrations
// below are not executed in order. Use this shape only for migrations that
// do not depend on each other; use `concat` when they must run sequentially.
const migrations$ = forkJoin([
  runMigration('001_create_users', () => db.exec('CREATE TABLE users...')),
  runMigration('002_create_posts', () => db.exec('CREATE TABLE posts...')),
  runMigration('003_add_indexes', () => db.exec('CREATE INDEX...')),
]);

migrations$.subscribe({
  next: () => console.log('All migrations completed'),
  error: (err) => console.error('Migration failed:', err),
});
Behavior Details
Only Last Values
import { forkJoin , of } from 'rxjs' ;
// Each input contributes only the final value it emitted before completing.
const numbers$ = of(1, 2, 3, 4); // last value: 4
const letters$ = of('a', 'b', 'c'); // last value: 'c'

forkJoin([numbers$, letters$]).subscribe((lastValues) => {
  const [num, letter] = lastValues;
  console.log(num, letter); // prints: 4 c
});
Empty Observable Behavior
import { forkJoin , of , EMPTY } from 'rxjs' ;
// EMPTY completes without ever emitting, so it has no last value to
// contribute. The combined Observable therefore completes WITHOUT emitting.
const withEmptySource$ = forkJoin([
  of(1, 2, 3),
  EMPTY,
]);

withEmptySource$.subscribe({
  // Never invoked.
  next: (x) => console.log('Next:', x),
  // Invoked immediately.
  complete: () => console.log('Complete!'),
});
Error Handling
import { forkJoin , of , throwError } from 'rxjs' ;
import { catchError } from 'rxjs/operators' ;
// Case 1 — no recovery: one failing input makes the whole forkJoin error.
const unguarded$ = forkJoin([
  of(1),
  throwError(() => new Error('Fail')),
  of(3),
]);

unguarded$.subscribe({
  // Never invoked.
  next: (x) => console.log(x),
  // Receives the error raised by the failing input.
  error: (err) => console.error('Error:', err),
});

// Case 2 — recovery on the failing input: catchError swaps the error for a
// fallback value, so forkJoin sees three inputs that all complete normally.
const recoveringSource$ = throwError(() => new Error('Fail')).pipe(
  catchError(() => of('recovered'))
);
const guarded$ = forkJoin([of(1), recoveringSource$, of(3)]);

guarded$.subscribe({
  // Emits [1, 'recovered', 3]
  next: (x) => console.log(x),
});
Comparison with Other Operators
forkJoin (Wait for All)
combineLatest (Emit on Any Change)
import { forkJoin , timer } from 'rxjs' ;
import { take } from 'rxjs/operators' ;
// Two interval timers cut short with take(): the first is limited to three
// values, the second to two. forkJoin ignores everything but the last value
// of each.
const fast$ = timer(0, 100).pipe(take(3));
const slow$ = timer(0, 200).pipe(take(2));

forkJoin([fast$, slow$]).subscribe((lastValues) => {
  const [a, b] = lastValues;
  console.log('forkJoin:', a, b);
});

// Output (a single line, printed after both timers complete):
// forkJoin: 2 1
Important Notes
forkJoin will never emit if any Observable never completes. Make sure all your Observables will eventually complete.
If you need values emitted during execution (not just the last), use combineLatest or zip instead.
forkJoin is perfect for loading all data before displaying a page, similar to Promise.all().
combineLatest - Emit whenever any Observable emits
zip - Combine by index position
concat - Subscribe sequentially
merge - Merge emissions concurrently
See Also