Architecture Pattern
The pipeline uses a Factory pattern to provide a unified interface for both local and remote job processing.

Code References:
- src/pipeline/PipelineFactory.ts - Factory implementation
- src/pipeline/trpc/interfaces.ts:20-35 - IPipeline interface
- src/pipeline/PipelineManager.ts - Local implementation
- src/pipeline/PipelineClient.ts - Remote implementation
Core Components
Pipeline Factory
Location: src/pipeline/PipelineFactory.ts
Central factory that selects pipeline implementation based on configuration:
Naming clarifies the mode:
PipelineManager runs an in-process worker; PipelineClient connects to an out-of-process worker via tRPC.
Pipeline Manager
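As a sketch of that selection logic, the factory can be pictured as follows. The types and method names here are illustrative, not the actual code; the real IPipeline interface lives in src/pipeline/trpc/interfaces.ts.

```typescript
// Simplified sketch of the factory pattern: one interface, two implementations.
interface IPipeline {
  enqueueJob(url: string): Promise<string>;
  cancelJob(id: string): Promise<void>;
}

// In-process worker: manages the queue and workers itself.
class PipelineManager implements IPipeline {
  async enqueueJob(_url: string): Promise<string> {
    return "local-job";
  }
  async cancelJob(_id: string): Promise<void> {}
}

// Remote worker: forwards calls to an external worker over tRPC.
class PipelineClient implements IPipeline {
  constructor(private serverUrl: string) {}
  async enqueueJob(_url: string): Promise<string> {
    return `remote-job@${this.serverUrl}`;
  }
  async cancelJob(_id: string): Promise<void> {}
}

// The factory inspects configuration and returns the matching implementation.
function createPipeline(options: { serverUrl?: string }): IPipeline {
  return options.serverUrl
    ? new PipelineClient(options.serverUrl)
    : new PipelineManager();
}
```

Because both classes satisfy the same interface, callers never need to know whether jobs run embedded or remotely.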
Location: src/pipeline/PipelineManager.ts
Manages job queue and worker coordination for embedded processing.
Responsibilities:
- Job queue management with concurrency limits
- Worker lifecycle management
- Progress tracking and status updates
- Database state synchronization
- Job recovery after restart
On startup, the manager loads pending jobs from the database and resets RUNNING jobs to QUEUED for re-execution:
- Load QUEUED and RUNNING jobs from database
- Reset RUNNING jobs to QUEUED state
- Resume processing with original configuration
- Maintain progress history
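The recovery steps above can be sketched as a pure function over persisted jobs. This is a hypothetical simplification, not the actual PipelineManager code.

```typescript
// Sketch of startup recovery: reload pending work and re-queue anything
// that was left RUNNING by a crash or restart.
type JobStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED" | "CANCELLED";

interface Job {
  id: string;
  status: JobStatus;
  config: object; // original scraper configuration, reused on re-execution
}

function recoverJobs(persisted: Job[]): Job[] {
  return persisted
    .filter((j) => j.status === "QUEUED" || j.status === "RUNNING")
    .map((j) =>
      // A RUNNING job cannot be resumed mid-flight, so it is reset to
      // QUEUED and will re-execute with its original stored configuration.
      j.status === "RUNNING" ? { ...j, status: "QUEUED" as const } : j,
    );
}
```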
Code Reference: src/pipeline/PipelineManager.ts
Pipeline Client
Location: src/pipeline/PipelineClient.ts
Type-safe tRPC client providing identical interface to PipelineManager for external worker communication.
Features:
- tRPC client for remote job operations over HTTP
- Identical method signatures to PipelineManager
- Error handling and connection management
- Connectivity check via ping procedure
- Event-driven job completion waiting
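The connectivity check can be sketched as follows. The transport is abstracted behind a generic call function here; the real client wraps a typed tRPC proxy, and the class shape is an assumption for illustration.

```typescript
// Hypothetical sketch of the client-side connectivity check: a failed
// ping means the external worker is unreachable.
class RemotePipelineStub {
  constructor(private call: (procedure: string) => Promise<unknown>) {}

  // Invokes the ping procedure and converts transport failures
  // into a simple reachable/unreachable answer.
  async checkConnectivity(): Promise<boolean> {
    try {
      await this.call("ping");
      return true;
    } catch {
      return false;
    }
  }
}
```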
Pipeline Worker
Location: src/pipeline/PipelineWorker.ts
Executes individual jobs with progress reporting.
Execution Flow:
Process Steps:
- Fetch job configuration from queue
- Initialize scraper with job parameters
- Process content through scraper pipeline
- Update progress via callbacks
- Store results and mark completion
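The process steps above can be sketched as a single function. The names (executeJob, the scrape callback shape) are hypothetical; the real worker lives in src/pipeline/PipelineWorker.ts.

```typescript
// Illustrative worker loop: run the scraper pipeline for one job and
// report progress through a callback after every page.
type Progress = { pagesProcessed: number; status: string };

interface JobConfig {
  url: string;
}

async function executeJob(
  config: JobConfig,
  scrape: (url: string, onPage: () => void) => Promise<string[]>,
  onProgress: (p: Progress) => void,
): Promise<{ pages: string[] }> {
  let pagesProcessed = 0;
  // Process content through the scraper pipeline, reporting each page.
  const pages = await scrape(config.url, () => {
    pagesProcessed += 1;
    onProgress({ pagesProcessed, status: "RUNNING" });
  });
  // Store results and mark completion.
  onProgress({ pagesProcessed, status: "COMPLETED" });
  return { pages };
}
```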
Code Reference: src/pipeline/PipelineWorker.ts
Job Lifecycle
Job States
State Descriptions:

| State | Description |
|---|---|
| QUEUED | Job created, waiting for worker |
| RUNNING | Worker processing job |
| COMPLETED | Successful completion |
| FAILED | Error during processing |
| CANCELLED | Manual cancellation |
State Transitions
All state transitions persist to the database and emit events.

Code Reference: src/pipeline/PipelineManager.ts
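The legal transitions implied by the state table can be sketched as a lookup map. This transition set is inferred from the table, not copied from the source; the authoritative rules live in PipelineManager.

```typescript
// Assumed transition map: QUEUED jobs can start or be cancelled;
// RUNNING jobs can finish, fail, or be cancelled; terminal states are final.
const TRANSITIONS: Record<string, string[]> = {
  QUEUED: ["RUNNING", "CANCELLED"],
  RUNNING: ["COMPLETED", "FAILED", "CANCELLED"],
  COMPLETED: [],
  FAILED: [],
  CANCELLED: [],
};

function canTransition(from: string, to: string): boolean {
  return (TRANSITIONS[from] ?? []).includes(to);
}
```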
Progress Tracking
Jobs report progress through a callback mechanism:
- Pages discovered and processed
- Current processing status
- Error messages and warnings
- Processing rate (pages/min)
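The processing rate in that list is a simple derived value. A minimal sketch of the calculation (hypothetical helper, not the actual reporting code):

```typescript
// Pages-per-minute rate from a job's start time and pages processed so far.
function pagesPerMinute(
  pagesProcessed: number,
  startedAtMs: number,
  nowMs: number,
): number {
  const minutes = (nowMs - startedAtMs) / 60_000;
  // Guard against a zero or negative elapsed interval right at job start.
  return minutes > 0 ? pagesProcessed / minutes : 0;
}
```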
Write-Through Architecture
Pipeline jobs serve as the single source of truth, containing both runtime state and database fields.
Consistency Guarantee
All state changes immediately synchronize to the database:
- Immediate persistence ensures recovery capability
- No state drift between memory and database
- Event emission after persistence guarantees consistency
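The ordering guarantee can be sketched as follows: the database write completes before the event fires, so listeners never observe unpersisted state. Function names here are hypothetical.

```typescript
// Write-through update path: mutate the in-memory job (single source of
// truth), persist it, and only then emit the event.
interface JobRecord {
  id: string;
  status: string;
}

async function updateJobStatus(
  job: JobRecord,
  status: string,
  persist: (job: JobRecord) => Promise<void>,
  emit: (event: string, job: JobRecord) => void,
): Promise<void> {
  job.status = status; // in-memory state updated first
  await persist(job); // immediate persistence enables crash recovery
  emit("job:status", job); // emission after persistence: no state drift
}
```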
Code Reference: src/pipeline/PipelineManager.ts
Recovery Mechanism
Database state enables automatic recovery after crashes or restarts.
- Load pending jobs on startup
- Reset RUNNING jobs to QUEUED
- Resume processing with original configuration
- Maintain progress history
Concurrency Management
Worker Pool
PipelineManager maintains a configurable worker pool.

Configuration:
- Default concurrency: 3 workers
- Configurable via DOCS_MCP_CONCURRENCY or --concurrency
- Workers process jobs independently
- Queue coordination prevents conflicts
Code Reference: src/pipeline/PipelineManager.ts
Job Distribution
Jobs are distributed through a FIFO queue based on worker availability:
- FIFO queue ordering
- Worker availability checking
- Load balancing across workers
- Graceful worker shutdown handling
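FIFO dispatch under a concurrency limit can be sketched with a single helper. This is an illustrative simplification (the helper name and shape are assumptions), not the PipelineManager implementation.

```typescript
// Take as many queued jobs as free worker slots allow, oldest first.
// Default concurrency of 3 matches the configuration above.
function takeNextBatch<T>(queue: T[], active: number, concurrency = 3): T[] {
  const freeSlots = Math.max(0, concurrency - active);
  // splice removes from the front of the array, preserving FIFO order.
  return queue.splice(0, freeSlots);
}
```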
External Worker RPC
tRPC Procedures
Location: src/pipeline/trpc/router.ts
Type-safe RPC procedures for remote worker communication:
| Procedure | Description |
|---|---|
| ping | Connectivity check |
| enqueueJob | Create new job |
| getJobs | List jobs with optional filtering |
| getJob | Get job details by ID |
| cancelJob | Cancel a running job |
| clearCompletedJobs | Remove finished jobs |
| subscribeToEvents | WebSocket subscription for events |
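The procedure surface can be pictured as a plain dispatch table. This is a deliberately dependency-free sketch; the real router is defined with tRPC in src/pipeline/trpc/router.ts, and the in-memory store here is purely illustrative.

```typescript
// In-memory stand-in for a subset of the RPC surface.
type Handler = (input?: unknown) => unknown;

const jobs = new Map<string, { id: string; status: string }>();
let nextId = 0;

const procedures: Record<string, Handler> = {
  // Connectivity check used by PipelineClient before issuing real calls.
  ping: () => "pong",
  // Create a new job in QUEUED state and return its ID.
  enqueueJob: () => {
    const id = `job-${++nextId}`;
    jobs.set(id, { id, status: "QUEUED" });
    return id;
  },
  // Look up job details by ID.
  getJob: (input) => jobs.get(input as string),
  // Cancel a job if it exists.
  cancelJob: (input) => {
    const job = jobs.get(input as string);
    if (job) job.status = "CANCELLED";
  },
};
```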
Data Contracts
Requests and responses use shared TypeScript types through tRPC, ensuring end-to-end type safety.
Error Handling
Errors propagate as structured tRPC errors:
- BAD_REQUEST: Invalid input parameters
- NOT_FOUND: Job not found
- INTERNAL_SERVER_ERROR: Processing error
- TIMEOUT: Job execution timeout
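A sketch of mapping internal failures onto those codes (the error class names here are hypothetical; the server raises structured tRPC errors carrying these codes):

```typescript
// Translate internal failure types into structured RPC error codes.
function toErrorCode(err: { name: string }): string {
  switch (err.name) {
    case "ValidationError":
      return "BAD_REQUEST"; // invalid input parameters
    case "JobNotFoundError":
      return "NOT_FOUND"; // unknown job ID
    case "JobTimeoutError":
      return "TIMEOUT"; // job execution timeout
    default:
      return "INTERNAL_SERVER_ERROR"; // any other processing error
  }
}
```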
Configuration Persistence
Job Configuration
Each job stores its complete scraper configuration in the database, in the versions.config column (JSON).
Code Reference: src/store/DocumentManagementService.ts
Reproducible Processing
Stored configuration enables exact re-indexing with the same parameters, ensuring consistent results across runs. This supports:
- Re-index documentation with same settings
- Debug processing issues
- Audit configuration changes
- Version-specific processing rules
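Round-tripping the configuration through the JSON column can be sketched as follows. The ScraperConfig fields are illustrative assumptions, not the actual schema.

```typescript
// Serialize a job's scraper configuration for the versions.config column,
// and restore it later so a re-index runs with identical parameters.
interface ScraperConfig {
  url: string;
  maxDepth: number;
  include: string[];
}

function saveConfig(cfg: ScraperConfig): string {
  return JSON.stringify(cfg); // stored in versions.config (JSON)
}

function loadConfig(json: string): ScraperConfig {
  return JSON.parse(json) as ScraperConfig;
}
```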
Monitoring and Observability
Progress Reporting
Real-time progress updates flow through multiple channels.

Update Frequency:
- Progress callbacks: Every page processed
- Database persistence: Every state change
- Event emission: Every update
- UI polling: Every 3 seconds (fallback)
Code Reference: src/pipeline/PipelineWorker.ts
Error Tracking
Comprehensive error information is captured:
- Exception stack traces
- Processing context at failure
- Retry attempt logging
- User-friendly error messages
Code Reference: src/pipeline/PipelineManager.ts
Performance Metrics
Job execution metrics tracked:
- Processing Duration: Start to completion time
- Pages Per Minute: Processing rate
- Memory Usage: Peak memory consumption
- Queue Depth: Pending jobs count
- Worker Utilization: Active vs idle workers
Scaling Patterns
Vertical Scaling
Increase processing power within a single process:
- Higher Concurrency: Increase worker count with --concurrency
- More Memory: Allocate more RAM for large documents
- Faster Storage: Use SSD for database and cache
- Better CPU: Faster processing per worker
Horizontal Scaling
Distribute workers across processes:
- Multiple Workers: Deploy separate worker containers
- Load Balancer: Distribute jobs across workers
- Independent Scaling: Scale workers without coordinator
- Connection Pool: Manage database connections
Hybrid Deployment
Combine embedded and external workers:
- Coordinator with embedded workers for baseline
- Additional external workers for peak load
- Flexible resource allocation
- Cost-optimized scaling
Next Steps
- Event Bus: Learn about real-time event architecture
- Content Processing: Understand content transformation
