Understand how BullMQ detects and recovers stalled jobs
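A stalled job is a job that a worker started processing but whose lock was not renewed in time. This usually happens because the worker process crashed, lost its connection to Redis, or blocked the Node.js event loop for longer than the lock duration. BullMQ automatically detects stalled jobs and moves them back to the waiting state so they can be processed again. A job that stalls more than `maxStalledCount` times is moved to the failed state instead.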
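BullMQ tracks in-progress jobs with locks:

1. Lock acquisition: When a worker starts processing a job, it acquires a lock on the job with a TTL (time-to-live) of `lockDuration` milliseconds.
2. Lock renewal: While the job is being processed, the worker automatically renews the lock every `lockRenewTime` milliseconds, before the TTL runs out.
3. Stalled detection: If the lock expires because it was not renewed in time, the periodic stalled-job check (run every `stalledInterval` milliseconds) finds an active job without a lock and marks it as stalled.
4. Recovery: The stalled checker moves the job back to the waiting state so that another worker can process it. If the job has already stalled more than `maxStalledCount` times, it is moved to the failed state instead.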
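The four steps can be illustrated with a small in-memory model. This is a simplified sketch, not BullMQ's implementation (which stores locks as Redis keys with TTLs and runs the checks in Lua scripts); `StallModel` and its methods are hypothetical names, and time is passed in explicitly as milliseconds so the behavior is deterministic.

```typescript
// Simplified in-memory model of lock-based stall detection.
// NOT BullMQ's implementation: names and structure are illustrative only.
type State = 'waiting' | 'active' | 'failed';

interface ModelJob {
  id: string;
  state: State;
  lockExpiresAt: number; // absolute time (ms) at which the lock's TTL runs out
  stalledCount: number;
}

class StallModel {
  private jobs = new Map<string, ModelJob>();
  private lockDuration: number;
  private maxStalledCount: number;

  constructor(lockDuration = 30000, maxStalledCount = 1) {
    this.lockDuration = lockDuration;
    this.maxStalledCount = maxStalledCount;
  }

  add(id: string): void {
    this.jobs.set(id, { id, state: 'waiting', lockExpiresAt: 0, stalledCount: 0 });
  }

  // 1. Lock acquisition: take a waiting job and lock it with a TTL.
  acquire(id: string, now: number): void {
    const job = this.get(id);
    if (job.state !== 'waiting') throw new Error(`${id} is not waiting`);
    job.state = 'active';
    job.lockExpiresAt = now + this.lockDuration;
  }

  // 2. Lock renewal: extend the TTL; fails if the lock was already lost.
  renew(id: string, now: number): boolean {
    const job = this.get(id);
    if (job.state !== 'active' || now >= job.lockExpiresAt) return false;
    job.lockExpiresAt = now + this.lockDuration;
    return true;
  }

  // 3 + 4. Stalled detection and recovery: active jobs with an expired lock
  // go back to waiting, or to failed once they exceed maxStalledCount.
  checkStalled(now: number): string[] {
    const stalled: string[] = [];
    for (const job of this.jobs.values()) {
      if (job.state === 'active' && now >= job.lockExpiresAt) {
        job.stalledCount += 1;
        job.state = job.stalledCount > this.maxStalledCount ? 'failed' : 'waiting';
        stalled.push(job.id);
      }
    }
    return stalled;
  }

  state(id: string): State {
    return this.get(id).state;
  }

  private get(id: string): ModelJob {
    const job = this.jobs.get(id);
    if (!job) throw new Error(`unknown job ${id}`);
    return job;
  }
}

const model = new StallModel(30000, 1);
model.add('job-1');
model.acquire('job-1', 0);   // lock valid until t = 30000
model.renew('job-1', 15000); // renewed: valid until t = 45000
model.checkStalled(40000);   // → [] (lock still valid)
model.checkStalled(45000);   // → ['job-1'] (no renewal since t = 15000)
model.state('job-1');        // → 'waiting' (first stall, limit is 1)
```

In BullMQ the check only runs periodically (every `stalledInterval`), so detection can lag the moment the lock actually expires; the model runs the check whenever `checkStalled` is called.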
These timings are controlled through worker options:

```typescript
import { Worker } from 'bullmq';

// processorFunction is your job handler, e.g. async (job) => { ... }
const worker = new Worker('queueName', processorFunction, {
  // Lock duration (TTL): how long a job's lock lasts without renewal
  // before the job can be considered stalled
  lockDuration: 30000, // 30 seconds (default)
  // Stalled check interval: how often to check for stalled jobs
  stalledInterval: 30000, // 30 seconds (default)
  // Lock renewal interval (defaults to lockDuration / 2)
  lockRenewTime: 15000, // 15 seconds
});
```
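Because the lock must be renewed before its TTL runs out, `lockRenewTime` has to stay below `lockDuration`. A hard-coded `lockRenewTime` like the one above becomes a problem if `lockDuration` is later lowered without it. The following hypothetical helper (`resolveLockSettings` is not a BullMQ API) applies the defaults listed above and rejects that combination; it is a sketch of a startup sanity check, not something BullMQ is known to perform itself.

```typescript
// Hypothetical helper (not part of BullMQ): fills in the defaults described
// above and rejects a renewal interval that cannot keep the lock alive.
interface LockSettings {
  lockDuration: number;
  stalledInterval: number;
  lockRenewTime: number;
}

function resolveLockSettings(opts: Partial<LockSettings> = {}): LockSettings {
  const lockDuration = opts.lockDuration ?? 30000;
  const stalledInterval = opts.stalledInterval ?? 30000;
  // Renewal interval defaults to half the lock duration.
  const lockRenewTime = opts.lockRenewTime ?? lockDuration / 2;

  if (lockRenewTime >= lockDuration) {
    throw new Error(
      `lockRenewTime (${lockRenewTime} ms) must be shorter than lockDuration ` +
        `(${lockDuration} ms), otherwise the lock expires before it is renewed`,
    );
  }
  return { lockDuration, stalledInterval, lockRenewTime };
}

resolveLockSettings({ lockDuration: 60000 });
// → { lockDuration: 60000, stalledInterval: 30000, lockRenewTime: 30000 }

// resolveLockSettings({ lockDuration: 10000, lockRenewTime: 15000 });
// would throw: the lock expires 5 seconds before its first renewal
```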
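CPU-intensive processors block the event loop, and a blocked event loop cannot renew locks. Running such work as a sandboxed processor, by passing a file path instead of a function, moves it off the worker's main thread:

```typescript
import { Worker } from 'bullmq';
import * as path from 'path';

// Run CPU-intensive work in a sandboxed processor. Passing a file path instead
// of a function runs the processor outside the main event loop; with
// useWorkerThreads: true it runs in worker threads instead of child processes.
const worker = new Worker(
  'cpuQueue',
  path.join(__dirname, 'cpu-processor.js'),
  {
    useWorkerThreads: true,
    concurrency: 4,
  },
);
```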
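A sandboxed processor file exports the processor as its default export. The sketch below shows what a `cpu-processor.ts` (compiled to `cpu-processor.js`) might contain. The job type is reduced to the fields this example reads, and `limit` is a hypothetical field of `job.data`; prime counting stands in for any CPU-bound task.

```typescript
// cpu-processor.ts, compiled to cpu-processor.js next to the worker file.
// Sketch only: PrimeJob and its `limit` field are hypothetical.
interface PrimeJob {
  data: { limit: number };
}

// Deliberately CPU-bound: counts primes below `limit` by trial division.
export function countPrimes(limit: number): number {
  let count = 0;
  for (let n = 2; n < limit; n++) {
    let isPrime = true;
    for (let d = 2; d * d <= n; d++) {
      if (n % d === 0) {
        isPrime = false;
        break;
      }
    }
    if (isPrime) count++;
  }
  return count;
}

// Default export: the processor BullMQ loads from the file path.
// The synchronous loop blocks only this sandbox, not the main worker thread,
// so the worker can keep renewing the job's lock in the meantime.
export default async function (job: PrimeJob): Promise<{ primes: number }> {
  return { primes: countPrimes(job.data.limit) };
}
```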
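When a processor blocks the event loop (for example, with long synchronous computation), the worker cannot renew the lock until the processor yields, so `lockDuration` must exceed the longest expected blocking period plus a safety buffer. A longer `lockDuration` also delays recovery: a job held by a crashed worker is not detected as stalled until its lock expires.

```typescript
import { Worker } from 'bullmq';

// Jobs that block for up to ~20 seconds
const worker = new Worker('queue', processorFunction, {
  lockDuration: 30000, // 20s + 10s buffer = 30s
});

// Jobs that block for up to ~5 minutes
const longWorker = new Worker('longQueue', longProcessor, {
  lockDuration: 360000, // 5 min + 1 min buffer = 6 minutes
});
```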
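The "duration plus buffer" rule can be applied to measured durations. The hypothetical helper below (`suggestLockDuration` is not a BullMQ API) sizes from the longest observed duration rather than the average, since any job that runs longer than the chosen value is at risk of stalling, and rounds up to a whole second.

```typescript
// Hypothetical sizing helper (not part of BullMQ): longest observed
// duration + buffer, rounded up to a whole second.
function suggestLockDuration(observedMs: number[], bufferMs = 10000): number {
  if (observedMs.length === 0) {
    throw new Error('need at least one observed duration');
  }
  // reduce instead of Math.max(...arr) so large sample arrays are safe
  const longest = observedMs.reduce((a, b) => Math.max(a, b));
  return Math.ceil((longest + bufferMs) / 1000) * 1000;
}

suggestLockDuration([12000, 18500, 20000]);     // → 30000
suggestLockDuration([240000, 300000], 60000);   // → 360000
```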
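Match concurrency to the kind of work. I/O-bound jobs spend most of their time waiting on the network or disk, so one worker can run many of them at once. CPU-bound jobs compete for processor time, so running more of them than there are cores does not increase throughput:

```typescript
import { Worker } from 'bullmq';
import * as os from 'os';

// For I/O work: high concurrency is fine
const ioWorker = new Worker('ioQueue', ioProcessor, {
  concurrency: 50,
});

// For CPU work: match the number of CPU cores
const cpuWorker = new Worker('cpuQueue', './cpu-processor.js', {
  concurrency: os.cpus().length,
});
```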
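The Node.js documentation recommends `os.availableParallelism()` (added in Node 18.14 / 19.4) over `os.cpus().length` for sizing parallelism, and notes that `os.cpus()` can return an empty array when CPU information is unavailable, which would yield a concurrency of 0. A small sketch of a safer value, where `cpuConcurrency` is a hypothetical helper name:

```typescript
import * as os from 'os';

// Hypothetical helper: prefer os.availableParallelism() when this Node
// version provides it, fall back to os.cpus().length, never return < 1.
function cpuConcurrency(): number {
  // Cast so the code also type-checks against older @types/node.
  const nodeOs = os as unknown as {
    availableParallelism?: () => number;
    cpus: () => unknown[];
  };
  const n =
    typeof nodeOs.availableParallelism === 'function'
      ? nodeOs.availableParallelism()
      : nodeOs.cpus().length;
  return Math.max(1, n);
}

console.log(`CPU worker concurrency: ${cpuConcurrency()}`);
```

The result can be passed as `concurrency` in place of `os.cpus().length` in the CPU worker above.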
`maxStalledCount` limits how many times a job can be recovered from a stall before it is moved to failed (the default is 1). Worker events let you monitor stalls and lock renewal:

```typescript
import { Worker } from 'bullmq';

const worker = new Worker('queueName', processorFunction, {
  lockDuration: 30000,
  maxStalledCount: 2, // recover a job from up to 2 stalls before failing it
});

// Job was moved back to waiting due to a stall
worker.on('stalled', (jobId: string, prev: string) => {
  console.log(`Job ${jobId} stalled and moved from ${prev} back to waiting`);
  // Alert or log for monitoring (`alerting` stands for your own alerting client)
  alerting.notify(`Job ${jobId} stalled`);
});

// Track lock renewal
worker.on('locksRenewed', ({ count, jobIds }) => {
  console.log(`Renewed locks for ${count} jobs:`, jobIds);
});

// Lock renewal failed
worker.on('lockRenewalFailed', (jobIds: string[]) => {
  console.error('Lock renewal failed for jobs:', jobIds);
  // Optionally cancel these jobs
  jobIds.forEach((id) => worker.cancelJob(id));
});
```
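Alerting on every single `stalled` event can be noisy when one worker crash stalls many jobs at once. One option is to alert on the stall rate instead. The sketch below uses a hypothetical `StallRateMonitor` class (not part of BullMQ) that counts stalls in a sliding time window; time is passed in as milliseconds so the logic is easy to test.

```typescript
// Hypothetical helper (not part of BullMQ): counts stalls in a sliding
// window and reports when `threshold` stalls fall inside it.
class StallRateMonitor {
  private timestamps: number[] = [];
  private readonly threshold: number;
  private readonly windowMs: number;

  constructor(threshold: number, windowMs: number) {
    this.threshold = threshold; // stalls per window that trigger an alert
    this.windowMs = windowMs;
  }

  // Records one stall at `now`; returns true when the window holds
  // at least `threshold` stalls.
  record(now: number): boolean {
    this.timestamps.push(now);
    // Drop stalls that are older than the window.
    while (this.timestamps.length > 0 && this.timestamps[0] <= now - this.windowMs) {
      this.timestamps.shift();
    }
    return this.timestamps.length >= this.threshold;
  }
}

// Possible wiring with the worker above:
// const monitor = new StallRateMonitor(5, 60000);
// worker.on('stalled', (jobId) => {
//   if (monitor.record(Date.now())) alerting.notify(`High stall rate, last job ${jobId}`);
// });
```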