Operators are the essential pieces that allow complex asynchronous code to be easily composed in a declarative manner. They are functions that take an Observable as input and return a new Observable as output.
What are Operators?
RxJS is mostly useful for its operators, even though the Observable is the foundation. Operators enable you to transform, filter, combine, and manipulate Observable streams with ease.
Key Concept: Operators are pure functions that create a new Observable based on the current one. They don’t modify the existing Observable.
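The purity claim above can be checked with a small sketch. The names `source$` and `doubled$` are illustrative, not part of the original text: piping `map` over an Observable returns a new Observable, and subscribing to the original still yields the untouched values.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Illustrative names: source$ is the input, doubled$ is the Observable returned by pipe().
const source$ = of(1, 2, 3);
const doubled$ = source$.pipe(map(x => x * 2)); // a new Observable; source$ is left as it was

const fromSource: number[] = [];
const fromDoubled: number[] = [];
source$.subscribe(x => fromSource.push(x));
doubled$.subscribe(x => fromDoubled.push(x));

console.log(fromSource);  // [1, 2, 3]
console.log(fromDoubled); // [2, 4, 6]
```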
Types of Operators
Pipeable Operators
Pipeable operators can be chained together using the pipe() method:
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(x => console.log(x));
// Output: 20, 40
Type Signature:
interface OperatorFunction<T, R> {
  (source: Observable<T>): Observable<R>;
}
interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}
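As a rough illustration of how these two signatures are used, the sketch below defines two hypothetical helpers (`onlyPositive` and `toLabel` are made-up names for this example). One keeps the element type and the other changes it.

```typescript
import { MonoTypeOperatorFunction, OperatorFunction, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical helper: input and output are both number, so MonoTypeOperatorFunction fits.
function onlyPositive(): MonoTypeOperatorFunction<number> {
  return source => source.pipe(filter(x => x > 0));
}

// Hypothetical helper: turns number into string, so the general OperatorFunction<T, R> is needed.
function toLabel(prefix: string): OperatorFunction<number, string> {
  return source => source.pipe(map(x => `${prefix}${x}`));
}

const labels: string[] = [];
of(-1, 2, 0, 3)
  .pipe(onlyPositive(), toLabel('#'))
  .subscribe(label => labels.push(label));

console.log(labels); // ['#2', '#3']
```

Because both helpers return plain functions from Observable to Observable, they slot into `pipe()` exactly like the built-in operators.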
Creation Operators
Creation operators are standalone functions that create new Observables:
import { of, from, interval, fromEvent } from 'rxjs';
// Create from values
const numbers$ = of(1, 2, 3);
// Create from array
const array$ = from([10, 20, 30]);
// Create from timer
const timer$ = interval(1000);
// Create from events
const clicks$ = fromEvent(document, 'click');
Using the pipe() Method
The pipe() method allows you to compose multiple operators:
Basic Piping
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
  map(x => x * x),
  filter(x => x > 10)
).subscribe(x => console.log(x));
// Output: 16, 25
Don’t use the old syntax operator()(obs). Always use .pipe(operator()) for better readability.
Operator Categories
Transformation Operators
Transform values emitted by an Observable:
map
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3)
  .pipe(map(x => x * 10))
  .subscribe(x => console.log(x));
// Output: 10, 20, 30
mergeMap
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
of(1, 2, 3)
  .pipe(
    mergeMap(x => of(x, x * 10))
  )
  .subscribe(x => console.log(x));
// Output: 1, 10, 2, 20, 3, 30
switchMap
import { fromEvent, interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';
// Assumes the page contains a <button> element
const button = document.querySelector('button')!;
fromEvent(button, 'click')
  .pipe(
    switchMap(() => interval(1000))
  )
  .subscribe(x => console.log(x));
// Each click cancels the running interval and starts a new one from 0
scan
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
of(1, 2, 3, 4)
  .pipe(
    scan((acc, curr) => acc + curr, 0)
  )
  .subscribe(x => console.log(x));
// Output: 1, 3, 6, 10 (running sum)
Common Transformation Operators:
map - Transform each value
mergeMap / flatMap - Map to Observable and merge (flatMap is a deprecated alias of mergeMap)
switchMap - Map to Observable and switch to latest
concatMap - Map to Observable and concat sequentially
exhaustMap - Map to Observable and ignore new source values until the current inner Observable completes
scan - Accumulator (like reduce but emits each step)
groupBy - Group emissions by key
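The "like reduce but emits each step" description of `scan` can be made concrete by running both operators over the same source. This is a minimal sketch with illustrative variable names:

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4);
const running: number[] = [];
const totals: number[] = [];

// scan emits the accumulator after every source value
source$.pipe(scan((acc, curr) => acc + curr, 0)).subscribe(x => running.push(x));
// reduce emits only the final accumulator, once the source completes
source$.pipe(reduce((acc, curr) => acc + curr, 0)).subscribe(x => totals.push(x));

console.log(running); // [1, 3, 6, 10]
console.log(totals);  // [10]
```

One consequence: `reduce` emits nothing on a source that never completes, while `scan` keeps emitting.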
Filtering Operators
Filter values from an Observable stream:
filter
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5, 6)
  .pipe(filter(x => x % 2 === 0))
  .subscribe(x => console.log(x));
// Output: 2, 4, 6
Common Filtering Operators:
filter - Filter values by predicate
take - Take first n values
takeUntil - Take until notifier emits
takeWhile - Take while condition is true
first - Take first value (or first matching predicate)
last - Take last value
debounceTime - Emit after silence period
throttleTime - Emit at most once per period
distinctUntilChanged - Emit only when value changes
skip - Skip first n values
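Several of the operators listed above can be compared side by side on one synchronous source. The `collect` helper is a made-up utility for this sketch; it only works for sources that emit synchronously.

```typescript
import { Observable, of } from 'rxjs';
import { distinctUntilChanged, first, last, skip, take, takeWhile } from 'rxjs/operators';

// Hypothetical helper: gathers everything a synchronous Observable emits into an array.
function collect<T>(source$: Observable<T>): T[] {
  const out: T[] = [];
  source$.subscribe(value => out.push(value));
  return out;
}

const source$ = of(1, 1, 2, 2, 3, 1);

const taken = collect(source$.pipe(take(2)));                     // [1, 1]
const skipped = collect(source$.pipe(skip(2)));                   // [2, 2, 3, 1]
const whileSmall = collect(source$.pipe(takeWhile(x => x < 3)));  // [1, 1, 2, 2]
const distinct = collect(source$.pipe(distinctUntilChanged()));   // [1, 2, 3, 1]
const firstMatch = collect(source$.pipe(first(x => x > 1)));      // [2]
const lastValue = collect(source$.pipe(last()));                  // [1]

console.log({ taken, skipped, whileSmall, distinct, firstMatch, lastValue });
```

Note that `distinctUntilChanged` only suppresses consecutive repeats, which is why the trailing `1` is emitted again.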
Combination Operators
Combine multiple Observables:
merge
import { merge, interval } from 'rxjs';
import { map } from 'rxjs/operators';
const first$ = interval(1000).pipe(map(x => `First: ${x}`));
const second$ = interval(1500).pipe(map(x => `Second: ${x}`));
merge(first$, second$).subscribe(x => console.log(x));
// Emissions from both sources are interleaved as they arrive
concat
import { concat, of } from 'rxjs';
const first$ = of(1, 2, 3);
const second$ = of(4, 5, 6);
concat(first$, second$).subscribe(x => console.log(x));
// Output: 1, 2, 3, 4, 5, 6 (sequential)
combineLatest
import { combineLatest, of } from 'rxjs';
import { delay } from 'rxjs/operators';
const first$ = of('A', 'B', 'C');
const second$ = of(1, 2).pipe(delay(100));
combineLatest([first$, second$])
  .subscribe(([letter, number]) =>
    console.log(letter, number)
  );
// Emits whenever any source emits, once every source has emitted at least once
// Output: C 1, C 2 (first$ finishes before second$ emits, so only its latest value is combined)
zip
import { zip, of } from 'rxjs';
const letters$ = of('A', 'B', 'C');
const numbers$ = of(1, 2, 3);
zip(letters$, numbers$)
  .subscribe(([letter, number]) =>
    console.log(letter, number)
  );
// Output: A 1, B 2, C 3
Common Combination Operators:
merge - Merge emissions from multiple Observables
concat - Concatenate Observables sequentially
combineLatest - Combine latest values from all
zip - Pair emissions by index
forkJoin - Wait for all to complete, emit last values
race - Mirror whichever Observable emits first and ignore the others
withLatestFrom - Combine with latest from other Observable
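To contrast the pairing rules described above, the following sketch feeds the same two synchronous sources of unequal length into `zip`, `combineLatest`, and `forkJoin`. The `collect` helper is hypothetical and only suitable for synchronous sources.

```typescript
import { Observable, combineLatest, forkJoin, of, zip } from 'rxjs';

// Hypothetical helper: gathers everything a synchronous Observable emits into an array.
function collect<T>(source$: Observable<T>): T[] {
  const out: T[] = [];
  source$.subscribe(value => out.push(value));
  return out;
}

const letters$ = of('A', 'B', 'C');
const numbers$ = of(1, 2);

// zip pairs by index and stops when the shorter source runs out
const zipped = collect(zip(letters$, numbers$));
// combineLatest pairs the latest value of each source; letters$ has already finished on 'C'
const combined = collect(combineLatest([letters$, numbers$]));
// forkJoin waits for both to complete and emits the last values once
const joined = collect(forkJoin([letters$, numbers$]));

console.log(zipped);   // [['A', 1], ['B', 2]]
console.log(combined); // [['C', 1], ['C', 2]]
console.log(joined);   // [['C', 2]]
```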
Error Handling Operators
Handle errors gracefully:
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
throwError(() => new Error('Oops!'))
  .pipe(
    catchError(err => {
      console.error('Caught:', err.message);
      return of('default value');
    })
  )
  .subscribe(x => console.log(x));
// Output: default value
Common Error Handling Operators:
catchError - Catch and recover from errors
retry - Retry failed Observable
retryWhen - Retry with custom logic (deprecated in RxJS 7; retry with a delay option covers the same use cases)
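How `retry` and `catchError` interact can be sketched without a network by using `defer` to build sources that fail on demand. The `flaky$` and `broken$` sources and their counters are assumptions made for this illustration.

```typescript
import { defer, of, throwError } from 'rxjs';
import { catchError, retry } from 'rxjs/operators';

// Hypothetical flaky source: fails on the first two subscriptions, succeeds on the third.
let attempts = 0;
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3 ? throwError(() => new Error('temporary failure')) : of('ok');
});

const recovered: string[] = [];
flaky$
  .pipe(retry(2), catchError(() => of('fallback')))
  .subscribe(value => recovered.push(value));

// Hypothetical source that always fails, so the retries run out and catchError takes over.
let failures = 0;
const broken$ = defer(() => {
  failures += 1;
  return throwError(() => new Error('down'));
});

const fallback: string[] = [];
broken$
  .pipe(retry(2), catchError(err => of(`fallback: ${err.message}`)))
  .subscribe(value => fallback.push(value));

console.log(attempts, recovered); // 3 ['ok']
console.log(failures, fallback);  // 3 ['fallback: down']
```

`retry(2)` means two resubscriptions after the initial attempt, so each source is subscribed to three times in total.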
Utility Operators
Utility functions for side effects and debugging:
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
of(1, 2, 3)
  .pipe(
    tap(x => console.log('Before:', x)),
    map(x => x * 10),
    tap(x => console.log('After:', x))
  )
  .subscribe();
Common Utility Operators:
tap - Perform side effects
delay - Delay emissions
timeout - Error if no emission within time
observeOn - Control execution context
subscribeOn - Control subscription context
Higher-Order Observables
Observables that emit other Observables:
import { of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, mergeAll, mergeMap } from 'rxjs/operators';
// Creates an Observable of Observables
const urls$ = of('/api/users', '/api/posts');
const requests$ = urls$.pipe(
  map(url => ajax(url)) // Each URL becomes an inner Observable of its response
);
// Flatten with mergeAll
requests$.pipe(mergeAll()).subscribe(response => console.log(response));
// Or use mergeMap (map + mergeAll)
urls$.pipe(
  mergeMap(url => ajax(url))
).subscribe(response => console.log(response));
Flattening operators convert higher-order Observables to regular Observables:
mergeAll / mergeMap - Merge all inner Observables
switchAll / switchMap - Switch to latest inner Observable
concatAll / concatMap - Concat inner Observables sequentially
exhaustAll / exhaustMap - Ignore new while inner is active
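The difference between the four flattening strategies is easiest to see when the same sequence of events is replayed through each one. The sketch below drives everything by hand with Subjects so the result is deterministic. `trace`, `Key`, and `Flatten` are names invented for this example. Since Subjects are hot, any value emitted while nobody is subscribed is lost.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';
type Flatten = (project: (key: Key) => Observable<string>) => OperatorFunction<Key, string>;

// Hypothetical helper: replays one fixed event sequence through the given flattening operator.
function trace(flatten: Flatten): string[] {
  const outer$ = new Subject<Key>();
  const inner = { a: new Subject<string>(), b: new Subject<string>() };
  const seen: string[] = [];
  outer$.pipe(flatten(key => inner[key])).subscribe(value => seen.push(value));

  outer$.next('a');      // request inner a
  outer$.next('b');      // request inner b while a is still active
  inner.a.next('a1');
  inner.b.next('b1');
  inner.a.next('a2');
  inner.a.complete();
  inner.b.next('b2');
  return seen;
}

const merged = trace(project => mergeMap(project));       // both inners run side by side
const switched = trace(project => switchMap(project));    // a is dropped as soon as b is requested
const concatenated = trace(project => concatMap(project)); // b is subscribed only after a completes
const exhausted = trace(project => exhaustMap(project));   // b is ignored because a is still active

console.log(merged);       // ['a1', 'b1', 'a2', 'b2']
console.log(switched);     // ['b1', 'b2']
console.log(concatenated); // ['a1', 'a2', 'b2']
console.log(exhausted);    // ['a1', 'a2']
```

In the `concatMap` run, `b1` is missing because the inner Subject emitted it before `concatMap` had subscribed. A cold inner Observable, such as an HTTP request, would not lose values this way.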
Creating Custom Operators
Using pipe() Function
Create reusable operator combinations:
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';
// Drops odd numbers, then doubles the even ones
function filterOddDoubleEven<T extends number>() {
  return pipe(
    filter((n: T) => n % 2 === 0),
    map((n: T) => n * 2)
  );
}
// Usage
of(1, 2, 3, 4, 5)
  .pipe(filterOddDoubleEven())
  .subscribe(x => console.log(x));
// Output: 4, 8
From Scratch
Create completely custom operators:
import { Observable, OperatorFunction, of } from 'rxjs';
function multiplyBy<T extends number>(multiplier: number): OperatorFunction<T, number> {
  return (source: Observable<T>) =>
    new Observable(subscriber => {
      // Returning the inner subscription lets unsubscribe propagate to the source
      return source.subscribe({
        next: value => subscriber.next(value * multiplier),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
}
// Usage
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe(x => console.log(x));
// Output: 10, 20, 30
Practical Examples
Search with Debouncing
import { fromEvent } from 'rxjs';
import { debounceTime, map, distinctUntilChanged, switchMap } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';
// Assumes an input with id "search" exists and displayResults is defined elsewhere
const searchBox = document.getElementById('search') as HTMLInputElement;
fromEvent(searchBox, 'input')
  .pipe(
    map(event => (event.target as HTMLInputElement).value),
    debounceTime(300),
    distinctUntilChanged(),
    // Encode the term so spaces and special characters survive in the query string
    switchMap(term => ajax(`/api/search?q=${encodeURIComponent(term)}`))
  )
  .subscribe(results => displayResults(results));
HTTP Request with Retry
import { ajax } from 'rxjs/ajax';
import { retry, catchError, map } from 'rxjs/operators';
import { of } from 'rxjs';
ajax('/api/data')
  .pipe(
    map(response => response.response),
    retry(3),
    catchError(err => {
      console.error('Failed after retries:', err);
      return of({ error: true, message: 'Failed to load data' });
    })
  )
  .subscribe(data => console.log(data));
Polling with Auto-Stop
import { interval, fromEvent } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';
import { ajax } from 'rxjs/ajax';
const stopButton = document.getElementById('stop');
const stop$ = fromEvent(stopButton, 'click');
interval(5000)
  .pipe(
    switchMap(() => ajax('/api/status')),
    takeUntil(stop$)
  )
  .subscribe(status => updateStatus(status));
Form Validation
import { fromEvent, combineLatest } from 'rxjs';
import { map, startWith } from 'rxjs/operators';
// Assumes emailInput, passwordInput, and submitButton reference existing DOM elements
const email$ = fromEvent(emailInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  map(email => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)),
  startWith(false)
);
const password$ = fromEvent(passwordInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  map(pwd => pwd.length >= 8),
  startWith(false)
);
combineLatest([email$, password$])
  .pipe(
    map(([emailValid, passwordValid]) => emailValid && passwordValid)
  )
  .subscribe(isValid => {
    submitButton.disabled = !isValid;
  });
Best Practices
Choose the right flattening operator:
mergeMap - When order doesn’t matter, parallel requests
switchMap - When only latest matters (search, autocomplete)
concatMap - When order matters (sequential updates)
exhaustMap - When you want to ignore until complete (form submit)
1. Use Operators Instead of Subscribe
// ✗ BAD: Logic in subscribe
users$.subscribe(users => {
  const activeUsers = users.filter(u => u.active);
  const userNames = activeUsers.map(u => u.name);
  displayNames(userNames);
});
// ✓ GOOD: Logic in operators
users$.pipe(
  map(users => users.filter(u => u.active)),
  map(users => users.map(u => u.name))
).subscribe(names => displayNames(names));
2. Avoid Nested Subscriptions
// ✗ BAD: Nested subscriptions
getUser().subscribe(user => {
  getPosts(user.id).subscribe(posts => {
    getComments(posts[0].id).subscribe(comments => {
      console.log(comments);
    });
  });
});
// ✓ GOOD: Flattening operators
getUser().pipe(
  switchMap(user => getPosts(user.id)),
  switchMap(posts => getComments(posts[0].id))
).subscribe(comments => console.log(comments));
3. Share Expensive Operations
import { share, shareReplay } from 'rxjs/operators';
// Multiple subscribers share one execution
const data$ = expensiveHttpCall().pipe(
  shareReplay(1) // Cache last value
);
data$.subscribe(d => console.log('Sub 1:', d));
data$.subscribe(d => console.log('Sub 2:', d));
// Only one HTTP request made
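The effect of sharing can be measured with a counter. In this sketch `expensive$` is a hypothetical stand-in for the HTTP call: a `defer` that increments `executions` each time the underlying work actually runs.

```typescript
import { defer, of } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Hypothetical stand-in for an expensive call: counts how often the work is executed.
let executions = 0;
const expensive$ = defer(() => {
  executions += 1;
  return of('data');
});

// Without sharing, every subscriber triggers its own execution.
expensive$.subscribe();
expensive$.subscribe();
const withoutSharing = executions;

// With shareReplay(1), the first subscriber triggers the work and later ones get the cached value.
executions = 0;
const shared$ = expensive$.pipe(shareReplay(1));
const received: string[] = [];
shared$.subscribe(d => received.push(`Sub 1: ${d}`));
shared$.subscribe(d => received.push(`Sub 2: ${d}`));
const withSharing = executions;

console.log(withoutSharing); // 2
console.log(withSharing);    // 1
console.log(received);       // ['Sub 1: data', 'Sub 2: data']
```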
4. Use Type-Safe Operators
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
interface User {
  id: number;
  name: string;
}
const users$: Observable<User[]> = getUsers();
// TypeScript ensures type safety
const names$: Observable<string[]> = users$.pipe(
  map(users => users.map(u => u.name))
);
Operator Selection Guide
Need | Operator
Transform values | map
Filter values | filter
Flatten nested Observables | mergeMap, switchMap, concatMap
Take first N | take(n)
Debounce input | debounceTime
Retry on error | retry
Handle errors | catchError
Combine latest from all | combineLatest
Wait for all to complete | forkJoin
Side effects | tap
Accumulate values | scan
Share execution | share, shareReplay
Observable - Operators transform Observables
Subscription - Operators can affect subscription lifecycle
Observer - Operators modify what Observers receive
Subject - Can be used with operators like any Observable