Overview
The Worker class represents a worker that processes jobs from the queue. As soon as the class is instantiated and a connection to Redis is established, it will start processing jobs.
Constructor
new Worker(
name: string,
processor?: Processor | string | URL,
opts?: WorkerOptions
)
The name of the queue to process
The processor function or path to processor file
Configuration options for the worker
Example
import { Worker } from 'bullmq';
const worker = new Worker('myQueue', async (job) => {
// Process the job
console.log('Processing job', job.id, job.data);
return { processed: true };
}, {
connection: {
host: 'localhost',
port: 6379,
},
concurrency: 5,
});
Properties
Unique identifier for this worker instance.
concurrency
The maximum number of jobs this worker will process concurrently.
opts
readonly opts: WorkerOptions
The worker configuration options.
Methods
run
Starts processing jobs from the queue.
pause
Pauses the processing of jobs for this worker.
pause(doNotWaitActive?: boolean): Promise<void>
If true, does not wait for active jobs to finish
resume
Resumes processing jobs after being paused.
isPaused
Checks if the worker is currently paused.
isRunning
Checks if the worker is currently running.
getNextJob
Manually fetches the next job from the queue.
getNextJob(token: string, opts?: { block?: boolean }): Promise<Job | undefined>
Worker token to assign to retrieved job
Whether to block waiting for a job
cancelJob
Cancels a specific job currently being processed.
cancelJob(jobId: string, reason?: string): boolean
The ID of the job to cancel
Optional reason for cancellation
True if the job was found and cancelled
cancelAllJobs
Cancels all jobs currently being processed by this worker.
cancelAllJobs(reason?: string): void
Optional reason for cancellation
close
Closes the worker and related Redis connections.
close(force?: boolean): Promise<void>
If true, does not wait for current jobs to be processed
startStalledCheckTimer
Manually starts the stalled job checker.
startStalledCheckTimer(): Promise<void>
Static Methods
RateLimitError
Creates a rate limit error to throw from a processor.
static RateLimitError(): Error
Events
The Worker class extends EventEmitter and emits the following events:
active
Emitted when a job enters the active state.
worker.on('active', (job: Job, prev: string) => {
console.log(`Job ${job.id} is now active`);
});
completed
Emitted when a job has successfully completed.
worker.on('completed', (job: Job, result: any, prev: string) => {
console.log(`Job ${job.id} completed with result:`, result);
});
failed
Emitted when a job has failed.
worker.on('failed', (job: Job | undefined, error: Error, prev: string) => {
console.log(`Job ${job?.id} failed:`, error.message);
});
progress
Emitted when a job updates its progress.
worker.on('progress', (job: Job, progress: number | object) => {
console.log(`Job ${job.id} progress:`, progress);
});
drained
Emitted when the queue has drained the waiting list.
worker.on('drained', () => {
console.log('Queue is drained');
});
paused
Emitted when the worker is paused.
worker.on('paused', () => {
console.log('Worker paused');
});
resumed
Emitted when the worker is resumed.
worker.on('resumed', () => {
console.log('Worker resumed');
});
stalled
Emitted when a job has stalled.
worker.on('stalled', (jobId: string, prev: string) => {
console.log(`Job ${jobId} stalled`);
});
error
Emitted when an error occurs.
worker.on('error', (error: Error) => {
console.error('Worker error:', error);
});
closing
Emitted when the worker is closing.
worker.on('closing', (msg: string) => {
console.log('Worker closing:', msg);
});
closed
Emitted when the worker has closed.
worker.on('closed', () => {
console.log('Worker closed');
});