Overview

Workers are the workhorses of BullMQ that pick up jobs from queues and process them. Each worker runs a processor function that handles the job’s business logic.
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  // Process the job
  console.log('Processing job:', job.id);
  console.log('Job data:', job.data);
  
  // Return a result
  return { status: 'completed', processedAt: Date.now() };
}, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
});
Workers automatically start processing jobs when instantiated. Set autorun: false to control when processing begins.

Processor Function

The processor function is called for each job and must return a promise:

Basic Processor

const worker = new Worker('emailQueue', async (job) => {
  const { to, subject, body } = job.data;
  
  // Send email
  await sendEmail(to, subject, body);
  
  // Return result
  return { sent: true, messageId: 'msg-123' };
});

Processor with Progress

Report progress during long-running jobs:
const worker = new Worker('processVideo', async (job) => {
  const { videoUrl } = job.data;
  
  // Report progress as percentage
  await job.updateProgress(10);
  
  const video = await downloadVideo(videoUrl);
  await job.updateProgress(50);
  
  const processed = await processVideo(video);
  await job.updateProgress(90);
  
  await uploadVideo(processed);
  await job.updateProgress(100);
  
  return { status: 'completed' };
});

Processor with Cancellation Support

Handle job cancellation gracefully:
const worker = new Worker('longTask', async (job, token, signal) => {
  // Check if job was cancelled
  if (signal?.aborted) {
    throw new Error('Job was cancelled');
  }
  
  // Listen for cancellation
  signal?.addEventListener('abort', () => {
    console.log('Job cancelled, cleaning up...');
  });
  
  // Perform work with periodic checks
  for (let i = 0; i < 100; i++) {
    if (signal?.aborted) break;
    await processChunk(i);
  }
  
  return { completed: !signal?.aborted };
});

// Cancel a specific job
worker.cancelJob('job-123', 'User requested cancellation');

// Cancel all jobs
worker.cancelAllJobs('Server shutting down');

Worker Options

WorkerOptions Interface

interface WorkerOptions {
  // Processing
  concurrency?: number;           // Jobs to process in parallel (default: 1)
  autorun?: boolean;              // Start processing immediately (default: true)
  
  // Identity
  name?: string;                  // Worker name for monitoring
  
  // Timing
  lockDuration?: number;          // Job lock duration in ms (default: 30000)
  lockRenewTime?: number;         // Lock renewal interval (default: lockDuration/2)
  stalledInterval?: number;       // Check for stalled jobs (default: 30000)
  drainDelay?: number;            // Seconds to wait when queue is empty before polling again (default: 5)
  
  // Retry and failure handling
  maxStalledCount?: number;       // Max times job can be recovered (default: 1)
  maxStartedAttempts?: number;    // Max times job can start processing
  
  // Rate limiting
  limiter?: RateLimiterOptions;   // Worker rate limiter
  maximumRateLimitDelay?: number; // Max rate limit delay (default: 30000)
  
  // Cleanup
  removeOnComplete?: KeepJobs;    // Auto-remove completed jobs
  removeOnFail?: KeepJobs;        // Auto-remove failed jobs
  
  // Advanced
  skipStalledCheck?: boolean;     // Disable stalled job check (default: false)
  skipLockRenewal?: boolean;      // Disable lock renewal (default: false)
  
  // Sandboxing
  useWorkerThreads?: boolean;     // Use worker threads instead of processes
}

Concurrency

Process multiple jobs simultaneously:
const worker = new Worker('parallelTasks', async (job) => {
  // Process job
  return await processTask(job.data);
}, {
  connection: { host: 'localhost', port: 6379 },
  concurrency: 5,  // Process up to 5 jobs at once
});

// Update concurrency at runtime
worker.concurrency = 10;
Each worker can process multiple jobs concurrently. Scale horizontally by running multiple worker instances.

Worker Events

Job Lifecycle Events

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

Worker State Events

worker.on('ready', () => {
  console.log('Worker is ready and connected to Redis');
});

worker.on('drained', () => {
  console.log('Queue is empty, waiting for jobs');
});

worker.on('paused', () => {
  console.log('Worker is paused');
});

worker.on('resumed', () => {
  console.log('Worker resumed processing');
});

worker.on('closing', (msg) => {
  console.log('Worker is closing:', msg);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

worker.on('error', (err) => {
  console.error('Worker error:', err);
});
Always add an ‘error’ listener to prevent unhandled exceptions that could crash your worker.

Stalled Jobs

worker.on('stalled', (jobId) => {
  console.log(`Job ${jobId} stalled and was moved back to waiting`);
});

Pausing and Resuming

Pause only this worker instance:
// Pause the worker
await worker.pause();

// Pause without waiting for current jobs to finish
await worker.pause(true);

// Check if paused
if (worker.isPaused()) {
  console.log('Worker is paused');
}

// Resume processing
worker.resume();

Rate Limiting

Worker-Level Rate Limiting

const worker = new Worker('apiCalls', async (job) => {
  return await callExternalAPI(job.data);
}, {
  limiter: {
    max: 10,        // Maximum 10 jobs
    duration: 1000, // per 1 second
  },
});

Manual Rate Limiting

Manually trigger rate limiting from within a processor:
import { Worker } from 'bullmq';

const worker = new Worker('apiCalls', async (job) => {
  try {
    return await callAPI(job.data);
  } catch (err) {
    if (err.statusCode === 429) {
      // Rate limited by the external API: pause this worker's queue
      // for a while, then move the job back to the wait state
      await worker.rateLimit(60000);
      throw Worker.RateLimitError();
    }
    throw err;
  }
});

Graceful Shutdown

Properly close workers to avoid job loss:
const worker = new Worker('myQueue', async (job) => {
  // Process job
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...');
  
  // Wait for current jobs to complete
  await worker.close();
  
  console.log('Worker closed');
  process.exit(0);
});

// Force shutdown
process.on('SIGINT', async () => {
  console.log('Force closing...');
  
  // Don't wait for jobs to complete
  await worker.close(true);
  
  process.exit(0);
});

Sandboxed Processors

Run processors in separate processes or worker threads:
import { Worker } from 'bullmq';

// Processor in separate file: processor.js
const worker = new Worker('myQueue', './processor.js', {
  connection: { host: 'localhost', port: 6379 },
});

Stalled Jobs

BullMQ automatically recovers stalled jobs:
const worker = new Worker('myQueue', async (job) => {
  // Process job
}, {
  stalledInterval: 30000,    // Check every 30 seconds
  maxStalledCount: 1,        // Move to failed after 1 stall
  lockDuration: 30000,       // Job must complete within 30 seconds
});
A job is considered stalled when its lock expires without being renewed. This usually indicates a crashed worker or hung processor.

Manual Job Fetching

Fetch jobs manually for advanced use cases:
const worker = new Worker('myQueue', async (job) => {
  // Processor function
}, {
  autorun: false,  // Don't start automatically
});

// Start stalled checker
await worker.startStalledCheckTimer();

// Manually fetch and process jobs
while (true) {
  const job = await worker.getNextJob('my-token');
  
  if (job) {
    try {
      const result = await processJob(job);
      await job.moveToCompleted(result, 'my-token', false);
    } catch (err) {
      await job.moveToFailed(err, 'my-token', false);
    }
  } else {
    // No jobs available; wait before polling again
    await new Promise((resolve) => setTimeout(resolve, 5000));
  }
}

TypeScript Support

Type-safe workers with generics:
interface TaskData {
  userId: string;
  action: string;
}

interface TaskResult {
  success: boolean;
  timestamp: number;
}

type TaskName = 'notify' | 'email' | 'sms';

const worker = new Worker<TaskData, TaskResult, TaskName>(
  'tasks',
  async (job) => {
    // job.data is typed as TaskData
    // job.name is typed as TaskName
    console.log(`Processing ${job.name} for user ${job.data.userId}`);
    
    // Return type is TaskResult
    return {
      success: true,
      timestamp: Date.now(),
    };
  }
);

Best Practices

Handle Errors

Always add error listeners and handle exceptions gracefully in your processor.

Use Concurrency Wisely

Set concurrency based on your workload. CPU-bound tasks should use lower concurrency.

Monitor Progress

Update progress for long-running jobs to provide visibility.

Graceful Shutdown

Always close workers properly to avoid losing in-progress jobs.

Set Lock Duration

Configure lockDuration based on expected job duration plus buffer time.
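For example, if jobs are expected to take up to two minutes, the timing options might look like this (the numbers are illustrative assumptions):

```typescript
// Size the lock with headroom so healthy jobs are never marked stalled
const timingOpts = {
  lockDuration: 180_000,   // 3 min = expected 2 min + 1 min buffer
  stalledInterval: 60_000, // check for stalled jobs every minute
};
// Spread into the Worker options object alongside `connection`.
```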

Clean Up Resources

Use removeOnComplete and removeOnFail to prevent Redis memory bloat.
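A sketch of a retention policy (the counts and age are illustrative assumptions):

```typescript
const cleanupOpts = {
  removeOnComplete: { count: 1000 }, // keep only the last 1000 completed jobs
  removeOnFail: { age: 24 * 3600 },  // keep failed jobs for 24 hours (seconds)
};
// Spread into the Worker options object alongside `connection`.
```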

Next Steps

Jobs

Learn about job lifecycle and manipulation

Events

Monitor worker activity with events

Queues

Understand queue operations

Build docs developers (and LLMs) love