# Streaming Architecture
doc-kit uses async generators to stream results from parallel processing. This allows generators to yield chunks of data as they’re processed, rather than waiting for all work to complete.
## Why Streaming?
Streaming provides several benefits:

- **Memory efficiency** - process and yield data incrementally
- **Faster time-to-first-result** - start consuming results while work continues
- **Better progress feedback** - see chunks complete in real time
- **Parallel collection** - multiple consumers can share the same stream
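A minimal, self-contained sketch (illustrative names only, not doc-kit code) shows the time-to-first-result benefit: the consumer sees the first chunk while later batches are still being produced.

```js
// A slow producer that yields a chunk as each batch of work finishes.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function* produceChunks() {
  for (let batch = 0; batch < 3; batch++) {
    await sleep(50); // simulate one batch of work
    yield [batch * 2, batch * 2 + 1];
  }
}

const main = async () => {
  const start = Date.now();
  for await (const chunk of produceChunks()) {
    // The first chunk arrives after ~50ms, not after all ~150ms of work.
    console.log(`chunk after ${Date.now() - start}ms:`, chunk);
  }
};

main();
```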
## Async Generators
An async generator is a function that can `await` asynchronous work and `yield` a sequence of values over time:

```js
async function* generateNumbers() {
  yield 1;
  yield 2;
  yield 3;
}

// Consume with for-await-of
for await (const num of generateNumbers()) {
  console.log(num); // 1, 2, 3
}
```
## In doc-kit

Generators with parallel processing return async generators:

```js
{
  hasParallelProcessor: true,
  async generate(input, worker) {
    // Generator function (returns an AsyncGenerator)
    return (async function* () {
      // Stream results from the worker
      for await (const chunk of worker.stream(items, input, extra)) {
        yield chunk; // Each chunk contains processed items
      }
    })();
  },
}
```
## Detecting Async Generators

The system checks whether a value is an async generator using:

```js
// From src/streaming.mjs:13-16
export const isAsyncGenerator = obj =>
  obj !== null &&
  typeof obj === 'object' &&
  typeof obj[Symbol.asyncIterator] === 'function';
```
### Usage

```js
const result = await generate(input);

if (isAsyncGenerator(result)) {
  // It's a stream - collect it
  const collected = await collectAsyncGenerator(result);
} else {
  // It's a regular value - use directly
  return result;
}
```
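To illustrate what the check accepts and rejects (this snippet is illustrative, not from the doc-kit source):

```js
// Same shape-based check as src/streaming.mjs: anything with an
// asyncIterator method is treated as a stream.
const isAsyncGenerator = obj =>
  obj !== null &&
  typeof obj === 'object' &&
  typeof obj[Symbol.asyncIterator] === 'function';

async function* stream() {
  yield [1, 2];
}

console.log(isAsyncGenerator(stream()));  // true  - async generator object
console.log(isAsyncGenerator([1, 2]));    // false - arrays are only sync-iterable
console.log(isAsyncGenerator('results')); // false - primitives fail the object check
console.log(isAsyncGenerator(null));      // false
```

Note that the check is structural: any object with a `Symbol.asyncIterator` method passes, so async iterables that are not generator objects are treated as streams too.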
## Collecting Async Generators

Async generators are collected into flat arrays:

```js
// From src/streaming.mjs:26-47
export const collectAsyncGenerator = async generator => {
  const results = [];
  let chunkCount = 0;

  for await (const chunk of generator) {
    chunkCount++;
    results.push(...chunk); // Flatten chunks into a single array
    streamingLogger.debug(`Collected chunk ${chunkCount}`, {
      itemsInChunk: chunk.length,
    });
  }

  streamingLogger.debug(`Collection complete`, {
    totalItems: results.length,
    chunks: chunkCount,
  });

  return results;
};
```
### Example

```js
async function* generateChunks() {
  yield [1, 2, 3];    // Chunk 1
  yield [4, 5];       // Chunk 2
  yield [6, 7, 8, 9]; // Chunk 3
}

const result = await collectAsyncGenerator(generateChunks());
// Result: [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Each yielded chunk is spread into the results array, so always yield arrays, not individual items.
## Streaming Cache

The streaming cache ensures each async generator is collected only once:

```js
// From src/streaming.mjs:54-82
export const createStreamingCache = () => {
  const cache = new Map();

  return {
    getOrCollect(key, generator) {
      const hasKey = cache.has(key);

      if (!hasKey) {
        // Start collection and cache the promise
        cache.set(key, collectAsyncGenerator(generator));
      }

      streamingLogger.debug(
        hasKey
          ? `Using cached result for "${key}"`
          : `Starting collection for "${key}"`
      );

      return cache.get(key);
    },
  };
};
```
### Why Cache?

Imagine two generators depend on the same streaming generator:

```js
const streamingCache = createStreamingCache();

// First consumer
const result1 = await streamingCache.getOrCollect('json', jsonGenerator);
// Collection starts

// Second consumer (while the first is still collecting)
const result2 = await streamingCache.getOrCollect('json', jsonGenerator);
// Returns the same promise - no duplicate collection!
```

Without caching, the second consumer would iterate a generator that had already been drained and receive an empty result - an async generator can only be consumed once.
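The one-shot nature of async generators is easy to demonstrate in isolation (`collect` below is a minimal stand-in for `collectAsyncGenerator`):

```js
async function* numbers() {
  yield [1, 2];
  yield [3];
}

// Minimal stand-in for collectAsyncGenerator
const collect = async generator => {
  const results = [];
  for await (const chunk of generator) results.push(...chunk);
  return results;
};

const gen = numbers();

const main = async () => {
  console.log(await collect(gen)); // [1, 2, 3] - first pass drains the generator
  console.log(await collect(gen)); // []        - second pass finds it exhausted
};

main();
```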
## Integration with Generator System

The orchestration system uses the streaming cache automatically:

```js
// From src/generators.mjs:18-20
const cachedGenerators = {};
const streamingCache = createStreamingCache();

// When getting dependency input
const getDependencyInput = async dependsOn => {
  if (!dependsOn) return undefined;

  const result = await cachedGenerators[dependsOn];

  // If it's an async generator, collect it (with caching)
  if (isAsyncGenerator(result)) {
    return streamingCache.getOrCollect(dependsOn, result);
  }

  return result;
};
```
## Streaming vs Non-Streaming Flow

### Non-Streaming Generator

```js
{
  async generate(input) {
    const result = await processAll(input);
    return result; // Wait for everything
  }
}
```

### Streaming Generator

```js
{
  hasParallelProcessor: true,
  async generate(input, worker) {
    return (async function* () {
      for await (const chunk of worker.stream(items, input, extra)) {
        yield chunk; // Stream results as they're ready
      }
    })();
  }
}
```
## Example: Complete Streaming Flow

### 1. Generator Yields Chunks

```js
// In your generator
hasParallelProcessor: true,
async generate(input, worker) {
  const items = input.map(extractItems);

  return (async function* () {
    // The worker distributes items across threads
    for await (const chunk of worker.stream(items, input, {})) {
      // chunk = [result1, result2, ...] (100 items)
      yield chunk;
    }
  })();
}
```
### 2. System Detects Async Generator

```js
// In src/generators.mjs
const result = await generate(dependencyInput, await worker);

if (!isAsyncGenerator(result)) {
  generatorsLogger.debug(`Completed "${generatorName}"`);
}
// For streaming, completion is logged when collection finishes
```
### 3. Consumer Collects Stream

```js
// In src/generators.mjs:115-122
const resultPromises = generators.map(async name => {
  let result = await cachedGenerators[name];

  if (isAsyncGenerator(result)) {
    // Collect the stream (or get the cached result)
    result = await streamingCache.getOrCollect(name, result);
  }

  return result;
});
```
### 4. Results Collected and Cached

```js
// From collectAsyncGenerator
const results = [];

for await (const chunk of generator) {
  results.push(...chunk); // Flatten chunks
}

return results; // [item1, item2, item3, ...]
```
## Memory Usage

- **Streaming** - only holds the current chunk in memory
- **Non-streaming** - holds the entire result in memory

```js
// Non-streaming: the full 1GB array sits in memory
const result = await processAll(millionItems);

// Streaming: 100 items at a time (~100KB chunks)
async function* relay() {
  for await (const chunk of processStreaming(millionItems)) {
    // Previous chunks can be garbage collected once consumers release them
    yield chunk;
  }
}
```
## Time to First Result

- **Streaming** - the first chunk is available as soon as any thread completes
- **Non-streaming** - must wait for all processing to finish

## Parallelism

- **Streaming** - results are yielded as soon as any chunk completes (via `Promise.race`)
- **Non-streaming** - sequential processing, or waiting for all parallel work
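The race-based merge can be sketched as follows. This is an illustration of the `Promise.race` pattern, not doc-kit's actual worker implementation; `tasks` stands in for the per-thread work:

```js
// Yield each task's chunk as soon as it settles, using Promise.race.
async function* raceChunks(tasks) {
  // Tag each promise with its key so it can be removed once it settles.
  const pending = new Map(
    tasks.map((task, i) => [i, task.then(chunk => ({ i, chunk }))])
  );

  while (pending.size > 0) {
    const { i, chunk } = await Promise.race(pending.values());
    pending.delete(i);
    yield chunk; // fastest remaining chunk, regardless of start order
  }
}

// Usage: the 20ms "thread" finishes first, so its chunk is yielded first.
const delayed = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

const main = async () => {
  for await (const chunk of raceChunks([delayed(80, [1]), delayed(20, [2])])) {
    console.log(chunk); // [2], then [1]
  }
};

main();
```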
## Best Practices

### DO: Yield Arrays

```js
async function* goodGenerator() {
  yield [1, 2, 3]; // ✅ Array
  yield [4, 5];    // ✅ Array
}
```

### DON’T: Yield Individual Items

```js
async function* badGenerator() {
  yield 1; // ❌ Not an array
  yield 2; // ❌ Not an array
}
```

### DO: Use Consistent Chunk Sizes

```js
// Good - ~100 items per chunk
for (let i = 0; i < items.length; i += 100) {
  yield items.slice(i, i + 100);
}
```

### DON’T: Yield Empty Arrays

```js
if (chunk.length > 0) {
  yield chunk; // ✅ Only yield non-empty chunks
}
```
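The practices above can be folded into one small helper (a hypothetical utility, not part of doc-kit):

```js
// chunked: yields fixed-size, non-empty array chunks from a flat list.
async function* chunked(items, size = 100) {
  for (let i = 0; i < items.length; i += size) {
    const chunk = items.slice(i, i + size);
    if (chunk.length > 0) {
      yield chunk; // arrays only, never empty
    }
  }
}
```

`chunked([1, 2, 3, 4, 5], 2)` yields `[1, 2]`, `[3, 4]`, then `[5]`, and yields nothing at all for an empty input.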
## Debugging Streaming Issues

### Enable Streaming Logs

```bash
DEBUG=doc-kit:streaming npm run generate
```

### Sample Output

```text
doc-kit:streaming Starting collection for "json-simple" +0ms
doc-kit:streaming Collected chunk 1 { itemsInChunk: 100 } +50ms
doc-kit:streaming Collected chunk 2 { itemsInChunk: 100 } +45ms
doc-kit:streaming Collected chunk 3 { itemsInChunk: 100 } +48ms
doc-kit:streaming Collection complete { totalItems: 300, chunks: 3 } +2ms
```
## Common Patterns

### Pattern 1: Simple Streaming Generator

```js
{
  hasParallelProcessor: true,
  async generate(input, worker) {
    return (async function* () {
      for await (const chunk of worker.stream(input, input, {})) {
        yield chunk;
      }
    })();
  },
  processChunk(items, indices) {
    return indices.map(i => transform(items[i]));
  },
}
```
### Pattern 2: Post-Processing Chunks

```js
async generate(input, worker) {
  return (async function* () {
    for await (const chunk of worker.stream(items, input, {})) {
      // Transform the chunk before yielding
      const processed = chunk.map(addMetadata);
      yield processed;
    }
  })();
}
```
### Pattern 3: Filtering Chunks

```js
async generate(input, worker) {
  return (async function* () {
    for await (const chunk of worker.stream(items, input, {})) {
      // Filter the chunk before yielding
      const filtered = chunk.filter(isValid);

      if (filtered.length > 0) {
        yield filtered;
      }
    }
  })();
}
```
## Next Steps

- **Worker Threads** - understand the parallel processing implementation
- **Architecture** - see how streaming fits into the overall system