Overview

AsyncSubject is a variant of Subject that emits a value only when it completes. On completion, it emits its latest value to all observers. If the subject errors, no value is emitted and observers receive only the error.
Key Feature: AsyncSubject emits only the last value, and only when complete() is called.

Class Signature

class AsyncSubject<T> extends Subject<T> {
  next(value: T): void;
  complete(): void;
}
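
To make the two overridden methods concrete, here is a dependency-free sketch that models the buffering rule. It is an illustrative model only, not the RxJS source: the names MiniAsyncSubject, Listener, and flush are hypothetical, and error handling and unsubscription are left out.

```typescript
// Illustrative model only; this is NOT how RxJS implements AsyncSubject.
type Listener<T> = {
  next: (value: T) => void;
  complete: () => void;
};

class MiniAsyncSubject<T> {
  private listeners: Listener<T>[] = [];
  private hasValue = false;
  private value: T | undefined;
  private done = false;

  // next() only overwrites the stored value; nothing is delivered yet.
  next(value: T): void {
    if (this.done) return;
    this.value = value;
    this.hasValue = true;
  }

  // complete() delivers the stored value (if any), then the completion.
  complete(): void {
    if (this.done) return;
    this.done = true;
    for (const listener of this.listeners) this.flush(listener);
    this.listeners = [];
  }

  // Subscribers arriving after completion are served from the stored value.
  subscribe(listener: Listener<T>): void {
    if (this.done) this.flush(listener);
    else this.listeners.push(listener);
  }

  private flush(listener: Listener<T>): void {
    if (this.hasValue) listener.next(this.value as T);
    listener.complete();
  }
}

const events: string[] = [];
const mini = new MiniAsyncSubject<number>();

mini.subscribe({
  next: (v) => { events.push(`next ${v}`); },
  complete: () => { events.push('complete'); },
});
mini.next(1);
mini.next(2);
mini.complete();

mini.subscribe({
  next: (v) => { events.push(`late next ${v}`); },
  complete: () => { events.push('late complete'); },
});
// events: ['next 2', 'complete', 'late next 2', 'late complete']
```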

Constructor

const subject = new AsyncSubject<T>();
No parameters required.

Behavior

  • Values passed to next() are stored but not emitted to subscribers
  • Only the last value is remembered
  • Value is emitted only when complete() is called
  • All subscribers (past and future) receive the same final value
  • If error() is called, no value is emitted (only the error)
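
The rules above can be checked with a short synchronous sketch. The variable names (received, beforeComplete) are illustrative; the example assumes the standard rxjs package and records deliveries in an array instead of logging them.

```typescript
import { AsyncSubject } from 'rxjs';

const received: string[] = [];
const subject = new AsyncSubject<number>();

// Early subscriber: registered before any value is pushed.
subject.subscribe((v) => { received.push(`early: ${v}`); });

subject.next(10);
subject.next(20);

// Nothing has been delivered so far, even though next() was called twice.
const beforeComplete = received.length; // 0

subject.complete();

// Late subscriber: registered after completion, still gets the final value.
subject.subscribe((v) => { received.push(`late: ${v}`); });

// Calls to next() after completion are ignored.
subject.next(30);

// received: ['early: 20', 'late: 20']
```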

Usage Examples

Basic Usage

Emit only the final value:
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe({
  next: (v) => console.log(`Observer A: ${v}`),
  complete: () => console.log('Observer A complete')
});

subject.next(1);
subject.next(2);
subject.next(3);

console.log('Before complete - nothing emitted yet');

subject.complete();

// Output:
// Before complete - nothing emitted yet
// Observer A: 3
// Observer A complete

Single Async Result

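AsyncSubject suits operations that produce a single final result:
import { AsyncSubject } from 'rxjs';

// Minimal shape assumed for this example
interface User {
  name: string;
}

function fetchUserProfile(userId: number): AsyncSubject<User> {
  const subject = new AsyncSubject<User>();
  
  fetch(`/api/users/${userId}`)
    .then(response => {
      // fetch() also resolves on HTTP error statuses, so check explicitly
      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      return response.json();
    })
    .then((user: User) => {
      subject.next(user);
      subject.complete();
    })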
    .catch(error => {
      subject.error(error);
    });
  
  return subject;
}

const userProfile$ = fetchUserProfile(123);

// Multiple subscribers get the same result
userProfile$.subscribe(user => {
  console.log('Subscriber 1:', user.name);
});

userProfile$.subscribe(user => {
  console.log('Subscriber 2:', user.name);
});

Computation Result

Emit the result of a long computation:
import { AsyncSubject } from 'rxjs';

function calculatePi(iterations: number): AsyncSubject<number> {
  const subject = new AsyncSubject<number>();
  
  // Simulate expensive calculation
  setTimeout(() => {
    let pi = 0;
    for (let i = 0; i < iterations; i++) {
      pi += Math.pow(-1, i) / (2 * i + 1);
    }
    pi *= 4;
    
    subject.next(pi);
    subject.complete();
  }, 100);
  
  return subject;
}

const pi$ = calculatePi(1000000);

pi$.subscribe(result => {
  console.log('Pi approximation:', result);
});

Conditional Emission

Emit a single validation result after all checks have run:
import { AsyncSubject } from 'rxjs';

function validateForm(formData: FormData): AsyncSubject<boolean> {
  const subject = new AsyncSubject<boolean>();
  
  let isValid = true;
  
  // Validate field 1
  if (!formData.get('email')) {
    isValid = false;
  }
  
  // Validate field 2
  if (!formData.get('password')) {
    isValid = false;
  }
  
  // More validations...
  
  // Store the final validation result, then complete to emit it
  subject.next(isValid);
  subject.complete();
  
  return subject;
}

// myFormData, submitForm() and showErrors() are assumed to be defined elsewhere
const validation$ = validateForm(myFormData);

validation$.subscribe(isValid => {
  if (isValid) {
    submitForm();
  } else {
    showErrors();
  }
});

Race Condition Resolution

Use race() to find which operation finishes first. race() mirrors the first source to emit, and an AsyncSubject emits only on completion:
import { AsyncSubject, race } from 'rxjs';

function createAsyncTask(name: string, delay: number): AsyncSubject<string> {
  const subject = new AsyncSubject<string>();
  
  setTimeout(() => {
    subject.next(`${name} completed`);
    subject.complete();
  }, delay);
  
  return subject;
}

const task1 = createAsyncTask('Task 1', 1000);
const task2 = createAsyncTask('Task 2', 500);
const task3 = createAsyncTask('Task 3', 1500);

race(task1, task2, task3).subscribe(winner => {
  console.log('Winner:', winner); // Task 2 completed
});

One-Time Resource

Load a resource once and share the result:
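import { AsyncSubject } from 'rxjs';

// Placeholder type for this example; replace with your real config shape
type Config = Record<string, unknown>;

class ConfigService {
  private configSubject = new AsyncSubject<Config>();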
  public config$ = this.configSubject.asObservable();
  
  private loaded = false;
  
  loadConfig(): void {
    if (this.loaded) return;
    this.loaded = true;
    
    fetch('/api/config')
      .then(response => response.json())
      .then(config => {
        this.configSubject.next(config);
        this.configSubject.complete();
      })
      .catch(error => {
        this.configSubject.error(error);
      });
  }
}

const configService = new ConfigService();
configService.loadConfig();

// Multiple components can subscribe
configService.config$.subscribe(config => {
  console.log('Config loaded:', config);
});

// Late subscribers also get the config
setTimeout(() => {
  configService.config$.subscribe(config => {
    console.log('Late subscriber got config:', config);
  });
}, 2000);

Error Handling

If error() is called, the AsyncSubject will not emit any value, only the error notification.
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe({
  next: (v) => console.log('Next:', v),
  error: (err) => console.error('Error:', err.message),
  complete: () => console.log('Complete')
});

subject.next(1);
subject.next(2);
subject.error(new Error('Something went wrong'));

// Output:
// Error: Something went wrong
// (No value emitted)
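
An errored AsyncSubject also replays the error to subscribers that arrive afterwards. The sketch below illustrates this with illustrative names (seen, failing) and assumes the standard rxjs package.

```typescript
import { AsyncSubject } from 'rxjs';

const seen: string[] = [];
const failing = new AsyncSubject<number>();

failing.next(1); // stored, but discarded once the subject errors
failing.error(new Error('boom'));

// Subscribing after the error: only the error notification is delivered.
failing.subscribe({
  next: (v) => { seen.push(`next ${v}`); },
  error: (err: Error) => { seen.push(`error ${err.message}`); },
  complete: () => { seen.push('complete'); },
});

// seen: ['error boom']
```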

Completing Without Values

If complete() is called before any next() call, subscribers receive only the complete notification:
import { AsyncSubject } from 'rxjs';

const subject = new AsyncSubject<number>();

subject.subscribe({
  next: (v) => console.log('Next:', v),
  complete: () => console.log('Complete - no value')
});

subject.complete(); // No next() was called

// Output:
// Complete - no value
// (complete notification but no value)
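
If consumers always need a value, one option is to pipe the subject through the defaultIfEmpty operator. This is a sketch rather than a recommendation: the fallback value -1 and the names out and empty are arbitrary choices, and the operator is imported from 'rxjs/operators' so the snippet works on both RxJS 6 and 7.

```typescript
import { AsyncSubject } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

const out: number[] = [];
const empty = new AsyncSubject<number>();

// defaultIfEmpty emits the fallback when the source completes without a value.
empty.pipe(defaultIfEmpty(-1)).subscribe((v) => { out.push(v); });

empty.complete(); // no next() was called

// out: [-1]
```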

Comparison with Other Subjects

Choose the right Subject:
  • Subject: Immediate multicast, no replay
  • BehaviorSubject: Current value, immediate to new subscribers
  • ReplaySubject: Buffer N values, replay to new subscribers
  • AsyncSubject: Only last value, only on completion

Subject Type     | Emits               | When          | To New Subscribers
Subject          | All values          | Immediately   | Only future values
BehaviorSubject  | Current value + new | Immediately   | Current + future
ReplaySubject    | Buffered + new      | Immediately   | Buffered + future
AsyncSubject     | Last value only     | On completion | Same last value

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject
const subject = new Subject<number>();
subject.next(1);
subject.subscribe(x => console.log('Subject:', x));
subject.next(2);
// Output: Subject: 2 (missed 1)

// BehaviorSubject
const behavior = new BehaviorSubject<number>(0);
behavior.next(1);
behavior.subscribe(x => console.log('Behavior:', x));
behavior.next(2);
// Output: Behavior: 1, Behavior: 2

// ReplaySubject
const replay = new ReplaySubject<number>(2);
replay.next(1);
replay.subscribe(x => console.log('Replay:', x));
replay.next(2);
// Output: Replay: 1, Replay: 2

// AsyncSubject
const asyncSubject = new AsyncSubject<number>();
asyncSubject.next(1);
asyncSubject.subscribe(x => console.log('Async:', x));
asyncSubject.next(2);
asyncSubject.complete();
// Output: Async: 2 (only on complete)

Use Cases

  1. Single Async Results: HTTP requests, file reads
  2. Computations: Long-running calculations
  3. Caching: One-time resource loading
  4. Final State: Emit only the final result of a process
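  5. Lazy Evaluation: Compute only when needed (an AsyncSubject does not start work on subscription by itself, so pair it with a lazy source such as defer())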
  6. Resource Initialization: Wait for initialization to complete
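
The caching and lazy-evaluation cases can be combined by using an AsyncSubject as the connector of the share operator. The sketch below assumes RxJS 7 or later (the configuration-object form of share does not exist in RxJS 6); expensive$, runs, and results are illustrative names, and of(1, 2, 3) stands in for a real computation.

```typescript
import { AsyncSubject, defer, of } from 'rxjs';
import { share } from 'rxjs/operators';

let runs = 0;

// defer() postpones the work until the first subscription.
const expensive$ = defer(() => {
  runs++;
  return of(1, 2, 3); // stand-in for intermediate results of costly work
}).pipe(
  share({
    // Multicast through an AsyncSubject: only the last value is delivered.
    connector: () => new AsyncSubject<number>(),
    // Keep the completed subject so later subscribers reuse its value.
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false,
  })
);

const results: number[] = [];
const runsBeforeSubscribe = runs; // 0: nothing has executed yet

expensive$.subscribe((v) => { results.push(v); });
expensive$.subscribe((v) => { results.push(v); }); // served from the cache

// results: [3, 3], runs: 1
```

Disabling the three reset options is what turns the shared stream into a one-time cache; with the defaults, share would create a fresh connector and re-run the source after completion.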

Pattern: Promise-like Behavior

Like a Promise, an AsyncSubject settles once and delivers the same result to every subscriber:
import { AsyncSubject } from 'rxjs';

function asyncOperation(): AsyncSubject<string> {
  const subject = new AsyncSubject<string>();
  
  // Simulate async work
  setTimeout(() => {
    subject.next('Result');
    subject.complete();
  }, 1000);
  
  return subject;
}

const result$ = asyncOperation();

// Can subscribe multiple times (like Promise.then())
result$.subscribe(value => console.log('First:', value));
result$.subscribe(value => console.log('Second:', value));

// Both get the same result when it completes
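
The analogy has limits, and two differences are worth seeing side by side: the last next() wins (a Promise keeps its first resolution), and a completed AsyncSubject delivers synchronously on subscribe, whereas then() callbacks always run asynchronously. A minimal sketch, with illustrative names (log, settled):

```typescript
import { AsyncSubject } from 'rxjs';

const log: string[] = [];

const settled = new AsyncSubject<string>();
settled.next('first');
settled.next('second'); // overwrites 'first'; the last value wins
settled.complete();

// Already completed: the callback runs synchronously during subscribe().
settled.subscribe((v) => { log.push(`subject: ${v}`); });
log.push('after subscribe');

// A Promise callback is always deferred to a microtask, even when resolved.
Promise.resolve('value').then((v) => { log.push(`promise: ${v}`); });
log.push('after then');

// log at this point: ['subject: second', 'after subscribe', 'after then']
// 'promise: value' is appended later, once the current call stack unwinds.
```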

When NOT to Use AsyncSubject

Don’t use AsyncSubject when:
  • You need intermediate values (use Subject or ReplaySubject)
  • You need current value access (use BehaviorSubject)
  • Stream might not complete (use BehaviorSubject or ReplaySubject)
  • You need immediate emissions (use Subject)

Memory Considerations

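AsyncSubject holds a reference only to the most recent value, so earlier values can be garbage-collected once nothing else references them:
import { AsyncSubject } from 'rxjs';

// LargeObject and largeObject1..3 are placeholders for your own type and values
const subject = new AsyncSubject<LargeObject>();

subject.next(largeObject1); // Reference replaced by the next call
subject.next(largeObject2); // Reference replaced by the next call
subject.next(largeObject3); // Retained

subject.complete();
// Only largeObject3 is still referenced by the subject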

See Also

  • Promise - Similar single-value async pattern
  • first() - Get first value then complete
  • last() - Get last value before complete