BullMQ provides two approaches to achieve concurrency: using a local concurrency factor or running multiple worker instances across different processes or machines.

Local Concurrency Factor

The concurrency option determines how many jobs a single worker instance can process simultaneously:
import { Worker, Job } from 'bullmq';

const worker = new Worker(
  'queueName',
  async (job: Job) => {
    // Process job asynchronously
    const result = await performAsyncOperation(job.data);
    return result;
  },
  {
    connection: {
      host: 'localhost',
      port: 6379,
    },
    concurrency: 50,
  },
);

How It Works

When concurrency is set to a value greater than 1, the worker will:
  1. Fetch multiple jobs from the queue (up to the concurrency limit)
  2. Process them concurrently using Node.js’s event loop
  3. Maintain “at-least-once” delivery guarantees
  4. Preserve the order of job fetching (not processing)
// This worker can process 10 jobs at the same time
const worker = new Worker(
  'emailQueue',
  async (job: Job) => {
    // Send email asynchronously
    await emailService.send(job.data);
    return { sent: true, timestamp: Date.now() };
  },
  { concurrency: 10 },
);
Concurrency is most effective for I/O-bound operations like database queries, HTTP requests, or file system operations. Node.js handles these efficiently through its event loop.
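To see why a concurrency factor helps I/O-bound work, here is a standalone sketch (not BullMQ code; `runWithConcurrency` is a hypothetical helper) that runs async tasks with a cap on how many are in flight at once. Twenty 10 ms "I/O" tasks finish in roughly four waves instead of twenty, because the event loop overlaps the waits:

```typescript
// Hypothetical sketch, not a BullMQ API: run async tasks with a
// concurrency cap, tracking the peak number in flight.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<{ results: T[]; peakInFlight: number }> {
  const results: T[] = new Array(tasks.length);
  let inFlight = 0;
  let peakInFlight = 0;
  let next = 0;

  async function runNext(): Promise<void> {
    // Each runner pulls tasks until none remain
    while (next < tasks.length) {
      const i = next++;
      inFlight++;
      peakInFlight = Math.max(peakInFlight, inFlight);
      results[i] = await tasks[i]();
      inFlight--;
    }
  }

  // Start `limit` runners; the cap is the number of runners
  await Promise.all(
    Array.from({ length: Math.min(limit, tasks.length) }, () => runNext()),
  );
  return { results, peakInFlight };
}

// 20 simulated I/O tasks (10 ms each), at most 5 in flight
const tasks = Array.from({ length: 20 }, (_, i) => async () => {
  await new Promise((resolve) => setTimeout(resolve, 10));
  return i;
});

runWithConcurrency(tasks, 5).then(({ results, peakInFlight }) => {
  console.log(results.length, peakInFlight); // → 20 5
});
```

A BullMQ worker with `concurrency: 5` applies the same idea to jobs fetched from Redis.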

Dynamic Concurrency Adjustment

You can adjust the concurrency value while the worker is running:
const worker = new Worker('queueName', processorFunction, {
  concurrency: 5,
});

// Later, increase concurrency during peak hours
worker.concurrency = 20;

// Reduce during low-traffic periods
worker.concurrency = 2;

Use Case: Adaptive Scaling

import { Worker } from 'bullmq';
import os from 'os';

const worker = new Worker('queueName', processorFunction, {
  concurrency: 10,
});

// Adjust based on CPU usage
setInterval(() => {
  const cpuLoad = os.loadavg()[0] / os.cpus().length;
  
  if (cpuLoad > 0.8) {
    // High load: reduce concurrency
    worker.concurrency = Math.max(1, worker.concurrency - 5);
    console.log('Reduced concurrency to:', worker.concurrency);
  } else if (cpuLoad < 0.3) {
    // Low load: increase concurrency
    worker.concurrency = Math.min(50, worker.concurrency + 5);
    console.log('Increased concurrency to:', worker.concurrency);
  }
}, 60000); // Check every minute

CPU-Intensive Work: Use Sandboxing

For CPU-intensive operations that block the event loop, use Sandboxed Processors instead of increasing concurrency. High concurrency with blocking operations can lead to stalled jobs.
When to use concurrency vs sandboxing:
import path from 'path';
import os from 'os';
import { Worker, Job } from 'bullmq';

// ✅ Good: I/O-bound with high concurrency
const ioWorker = new Worker(
  'apiQueue',
  async (job: Job) => {
    // Non-blocking async operations
    const response = await fetch(job.data.url);
    await database.save(response.data);
    return response.data;
  },
  { concurrency: 50 }, // Safe to run many in parallel
);

// ❌ Bad: CPU-bound with high concurrency
const cpuWorker = new Worker(
  'imageQueue',
  async (job: Job) => {
    // Blocks the event loop!
    const processed = heavyImageProcessing(job.data.image);
    return processed;
  },
  { concurrency: 50 }, // Will cause stalled jobs!
);

// ✅ Good: CPU-bound with sandboxing
const sandboxedWorker = new Worker(
  'imageQueue',
  path.join(__dirname, 'image-processor.js'), // External file
  { concurrency: os.cpus().length }, // Match CPU cores
);
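The sandboxed worker above points at an external `image-processor.js` file. As a rough sketch, that file exports a default async function that BullMQ calls in a child process for each job (types are loosened here so the sketch stands alone; in a real project you would use the `SandboxedJob` type from 'bullmq', and `heavyImageProcessing` is a placeholder):

```typescript
// image-processor.ts: sketch of the external file a sandboxed worker loads.
// BullMQ runs it in a separate process and calls the default export per job,
// so CPU-heavy work here cannot block the main worker's event loop.
type JobLike = { data: { image: string } };

// Placeholder for real CPU-heavy work (resizing, encoding, ...)
function heavyImageProcessing(image: string): string {
  return image.split('').reverse().join('');
}

export default async function processImage(job: JobLike) {
  // Safe to block here: this code runs in its own process
  return heavyImageProcessing(job.data.image);
}
```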

Multiple Worker Instances

The recommended approach for production is running multiple worker processes:

Single Machine, Multiple Processes

// worker.js
import { Worker } from 'bullmq';
import cluster from 'cluster';
import os from 'os';

if (cluster.isPrimary) {
  const numCPUs = os.cpus().length;
  console.log(`Starting ${numCPUs} worker processes`);
  
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died, starting a new one`);
    cluster.fork();
  });
} else {
  // Each process runs a worker with concurrency
  const worker = new Worker(
    'queueName',
    processorFunction,
    {
      connection: { host: 'localhost', port: 6379 },
      concurrency: 5, // Each process handles 5 jobs concurrently
    },
  );
  
  console.log(`Worker process ${process.pid} started`);
}

Multiple Machines

Deploy the same worker code across multiple servers:
// worker.js - Deploy this to multiple servers
import { Worker } from 'bullmq';
import os from 'os';

const worker = new Worker(
  'queueName',
  processorFunction,
  {
    connection: {
      host: process.env.REDIS_HOST,
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
    },
    concurrency: 10,
  },
);

worker.on('ready', () => {
  console.log(`Worker on ${os.hostname()} is ready`);
});
Benefits of multiple workers:
  • High availability: If one worker crashes, others continue processing
  • Load distribution: Work is distributed across multiple machines
  • Resource utilization: Each machine’s CPU/memory is used efficiently
  • Scalability: Add more workers as job volume increases

Combining Strategies

For optimal performance, combine multiple workers with concurrency:
// Production setup: 4 servers × 4 processes × concurrency of 5
// = 80 concurrent job executions

import cluster from 'cluster';
import { Worker } from 'bullmq';

if (cluster.isPrimary) {
  for (let i = 0; i < 4; i++) {
    cluster.fork();
  }
} else {
  const worker = new Worker('queueName', processorFunction, {
    connection: redisConnection,
    concurrency: 5,
  });
}

Concurrency vs Global Concurrency

Worker concurrency controls how many jobs a single worker processes simultaneously. For limiting jobs across all workers, see Global Concurrency.
// Worker concurrency: each of these workers processes up to 10 jobs at once
const worker1 = new Worker('queue', processor, { concurrency: 10 });
const worker2 = new Worker('queue', processor, { concurrency: 10 });
// Total: up to 20 jobs processing simultaneously

// Global concurrency: cap the total across ALL workers
const queue = new Queue('queue', { connection });
await queue.setGlobalConcurrency(1); // At most 1 active job, queue-wide

Monitoring Concurrency

Track active job count:
import { Queue } from 'bullmq';

const queue = new Queue('queueName');

setInterval(async () => {
  const activeCount = await queue.getActiveCount();
  const waitingCount = await queue.getWaitingCount();
  
  console.log(`Active: ${activeCount}, Waiting: ${waitingCount}`);
  
  if (waitingCount > 1000 && activeCount < 50) {
    console.log('Consider increasing concurrency or adding more workers');
  }
}, 10000);

Best Practices

1. Start with conservative values

Begin with low concurrency (5-10) and increase based on monitoring.

2. Match workload type

  • I/O-bound: High concurrency (20-100)
  • CPU-bound: Use sandboxing with concurrency = CPU cores
  • Mixed: Medium concurrency (10-20) + sandboxing for heavy tasks

3. Monitor resource usage

Watch CPU, memory, and Redis connection count. Adjust concurrency if limits are hit.
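A quick way to sample these numbers from inside a worker process (a standalone sketch; `resourceSnapshot` is not a BullMQ API):

```typescript
import os from 'os';

// Hypothetical helper: snapshot the resource numbers worth watching
// when tuning worker concurrency.
function resourceSnapshot() {
  return {
    // 1-minute load average normalized by core count; values near or
    // above 1 mean the CPUs are saturated
    normalizedLoad: os.loadavg()[0] / os.cpus().length,
    // Resident set size of this process, in MB
    rssMb: Math.round(process.memoryUsage().rss / 1024 / 1024),
    // Free system memory, in MB
    freeMemMb: Math.round(os.freemem() / 1024 / 1024),
  };
}

console.log(resourceSnapshot());
```

Logging a snapshot like this alongside the active/waiting counts from the Monitoring Concurrency section gives you the data to decide when to raise or lower `concurrency`.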
4. Use multiple workers in production

Don’t rely on a single worker process. Distribute across multiple processes/machines.

5. Set appropriate lockDuration

Higher concurrency may need longer locks:
{ concurrency: 50, lockDuration: 60000 }

Related pages:
  • Sandboxed Processors: Isolate CPU-intensive work
  • Stalled Jobs: Understand and prevent stalled jobs
  • Global Concurrency: Limit jobs across all workers
  • Rate Limiting: Control job processing rate
  • API Reference
