Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner. They are functions that take an Observable as input and return a new Observable as output.

What are Operators?

RxJS is mostly useful for its operators, even though the Observable is the foundation. Operators enable you to transform, filter, combine, and manipulate Observable streams with ease.
Key Concept: Operators are pure functions that create new Observables based on the current Observable. They don’t modify the existing Observable.
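
A minimal sketch of this point, assuming RxJS 7 is installed (the variable names are illustrative only). Piping `source$` through `map` produces a second Observable, and subscribing to the original still yields the original values:

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const source$ = of(1, 2, 3);

// pipe() returns a new Observable; source$ itself is left untouched.
const doubled$ = source$.pipe(map(x => x * 2));

const fromSource: number[] = [];
const fromDoubled: number[] = [];

source$.subscribe(x => fromSource.push(x));
doubled$.subscribe(x => fromDoubled.push(x));

console.log(fromSource);  // [1, 2, 3]
console.log(fromDoubled); // [2, 4, 6]
```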

Types of Operators

Pipeable Operators

Pipeable operators can be chained together using the pipe() method:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(x => console.log(x));

// Output: 20, 40
Type Signature:
interface OperatorFunction<T, R> {
  (source: Observable<T>): Observable<R>;
}

interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}
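
To make the signature concrete, the sketch below (assuming RxJS 7; `toLength`, `nonEmpty`, and `words$` are hypothetical names) stores two operators in typed variables and applies them both through `pipe()` and by calling them directly. The direct call appears only to show that an operator is a plain function from Observable to Observable; `pipe()` remains the form to use in real code.

```typescript
import { MonoTypeOperatorFunction, OperatorFunction, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// map(...) returns a function from Observable<string> to Observable<number>.
const toLength: OperatorFunction<string, number> = map((s: string) => s.length);

// filter(...) keeps the element type, so it fits MonoTypeOperatorFunction.
const nonEmpty: MonoTypeOperatorFunction<string> = filter((s: string) => s.length > 0);

const words$ = of('rx', '', 'operators');

const viaPipe: number[] = [];
const viaCall: number[] = [];

// Recommended form: compose with pipe().
words$.pipe(nonEmpty, toLength).subscribe(n => viaPipe.push(n));

// Equivalent direct application, shown for illustration only.
toLength(nonEmpty(words$)).subscribe(n => viaCall.push(n));

console.log(viaPipe); // [2, 9]
console.log(viaCall); // [2, 9]
```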

Creation Operators

Creation operators are standalone functions that create new Observables:
import { of, from, interval, fromEvent } from 'rxjs';

// Create from values
const numbers$ = of(1, 2, 3);

// Create from array
const array$ = from([10, 20, 30]);

// Create from timer
const timer$ = interval(1000);

// Create from events
const clicks$ = fromEvent(document, 'click');

Using the pipe() Method

The pipe() method allows you to compose multiple operators:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  map(x => x * x),
  filter(x => x > 10)
).subscribe(x => console.log(x));

// Output: 16, 25
Don’t use the old syntax operator()(obs). Always use .pipe(operator()) for better readability.

Operator Categories

Transformation Operators

Transform values emitted by an Observable:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(map(x => x * 10))
  .subscribe(x => console.log(x));

// Output: 10, 20, 30
Common Transformation Operators:
  • map - Transform each value
  • mergeMap - Map to Observable and merge (flatMap is a deprecated alias)
  • switchMap - Map to Observable and switch to latest
  • concatMap - Map to Observable and concat sequentially
  • exhaustMap - Map to Observable and ignore new source values until the current inner Observable completes
  • scan - Accumulator (like reduce but emits each step)
  • groupBy - Group emissions by key
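
The difference between `scan` and `reduce` noted in the list above can be sketched as follows, assuming RxJS 7. `reduce` is the RxJS operator of that name; the array names are illustrative.

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const runningTotals: number[] = [];
const finalTotals: number[] = [];

// scan emits the accumulator after every source value.
of(1, 2, 3, 4)
  .pipe(scan((acc, x) => acc + x, 0))
  .subscribe(total => runningTotals.push(total));

// reduce emits only once, when the source completes.
of(1, 2, 3, 4)
  .pipe(reduce((acc, x) => acc + x, 0))
  .subscribe(total => finalTotals.push(total));

console.log(runningTotals); // [1, 3, 6, 10]
console.log(finalTotals);   // [10]
```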

Filtering Operators

Filter values from an Observable stream:
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5, 6)
  .pipe(filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));

// Output: 2, 4, 6
Common Filtering Operators:
  • filter - Filter values by predicate
  • take - Take first n values
  • takeUntil - Take until notifier emits
  • takeWhile - Take while condition is true
  • first - Take first value (or first matching predicate)
  • last - Take last value
  • debounceTime - Emit after silence period
  • throttleTime - Emit at most once per period
  • distinctUntilChanged - Emit only when value changes
  • skip - Skip first n values
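
As a rough side-by-side of several operators from the list above, the sketch below runs each one against the same synchronous source. It assumes RxJS 7; `collect` is a hypothetical helper that only works for Observables that emit synchronously.

```typescript
import { Observable, of } from 'rxjs';
import { distinctUntilChanged, first, last, skip, take, takeWhile } from 'rxjs/operators';

// Hypothetical helper: gathers the values of a synchronous Observable into an array.
function collect<T>(obs$: Observable<T>): T[] {
  const out: T[] = [];
  obs$.subscribe(v => out.push(v));
  return out;
}

const source$ = of(1, 1, 2, 2, 3, 1);

const firstTwo = collect(source$.pipe(take(2)));                // [1, 1]
const afterFour = collect(source$.pipe(skip(4)));               // [3, 1]
const belowThree = collect(source$.pipe(takeWhile(x => x < 3))); // [1, 1, 2, 2]
const changes = collect(source$.pipe(distinctUntilChanged()));  // [1, 2, 3, 1]
const firstAboveOne = collect(source$.pipe(first(x => x > 1))); // [2]
const lastValue = collect(source$.pipe(last()));                // [1]

console.log({ firstTwo, afterFour, belowThree, changes, firstAboveOne, lastValue });
```

Note that `distinctUntilChanged` only compares each value with the one immediately before it, which is why the trailing `1` is emitted again.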

Combination Operators

Combine multiple Observables:
import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';

const first$ = interval(1000).pipe(map(x => `First: ${x}`));
const second$ = interval(1500).pipe(map(x => `Second: ${x}`));

merge(first$, second$).subscribe(x => console.log(x));

// Merges emissions from both
Common Combination Operators:
  • merge - Merge emissions from multiple Observables
  • concat - Concatenate Observables sequentially
  • combineLatest - Combine latest values from all
  • zip - Pair emissions by index
  • forkJoin - Wait for all to complete, emit last values
  • race - Mirror whichever Observable emits first and ignore the others
  • withLatestFrom - Combine with latest from other Observable
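
To contrast three of the operators listed above, here is a small sketch using synchronous sources, assuming RxJS 7. `collect`, `letters$`, and `numbers$` are hypothetical names introduced for the example.

```typescript
import { Observable, concat, forkJoin, of, zip } from 'rxjs';

// Hypothetical helper: gathers the values of a synchronous Observable into an array.
function collect<T>(obs$: Observable<T>): T[] {
  const out: T[] = [];
  obs$.subscribe(v => out.push(v));
  return out;
}

const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2, 3);

// concat: subscribe to numbers$ only after letters$ completes.
const concatenated = collect(concat(letters$, numbers$));
// ['a', 'b', 'c', 1, 2, 3]

// zip: pair the n-th value of each source.
const zipped = collect(zip(letters$, numbers$));
// [['a', 1], ['b', 2], ['c', 3]]

// forkJoin: wait for both to complete, then emit their last values once.
const joined = collect(forkJoin([letters$, numbers$]));
// [['c', 3]]

console.log(concatenated, zipped, joined);
```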

Error Handling Operators

Handle errors gracefully:
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

throwError(() => new Error('Oops!'))
  .pipe(
    catchError(err => {
      console.error('Caught:', err.message);
      return of('default value');
    })
  )
  .subscribe(x => console.log(x));

// Output: default value
Common Error Handling Operators:
  • catchError - Catch and recover from errors
  • retry - Retry failed Observable
  • retryWhen - Retry with custom logic (deprecated in RxJS 7; prefer retry with a delay option)
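
The interaction between `retry` and `catchError` can be sketched with a source that fails a fixed number of times. This assumes RxJS 7; `flaky$` and `alwaysFails$` are hypothetical stand-ins built with `defer`, not part of any real API.

```typescript
import { defer, of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

let attempts = 0;
// Hypothetical flaky source: errors on the first two subscriptions, then succeeds.
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('ok');
});

let failures = 0;
// Hypothetical source that errors on every subscription.
const alwaysFails$ = defer(() => {
  failures += 1;
  return throwError(() => new Error('service down'));
});

const recovered: string[] = [];
const fellBack: string[] = [];

// retry(2) resubscribes up to two times, so the third attempt succeeds.
flaky$
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe(v => recovered.push(v));

// Once the retries are exhausted, the error reaches catchError.
alwaysFails$
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe(v => fellBack.push(v));

console.log(recovered, attempts); // ['ok'] 3
console.log(fellBack, failures);  // ['fallback'] 3
```

Placing `catchError` after `retry` means the fallback is used only when every attempt has failed.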

Utility Operators

Utility functions for side effects and debugging:
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

of(1, 2, 3)
  .pipe(
    tap(x => console.log('Before:', x)),
    map(x => x * 10),
    tap(x => console.log('After:', x))
  )
  .subscribe();

// Output: Before: 1, After: 10, Before: 2, After: 20, Before: 3, After: 30
Common Utility Operators:
  • tap - Perform side effects
  • delay - Delay emissions
  • timeout - Error if no emission within time
  • observeOn - Control execution context
  • subscribeOn - Control subscription context

Higher-Order Observables

Observables that emit other Observables:
import { of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, mergeAll, mergeMap } from 'rxjs/operators';

// Creates Observable of Observables
const urls$ = of('/api/users', '/api/posts');
const requests$ = urls$.pipe(
  map(url => ajax(url)) // Each URL becomes an inner Observable, so requests$ emits Observables
);

// Flatten with mergeAll
requests$.pipe(mergeAll()).subscribe(response => console.log(response));

// Or use mergeMap (map + mergeAll)
urls$.pipe(
  mergeMap(url => ajax(url))
).subscribe(response => console.log(response));
Flattening operators convert higher-order Observables to regular Observables:
  • mergeAll / mergeMap - Merge all inner Observables
  • switchAll / switchMap - Switch to latest inner Observable
  • concatAll / concatMap - Concat inner Observables sequentially
  • exhaustAll / exhaustMap - Ignore new while inner is active
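
The behavioral differences between these strategies show up only when inner Observables overlap in time. The sketch below, assuming RxJS 7, drives the timing by hand with Subjects so the result is deterministic. `outer$`, `innerA$`, `innerB$`, and `pick` are hypothetical names for illustration.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

const outer$ = new Subject<'a' | 'b'>();
const innerA$ = new Subject<string>();
const innerB$ = new Subject<string>();

// Maps each outer value to one of the two inner Subjects.
const pick = (key: 'a' | 'b'): Subject<string> => (key === 'a' ? innerA$ : innerB$);

const merged: string[] = [];
const switched: string[] = [];
const exhausted: string[] = [];

outer$.pipe(mergeMap(pick)).subscribe(v => merged.push(v));
outer$.pipe(switchMap(pick)).subscribe(v => switched.push(v));
outer$.pipe(exhaustMap(pick)).subscribe(v => exhausted.push(v));

outer$.next('a');     // all three subscribe to innerA$
innerA$.next('A1');
outer$.next('b');     // mergeMap adds innerB$, switchMap swaps to it, exhaustMap ignores it
innerA$.next('A2');
innerB$.next('B1');

console.log(merged);    // ['A1', 'A2', 'B1']
console.log(switched);  // ['A1', 'B1']
console.log(exhausted); // ['A1', 'A2']
```

`concatMap` is left out of the sketch because it would not subscribe to `innerB$` until `innerA$` completes.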

Creating Custom Operators

Using pipe() Function

Create reusable operator combinations:
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

function filterOddDoubleEven<T extends number>() {
  return pipe(
    filter((n: T) => n % 2 === 0),
    map((n: T) => n * 2)
  );
}

// Usage
of(1, 2, 3, 4, 5)
  .pipe(filterOddDoubleEven())
  .subscribe(x => console.log(x));

// Output: 4, 8

From Scratch

Create completely custom operators:
import { Observable, OperatorFunction, of } from 'rxjs';

function multiplyBy<T extends number>(multiplier: number): OperatorFunction<T, number> {
  return (source: Observable<T>) =>
    new Observable<number>(subscriber => {
      return source.subscribe({
        next: value => subscriber.next(value * multiplier),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
}

// Usage
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe(x => console.log(x));

// Output: 10, 20, 30

Practical Examples

Search with Debouncing

import { fromEvent } from 'rxjs';
import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';

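// Assumes an <input id="search"> exists on the page; displayResults is defined elsewhere.
const searchBox = document.getElementById('search') as HTMLInputElement;

fromEvent(searchBox, 'input')
  .pipe(
    map(event => (event.target as HTMLInputElement).value),
    debounceTime(300),
    distinctUntilChanged(),
    // Encode the term so spaces and special characters produce a valid URL
    switchMap(term => ajax(`/api/search?q=${encodeURIComponent(term)}`))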
  )
  .subscribe(results => displayResults(results));

HTTP Request with Retry

import { ajax } from 'rxjs/ajax';
import { retry, catchError, map } from 'rxjs/operators';
import { of } from 'rxjs';

ajax('/api/data')
  .pipe(
    map(response => response.response),
    retry(3),
    catchError(err => {
      console.error('Failed after retries:', err);
      return of({ error: true, message: 'Failed to load data' });
    })
  )
  .subscribe(data => console.log(data));

Polling with Auto-Stop

import { interval, fromEvent } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';

// Assumes a button with id="stop" exists; updateStatus is defined elsewhere.
const stopButton = document.getElementById('stop') as HTMLButtonElement;
const stop$ = fromEvent(stopButton, 'click');

interval(5000)
  .pipe(
    switchMap(() => ajax('/api/status')),
    takeUntil(stop$)
  )
  .subscribe(status => updateStatus(status));

Form Validation

import { fromEvent, combineLatest } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

// emailInput, passwordInput, and submitButton are assumed to be existing DOM elements.
const email$ = fromEvent(emailInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  map(email => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)),
  startWith(false)
);

const password$ = fromEvent(passwordInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  map(pwd => pwd.length >= 8),
  startWith(false)
);

combineLatest([email$, password$])
  .pipe(
    map(([emailValid, passwordValid]) => emailValid && passwordValid)
  )
  .subscribe(isValid => {
    submitButton.disabled = !isValid;
  });

Best Practices

Choose the right flattening operator:
  • mergeMap - When order doesn’t matter, parallel requests
  • switchMap - When only latest matters (search, autocomplete)
  • concatMap - When order matters (sequential updates)
  • exhaustMap - When new values should be ignored until the current inner Observable completes (form submit)

1. Use Operators Instead of Subscribe

// ✗ BAD: Logic in subscribe
users$.subscribe(users => {
  const activeUsers = users.filter(u => u.active);
  const userNames = activeUsers.map(u => u.name);
  displayNames(userNames);
});

// ✓ GOOD: Logic in operators
users$.pipe(
  map(users => users.filter(u => u.active)),
  map(users => users.map(u => u.name))
).subscribe(names => displayNames(names));

2. Avoid Nested Subscriptions

// ✗ BAD: Nested subscriptions
getUser().subscribe(user => {
  getPosts(user.id).subscribe(posts => {
    getComments(posts[0].id).subscribe(comments => {
      console.log(comments);
    });
  });
});

// ✓ GOOD: Flattening operators
getUser().pipe(
  switchMap(user => getPosts(user.id)),
  switchMap(posts => getComments(posts[0].id))
).subscribe(comments => console.log(comments));

3. Share Expensive Operations

import { shareReplay } from 'rxjs/operators';

// Multiple subscribers share one execution
const data$ = expensiveHttpCall().pipe(
  shareReplay(1) // Cache last value
);

data$.subscribe(d => console.log('Sub 1:', d));
data$.subscribe(d => console.log('Sub 2:', d));
// Only one HTTP request made
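
One way to check the "only one request" claim is to count how often the source is actually executed. The sketch below assumes RxJS 7 and replaces the HTTP call with a hypothetical `countedSource` helper built on `defer`, so it runs without a network.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for an expensive call: counts how many times it is executed.
function countedSource() {
  let executions = 0;
  const source$ = defer(() => {
    executions += 1;
    return of('payload');
  });
  return { source$, count: () => executions };
}

// Without sharing, each subscriber triggers its own execution.
const unshared = countedSource();
unshared.source$.subscribe();
unshared.source$.subscribe();
const unsharedCount = unshared.count(); // 2

// With shareReplay(1), the second subscriber receives the cached value.
const shared = countedSource();
const shared$ = shared.source$.pipe(shareReplay(1));
const received: string[] = [];
shared$.subscribe(v => received.push(`Sub 1: ${v}`));
shared$.subscribe(v => received.push(`Sub 2: ${v}`));
const sharedCount = shared.count(); // 1

console.log(unsharedCount, sharedCount, received);
```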

4. Use Type-Safe Operators

import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

interface User {
  id: number;
  name: string;
}

const users$: Observable<User[]> = getUsers();

// TypeScript ensures type safety
const names$: Observable<string[]> = users$.pipe(
  map(users => users.map(u => u.name))
);

Operator Selection Guide

Need                          Operator
Transform values              map
Filter values                 filter
Flatten nested Observables    mergeMap, switchMap, concatMap
Take first N                  take(n)
Debounce input                debounceTime
Retry on error                retry
Handle errors                 catchError
Combine latest from all       combineLatest
Wait for all to complete      forkJoin
Side effects                  tap
Accumulate values             scan
Share execution               share, shareReplay

Related Concepts

  • Observable - Operators transform Observables
  • Subscription - Operators can affect subscription lifecycle
  • Observer - Operators modify what Observers receive
  • Subject - Can be used with operators like any Observable