For generators processing large datasets, doc-kit supports parallel processing using worker threads. This guide explains how to implement worker-based processing in your generators.

When to Use Parallel Processing

Use Workers When

  • Processing many independent items
  • Each item takes significant time
  • Operations are CPU-intensive
  • Dataset is large (hundreds+ of files)

Don't Use Workers When

  • Items have dependencies on each other
  • Output must be in specific order
  • Operation is I/O bound
  • Dataset is small (< 100 items)
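The two checklists above can be condensed into a rough go/no-go heuristic. This helper is purely illustrative (not part of doc-kit), and the thresholds are arbitrary:

```javascript
// Illustrative heuristic (not part of doc-kit): decide whether
// worker-based processing is likely to pay off. Thresholds are arbitrary.
function shouldUseWorkers({ itemCount, msPerItem, ordered, interdependent }) {
  if (ordered || interdependent) return false; // results must stay sequential
  if (itemCount < 100) return false; // too few items to amortize worker startup
  return itemCount * msPerItem > 500; // total CPU work dwarfs the overhead
}
```

For example, 500 CPU-heavy items at 5 ms each qualify, while 50 quick items do not.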

How Parallel Processing Works

The framework:
  1. Splits input into chunks based on CPU cores
  2. Spawns worker threads (one per core)
  3. Each worker processes its assigned items using processChunk
  4. Main thread collects and streams results via generate
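Step 1 (chunking) can be sketched as follows; this is an illustrative reimplementation for intuition, not the framework's actual code:

```javascript
// Illustrative sketch of step 1: split N item indices into one chunk
// per worker, spreading any remainder across the first chunks.
function chunkIndices(itemCount, workerCount) {
  const chunks = [];
  const base = Math.floor(itemCount / workerCount);
  let remainder = itemCount % workerCount;
  let next = 0;
  for (let w = 0; w < workerCount && next < itemCount; w++) {
    const size = base + (remainder-- > 0 ? 1 : 0);
    chunks.push(Array.from({ length: size }, (_, i) => next + i));
    next += size;
  }
  return chunks;
}
```

With 7 items and 3 workers this yields index chunks `[0,1,2]`, `[3,4]`, and `[5,6]`.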

Implementing Parallel Processing

Step 1: Enable Parallel Processing

Set hasParallelProcessor: true in generator metadata:
// src/generators/parallel-generator/index.mjs
import { createLazyGenerator } from '../../utils/generators.mjs';

/**
 * @type {import('./types').Generator}
 */
export default createLazyGenerator({
  name: 'parallel-generator',

  version: '1.0.0',

  description: 'Processes data in parallel',

  dependsOn: 'metadata',

  // Enable parallel processing
  hasParallelProcessor: true,
});
Step 2: Define Types

Create types for processChunk in types.d.ts:
export type Generator = GeneratorMetadata<
  {
    // Custom configuration
    myOption: string;
  },
  Generate<InputType, AsyncGenerator<OutputType>>,
  // processChunk signature
  ProcessChunk<
    InputType[],           // Full input array
    OutputType[],          // Chunk results
    SerializableDeps       // Serializable dependencies
  >
>;
Step 3: Implement processChunk

Create the processChunk function in generate.mjs:
// src/generators/parallel-generator/generate.mjs

/**
 * Process a chunk of items in a worker thread.
 * This function runs in isolated worker threads.
 *
 * @type {import('./types').Generator['processChunk']}
 */
export async function processChunk(fullInput, itemIndices, deps) {
  const results = [];

  // Process only the items at specified indices
  for (const idx of itemIndices) {
    const item = fullInput[idx];
    const result = await processItem(item, deps);
    results.push(result);
  }

  return results;
}

/**
 * Process a single item
 * @param {MetadataEntry} item
 * @param {object} deps
 */
async function processItem(item, deps) {
  // Your processing logic here
  return {
    name: item.heading.data.name,
    type: item.type,
    // ... transformed data
  };
}
processChunk runs in worker threads with no access to main thread state. Only use serializable data.
Step 4: Implement generate

Implement the generate function to orchestrate workers:
// src/generators/parallel-generator/generate.mjs
import getConfig from '../../utils/configuration/index.mjs';

/**
 * Main generation function that orchestrates worker threads
 *
 * @type {import('./types').Generator['generate']}
 */
export async function* generate(input, worker) {
  const config = getConfig('parallel-generator');

  // Prepare serializable dependencies
  const deps = {
    version: config.version.toString(),
    myOption: config.myOption,
    // Only include JSON-compatible data
  };

  // Collect input into array for chunking
  const inputArray = [];
  for await (const item of input) {
    inputArray.push(item);
  }

  // Stream chunks as they complete
  for await (const chunkResult of worker.stream(inputArray, inputArray, deps)) {
    // chunkResult is an array of processed items from one chunk
    
    // Yield each item individually
    for (const item of chunkResult) {
      yield item;
    }
    
    // Or yield the whole chunk
    // yield chunkResult;
  }
}

Key Concepts

Full Input and Item Indices

Workers receive the full input array but only process items at specified indices:
export async function processChunk(fullInput, itemIndices, deps) {
  const results = [];

  // fullInput: [item0, item1, item2, ..., item99]
  // itemIndices: [0, 1, 2] (first chunk) or [3, 4, 5] (second chunk), etc.

  for (const idx of itemIndices) {
    const item = fullInput[idx]; // Access only assigned items
    results.push(await processItem(item, deps));
  }

  return results;
}
This allows workers to access other items for context while processing their assigned items.
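For instance, a chunk processor can look at a neighboring item for context while still emitting results only for its own indices. The item shape (`name`) and the `prefix` dependency below are hypothetical:

```javascript
// Hypothetical example: each result records its predecessor's name,
// read from fullInput for context, while only assigned indices produce output.
async function processChunk(fullInput, itemIndices, deps) {
  const results = [];
  for (const idx of itemIndices) {
    const item = fullInput[idx];
    const previous = idx > 0 ? fullInput[idx - 1] : null; // context only
    results.push({
      name: `${deps.prefix}${item.name}`,
      previousName: previous ? previous.name : null,
    });
  }
  return results;
}
```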

Serializable Dependencies

Only JSON-compatible data can be passed to workers:
const deps = {
  version: '1.0.0',          // String ✓
  maxSize: 1024,             // Number ✓
  enabled: true,             // Boolean ✓
  config: { key: 'value' },  // Plain object ✓
  list: [1, 2, 3],          // Array ✓
};
Converting non-serializable data:
import getConfig from '../../utils/configuration/index.mjs';

export async function* generate(input, worker) {
  const config = getConfig('my-generator');

  // Convert SemVer to string
  const deps = {
    version: config.version.toString(),
    
    // Extract only serializable parts
    ref: config.ref,
    
    // Read file contents instead of passing file handles
    template: await readFile(config.templatePath, 'utf-8'),
  };

  // ...
}
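Node's worker messaging copies data with the structured clone algorithm, so `structuredClone` (Node 17+) works as a quick preflight check. The helper below is illustrative, not part of doc-kit:

```javascript
// Illustrative preflight: returns true if a value would survive the
// structured clone used by worker messaging, false otherwise.
function isCloneable(value) {
  try {
    structuredClone(value);
    return true;
  } catch {
    return false;
  }
}
```

Plain data passes; functions, and objects holding them, do not.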

Worker Stream API

The worker.stream() method manages parallel processing:
worker.stream(
  input,        // Items to process (array)
  fullInput,    // Full input passed to processChunk
  deps          // Serializable dependencies
)
Parameters:
  • input - Array of items to split into chunks
  • fullInput - Full input array passed to each processChunk (usually same as input)
  • deps - Serializable dependencies passed to processChunk
Returns: AsyncGenerator<ChunkResult[]>. Each iteration yields the results from one completed chunk.
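That contract can be emulated in-process (no real threads) to make the API's shape concrete. `mockWorkerStream` and its fixed chunk size are inventions for illustration:

```javascript
// Illustrative in-process stand-in for worker.stream: split the input
// into fixed-size chunks, run processChunk on each, and yield one
// result array per completed chunk.
async function* mockWorkerStream(input, fullInput, deps, processChunk, chunkSize = 2) {
  for (let start = 0; start < input.length; start += chunkSize) {
    const indices = [];
    for (let i = start; i < Math.min(start + chunkSize, input.length); i++) {
      indices.push(i);
    }
    yield await processChunk(fullInput, indices, deps);
  }
}
```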

Processing Stream Results

export async function* generate(input, worker) {
  const inputArray = await collectAll(input);
  const deps = { /* ... */ };

  for await (const chunkResult of worker.stream(inputArray, inputArray, deps)) {
    // chunkResult is an array from one chunk
    for (const item of chunkResult) {
      yield item; // Yield items one by one
    }
  }
}

Complete Example

Here’s the index.mjs of jsx-ast, a real generator that enables parallel processing:
// src/generators/jsx-ast/index.mjs
import { createLazyGenerator } from '../../utils/generators.mjs';

/**
 * Generator for converting MDAST to JSX AST.
 *
 * @type {import('./types').Generator}
 */
export default createLazyGenerator({
  name: 'jsx-ast',

  version: '1.0.0',

  description: 'Generates JSX AST from the input MDAST',

  dependsOn: 'metadata',

  defaultConfiguration: {
    ref: 'main',
  },

  hasParallelProcessor: true,
});

Performance Considerations

Chunk Size

The framework automatically determines chunk size based on:
  • Number of CPU cores
  • Total number of items
  • Minimum chunk size (to avoid overhead)
You don’t need to configure this manually.
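A plausible version of such a heuristic, shown only to build intuition (the framework's actual formula may differ):

```javascript
// Illustrative chunk-size heuristic: one chunk per core, but never
// smaller than a minimum size that keeps per-chunk overhead worthwhile.
function chunkSize(totalItems, cpuCores, minChunkSize = 10) {
  return Math.max(minChunkSize, Math.ceil(totalItems / cpuCores));
}
```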

Worker Overhead

Worker threads have startup overhead. Parallel processing is beneficial when:
  • Processing time per item > Worker overhead per item
  • Generally beneficial for 100+ items with non-trivial processing
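A back-of-the-envelope model of that trade-off (the 50 ms startup figure is an assumption, not a measured doc-kit number):

```javascript
// Illustrative break-even estimate: time saved by splitting work across
// cores, minus the fixed cost of spawning workers. Positive means
// parallel processing is worth it.
function parallelSavingsMs(items, msPerItem, cores, workerStartupMs = 50) {
  const serialMs = items * msPerItem;
  const parallelMs = serialMs / cores + workerStartupMs;
  return serialMs - parallelMs;
}
```

A thousand 5 ms items on 4 cores saves seconds; ten 1 ms items lose more to startup than they gain.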

Memory Usage

Each worker receives a copy of fullInput and deps. For large datasets, this can use significant memory.
Optimization strategies:
  1. Only pass necessary data in deps
  2. Consider using itemIndices to access shared readonly data
  3. For very large datasets, consider streaming instead of collecting all input
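Strategy 1 can be as simple as copying a whitelist of fields instead of forwarding the whole config object (the field names below are hypothetical):

```javascript
// Illustrative: forward only the fields workers actually need,
// rather than the entire configuration object.
function pickDeps(config, keys) {
  const deps = {};
  for (const key of keys) {
    deps[key] = config[key];
  }
  return deps;
}

// e.g. const deps = pickDeps(config, ['ref', 'myOption']);
```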

Debugging Worker Issues

Common Issues

Cause: Trying to pass non-serializable data in deps.
Solution: Convert it to a JSON-compatible format:
// Bad
const deps = { version: config.version }; // SemVer object

// Good
const deps = { version: config.version.toString() };
Cause: Not correctly iterating itemIndices.
Solution: Ensure you process only the assigned indices:
export async function processChunk(fullInput, itemIndices, deps) {
  const results = [];

  // Correct: iterate only the indices assigned to this worker
  for (const idx of itemIndices) {
    results.push(await processItem(fullInput[idx], deps));
  }

  return results;
}
Cause: Accessing main thread state from a worker.
Solution: Pass all needed data through deps:
// Bad - accessing outer scope
const VERSION = '1.0.0';
export async function processChunk(fullInput, itemIndices, deps) {
  console.log(VERSION); // undefined in the worker!
}

// Good - passing through deps
export async function* generate(input, worker) {
  const deps = { version: '1.0.0' };
  for await (const chunkResult of worker.stream(inputArray, inputArray, deps)) {
    yield* chunkResult;
  }
}

export async function processChunk(fullInput, itemIndices, deps) {
  console.log(deps.version); // Works!
}

Next Steps

  • Creating Custom Generators: learn the basics of generator creation
  • Built-in Generators: see parallel processing in action
