The streaming module provides utilities for detecting, collecting, and caching async generator results in doc-kit’s processing pipeline.
isAsyncGenerator
Checks whether a value is an async iterable, such as the object returned by calling an async generator function.
import { isAsyncGenerator } from 'doc-kit/streaming';

const gen = async function* () {
  yield 1;
  yield 2;
};

if (isAsyncGenerator(gen())) {
  // Handle as async generator
}
Returns: true if the value is an async iterable (has a Symbol.asyncIterator method), false otherwise
Type Guard
This function acts as a TypeScript type guard:
function processResult(obj: unknown) {
  if (isAsyncGenerator(obj)) {
    // TypeScript knows obj is AsyncGenerator here
    for await (const chunk of obj) {
      // ...
    }
  }
}
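The check described above can be sketched in a few lines. This is a minimal illustration, not doc-kit's source: the name isAsyncIterableSketch is hypothetical, and the JSDoc type predicate is only one way to express the type guard in a JavaScript file.

```javascript
/**
 * Hypothetical stand-in for isAsyncGenerator; not doc-kit's actual source.
 * @param {unknown} value
 * @returns {value is AsyncIterable<unknown>}
 */
function isAsyncIterableSketch(value) {
  // Guard against null/undefined before reading a property.
  return value != null && typeof value[Symbol.asyncIterator] === 'function';
}

async function* numbers() {
  yield 1;
  yield 2;
}

const checks = {
  // A generator object (the result of calling the function) is async iterable.
  generatorObject: isAsyncIterableSketch(numbers()),
  // The generator function itself has not been called yet.
  generatorFunction: isAsyncIterableSketch(numbers),
  // Arrays only implement Symbol.iterator.
  plainArray: isAsyncIterableSketch([1, 2]),
  // A promise is awaitable but not iterable.
  promise: isAsyncIterableSketch(Promise.resolve([1])),
};
```

Under these assumptions only generatorObject is true, which is why the earlier example passes gen() rather than gen to the check.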
collectAsyncGenerator
Collects all values from an async generator into a flat array. Each yielded chunk is spread into the results array.
import { collectAsyncGenerator } from 'doc-kit/streaming';

const generator = async function* () {
  yield [1, 2, 3];
  yield [4, 5];
  yield [6];
};

const results = await collectAsyncGenerator(generator());
// results = [1, 2, 3, 4, 5, 6]
Parameters:
- generator (AsyncGenerator<T[]>, required): Async generator yielding arrays of items

Returns: Promise resolving to a flattened array of all yielded items
Debugging
The function logs collection progress:
- Each chunk collected with item count
- Final summary with total items and chunk count
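To make the two kinds of log entry concrete, here is a rough sketch of a collector that reports per-chunk and summary counts. The function name collectWithLoggingSketch, the log callback parameter, and the message wording are all assumptions made for illustration; the real function's logger and output format are not shown in this document.

```javascript
// Hypothetical sketch; the `log` parameter and message wording are assumptions.
async function collectWithLoggingSketch(generator, log = () => {}) {
  const results = [];
  let chunkCount = 0;

  for await (const chunk of generator) {
    chunkCount += 1;
    results.push(...chunk); // flatten each yielded array into the results
    log(`chunk ${chunkCount}: ${chunk.length} items`);
  }

  log(`done: ${results.length} items in ${chunkCount} chunks`);
  return results;
}

async function* exampleChunks() {
  yield [1, 2, 3];
  yield [4, 5];
  yield [6];
}

// Capture messages in an array instead of printing them.
const messages = [];
const collected = collectWithLoggingSketch(exampleChunks(), (m) => messages.push(m));
```

Here collected is a promise for the flattened items, and messages receives one entry per chunk followed by a single summary entry.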
createStreamingCache
Creates a cache for async generator collection results. Ensures that when multiple consumers request the same async generator, only one collection happens and all consumers share the result.
import { createStreamingCache } from 'doc-kit/streaming';

const streamingCache = createStreamingCache();

// metadataGenerator is an async generator function defined elsewhere

// First call starts collection
const result1 = streamingCache.getOrCollect('metadata', metadataGenerator());

// Second call reuses the same collection promise
const result2 = streamingCache.getOrCollect('metadata', metadataGenerator());

// Both resolve to the same array
console.log((await result1) === (await result2)); // true
Returns: Cache object with a getOrCollect method
getOrCollect
Gets the collected result for a generator, starting collection if needed.
Parameters:
- first argument (string): Cache key, usually the generator name
- generator (AsyncGenerator<unknown[]>, required): The async generator to collect

Returns: Promise resolving to the collected results
Usage in Generator Pipeline
From the main generator orchestration (generators.mjs:20-43):
const createGenerator = () => {
  const cachedGenerators = {};
  const streamingCache = createStreamingCache();

  const getDependencyInput = async (dependsOn) => {
    if (!dependsOn) return undefined;

    const result = await cachedGenerators[dependsOn];

    // If dependency is streaming, collect its results
    if (isAsyncGenerator(result)) {
      return streamingCache.getOrCollect(dependsOn, result);
    }

    return result;
  };

  // Later when consuming results:
  const resultPromises = generators.map(async name => {
    let result = await cachedGenerators[name];

    if (isAsyncGenerator(result)) {
      result = await streamingCache.getOrCollect(name, result);
    }

    return result;
  });
};
Implementation Details
Chunk Flattening
The collectAsyncGenerator function spreads each chunk with results.push(...chunk) rather than pushing chunks as nested arrays. This is because generators yield arrays of processed items, and the final result should be a single flat array.
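The difference between the two approaches is easiest to see side by side. The snippet below uses plain synchronous arrays with made-up sample data, purely to contrast spreading a chunk with pushing it whole.

```javascript
const chunks = [[1, 2, 3], [4, 5], [6]];

// Spreading each chunk appends its items individually.
const flat = [];
for (const chunk of chunks) flat.push(...chunk);

// Pushing the chunk itself keeps one nested array per chunk.
const nested = [];
for (const chunk of chunks) nested.push(chunk);

// flat   -> [1, 2, 3, 4, 5, 6]
// nested -> [[1, 2, 3], [4, 5], [6]]
```

One caveat worth keeping in mind: spread passes every item as a separate argument, so extremely large chunks can exceed an engine's argument limit. Whether that matters for doc-kit's chunk sizes is not something this document states.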
Cache Key Strategy
The cache uses string keys (typically generator names) to deduplicate collection. Once a key is cached, subsequent calls return the same promise, preventing redundant collection work.
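A promise-per-key cache of this kind might look roughly like the following. This is a sketch under assumptions, not the library's implementation: createCacheSketch, collectSketch, and metadataSketch are hypothetical names, and the use of a Map is a guess at one reasonable backing store.

```javascript
// Hypothetical sketch; doc-kit's real createStreamingCache may differ.
function createCacheSketch() {
  const cache = new Map(); // key -> Promise of collected items

  return {
    getOrCollect(key, generator) {
      if (!cache.has(key)) {
        // Store the promise, not the resolved array, so callers that
        // arrive mid-collection share the in-flight work.
        cache.set(key, collectSketch(generator));
      }
      return cache.get(key);
    },
  };
}

// Minimal collector so the sketch is self-contained.
async function collectSketch(generator) {
  const results = [];
  for await (const chunk of generator) results.push(...chunk);
  return results;
}

let runs = 0;
async function* metadataSketch() {
  runs += 1; // counts how many generator bodies actually start running
  yield ['a', 'b'];
  yield ['c'];
}

const cacheSketch = createCacheSketch();
const first = cacheSketch.getOrCollect('metadata', metadataSketch());
const second = cacheSketch.getOrCollect('metadata', metadataSketch());
// first and second are the same promise; the second generator is never iterated.
```

Caching the promise rather than the resolved array is what lets a second caller join a collection that is still running. In this sketch a rejected promise would stay cached as well; how the real cache treats failures is not covered here.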
Logging
All streaming utilities use the streaming logger namespace for debug output, helping trace collection behavior during pipeline execution.