A lazy, fluent API for building async data pipelines. Operations are not executed until a terminal method (toArray, forEach, drain, or reduce) is called.
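To make the laziness concrete, the following is a minimal sketch of deferred execution. `LazyPipeline` and `lazyDemo` are hypothetical names used only for illustration, and the sketch is synchronous for brevity; it is not how AsyncStream is implemented. It only shows that chaining a step records work, and that the work runs when a terminal method is called.

```typescript
// Hypothetical sketch of deferred execution; not the AsyncStream source.
class LazyPipeline<T> {
  private readonly produce: () => Iterable<T>;

  private constructor(produce: () => Iterable<T>) {
    this.produce = produce;
  }

  static from<T>(items: Iterable<T>): LazyPipeline<T> {
    return new LazyPipeline<T>(() => items);
  }

  // Intermediate step: wraps the producer in a generator and runs nothing.
  map<R>(mapper: (item: T) => R): LazyPipeline<R> {
    const produce = this.produce;
    return new LazyPipeline<R>(function* () {
      for (const item of produce()) {
        yield mapper(item);
      }
    });
  }

  // Terminal step: the generator chain is consumed only here.
  toArray(): T[] {
    return Array.from(this.produce());
  }
}

function lazyDemo(): { callsBeforeTerminal: number; result: number[] } {
  let calls = 0;
  const pipeline = LazyPipeline.from([1, 2, 3]).map((x) => {
    calls++;
    return x * 2;
  });
  const callsBeforeTerminal = calls; // the mapper has not run yet
  const result = pipeline.toArray(); // the mapper runs here
  return { callsBeforeTerminal, result };
}
```

Calling `lazyDemo()` reports zero mapper calls before `toArray()` and the doubled values afterwards.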
AsyncStream
from
Creates a new AsyncStream from an iterable, async iterable, or promise of an iterable.
static from<T>(source: StreamSource<T>): AsyncStream<T>
Parameters
source
StreamSource<T>
required
The source data: an iterable, an async iterable, or a promise that resolves to an iterable.
Returns
A new AsyncStream instance.
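The three accepted source shapes can be handled uniformly with `await` and `for await`. The sketch below assumes a union shaped like the description above, specialised to numbers to keep the types simple. `NumberSource`, `collectNumbers`, and `countTo` are hypothetical names, not part of the library.

```typescript
// Assumed shape of the source union, specialised to numbers for this sketch.
type NumberSource =
  | Iterable<number>
  | AsyncIterable<number>
  | Promise<Iterable<number>>;

// Hypothetical helper: consume any accepted source shape into an array.
async function collectNumbers(source: NumberSource): Promise<number[]> {
  // Awaiting a non-promise value returns it unchanged, so this single line
  // also covers the "promise of an iterable" case.
  const resolved = await source;
  const out: number[] = [];
  // `for await` accepts both sync and async iterables.
  for await (const item of resolved) {
    out.push(item);
  }
  return out;
}

// An async iterable source used for the demonstration.
async function* countTo(limit: number): AsyncGenerator<number> {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}
```

With these definitions, `collectNumbers([1, 2, 3])`, `collectNumbers(countTo(3))`, and `collectNumbers(Promise.resolve(new Set([1, 2, 3])))` all resolve to the same three values.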
map
Adds a concurrent map step to the pipeline.
map<R>(
mapper: (item: T, index: number) => Promise<R> | R,
options?: ConcurrencyOptions,
): AsyncStream<R>
Parameters
mapper
(item: T, index: number) => Promise<R> | R
required
Function to transform each item.
options
ConcurrencyOptions
Options for controlling concurrency.
concurrency
number
Maximum number of concurrent operations.
signal
AbortSignal
Optional AbortSignal to cancel the operation.
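One common way to bound concurrency is a small pool of workers that each claim the next unprocessed index. The sketch below illustrates that idea under stated assumptions: `mapWithLimit` and `limitDemo` are hypothetical names, the types are fixed to numbers, and results are returned in input order. Whether AsyncStream preserves order or uses this strategy is not stated in this reference.

```typescript
// Hypothetical sketch of a concurrency-limited map; not the library's code.
async function mapWithLimit(
  items: readonly number[],
  mapper: (item: number, index: number) => Promise<number>,
  concurrency: number,
): Promise<number[]> {
  const results: number[] = new Array<number>(items.length);
  let next = 0; // index of the next unclaimed item

  // Each worker claims the next index until the input is exhausted.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await mapper(items[index], index);
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Tracks how many mapper calls are in flight at once.
async function limitDemo(): Promise<{ results: number[]; peak: number }> {
  let active = 0;
  let peak = 0;
  const results = await mapWithLimit(
    [1, 2, 3, 4, 5, 6],
    async (n) => {
      active++;
      peak = Math.max(peak, active);
      await new Promise<void>((resolve) => setTimeout(resolve, 10));
      active--;
      return n * n;
    },
    2,
  );
  return { results, peak };
}
```

In `limitDemo`, at most two mapper calls are in flight at any time, and the squares come back in input order.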
filter
Adds a filter step to the pipeline.
filter(
predicate: (item: T, index: number) => Promise<boolean> | boolean,
options?: ConcurrencyOptions,
): AsyncStream<T>
Parameters
predicate
(item: T, index: number) => Promise<boolean> | boolean
required
Function to test each item. Return true to keep the item.
options
ConcurrencyOptions
Options for controlling concurrency.
retry
Adds a retry wrapper around the previous step.
retry(options?: { times?: number; delay?: number }): AsyncStream<T>
Parameters
options
{ times?: number; delay?: number }
Retry options.
times
number
Maximum number of attempts.
delay
number
Delay between retries in milliseconds.
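The retry options can be illustrated with a standalone helper that wraps a single async task. This is a sketch under assumptions: `withRetry`, `RetryOptions`, and `retryDemo` are hypothetical names, the defaults (3 attempts, no delay) are invented for the example, and `times` is read as the total number of attempts including the first, following the description above. The library's actual defaults and how it applies retries to a pipeline step are not documented here.

```typescript
// Hypothetical sketch of retry semantics; defaults here are assumptions.
interface RetryOptions {
  times?: number; // maximum number of attempts, including the first
  delay?: number; // pause between attempts, in milliseconds
}

async function withRetry<T>(
  task: () => Promise<T>,
  options: RetryOptions = {},
): Promise<T> {
  const { times = 3, delay = 0 } = options;
  const attempts = Math.max(1, times); // always try at least once
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Wait before the next attempt, but not after the final failure.
      if (attempt < attempts && delay > 0) {
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}

// A task that fails twice and then succeeds.
async function retryDemo(): Promise<{ value: string; calls: number }> {
  let calls = 0;
  const value = await withRetry(
    async () => {
      calls++;
      if (calls < 3) {
        throw new Error(`attempt ${calls} failed`);
      }
      return "ok";
    },
    { times: 5, delay: 1 },
  );
  return { value, calls };
}
```

In `retryDemo`, the task succeeds on its third call, so two of the five allowed attempts go unused. If every attempt fails, the last error is rethrown.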
Terminal methods
toArray
Executes the pipeline and collects results into an array.
toArray(options?: StandardOptions): Promise<T[]>
Parameters
options
StandardOptions
Execution options.
signal
AbortSignal
Optional AbortSignal to cancel the operation.
Returns
A promise that resolves to an array of all results.
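A terminal method that honors an AbortSignal can check the signal as it consumes items. The sketch below shows one simple way to do that. `collectUntilAborted` and `abortDemo` are hypothetical names, and the error type and message are assumptions; this reference does not specify what AsyncStream throws on cancellation.

```typescript
// Hypothetical sketch: collect items, stopping with an error once aborted.
async function collectUntilAborted(
  source: AsyncIterable<number>,
  signal?: AbortSignal,
): Promise<number[]> {
  const out: number[] = [];
  for await (const item of source) {
    // Check the signal before accepting each item.
    if (signal?.aborted) {
      throw new Error("Stream aborted");
    }
    out.push(item);
  }
  return out;
}

async function abortDemo(): Promise<string> {
  const controller = new AbortController();

  async function* numbers(): AsyncGenerator<number> {
    for (let i = 1; i <= 5; i++) {
      if (i === 3) {
        controller.abort(); // simulate cancellation mid-stream
      }
      yield i;
    }
  }

  try {
    await collectUntilAborted(numbers(), controller.signal);
    return "completed";
  } catch (error) {
    return error instanceof Error ? error.message : String(error);
  }
}
```

In `abortDemo`, the signal is aborted while the third item is being produced, so the collector rejects instead of returning a partial array.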
forEach
Executes the pipeline and calls fn for each result.
forEach(
fn: (item: T, index: number) => Promise<void> | void,
options?: StandardOptions,
): Promise<void>
Parameters
fn
(item: T, index: number) => Promise<void> | void
required
Function to call for each item.
drain
Executes the pipeline, discarding results.
drain(options?: StandardOptions): Promise<void>
Parameters
options
StandardOptions
Execution options.
reduce
Executes the pipeline and reduces the results.
reduce<R>(
reducer: (accumulator: R, item: T, index: number) => Promise<R> | R,
initialValue: R,
options?: StandardOptions,
): Promise<R>
Parameters
reducer
(accumulator: R, item: T, index: number) => Promise<R> | R
required
Function to reduce items to a single value.
initialValue
R
required
Initial accumulator value.
Returns
A promise that resolves to the final accumulated value.
Examples
Basic map and filter
import { AsyncStream } from '@temelj/async';

const result = await AsyncStream.from([1, 2, 3, 4, 5])
  .map(async (x) => x * 2)
  .filter((x) => x > 5)
  .toArray();

console.log(result); // [6, 8, 10]
Concurrent processing
import { AsyncStream } from '@temelj/async';

const users = await AsyncStream.from(userIds)
  .map(
    async (id) => {
      const response = await fetch(`/api/users/${id}`);
      return await response.json();
    },
    { concurrency: 5 }
  )
  .toArray();
Async iterable source
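import { AsyncStream } from '@temelj/async';

// Local helper for this example: resolves after `ms` milliseconds.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function* generateNumbers() {
  for (let i = 0; i < 10; i++) {
    await delay(100);
    yield i;
  }
}

const result = await AsyncStream.from(generateNumbers())
  .map((x) => x * 2)
  .filter((x) => x % 4 === 0)
  .toArray();

console.log(result); // [0, 4, 8, 12, 16]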
forEach for side effects
import { AsyncStream } from '@temelj/async';

await AsyncStream.from(items)
  .map(async (item) => processItem(item))
  .forEach(async (result) => {
    await saveToDatabase(result);
  });
Reduce to single value
import { AsyncStream } from '@temelj/async';

const sum = await AsyncStream.from([1, 2, 3, 4, 5])
  .map(async (x) => x * 2)
  .reduce((acc, x) => acc + x, 0);

console.log(sum); // 30
Drain for side effects only
import { AsyncStream } from '@temelj/async';

await AsyncStream.from(notifications)
  .map(async (notif) => {
    await sendEmail(notif);
    return notif;
  })
  .drain();

console.log('All emails sent');
Complex pipeline
import { AsyncStream } from '@temelj/async';

const results = await AsyncStream.from(urls)
  .map(
    async (url) => {
      const response = await fetch(url);
      return await response.json();
    },
    { concurrency: 3 }
  )
  .filter((data) => data.isValid)
  .map(async (data) => {
    return await transformData(data);
  })
  .toArray();
With abort signal
import { AsyncStream } from '@temelj/async';

// Local helper for this example: resolves after `ms` milliseconds.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

const controller = new AbortController();

const promise = AsyncStream.from(items)
  .map(
    async (item) => {
      await delay(100);
      return processItem(item);
    },
    { signal: controller.signal }
  )
  .toArray();

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

try {
  await promise;
} catch (error) {
  console.log('Stream cancelled');
}