The streaming module provides utilities for detecting, collecting, and caching async generator results in doc-kit’s processing pipeline.

isAsyncGenerator

Checks whether a value is an async iterable, such as the object returned by calling an async generator function.
import { isAsyncGenerator } from 'doc-kit/streaming';

const gen = async function* () {
  yield 1;
  yield 2;
};

if (isAsyncGenerator(gen())) {
  // Handle as async generator
}
  • obj (unknown, required): Value to check.
  • Returns boolean: true if the value is an async iterable (has Symbol.asyncIterator), false otherwise.
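
To make the Symbol.asyncIterator check concrete, here is a minimal sketch of how such a test could be written. The name isAsyncIterableSketch is hypothetical and the body is an assumption for illustration; doc-kit's actual implementation may differ.

```javascript
// Hypothetical stand-in for illustration; not doc-kit's actual source.
// A value counts as async iterable here if it is a non-null object
// exposing a Symbol.asyncIterator method.
const isAsyncIterableSketch = (obj) =>
  obj !== null &&
  typeof obj === 'object' &&
  typeof obj[Symbol.asyncIterator] === 'function';

async function* numbers() {
  yield 1;
}

console.log(isAsyncIterableSketch(numbers())); // true: a generator object
console.log(isAsyncIterableSketch(numbers));   // false: the function itself
console.log(isAsyncIterableSketch([1, 2]));    // false: arrays are sync iterables
console.log(isAsyncIterableSketch(null));      // false
```

Note that the check applies to the object produced by calling the generator function, which is why the example above passes gen() rather than gen.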

Type Guard

This function acts as a TypeScript type guard:
function processResult(obj: unknown) {
  if (isAsyncGenerator(obj)) {
    // TypeScript knows obj is AsyncGenerator here
    for await (const chunk of obj) {
      // ...
    }
  }
}

collectAsyncGenerator

Collects all values from an async generator into a flat array. Each yielded chunk is spread into the results array.
import { collectAsyncGenerator } from 'doc-kit/streaming';

const generator = async function* () {
  yield [1, 2, 3];
  yield [4, 5];
  yield [6];
};

const results = await collectAsyncGenerator(generator());
// results = [1, 2, 3, 4, 5, 6]
  • generator (AsyncGenerator<T[]>, required): Async generator yielding arrays of items.
  • Returns Promise<T[]>: Promise resolving to a flattened array of all yielded items.

Debugging

The function logs collection progress:
  • Each chunk collected with item count
  • Final summary with total items and chunk count

createStreamingCache

Creates a cache for async generator collection results. When multiple consumers request the same key, the generator is collected only once and every consumer shares the same result.
import { createStreamingCache } from 'doc-kit/streaming';

const streamingCache = createStreamingCache();

// First call starts collection
const result1 = streamingCache.getOrCollect('metadata', metadataGenerator());

// Second call reuses the same collection promise
const result2 = streamingCache.getOrCollect('metadata', metadataGenerator());

// Both resolve to the same array
console.log((await result1) === (await result2)); // true
  • Returns object: Cache object with a getOrCollect method.

getOrCollect

Gets the collected result for a generator, starting collection if needed.
  • key (string, required): Cache key (usually the generator name).
  • generator (AsyncGenerator<unknown[]>, required): The async generator to collect.
  • Returns Promise<unknown[]>: Promise resolving to the collected results.

Usage in Generator Pipeline

From the main generator orchestration (generators.mjs:20-43):
const createGenerator = () => {
  const cachedGenerators = {};
  const streamingCache = createStreamingCache();

  const getDependencyInput = async (dependsOn) => {
    if (!dependsOn) return undefined;

    const result = await cachedGenerators[dependsOn];

    // If dependency is streaming, collect its results
    if (isAsyncGenerator(result)) {
      return streamingCache.getOrCollect(dependsOn, result);
    }

    return result;
  };

  // Later when consuming results:
  const resultPromises = generators.map(async name => {
    let result = await cachedGenerators[name];

    if (isAsyncGenerator(result)) {
      result = await streamingCache.getOrCollect(name, result);
    }

    return result;
  });
};

Implementation Details

Chunk Flattening

The collectAsyncGenerator function spreads each chunk with results.push(...chunk) rather than pushing chunks as nested arrays. This is because generators yield arrays of processed items, and the final result should be a single flat array.
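
The difference between spreading and pushing chunks can be seen in a small sketch. The helpers collectFlat and collectNested are hypothetical names written for this comparison, assuming a plain for await loop; they are not doc-kit's source.

```javascript
// Illustrative sketch only; helper names are hypothetical.
async function* chunks() {
  yield [1, 2, 3];
  yield [4, 5];
  yield [6];
}

// Spreads each chunk, as described above: one flat array of items.
async function collectFlat(generator) {
  const results = [];
  for await (const chunk of generator) {
    results.push(...chunk);
  }
  return results;
}

// Pushes each chunk whole: an array of arrays, one entry per yield.
async function collectNested(generator) {
  const results = [];
  for await (const chunk of generator) {
    results.push(chunk);
  }
  return results;
}

async function main() {
  console.log(await collectFlat(chunks()));   // [ 1, 2, 3, 4, 5, 6 ]
  console.log(await collectNested(chunks())); // [ [ 1, 2, 3 ], [ 4, 5 ], [ 6 ] ]
}

main();
```

With the flat form, downstream consumers can treat the result like the output of a non-streaming generator without knowing how the items were chunked.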

Cache Key Strategy

The cache uses string keys (typically generator names) to deduplicate collection. Once a key is cached, subsequent calls return the same promise, preventing redundant collection work.
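
A promise-per-key cache of this kind can be sketched as follows. This assumes a Map keyed by string and a collector passed in as a parameter; createCacheSketch, collectChunks, and the counter argument are hypothetical names for illustration, not doc-kit's API.

```javascript
// Hypothetical sketch of a promise-per-key cache; not doc-kit's actual source.
function createCacheSketch(collect) {
  const cache = new Map();
  return {
    getOrCollect(key, generator) {
      // Only the first call for a key starts collection; later calls
      // get the stored promise and their generator argument is unused.
      if (!cache.has(key)) {
        cache.set(key, collect(generator));
      }
      return cache.get(key);
    },
  };
}

// Flattening collector, assumed to mirror the behavior described above.
async function collectChunks(generator) {
  const results = [];
  for await (const chunk of generator) {
    results.push(...chunk);
  }
  return results;
}

// The counter records how many generator bodies actually start running.
async function* metadata(counter) {
  counter.runs += 1;
  yield ['a', 'b'];
  yield ['c'];
}

async function main() {
  const counter = { runs: 0 };
  const cache = createCacheSketch(collectChunks);
  const first = cache.getOrCollect('metadata', metadata(counter));
  const second = cache.getOrCollect('metadata', metadata(counter));
  console.log(first === second); // true: the same promise object
  console.log(await first);      // [ 'a', 'b', 'c' ]
  console.log(counter.runs);     // 1: the second generator never started
}

main();
```

Caching the promise rather than the resolved array means a consumer that arrives while collection is still in progress also shares the in-flight work.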

Logging

All streaming utilities use the streaming logger namespace for debug output, helping trace collection behavior during pipeline execution.
