Implement worker-based parallel processing in generators
For generators processing large datasets, doc-kit supports parallel processing using worker threads. This guide explains how to implement worker-based processing in your generators.
// src/generators/parallel-generator/generate.mjs

/**
 * Process one chunk of items inside an isolated worker thread.
 * Receives the full input plus the indices assigned to this chunk,
 * and returns the processed results in index order.
 *
 * @type {import('./types').Generator['processChunk']}
 */
export async function processChunk(fullInput, itemIndices, deps) {
  const processed = [];

  // Walk only the indices assigned to this worker, processing
  // sequentially so results stay in the given order.
  for (const index of itemIndices) {
    processed.push(await processItem(fullInput[index], deps));
  }

  return processed;
}

/**
 * Process a single item.
 *
 * @param {MetadataEntry} item
 * @param {object} deps
 */
async function processItem(item, deps) {
  // Your processing logic here
  return {
    name: item.heading.data.name,
    type: item.type,
    // ... transformed data
  };
}
`processChunk` runs in isolated worker threads with no access to main-thread state. Pass only serializable (JSON/structured-clone-compatible) data through `deps` and the input.
4. Implement `generate`
Implement the generate function to orchestrate workers:
/**
 * Main generation function that orchestrates worker threads.
 * Collects the async-iterable input into an array, hands it to the
 * worker pool, and re-yields processed items as each chunk completes.
 *
 * @type {import('./types').Generator['generate']}
 */
export async function* generate(input, worker) {
  const config = getConfig('parallel-generator');

  // Dependencies cross the thread boundary into processChunk, so they
  // must contain only JSON-compatible (serializable) values.
  const deps = {
    version: config.version.toString(),
    myOption: config.myOption,
    // Only include JSON-compatible data
  };

  // Drain the async iterable so the worker pool can slice it into chunks.
  const collected = [];
  for await (const entry of input) {
    collected.push(entry);
  }

  // Each iteration of worker.stream yields the array of processed
  // items produced by one completed chunk.
  for await (const chunk of worker.stream(collected, collected, deps)) {
    // Flatten: emit each processed item individually.
    yield* chunk;
    // Or yield the whole chunk at once:
    // yield chunk;
  }
}
/**
 * Condensed orchestrator: collect the input, stream it through the
 * worker pool, and re-yield results one item at a time.
 *
 * @type {import('./types').Generator['generate']}
 */
export async function* generate(input, worker) {
  const inputArray = await collectAll(input);
  const deps = { /* ... */ };

  for await (const completedChunk of worker.stream(inputArray, inputArray, deps)) {
    // completedChunk is the array produced by one finished chunk;
    // flatten it so consumers see individual items.
    yield* completedChunk;
  }
}