Workers are the core processing units in BullMQ that consume jobs from queues and execute your business logic. A worker is equivalent to a message receiver in traditional message queue systems.
Basic Worker Creation
Create a worker by instantiating the Worker class with a queue name and a processor function:
import { Worker, Job } from 'bullmq';
const worker = new Worker('queueName', async (job: Job) => {
  // Process the job
  console.log('Processing job:', job.id);
  console.log('Job data:', job.data);
  // Optionally report progress
  await job.updateProgress(42);
  // Return a value that will be stored with the job
  return { status: 'completed', result: 'some value' };
});
Processor Function Signature
The processor function receives up to three parameters:
type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string,
  signal?: AbortSignal,
) => Promise<R>;
job: The job instance containing data and methods
token: Optional lock token for the job
signal: Optional AbortSignal for job cancellation
Example with All Parameters
const worker = new Worker(
  'queueName',
  async (job: Job, token?: string, signal?: AbortSignal) => {
    // Access job data
    const { userId, action } = job.data;
    // Check for cancellation before starting
    if (signal?.aborted) {
      throw new Error('Job was cancelled');
    }
    // Get notified if cancellation is requested while the job runs
    signal?.addEventListener('abort', () => {
      console.log('Cancellation requested');
    });
    // Do work
    const result = await processUserAction(userId, action);
    return result;
  },
);
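The example above checks `signal.aborted` only once, before the work starts. For multi-step jobs, one option is to re-check the signal between steps so that cancellation takes effect partway through. The sketch below uses only the standard `AbortSignal` API; `runSteps` and the `Step` type are hypothetical names for illustration, not part of BullMQ.

```typescript
// Hypothetical helper: a unit of work that can be awaited.
type Step = () => Promise<void>;

// Runs steps in order and stops as soon as the signal reports an abort.
// Returns the number of steps that completed.
async function runSteps(steps: Step[], signal?: AbortSignal): Promise<number> {
  let completed = 0;
  for (const step of steps) {
    // Re-check before every step so cancellation is honored between steps.
    if (signal?.aborted) {
      throw new Error(`Job was cancelled after ${completed} step(s)`);
    }
    await step();
    completed++;
  }
  return completed;
}
```

Inside a processor this could be called as `await runSteps([stepA, stepB], signal)`, where `stepA` and `stepB` are your own functions. A step that is already running is not interrupted; it would need to observe the signal itself.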
Worker Options
Configure worker behavior with options:
import { Worker, WorkerOptions } from 'bullmq';
const options: WorkerOptions = {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  concurrency: 5,
  lockDuration: 30000,
  maxStalledCount: 1,
  stalledInterval: 30000,
  autorun: true,
  removeOnComplete: { count: 1000 },
  removeOnFail: { count: 5000 },
};
const worker = new Worker('queueName', processorFunction, options);
Key Options
concurrency: Number of jobs that can be processed in parallel. See Concurrency.
autorun: Whether to start processing jobs immediately upon worker creation.
lockDuration: Duration in milliseconds that a worker holds a lock on a job.
maxStalledCount: Maximum number of times a job can be recovered from the stalled state before it is moved to failed.
stalledInterval: Interval in milliseconds between stalled job checks.
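These options are often tuned per deployment. As a rough sketch, they could be read from environment variables with fallbacks. The variable names (`WORKER_CONCURRENCY` and so on), the `WorkerTuning` shape, and both helper functions are assumptions made for illustration; the fallback values mirror the example above rather than any library defaults.

```typescript
// Local shape covering only the options discussed above; in a real project
// the result would be spread into WorkerOptions from 'bullmq'.
interface WorkerTuning {
  concurrency: number;
  lockDuration: number;
  maxStalledCount: number;
  stalledInterval: number;
  autorun: boolean;
}

// Parse a non-negative integer, falling back when the value is unset or invalid.
function intFromEnv(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isInteger(parsed) && parsed >= 0 ? parsed : fallback;
}

// Hypothetical variable names; adjust to your own conventions.
function tuningFromEnv(env: Record<string, string | undefined>): WorkerTuning {
  return {
    // At least one job must be processable at a time.
    concurrency: Math.max(1, intFromEnv(env.WORKER_CONCURRENCY, 5)),
    lockDuration: intFromEnv(env.WORKER_LOCK_DURATION_MS, 30000),
    maxStalledCount: intFromEnv(env.WORKER_MAX_STALLED_COUNT, 1),
    stalledInterval: intFromEnv(env.WORKER_STALLED_INTERVAL_MS, 30000),
    // Anything other than the literal string 'false' keeps autorun enabled.
    autorun: env.WORKER_AUTORUN !== 'false',
  };
}
```

The result could then be combined with a connection, for example `{ connection, ...tuningFromEnv(process.env) }`.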
Manual Worker Control
Control when the worker starts processing:
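const worker = new Worker(
  'queueName',
  async (job: Job) => {
    return await processJob(job);
  },
  { autorun: false },
);
// Start processing when ready. The promise returned by run() settles only
// once the worker stops, so awaiting it here would block any code that follows.
worker.run().catch((err) => console.error('Worker stopped with error:', err));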
Worker Events
Listen to worker events to monitor job processing:
// Job becomes active
worker.on('active', (job: Job) => {
  console.log(`Job ${job.id} is now active`);
});
// Job completes successfully
worker.on('completed', (job: Job, returnvalue: any) => {
  console.log(`Job ${job.id} completed with result:`, returnvalue);
});
// Job fails
worker.on('failed', (job: Job | undefined, error: Error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
});
// Job reports progress
worker.on('progress', (job: Job, progress: number | object) => {
  console.log(`Job ${job.id} progress:`, progress);
});
// Worker errors (important!)
worker.on('error', (err: Error) => {
  console.error('Worker error:', err);
});
// Queue drained (no more jobs)
worker.on('drained', () => {
  console.log('Queue is empty');
});
// Worker paused/resumed
worker.on('paused', () => console.log('Worker paused'));
worker.on('resumed', () => console.log('Worker resumed'));
// Worker closing/closed
worker.on('closing', (msg: string) => console.log('Worker closing:', msg));
worker.on('closed', () => console.log('Worker closed'));
Always attach an error listener to prevent uncaught exceptions from crashing your application.
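Beyond logging, the same events can feed simple counters for monitoring. The following is a minimal sketch: `EmitterLike`, `WorkerStats`, and `trackStats` are hypothetical names, and the helper is typed structurally so it can be exercised without Redis. Whether a `Worker` instance is accepted without a cast depends on the library's typings, which is an assumption here.

```typescript
// Minimal structural type: anything with an `on` method for subscribing to events.
interface EmitterLike {
  on(event: string, listener: (...args: any[]) => void): unknown;
}

interface WorkerStats {
  completed: number;
  failed: number;
  errors: number;
}

// Hypothetical helper: counts outcomes by subscribing to the
// 'completed', 'failed', and 'error' events listed above.
function trackStats(source: EmitterLike): WorkerStats {
  const stats: WorkerStats = { completed: 0, failed: 0, errors: 0 };
  source.on('completed', () => { stats.completed++; });
  source.on('failed', () => { stats.failed++; });
  source.on('error', () => { stats.errors++; });
  return stats;
}
```

The returned object is updated in place, so it can be read periodically, for example from a health-check endpoint.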
Progress Reporting
Report job progress from within the processor:
const worker = new Worker('queueName', async (job: Job) => {
  const items = job.data.items;
  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);
    // Report percentage
    await job.updateProgress(((i + 1) / items.length) * 100);
    // Or report detailed object
    await job.updateProgress({
      processed: i + 1,
      total: items.length,
      currentItem: items[i].id,
    });
  }
  return { totalProcessed: items.length };
});
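The loop above reports progress after every item. For large batches, one possible refinement is to report only when the whole-number percentage changes, which caps the number of updates at roughly one hundred per job. This is a sketch under assumptions: `ProgressReporter` is a structural stand-in for the part of `Job` used here, and `processWithProgress` is a hypothetical helper.

```typescript
// Structural stand-in for the part of Job this helper needs.
interface ProgressReporter {
  updateProgress(progress: number): Promise<void>;
}

// Hypothetical helper: handles each item in order and reports progress
// only when the whole-number percentage differs from the last report.
async function processWithProgress<T>(
  job: ProgressReporter,
  items: T[],
  handle: (item: T) => Promise<void>,
): Promise<number> {
  let lastReported = -1;
  for (let i = 0; i < items.length; i++) {
    await handle(items[i]);
    // Integer arithmetic avoids floating-point surprises such as 28.999...
    const percent = Math.floor(((i + 1) * 100) / items.length);
    if (percent !== lastReported) {
      await job.updateProgress(percent);
      lastReported = percent;
    }
  }
  return items.length;
}
```

Inside a processor this could be called as `await processWithProgress(job, job.data.items, processItem)`.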
TypeScript Generics
Define types for job data and return values:
interface MyJobData {
  userId: string;
  action: string;
  metadata: Record<string, any>;
}
interface MyJobResult {
  status: 'success' | 'failure';
  message: string;
  timestamp: number;
}
const worker = new Worker<MyJobData, MyJobResult>(
  'queueName',
  async (job: Job<MyJobData, MyJobResult>) => {
    const { userId, action } = job.data; // Typed!
    // Process...
    return {
      status: 'success',
      message: `Processed ${action} for ${userId}`,
      timestamp: Date.now(),
    }; // Return type is checked!
  },
);
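Generic parameters are checked at compile time only, so they do not guarantee that a payload added by another producer actually has the declared shape. A runtime check at the top of the processor can catch mismatches early. The sketch below is one way to do this with a hand-written type guard; `isMyJobData` is a hypothetical helper, and a schema validation library would work equally well.

```typescript
interface MyJobData {
  userId: string;
  action: string;
  metadata: Record<string, any>;
}

// Type guard: verifies the shape at runtime, since generics are erased after compilation.
function isMyJobData(value: unknown): value is MyJobData {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.userId === 'string' &&
    typeof candidate.action === 'string' &&
    typeof candidate.metadata === 'object' &&
    candidate.metadata !== null &&
    !Array.isArray(candidate.metadata)
  );
}
```

A processor could then start with `if (!isMyJobData(job.data)) throw new Error('Invalid job data');` so that malformed jobs fail with a clear reason.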
Waiting Until Ready
Wait for the worker’s Redis connection to be ready:
const worker = new Worker('queueName', processorFunction);
// Useful in tests
await worker.waitUntilReady();
console.log('Worker is ready to process jobs');
Graceful Shutdown
Properly close the worker:
// Wait for current jobs to complete
await worker.close();
// Force immediate shutdown
await worker.close(true);
See Graceful Shutdown for more details.
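Because a graceful close waits for current jobs to complete, it can take as long as the slowest job. Deployment environments often allow only a fixed grace period, so one option is to bound the wait. The sketch below is an illustration under assumptions: `Closable` is a structural stand-in for the worker, and `closeWithin` is a hypothetical helper that reports whether the close finished in time, leaving the follow-up decision to the caller.

```typescript
// Structural stand-in for the part of the worker this helper needs.
interface Closable {
  close(force?: boolean): Promise<void>;
}

// Hypothetical helper: resolves to true if close() finished before the
// deadline, or false if the deadline passed first.
async function closeWithin(worker: Closable, deadlineMs: number): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<false>((resolve) => {
    timer = setTimeout(() => resolve(false), deadlineMs);
  });
  try {
    return await Promise.race([worker.close().then(() => true), deadline]);
  } finally {
    // Clear the timer so it does not keep the process alive after an early close.
    clearTimeout(timer);
  }
}
```

A signal handler might call `const clean = await closeWithin(worker, 10000)` and exit with a non-zero code when `clean` is false; the 10 second figure is an arbitrary example.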
Global Job Events
Monitor jobs across all workers using QueueEvents:
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('queueName');
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed by any worker`);
});
queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} failed:`, failedReason);
});
queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress:`, data);
});
Best Practices
Always handle errors
Attach an error listener to prevent crashes:
worker.on('error', (err) => console.error(err));
Keep processors async
Use async/await or return promises so that I/O does not block the event loop. Synchronous CPU-bound code still blocks it, even inside an async function.
Report progress for long jobs
Help monitor long-running jobs by calling job.updateProgress().
Handle shutdown gracefully
Call worker.close() on process termination:
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});
Use sandboxing for CPU-intensive work
Concurrency: Process multiple jobs in parallel
Sandboxed Processors: Isolate CPU-intensive processors
Graceful Shutdown: Properly close workers
Cancelling Jobs: Cancel jobs in progress
API Reference