Skip to main content
A lazy, fluent API for building async data pipelines. Operations are not executed until a terminal method (toArray, forEach, drain) is called.

AsyncStream

from

Creates a new AsyncStream from an iterable, async iterable, or promise of an iterable.
static from<T>(source: StreamSource<T>): AsyncStream<T>

Parameters

source
StreamSource<T>
required
The source data. Can be an iterable, async iterable, or promise that resolves to an iterable.

Returns

A new AsyncStream instance.

Transformation methods

map

Adds a concurrent map step to the pipeline.
map<R>(
  mapper: (item: T, index: number) => Promise<R> | R,
  options?: ConcurrencyOptions,
): AsyncStream<R>

Parameters

mapper
(item: T, index: number) => Promise<R> | R
required
Function to transform each item.
options
ConcurrencyOptions
Options for controlling concurrency.

filter

Adds a filter step to the pipeline.
filter(
  predicate: (item: T, index: number) => Promise<boolean> | boolean,
  options?: ConcurrencyOptions,
): AsyncStream<T>

Parameters

predicate
(item: T, index: number) => Promise<boolean> | boolean
required
Function to test each item. Return true to keep the item.
options
ConcurrencyOptions
Options for controlling concurrency.

retry

Adds a retry wrapper around the previous step.
retry(options?: { times?: number; delay?: number }): AsyncStream<T>

Parameters

options
object
Retry options.

Terminal methods

toArray

Executes the pipeline and collects results into an array.
toArray(options?: StandardOptions): Promise<T[]>

Parameters

options
StandardOptions
Execution options.

Returns

An array of all results.

forEach

Executes the pipeline and calls fn for each result.
forEach(
  fn: (item: T, index: number) => Promise<void> | void,
  options?: StandardOptions,
): Promise<void>

Parameters

fn
(item: T, index: number) => Promise<void> | void
required
Function to call for each item.
options
StandardOptions
Execution options.

drain

Executes the pipeline, discarding results.
drain(options?: StandardOptions): Promise<void>

Parameters

options
StandardOptions
Execution options.

reduce

Executes the pipeline and reduces the results.
reduce<R>(
  reducer: (accumulator: R, item: T, index: number) => Promise<R> | R,
  initialValue: R,
  options?: StandardOptions,
): Promise<R>

Parameters

reducer
(accumulator: R, item: T, index: number) => Promise<R> | R
required
Function to reduce items to a single value.
initialValue
R
required
Initial accumulator value.
options
StandardOptions
Execution options.

Returns

The final reduced value.

Examples

Basic map and filter

import { AsyncStream } from '@temelj/async';

const result = await AsyncStream.from([1, 2, 3, 4, 5])
  .map(async (x) => x * 2)
  .filter((x) => x > 5)
  .toArray();

console.log(result); // [6, 8, 10]

Concurrent processing

import { AsyncStream } from '@temelj/async';

const users = await AsyncStream.from(userIds)
  .map(
    async (id) => {
      const response = await fetch(`/api/users/${id}`);
      return await response.json();
    },
    { concurrency: 5 }
  )
  .toArray();

Async iterable source

import { AsyncStream } from '@temelj/async';

async function* generateNumbers() {
  for (let i = 0; i < 10; i++) {
    await delay(100);
    yield i;
  }
}

const result = await AsyncStream.from(generateNumbers())
  .map((x) => x * 2)
  .filter((x) => x % 4 === 0)
  .toArray();

console.log(result); // [0, 4, 8, 12, 16]

forEach for side effects

import { AsyncStream } from '@temelj/async';

await AsyncStream.from(items)
  .map(async (item) => processItem(item))
  .forEach(async (result) => {
    await saveToDatabase(result);
  });

Reduce to single value

import { AsyncStream } from '@temelj/async';

const sum = await AsyncStream.from([1, 2, 3, 4, 5])
  .map(async (x) => x * 2)
  .reduce((acc, x) => acc + x, 0);

console.log(sum); // 30

Drain for side effects only

import { AsyncStream } from '@temelj/async';

await AsyncStream.from(notifications)
  .map(async (notif) => {
    await sendEmail(notif);
    return notif;
  })
  .drain();

console.log('All emails sent');

Complex pipeline

import { AsyncStream } from '@temelj/async';

const results = await AsyncStream.from(urls)
  .map(
    async (url) => {
      const response = await fetch(url);
      return await response.json();
    },
    { concurrency: 3 }
  )
  .filter((data) => data.isValid)
  .map(async (data) => {
    return await transformData(data);
  })
  .toArray();

With abort signal

import { AsyncStream } from '@temelj/async';

const controller = new AbortController();

const promise = AsyncStream.from(items)
  .map(
    async (item) => {
      await delay(100);
      return processItem(item);
    },
    { signal: controller.signal }
  )
  .toArray();

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  await promise;
} catch (error) {
  console.log('Stream cancelled');
}

Build docs developers (and LLMs) love