A Queue is the central data structure in BullMQ that holds jobs waiting to be processed. Queues are lightweight and can handle both small message-like jobs and larger long-running jobs.

    import { Queue } from 'bullmq';

    const queue = new Queue('myQueue', {
      connection: {
        host: 'localhost',
        port: 6379,
      },
    });

When you instantiate a Queue, BullMQ either creates a new queue in Redis or connects to the existing queue with that name. Because the queue's state lives in Redis, queues are persistent and survive application restarts.
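The constructor example hard-codes the Redis host and port. As a minimal sketch, the connection settings could instead be derived from environment variables. The helper `connectionFromEnv` and the variable names `REDIS_HOST` and `REDIS_PORT` are assumptions made for illustration, not part of BullMQ.

```typescript
// Shape of the `connection` object used in the constructor example.
interface RedisConnectionOptions {
  host: string;
  port: number;
}

// Hypothetical helper (not a BullMQ API): build connection options from an
// environment-like object, falling back to localhost:6379 when a variable
// is unset or empty. REDIS_HOST and REDIS_PORT are assumed names.
function connectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnectionOptions {
  const host = env['REDIS_HOST'] || 'localhost';
  const rawPort = env['REDIS_PORT'] || '6379';
  const port = Number(rawPort);

  // Reject anything that is not a valid TCP port number.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new RangeError(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return { host, port };
}
```

The result can be passed as the `connection` option, for example `new Queue('myQueue', { connection: connectionFromEnv(process.env) })`.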

    // Pause the queue globally
    // Workers will finish current jobs but won't pick new ones
    await queue.pause();

    // Check if paused
    const isPaused = await queue.isPaused();
    console.log('Queue paused:', isPaused);

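A paused queue stays paused until it is resumed with `queue.resume()`. The sketch below wraps a maintenance task so that the queue is resumed even if the task throws. The helper `withQueuePaused` and the `PausableQueue` interface are hypothetical names introduced here; a BullMQ `Queue` instance is assumed to satisfy the interface structurally.

```typescript
// Minimal structural type covering only the Queue methods this sketch uses.
// Assumption: a BullMQ Queue instance satisfies it.
interface PausableQueue {
  pause(): Promise<void>;
  resume(): Promise<void>;
  isPaused(): Promise<boolean>;
}

// Hypothetical helper: pause the queue, run a task, then resume the queue
// even if the task throws. A queue that was already paused before the call
// is left paused afterwards.
async function withQueuePaused<T>(
  queue: PausableQueue,
  task: () => Promise<T>,
): Promise<T> {
  const wasPaused = await queue.isPaused();
  if (!wasPaused) {
    await queue.pause();
  }
  try {
    return await task();
  } finally {
    if (!wasPaused) {
      await queue.resume();
    }
  }
}
```

Restoring the previous state, rather than always resuming, avoids accidentally restarting a queue that an operator paused on purpose.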

    // Remove all waiting jobs (active jobs are not touched)
    await queue.drain();

    // Remove delayed jobs as well
    await queue.drain(true);

    // Clean up to 100 completed jobs older than 1 hour (3,600,000 ms);
    // clean() resolves with the IDs of the removed jobs
    const deletedJobs = await queue.clean(3600000, 100, 'completed');
    console.log(`Cleaned ${deletedJobs.length} jobs`);

    // Clean up to 100 failed jobs older than 1 hour
    await queue.clean(3600000, 100, 'failed');

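Each `clean` call above removes at most 100 jobs, so a large backlog of old jobs needs several calls. The following sketch repeats the call until a batch comes back smaller than the limit. The helper `cleanInBatches`, the constant `HOUR_MS`, and the `CleanableQueue` interface are hypothetical names; the interface assumes only the `clean(grace, limit, type)` signature used in the example.

```typescript
type CleanableState = 'completed' | 'failed';

// Minimal structural type for the one Queue method this sketch uses.
// Assumption: a BullMQ Queue instance satisfies it.
interface CleanableQueue {
  clean(graceMs: number, limit: number, type: CleanableState): Promise<string[]>;
}

// One hour in milliseconds, the same grace period as in the example above.
const HOUR_MS = 60 * 60 * 1000;

// Hypothetical helper: call clean() repeatedly until a batch returns fewer
// IDs than the batch size, and report how many jobs were removed in total.
async function cleanInBatches(
  queue: CleanableQueue,
  graceMs: number,
  type: CleanableState,
  batchSize = 100,
): Promise<number> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  let total = 0;
  while (true) {
    const removedIds = await queue.clean(graceMs, batchSize, type);
    total += removedIds.length;
    // A short (or empty) batch means nothing older than graceMs is left.
    if (removedIds.length < batchSize) {
      return total;
    }
  }
}
```

For example, `await cleanInBatches(queue, HOUR_MS, 'failed')` would remove every failed job older than one hour, 100 at a time.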
Completely destroy a queue and all its data. This operation is irreversible.

    // Obliterate the queue (requires no active jobs)
    await queue.obliterate();

    // Force obliteration even with active jobs
    await queue.obliterate({ force: true });

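Because obliteration cannot be undone, it can help to make the forced variant an explicit opt-in. The sketch below tries the non-forced call first and only falls back to `force: true` when the caller allows it. It assumes that a non-forced `obliterate()` rejects when active jobs exist, and it treats any rejection as that case, which a real implementation would want to refine by inspecting the error. `obliterateCarefully`, `ObliterableQueue`, and `ObliterateOutcome` are hypothetical names.

```typescript
// Minimal structural type for the one Queue method this sketch uses.
// Assumption: a BullMQ Queue instance satisfies it.
interface ObliterableQueue {
  obliterate(opts?: { force?: boolean }): Promise<void>;
}

type ObliterateOutcome = 'obliterated' | 'forced' | 'refused';

// Hypothetical helper: try a normal obliteration first. If it rejects
// (assumed here to mean active jobs exist), force it only when the caller
// explicitly opted in; otherwise leave the queue untouched.
async function obliterateCarefully(
  queue: ObliterableQueue,
  allowForce = false,
): Promise<ObliterateOutcome> {
  try {
    await queue.obliterate();
    return 'obliterated';
  } catch {
    // Simplification: every rejection is treated as "queue is busy".
    if (!allowForce) {
      return 'refused';
    }
    await queue.obliterate({ force: true });
    return 'forced';
  }
}
```

Returning an outcome instead of throwing lets a cleanup script log which queues were skipped and retry them later.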
Limit the number of jobs processed across all workers:

    // Set rate limit: max 100 jobs per 60 seconds
    await queue.setGlobalRateLimit(100, 60000);

    // Remove rate limit
    await queue.removeGlobalRateLimit();

    // Manually trigger rate limit for specific duration
    await queue.rateLimit(30000); // Rate limited for 30 seconds

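To get a feel for what a limit such as 100 jobs per 60,000 ms means in practice, the arithmetic can be written out. The two functions below are illustrative helpers, not BullMQ APIs, and the drain-time figure is a rough estimate that ignores job duration, worker capacity, and how the limiter aligns its time windows.

```typescript
// Sustained throughput implied by a limit of `max` jobs per `durationMs`.
function jobsPerSecond(max: number, durationMs: number): number {
  if (max <= 0 || durationMs <= 0) {
    throw new RangeError('max and durationMs must be positive');
  }
  return max / (durationMs / 1000);
}

// Rough estimate of the time (in ms) needed to work through `backlog` jobs
// when processing is capped at `max` jobs per `durationMs`.
function estimateDrainTimeMs(
  backlog: number,
  max: number,
  durationMs: number,
): number {
  if (backlog < 0 || max <= 0 || durationMs <= 0) {
    throw new RangeError('backlog must be >= 0; max and durationMs must be positive');
  }
  return Math.ceil((backlog * durationMs) / max);
}

// With the limit from the example above (100 jobs per 60 seconds):
const rate = jobsPerSecond(100, 60000); // about 1.67 jobs per second
const drainMs = estimateDrainTimeMs(1000, 100, 60000); // 600000 ms, i.e. 10 minutes
```

An estimate like this is mainly useful for choosing a limit: if the expected backlog would take longer to clear than is acceptable, the limit is too strict for the workload.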

    // Maximum 5 jobs processed simultaneously across all workers
    await queue.setGlobalConcurrency(5);

    // Remove concurrency limit
    await queue.removeGlobalConcurrency();


    // Get queue version
    const version = await queue.getVersion();
    console.log('BullMQ version:', version);

    // Check if queue is maxed out (reached concurrency limit)
    const isMaxed = await queue.isMaxed();


    await queue.add(
      'process-order',
      { orderId: '12345' },
      {
        deduplication: {
          id: 'order-12345', // Only one job with this ID can exist
        },
      },
    );

    // Remove deduplication key manually
    await queue.removeDeduplicationKey('order-12345');

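Deduplication only works if every producer derives the same ID for the same logical job. One way to keep that consistent is to build the job name, payload, and deduplication ID in a single place, as in this sketch. `buildOrderJob`, `OrderJobData`, and `OrderJobSpec` are hypothetical names, and the `order-<orderId>` format simply mirrors the example above.

```typescript
interface OrderJobData {
  orderId: string;
}

// The three arguments passed to queue.add() in the example above.
interface OrderJobSpec {
  name: string;
  data: OrderJobData;
  opts: { deduplication: { id: string } };
}

// Hypothetical helper: derive the deduplication ID from the order ID so
// that two producers adding the same order always use the same ID.
function buildOrderJob(orderId: string): OrderJobSpec {
  if (orderId.trim() === '') {
    throw new Error('orderId must not be empty');
  }
  return {
    name: 'process-order',
    data: { orderId },
    opts: { deduplication: { id: `order-${orderId}` } },
  };
}
```

A producer would then call `const { name, data, opts } = buildOrderJob('12345');` followed by `await queue.add(name, data, opts);`, and the same `opts.deduplication.id` can be passed to `queue.removeDeduplicationKey` when the key needs to be cleared manually.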