An Observer is a consumer of values delivered by an Observable. It’s a collection of callbacks that know how to handle the different types of notifications sent by an Observable.

What is an Observer?

Observers are simply objects with three callbacks - one for each type of notification that an Observable can deliver:
  • next: Called when the Observable emits a value
  • error: Called when the Observable encounters an error
  • complete: Called when the Observable completes successfully
Think of an Observer as a listener with three different event handlers - one for data, one for errors, and one for completion.

Type Signature

interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

// Partial observers are also valid
type PartialObserver<T> = 
  | { next: (value: T) => void; error?: (err: any) => void; complete?: () => void; }
  | { next?: (value: T) => void; error: (err: any) => void; complete?: () => void; }
  | { next?: (value: T) => void; error?: (err: any) => void; complete: () => void; }

Creating Observers

Full Observer Object

import { of } from 'rxjs';

const observer = {
  next: (x: number) => console.log('Observer got a next value: ' + x),
  error: (err: any) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

of(1, 2, 3).subscribe(observer);

// Output:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

Partial Observers

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Only handle next values
interval(1000)
  .pipe(take(3))
  .subscribe({
    next: x => console.log('Value:', x)
  });

// Output:
// Value: 0
// Value: 1
// Value: 2
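If you omit a callback for a notification type, the Observable execution still runs normally and that notification is simply not delivered to your code. The one exception is error: without an error callback, RxJS reports the error as unhandled instead of silently discarding it.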
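
To make the omitted-callback case concrete, here is a small sketch that assumes RxJS 7 and a synchronous source built with of. The names received and subscription are chosen only for this illustration.

```typescript
import { of } from 'rxjs';

// Collect what the observer sees so the result can be inspected afterwards.
const received: number[] = [];

// Only `next` is provided. The completion notification that `of` sends
// after the last value has no handler and is not delivered anywhere.
const subscription = of(10, 20, 30).subscribe({
  next: value => received.push(value),
});

console.log(received);            // [10, 20, 30]
console.log(subscription.closed); // true: the execution still ran to its end
```

The subscription is closed even though no complete callback was supplied, which suggests the missing handler does not change how the execution itself proceeds.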

Function Shorthand

For simple cases where you only care about values, you can pass a function directly:
import { of } from 'rxjs';

// Instead of: subscribe({ next: x => console.log(x) })
of(1, 2, 3).subscribe(x => console.log(x));

// Internally, RxJS creates an Observer object:
// { next: x => console.log(x) }
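
The normalization mentioned in the comment above can be pictured with a hand-written helper. This is an illustrative sketch and not RxJS's actual implementation: SimpleObserver, toObserver, and the no-op defaults are hypothetical names introduced here, and the real library reports unhandled errors rather than swallowing them.

```typescript
// Hypothetical, simplified shape; RxJS ships its own Observer type.
interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

const noop = (): void => {};

// Accept either a bare `next` function or a partial observer object and
// return an object that always has all three callbacks.
function toObserver<T>(
  input?: ((value: T) => void) | Partial<SimpleObserver<T>>
): SimpleObserver<T> {
  const partial: Partial<SimpleObserver<T>> =
    typeof input === 'function' ? { next: input } : input ?? {};
  return {
    next: partial.next ?? noop,
    error: partial.error ?? noop, // assumption: ignore; RxJS reports instead
    complete: partial.complete ?? noop,
  };
}

const seen: number[] = [];
const fromFunction = toObserver<number>(x => seen.push(x));
fromFunction.next(1);
fromFunction.complete(); // no-op: the shorthand only supplied `next`
console.log(seen); // [1]
```

Normalizing once at subscription time means the code that delivers notifications never has to check whether a callback exists.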

Observer Callbacks

next(value)

Called whenever the Observable emits a value.
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(1000)
  .pipe(take(3))
  .subscribe({
    next: value => {
      console.log('Received:', value);
      // Process the value
    }
  });
The next callback can be called zero or more times, with no upper bound, according to the Observable contract.
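
The "zero times" end of that range is easy to overlook. The sketch below, which assumes RxJS 7, compares EMPTY (completes without emitting) with a three-value source. The helper observe is a hypothetical name used only here, and it relies on both sources being synchronous.

```typescript
import { EMPTY, Observable, of } from 'rxjs';

// Subscribe and report how many times `next` ran and whether `complete` ran.
// Only meaningful for synchronous sources, since it returns immediately.
function observe(source$: Observable<unknown>): { nextCalls: number; completed: boolean } {
  const result = { nextCalls: 0, completed: false };
  source$.subscribe({
    next: () => { result.nextCalls += 1; },
    complete: () => { result.completed = true; },
  });
  return result;
}

const emptyResult = observe(EMPTY);             // completes without emitting
const threeResult = observe(of('a', 'b', 'c')); // emits three values, then completes

console.log(emptyResult); // { nextCalls: 0, completed: true }
console.log(threeResult); // { nextCalls: 3, completed: true }
```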

error(err)

Called when the Observable encounters an error. This terminates the Observable execution.
import { throwError } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError } from 'rxjs/operators';

ajax('/api/data')
  .pipe(
    catchError(err => throwError(() => new Error('Failed to load data')))
  )
  .subscribe({
    next: data => console.log('Data:', data),
    error: err => {
      console.error('Error occurred:', err.message);
      // Handle error (show notification, retry, etc.)
    }
  });
After error is called, no more notifications will be delivered. The complete callback will NOT be called.
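
A synchronous sketch of this rule, assuming RxJS 7 (for the factory form of throwError): the source emits one value, fails, and would emit more if the error had not terminated it. The names log and failing$ are illustrative.

```typescript
import { concat, of, throwError } from 'rxjs';

const log: string[] = [];

// `concat` subscribes to each source only after the previous one completes,
// so the third source is never reached once the second one errors.
const failing$ = concat(
  of('first'),
  throwError(() => new Error('boom')),
  of('never emitted')
);

failing$.subscribe({
  next: value => log.push(`next: ${value}`),
  error: (err: Error) => log.push(`error: ${err.message}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next: first', 'error: boom']
```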

complete()

Called when the Observable completes successfully. This terminates the Observable execution.
import { of } from 'rxjs';

of(1, 2, 3).subscribe({
  next: x => console.log('Value:', x),
  complete: () => {
    console.log('Stream completed');
    // Perform cleanup or trigger next action
  }
});

// Output:
// Value: 1
// Value: 2
// Value: 3
// Stream completed
After complete is called, no more notifications will be delivered. The error callback will NOT be called.

Observer Execution Rules

Observers follow the Observable Contract:
next*(error|complete)?
This means:
  1. next can be called zero or more times, with no upper bound
  2. At most one of error or complete is called, and at most once
  3. After error or complete, no further notifications are delivered
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
// ✓ Valid: multiple next, then complete
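
To see rule 3 in action, the calls above can be placed inside a producer function, followed by calls that would violate the contract. This is a minimal sketch assuming RxJS 7 with default configuration. The names source$ and delivered are illustrative.

```typescript
import { Observable } from 'rxjs';

const delivered: string[] = [];

const source$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  // These calls come after a terminal notification. The Subscriber
  // drops them instead of forwarding them to the observer.
  subscriber.next(4);
  subscriber.error(new Error('too late'));
});

source$.subscribe({
  next: value => delivered.push(`next ${value}`),
  error: () => delivered.push('error'),
  complete: () => delivered.push('complete'),
});

console.log(delivered); // ['next 1', 'next 2', 'next 3', 'complete']
```

Because the Subscriber enforces the contract, an observer can rely on it even when the producer code is careless.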

Practical Examples

HTTP Request Observer

import { ajax } from 'rxjs/ajax';

ajax('/api/users').subscribe({
  next: response => {
    console.log('Users loaded:', response.response);
    // Update UI with user data
  },
  error: err => {
    console.error('Failed to load users:', err);
    // Show error message to user
  },
  complete: () => {
    console.log('Request completed');
    // Hide loading indicator
  }
});

Form Input Observer

import { fromEvent } from 'rxjs';
import { map, debounceTime } from 'rxjs/operators';

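// Non-null assertion: assumes an element with id "search-input" exists on the page
const input = document.querySelector<HTMLInputElement>('#search-input')!;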
const search$ = fromEvent(input, 'input').pipe(
  map(event => (event.target as HTMLInputElement).value),
  debounceTime(300)
);

search$.subscribe({
  next: searchTerm => {
    console.log('Search for:', searchTerm);
    // Perform search with debounced input
  },
  error: err => {
    console.error('Search error:', err);
  }
});

WebSocket Observer

import { webSocket } from 'rxjs/webSocket';

const socket$ = webSocket('ws://localhost:8080');

socket$.subscribe({
  next: msg => {
    console.log('Message received:', msg);
    // Handle incoming message
  },
  error: err => {
    console.error('WebSocket error:', err);
    // Attempt reconnection
  },
  complete: () => {
    console.log('WebSocket connection closed');
    // Clean up resources
  }
});

State Management Observer

import { BehaviorSubject } from 'rxjs';

interface AppState {
  user: { name: string; id: number } | null;
  isLoading: boolean;
}

const state$ = new BehaviorSubject<AppState>({
  user: null,
  isLoading: false
});

// Component subscribes to state
state$.subscribe({
  next: state => {
    console.log('State updated:', state);
    // Re-render UI with new state
  }
});

// Update state
state$.next({ user: { name: 'Alice', id: 1 }, isLoading: false });

Best Practices

1. Always Handle Errors

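If an Observable errors and the Observer has no error callback, RxJS reports the error as unhandled (in RxJS 6 and later it is rethrown asynchronously). This typically surfaces as an uncaught exception in the browser console and can terminate a Node.js process. Always provide an error handler.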
// ✗ Bad: No error handling
http.get('/api/data').subscribe(data => console.log(data));

// ✓ Good: Error handling
http.get('/api/data').subscribe({
  next: data => console.log(data),
  error: err => console.error('Failed:', err)
});

2. Keep Observers Simple

import { filter, map, tap } from 'rxjs/operators';

// ✗ Bad: Complex logic in observer
observable$.subscribe({
  next: data => {
    // 100 lines of transformation and business logic
  }
});

// ✓ Good: Use operators for transformations
observable$.pipe(
  map(transform),
  filter(validate),
  tap(logData)
).subscribe({
  next: data => updateUI(data)
});
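
A runnable version of the "good" shape might look like the following sketch. Everything domain-specific here is hypothetical: the Reading type, the sample data, and the helpers isPlausible, formatReading, and updateUI stand in for the placeholders transform, validate, and updateUI used above.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface Reading {
  sensor: string;
  celsius: number;
}

const readings: Reading[] = [
  { sensor: 'a', celsius: 20 },
  { sensor: 'b', celsius: 999 }, // implausible, filtered out below
  { sensor: 'c', celsius: 0 },
];

// Small, individually testable functions carry the logic.
const isPlausible = (r: Reading): boolean => r.celsius > -90 && r.celsius < 60;
const formatReading = (r: Reading): string => `${r.sensor}: ${r.celsius * 9 / 5 + 32}F`;

// Stand-in for a UI update; it just records what would be rendered.
const rendered: string[] = [];
const updateUI = (line: string): void => { rendered.push(line); };

from(readings)
  .pipe(filter(isPlausible), map(formatReading))
  .subscribe({ next: line => updateUI(line) });

console.log(rendered); // ['a: 68F', 'c: 32F']
```

The observer ends up as a single line, and each step in the pipeline can be unit-tested without subscribing to anything.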

3. Avoid Side Effects in next

Use the tap operator for side effects instead of putting them in the observer.
import { tap } from 'rxjs/operators';

// ✗ Discouraged: Side effects in subscribe
data$.subscribe({
  next: data => {
    console.log('Data:', data);
    saveToLocalStorage(data);
    updateMetrics();
  }
});

// ✓ Better: Side effects in pipeline
data$.pipe(
  tap(data => console.log('Data:', data)),
  tap(saveToLocalStorage),
  tap(updateMetrics)
).subscribe();
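
One detail worth checking when moving side effects into tap is when they actually run. The sketch below assumes RxJS 7 and uses hypothetical stand-ins (saveToStorage, updateMetrics, and the events array) in place of real storage and metrics calls.

```typescript
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

const events: string[] = [];

// Hypothetical stand-ins for the side effects named in the text.
const saveToStorage = (value: number): void => { events.push(`save ${value}`); };
const updateMetrics = (): void => { events.push('metrics'); };

const data$ = of(1, 2).pipe(
  tap(value => events.push(`log ${value}`)),
  tap(saveToStorage),
  tap(() => updateMetrics())
);

// Building the pipeline runs nothing; `tap` callbacks execute per subscription.
const beforeSubscribe = events.length;

data$.subscribe();

console.log(beforeSubscribe); // 0
console.log(events);
// ['log 1', 'save 1', 'metrics', 'log 2', 'save 2', 'metrics']
```

Each value passes through every tap before the next value is processed. Since the effects run once per subscription, subscribing to data$ a second time would repeat them, which is worth keeping in mind for effects such as writes.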

4. Use TypeScript Types

import { Observer } from 'rxjs';

interface User {
  id: number;
  name: string;
}

const userObserver: Observer<User> = {
  next: (user: User) => console.log('User:', user.name),
  error: (err: Error) => console.error('Error:', err.message),
  complete: () => console.log('Done')
};

users$.subscribe(userObserver);

Common Patterns

Conditional Completion Handling

let completedNormally = false;

observable$.subscribe({
  next: data => processData(data),
  error: err => handleError(err),
  complete: () => {
    completedNormally = true;
    finalizeOperation();
  }
});
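
The flag pattern above relies on complete not being called when the stream errors. A small sketch, assuming RxJS 7 and synchronous sources, shows both outcomes side by side. The helper runToEnd is a hypothetical name introduced for the comparison.

```typescript
import { Observable, of, throwError } from 'rxjs';

// Subscribe to a synchronous source and report how it ended.
function runToEnd(source$: Observable<unknown>): { completedNormally: boolean; failed: boolean } {
  const outcome = { completedNormally: false, failed: false };
  source$.subscribe({
    error: () => { outcome.failed = true; },
    complete: () => { outcome.completedNormally = true; },
  });
  return outcome;
}

const ok = runToEnd(of(1, 2, 3));
const broken = runToEnd(throwError(() => new Error('boom')));

console.log(ok);     // { completedNormally: true, failed: false }
console.log(broken); // { completedNormally: false, failed: true }
```

With an asynchronous source, the flag would only be meaningful after the stream has terminated, so code that reads it should run from within the callbacks or after them.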

Multiple Observers

import { share } from 'rxjs/operators';

const shared$ = expensive$.pipe(share());

// Observer 1: Update UI
shared$.subscribe({
  next: data => updateUI(data)
});

// Observer 2: Log to analytics
shared$.subscribe({
  next: data => trackAnalytics(data)
});

// Observer 3: Cache data
shared$.subscribe({
  next: data => cacheData(data)
});
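
To check that share really runs the upstream work once, the following sketch counts upstream subscriptions. It assumes RxJS 7. The Subject named trigger, the counter, and the three arrays are illustrative stand-ins for expensive$, updateUI, trackAnalytics, and cacheData.

```typescript
import { Observable, Subject } from 'rxjs';
import { share } from 'rxjs/operators';

// A Subject stands in for the underlying event source so that a value can
// be pushed by hand after all observers have subscribed.
const trigger = new Subject<number>();

let upstreamSubscriptions = 0;
const expensive$ = new Observable<number>(subscriber => {
  upstreamSubscriptions += 1; // counts how often the "expensive" work starts
  return trigger.subscribe(subscriber);
});

const shared$ = expensive$.pipe(share());

const ui: number[] = [];
const analytics: number[] = [];
const cache: number[] = [];

shared$.subscribe({ next: v => ui.push(v) });
shared$.subscribe({ next: v => analytics.push(v) });
shared$.subscribe({ next: v => cache.push(v) });

trigger.next(42);

console.log(upstreamSubscriptions); // 1
console.log(ui, analytics, cache);  // [ 42 ] [ 42 ] [ 42 ]
```

Timing matters here: if the source emitted synchronously and completed during the first subscribe call, the later observers would not take part in that execution, so share fits best with sources that emit after all observers are attached.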

When to Use Different Observer Styles

Style              | Use When
Full Object        | You need to handle multiple notification types
Partial Object     | You only care about specific notifications
Function Shorthand | You only need to process values (next)

Related Topics

  • Observable - The data source that Observers consume
  • Subscription - Manages the connection between Observable and Observer
  • Operators - Transform data before it reaches the Observer
  • Subject - Acts as both Observable and Observer