Overview

The toArray operator collects every value emitted by the source Observable and emits them as a single array when the source completes. Use it when you need all emissions gathered into one collection.
toArray emits nothing until the source completes. If the source never completes, the array is never emitted and the internal buffer keeps growing, which can exhaust memory.

Signature

function toArray<T>(): OperatorFunction<T, T[]>

Parameters

No parameters.

Returns

OperatorFunction<T, T[]>
A function that returns an Observable emitting a single array of all values from the source Observable. The array is emitted when the source completes.

Usage Examples

Basic Usage

Collect interval values into an array:
import { interval, take, toArray } from 'rxjs';

const source = interval(1000);
const example = source.pipe(
  take(10),
  toArray()
);

example.subscribe(value => console.log(value));

// Output (after 10 seconds):
// [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Collect Filtered Results

Filter then collect into array:
import { range, filter, toArray } from 'rxjs';

range(1, 100).pipe(
  filter(x => x % 2 === 0), // Even numbers only
  toArray()
).subscribe(evenNumbers => {
  console.log('Even numbers:', evenNumbers);
});

// Output:
// Even numbers: [2, 4, 6, 8, ..., 100]

Collect HTTP Responses

Make multiple requests and collect results:
import { from, mergeMap, toArray } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const userIds = [1, 2, 3, 4, 5];

from(userIds).pipe(
  mergeMap(id => 
    ajax.getJSON(`/api/users/${id}`)
  ),
  toArray()
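).subscribe(users => {
  // With mergeMap, results are ordered by response arrival, not by userIds order
  console.log('All users loaded:', users);
  displayUsers(users); // displayUsers is assumed to be defined elsewhere
});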
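
Because mergeMap runs the inner requests concurrently, the collected array follows response order rather than input order. The sketch below illustrates the difference against concatMap. It assumes a hypothetical `fakeFetch` helper that stands in for the HTTP call and simulates latency with `timer`; it is not part of RxJS or of the example above.

```typescript
import { from, timer, map, mergeMap, concatMap, toArray, lastValueFrom } from 'rxjs';

// Hypothetical stand-in for an HTTP call: lower ids respond more slowly.
const fakeFetch = (id: number) =>
  timer((4 - id) * 20).pipe(map(() => `user-${id}`));

const ids = [1, 2, 3];

// mergeMap subscribes to all requests at once, so results arrive in completion order.
const merged = lastValueFrom(
  from(ids).pipe(mergeMap(id => fakeFetch(id)), toArray())
);

// concatMap runs one request at a time and preserves the input order.
const sequential = lastValueFrom(
  from(ids).pipe(concatMap(id => fakeFetch(id)), toArray())
);

merged.then(users => console.log('mergeMap:', users));
// mergeMap: ['user-3', 'user-2', 'user-1']
sequential.then(users => console.log('concatMap:', users));
// concatMap: ['user-1', 'user-2', 'user-3']
```

If the order of the results matters, concatMap is the simpler choice at the cost of running the requests one after another.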

Collect and Process

Collect all values then perform aggregate operations:
import { fromEvent, map, takeUntil, timer, toArray } from 'rxjs';

// Typing the event as MouseEvent makes clientX/clientY available
const clicks$ = fromEvent<MouseEvent>(document, 'click');
const stop$ = timer(10000); // Stop collecting after 10 seconds

clicks$.pipe(
  map(event => ({ x: event.clientX, y: event.clientY })),
  takeUntil(stop$),
  toArray()
).subscribe(clickPositions => {
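  console.log('Total clicks:', clickPositions.length);

  // Without any clicks there is no average to compute (avoids dividing by zero)
  if (clickPositions.length === 0) return;

  // Calculate average position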
  const avgX = clickPositions.reduce((sum, pos) => sum + pos.x, 0) / clickPositions.length;
  const avgY = clickPositions.reduce((sum, pos) => sum + pos.y, 0) / clickPositions.length;
  
  console.log('Average click position:', { x: avgX, y: avgY });
});

Form Data Collection

Collect form values over time:
import { fromEvent, map, takeUntil, toArray } from 'rxjs';

const input = document.querySelector<HTMLInputElement>('#search')!;
const submitBtn = document.querySelector<HTMLButtonElement>('#submit')!;

// takeUntil completes the stream the moment submit is clicked.
// A takeWhile on a flag would only complete on the next input event.
const submit$ = fromEvent(submitBtn, 'click');

fromEvent(input, 'input').pipe(
  map(() => input.value),
  takeUntil(submit$),
  toArray()
).subscribe(searchHistory => {
  console.log('Search history:', searchHistory);
});

Collect Transformed Values

Apply transformations before collecting:
import { from, map, toArray } from 'rxjs';

interface Product {
  id: number;
  name: string;
  price: number;
}

const products: Product[] = [
  { id: 1, name: 'Widget', price: 10 },
  { id: 2, name: 'Gadget', price: 20 },
  { id: 3, name: 'Gizmo', price: 15 }
];

from(products).pipe(
  map(product => ({
    ...product,
    priceWithTax: product.price * 1.1
  })),
  toArray()
).subscribe(productsWithTax => {
  console.log('Products with tax:', productsWithTax);
});

How It Works

toArray is implemented using the reduce operator:
const arrReducer = (arr: any[], value: any) => (arr.push(value), arr);

export function toArray<T>(): OperatorFunction<T, T[]> {
  return (source) =>
    new Observable((subscriber) => {
      reduce(arrReducer, [] as T[])(source).subscribe(subscriber);
    });
}
It creates an empty array and pushes each emitted value into it, then emits the complete array when the source completes.
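To make the relationship with reduce concrete, here is a minimal sketch that runs toArray() and a hand-written reduce over the same synchronous source. The variable names `viaToArray` and `viaReduce` are illustrative only, and the reducer copies the array instead of mutating it, which differs from the internal `arrReducer` shown above.

```typescript
import { of, reduce, toArray } from 'rxjs';

const source$ = of('a', 'b', 'c');

let viaToArray: string[] = [];
let viaReduce: string[] = [];

// of() emits synchronously, so both callbacks run before the next statement.
source$.pipe(toArray()).subscribe(values => (viaToArray = values));

// Hand-written equivalent: start from an empty array and append each value.
source$
  .pipe(reduce((acc: string[], value: string) => [...acc, value], [] as string[]))
  .subscribe(values => (viaReduce = values));

console.log(viaToArray); // ['a', 'b', 'c']
console.log(viaReduce);  // ['a', 'b', 'c']
```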

Important Behaviors

Completion Required: toArray only emits when the source completes. If the source never completes, you’ll never receive the array.
Error Handling: If the source errors before completing, no array is emitted - the error propagates immediately.
import { of, throwError, concat, toArray } from 'rxjs';

// This emits the array
of(1, 2, 3).pipe(toArray()).subscribe(console.log);
// Output: [1, 2, 3]

// This errors before emitting array
concat(
  of(1, 2, 3),
  throwError(() => new Error('Oops'))
).pipe(
  toArray()
).subscribe({
  next: console.log,      // Never called
  error: console.error    // Called with error
});
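
The completion requirement can be observed directly with a Subject, which only completes when told to. The following sketch assumes nothing beyond the core `rxjs` exports; `received` and `emptyEmissions` are illustrative names. It also shows that a source completing without values still produces an (empty) array.

```typescript
import { EMPTY, Subject, toArray } from 'rxjs';

const subject = new Subject<number>();
const received: number[][] = [];

subject.pipe(toArray()).subscribe(values => received.push(values));

subject.next(1);
subject.next(2);

// Values have been buffered, but nothing has been emitted downstream yet.
const emittedBeforeComplete = received.length;
console.log(emittedBeforeComplete); // 0

// Completing the source flushes the buffered values as one array.
subject.complete();
console.log(received); // [[1, 2]]

// A source that completes without emitting yields an empty array.
const emptyEmissions: unknown[][] = [];
EMPTY.pipe(toArray()).subscribe(values => emptyEmissions.push(values));
console.log(emptyEmissions); // [[]]
```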

Memory Considerations

Be careful with infinite or long-running streams. Every value is held in memory until the source completes.
import { interval, take, toArray } from 'rxjs';

// ❌ BAD - will accumulate forever
interval(1000).pipe(
  toArray()
).subscribe(); // Memory leak!

// ✅ GOOD - limited number of values
interval(1000).pipe(
  take(100),
  toArray()
).subscribe(); // OK - stops after 100 values

Common Use Cases

  1. Collecting API Results: Gather multiple HTTP responses
  2. Event Aggregation: Collect user interactions over time period
  3. Data Processing: Collect stream values for batch processing
  4. Testing: Collect all emissions for assertions
  5. Analytics: Gather metrics before sending to server
  6. Form Validation: Collect all validation results
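
For the testing and batch-processing cases above, it is often convenient to consume the collected array as a Promise. The sketch below combines toArray with `lastValueFrom`, which resolves with the last value emitted before completion. The `collectSquares` function is a hypothetical helper written for this illustration.

```typescript
import { lastValueFrom, map, range, toArray } from 'rxjs';

// Hypothetical helper: collect the squares of 1..count into a single array.
function collectSquares(count: number): Promise<number[]> {
  return lastValueFrom(
    range(1, count).pipe(
      map(n => n * n),
      toArray()
    )
  );
}

collectSquares(4).then(squares => console.log(squares)); // [1, 4, 9, 16]
```

Since toArray emits exactly once on completion, `lastValueFrom` resolves with the full array. With a source that never completes, the Promise would never settle.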

Alternative: scan for Progressive Updates

If you need the array before completion, use scan:
import { interval, take, scan } from 'rxjs';

interval(1000).pipe(
  take(5),
  scan((acc, val) => [...acc, val], [] as number[])
).subscribe(array => {
  console.log('Current array:', array);
});

// Output:
// Current array: [0]
// Current array: [0, 1]
// Current array: [0, 1, 2]
// Current array: [0, 1, 2, 3]
// Current array: [0, 1, 2, 3, 4]

Comparison with Other Operators

Operator             | Emits              | When            | Memory
toArray()            | Once (array)       | On complete     | All values
scan(reducer, [])    | Each emission      | Each value      | Growing array
reduce(reducer, [])  | Once               | On complete     | All values
bufferCount(n)       | Arrays of n items  | Every n values  | Last n values
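
The differences in the table can be checked by running all four operators over the same source. This is a small sketch under the assumption of a synchronous `of` source; `record` is a hypothetical helper that subscribes and stores every emission so the results can be compared side by side.

```typescript
import { Observable, bufferCount, of, reduce, scan, toArray } from 'rxjs';

const source$ = of(1, 2, 3, 4, 5);

// Hypothetical helper: subscribe and record every emission.
// It only works as written for sources that emit synchronously.
function record<T>(obs$: Observable<T>): T[] {
  const out: T[] = [];
  obs$.subscribe(value => out.push(value));
  return out;
}

const append = (acc: number[], value: number) => [...acc, value];

const viaToArray = record(source$.pipe(toArray()));
// [[1, 2, 3, 4, 5]]

const viaScan = record(source$.pipe(scan(append, [] as number[])));
// [[1], [1, 2], [1, 2, 3], [1, 2, 3, 4], [1, 2, 3, 4, 5]]

const viaReduce = record(source$.pipe(reduce(append, [] as number[])));
// [[1, 2, 3, 4, 5]]

const viaBufferCount = record(source$.pipe(bufferCount(2)));
// [[1, 2], [3, 4], [5]] - the final partial buffer is flushed on completion
```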

Testing Example

Useful in unit tests to collect all values:
import { TestScheduler } from 'rxjs/testing';
import { toArray } from 'rxjs';

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 });
  // The source completes at frame 5, so the array and completion arrive at frame 5
  const expected = '-----(x|)';
  const values = { x: [1, 2, 3] };
  
  expectObservable(
    source$.pipe(toArray())
  ).toBe(expected, values);
});

Practical Pattern: Batch Processing

Collect items for batch processing. Because this event stream never completes, toArray would never emit here, so bufferTime is used to flush batches periodically instead:
import { bufferTime, filter, fromEvent, map, merge, mergeMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

interface AnalyticsEvent {
  type: string;
  timestamp: number;
}

// button1 and button2 are assumed to be DOM elements obtained elsewhere
declare const button1: HTMLElement;
declare const button2: HTMLElement;

const events$ = merge(
  fromEvent(button1, 'click').pipe(map(() => ({ type: 'button1' }))),
  fromEvent(button2, 'click').pipe(map(() => ({ type: 'button2' })))
).pipe(
  map((event): AnalyticsEvent => ({ ...event, timestamp: Date.now() }))
);

// Send analytics in batches every 5 seconds
events$.pipe(
  bufferTime(5000),
  filter(events => events.length > 0), // Skip windows with no events
  mergeMap(events =>
    ajax.post('/api/analytics', events)
  )
).subscribe();

Related Operators

  • reduce - Reduce to single value with custom reducer
  • scan - Accumulate values (emits each step)
  • bufferCount - Buffer into arrays of N items
  • bufferTime - Buffer by time window
  • take - Limit number of emissions

See Also