Overview
Recursively projects each source value and each output value to an Observable, then merges all resulting Observables. Similar to mergeMap, but applies the projection function to its own output values as well, creating a recursive expansion.
expand is powerful for recursive operations like tree traversal, pagination, or any scenario where you need to repeatedly apply an operation to its own results.
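The difference from mergeMap can be sketched without RxJS at all. The following is a simplified synchronous model in which arrays stand in for Observables; `projectOnce`, `projectRecursively`, and `double` are hypothetical names used only for illustration and are not part of the library.

```typescript
// Simplified synchronous model: arrays stand in for Observables.
// projectOnce and projectRecursively are hypothetical helpers, not RxJS APIs.

// One level only: project each source value and collect the results,
// which is roughly what mergeMap does.
function projectOnce<T>(source: T[], project: (value: T) => T[]): T[] {
  const out: T[] = [];
  for (const value of source) {
    out.push(...project(value));
  }
  return out;
}

// Recursive: emit each value, then project it and feed every result back in,
// which is roughly what expand does when the inner Observables are synchronous.
function projectRecursively<T>(source: T[], project: (value: T) => T[]): T[] {
  const out: T[] = [];
  const visit = (value: T): void => {
    out.push(value);
    for (const next of project(value)) {
      visit(next);
    }
  };
  for (const value of source) {
    visit(value);
  }
  return out;
}

// Double each value; returning an empty array ends the branch.
const double = (x: number): number[] => (x < 8 ? [x * 2] : []);

const once = projectOnce([1], double);
const recursive = projectRecursively([1], double);
console.log(once); // [2]
console.log(recursive); // [1, 2, 4, 8]
```

Note that the recursive version also emits the source value itself, which matches the description of expand above.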
Type Signature
function expand<T, O extends ObservableInput<unknown>>(
  project: (value: T, index: number) => O,
  concurrent: number = Infinity
): OperatorFunction<T, ObservedValueOf<O>>
Parameters
project
(value: T, index: number) => O
required
A function that maps each value (from the source OR from previous project results) to an Observable. The function receives:
value: The value to project (initial source value or recursively generated value)
index: The zero-based index of the emission
Must return an ObservableInput. If it returns an empty Observable, recursion stops for that branch.
concurrent
number
default: Infinity
Maximum number of input Observables being subscribed to concurrently. Controls how many recursive branches can be active simultaneously.
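To illustrate the two arguments that project receives, here is a minimal sketch that records every call. It assumes RxJS 7 and uses synchronous inner Observables so the results can be inspected immediately; the `seen` and `emitted` arrays are illustrative names.

```typescript
import { of, expand, EMPTY } from 'rxjs';

const seen: Array<{ value: string; index: number }> = [];
const emitted: string[] = [];

of('a').pipe(
  expand((value, index) => {
    // Called for the source value and for every value produced by earlier calls
    seen.push({ value, index });
    // Returning EMPTY stops the recursion for this branch
    return index < 2 ? of(value + index) : EMPTY;
  })
).subscribe(value => emitted.push(value));

console.log(emitted); // ['a', 'a0', 'a01']
console.log(seen.map(call => call.index)); // [0, 1, 2]
```

The index counts calls to project across the whole expansion, not positions in the source.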
Returns
OperatorFunction<T, ObservedValueOf<O>>
A function that returns an Observable that emits all source values and all recursively projected values.
Usage Examples
Basic Example: Powers of Two
import { fromEvent, map, expand, of, delay, take } from 'rxjs';

const clicks = fromEvent(document, 'click');
const powersOfTwo = clicks.pipe(
  map(() => 1),
  expand(x => of(2 * x).pipe(delay(1000))),
  take(10)
);
powersOfTwo.subscribe(x => console.log(x));
// Click occurs:
// Immediately: 1
// After 1s: 2
// After 2s: 4
// After 3s: 8
// After 4s: 16
// ... up to 512 (10 values total)
Paginated API Calls
import { from, expand, reduce, EMPTY } from 'rxjs';

interface PageResponse {
  data: any[];
  nextPage: number | null;
  totalPages: number;
}

function fetchPage(page: number): Promise<PageResponse> {
  console.log(`Fetching page ${page}`);
  return fetch(`/api/items?page=${page}`).then(res => res.json());
}

// Start with the response for page 1
from(fetchPage(1)).pipe(
  // Each response is emitted and also fed back into the project function,
  // so every page is fetched exactly once
  expand(response =>
    // If there's a next page, continue expansion; otherwise stop this branch
    response.nextPage ? from(fetchPage(response.nextPage)) : EMPTY
  ),
  // Collect all data
  reduce((acc, response) => [...acc, ...response.data], [] as any[])
).subscribe({
  next: allData => console.log('All pages loaded:', allData),
  error: error => console.error('Error loading pages:', error)
});
Tree Traversal
import { of, from, expand, map, EMPTY } from 'rxjs';

interface TreeNode {
  id: string;
  name: string;
  children?: TreeNode[];
}

const tree: TreeNode = {
  id: '1',
  name: 'Root',
  children: [
    {
      id: '2',
      name: 'Child 1',
      children: [
        { id: '4', name: 'Grandchild 1' },
        { id: '5', name: 'Grandchild 2' }
      ]
    },
    {
      id: '3',
      name: 'Child 2',
      children: [
        { id: '6', name: 'Grandchild 3' }
      ]
    }
  ]
};

// Traverse entire tree
of(tree).pipe(
  expand(node => {
    if (node.children && node.children.length > 0) {
      return from(node.children);
    }
    return EMPTY;
  }),
  map(node => ({ id: node.id, name: node.name }))
).subscribe(node => {
  console.log('Visited node:', node);
});
// Output (depth-first, because the children are emitted synchronously
// and concurrency is unlimited by default):
// Visited node: { id: '1', name: 'Root' }
// Visited node: { id: '2', name: 'Child 1' }
// Visited node: { id: '4', name: 'Grandchild 1' }
// Visited node: { id: '5', name: 'Grandchild 2' }
// Visited node: { id: '3', name: 'Child 2' }
// Visited node: { id: '6', name: 'Grandchild 3' }
// Passing 1 as the second argument to expand visits level by level instead.
Retry with Exponential Backoff
import { from, of, expand, delay, filter, map, mergeMap, EMPTY, catchError } from 'rxjs';

interface RetryState {
  attempt: number;
  maxAttempts: number;
  delayMs: number;
  data?: unknown; // Present only once a request has succeeded
}

function fetchWithRetry(url: string) {
  const initial: RetryState = { attempt: 1, maxAttempts: 5, delayMs: 1000 };
  return of(initial).pipe(
    expand((state: RetryState) => {
      if ('data' in state) {
        return EMPTY; // Success, stop retrying
      }
      console.log(`Attempt ${state.attempt} of ${state.maxAttempts}`);
      return from(fetch(url)).pipe(
        mergeMap(response => {
          if (!response.ok) {
            throw new Error(`HTTP ${response.status}`);
          }
          return from(response.json());
        }),
        // Carry the parsed body out of expand on the state
        map((data): RetryState => ({ ...state, data })),
        catchError(error => {
          console.error(`Attempt ${state.attempt} failed:`, error.message);
          if (state.attempt >= state.maxAttempts) {
            throw error; // Max attempts reached
          }
          // Retry with exponential backoff
          return of<RetryState>({
            attempt: state.attempt + 1,
            maxAttempts: state.maxAttempts,
            delayMs: state.delayMs * 2
          }).pipe(delay(state.delayMs));
        })
      );
    }),
    // Only the state from the successful attempt carries data
    filter(state => 'data' in state),
    map(state => state.data)
  );
}

fetchWithRetry('/api/unreliable-endpoint').subscribe({
  next: data => console.log('Success:', data),
  error: error => console.error('All retries failed:', error)
});
Marble Diagram
Source: --1--------------|
Expand:   |--2----------|
          |  |--4------|
          |  |  |--8--|
          |  |  |  |-16|
Result: --1--2--4--8--16--|
Each value is re-emitted and also fed back through the project function.
Common Use Cases
Pagination: Fetch all pages of results recursively
Tree/Graph Traversal: Navigate hierarchical or graph structures
Recursive API Calls: Follow links or references in API responses
Exponential Sequences: Generate mathematical sequences
State Space Exploration: Search through possible states
Retry Logic: Implement sophisticated retry strategies
Breadth-First Search: Explore levels of a structure
If the project function always returns non-empty Observables, expand will continue indefinitely! Always ensure there’s a termination condition (return EMPTY) or use take() to limit emissions.
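Both ways of bounding the recursion can be sketched side by side. This is a minimal example assuming RxJS 7, with synchronous inner Observables so the collected values can be checked right away; the array names `bounded` and `limited` are illustrative.

```typescript
import { of, expand, take, EMPTY } from 'rxjs';

// Option 1: the project function ends the branch itself by returning EMPTY
const bounded: number[] = [];
of(1).pipe(
  expand(x => (x < 16 ? of(x * 2) : EMPTY))
).subscribe(x => bounded.push(x));

// Option 2: the project function never stops, so take() cuts the stream off
const limited: number[] = [];
of(1).pipe(
  expand(x => of(x * 2)),
  take(5)
).subscribe(x => limited.push(x));

console.log(bounded); // [1, 2, 4, 8, 16]
console.log(limited); // [1, 2, 4, 8, 16]
```

With take(), the expansion stops because the downstream subscriber unsubscribes after the fifth value. A termination condition inside project is usually easier to reason about when each branch has a natural end.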
Advanced Example: Crawling Linked Data
import { from, expand, concatMap, map, catchError, EMPTY, Observable } from 'rxjs';

interface Page {
  url: string;
  title: string;
  links: string[];
}

interface CrawledPage {
  page: Page;
  depth: number;
}

const visitedUrls = new Set<string>();
const maxDepth = 3;

// Fetch one page, skipping URLs that were already visited
function fetchPage(url: string, depth: number): Observable<CrawledPage> {
  if (visitedUrls.has(url)) {
    return EMPTY;
  }
  visitedUrls.add(url);
  console.log(`Crawling: ${url} (depth ${depth})`);
  return from(fetch(url).then(res => res.json() as Promise<Page>)).pipe(
    map(page => ({ page, depth })),
    catchError(error => {
      console.error(`Failed to crawl ${url}:`, error);
      return EMPTY;
    })
  );
}

function crawlWebsite(startUrl: string) {
  return fetchPage(startUrl, 0).pipe(
    expand(({ page, depth }) => {
      // Stop if the linked pages would exceed the max depth
      if (depth + 1 >= maxDepth) {
        return EMPTY;
      }
      // Fetch the linked pages at the next depth, one at a time per branch
      return from(page.links).pipe(
        concatMap(link => fetchPage(link, depth + 1))
      );
    }, 3), // Max 3 active branches, so at most 3 concurrent requests
    map(({ page }) => page)
  );
}

crawlWebsite('https://example.com').subscribe({
  next: page => console.log('Crawled:', page.title),
  error: error => console.error('Crawl error:', error),
  complete: () => console.log('Crawl complete')
});
Controlling Concurrency
The concurrent parameter is crucial for performance. Setting it too high with I/O operations can overwhelm the system. Setting it too low may make expansion very slow.
// Breadth-first (one level at a time)
expand(project, 1)
// Moderate concurrency (recommended for network operations)
expand(project, 5)
// Unlimited concurrency (default, use with caution)
expand(project, Infinity)
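How the limit affects visiting order can be sketched with a simplified model. This is an assumption-laden illustration rather than the library's implementation: arrays stand in for synchronous inner Observables, values that arrive while the limit is reached wait in a first-in, first-out buffer, and `expandOrder` and `sampleTree` are hypothetical names. With asynchronous inner Observables the real order also depends on timing.

```typescript
interface SimpleNode {
  id: string;
  children?: SimpleNode[];
}

// Returns the ids in the order a synchronous expansion would emit them.
function expandOrder(root: SimpleNode, concurrent: number = Infinity): string[] {
  const visited: string[] = [];
  const buffer: SimpleNode[] = []; // values waiting for a free slot (FIFO)
  let active = 0; // branches currently being expanded

  function receive(node: SimpleNode): void {
    if (active < concurrent) {
      run(node);
    } else {
      buffer.push(node);
    }
  }

  function run(node: SimpleNode): void {
    visited.push(node.id); // emit the value downstream
    active++;
    for (const child of node.children || []) {
      receive(child); // feed projected values back in
    }
    active--; // this branch has completed
    while (buffer.length > 0 && active < concurrent) {
      run(buffer.shift()!);
    }
  }

  receive(root);
  return visited;
}

const sampleTree: SimpleNode = {
  id: '1',
  children: [
    { id: '2', children: [{ id: '4' }, { id: '5' }] },
    { id: '3', children: [{ id: '6' }] }
  ]
};

const unlimitedOrder = expandOrder(sampleTree);
const oneAtATimeOrder = expandOrder(sampleTree, 1);
console.log(unlimitedOrder); // ['1', '2', '4', '5', '3', '6']
console.log(oneAtATimeOrder); // ['1', '2', '3', '4', '5', '6']
```

In this model, unlimited concurrency follows each branch to its end before moving on, while a limit of 1 drains the buffer level by level, which is why the first setting above is labelled breadth-first.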