Overview

Recursively projects each source value and each output value to an Observable, then merges all resulting Observables. Similar to mergeMap, but applies the projection function to its own output values as well, creating a recursive expansion.
expand is powerful for recursive operations like tree traversal, pagination, or any scenario where you need to repeatedly apply an operation to its own results.
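
To make the contrast with mergeMap concrete, here is a minimal sketch, assuming RxJS 7.2+ (where operators are exported from 'rxjs'). The `double` helper and its cutoff at 8 are illustrative choices, not part of the operator.

```typescript
import { of, mergeMap, expand, EMPTY } from 'rxjs';

// Hypothetical projection: double the value, stop once it reaches 8
const double = (x: number) => (x < 8 ? of(x * 2) : EMPTY);

const viaMergeMap: number[] = [];
const viaExpand: number[] = [];

// mergeMap applies the projection to the source value only
of(1).pipe(mergeMap(double)).subscribe(x => viaMergeMap.push(x));

// expand emits the source value, then feeds every output back into the projection
of(1).pipe(expand(double)).subscribe(x => viaExpand.push(x));

console.log(viaMergeMap); // [2]
console.log(viaExpand);   // [1, 2, 4, 8]
```

Note that expand emits the seed value 1 itself, while mergeMap emits only the projected result.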

Type Signature

function expand<T, O extends ObservableInput<unknown>>(
  project: (value: T, index: number) => O,
  concurrent: number = Infinity
): OperatorFunction<T, ObservedValueOf<O>>

Parameters

project
(value: T, index: number) => O
required
A function that maps each value (from the source OR from previous project results) to an Observable. The function receives:
  • value: The value to project (initial source value or recursively generated value)
  • index: The zero-based count of calls to project so far; the counter is shared by source values and recursively generated values
Must return an ObservableInput. If it returns an empty Observable, recursion stops for that branch.
concurrent
number
default:"Infinity"
Maximum number of input Observables being subscribed to concurrently. Controls how many recursive branches can be active simultaneously; values that arrive while the limit is reached are buffered until a branch completes.
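
The behaviour of the index argument can be observed with a small sketch, assuming RxJS 7.2+. The string-building projection is purely illustrative; it records every call to project so the index values are visible.

```typescript
import { of, expand, EMPTY } from 'rxjs';

const calls: Array<[string, number]> = [];
const emitted: string[] = [];

of('a').pipe(
  expand((value, index) => {
    calls.push([value, index]); // record every invocation of project
    // Append the index to the value; stop after three recursive steps
    return index < 3 ? of(`${value}${index}`) : EMPTY;
  })
).subscribe(v => emitted.push(v));

console.log(emitted); // ['a', 'a0', 'a01', 'a012']
console.log(calls);   // [['a', 0], ['a0', 1], ['a01', 2], ['a012', 3]]
```

The index keeps increasing across recursive calls rather than restarting for each branch.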

Returns

return
OperatorFunction<T, ObservedValueOf<O>>
A function that returns an Observable that emits all source values and all recursively projected values.
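
Because project may return any ObservableInput, a plain array also works, and the output element type follows from it. The countdown below is a hypothetical illustration, assuming RxJS 7.2+.

```typescript
import { of, expand } from 'rxjs';

const countdown: number[] = [];

// Returning a plain array: [n - 1] continues the recursion, [] ends it
of(3).pipe(
  expand(n => (n > 0 ? [n - 1] : []))
).subscribe(n => countdown.push(n));

console.log(countdown); // [3, 2, 1, 0]
```

Here the projection returns number[], so the resulting Observable emits number values.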

Usage Examples

Basic Example: Powers of Two

import { fromEvent, map, expand, of, delay, take } from 'rxjs';

const clicks = fromEvent(document, 'click');

const powersOfTwo = clicks.pipe(
  map(() => 1),
  expand(x => of(2 * x).pipe(delay(1000))),
  take(10)
);

powersOfTwo.subscribe(x => console.log(x));
// Click occurs:
// Immediately: 1
// After 1s: 2
// After 2s: 4
// After 3s: 8
// After 4s: 16
// ... up to 512 (10 values total)

Paginated API Calls

import { from, expand, reduce, EMPTY } from 'rxjs';

interface PageResponse {
  data: any[];
  nextPage: number | null;
  totalPages: number;
}

function fetchPage(page: number): Promise<PageResponse> {
  return fetch(`/api/items?page=${page}`).then(res => res.json());
}

// Start with page 1 and expand on each response,
// so every page is fetched exactly once
from(fetchPage(1)).pipe(
  expand(response => {
    // If there's a next page, fetch it and feed its response back into expand
    if (response.nextPage) {
      console.log(`Fetching page ${response.nextPage}`);
      return from(fetchPage(response.nextPage));
    }
    // Otherwise, stop this branch of expansion
    return EMPTY;
  }),
  // Collect the data from all pages
  reduce((acc, response) => [...acc, ...response.data], [] as any[])
).subscribe({
  next: allData => console.log('All pages loaded:', allData),
  error: error => console.error('Error loading pages:', error)
});
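
The same pagination pattern can be tried without a network. The sketch below assumes RxJS 7.2+ and uses a hypothetical in-memory `fakeFetchPage` in place of a real endpoint; it also tracks which pages were requested.

```typescript
import { of, expand, reduce, EMPTY, Observable } from 'rxjs';

interface PageResponse {
  data: string[];
  nextPage: number | null;
}

// Hypothetical stand-in for a paginated endpoint: three pages held in memory
const pages: Record<number, PageResponse> = {
  1: { data: ['a', 'b'], nextPage: 2 },
  2: { data: ['c', 'd'], nextPage: 3 },
  3: { data: ['e'], nextPage: null }
};

const requested: number[] = [];

function fakeFetchPage(page: number): Observable<PageResponse> {
  requested.push(page); // track how often each page is requested
  return of(pages[page]);
}

let allItems: string[] = [];

fakeFetchPage(1).pipe(
  // Each response decides whether another page should be requested
  expand(res => (res.nextPage !== null ? fakeFetchPage(res.nextPage) : EMPTY)),
  // Flatten the per-page arrays into one result
  reduce((acc, res) => [...acc, ...res.data], [] as string[])
).subscribe(items => (allItems = items));

console.log(requested); // [1, 2, 3]
console.log(allItems);  // ['a', 'b', 'c', 'd', 'e']
```

Expanding on responses rather than page numbers means each page is requested once.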

Tree Traversal

import { from, of, expand, map, EMPTY } from 'rxjs';

interface TreeNode {
  id: string;
  name: string;
  children?: TreeNode[];
}

const tree: TreeNode = {
  id: '1',
  name: 'Root',
  children: [
    {
      id: '2',
      name: 'Child 1',
      children: [
        { id: '4', name: 'Grandchild 1' },
        { id: '5', name: 'Grandchild 2' }
      ]
    },
    {
      id: '3',
      name: 'Child 2',
      children: [
        { id: '6', name: 'Grandchild 3' }
      ]
    }
  ]
};

// Traverse entire tree
of(tree).pipe(
  expand(node => {
    if (node.children && node.children.length > 0) {
      return from(node.children);
    }
    return EMPTY;
  }),
  map(node => ({ id: node.id, name: node.name }))
).subscribe(node => {
  console.log('Visited node:', node);
});

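// Output (depth-first: the children are emitted synchronously and concurrency
// is unlimited, so each branch is expanded before its siblings):
// Visited node: { id: '1', name: 'Root' }
// Visited node: { id: '2', name: 'Child 1' }
// Visited node: { id: '4', name: 'Grandchild 1' }
// Visited node: { id: '5', name: 'Grandchild 2' }
// Visited node: { id: '3', name: 'Child 2' }
// Visited node: { id: '6', name: 'Grandchild 3' }
// Passing 1 as the second argument to expand yields level order instead (ids 1, 2, 3, 4, 5, 6).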
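
A traversal often needs context such as the path to each node. One way to carry it, sketched here assuming RxJS 7.2+, is to expand over a wrapper object; the `Visit` type and the `sampleTree` data are hypothetical.

```typescript
import { from, of, expand, map, EMPTY } from 'rxjs';

interface TreeNode {
  name: string;
  children?: TreeNode[];
}

// Hypothetical wrapper pairing a node with the path that leads to it
interface Visit {
  node: TreeNode;
  path: string;
}

const sampleTree: TreeNode = {
  name: 'Root',
  children: [
    { name: 'Child 1', children: [{ name: 'Grandchild 1' }, { name: 'Grandchild 2' }] },
    { name: 'Child 2', children: [{ name: 'Grandchild 3' }] }
  ]
};

const root: Visit = { node: sampleTree, path: sampleTree.name };
const paths: string[] = [];

of(root).pipe(
  expand(({ node, path }) =>
    node.children?.length
      ? from(node.children).pipe(
          // Extend the parent's path for every child
          map((child): Visit => ({ node: child, path: `${path}/${child.name}` }))
        )
      : EMPTY
  ),
  map(visit => visit.path)
).subscribe(p => paths.push(p));

console.log(paths);
// ['Root', 'Root/Child 1', 'Root/Child 1/Grandchild 1',
//  'Root/Child 1/Grandchild 2', 'Root/Child 2', 'Root/Child 2/Grandchild 3']
```

The order shown applies to synchronous children with the default concurrency; asynchronous children may interleave differently.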

Retry with Exponential Backoff

import { from, of, expand, delay, mergeMap, map, filter, catchError, EMPTY } from 'rxjs';

interface RetryState {
  attempt: number;
  maxAttempts: number;
  delayMs: number;
  result?: unknown; // Set once a request succeeds
}

function fetchWithRetry(url: string) {
  const initial: RetryState = { attempt: 1, maxAttempts: 5, delayMs: 1000 };

  return of(initial).pipe(
    expand(state => {
      // A state that carries a result ends the recursion
      if ('result' in state) {
        return EMPTY;
      }
      console.log(`Attempt ${state.attempt} of ${state.maxAttempts}`);

      return from(fetch(url)).pipe(
        mergeMap(response => {
          if (!response.ok) {
            throw new Error(`HTTP ${response.status}`);
          }
          return from(response.json());
        }),
        map((result): RetryState => ({ ...state, result })),
        catchError(error => {
          console.error(`Attempt ${state.attempt} failed:`, error.message);

          if (state.attempt >= state.maxAttempts) {
            throw error; // Max attempts reached
          }

          // Retry after the current delay, doubling it for the next attempt
          const next: RetryState = {
            ...state,
            attempt: state.attempt + 1,
            delayMs: state.delayMs * 2
          };
          return of(next).pipe(delay(state.delayMs));
        })
      );
    }),
    // Only the final state carries the parsed response body
    filter(state => 'result' in state),
    map(state => state.result)
  );
}

fetchWithRetry('/api/unreliable-endpoint').subscribe({
  next: data => console.log('Success:', data),
  error: error => console.error('All retries failed:', error)
});
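
The backoff schedule can be checked without waiting by running the delays on a virtual clock. This is a sketch assuming RxJS 7.2+ and its exported VirtualTimeScheduler; the `flaky` operation (two failures, then success) and the `Attempt` type are hypothetical.

```typescript
import {
  defer, of, throwError, expand, delay, map, filter, catchError,
  EMPTY, Observable, VirtualTimeScheduler
} from 'rxjs';

interface Attempt {
  attempt: number;
  delayMs: number;
  result?: string; // present only once a call has succeeded
}

const scheduler = new VirtualTimeScheduler(); // virtual clock, nothing really waits
const attemptTimes: number[] = [];
let calls = 0;

// Hypothetical flaky operation: fails twice, then succeeds
function flaky(): Observable<string> {
  return defer(() => {
    calls++;
    attemptTimes.push(scheduler.now()); // virtual time of this attempt
    return calls < 3 ? throwError(() => new Error(`failure ${calls}`)) : of('ok');
  });
}

const maxAttempts = 5;
const results: string[] = [];
const first: Attempt = { attempt: 1, delayMs: 1000 };

of(first).pipe(
  expand(state => {
    if (state.result !== undefined) {
      return EMPTY; // success ends the recursion
    }
    return flaky().pipe(
      map((result): Attempt => ({ ...state, result })),
      catchError(error => {
        if (state.attempt >= maxAttempts) {
          throw error;
        }
        const next: Attempt = { attempt: state.attempt + 1, delayMs: state.delayMs * 2 };
        // Wait the current delay (on the virtual clock) before the next attempt
        return of(next).pipe(delay(state.delayMs, scheduler));
      })
    );
  }),
  filter(state => state.result !== undefined),
  map(state => state.result as string)
).subscribe(r => results.push(r));

scheduler.flush(); // run all scheduled delays in virtual time

console.log(attemptTimes); // [0, 1000, 3000]
console.log(results);      // ['ok']
```

The attempts land at 0 ms, 1000 ms, and 3000 ms of virtual time, matching delays of 1000 ms and then 2000 ms.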

Marble Diagram

Source:   --1--------------|
Expand:     |--2----------|
            |  |--4------|
            |  |  |--8--|
            |  |  |  |-16|
Result:   --1--2--4--8--16--|
Each value is re-emitted and also fed back through the project function.

Common Use Cases

  1. Pagination: Fetch all pages of results recursively
  2. Tree/Graph Traversal: Navigate hierarchical or graph structures
  3. Recursive API Calls: Follow links or references in API responses
  4. Exponential Sequences: Generate mathematical sequences
  5. State Space Exploration: Search through possible states
  6. Retry Logic: Implement sophisticated retry strategies
  7. Breadth-First Search: Explore levels of a structure
If the project function always returns non-empty Observables, expand will continue indefinitely! Always ensure there’s a termination condition (return EMPTY) or use take() to limit emissions.
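
As a minimal sketch of the take() safeguard, assuming RxJS 7.2+, the projection below never returns EMPTY, yet the stream still completes after five values.

```typescript
import { of, expand, take } from 'rxjs';

const firstFive: number[] = [];
let completed = false;

of(1).pipe(
  expand(x => of(x + 1)), // never returns EMPTY: unbounded on its own
  take(5)                 // unsubscribes after five values, ending the expansion
).subscribe({
  next: x => firstFive.push(x),
  complete: () => (completed = true)
});

console.log(firstFive); // [1, 2, 3, 4, 5]
console.log(completed); // true
```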

Advanced Example: Crawling Linked Data

import { defer, from, expand, concatMap, map, catchError, EMPTY, Observable } from 'rxjs';

interface Page {
  url: string;
  title: string;
  links: string[];
}

interface Crawled {
  page: Page;
  depth: number;
}

const visitedUrls = new Set<string>();
const maxDepth = 3;

// Fetch a single page once, marking its URL as visited when the request starts
function fetchPage(url: string, depth: number): Observable<Crawled> {
  return defer(() => {
    visitedUrls.add(url);
    console.log(`Crawling: ${url} (depth ${depth})`);
    return fetch(url).then(res => res.json() as Promise<Page>);
  }).pipe(
    map(page => ({ page, depth })),
    catchError(error => {
      console.error(`Failed to crawl ${url}:`, error);
      return EMPTY; // Skip pages that fail to load
    })
  );
}

function crawlWebsite(startUrl: string) {
  return fetchPage(startUrl, 0).pipe(
    expand(({ page, depth }) => {
      // Stop if max depth reached
      if (depth >= maxDepth) {
        return EMPTY;
      }

      // Fetch this page's links one at a time, skipping URLs already visited
      return from(page.links).pipe(
        concatMap(link =>
          visitedUrls.has(link) ? EMPTY : fetchPage(link, depth + 1)
        )
      );
    }, 3), // Max 3 branches active, each with one request in flight
    map(({ page }) => page)
  );
}

crawlWebsite('https://example.com').subscribe({
  next: page => console.log('Crawled:', page.title),
  error: error => console.error('Crawl error:', error),
  complete: () => console.log('Crawl complete')
});
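
The visited-set idea is what keeps a crawl from looping on cyclic links. A network-free sketch, assuming RxJS 7.2+ and a hypothetical in-memory `links` graph, shows each URL being emitted exactly once even though b links back to a.

```typescript
import { from, of, expand, filter, tap } from 'rxjs';

// Hypothetical link graph containing a cycle (b links back to a)
const links: Record<string, string[]> = {
  a: ['b', 'c'],
  b: ['a', 'd'],
  c: ['d'],
  d: []
};

function crawl(start: string) {
  const visited = new Set<string>([start]);
  return of(start).pipe(
    expand(url =>
      from(links[url] ?? []).pipe(
        filter(link => !visited.has(link)), // drop links already seen
        tap(link => visited.add(link))      // mark before expanding further
      )
    )
  );
}

const crawled: string[] = [];
crawl('a').subscribe(url => crawled.push(url));

console.log(crawled); // ['a', 'b', 'd', 'c']
```

Marking a link as visited before it is expanded is what prevents d from being emitted a second time via c.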

Controlling Concurrency

The concurrent parameter is crucial for performance. Setting it too high with I/O operations can overwhelm the system. Setting it too low may make expansion very slow.
// Breadth-first (one level at a time)
expand(project, 1)

// Moderate concurrency (recommended for network operations)
expand(project, 5)

// Unlimited concurrency (default, use with caution)
expand(project, Infinity)
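
The effect of the concurrent argument on ordering can be seen with synchronous data. This sketch assumes RxJS 7.2+; the `children` lookup table is a hypothetical tree (1 has children 2 and 3, 2 has 4 and 5, 3 has 6).

```typescript
import { of, expand } from 'rxjs';

// Hypothetical tree encoded as a child lookup
const children: Record<number, number[]> = { 1: [2, 3], 2: [4, 5], 3: [6] };

// Arrays are valid ObservableInputs; a missing entry means a leaf
const project = (id: number) => children[id] ?? [];

const unlimited: number[] = [];
const oneAtATime: number[] = [];

// Default concurrency: each child is expanded as soon as it is emitted
of(1).pipe(expand(project)).subscribe(id => unlimited.push(id));

// Concurrency 1: pending values wait in a first-in, first-out buffer
of(1).pipe(expand(project, 1)).subscribe(id => oneAtATime.push(id));

console.log(unlimited);  // [1, 2, 4, 5, 3, 6]
console.log(oneAtATime); // [1, 2, 3, 4, 5, 6]
```

With synchronous inner Observables, unlimited concurrency therefore visits depth-first, while a limit of 1 visits level by level. With asynchronous work the order also depends on timing.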