Skip to main content
Performs an asynchronous reduction over a synchronous or asynchronous iterable. Items are processed serially: one at a time, in iteration order.

Signature

async function reduce<T, R>(
  input: AsyncIterable<T> | Iterable<T>,
  reducer: (accumulator: R, item: T, index: number) => Promise<R> | R,
  initialValue: R,
  options?: StandardOptions,
): Promise<R>

Parameters

input
AsyncIterable<T> | Iterable<T>
required
The iterable to reduce. Can be any synchronous iterable (such as an array) or an async iterable.
reducer
(accumulator: R, item: T, index: number) => Promise<R> | R
required
The reducer function. Receives the accumulator, the current item, and the item's index. Returns the new accumulator value, either directly or as a promise.
initialValue
R
required
The initial value of the accumulator.
options
StandardOptions
Optional reduction options, such as an abort signal (`signal`) for cancelling the reduction.

Returns

A promise that resolves with the final accumulated value.

Throws

  • AbortError if the signal passed in options is aborted
  • Any error thrown by the reducer function

Examples

Sum numbers

import { reduce } from '@temelj/async';

const numbers = [1, 2, 3, 4, 5];
const sum = await reduce(numbers, async (acc, n) => acc + n, 0);

console.log(sum); // 15

Concatenate strings

import { reduce } from '@temelj/async';

const words = ['Hello', 'async', 'world'];
const sentence = await reduce(
  words,
  async (acc, word) => acc + ' ' + word,
  ''
);

console.log(sentence.trim()); // 'Hello async world'

Build object from array

import { reduce } from '@temelj/async';

interface User {
  id: number;
  name: string;
}

const users: User[] = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 3, name: 'Charlie' },
];

const userMap = await reduce(
  users,
  async (acc, user) => {
    acc[user.id] = user;
    return acc;
  },
  {} as Record<number, User>
);

console.log(userMap);
// { 1: { id: 1, name: 'Alice' }, 2: { id: 2, name: 'Bob' }, ... }

Sequential API calls with accumulation

import { reduce } from '@temelj/async';

const userIds = [1, 2, 3, 4, 5];

const allUserData = await reduce(
  userIds,
  async (acc, id) => {
    const response = await fetch(`/api/users/${id}`);
    const user = await response.json();
    acc.push(user);
    return acc;
  },
  [] as User[]
);

Aggregate statistics

import { reduce } from '@temelj/async';

interface Stats {
  count: number;
  sum: number;
  min: number;
  max: number;
}

const numbers = [5, 2, 8, 1, 9, 3];

const stats = await reduce(
  numbers,
  async (acc, n) => ({
    count: acc.count + 1,
    sum: acc.sum + n,
    min: Math.min(acc.min, n),
    max: Math.max(acc.max, n),
  }),
  { count: 0, sum: 0, min: Infinity, max: -Infinity } as Stats
);

console.log(stats);
// { count: 6, sum: 28, min: 1, max: 9 }
console.log(stats.sum / stats.count); // Average: 4.67

With async iterable

import { reduce } from '@temelj/async';

async function* generateNumbers() {
  for (let i = 1; i <= 5; i++) {
    await delay(100);
    yield i;
  }
}

const sum = await reduce(
  generateNumbers(),
  async (acc, n) => acc + n,
  0
);

console.log(sum); // 15

File content aggregation

import { reduce } from '@temelj/async';
import { readFile } from 'fs/promises';

const files = ['file1.txt', 'file2.txt', 'file3.txt'];

const combinedContent = await reduce(
  files,
  async (acc, filename) => {
    const content = await readFile(filename, 'utf-8');
    return acc + content + '\n';
  },
  ''
);

Pipeline with transformations

import { reduce } from '@temelj/async';

interface ProcessingResult {
  processed: number;
  failed: number;
  results: any[];
}

const items = [1, 2, 3, 4, 5];

const result = await reduce(
  items,
  async (acc, item) => {
    try {
      const processed = await processItem(item);
      return {
        processed: acc.processed + 1,
        failed: acc.failed,
        results: [...acc.results, processed],
      };
    } catch (error) {
      return {
        processed: acc.processed,
        failed: acc.failed + 1,
        results: acc.results,
      };
    }
  },
  { processed: 0, failed: 0, results: [] } as ProcessingResult
);

console.log(`Processed: ${result.processed}, Failed: ${result.failed}`);

Dependency chain

import { reduce } from '@temelj/async';

interface Task {
  name: string;
  execute: (context: any) => Promise<any>;
}

const tasks: Task[] = [
  {
    name: 'fetch',
    execute: async () => await fetchData(),
  },
  {
    name: 'transform',
    execute: async (data) => await transformData(data),
  },
  {
    name: 'save',
    execute: async (data) => await saveData(data),
  },
];

// Each task receives output from previous task
const finalResult = await reduce(
  tasks,
  async (context, task) => {
    console.log(`Executing task: ${task.name}`);
    return await task.execute(context);
  },
  null as any
);

With abort signal

import { reduce } from '@temelj/async';

const controller = new AbortController();

const task = reduce(
  [1, 2, 3, 4, 5],
  async (acc, n, index) => {
    await delay(200);
    console.log(`Processing item ${index}`);
    return acc + n;
  },
  0,
  { signal: controller.signal }
);

// Cancel after 500ms
setTimeout(() => controller.abort(), 500);

try {
  await task;
} catch (error) {
  if (error instanceof AbortError) {
    console.log('Reduction cancelled');
  }
}

Group by with async operations

import { reduce } from '@temelj/async';

interface Item {
  id: number;
  category: string;
  value: number;
}

const items: Item[] = [
  { id: 1, category: 'A', value: 10 },
  { id: 2, category: 'B', value: 20 },
  { id: 3, category: 'A', value: 30 },
];

const grouped = await reduce(
  items,
  async (acc, item) => {
    // Could involve async validation or enrichment
    await validateItem(item);
    
    if (!acc[item.category]) {
      acc[item.category] = [];
    }
    acc[item.category].push(item);
    return acc;
  },
  {} as Record<string, Item[]>
);

console.log(grouped);
// { A: [{ id: 1, ... }, { id: 3, ... }], B: [{ id: 2, ... }] }

Build docs developers (and LLMs) love