Overview
BullMQ provides a comprehensive event system to monitor job lifecycle and queue operations. Events can be listened to at different levels: Queue, Worker, and QueueEvents.
Event Types
Queue Events (Local)
These events are emitted by the Queue instance that performs the operation:
import { Queue } from 'bullmq' ;
const queue = new Queue ( 'myQueue' , {
connection: { host: 'localhost' , port: 6379 },
});
queue . on ( 'waiting' , ( job ) => {
console . log ( `Job ${ job . id } is waiting` );
});
queue . on ( 'progress' , ( jobId , progress ) => {
console . log ( `Job ${ jobId } progress: ${ progress } ` );
});
queue . on ( 'removed' , ( jobId ) => {
console . log ( `Job ${ jobId } was removed` );
});
queue . on ( 'cleaned' , ( jobs , type ) => {
console . log ( `Cleaned ${ jobs . length } ${ type } jobs` );
});
Worker Events (Local)
These events are emitted by the Worker instance processing jobs:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'myQueue' , async ( job ) => {
// Process job
return { status: 'done' };
}, {
connection: { host: 'localhost' , port: 6379 },
});
worker . on ( 'active' , ( job ) => {
console . log ( `Job ${ job . id } is now active` );
});
worker . on ( 'completed' , ( job , result ) => {
console . log ( `Job ${ job . id } completed with:` , result );
});
worker . on ( 'failed' , ( job , error ) => {
console . error ( `Job ${ job ?. id } failed:` , error . message );
});
worker . on ( 'progress' , ( job , progress ) => {
console . log ( `Job ${ job . id } progress:` , progress );
});
Global Events (QueueEvents)
QueueEvents listens to events from all workers and queues for a given queue name:
import { QueueEvents } from 'bullmq' ;
const queueEvents = new QueueEvents ( 'myQueue' , {
connection: { host: 'localhost' , port: 6379 },
});
queueEvents . on ( 'completed' , ({ jobId , returnvalue }) => {
console . log ( `Job ${ jobId } completed globally` );
});
queueEvents . on ( 'failed' , ({ jobId , failedReason }) => {
console . log ( `Job ${ jobId } failed: ${ failedReason } ` );
});
queueEvents . on ( 'progress' , ({ jobId , data }) => {
console . log ( `Job ${ jobId } progress:` , data );
});
QueueEvents requires a dedicated Redis connection and uses blocking operations to listen for events in real-time.
Queue Event Reference
waiting
Emitted when a job is added to the queue:
queue . on ( 'waiting' , ( job : Job ) => {
console . log ( `Job ${ job . id } added to queue` );
console . log ( 'Job name:' , job . name );
console . log ( 'Job data:' , job . data );
});
progress
Emitted when a job’s progress is updated:
queue . on ( 'progress' , ( jobId : string , progress : number | object ) => {
if ( typeof progress === 'number' ) {
console . log ( `Job ${ jobId } : ${ progress } %` );
} else {
console . log ( `Job ${ jobId } progress:` , progress );
}
});
removed
Emitted when a job is removed:
queue . on ( 'removed' , ( jobId : string ) => {
console . log ( `Job ${ jobId } removed from queue` );
});
cleaned
Emitted when jobs are cleaned:
queue . on ( 'cleaned' , ( jobIds : string [], type : string ) => {
console . log ( `Cleaned ${ jobIds . length } ${ type } jobs` );
});
paused / resumed
Emitted when the queue is paused or resumed:
queue . on ( 'paused' , () => {
console . log ( 'Queue paused' );
});
queue . on ( 'resumed' , () => {
console . log ( 'Queue resumed' );
});
error
Emitted on errors:
queue . on ( 'error' , ( error : Error ) => {
console . error ( 'Queue error:' , error );
});
Worker Event Reference
active
Emitted when a job starts processing:
worker . on ( 'active' , ( job : Job , prev : string ) => {
console . log ( `Job ${ job . id } is now active` );
console . log ( 'Previous state:' , prev );
});
completed
Emitted when a job completes successfully:
worker . on ( 'completed' , ( job : Job , result : any , prev : string ) => {
console . log ( `Job ${ job . id } completed` );
console . log ( 'Result:' , result );
console . log ( 'Previous state:' , prev );
});
failed
Emitted when a job fails:
worker . on ( 'failed' , ( job : Job | undefined , error : Error , prev : string ) => {
if ( job ) {
console . error ( `Job ${ job . id } failed:` , error . message );
console . log ( 'Attempts made:' , job . attemptsMade );
console . log ( 'Previous state:' , prev );
} else {
console . error ( 'Job failed and was removed:' , error . message );
}
});
The job parameter can be undefined if the job reached the stalled limit and was removed by the removeOnFail option.
progress
Emitted when a job reports progress:
worker . on ( 'progress' , ( job : Job , progress : number | object ) => {
console . log ( `Job ${ job . id } progress:` , progress );
});
drained
Emitted when the queue has no jobs waiting:
worker . on ( 'drained' , () => {
console . log ( 'Queue is empty, waiting for jobs...' );
});
stalled
Emitted when a job is detected as stalled:
worker . on ( 'stalled' , ( jobId : string , prev : string ) => {
console . log ( `Job ${ jobId } stalled` );
console . log ( 'Previous state:' , prev );
});
paused / resumed
Emitted when the worker is paused or resumed:
worker . on ( 'paused' , () => {
console . log ( 'Worker paused' );
});
worker . on ( 'resumed' , () => {
console . log ( 'Worker resumed' );
});
ready
Emitted when the worker’s blocking connection is ready:
worker . on ( 'ready' , () => {
console . log ( 'Worker is ready' );
});
closing / closed
Emitted during worker shutdown:
worker . on ( 'closing' , ( msg : string ) => {
console . log ( 'Worker is closing:' , msg );
});
worker . on ( 'closed' , () => {
console . log ( 'Worker closed' );
});
error
Always add an error listener to prevent unhandled exceptions!
worker . on ( 'error' , ( error : Error ) => {
console . error ( 'Worker error:' , error );
});
QueueEvents Reference
added
Emitted when a job is created:
queueEvents . on ( 'added' , ({ jobId , name }, id ) => {
console . log ( `Job ${ jobId } ( ${ name } ) was added` );
console . log ( 'Event ID:' , id );
});
active
Emitted when a job becomes active:
queueEvents . on ( 'active' , ({ jobId , prev }, id ) => {
console . log ( `Job ${ jobId } is now active` );
console . log ( 'Previous state:' , prev );
});
completed
Emitted when a job completes:
queueEvents . on ( 'completed' , ({ jobId , returnvalue , prev }, id ) => {
console . log ( `Job ${ jobId } completed` );
console . log ( 'Return value:' , returnvalue );
console . log ( 'Previous state:' , prev );
});
failed
Emitted when a job fails:
queueEvents . on ( 'failed' , ({ jobId , failedReason , prev }, id ) => {
console . log ( `Job ${ jobId } failed` );
console . log ( 'Reason:' , failedReason );
console . log ( 'Previous state:' , prev );
});
progress
Emitted on progress updates:
queueEvents . on ( 'progress' , ({ jobId , data }, id ) => {
console . log ( `Job ${ jobId } progress:` , data );
});
delayed
Emitted when a job is delayed:
queueEvents . on ( 'delayed' , ({ jobId , delay }, id ) => {
console . log ( `Job ${ jobId } delayed by ${ delay } ms` );
});
removed
Emitted when a job is removed:
queueEvents . on ( 'removed' , ({ jobId , prev }, id ) => {
console . log ( `Job ${ jobId } removed` );
console . log ( 'Previous state:' , prev );
});
waiting
Emitted when a job enters waiting state:
queueEvents . on ( 'waiting' , ({ jobId , prev }, id ) => {
console . log ( `Job ${ jobId } is waiting` );
console . log ( 'Previous state:' , prev );
});
waiting-children
Emitted when a job is waiting for children:
queueEvents . on ( 'waiting-children' , ({ jobId }, id ) => {
console . log ( `Job ${ jobId } waiting for child jobs` );
});
stalled
Emitted when a job stalls:
queueEvents . on ( 'stalled' , ({ jobId }, id ) => {
console . log ( `Job ${ jobId } has stalled` );
});
retries-exhausted
Emitted when a job exhausts all retry attempts:
queueEvents . on ( 'retries-exhausted' , ({ jobId , attemptsMade }, id ) => {
console . log ( `Job ${ jobId } exhausted ${ attemptsMade } retry attempts` );
});
deduplicated
Emitted when a job is rejected due to deduplication:
queueEvents . on ( 'deduplicated' , ({ jobId , deduplicationId , deduplicatedJobId }, id ) => {
console . log ( `Job ${ jobId } was deduplicated` );
console . log ( 'Deduplication ID:' , deduplicationId );
console . log ( 'Existing job ID:' , deduplicatedJobId );
});
duplicated
Emitted when a job with the same ID already exists:
queueEvents . on ( 'duplicated' , ({ jobId }, id ) => {
console . log ( `Job ${ jobId } already exists` );
});
drained
Emitted when the waiting list is emptied:
queueEvents . on ( 'drained' , ( id ) => {
console . log ( 'Queue has been drained' );
});
paused / resumed
queueEvents . on ( 'paused' , ( args , id ) => {
console . log ( 'Queue paused globally' );
});
queueEvents . on ( 'resumed' , ( args , id ) => {
console . log ( 'Queue resumed globally' );
});
cleaned
Emitted when jobs are cleaned:
queueEvents . on ( 'cleaned' , ({ count }, id ) => {
console . log ( `Cleaned ${ count } jobs` );
});
Event Comparison
Queue Events
Worker Events
QueueEvents
Scope : Local to the Queue instanceUse case : Monitor operations performed by this specific queueConnection : Uses the queue’s main connectionEvents : waiting, progress, removed, cleaned, paused, resumed
Scope : Local to the Worker instanceUse case : Monitor jobs processed by this specific workerConnection : Uses the worker’s connectionsEvents : active, completed, failed, progress, drained, stalled, paused, resumed
Scope : Global for the queue nameUse case : Monitor all activity across all workers and queuesConnection : Requires dedicated blocking connectionEvents : All job lifecycle events from any worker/queue
Job-Specific Events
Listen to events for a specific job:
import { QueueEvents } from 'bullmq' ;
const queueEvents = new QueueEvents ( 'myQueue' );
const job = await queue . add ( 'task' , { data: 'value' });
// Listen to events for this specific job
queueEvents . on ( `completed: ${ job . id } ` , ({ returnvalue }) => {
console . log ( 'Job completed with:' , returnvalue );
});
queueEvents . on ( `failed: ${ job . id } ` , ({ failedReason }) => {
console . error ( 'Job failed:' , failedReason );
});
queueEvents . on ( `progress: ${ job . id } ` , ({ data }) => {
console . log ( 'Job progress:' , data );
});
Event Resumption
Resume listening from a specific event ID:
const queueEvents = new QueueEvents ( 'myQueue' , {
connection: { host: 'localhost' , port: 6379 },
lastEventId: '1234567890-0' , // Resume from this event
});
TypeScript Support
Type-safe event listeners:
import { Worker , Job } from 'bullmq' ;
interface TaskData {
userId : string ;
action : string ;
}
interface TaskResult {
success : boolean ;
timestamp : number ;
}
const worker = new Worker < TaskData , TaskResult >( 'tasks' , async ( job ) => {
return {
success: true ,
timestamp: Date . now (),
};
});
// Type-safe event handlers
worker . on ( 'completed' , ( job : Job < TaskData , TaskResult >, result : TaskResult ) => {
console . log ( `User ${ job . data . userId } task completed` );
console . log ( 'Success:' , result . success );
});
Event Best Practices
Always Handle Errors Add error listeners to Queue, Worker, and QueueEvents to prevent crashes.
Use QueueEvents for Monitoring Use QueueEvents for centralized monitoring across all workers.
Clean Up Listeners Remove event listeners when no longer needed to prevent memory leaks.
Don't Block Event Handlers Keep event handlers fast. For heavy operations, queue another job.
Example: Complete Event Monitoring
import { Queue , Worker , QueueEvents } from 'bullmq' ;
const connection = { host: 'localhost' , port: 6379 };
// Create queue
const queue = new Queue ( 'monitoring-demo' , { connection });
// Create worker
const worker = new Worker ( 'monitoring-demo' , async ( job ) => {
await job . updateProgress ( 25 );
await delay ( 1000 );
await job . updateProgress ( 50 );
await delay ( 1000 );
await job . updateProgress ( 75 );
await delay ( 1000 );
return { processed: true };
}, { connection });
// Create global event listener
const queueEvents = new QueueEvents ( 'monitoring-demo' , { connection });
// Queue events (local)
queue . on ( 'waiting' , ( job ) => {
console . log ( '[Queue] Job waiting:' , job . id );
});
// Worker events (local)
worker . on ( 'active' , ( job ) => {
console . log ( '[Worker] Job active:' , job . id );
});
worker . on ( 'progress' , ( job , progress ) => {
console . log ( '[Worker] Job progress:' , job . id , progress );
});
worker . on ( 'completed' , ( job , result ) => {
console . log ( '[Worker] Job completed:' , job . id , result );
});
worker . on ( 'failed' , ( job , error ) => {
console . error ( '[Worker] Job failed:' , job ?. id , error . message );
});
worker . on ( 'error' , ( err ) => {
console . error ( '[Worker] Error:' , err );
});
// Global events
queueEvents . on ( 'completed' , ({ jobId , returnvalue }) => {
console . log ( '[Global] Job completed:' , jobId , returnvalue );
});
queueEvents . on ( 'failed' , ({ jobId , failedReason }) => {
console . error ( '[Global] Job failed:' , jobId , failedReason );
});
queueEvents . on ( 'progress' , ({ jobId , data }) => {
console . log ( '[Global] Job progress:' , jobId , data );
});
// Add a job
await queue . add ( 'demo-job' , { data: 'test' });
// Graceful shutdown
process . on ( 'SIGTERM' , async () => {
await worker . close ();
await queueEvents . close ();
await queue . close ();
});
Next Steps
Workers Learn how to process jobs with workers
Jobs Understand job lifecycle and manipulation
Queues Explore queue operations