BullMQ supports graceful job cancellation using the standard AbortController and AbortSignal APIs. This allows you to cancel jobs that are currently being processed and perform cleanup operations.

How Job Cancellation Works

When you cancel a job:
  1. The worker’s cancelJob() method aborts the signal passed to the processor
  2. The processor receives the abort event and can perform cleanup
  3. The processor should throw an error to mark the job as failed
  4. The job moves to the failed state (with or without retries, depending on error type)
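The four steps above can be tried out without a queue by calling a processor-shaped function directly. This is only a sketch: `FakeJob`, `demoProcessor`, and `runCancelledDemo` are hypothetical names, and aborting the `AbortController` stands in for what `worker.cancelJob()` does inside BullMQ.

```typescript
// Hypothetical stand-in for a BullMQ job; only the fields this sketch needs.
type FakeJob = { id: string; data: { ms: number } };

// Processor-shaped function taking (job, token, signal).
function demoProcessor(job: FakeJob, token?: string, signal?: AbortSignal): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const timer = setTimeout(() => resolve(`job ${job.id} done`), job.data.ms);

    // Step 2: the processor receives the abort event and cleans up.
    signal?.addEventListener(
      'abort',
      () => {
        clearTimeout(timer);
        // Step 3: reject so that the job would be marked as failed.
        reject(new Error(`Cancelled: ${String(signal?.reason)}`));
      },
      { once: true },
    );
  });
}

// Step 1: aborting the controller plays the role of worker.cancelJob(id, reason).
async function runCancelledDemo(): Promise<string> {
  const controller = new AbortController();
  const pending = demoProcessor({ id: '1', data: { ms: 1000 } }, undefined, controller.signal);
  controller.abort('User requested cancellation');
  try {
    return await pending;
  } catch (err) {
    // Step 4: in a real worker, this rejection is what fails the job.
    return (err as Error).message;
  }
}
```

Calling `runCancelledDemo()` resolves with the rejection message instead of the job result, because the abort fires before the timer does.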

Processor with Cancellation Support

Add the optional signal parameter to your processor function:
import { Worker, Job } from 'bullmq';

const worker = new Worker(
  'queueName',
  async (job: Job, token?: string, signal?: AbortSignal) => {
    // Your job processing logic
    return await processJob(job.data);
  },
);
The processor function signature:
type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string,
  signal?: AbortSignal,
) => Promise<R>;
  • job: The job instance
  • token: Lock token for the job
  • signal: Optional AbortSignal for cancellation detection

Cancelling Jobs

Use the Worker class methods to cancel active jobs:
import { Worker } from 'bullmq';

const worker = new Worker('queueName', processorFunction);

// Cancel a specific job by ID
const cancelled = worker.cancelJob('job-id-123');
if (cancelled) {
  console.log('Job was cancelled');
} else {
  console.log('Job was not active (already completed or not found)');
}

// Cancel with a reason
worker.cancelJob('job-id-456', 'User requested cancellation');

// Cancel all active jobs on this worker
worker.cancelAllJobs('System shutdown');

Getting Active Jobs

Find jobs to cancel:
import { Queue } from 'bullmq';

const queue = new Queue('queueName');

// Get all active jobs
const activeJobs = await queue.getActive();
console.log('Active jobs:', activeJobs.map(j => j.id));

// Cancel a specific active job.
// Assumes `worker` is the Worker instance created earlier.
const jobId = activeJobs[0]?.id;
if (jobId) {
  worker.cancelJob(jobId, 'Cancelling first active job');
}

Event-Based Pattern (Recommended)

Use the event-based approach for immediate response:
import { Worker } from 'bullmq';

const worker = new Worker('queueName', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    // Listen for abort event (recommended)
    signal?.addEventListener('abort', () => {
      console.log(`Job ${job.id} cancellation requested`);
      const reason = signal.reason || 'No reason provided';
      
      // Clean up resources
      cleanupResources();
      
      // Reject to mark job as failed
      reject(new Error(`Cancelled: ${reason}`));
    });
    
    // Your processing logic
    const processData = async () => {
      const result = await performWork(job.data);
      resolve(result);
    };
    
    processData().catch(reject);
  });
});

// Later, cancel the job
worker.cancelJob(jobId, 'Resource limit exceeded');

Why Event-Based?

  • Immediate response - No polling delay
  • More efficient - No CPU wasted checking in loops
  • Cleaner code - Separation of concerns
  • Standard pattern - Matches Web APIs like fetch()
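One caveat with the event-based pattern is that an `abort` listener attached to a signal that has already been aborted never fires. A small helper can cover that case and detach the listener once the work settles. The sketch below is an assumption about how this could be done, and `rejectOnAbort` is a hypothetical name, not part of BullMQ.

```typescript
// Hypothetical helper: settles with `work`, or rejects as soon as `signal` aborts.
function rejectOnAbort<T>(work: Promise<T>, signal?: AbortSignal): Promise<T> {
  if (!signal) return work;
  const sig: AbortSignal = signal;
  const toError = () => new Error(`Cancelled: ${String(sig.reason)}`);

  // The 'abort' event is not dispatched again for an already-aborted signal.
  if (sig.aborted) {
    work.catch(() => undefined); // avoid an unhandled rejection from the abandoned work
    return Promise.reject(toError());
  }

  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(toError());
    sig.addEventListener('abort', onAbort, { once: true });
    work
      .then(resolve, reject)
      // Detach the listener whichever way the work settles.
      .finally(() => sig.removeEventListener('abort', onAbort));
  });
}
```

Inside a processor this could be used as `return rejectOnAbort(performWork(job.data), signal);`. Note that the helper only stops waiting for the work; it does not stop the work itself.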

Using with Native APIs

Many APIs natively support AbortSignal:
import { Worker } from 'bullmq';

const worker = new Worker('fetchQueue', async (job, token, signal) => {
  try {
    // Pass signal to fetch - cancels the HTTP request at the network level
    const response = await fetch(job.data.url, {
      signal,
      method: 'GET',
      headers: job.data.headers,
    });

    return await response.json();
  } catch (error) {
    // fetch rejects when the signal aborts; report that as a cancellation
    if (signal?.aborted) {
      throw new Error('Job was cancelled');
    }
    // Any other failure (network error, invalid JSON) fails the job as usual
    throw error;
  }
});
APIs that support AbortSignal:
  • fetch(url, { signal }) - HTTP requests
  • addEventListener(event, handler, { signal }) - Auto-removes listener on abort
  • Some database clients (support varies by driver and version)
  • File system operations in recent Node.js versions, such as fs.promises.readFile(path, { signal })
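As a small illustration of the `addEventListener(event, handler, { signal })` entry in the list above, the sketch below uses the global `EventTarget` and `Event` classes available in current Node.js and browsers. `demoListenerRemoval` is a hypothetical name used only for this example.

```typescript
// Registers a listener tied to a signal and shows that aborting detaches it.
function demoListenerRemoval(): number {
  const source = new EventTarget();
  const controller = new AbortController();
  let ticks = 0;

  // The listener is removed automatically when controller.signal aborts.
  source.addEventListener('tick', () => { ticks += 1; }, { signal: controller.signal });

  source.dispatchEvent(new Event('tick')); // counted
  controller.abort();
  source.dispatchEvent(new Event('tick')); // not counted: listener already removed

  return ticks;
}
```

In a processor, passing the job's `signal` this way means listeners registered for the job do not need to be removed manually when the job is cancelled.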

Polling Pattern (Alternative)

Check signal.aborted periodically:
import { Worker } from 'bullmq';

const worker = new Worker('batchQueue', async (job, token, signal) => {
  const items = job.data.items;
  const results = [];
  
  for (let i = 0; i < items.length; i++) {
    // Check if job has been cancelled
    if (signal?.aborted) {
      throw new Error(`Cancelled after processing ${i} items`);
    }
    
    const result = await processItem(items[i]);
    results.push(result);
    
    // Update progress
    await job.updateProgress(((i + 1) / items.length) * 100);
  }
  
  return { results, total: results.length };
});
The polling pattern is simpler but less efficient than the event-based approach. Use it for simple loops or when you can’t use event listeners.
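On runtimes that provide it (recent Node.js versions and modern browsers), `signal.throwIfAborted()` is a shorter form of the `signal.aborted` check. It throws whatever value the signal was aborted with, which is not necessarily an `Error`. In the sketch below, `sumUntilAborted` is a hypothetical function with placeholder work.

```typescript
// Processes items in order, checking for cancellation before each one.
async function sumUntilAborted(items: number[], signal?: AbortSignal): Promise<number> {
  let total = 0;
  for (const item of items) {
    // Throws signal.reason if the signal has been aborted; otherwise does nothing.
    signal?.throwIfAborted();
    total += await Promise.resolve(item); // placeholder for real async work
  }
  return total;
}
```

Because the thrown value is the abort reason itself, wrap it in an `Error` (or `UnrecoverableError`) if the retry behaviour should not depend on what the caller passed as the reason.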

Cancellation with Cleanup

Perform async cleanup before failing the job:
import { Worker } from 'bullmq';

const worker = new Worker('dbQueue', async (job, token, signal) => {
  // Acquire resources
  const db = await connectToDatabase();
  const cache = await connectToCache();

  // Close connections gracefully; shared by the cancellation and error paths
  const cleanup = async () => {
    await db.close();
    await cache.disconnect();
  };

  return new Promise((resolve, reject) => {
    // Set up cleanup handler
    signal?.addEventListener('abort', async () => {
      try {
        console.log('Cleaning up resources...');
        await cleanup();
        console.log('Cleanup complete');
        reject(new Error('Cancelled after cleanup'));
      } catch (cleanupError) {
        console.error('Cleanup failed:', cleanupError);
        reject(new Error('Cleanup failed during cancellation'));
      }
    });

    // Do your work
    const run = async () => {
      const result = await processWithDatabase(db, job.data);
      await cache.set(`job:${job.id}`, result);
      return result;
    };

    run().then(resolve, async (error) => {
      // The abort handler already cleans up and rejects on cancellation
      if (signal?.aborted) return;
      // Cleanup on error too, then reject (a throw here would be lost)
      await cleanup().catch(() => {});
      reject(error);
    });
  });
});
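Another way to arrange cleanup, assuming the resources can be released as soon as the processor stops waiting, is to race the work against the abort signal and release everything in a single `finally` block. The names `FakeResource`, `abortRejection`, and `processWithCleanup` are hypothetical and exist only for this sketch.

```typescript
// Hypothetical resource with an async close(); stands in for a DB or cache client.
class FakeResource {
  closed = 0;
  async close(): Promise<void> {
    this.closed += 1;
  }
}

// Never resolves; rejects once the signal aborts (or at once if it already has).
function abortRejection(signal?: AbortSignal): Promise<never> {
  return new Promise<never>((_resolve, reject) => {
    const fail = () => reject(new Error('Cancelled'));
    if (signal?.aborted) fail();
    else signal?.addEventListener('abort', fail, { once: true });
  });
}

// Waits for the work or the abort, whichever comes first, then always cleans up.
async function processWithCleanup(
  resource: FakeResource,
  work: Promise<string>,
  signal?: AbortSignal,
): Promise<string> {
  try {
    return await Promise.race([work, abortRejection(signal)]);
  } finally {
    // Runs exactly once on success, failure, and cancellation.
    await resource.close();
  }
}
```

The trade-off is that the underlying work is not stopped by the race itself; it only stops being awaited, so this fits best when closing the resource also makes the work fail fast.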

Job State After Cancellation

With Regular Error (Will Retry)

Throw a regular Error to allow retries:
const worker = new Worker('retryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // Regular Error - job will retry if attempts remain
      reject(new Error('Cancelled, will retry'));
    });
    
    // Your work...
  });
});

// Set attempts when adding jobs
await queue.add('task', data, { attempts: 3 });
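  • Job state: Returns to waiting (or delayed, if a backoff is configured) while attempts remain; moves to failed once they are exhausted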
  • Retries: Job WILL be retried if attempts remain
  • Use case: When you want the job to be retried later

With UnrecoverableError (No Retry)

Throw UnrecoverableError to prevent retries:
import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('noRetryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // UnrecoverableError - no retries
      reject(new UnrecoverableError('Cancelled permanently'));
    });
    
    // Your work...
  });
});
  • Job state: Moves to failed
  • Retries: Job will NOT be retried
  • Use case: When cancellation should be permanent

Handling Lock Renewal Failures

When a worker loses its lock, cancel the job gracefully:
import { Worker } from 'bullmq';

const worker = new Worker(
  'myQueue',
  async (job, token, signal) => {
    return new Promise((resolve, reject) => {
      signal?.addEventListener('abort', async () => {
        console.log('Job cancelled - cleaning up resources');
        await cleanupResources();
        reject(new Error('Job cancelled'));
      });
      
      // Your work...
    });
  },
);

// Cancel jobs when lock renewal fails
worker.on('lockRenewalFailed', (jobIds: string[]) => {
  console.log('Lock renewal failed for jobs:', jobIds);
  jobIds.forEach(jobId => worker.cancelJob(jobId, 'Lost lock'));
});
When a worker loses the lock, it cannot move the job to failed state. The job remains active temporarily and will be moved back to waiting by the stalled job checker. This is correct behavior - trust BullMQ’s recovery mechanism.

Multi-Phase Work with Cancellation

Check cancellation at strategic points:
const worker = new Worker('multiPhaseQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      reject(new Error('Cancelled'));
    });

    const run = async () => {
      // Phase 1: Download
      if (signal?.aborted) throw new Error('Cancelled before download');
      const data = await downloadData(job.data.url);
      await job.updateProgress(33);

      // Phase 2: Process
      if (signal?.aborted) throw new Error('Cancelled before processing');
      const processed = await processData(data);
      await job.updateProgress(66);

      // Phase 3: Upload
      if (signal?.aborted) throw new Error('Cancelled before upload');
      const result = await uploadResults(processed);
      await job.updateProgress(100);

      return result;
    };

    // Forward the outcome of the phases to the outer promise
    run().then(resolve, reject);
  });
});
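When a job has many phases, the repeated `signal.aborted` checks can be moved into a small runner. The sketch below is one possible arrangement; `Phase` and `runPhases` are hypothetical names, and each phase is assumed to take the previous phase's output as its input.

```typescript
// Hypothetical description of one phase of a job.
type Phase<T> = { name: string; run: (input: T) => Promise<T> };

// Runs phases in order, checking for cancellation before each one starts.
async function runPhases<T>(phases: Phase<T>[], initial: T, signal?: AbortSignal): Promise<T> {
  let value = initial;
  for (const phase of phases) {
    if (signal?.aborted) {
      throw new Error(`Cancelled before ${phase.name}`);
    }
    value = await phase.run(value);
  }
  return value;
}
```

A phase that is already running is not interrupted by this runner; cancellation takes effect at the next phase boundary.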

Cancelling Custom Operations

For operations without native AbortSignal support:
const worker = new Worker('customQueue', async (job, token, signal) => {
  // Start your operation
  const operation = startLongRunningOperation(job.data);
  
  // Set up cancellation handler
  signal?.addEventListener('abort', () => {
    operation.cancel(); // ✅ Actually stops the work
  });
  
  try {
    const result = await operation.promise;
    return result;
  } catch (error) {
    if (signal?.aborted) {
      throw new Error('Operation cancelled');
    }
    throw error;
  }
});
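The example above assumes an operation object that exposes a `promise` and a `cancel()` method. One possible shape for such an object is sketched below with a timer standing in for real work; `CancellableOperation` and `startTimerOperation` are hypothetical names and not part of BullMQ.

```typescript
// Hypothetical shape for an operation that can be stopped from outside.
type CancellableOperation<T> = { promise: Promise<T>; cancel: () => void };

// Starts a timer-based operation that finishes after `ms` unless cancelled first.
function startTimerOperation(ms: number): CancellableOperation<string> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  let rejectOperation: (err: Error) => void = () => {};

  const promise = new Promise<string>((resolve, reject) => {
    rejectOperation = reject;
    timer = setTimeout(() => resolve('finished'), ms);
  });

  return {
    promise,
    cancel: () => {
      clearTimeout(timer); // actually stops the pending work
      rejectOperation(new Error('stopped')); // no effect if already settled
    },
  };
}
```

The important property is that `cancel()` both stops the underlying work and settles the promise, so the processor's `await` does not hang after cancellation.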

Backward Compatibility

Processors without signal handling still work:
// Old processor - still works
const oldWorker = new Worker('myQueue', async (job) => {
  return await processJob(job);
});

// New processor - with cancellation support
const newWorker = new Worker('myQueue', async (job, token, signal) => {
  // Can now handle cancellation
});
The signal parameter is optional. Existing processors continue to work without modification.

Sandboxed Processors and Cancellation

Sandboxed processors automatically support cancellation:
// processor.js
module.exports = async (job, token, signal) => {
  if (signal?.aborted) {
    throw new Error('Cancelled before starting');
  }
  
  signal?.addEventListener('abort', () => {
    console.log('Sandboxed job cancelled');
  });
  
  // Process...
  return result;
};

// worker.js
import { Worker } from 'bullmq';

const worker = new Worker('queue', './processor.js');

// Cancel sandboxed job
worker.cancelJob('job-id-123');
See Sandboxed Processors for more details.

Best Practices

  1. Use event-based cancellation: attach an abort event listener for immediate response.
  2. Clean up resources: always clean up in the abort handler to prevent resource leaks.
  3. Choose the appropriate error type: use UnrecoverableError for permanent cancellation and a regular Error to allow retries.
  4. Check cancellation at key points: in long-running jobs, check signal.aborted between phases.
  5. Combine with timeouts: use job options for timeout-based cancellation:
await queue.add('job', data, {
  timeout: 30000, // Cancel after 30 seconds
});
  6. Handle cleanup errors gracefully: catch errors during cleanup so they do not mask the cancellation.
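A time limit can also be enforced inside the processor by deriving a second signal that aborts either when the job's signal aborts or when a timer expires. The sketch below is an assumption about how this could be wired by hand, and `withTimeout` is a hypothetical helper. On newer runtimes the built-in `AbortSignal.timeout()` and `AbortSignal.any()` may serve the same purpose.

```typescript
// Returns a signal that aborts when `parent` aborts or after `ms` milliseconds,
// plus a dispose() that clears the timer once the work has finished.
function withTimeout(
  parent: AbortSignal | undefined,
  ms: number,
): { signal: AbortSignal; dispose: () => void } {
  const controller = new AbortController();
  const timer = setTimeout(
    () => controller.abort(new Error(`Timed out after ${ms} ms`)),
    ms,
  );

  // Forward the parent's cancellation, including its reason.
  const onParentAbort = () => controller.abort(parent?.reason);
  if (parent?.aborted) onParentAbort();
  else parent?.addEventListener('abort', onParentAbort, { once: true });

  return {
    signal: controller.signal,
    dispose: () => {
      clearTimeout(timer);
      parent?.removeEventListener('abort', onParentAbort);
    },
  };
}
```

Inside a processor, the derived signal would be passed to `fetch` or to the abort listeners in place of the original one, with `dispose()` called in a `finally` block.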
