Overview

BullMQ provides an event system for monitoring the job lifecycle and queue operations. You can listen for events at three levels: on a Queue instance, on a Worker instance, or globally through QueueEvents.

Event Types

Queue Events (Local)

These events are emitted by the Queue instance that performs the operation:
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', {
  connection: { host: 'localhost', port: 6379 },
});

queue.on('waiting', (job) => {
  console.log(`Job ${job.id} is waiting`);
});

queue.on('progress', (jobId, progress) => {
  console.log(`Job ${jobId} progress: ${progress}`);
});

queue.on('removed', (jobId) => {
  console.log(`Job ${jobId} was removed`);
});

queue.on('cleaned', (jobs, type) => {
  console.log(`Cleaned ${jobs.length} ${type} jobs`);
});

Worker Events (Local)

These events are emitted by the Worker instance processing jobs:
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  // Process job
  return { status: 'done' };
}, {
  connection: { host: 'localhost', port: 6379 },
});

worker.on('active', (job) => {
  console.log(`Job ${job.id} is now active`);
});

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with:`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress:`, progress);
});

Global Events (QueueEvents)

QueueEvents listens to events from all workers and queues for a given queue name:
import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('myQueue', {
  connection: { host: 'localhost', port: 6379 },
});

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed globally`);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} failed: ${failedReason}`);
});

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});
QueueEvents requires its own dedicated Redis connection, because it uses blocking reads on the queue's event stream to receive events in real time.

Queue Event Reference

waiting

Emitted when a job is added to the queue:
queue.on('waiting', (job: Job) => {
  console.log(`Job ${job.id} added to queue`);
  console.log('Job name:', job.name);
  console.log('Job data:', job.data);
});

progress

Emitted when a job’s progress is updated:
queue.on('progress', (jobId: string, progress: number | object) => {
  if (typeof progress === 'number') {
    console.log(`Job ${jobId}: ${progress}%`);
  } else {
    console.log(`Job ${jobId} progress:`, progress);
  }
});
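
Because progress can be either a number or an object, it can help to normalize it in one place before logging it. The following is a minimal sketch; `formatProgress` is a hypothetical helper, not part of BullMQ, and it assumes numeric progress is meant as a percentage, as in the handler above.

```typescript
// Hypothetical helper (not a BullMQ API): turn a progress value into a log-friendly string.
// Assumption: numeric progress represents a percentage.
function formatProgress(progress: number | object): string {
  if (typeof progress === 'number') {
    return `${progress}%`;
  }
  // Objects are serialized as JSON so that structured progress stays readable in logs.
  return JSON.stringify(progress);
}

console.log(formatProgress(42));
console.log(formatProgress({ step: 'resize', done: 3 }));
```

The same helper could be reused in the Worker and QueueEvents progress handlers so that all three levels log progress consistently.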

removed

Emitted when a job is removed:
queue.on('removed', (jobId: string) => {
  console.log(`Job ${jobId} removed from queue`);
});

cleaned

Emitted when jobs are cleaned:
queue.on('cleaned', (jobIds: string[], type: string) => {
  console.log(`Cleaned ${jobIds.length} ${type} jobs`);
});

paused / resumed

Emitted when the queue is paused or resumed:
queue.on('paused', () => {
  console.log('Queue paused');
});

queue.on('resumed', () => {
  console.log('Queue resumed');
});

error

Emitted on errors:
queue.on('error', (error: Error) => {
  console.error('Queue error:', error);
});

Worker Event Reference

active

Emitted when a job starts processing:
worker.on('active', (job: Job, prev: string) => {
  console.log(`Job ${job.id} is now active`);
  console.log('Previous state:', prev);
});

completed

Emitted when a job completes successfully:
worker.on('completed', (job: Job, result: any, prev: string) => {
  console.log(`Job ${job.id} completed`);
  console.log('Result:', result);
  console.log('Previous state:', prev);
});

failed

Emitted when a job fails:
worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
  if (job) {
    console.error(`Job ${job.id} failed:`, error.message);
    console.log('Attempts made:', job.attemptsMade);
    console.log('Previous state:', prev);
  } else {
    console.error('Job failed and was removed:', error.message);
  }
});
The job parameter can be undefined if the job reached the stalled limit and was removed by the removeOnFail option.

progress

Emitted when a job reports progress:
worker.on('progress', (job: Job, progress: number | object) => {
  console.log(`Job ${job.id} progress:`, progress);
});

drained

Emitted when the queue has no jobs waiting:
worker.on('drained', () => {
  console.log('Queue is empty, waiting for jobs...');
});

stalled

Emitted when a job is detected as stalled:
worker.on('stalled', (jobId: string, prev: string) => {
  console.log(`Job ${jobId} stalled`);
  console.log('Previous state:', prev);
});

paused / resumed

Emitted when the worker is paused or resumed:
worker.on('paused', () => {
  console.log('Worker paused');
});

worker.on('resumed', () => {
  console.log('Worker resumed');
});

ready

Emitted when the worker’s blocking connection is ready:
worker.on('ready', () => {
  console.log('Worker is ready');
});

closing / closed

Emitted during worker shutdown:
worker.on('closing', (msg: string) => {
  console.log('Worker is closing:', msg);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

error

Emitted when the worker encounters an error. Always add an error listener; without one, the error is thrown as an unhandled exception:
worker.on('error', (error: Error) => {
  console.error('Worker error:', error);
});

QueueEvents Reference

added

Emitted when a job is created:
queueEvents.on('added', ({ jobId, name }, id) => {
  console.log(`Job ${jobId} (${name}) was added`);
  console.log('Event ID:', id);
});

active

Emitted when a job becomes active:
queueEvents.on('active', ({ jobId, prev }, id) => {
  console.log(`Job ${jobId} is now active`);
  console.log('Previous state:', prev);
});

completed

Emitted when a job completes:
queueEvents.on('completed', ({ jobId, returnvalue, prev }, id) => {
  console.log(`Job ${jobId} completed`);
  console.log('Return value:', returnvalue);
  console.log('Previous state:', prev);
});

failed

Emitted when a job fails:
queueEvents.on('failed', ({ jobId, failedReason, prev }, id) => {
  console.log(`Job ${jobId} failed`);
  console.log('Reason:', failedReason);
  console.log('Previous state:', prev);
});

progress

Emitted on progress updates:
queueEvents.on('progress', ({ jobId, data }, id) => {
  console.log(`Job ${jobId} progress:`, data);
});

delayed

Emitted when a job is delayed:
queueEvents.on('delayed', ({ jobId, delay }, id) => {
  console.log(`Job ${jobId} delayed by ${delay}ms`);
});

removed

Emitted when a job is removed:
queueEvents.on('removed', ({ jobId, prev }, id) => {
  console.log(`Job ${jobId} removed`);
  console.log('Previous state:', prev);
});

waiting

Emitted when a job enters waiting state:
queueEvents.on('waiting', ({ jobId, prev }, id) => {
  console.log(`Job ${jobId} is waiting`);
  console.log('Previous state:', prev);
});

waiting-children

Emitted when a job is waiting for children:
queueEvents.on('waiting-children', ({ jobId }, id) => {
  console.log(`Job ${jobId} waiting for child jobs`);
});

stalled

Emitted when a job stalls:
queueEvents.on('stalled', ({ jobId }, id) => {
  console.log(`Job ${jobId} has stalled`);
});

retries-exhausted

Emitted when a job exhausts all retry attempts:
queueEvents.on('retries-exhausted', ({ jobId, attemptsMade }, id) => {
  console.log(`Job ${jobId} exhausted ${attemptsMade} retry attempts`);
});

deduplicated

Emitted when a job is rejected due to deduplication:
queueEvents.on('deduplicated', ({ jobId, deduplicationId, deduplicatedJobId }, id) => {
  console.log(`Job ${jobId} was deduplicated`);
  console.log('Deduplication ID:', deduplicationId);
  console.log('Existing job ID:', deduplicatedJobId);
});

duplicated

Emitted when a job with the same ID already exists:
queueEvents.on('duplicated', ({ jobId }, id) => {
  console.log(`Job ${jobId} already exists`);
});

drained

Emitted when the waiting list is emptied:
queueEvents.on('drained', (id) => {
  console.log('Queue has been drained');
});

paused / resumed

Emitted when the queue is paused or resumed:
queueEvents.on('paused', (args, id) => {
  console.log('Queue paused globally');
});

queueEvents.on('resumed', (args, id) => {
  console.log('Queue resumed globally');
});

cleaned

Emitted when jobs are cleaned:
queueEvents.on('cleaned', ({ count }, id) => {
  console.log(`Cleaned ${count} jobs`);
});

Event Comparison

Scope: Local to the Queue instance
Use case: Monitor operations performed by this specific queue
Connection: Uses the queue’s main connection
Events: waiting, progress, removed, cleaned, paused, resumed

Job-Specific Events

Listen to events for a specific job:
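import { Queue, QueueEvents } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };
const queue = new Queue('myQueue', { connection });
const queueEvents = new QueueEvents('myQueue', { connection });

const job = await queue.add('task', { data: 'value' });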

// Listen to events for this specific job
queueEvents.on(`completed:${job.id}`, ({ returnvalue }) => {
  console.log('Job completed with:', returnvalue);
});

queueEvents.on(`failed:${job.id}`, ({ failedReason }) => {
  console.error('Job failed:', failedReason);
});

queueEvents.on(`progress:${job.id}`, ({ data }) => {
  console.log('Job progress:', data);
});
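
Job-specific listeners stay registered after the job finishes unless they are removed. The sketch below shows one way to react to the first `completed:<id>` or `failed:<id>` event and then detach both listeners. `onJobSettled` and the `Outcome` type are hypothetical names; the helper only assumes an object with Node's EventEmitter interface, so it is demonstrated here with a plain EventEmitter standing in for a QueueEvents instance.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical result type for this sketch.
type Outcome =
  | { status: 'completed'; returnvalue: unknown }
  | { status: 'failed'; failedReason: string };

// Hypothetical helper: call `callback` once for the first completed/failed event
// of one job, then remove both job-specific listeners.
function onJobSettled(
  events: EventEmitter,
  jobId: string,
  callback: (outcome: Outcome) => void,
): () => void {
  const completedEvent = `completed:${jobId}`;
  const failedEvent = `failed:${jobId}`;

  const onCompleted = ({ returnvalue }: { returnvalue: unknown }) => {
    detach();
    callback({ status: 'completed', returnvalue });
  };
  const onFailed = ({ failedReason }: { failedReason: string }) => {
    detach();
    callback({ status: 'failed', failedReason });
  };
  // Removes both listeners; also returned so callers can cancel early.
  const detach = () => {
    events.off(completedEvent, onCompleted);
    events.off(failedEvent, onFailed);
  };

  events.on(completedEvent, onCompleted);
  events.on(failedEvent, onFailed);
  return detach;
}

// Demonstration with a stand-in emitter instead of a real QueueEvents instance.
const demoEvents = new EventEmitter();
onJobSettled(demoEvents, '42', (outcome) => console.log('Job 42 settled:', outcome));
demoEvents.emit('completed:42', { returnvalue: 'ok' });
```

With a real setup, the first argument would be the `queueEvents` instance and the second `job.id`. If only a promise for the result is needed, BullMQ's `job.waitUntilFinished(queueEvents)` covers that case.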

Event Resumption

Resume listening from a specific event ID:
const queueEvents = new QueueEvents('myQueue', {
  connection: { host: 'localhost', port: 6379 },
  lastEventId: '1234567890-0',  // Resume from this event
});
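
To resume after a restart, the application has to remember the ID of the last event it handled. Event IDs like the one above are Redis stream IDs of the form `<milliseconds>-<sequence>`, and every QueueEvents handler receives the event ID as its last argument. The following sketch keeps track of the highest ID seen; `compareStreamIds` and `LastEventIdTracker` are hypothetical helpers, and where the value is persisted is left to the application.

```typescript
// Compare two non-negative decimal integer strings without converting to Number,
// so large sequence values cannot lose precision.
function compareDecimal(a: string, b: string): number {
  if (a.length !== b.length) return a.length < b.length ? -1 : 1;
  if (a === b) return 0;
  return a < b ? -1 : 1;
}

// Hypothetical helper: order two stream IDs of the form "<milliseconds>-<sequence>".
function compareStreamIds(a: string, b: string): number {
  const [aMs = '0', aSeq = '0'] = a.split('-');
  const [bMs = '0', bSeq = '0'] = b.split('-');
  return compareDecimal(aMs, bMs) || compareDecimal(aSeq, bSeq);
}

// Hypothetical helper: remembers the newest event ID that has been handled.
class LastEventIdTracker {
  private last: string | undefined;

  constructor(initial?: string) {
    this.last = initial;
  }

  record(id: string): void {
    if (this.last === undefined || compareStreamIds(id, this.last) > 0) {
      this.last = id;
    }
  }

  get value(): string | undefined {
    return this.last;
  }
}

const tracker = new LastEventIdTracker();
tracker.record('1234567890-0');
tracker.record('1234567890-2');
tracker.record('1234567889-9'); // older event, ignored
console.log(tracker.value);
```

In a handler this would look like `queueEvents.on('completed', (args, id) => tracker.record(id))`, with `tracker.value` saved periodically and passed as `lastEventId` on the next start.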

TypeScript Support

Type-safe event listeners:
import { Worker, Job } from 'bullmq';

interface TaskData {
  userId: string;
  action: string;
}

interface TaskResult {
  success: boolean;
  timestamp: number;
}

const worker = new Worker<TaskData, TaskResult>('tasks', async (job) => {
  return {
    success: true,
    timestamp: Date.now(),
  };
}, {
  connection: { host: 'localhost', port: 6379 },
});

// Type-safe event handlers
worker.on('completed', (job: Job<TaskData, TaskResult>, result: TaskResult) => {
  console.log(`User ${job.data.userId} task completed`);
  console.log('Success:', result.success);
});

Event Best Practices

Always Handle Errors

Add error listeners to Queue, Worker, and QueueEvents to prevent crashes.
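
One way to make sure no instance is left without an error listener is to register them in a single place. The sketch below relies only on Node's EventEmitter interface (an emitter that emits `error` with no listener attached throws); `attachErrorHandlers` is a hypothetical helper, and plain EventEmitters stand in for the BullMQ instances.

```typescript
import { EventEmitter } from 'node:events';

type ErrorSink = (source: string, error: Error) => void;

// Hypothetical helper: attach an 'error' listener to every emitter passed in,
// labelling each error with the name of its source.
function attachErrorHandlers(
  sources: Record<string, EventEmitter>,
  sink: ErrorSink = (source, error) => console.error(`[${source}] error:`, error),
): void {
  for (const [name, emitter] of Object.entries(sources)) {
    emitter.on('error', (error: Error) => sink(name, error));
  }
}

// Stand-ins for real Queue, Worker, and QueueEvents instances.
const fakeQueue = new EventEmitter();
const fakeWorker = new EventEmitter();
attachErrorHandlers({ queue: fakeQueue, worker: fakeWorker });

// With the listener attached, this is logged instead of being thrown.
fakeWorker.emit('error', new Error('connection lost'));
```

In an application the call would be something like `attachErrorHandlers({ queue, worker, queueEvents })`.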

Use QueueEvents for Monitoring

Use QueueEvents for centralized monitoring across all workers.
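
As an illustration of centralized monitoring, the sketch below tallies global events into simple counters. `trackQueueStats` and `QueueStats` are hypothetical names; the function accepts any EventEmitter, so a plain one stands in for a QueueEvents instance here.

```typescript
import { EventEmitter } from 'node:events';

interface QueueStats {
  completed: number;
  failed: number;
  stalled: number;
}

// Hypothetical helper: count completed, failed, and stalled events from one emitter.
function trackQueueStats(events: EventEmitter): QueueStats {
  const stats: QueueStats = { completed: 0, failed: 0, stalled: 0 };
  events.on('completed', () => { stats.completed += 1; });
  events.on('failed', () => { stats.failed += 1; });
  events.on('stalled', () => { stats.stalled += 1; });
  return stats;
}

// Stand-in for a QueueEvents instance.
const monitored = new EventEmitter();
const stats = trackQueueStats(monitored);
monitored.emit('completed', { jobId: '1', returnvalue: null });
monitored.emit('failed', { jobId: '2', failedReason: 'timeout' });
console.log(stats);
```

Because QueueEvents reports events from every worker on the queue, a single process running a tally like this would see the whole queue, not only the jobs it processed itself.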

Clean Up Listeners

Remove event listeners when no longer needed to prevent memory leaks.
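
A common pattern for cleanup is to register listeners through a function that returns a matching "detach" function. The sketch below is one such approach using the standard `on`/`off` methods of Node's EventEmitter; `attachListeners` is a hypothetical helper, shown with a plain EventEmitter in place of a BullMQ instance.

```typescript
import { EventEmitter } from 'node:events';

type Listener = (...args: any[]) => void;

// Hypothetical helper: register several listeners and return a function that removes them all.
function attachListeners(
  emitter: EventEmitter,
  listeners: Record<string, Listener>,
): () => void {
  for (const [event, listener] of Object.entries(listeners)) {
    emitter.on(event, listener);
  }
  return () => {
    for (const [event, listener] of Object.entries(listeners)) {
      emitter.off(event, listener);
    }
  };
}

// Stand-in for a Worker or QueueEvents instance.
const source = new EventEmitter();
const detach = attachListeners(source, {
  completed: ({ jobId }) => console.log('completed', jobId),
  failed: ({ jobId }) => console.log('failed', jobId),
});

source.emit('completed', { jobId: '1' });
detach(); // after this call, neither listener fires
source.emit('failed', { jobId: '2' });
```

Keeping the same function references for `on` and `off` is what makes removal work; an inline arrow function passed only to `on` cannot be removed later with `off`.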

Don't Block Event Handlers

Keep event handlers fast. For heavy operations, queue another job.
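
The sketch below illustrates handing heavy work to a follow-up job instead of doing it inside the handler. The `JobAdder` interface, the `forwardCompletions` helper, and the `post-process` job name are all assumptions made for this example; `JobAdder` only requires an `add(name, data)` method that returns a promise, and a fake implementation stands in for a real queue.

```typescript
import { EventEmitter } from 'node:events';

// Assumed minimal shape of a queue for this sketch: anything with add(name, data).
interface JobAdder {
  add(name: string, data: unknown): Promise<unknown>;
}

// Hypothetical helper: on every completed event, enqueue a follow-up job and return immediately.
function forwardCompletions(events: EventEmitter, followUp: JobAdder): void {
  events.on('completed', ({ jobId }: { jobId: string }) => {
    // Not awaited, so the handler stays fast; failures to enqueue are logged.
    followUp.add('post-process', { sourceJobId: jobId }).catch((error: unknown) => {
      console.error(`Could not enqueue follow-up for job ${jobId}:`, error);
    });
  });
}

// Fake queue that records what would have been added.
const enqueued: Array<{ name: string; data: unknown }> = [];
const fakeFollowUpQueue: JobAdder = {
  add: (name, data) => {
    enqueued.push({ name, data });
    return Promise.resolve();
  },
};

const completions = new EventEmitter();
forwardCompletions(completions, fakeFollowUpQueue);
completions.emit('completed', { jobId: '7' });
console.log(enqueued);
```

The handler does nothing but enqueue, so slow post-processing runs in whichever worker picks up the follow-up job rather than in the event loop of the monitoring process.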

Example: Complete Event Monitoring

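import { Queue, Worker, QueueEvents } from 'bullmq';

const connection = { host: 'localhost', port: 6379 };

// Helper used by the processor below to simulate work
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));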

// Create queue
const queue = new Queue('monitoring-demo', { connection });

// Create worker
const worker = new Worker('monitoring-demo', async (job) => {
  await job.updateProgress(25);
  await delay(1000);
  
  await job.updateProgress(50);
  await delay(1000);
  
  await job.updateProgress(75);
  await delay(1000);
  
  return { processed: true };
}, { connection });

// Create global event listener
const queueEvents = new QueueEvents('monitoring-demo', { connection });

// Queue events (local)
queue.on('waiting', (job) => {
  console.log('[Queue] Job waiting:', job.id);
});

// Worker events (local)
worker.on('active', (job) => {
  console.log('[Worker] Job active:', job.id);
});

worker.on('progress', (job, progress) => {
  console.log('[Worker] Job progress:', job.id, progress);
});

worker.on('completed', (job, result) => {
  console.log('[Worker] Job completed:', job.id, result);
});

worker.on('failed', (job, error) => {
  console.error('[Worker] Job failed:', job?.id, error.message);
});

worker.on('error', (err) => {
  console.error('[Worker] Error:', err);
});

// Global events
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log('[Global] Job completed:', jobId, returnvalue);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error('[Global] Job failed:', jobId, failedReason);
});

queueEvents.on('progress', ({ jobId, data }) => {
  console.log('[Global] Job progress:', jobId, data);
});

// Add a job
await queue.add('demo-job', { data: 'test' });

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  await queueEvents.close();
  await queue.close();
});

Next Steps

Workers

Learn how to process jobs with workers

Jobs

Understand job lifecycle and manipulation

Queues

Explore queue operations
