Configure parallel job processing with BullMQ workers
BullMQ provides two approaches to achieve concurrency: using a local concurrency factor or running multiple worker instances across different processes or machines.
When `concurrency` is set to a value greater than 1, the worker will:

- Fetch multiple jobs from the queue (up to the concurrency limit)
- Process them concurrently using Node.js's event loop
- Maintain "at-least-once" delivery guarantees
- Preserve the order of job fetching (not of processing)
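The fetch-vs-processing distinction can be shown with a stdlib-only sketch (this is a simplified model, not BullMQ internals): jobs are picked up in queue order, but each one finishes whenever its own work completes.

```typescript
// Simplified model of a concurrent worker: fetch order is deterministic,
// completion order depends on how long each job takes.
type FakeJob = { id: number; durationMs: number };

const queue: FakeJob[] = [
  { id: 1, durationMs: 300 },
  { id: 2, durationMs: 100 },
  { id: 3, durationMs: 200 },
];

const fetched: number[] = [];
const completed: number[] = [];

await Promise.all(
  queue.map((job) => {
    fetched.push(job.id); // fetch order: 1, 2, 3
    return new Promise<void>((resolve) =>
      setTimeout(() => {
        completed.push(job.id); // completion order: 2, 3, 1
        resolve();
      }, job.durationMs),
    );
  }),
);

console.log(fetched);   // [1, 2, 3]
console.log(completed); // [2, 3, 1]
```

The shortest job (id 2) finishes first even though it was fetched second, which is exactly the behavior a concurrent worker exhibits.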
```typescript
import { Worker, Job } from 'bullmq';

// This worker can process 10 jobs at the same time
const worker = new Worker(
  'emailQueue',
  async (job: Job) => {
    // Send email asynchronously
    await emailService.send(job.data);
    return { sent: true, timestamp: Date.now() };
  },
  { concurrency: 10 },
);
```
Concurrency is most effective for I/O-bound operations like database queries, HTTP requests, or file system operations. Node.js handles these efficiently through its event loop.
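A quick stdlib-only illustration of why this pays off for I/O-bound work: ten simulated 100 ms waits run concurrently complete in roughly 100 ms total, not 1000 ms, because the event loop overlaps the waiting (the `ioTask` helper is illustrative, standing in for a real database or HTTP call).

```typescript
// Simulated I/O: resolves after `ms` milliseconds without blocking the event loop
const ioTask = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

const start = Date.now();
// Run 10 "I/O" tasks of 100 ms each, concurrently
await Promise.all(Array.from({ length: 10 }, () => ioTask(100)));
const elapsedMs = Date.now() - start;
// elapsedMs lands near 100, not 1000: the waits overlap
```

This is the same mechanism a worker with `concurrency: 10` relies on, which is also why it breaks down for CPU-bound code that never yields to the event loop.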
For CPU-intensive operations that block the event loop, use Sandboxed Processors instead of increasing concurrency. High concurrency with blocking operations can lead to stalled jobs.
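As a sketch of what such an external processor file might contain (the pixel-averaging logic, the `ImageJob` type, and the field names are illustrative assumptions, not from BullMQ): a sandboxed processor default-exports an async function that receives the job, and because it runs in its own child process or worker thread, a CPU-bound loop here cannot stall the main worker's event loop.

```typescript
// image-processor.ts — illustrative sandboxed processor file
type ImageJob = { data: { image: number[] } };

export default async function processImage(job: ImageJob) {
  // Hypothetical CPU-heavy work: average the pixel values
  let sum = 0;
  for (const pixel of job.data.image) {
    sum += pixel;
  }
  return { averagePixel: sum / job.data.image.length };
}
```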
When to use concurrency vs sandboxing:
```typescript
import os from 'os';
import path from 'path';
import { Worker, Job } from 'bullmq';

// ✅ Good: I/O-bound with high concurrency
const ioWorker = new Worker(
  'apiQueue',
  async (job: Job) => {
    // Non-blocking async operations
    const response = await fetch(job.data.url);
    await database.save(response.data);
    return response.data;
  },
  { concurrency: 50 }, // Safe to run many in parallel
);

// ❌ Bad: CPU-bound with high concurrency
const cpuWorker = new Worker(
  'imageQueue',
  async (job: Job) => {
    // Blocks the event loop!
    const processed = heavyImageProcessing(job.data.image);
    return processed;
  },
  { concurrency: 50 }, // Will cause stalled jobs!
);

// ✅ Good: CPU-bound with sandboxing
const sandboxedWorker = new Worker(
  'imageQueue',
  path.join(__dirname, 'image-processor.js'), // External file
  { concurrency: os.cpus().length }, // Match CPU cores
);
```
To scale beyond a single process, run one worker per CPU core, for example with Node's cluster module:

```typescript
// worker.js
import { Worker } from 'bullmq';
import cluster from 'cluster';
import os from 'os';

if (cluster.isPrimary) {
  const numCPUs = os.cpus().length;
  console.log(`Starting ${numCPUs} worker processes`);

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died, starting a new one`);
    cluster.fork();
  });
} else {
  // Each process runs a worker with concurrency
  const worker = new Worker('queueName', processorFunction, {
    connection: { host: 'localhost', port: 6379 },
    concurrency: 5, // Each process handles 5 jobs concurrently
  });

  console.log(`Worker process ${process.pid} started`);
}
```
Worker concurrency controls how many jobs a single worker processes simultaneously. For limiting jobs across all workers, see Global Concurrency.
```typescript
// Worker concurrency: each of these workers processes up to 10 jobs at once
const worker1 = new Worker('queue', processor, { concurrency: 10 });
const worker2 = new Worker('queue', processor, { concurrency: 10 });
// Total: up to 20 jobs processing simultaneously

// Global concurrency: at most 1 job across ALL workers,
// regardless of how many workers are running
const queue = new Queue('queue', { connection });
await queue.setGlobalConcurrency(1);
```