The pipeline system manages asynchronous document processing with persistent job state and coordinated execution across embedded or external workers.

Architecture Pattern

The pipeline uses a Factory pattern to provide a unified interface for both local and remote job processing.
Code References:
  • src/pipeline/PipelineFactory.ts - Factory implementation
  • src/pipeline/trpc/interfaces.ts:20-35 - IPipeline interface
  • src/pipeline/PipelineManager.ts - Local implementation
  • src/pipeline/PipelineClient.ts - Remote implementation

Core Components

Pipeline Factory

Location: src/pipeline/PipelineFactory.ts
Central factory that selects the pipeline implementation based on configuration:
interface PipelineOptions {
  recoverJobs?: boolean;   // Enable job recovery from database
  serverUrl?: string;      // External worker URL
  concurrency?: number;    // Worker concurrency limit
}
Selection Logic:
  1. Check for external worker: if serverUrl is specified → create PipelineClient
  2. Check recovery mode: if recoverJobs: true → create PipelineManager with recovery
  3. Default to immediate mode: if recoverJobs: false → create PipelineManager without recovery
Naming clarifies the mode: PipelineManager runs an in-process worker; PipelineClient connects to an out-of-process worker via tRPC.
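The selection order above can be sketched as follows. This is illustrative only: the real factory returns PipelineClient / PipelineManager instances, and string tags stand in for them here.

```typescript
// Sketch of the factory's selection logic; string tags stand in for
// the real PipelineClient / PipelineManager constructors.
interface PipelineOptions {
  recoverJobs?: boolean;   // enable job recovery from database
  serverUrl?: string;      // external worker URL
  concurrency?: number;    // worker concurrency limit
}

function selectPipeline(options: PipelineOptions): string {
  // 1. An external worker URL takes precedence
  if (options.serverUrl) return "PipelineClient";
  // 2. Embedded worker with recovery
  if (options.recoverJobs) return "PipelineManager(recovery)";
  // 3. Embedded worker, immediate mode
  return "PipelineManager";
}
```

The ordering matters: a configured serverUrl short-circuits the recovery check, so recoverJobs only applies to embedded processing.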

Pipeline Manager

Location: src/pipeline/PipelineManager.ts
Manages the job queue and worker coordination for embedded processing.
Responsibilities:
  • Job queue management with concurrency limits
  • Worker lifecycle management
  • Progress tracking and status updates
  • Database state synchronization
  • Job recovery after restart
Job Recovery:
On startup, the manager loads pending jobs from the database and resets RUNNING jobs to QUEUED for re-execution.
Recovery Process:
  1. Load QUEUED and RUNNING jobs from database
  2. Reset RUNNING jobs to QUEUED state
  3. Resume processing with original configuration
  4. Maintain progress history
Code Reference: src/pipeline/PipelineManager.ts

Pipeline Client

Location: src/pipeline/PipelineClient.ts
Type-safe tRPC client providing an interface identical to PipelineManager for external worker communication.
Features:
  • tRPC client for remote job operations over HTTP
  • Identical method signatures to PipelineManager
  • Error handling and connection management
  • Connectivity check via ping procedure
  • Event-driven job completion waiting
Key Methods:
class PipelineClient implements IPipeline {
  async enqueueScrapeJob(options): Promise<string>
  async getJob(jobId): Promise<PipelineJob>
  async cancelJob(jobId): Promise<void>
  async waitForJobCompletion(jobId): Promise<PipelineJob>
}
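The doc notes the client waits for completion in an event-driven way; as a self-contained illustration of the same contract, here is a hypothetical polling fallback with getJob injected as a plain function:

```typescript
type JobStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELLED";
interface PipelineJob { id: string; status: JobStatus; }

// Terminal states end the wait; QUEUED/RUNNING keep polling.
const TERMINAL: ReadonlySet<JobStatus> = new Set(["COMPLETED", "FAILED", "CANCELLED"]);

// Polls getJob until the job reaches a terminal state.
async function waitForJobCompletion(
  getJob: (id: string) => Promise<PipelineJob>,
  jobId: string,
  intervalMs = 500,
): Promise<PipelineJob> {
  for (;;) {
    const job = await getJob(jobId);
    if (TERMINAL.has(job.status)) return job;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```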

Pipeline Worker

Location: src/pipeline/PipelineWorker.ts
Executes individual jobs with progress reporting.
Execution Flow:
  1. Fetch job configuration from queue
  2. Initialize scraper with job parameters
  3. Process content through scraper pipeline
  4. Update progress via callbacks
  5. Store results and mark completion
Code Reference: src/pipeline/PipelineWorker.ts
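The five steps above can be condensed into a sketch. The scrape function below is a stand-in for the scraper pipeline (steps 2-3); the callback it receives represents the progress mechanism of step 4.

```typescript
type JobStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED";
interface Job { id: string; status: JobStatus; pagesProcessed: number; error?: string; }

// Hypothetical worker execution: run the scraper, report progress,
// and record success or failure on the job record.
async function executeJob(
  job: Job,
  scrape: (onProgress: (pages: number) => void) => Promise<void>,
): Promise<Job> {
  job.status = "RUNNING";
  try {
    await scrape((pages) => { job.pagesProcessed = pages; }); // step 4: progress callback
    job.status = "COMPLETED"; // step 5: mark completion
  } catch (err) {
    job.status = "FAILED";
    job.error = err instanceof Error ? err.message : String(err);
  }
  return job;
}
```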

Job Lifecycle

Job States

State Descriptions:
  • QUEUED: Job created, waiting for worker
  • RUNNING: Worker processing job
  • COMPLETED: Successful completion
  • FAILED: Error during processing
  • CANCELLED: Manual cancellation
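These states imply a transition graph. The table below is a plausible reading of the lifecycle described here, not the actual rules in PipelineManager:

```typescript
type JobStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELLED";

// Assumed transition table: terminal states allow no further moves.
const TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  QUEUED: ["RUNNING", "CANCELLED"],
  RUNNING: ["COMPLETED", "FAILED", "CANCELLED"],
  COMPLETED: [],
  FAILED: [],
  CANCELLED: [],
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```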

State Transitions

All state transitions persist to database and emit events:
// Example state transition
await this.updateJobStatus(jobId, 'RUNNING');
// 1. Update in-memory job state
// 2. Persist to database
// 3. Emit JOB_STATUS_CHANGE event to EventBus
Code Reference: src/pipeline/PipelineManager.ts

Progress Tracking

Jobs report progress through callback mechanism:
interface ScraperProgressEvent {
  pagesDiscovered: number;
  pagesProcessed: number;
  currentUrl?: string;
  status: string;
  errors?: string[];
}
Tracked Metrics:
  • Pages discovered and processed
  • Current processing status
  • Error messages and warnings
  • Processing rate (pages/min)
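The pages/min rate is derived rather than reported directly; a hypothetical helper computing it from a progress event and elapsed time:

```typescript
interface ScraperProgressEvent {
  pagesDiscovered: number;
  pagesProcessed: number;
  currentUrl?: string;
  status: string;
  errors?: string[];
}

// Derive processing rate (pages/min) from elapsed wall-clock time.
// Timestamps are epoch milliseconds; returns 0 before any time elapses.
function pagesPerMinute(event: ScraperProgressEvent, startedAt: number, now: number): number {
  const elapsedMin = (now - startedAt) / 60_000;
  return elapsedMin > 0 ? event.pagesProcessed / elapsedMin : 0;
}
```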

Write-Through Architecture

Pipeline jobs serve as the single source of truth, containing both runtime state and database fields.

Consistency Guarantee

All state changes immediately synchronize to database:
class PipelineManager {
  private async updateJobStatus(jobId: string, status: JobStatus) {
    // 1. Update in-memory state
    const job = this.jobs.get(jobId);
    if (!job) return; // unknown job id

    job.status = status;

    // 2. Persist to database (write-through)
    await this.docService.updateVersion(jobId, { status });

    // 3. Emit event after persistence succeeds
    this.eventBus.emit('JOB_STATUS_CHANGE', job);
  }
}
Benefits:
  • Immediate persistence ensures recovery capability
  • No state drift between memory and database
  • Event emission after persistence guarantees consistency
Code Reference: src/pipeline/PipelineManager.ts

Recovery Mechanism

Database state enables automatic recovery after crashes or restarts.
Recovery Process:
  1. Load pending jobs on startup
  2. Reset RUNNING jobs to QUEUED
  3. Resume processing with original configuration
  4. Maintain progress history
async start() {
  if (this.recoverJobs) {
    const pendingJobs = await this.loadPendingJobs();
    for (const job of pendingJobs) {
      if (job.status === 'RUNNING') {
        await this.updateJobStatus(job.id, 'QUEUED');
      }
      this.queue.push(job);
    }
  }
  this.startWorkers();
}

Concurrency Management

Worker Pool

PipelineManager maintains a configurable worker pool.
Configuration:
  • Default concurrency: 3 workers
  • Configurable via DOCS_MCP_CONCURRENCY or --concurrency
  • Workers process jobs independently
  • Queue coordination prevents conflicts
Code Reference: src/pipeline/PipelineManager.ts
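A sketch of how the concurrency setting might be resolved, assuming the usual precedence of CLI flag over environment variable over default (the exact precedence is not stated in this section):

```typescript
const DEFAULT_CONCURRENCY = 3;

// Assumed precedence: --concurrency flag, then DOCS_MCP_CONCURRENCY,
// then the documented default of 3 workers.
function resolveConcurrency(
  cliFlag: number | undefined,
  env: Record<string, string | undefined>,
): number {
  if (cliFlag !== undefined && cliFlag > 0) return cliFlag;
  const fromEnv = Number(env.DOCS_MCP_CONCURRENCY);
  if (Number.isInteger(fromEnv) && fromEnv > 0) return fromEnv;
  return DEFAULT_CONCURRENCY;
}
```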

Job Distribution

Jobs are distributed using a FIFO queue gated on worker availability:
private async processQueue() {
  while (this.queue.length > 0 && this.hasAvailableWorker()) {
    const job = this.queue.shift();
    const worker = this.getAvailableWorker();
    // Dispatch without awaiting completion so other available
    // workers can pick up jobs concurrently.
    void worker.execute(job).finally(() => this.processQueue());
  }
}
Strategy:
  • FIFO queue ordering
  • Worker availability checking
  • Load balancing across workers
  • Graceful worker shutdown handling

External Worker RPC

tRPC Procedures

Location: src/pipeline/trpc/router.ts
Type-safe RPC procedures for remote worker communication:
  • ping: Connectivity check
  • enqueueJob: Create new job
  • getJobs: List jobs with optional filtering
  • getJob: Get job details by ID
  • cancelJob: Cancel a running job
  • clearCompletedJobs: Remove finished jobs
  • subscribeToEvents: WebSocket subscription for events
Example Client Usage:
const client = createTRPCClient({
  url: 'http://worker:8080/api'
});

const jobId = await client.enqueueJob.mutate({
  libraryName: 'react',
  version: '18.0.0',
  url: 'https://react.dev'
});

Data Contracts

Requests and responses use shared TypeScript types through tRPC, ensuring end-to-end type safety.
Type Safety Flow:
// Shared types in src/types/
interface ScrapeJobOptions {
  libraryName: string;
  version: string;
  url: string;
  // ...
}

// Server procedure
const router = trpc.router({
  enqueueJob: trpc.procedure
    .input(z.object({ /* Zod schema */ }))
    .mutation(async ({ input }) => {
      // Type-safe input
      return jobId;
    })
});

// Client gets full type safety
const jobId = await client.enqueueJob.mutate(options);

Error Handling

Errors propagate as structured tRPC errors:
import { TRPCError } from '@trpc/server';

throw new TRPCError({
  code: 'BAD_REQUEST',
  message: 'Invalid job configuration'
});
Error Codes:
  • BAD_REQUEST: Invalid input parameters
  • NOT_FOUND: Job not found
  • INTERNAL_SERVER_ERROR: Processing error
  • TIMEOUT: Job execution timeout
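One reasonable client-side policy these codes suggest, though not prescribed by this document, is retrying only transient failures:

```typescript
type PipelineErrorCode = "BAD_REQUEST" | "NOT_FOUND" | "INTERNAL_SERVER_ERROR" | "TIMEOUT";

// Hypothetical retry policy: transient server-side failures are worth
// retrying; client errors (bad input, missing job) are not.
function isRetryable(code: PipelineErrorCode): boolean {
  return code === "TIMEOUT" || code === "INTERNAL_SERVER_ERROR";
}
```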

Configuration Persistence

Job Configuration

Each job stores complete scraper configuration in the database:
interface PipelineJob {
  id: string;
  libraryName: string;
  version: string;
  status: JobStatus;
  config: {
    url: string;
    maxDepth?: number;
    followLinks?: boolean;
    selectors?: string[];
    // ... all scraper options
  };
}
Storage Location: versions.config column (JSON)
Code Reference: src/store/DocumentManagementService.ts

Reproducible Processing

Stored configuration enables exact re-indexing with the same parameters, ensuring consistent results across runs.
Use Cases:
  • Re-index documentation with same settings
  • Debug processing issues
  • Audit configuration changes
  • Version-specific processing rules
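Re-indexing with identical parameters amounts to rebuilding enqueue options from the stored record. A minimal sketch, assuming a record shape like the PipelineJob interface above (field names are illustrative):

```typescript
interface StoredJobRecord {
  libraryName: string;
  version: string;
  config: { url: string; maxDepth?: number; followLinks?: boolean };
}

// Rebuild enqueue options verbatim from the stored record so a
// re-index runs with exactly the original scraper parameters.
function buildReindexOptions(record: StoredJobRecord) {
  return {
    libraryName: record.libraryName,
    version: record.version,
    ...record.config, // spread preserves every stored scraper option
  };
}
```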

Monitoring and Observability

Progress Reporting

Real-time progress updates flow through multiple channels.
Update Frequency:
  • Progress callbacks: Every page processed
  • Database persistence: Every state change
  • Event emission: Every update
  • UI polling: Every 3 seconds (fallback)
Code Reference: src/pipeline/PipelineWorker.ts

Error Tracking

Comprehensive error information:
interface JobError {
  message: string;
  stack?: string;
  context: {
    url?: string;
    page?: number;
    retryCount: number;
  };
}
Error Storage:
  • Exception stack traces
  • Processing context at failure
  • Retry attempt logging
  • User-friendly error messages
Code Reference: src/pipeline/PipelineManager.ts

Performance Metrics

Job execution metrics tracked:
  • Processing Duration: Start to completion time
  • Pages Per Minute: Processing rate
  • Memory Usage: Peak memory consumption
  • Queue Depth: Pending jobs count
  • Worker Utilization: Active vs idle workers

Scaling Patterns

Vertical Scaling

Increase processing power within a single process:
  • Higher concurrency: increase worker count with --concurrency
  • More memory: allocate more RAM for large documents
  • Faster storage: use SSD for database and cache
  • Better CPU: faster processing per worker
Configuration:
docs-mcp-server --concurrency 10

Horizontal Scaling

Distribute workers across processes:
  • Multiple workers: deploy separate worker containers
  • Load balancer: distribute jobs across workers
  • Independent scaling: scale workers without the coordinator
  • Connection pool: manage database connections
Architecture:
# Coordinator
docs-mcp-server mcp --server-url http://worker-lb:8080/api

# Workers (scaled independently)
docs-mcp-server worker --port 8080

Hybrid Deployment

Combine embedded and external workers:
  • Coordinator with embedded workers for baseline
  • Additional external workers for peak load
  • Flexible resource allocation
  • Cost-optimized scaling

Next Steps

  • Event Bus: learn about the real-time event architecture
  • Content Processing: understand content transformation
