Isolate CPU-intensive job processing in separate processes or worker threads
Sandboxed processors run job processing code in isolated child processes or worker threads, preventing CPU-intensive operations from blocking the worker’s bookkeeping tasks and causing stalled jobs.
```typescript
// processor.ts
import { SandboxedJob } from 'bullmq';

module.exports = async (job: SandboxedJob) => {
  console.log('Processing job:', job.id);
  console.log('Job data:', job.data);

  // Perform CPU-intensive work
  const result = heavyComputation(job.data);

  // Report progress
  await job.updateProgress(50);

  // More work
  const final = moreHeavyWork(result);
  await job.updateProgress(100);

  // Return result
  return final;
};

function heavyComputation(data: any) {
  // CPU-intensive operations
  let result = 0;
  for (let i = 0; i < 1000000000; i++) {
    result += Math.sqrt(i);
  }
  return result;
}

// Placeholder for a second CPU-intensive stage; replace with your own logic
function moreHeavyWork(value: number) {
  return Math.round(value);
}
```
Then reference this file when creating the worker:
```typescript
// worker.ts
import { Worker } from 'bullmq';
import path from 'path';

const processorFile = path.join(__dirname, 'processor.js');

const worker = new Worker('queueName', processorFile, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  concurrency: 4, // Run 4 sandboxed processes in parallel
});

console.log('Sandboxed worker started');
```
The processor must be given as a path to a file that your runtime can load directly, which normally means compiled JavaScript (.js, .cjs, .mjs). If you write the processor in TypeScript, compile it first, or point to the .ts file only if your runtime can execute TypeScript itself.
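The extension rule above can be checked before the worker is constructed. The sketch below is a hypothetical helper, `assertLoadableProcessor`, which is not part of BullMQ. It accepts the three JavaScript extensions listed and accepts `.ts` only when the caller states that the runtime can execute TypeScript. The `runtimeSupportsTs` flag is likewise an assumption made for illustration.

```typescript
import { extname } from 'node:path';

// Extensions the text above names as directly loadable.
const LOADABLE_EXTENSIONS = new Set(['.js', '.cjs', '.mjs']);

/**
 * Hypothetical guard (not a BullMQ API): returns the path unchanged when its
 * extension is loadable, and throws otherwise.
 */
function assertLoadableProcessor(file: string, runtimeSupportsTs = false): string {
  const ext = extname(file).toLowerCase();
  const loadable =
    LOADABLE_EXTENSIONS.has(ext) || (runtimeSupportsTs && ext === '.ts');
  if (!loadable) {
    throw new Error(`Processor file "${file}" has unsupported extension "${ext}"`);
  }
  return file;
}
```

The returned path can then be passed as the second argument when creating the worker, so a wrong extension fails early with a clear message.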
Worker threads, compared with child processes:
Memory: Shared memory possible (with SharedArrayBuffer)
Startup: Faster thread creation
Resource usage: Less demanding than processes
Worker threads are lighter than child processes, but each one still runs its own instance of the Node.js runtime (a separate V8 isolate and event loop), so they are not as lightweight as threads in many other languages.
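```typescript
import os from 'os';
import { Worker } from 'bullmq';

// processorFile is the path defined in worker.ts
const cpuCount = os.cpus().length;

const worker = new Worker('queueName', processorFile, {
  concurrency: cpuCount, // One process/thread per CPU core
});
```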
For mixed workloads:
```typescript
// Reserve some cores for the system
const workerConcurrency = Math.max(1, cpuCount - 1);

const worker = new Worker('queueName', processorFile, {
  concurrency: workerConcurrency,
  useWorkerThreads: true,
});
```
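The core-reservation arithmetic can be pulled into a small function so that the number of reserved cores is explicit and the result never drops below 1. This is a minimal sketch. `sandboxConcurrency` and its `reservedCores` parameter are hypothetical names, not BullMQ options.

```typescript
import { cpus } from 'node:os';

/**
 * Hypothetical helper: number of sandboxes to run, leaving `reservedCores`
 * free for the rest of the system. Always returns at least 1.
 */
function sandboxConcurrency(cpuCount: number, reservedCores = 1): number {
  const reserved = Math.max(0, Math.floor(reservedCores)); // ignore negative values
  const usable = Math.floor(cpuCount) - reserved;
  return Math.max(1, usable);
}

// Value to pass as the worker's `concurrency` option.
const concurrency = sandboxConcurrency(cpus().length);
console.log(`Using concurrency ${concurrency}`);
```

With 8 cores and the default reservation this yields 7. On a single-core machine it still yields 1.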
Each sandboxed job requires spawning or reusing a process/thread:
```typescript
// High overhead: New process per job (concurrency 1)
const slowWorker = new Worker('queue', processorFile, {
  concurrency: 1,
});

// Optimized: Reuse processes/threads (concurrency > 1)
const fastWorker = new Worker('queue', processorFile, {
  concurrency: 4, // Pool of 4 processes/threads
});
```
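To make the difference between spawning and reusing concrete, here is a toy pool that counts how many sandboxes it has to create. It is only an illustration of the general technique. `ReusePool` is a hypothetical class and does not reflect how BullMQ manages its processes or threads internally.

```typescript
/** Hypothetical illustration of resource reuse; not BullMQ's implementation. */
class ReusePool<T> {
  private readonly idle: T[] = [];
  private readonly spawn: () => T;
  public created = 0;

  constructor(spawn: () => T) {
    this.spawn = spawn;
  }

  /** Hand out an idle resource if one exists; otherwise create a new one. */
  acquire(): T {
    const existing = this.idle.pop();
    if (existing !== undefined) {
      return existing;
    }
    this.created += 1;
    return this.spawn();
  }

  /** Return a resource so a later job can reuse it. */
  release(resource: T): void {
    this.idle.push(resource);
  }
}

// Simulate 10 jobs processed one after another.
let nextId = 0;
const pool = new ReusePool(() => ({ id: nextId++ }));
for (let job = 0; job < 10; job++) {
  const sandbox = pool.acquire();
  // ... the job would run inside `sandbox` here ...
  pool.release(sandbox);
}
console.log(pool.created); // 1
```

Ten sequential jobs cost a single creation because each job returns its sandbox before the next one starts. Jobs that overlap in time would each need their own sandbox, up to the configured concurrency.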