BullMQ supports graceful job cancellation using the standard AbortController and AbortSignal APIs. This allows you to cancel jobs that are currently being processed and perform cleanup operations.
How Job Cancellation Works
When you cancel a job:
The worker’s cancelJob() method aborts the signal passed to the processor
The processor receives the abort event and can perform cleanup
The processor should throw an error to mark the job as failed
The job moves to the failed state (with or without retries, depending on error type)
Processor with Cancellation Support
Add the optional signal parameter to your processor function:
import { Worker , Job } from 'bullmq' ;
const worker = new Worker (
'queueName' ,
async ( job : Job , token ?: string , signal ?: AbortSignal ) => {
// Your job processing logic
return await processJob ( job . data );
},
);
The processor function signature:
type Processor < T = any , R = any , N extends string = string > = (
job : Job < T , R , N >,
token ?: string ,
signal ?: AbortSignal ,
) => Promise < R >;
job : The job instance
token : Lock token for the job
signal : Optional AbortSignal for cancellation detection
Cancelling Jobs
Use the Worker class methods to cancel active jobs:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'queueName' , processorFunction );
// Cancel a specific job by ID
const cancelled = worker . cancelJob ( 'job-id-123' );
if ( cancelled ) {
console . log ( 'Job was cancelled' );
} else {
console . log ( 'Job was not active (already completed or not found)' );
}
// Cancel with a reason
worker . cancelJob ( 'job-id-456' , 'User requested cancellation' );
// Cancel all active jobs on this worker
worker . cancelAllJobs ( 'System shutdown' );
Getting Active Jobs
Find jobs to cancel:
import { Queue } from 'bullmq' ;
const queue = new Queue ( 'queueName' );
// Get all active jobs
const activeJobs = await queue . getActive ();
console . log ( 'Active jobs:' , activeJobs . map ( j => j . id ));
// Cancel a specific active job
if ( activeJobs . length > 0 ) {
const jobId = activeJobs [ 0 ]. id ;
worker . cancelJob ( jobId , 'Cancelling first active job' );
}
Handling Cancellation (Recommended Pattern)
Use the event-based approach for immediate response:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'queueName' , async ( job , token , signal ) => {
return new Promise (( resolve , reject ) => {
// Listen for abort event (recommended)
signal ?. addEventListener ( 'abort' , () => {
console . log ( `Job ${ job . id } cancellation requested` );
const reason = signal . reason || 'No reason provided' ;
// Clean up resources
cleanupResources ();
// Reject to mark job as failed
reject ( new Error ( `Cancelled: ${ reason } ` ));
});
// Your processing logic
const processData = async () => {
const result = await performWork ( job . data );
resolve ( result );
};
processData (). catch ( reject );
});
});
// Later, cancel the job
worker . cancelJob ( jobId , 'Resource limit exceeded' );
Why Event-Based?
✅ Immediate response - No polling delay
✅ More efficient - No CPU wasted checking in loops
✅ Cleaner code - Separation of concerns
✅ Standard pattern - Matches Web APIs like fetch()
Using with Native APIs
Many APIs natively support AbortSignal:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'fetchQueue' , async ( job , token , signal ) => {
return new Promise ( async ( resolve , reject ) => {
// Handle cancellation
signal ?. addEventListener ( 'abort' , () => {
reject ( new Error ( 'Job was cancelled' ));
});
// Pass signal to fetch - cancels HTTP request automatically
const response = await fetch ( job . data . url , {
signal , // ✅ Cancels the request at network level
method: 'GET' ,
headers: job . data . headers ,
});
const data = await response . json ();
resolve ( data );
});
});
APIs that support AbortSignal:
fetch(url, { signal }) - HTTP requests
addEventListener(event, handler, { signal }) - Auto-removes listener on abort
Database clients (Postgres, MongoDB drivers)
File system operations (newer Node.js APIs)
Polling Pattern (Alternative)
Check signal.aborted periodically:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'batchQueue' , async ( job , token , signal ) => {
const items = job . data . items ;
const results = [];
for ( let i = 0 ; i < items . length ; i ++ ) {
// Check if job has been cancelled
if ( signal ?. aborted ) {
throw new Error ( `Cancelled after processing ${ i } items` );
}
const result = await processItem ( items [ i ]);
results . push ( result );
// Update progress
await job . updateProgress ((( i + 1 ) / items . length ) * 100 );
}
return { results , total: results . length };
});
The polling pattern is simpler but less efficient than the event-based approach. Use it for simple loops or when you can’t use event listeners.
Cancellation with Cleanup
Perform async cleanup before failing the job:
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'dbQueue' , async ( job , token , signal ) => {
// Acquire resources
const db = await connectToDatabase ();
const cache = await connectToCache ();
return new Promise ( async ( resolve , reject ) => {
// Set up cleanup handler
signal ?. addEventListener ( 'abort' , async () => {
try {
console . log ( 'Cleaning up resources...' );
// Close connections gracefully
await db . close ();
await cache . disconnect ();
console . log ( 'Cleanup complete' );
reject ( new Error ( 'Cancelled after cleanup' ));
} catch ( cleanupError ) {
console . error ( 'Cleanup failed:' , cleanupError );
reject ( new Error ( 'Cleanup failed during cancellation' ));
}
});
try {
// Do your work
const result = await processWithDatabase ( db , job . data );
await cache . set ( `job: ${ job . id } ` , result );
resolve ( result );
} catch ( error ) {
// Cleanup on error too
await db . close ();
await cache . disconnect ();
throw error ;
}
});
});
Job State After Cancellation
With Regular Error (Will Retry)
Throw a regular Error to allow retries:
const worker = new Worker ( 'retryQueue' , async ( job , token , signal ) => {
return new Promise (( resolve , reject ) => {
signal ?. addEventListener ( 'abort' , () => {
// Regular Error - job will retry if attempts remain
reject ( new Error ( 'Cancelled, will retry' ));
});
// Your work...
});
});
// Set attempts when adding jobs
await queue . add ( 'task' , data , { attempts: 3 });
Job state : Moves to failed
Retries : Job WILL be retried if attempts remain
Use case : When you want the job to be retried later
With UnrecoverableError (No Retry)
Throw UnrecoverableError to prevent retries:
import { Worker , UnrecoverableError } from 'bullmq' ;
const worker = new Worker ( 'noRetryQueue' , async ( job , token , signal ) => {
return new Promise (( resolve , reject ) => {
signal ?. addEventListener ( 'abort' , () => {
// UnrecoverableError - no retries
reject ( new UnrecoverableError ( 'Cancelled permanently' ));
});
// Your work...
});
});
Job state : Moves to failed
Retries : Job will NOT be retried
Use case : When cancellation should be permanent
Handling Lock Renewal Failures
When a worker loses its lock, cancel the job gracefully:
import { Worker } from 'bullmq' ;
const worker = new Worker (
'myQueue' ,
async ( job , token , signal ) => {
return new Promise ( async ( resolve , reject ) => {
signal ?. addEventListener ( 'abort' , async () => {
console . log ( 'Job cancelled - cleaning up resources' );
await cleanupResources ();
reject ( new Error ( 'Job cancelled' ));
});
// Your work...
});
},
);
// Cancel jobs when lock renewal fails
worker . on ( 'lockRenewalFailed' , ( jobIds : string []) => {
console . log ( 'Lock renewal failed for jobs:' , jobIds );
jobIds . forEach ( jobId => worker . cancelJob ( jobId , 'Lost lock' ));
});
When a worker loses the lock, it cannot move the job to failed state. The job remains active temporarily and will be moved back to waiting by the stalled job checker. This is correct behavior - trust BullMQ’s recovery mechanism.
Multi-Phase Work with Cancellation
Check cancellation at strategic points:
const worker = new Worker ( 'multiPhaseQueue' , async ( job , token , signal ) => {
return new Promise ( async ( resolve , reject ) => {
signal ?. addEventListener ( 'abort' , () => {
reject ( new Error ( 'Cancelled' ));
});
try {
// Phase 1: Download
if ( signal ?. aborted ) throw new Error ( 'Cancelled before download' );
const data = await downloadData ( job . data . url );
await job . updateProgress ( 33 );
// Phase 2: Process
if ( signal ?. aborted ) throw new Error ( 'Cancelled before processing' );
const processed = await processData ( data );
await job . updateProgress ( 66 );
// Phase 3: Upload
if ( signal ?. aborted ) throw new Error ( 'Cancelled before upload' );
const result = await uploadResults ( processed );
await job . updateProgress ( 100 );
resolve ( result );
} catch ( error ) {
reject ( error );
}
});
});
Cancelling Custom Operations
For operations without native AbortSignal support:
const worker = new Worker ( 'customQueue' , async ( job , token , signal ) => {
// Start your operation
const operation = startLongRunningOperation ( job . data );
// Set up cancellation handler
signal ?. addEventListener ( 'abort' , () => {
operation . cancel (); // ✅ Actually stops the work
});
try {
const result = await operation . promise ;
return result ;
} catch ( error ) {
if ( signal ?. aborted ) {
throw new Error ( 'Operation cancelled' );
}
throw error ;
}
});
Backward Compatibility
Processors without signal handling still work:
// Old processor - still works
const oldWorker = new Worker ( 'myQueue' , async ( job ) => {
return await processJob ( job );
});
// New processor - with cancellation support
const newWorker = new Worker ( 'myQueue' , async ( job , token , signal ) => {
// Can now handle cancellation
});
The signal parameter is optional. Existing processors continue to work without modification.
Sandboxed Processors and Cancellation
Sandboxed processors automatically support cancellation:
// processor.js
module . exports = async ( job , token , signal ) => {
if ( signal ?. aborted ) {
throw new Error ( 'Cancelled before starting' );
}
signal ?. addEventListener ( 'abort' , () => {
console . log ( 'Sandboxed job cancelled' );
});
// Process...
return result ;
};
// worker.js
import { Worker } from 'bullmq' ;
const worker = new Worker ( 'queue' , './processor.js' );
// Cancel sandboxed job
worker . cancelJob ( 'job-id-123' );
See Sandboxed Processors for more details.
Best Practices
Use event-based cancellation
Attach abort event listener for immediate response.
Clean up resources
Always cleanup in the abort handler to prevent resource leaks.
Choose appropriate error type
Use UnrecoverableError for permanent cancellation, regular Error for retries.
Check cancellation at key points
In long-running jobs, check signal.aborted between phases.
Combine with timeouts
Use job options for timeout-based cancellation: await queue . add ( 'job' , data , {
timeout: 30000 , // Cancel after 30 seconds
});
Handle cleanup errors gracefully
Catch errors during cleanup to avoid masking the cancellation.
Stalled Jobs Understand lock renewal failures
Graceful Shutdown Cancel jobs during shutdown
Sandboxed Processors Cancel sandboxed jobs
Worker Options Configure worker behavior
API Reference