Workers are the core processing units in BullMQ that consume jobs from queues and execute your business logic. A worker is equivalent to a message receiver in traditional message queue systems.

Basic Worker Creation

Create a worker by instantiating the Worker class with a queue name and a processor function:
import { Worker, Job } from 'bullmq';

const worker = new Worker('queueName', async (job: Job) => {
  // Process the job
  console.log('Processing job:', job.id);
  console.log('Job data:', job.data);
  
  // Optionally report progress
  await job.updateProgress(42);
  
  // Return a value that will be stored with the job
  return { status: 'completed', result: 'some value' };
});

Processor Function Signature

The processor function receives up to three parameters:
type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string,
  signal?: AbortSignal,
) => Promise<R>;
  • job: The job instance containing data and methods
  • token: Optional lock token for the job
  • signal: Optional AbortSignal for job cancellation
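Because the processor is an ordinary async function, it can be written as a named function and exercised with a stand-in job, without a Redis connection. The sketch below assumes a minimal `JobLike` interface in place of BullMQ's `Job`; `JobLike`, `EmailData`, `EmailResult` and `sendEmailProcessor` are hypothetical names used only for illustration.

```typescript
// Minimal structural stand-in for the parts of BullMQ's Job used here (assumption).
interface JobLike<T> {
  id?: string;
  data: T;
  updateProgress(progress: number | object): Promise<void>;
}

// Hypothetical payload and result types.
interface EmailData {
  to: string;
  subject: string;
}

interface EmailResult {
  accepted: boolean;
  recipient: string;
}

// A named processor using the (job, token, signal) parameter order.
async function sendEmailProcessor(
  job: JobLike<EmailData>,
  _token?: string,
  signal?: AbortSignal,
): Promise<EmailResult> {
  // Refuse to start if cancellation was already requested.
  if (signal?.aborted) {
    throw new Error('Job was cancelled');
  }
  await job.updateProgress(50);
  // Real sending logic would go here; this sketch only echoes the recipient.
  await job.updateProgress(100);
  return { accepted: true, recipient: job.data.to };
}
```

With a real worker, a function like this would be passed as the second argument to `new Worker(...)`; the real `Job` type may need to replace `JobLike`, depending on the typings of the installed version.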

Example with All Parameters

const worker = new Worker(
  'queueName',
  async (job: Job, token?: string, signal?: AbortSignal) => {
    // Access job data
    const { userId, action } = job.data;
    
    // Check for cancellation
    if (signal?.aborted) {
      throw new Error('Job was cancelled');
    }
    
    // Process with cancellation support
    signal?.addEventListener('abort', () => {
      console.log('Cancellation requested');
    });
    
    // Do work
    const result = await processUserAction(userId, action);
    
    return result;
  },
);
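The example above checks `signal.aborted` once, before the work starts. For a job made of several steps, the check can be repeated between steps so a cancellation takes effect partway through. The following is a minimal sketch of that idea; `processUntilAborted` is a hypothetical helper, not part of BullMQ.

```typescript
// Hypothetical helper: handles items one at a time and stops early
// once the AbortSignal passed to the processor has been aborted.
async function processUntilAborted<T>(
  items: T[],
  handle: (item: T) => Promise<void>,
  signal?: AbortSignal,
): Promise<number> {
  let processed = 0;
  for (const item of items) {
    // Re-check before each unit of work, not only once at the start.
    if (signal?.aborted) {
      throw new Error(`Job was cancelled after ${processed} item(s)`);
    }
    await handle(item);
    processed++;
  }
  return processed;
}
```

Inside a processor this might be called as `await processUntilAborted(job.data.items, processItem, signal)`, where `processItem` stands for your own per-item logic.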

Worker Options

Configure worker behavior with options:
import { Worker, WorkerOptions } from 'bullmq';

const options: WorkerOptions = {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  concurrency: 5,
  lockDuration: 30000,
  maxStalledCount: 1,
  stalledInterval: 30000,
  autorun: true,
  removeOnComplete: { count: 1000 },
  removeOnFail: { count: 5000 },
};

const worker = new Worker('queueName', processorFunction, options);

Key Options

  • concurrency (number, default: 1): Number of jobs that can be processed in parallel. See Concurrency.
  • autorun (boolean, default: true): Whether to start processing jobs immediately upon worker creation.
  • lockDuration (number, default: 30000): Duration in milliseconds that a worker holds a lock on a job.
  • maxStalledCount (number, default: 1): Maximum times a job can be recovered from stalled state before moving to failed.
  • stalledInterval (number, default: 30000): Interval in milliseconds between stalled job checks.
  • removeOnComplete (KeepJobs): Automatically remove completed jobs. See Auto-removal.
  • removeOnFail (KeepJobs): Automatically remove failed jobs. See Auto-removal.
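To make the effective configuration explicit, the documented defaults can be merged with your overrides before the worker is created. This is a sketch only: `BasicWorkerSettings`, `DOCUMENTED_DEFAULTS` and `resolveWorkerSettings` are hypothetical names, and the validation rules are assumptions of this example rather than BullMQ's own checks.

```typescript
// Subset of the options described above, typed structurally for this sketch.
interface BasicWorkerSettings {
  concurrency: number;
  autorun: boolean;
  lockDuration: number; // milliseconds
  maxStalledCount: number;
  stalledInterval: number; // milliseconds
}

// Defaults as listed under Key Options.
const DOCUMENTED_DEFAULTS: BasicWorkerSettings = {
  concurrency: 1,
  autorun: true,
  lockDuration: 30000,
  maxStalledCount: 1,
  stalledInterval: 30000,
};

// Hypothetical helper: merges overrides onto the documented defaults and
// rejects values that cannot be meaningful (assumed rules, not BullMQ's).
function resolveWorkerSettings(
  overrides: Partial<BasicWorkerSettings> = {},
): BasicWorkerSettings {
  const settings: BasicWorkerSettings = { ...DOCUMENTED_DEFAULTS, ...overrides };
  if (!Number.isInteger(settings.concurrency) || settings.concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer');
  }
  if (settings.lockDuration <= 0 || settings.stalledInterval <= 0) {
    throw new RangeError('lockDuration and stalledInterval must be positive');
  }
  if (settings.maxStalledCount < 0) {
    throw new RangeError('maxStalledCount must not be negative');
  }
  return settings;
}
```

The result could then be spread into the options object, for example `{ connection, ...resolveWorkerSettings({ concurrency: 5 }) }`.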

Manual Worker Control

Control when the worker starts processing:
const worker = new Worker(
  'queueName',
  async (job: Job) => {
    return await processJob(job);
  },
  { autorun: false },
);

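// Start processing when ready. In current BullMQ versions the promise
// returned by run() settles only once the worker stops, so it is not
// awaited here; a catch handler reports a failed run instead.
worker.run().catch((err) => console.error('Worker stopped with error:', err));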

Worker Events

Listen to worker events to monitor job processing:
// Job becomes active
worker.on('active', (job: Job) => {
  console.log(`Job ${job.id} is now active`);
});

// Job completes successfully
worker.on('completed', (job: Job, returnvalue: any) => {
  console.log(`Job ${job.id} completed with result:`, returnvalue);
});

// Job fails
worker.on('failed', (job: Job | undefined, error: Error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
});

// Job reports progress
worker.on('progress', (job: Job, progress: number | object) => {
  console.log(`Job ${job.id} progress:`, progress);
});

// Worker errors (important!)
worker.on('error', (err: Error) => {
  console.error('Worker error:', err);
});

// Queue drained (no more jobs)
worker.on('drained', () => {
  console.log('Queue is empty');
});

// Worker paused/resumed
worker.on('paused', () => console.log('Worker paused'));
worker.on('resumed', () => console.log('Worker resumed'));

// Worker closing/closed
worker.on('closing', (msg: string) => console.log('Worker closing:', msg));
worker.on('closed', () => console.log('Worker closed'));
Always attach an error listener. Like any Node.js EventEmitter, a worker that emits 'error' with no listener attached throws the error, which can crash your application.
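The listeners above can also feed simple counters for monitoring. The sketch below assumes only that the worker exposes an `on` method; `EventSource`, `WorkerCounters` and `trackWorker` are hypothetical names, and the event names are the ones listed above.

```typescript
// Minimal structural type: anything with an `on` method, such as a Worker.
interface EventSource {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

interface WorkerCounters {
  completed: number;
  failed: number;
  errors: number;
}

// Hypothetical helper: registers listeners and returns live counters.
function trackWorker(source: EventSource): WorkerCounters {
  const counters: WorkerCounters = { completed: 0, failed: 0, errors: 0 };
  source.on('completed', () => {
    counters.completed++;
  });
  source.on('failed', () => {
    counters.failed++;
  });
  // Registering an 'error' listener also keeps the event from being thrown.
  source.on('error', () => {
    counters.errors++;
  });
  return counters;
}
```

With a real worker, `const counters = trackWorker(worker)` could be read periodically and exported to your metrics system; depending on the typings of the installed BullMQ version, a cast may be needed.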

Progress Reporting

Report job progress from within the processor:
const worker = new Worker('queueName', async (job: Job) => {
  const items = job.data.items;
  
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    
    // Report percentage
    await job.updateProgress(((i + 1) / items.length) * 100);
    
    // Alternatively, report a detailed object (in practice, pick one form)
    await job.updateProgress({
      processed: i + 1,
      total: items.length,
      currentItem: items[i].id,
    });
  }
  
  return { totalProcessed: items.length };
});
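The loop above reports progress after every item. For large batches it may be preferable to report less often, since each `job.updateProgress()` call is a round trip to Redis. The following sketch forwards a value only when the percentage has advanced by a chosen step; `createProgressReporter` and its `minStep` parameter are hypothetical.

```typescript
// Hypothetical helper: wraps an update function (for example
// `(p) => job.updateProgress(p)`) and forwards a value only when the
// percentage has advanced by at least `minStep` points, or has reached 100.
function createProgressReporter(
  update: (percent: number) => Promise<void>,
  minStep = 5,
): (done: number, total: number) => Promise<void> {
  let lastReported = 0;
  return async (done, total) => {
    // Treat an empty batch as complete to avoid dividing by zero.
    const percent = total === 0 ? 100 : Math.floor((done / total) * 100);
    const isFinal = percent >= 100 && lastReported < 100;
    if (isFinal || percent - lastReported >= minStep) {
      lastReported = percent;
      await update(percent);
    }
  };
}
```

In a processor this could be used as `const report = createProgressReporter((p) => job.updateProgress(p), 10)` followed by `await report(i + 1, items.length)` inside the loop.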

TypeScript Generics

Define types for job data and return values:
interface MyJobData {
  userId: string;
  action: string;
  metadata: Record<string, any>;
}

interface MyJobResult {
  status: 'success' | 'failure';
  message: string;
  timestamp: number;
}

const worker = new Worker<MyJobData, MyJobResult>(
  'queueName',
  async (job: Job<MyJobData, MyJobResult>) => {
    const { userId, action } = job.data; // Typed!
    
    // Process...
    
    return {
      status: 'success',
      message: `Processed ${action} for ${userId}`,
      timestamp: Date.now(),
    }; // Return type is checked!
  },
);
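Generics are checked at compile time only; they do not validate what a producer actually put on the queue. A runtime guard can cover that gap. This is a minimal sketch assuming the `MyJobData` shape shown above; `isMyJobData` is a hypothetical helper.

```typescript
// Same shape as the MyJobData interface used in the generics example.
interface MyJobData {
  userId: string;
  action: string;
  metadata: Record<string, any>;
}

// Type guard: narrows an unknown payload to MyJobData at run time.
function isMyJobData(value: unknown): value is MyJobData {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate['userId'] === 'string' &&
    typeof candidate['action'] === 'string' &&
    typeof candidate['metadata'] === 'object' &&
    candidate['metadata'] !== null &&
    !Array.isArray(candidate['metadata'])
  );
}
```

At the top of a processor, `if (!isMyJobData(job.data)) throw new Error('Invalid job data');` would fail the job early instead of letting malformed data reach your business logic.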

Waiting Until Ready

Wait for the worker’s Redis connection to be ready:
const worker = new Worker('queueName', processorFunction);

// Useful in tests
await worker.waitUntilReady();
console.log('Worker is ready to process jobs');

Graceful Shutdown

Properly close the worker:
// Wait for current jobs to complete
await worker.close();

// Alternatively, force an immediate shutdown without waiting for active jobs
await worker.close(true);
See Graceful Shutdown for more details.
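A graceful `close()` waits for active jobs, which may take longer than a deployment platform allows before it kills the process. One option is to bound the wait. The sketch below is an assumption-laden illustration: `Closable` and `closeOrTimeout` are hypothetical names, and it only reports whether the close finished in time, leaving the decision about what to do next to the caller.

```typescript
// Minimal structural type covering the one method this sketch needs.
interface Closable {
  close(force?: boolean): Promise<void>;
}

// Hypothetical helper: waits up to `timeoutMs` for a graceful close and
// reports whether it finished ('closed') or the time limit passed ('timeout').
async function closeOrTimeout(
  worker: Closable,
  timeoutMs: number,
): Promise<'closed' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  try {
    return await Promise.race([
      worker.close().then(() => 'closed' as const),
      timeout,
    ]);
  } finally {
    // Clear the timer so it does not keep the process alive.
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

A shutdown handler might call `await closeOrTimeout(worker, 10000)` and exit with a non-zero code on `'timeout'`. Jobs still active at that point would be expected to be picked up again through stalled-job recovery, subject to `maxStalledCount`.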

Global Job Events

Monitor jobs across all workers using QueueEvents:
import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('queueName');

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed by any worker`);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} failed:`, failedReason);
});

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});

Best Practices

1. Always handle errors

Attach an error listener to prevent crashes:
worker.on('error', err => console.error(err));

2. Keep processors async

The processor must return a promise, so use async/await for I/O-bound work. Note that async code alone does not stop CPU-bound work from blocking the event loop; see item 5.

3. Report progress for long jobs

Help monitor long-running jobs by calling job.updateProgress().

4. Handle shutdown gracefully

Call worker.close() on process termination:
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});

5. Use sandboxing for CPU-intensive work

See Sandboxed Processors for CPU-heavy tasks.

See also:
  • Concurrency: Process multiple jobs in parallel
  • Sandboxed Processors: Isolate CPU-intensive processors
  • Graceful Shutdown: Properly close workers
  • Cancelling Jobs: Cancel jobs in progress
  • API Reference
