forkJoin

Accepts an array or dictionary object of Observable inputs (Observables, Promises, and similar) and returns an Observable that emits a single array or dictionary of their last values once all inputs have completed.

Import

import { forkJoin } from 'rxjs';

Type Signature

// Array of Observables
function forkJoin<A extends readonly unknown[]>(
  sources: readonly [...ObservableInputTuple<A>]
): Observable<A>;

// Dictionary of Observables
function forkJoin<T extends Record<string, ObservableInput<any>>>(
  sourcesObject: T
): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;

// With result selector
function forkJoin<A extends readonly unknown[], R>(
  sources: readonly [...ObservableInputTuple<A>],
  resultSelector: (...values: A) => R
): Observable<R>;

Parameters

  • sources (Array<ObservableInput> | Object, required): Either an array of Observables or an object whose values are Observables.
  • resultSelector (Function, optional): Function that transforms the combined last values before emission.

Returns

Observable<T>
An Observable that emits exactly one value and then completes. That value is:
  • An array of last values (when given an array)
  • An object of last values (when given an object)
  • The result of resultSelector (when provided)
If given an empty array, it completes immediately without emitting.
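
The return shapes listed above can be seen side by side in a small sketch. It uses synchronous of() sources so the results are available right after subscribing; the returnEvents array is only an illustration device and not part of the API.

```typescript
import { forkJoin, of } from 'rxjs';

// Illustration only: collects what each subscription observes.
const returnEvents: string[] = [];

// Array input: emits an array of last values, in source order.
forkJoin([of(1, 2), of('a', 'b')]).subscribe({
  next: value => returnEvents.push(`array: ${JSON.stringify(value)}`),
  complete: () => returnEvents.push('array: complete')
});

// Dictionary input: emits an object with the same keys.
forkJoin({ num: of(1, 2), str: of('a', 'b') }).subscribe({
  next: value => returnEvents.push(`object: ${JSON.stringify(value)}`),
  complete: () => returnEvents.push('object: complete')
});

// resultSelector: receives the last values as separate arguments.
forkJoin([of(1, 2), of(10, 20)], (a, b) => a + b).subscribe({
  next: value => returnEvents.push(`selector: ${value}`),
  complete: () => returnEvents.push('selector: complete')
});

// Empty array: completes without ever calling next.
forkJoin([]).subscribe({
  next: () => returnEvents.push('empty: next'),
  complete: () => returnEvents.push('empty: complete')
});

console.log(returnEvents);
// [
//   'array: [2,"b"]', 'array: complete',
//   'object: {"num":2,"str":"b"}', 'object: complete',
//   'selector: 22', 'selector: complete',
//   'empty: complete'
// ]
```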

Description

forkJoin is similar to Promise.all(): it subscribes to all input Observables, waits for every one of them to complete, then emits a single value containing the last value from each Observable. Key characteristics:
  • Waits for ALL Observables to complete
  • Emits ONLY the last value from each Observable
  • Emits only ONCE, then completes
  • Errors immediately if ANY Observable errors
  • Completes immediately, without emitting, if given an empty array
  • Completes without emitting if any Observable completes without emitting

Examples

Parallel HTTP Requests (Dictionary)

import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const requests$ = forkJoin({
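  // The type arguments are assumptions about what each endpoint returns;
  // without them getJSON yields `unknown` and `posts.length` does not type-check.
  user: ajax.getJSON<unknown>('/api/user'),
  posts: ajax.getJSON<unknown[]>('/api/posts'),
  comments: ajax.getJSON<unknown[]>('/api/comments')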
});

requests$.subscribe({
  next: ({ user, posts, comments }) => {
    console.log('User:', user);
    console.log('Posts:', posts.length);
    console.log('Comments:', comments.length);
  },
  error: err => console.error('Request failed:', err),
  complete: () => console.log('All requests completed!')
});

Parallel HTTP Requests (Array)

import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const requests$ = forkJoin([
  ajax.getJSON('/api/user'),
  ajax.getJSON('/api/posts'),
  ajax.getJSON('/api/comments')
]);

requests$.subscribe(([user, posts, comments]) => {
  console.log('Got all data:', { user, posts, comments });
});

Wait for Multiple Timers

import { forkJoin, of, timer } from 'rxjs';

const observable = forkJoin({
  foo: of(1, 2, 3, 4),          // Emits 1,2,3,4 then completes
  bar: Promise.resolve(8),       // Resolves to 8
  baz: timer(4000)               // Emits 0 after 4 seconds
});

observable.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Done!')
});

// After 4 seconds:
// { foo: 4, bar: 8, baz: 0 }
// Done!

Common Use Cases

Load Multiple Resources

import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

function loadUserProfile(userId: string) {
  return forkJoin({
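    // The type arguments are assumptions about what each endpoint returns.
    user: ajax.getJSON<Record<string, unknown>>(`/api/users/${userId}`),
    posts: ajax.getJSON<unknown[]>(`/api/users/${userId}/posts`),
    followers: ajax.getJSON<unknown[]>(`/api/users/${userId}/followers`),
    following: ajax.getJSON<unknown[]>(`/api/users/${userId}/following`)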
  }).pipe(
    map(({ user, posts, followers, following }) => ({
      ...user,
      stats: {
        postsCount: posts.length,
        followersCount: followers.length,
        followingCount: following.length
      }
    }))
  );
}

loadUserProfile('123').subscribe(profile => {
  console.log('Profile loaded:', profile);
});

Form Validation

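import { forkJoin } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

// The response shapes ({ available } and { valid }) are assumptions about the API.
function validateUsername(username: string) {
  return ajax.getJSON<{ available: boolean }>(
    `/api/check-username?name=${encodeURIComponent(username)}`
  ).pipe(
    map(response => response.available)
  );
}

function validateEmail(email: string) {
  return ajax.getJSON<{ valid: boolean }>(
    `/api/check-email?email=${encodeURIComponent(email)}`
  ).pipe(
    map(response => response.valid)
  );
}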

function validateForm(username: string, email: string) {
  return forkJoin({
    usernameValid: validateUsername(username),
    emailValid: validateEmail(email)
  }).pipe(
    map(({ usernameValid, emailValid }) => ({
      valid: usernameValid && emailValid,
      errors: {
        username: !usernameValid ? 'Username already taken' : null,
        email: !emailValid ? 'Invalid email' : null
      }
    }))
  );
}

validateForm('john', 'john@example.com').subscribe(result => {
  if (result.valid) {
    console.log('Form is valid!');
  } else {
    console.log('Validation errors:', result.errors);
  }
});

Batch File Upload

import { forkJoin, Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

function uploadFile(file: File): Observable<string> {
  const formData = new FormData();
  formData.append('file', file);

  // Assumes the server responds with JSON of the form { url: string }.
  return ajax.post<{ url: string }>('/api/upload', formData).pipe(
    map(response => response.response.url)
  );
}

function uploadFiles(files: File[]) {
  const uploads$ = files.map(file => uploadFile(file));
  return forkJoin(uploads$);
}

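// file1, file2 and file3 are File objects obtained elsewhere,
// for example from an <input type="file"> element.
const files = [file1, file2, file3];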

uploadFiles(files).subscribe({
  next: urls => console.log('All files uploaded:', urls),
  error: err => console.error('Upload failed:', err)
});
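
Because forkJoin subscribes to every upload at once, a large batch starts all of its requests together. One possible alternative, sketched below, caps the number of uploads in flight using the concurrency argument of mergeMap. The names fakeUpload and uploadWithLimit are hypothetical, and fakeUpload is a synchronous stand-in for a real request. Unlike forkJoin, this version emits an empty array for an empty input instead of completing silently.

```typescript
import { from, Observable, of } from 'rxjs';
import { map, mergeMap, toArray } from 'rxjs/operators';

// Hypothetical stand-in for a real upload; emits the resulting URL once.
function fakeUpload(name: string): Observable<string> {
  return of(`/uploads/${name}`);
}

// Hypothetical helper: runs at most `concurrency` uploads at a time.
function uploadWithLimit(names: string[], concurrency: number): Observable<string[]> {
  return from(names).pipe(
    // Tag each result with its input index, since uploads may finish out of order.
    mergeMap(
      (name, index) => fakeUpload(name).pipe(map(url => ({ index, url }))),
      concurrency
    ),
    toArray(),
    // Restore input order before emitting the final list of URLs.
    map(items => items.sort((x, y) => x.index - y.index).map(item => item.url))
  );
}

let limitedUrls: string[] = [];
uploadWithLimit(['a.png', 'b.png', 'c.png'], 2).subscribe(urls => {
  limitedUrls = urls;
});

console.log(limitedUrls);
// ['/uploads/a.png', '/uploads/b.png', '/uploads/c.png']
```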

Database Migrations

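import { forkJoin, defer, from } from 'rxjs';
import { tap } from 'rxjs/operators';

// `db` is assumed to be an existing database client whose exec() returns a Promise<void>.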

function runMigration(name: string, fn: () => Promise<void>) {
  return defer(() => {
    console.log(`Running migration: ${name}`);
    return from(fn()).pipe(
      tap(() => console.log(`Completed migration: ${name}`))
    );
  });
}

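// Note: forkJoin subscribes to all three migrations at once, so they run
// concurrently. If one migration depends on another, run them in order
// with concat instead.
const migrations$ = forkJoin([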
  runMigration('001_create_users', () => db.exec('CREATE TABLE users...')),
  runMigration('002_create_posts', () => db.exec('CREATE TABLE posts...')),
  runMigration('003_add_indexes', () => db.exec('CREATE INDEX...'))
]);

migrations$.subscribe({
  next: () => console.log('All migrations completed'),
  error: err => console.error('Migration failed:', err)
});
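
Whether the steps start together or one after another matters for work such as migrations. The sketch below contrasts when forkJoin and concat subscribe to their sources. The step helper and the Subjects that control completion are hypothetical and exist only to make the timing observable.

```typescript
import { concat, defer, forkJoin, Observable, Subject } from 'rxjs';

// Illustration only: records the order in which steps are started.
const startLog: string[] = [];

// Hypothetical step: logs when it is subscribed to and finishes when `done$` completes.
function step(name: string, done$: Subject<void>): Observable<void> {
  return defer(() => {
    startLog.push(`start ${name}`);
    return done$;
  });
}

// forkJoin subscribes to every source up front, so both steps start together.
const forkA$ = new Subject<void>();
const forkB$ = new Subject<void>();
forkJoin([step('fork-a', forkA$), step('fork-b', forkB$)]).subscribe();
const afterForkJoin = [...startLog];
// ['start fork-a', 'start fork-b']

// concat subscribes to the next source only after the previous one completes.
const seqA$ = new Subject<void>();
const seqB$ = new Subject<void>();
concat(step('seq-a', seqA$), step('seq-b', seqB$)).subscribe();
const beforeFirstCompletes = startLog.slice(2);
// ['start seq-a']

seqA$.complete();
const afterFirstCompletes = startLog.slice(2);
// ['start seq-a', 'start seq-b']
```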

Behavior Details

Only Last Values

import { forkJoin, of } from 'rxjs';

forkJoin([
  of(1, 2, 3, 4),  // Last value: 4
  of('a', 'b', 'c') // Last value: 'c'
]).subscribe(([num, letter]) => {
  console.log(num, letter); // 4, 'c'
});

Empty Observable Behavior

import { forkJoin, of, EMPTY } from 'rxjs';

// This completes WITHOUT emitting
forkJoin([
  of(1, 2, 3),
  EMPTY  // Completes without emitting
]).subscribe({
  next: x => console.log('Next:', x),  // Never called
  complete: () => console.log('Complete!')  // Called immediately
});
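
If a source may legitimately complete without emitting, one way to keep forkJoin from completing silently is to give that source a fallback value with defaultIfEmpty. This is a minimal sketch; the choice of null as the fallback is an assumption.

```typescript
import { EMPTY, forkJoin, of } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

// Illustration only: collects whatever forkJoin emits.
const emptyResults: unknown[] = [];

forkJoin([
  of(1, 2, 3),
  // Emits null if the source completes without a value.
  EMPTY.pipe(defaultIfEmpty(null))
]).subscribe(value => emptyResults.push(value));

console.log(emptyResults);
// [[3, null]]
```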

Error Handling

import { forkJoin, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Without error handling - entire forkJoin errors
forkJoin([
  of(1),
  throwError(() => new Error('Fail')),
  of(3)
]).subscribe({
  next: x => console.log(x),  // Never called
  error: err => console.error('Error:', err)  // Called with error
});

// With error handling - provides fallback
forkJoin([
  of(1),
  throwError(() => new Error('Fail')).pipe(
    catchError(() => of('recovered'))
  ),
  of(3)
]).subscribe({
  next: x => console.log(x)  // [1, 'recovered', 3]
});
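
When every outcome is needed, including failures, each source can be wrapped so that an error becomes a regular value, loosely in the spirit of Promise.allSettled(). The Settled type and the settle helper below are hypothetical and not part of RxJS.

```typescript
import { forkJoin, Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// Hypothetical result type for illustration.
type Settled<T> =
  | { status: 'fulfilled'; value: T }
  | { status: 'rejected'; reason: unknown };

// Hypothetical helper: turns an error into a normal emission so that
// forkJoin still receives a last value from this source.
function settle<T>(source$: Observable<T>): Observable<Settled<T>> {
  return source$.pipe(
    map((value): Settled<T> => ({ status: 'fulfilled', value })),
    catchError((reason: unknown) => of<Settled<T>>({ status: 'rejected', reason }))
  );
}

const settledStatuses: string[] = [];

forkJoin([
  settle(of(1)),
  settle(throwError(() => new Error('Fail'))),
  settle(of(3))
]).subscribe(results => {
  results.forEach(result => settledStatuses.push(result.status));
});

console.log(settledStatuses);
// ['fulfilled', 'rejected', 'fulfilled']
```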

Comparison with Other Operators

import { forkJoin, timer } from 'rxjs';
import { take } from 'rxjs/operators';

forkJoin([
  timer(0, 100).pipe(take(3)),
  timer(0, 200).pipe(take(2))
]).subscribe(([a, b]) => {
  console.log('forkJoin:', a, b);
});

// Output (after both complete):
// forkJoin: 2, 1
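
For comparison, the sketch below feeds the same two sources to forkJoin, zip and combineLatest. It uses synchronous of() sources, so the combineLatest output reflects that the first source has already finished before the second one starts; with timed sources the intermediate combinations would differ.

```typescript
import { combineLatest, forkJoin, of, zip } from 'rxjs';

const numbers$ = of(1, 2, 3);
const letters$ = of('a', 'b');

// Illustration only: one output list per operator.
const forkJoinOut: Array<[number, string]> = [];
const zipOut: Array<[number, string]> = [];
const combineLatestOut: Array<[number, string]> = [];

// forkJoin: a single emission with the last value of each source.
forkJoin([numbers$, letters$]).subscribe(pair => forkJoinOut.push(pair));

// zip: pairs values by index and stops when a source runs out.
zip([numbers$, letters$]).subscribe(pair => zipOut.push(pair));

// combineLatest: emits on every new value once all sources have emitted.
combineLatest([numbers$, letters$]).subscribe(pair => combineLatestOut.push(pair));

console.log(forkJoinOut);      // [[3, 'b']]
console.log(zipOut);           // [[1, 'a'], [2, 'b']]
console.log(combineLatestOut); // [[3, 'a'], [3, 'b']]
```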

Important Notes

forkJoin will never emit if any Observable never completes. Make sure all your Observables will eventually complete.
If you need values emitted during execution (not just the last), use combineLatest or zip instead.
forkJoin is perfect for loading all data before displaying a page, similar to Promise.all().
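
A long-lived source such as a Subject never completes on its own, so forkJoin would wait forever. One common way to handle this, sketched below, is to limit the source with take(1) so that it completes after its first value. The clicks$ Subject is a hypothetical stand-in for any stream that stays open.

```typescript
import { forkJoin, of, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical long-lived source that never completes by itself.
const clicks$ = new Subject<string>();

// Illustration only: collects whatever forkJoin emits.
const takeResults: Array<[number, string]> = [];

forkJoin([
  of(1),
  // take(1) completes the source after its first value.
  clicks$.pipe(take(1))
]).subscribe(value => takeResults.push(value));

clicks$.next('first');  // forkJoin emits [1, 'first'] and completes here
clicks$.next('second'); // ignored: forkJoin has already completed

console.log(takeResults);
// [[1, 'first']]
```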

See Also

  • combineLatest - Emit whenever any Observable emits
  • zip - Combine by index position
  • concat - Subscribe sequentially
  • merge - Merge emissions concurrently