Workers are the workhorses of BullMQ that pick up jobs from queues and process them. Each worker runs a processor function that handles the job’s business logic.
import { Worker } from 'bullmq';const worker = new Worker('myQueue', async (job) => { // Process the job console.log('Processing job:', job.id); console.log('Job data:', job.data); // Return a result return { status: 'completed', processedAt: Date.now() };}, { connection: { host: 'localhost', port: 6379, },});
Workers automatically start processing jobs when instantiated. Set autorun: false to control when processing begins.
const worker = new Worker('longTask', async (job, token, signal) => { // Check if job was cancelled if (signal?.aborted) { throw new Error('Job was cancelled'); } // Listen for cancellation signal?.addEventListener('abort', () => { console.log('Job cancelled, cleaning up...'); }); // Perform work with periodic checks for (let i = 0; i < 100; i++) { if (signal?.aborted) break; await processChunk(i); } return { completed: !signal?.aborted };});// Cancel a specific jobworker.cancelJob('job-123', 'User requested cancellation');// Cancel all jobsworker.cancelAllJobs('Server shutting down');
const worker = new Worker('parallelTasks', async (job) => { // Process job return await processTask(job.data);}, { connection: { host: 'localhost', port: 6379 }, concurrency: 5, // Process up to 5 jobs at once});// Update concurrency at runtimeworker.concurrency = 10;
Each worker can process multiple jobs concurrently. Scale horizontally by running multiple worker instances.
// Pause the workerawait worker.pause();// Pause without waiting for current jobs to finishawait worker.pause(true);// Check if pausedif (worker.isPaused()) { console.log('Worker is paused');}// Resume processingworker.resume();
Pause all workers for this queue:
import { Queue } from 'bullmq';const queue = new Queue('myQueue', { connection: { ... } });// Pause globally - affects all workersawait queue.pause();// Resume globallyawait queue.resume();
const worker = new Worker('myQueue', async (job) => { // Process job});// Graceful shutdownprocess.on('SIGTERM', async () => { console.log('Shutting down gracefully...'); // Wait for current jobs to complete await worker.close(); console.log('Worker closed'); process.exit(0);});// Force shutdownprocess.on('SIGINT', async () => { console.log('Force closing...'); // Don't wait for jobs to complete await worker.close(true); process.exit(0);});