Skip to main content
The @temelj/async package provides a comprehensive set of utilities for working with asynchronous operations in TypeScript. It includes concurrency control, retry logic, rate limiting, synchronization primitives, and error handling.

Installation

npm install @temelj/async

Core functions

attempt

Safely executes a function (synchronous or asynchronous) and wraps its result in a Promise.
function attempt<T, Args extends unknown[]>(
  fn: (...args: Args) => T | PromiseLike<T>,
  args?: Args,
  options?: StandardOptions,
): Promise<T>
import { attempt } from "@temelj/async";

const result = await attempt(() => fetchData());

delay

Resolves after the specified duration in milliseconds. Rejects immediately with AbortError if the signal is aborted.
function delay(ms: number, options?: StandardOptions): Promise<void>
import { delay } from "@temelj/async";

await delay(1000); // Wait for 1 second

retry

Retries a function upon failure with configurable backoff.
function retry<T>(
  fn: (attempt: number) => Promise<T>,
  options?: RetryOptions,
): Promise<T>
interface RetryOptions extends StandardOptions {
  /** Maximum number of attempts. Default: 3. */
  times?: number;
  /** Delay between retries in ms, or a function computing the delay from the attempt number. */
  delay?: number | ((attempt: number) => number);
  /** Predicate to decide whether to retry on a given error. */
  shouldRetry?: (error: unknown) => boolean;
}
import { retry } from "@temelj/async";

const result = await retry(
  async (attempt) => {
    console.log(`Attempt ${attempt + 1}`);
    return await fetchData();
  },
  { times: 3 }
);

timeout

Races a promise (or a function that produces one) against a timer. Rejects with TimeoutError if it does not settle within ms milliseconds.
function timeout<T>(
  promise: PromiseLike<T> | (() => PromiseLike<T> | T),
  ms: number,
  options?: StandardOptions & { fallback?: T },
): Promise<T>
import { timeout, TimeoutError } from "@temelj/async";

try {
  const result = await timeout(fetchData(), 5000);
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log("Operation timed out");
  }
}

wait

Waits until a predicate returns true, polling at a configurable interval.
function wait(
  predicate: () => Promise<boolean> | boolean,
  options?: StandardOptions & {
    interval?: number;
    timeout?: number;
  },
): Promise<void>
import { wait } from "@temelj/async";

let isReady = false;

// Poll every 100ms until ready
await wait(() => isReady, { interval: 100, timeout: 5000 });

Concurrency control

map

Maps over an iterable concurrently with configurable concurrency and resilience. Preserves order of output even if tasks finish out of order.
function map<T, R>(
  input: AsyncIterable<T> | Iterable<T> | Promise<Iterable<T>>,
  mapper: (item: T, index: number) => Promise<R | SkipSymbol> | R | SkipSymbol,
  options?: ConcurrencyOptions & ResilienceOptions,
): Promise<R[]>
import { map } from "@temelj/async";

const urls = ["url1", "url2", "url3"];
const results = await map(urls, async (url) => {
  return await fetch(url).then(r => r.json());
});

limit

Returns a wrapper around a function that enforces concurrency limits. All calls to the wrapper share the same concurrency pool.
function limit<Args extends unknown[], R>(
  fn: (...args: Args) => PromiseLike<R> | R,
  concurrency: number,
  options?: StandardOptions,
): (...args: Args) => Promise<R>
import { limit } from "@temelj/async";

const limitedFetch = limit(fetch, 3);

// At most 3 requests will run concurrently
const results = await Promise.all([
  limitedFetch("url1"),
  limitedFetch("url2"),
  limitedFetch("url3"),
  limitedFetch("url4"), // Waits for one to complete
  limitedFetch("url5"), // Waits for one to complete
]);

reduce

Performs an async reduction over an iterable, processing items serially.
function reduce<T, R>(
  input: AsyncIterable<T> | Iterable<T>,
  reducer: (accumulator: R, item: T, index: number) => Promise<R> | R,
  initialValue: R,
  options?: StandardOptions,
): Promise<R>
import { reduce } from "@temelj/async";

const sum = await reduce(
  [1, 2, 3, 4, 5],
  async (acc, num) => acc + num,
  0
);
// Result: 15

Rate limiting

debounce

Creates a debounced function that delays invoking the function until waitMs milliseconds have elapsed since the last invocation.
function debounce<Args extends unknown[], R>(
  fn: (...args: Args) => PromiseLike<R> | R,
  waitMs: number,
  options?: DebounceOptions,
): (...args: Args) => Promise<R>
interface DebounceOptions {
  /** Fire on the leading edge. Default: false. */
  leading?: boolean;
  /** Fire on the trailing edge. Default: true. */
  trailing?: boolean;
  /** Optional AbortSignal to cancel debouncing. */
  signal?: AbortSignal;
}
import { debounce } from "@temelj/async";

const debouncedSearch = debounce(async (query: string) => {
  return await api.search(query);
}, 300);

// Only the last call within 300ms will execute
debouncedSearch("a");
debouncedSearch("ab");
debouncedSearch("abc"); // Only this one executes

throttle

Creates an async-aware throttled function that invokes the function at most once every intervalMs milliseconds.
function throttle<Args extends unknown[], R>(
  fn: (...args: Args) => PromiseLike<R> | R,
  intervalMs: number,
  options?: StandardOptions,
): (...args: Args) => Promise<R>
import { throttle } from "@temelj/async";

const throttledScroll = throttle(async () => {
  await updateScrollPosition();
}, 100);

// Will execute at most once every 100ms
window.addEventListener("scroll", throttledScroll);

Synchronization primitives

Mutex

A mutual exclusion lock that ensures only one task accesses a resource at a time.
class Mutex {
  acquire(signal?: AbortSignal): Promise<() => void>
  runExclusive<T>(fn: () => Promise<T>, signal?: AbortSignal): Promise<T>
}
import { Mutex } from "@temelj/async";

const mutex = new Mutex();
let counter = 0;

async function increment() {
  const release = await mutex.acquire();
  try {
    counter++;
  } finally {
    release();
  }
}

Barrier

A synchronization barrier that waits until a specified number of tasks have called wait. Once the capacity is reached, all waiting tasks are released simultaneously.
class Barrier {
  constructor(capacity: number)
  wait(signal?: AbortSignal): Promise<void>
}
import { Barrier } from "@temelj/async";

const barrier = new Barrier(3);

async function worker(id: number) {
  console.log(`Worker ${id} starting`);
  await doWork();
  console.log(`Worker ${id} waiting at barrier`);
  await barrier.wait();
  console.log(`Worker ${id} released`);
}

// All workers wait until 3 have reached the barrier
await Promise.all([worker(1), worker(2), worker(3)]);

Queue

A class-based task runner with priority support and concurrency control.
class Queue {
  constructor(options?: QueueOptions)
  add<T>(fn: () => PromiseLike<T> | T, options?: AddOptions): Promise<T>
  addAll<T>(fns: (() => Promise<T>)[], options?: AddOptions): Promise<T[]>
  pause(): void
  resume(): void
  clear(): void
  get size(): number
  get pending(): number
  get onIdle(): Promise<void>
}
interface QueueOptions {
  concurrency?: number;
  autoStart?: boolean;
}

interface AddOptions {
  priority?: number;
  signal?: AbortSignal;
}
import { Queue } from "@temelj/async";

const queue = new Queue({ concurrency: 2 });

queue.add(async () => await task1());
queue.add(async () => await task2());
queue.add(async () => await task3());

await queue.onIdle; // Wait for all tasks to complete

Stream processing

AsyncStream

A lazy, fluent API for building async data pipelines. Operations are not executed until a terminal method is called.
class AsyncStream<T> {
  static from<T>(source: StreamSource<T>): AsyncStream<T>
  map<R>(mapper: (item: T, index: number) => Promise<R> | R, options?: ConcurrencyOptions): AsyncStream<R>
  filter(predicate: (item: T, index: number) => Promise<boolean> | boolean, options?: ConcurrencyOptions): AsyncStream<T>
  retry(options?: { times?: number; delay?: number }): AsyncStream<T>
  toArray(options?: StandardOptions): Promise<T[]>
  forEach(fn: (item: T, index: number) => Promise<void> | void, options?: StandardOptions): Promise<void>
  drain(options?: StandardOptions): Promise<void>
  reduce<R>(reducer: (accumulator: R, item: T, index: number) => Promise<R> | R, initialValue: R, options?: StandardOptions): Promise<R>
}
import { AsyncStream } from "@temelj/async";

const results = await AsyncStream.from([1, 2, 3, 4, 5])
  .map(async (n) => n * 2)
  .filter((n) => n > 5)
  .toArray();
// Result: [6, 8, 10]

Utilities

defer

Creates a deferred promise, exposing resolve and reject alongside the promise itself.
function defer<T>(): Deferred<T>

interface Deferred<T> {
  readonly promise: Promise<T>;
  readonly resolve: (value: T | PromiseLike<T>) => void;
  readonly reject: (reason?: unknown) => void;
}
import { defer } from "@temelj/async";

const deferred = defer<string>();

setTimeout(() => {
  deferred.resolve("Done!");
}, 1000);

const result = await deferred.promise;

Error types

AbortError

Error thrown when an operation is aborted via an AbortSignal.
class AbortError extends Error

TimeoutError

Error thrown when an operation exceeds its allowed time limit.
class TimeoutError extends Error

Type definitions

StandardOptions

interface StandardOptions {
  /** The signal to abort the operation. */
  signal?: AbortSignal;
}

ConcurrencyOptions

interface ConcurrencyOptions extends StandardOptions {
  /** Max number of concurrent operations. Default: Infinity. */
  concurrency?: number;
}

ResilienceOptions

interface ResilienceOptions extends StandardOptions {
  /** If true, the operation fails immediately on the first error. Default: true. */
  stopOnError?: boolean;
}

Skip symbol

A unique symbol used to skip items during iteration without throwing errors.
const Skip: unique symbol
type SkipSymbol = typeof Skip
Return Skip from a mapper function to omit the value from the result without throwing an error.

Build docs developers (and LLMs) love