Overview

A Queue is the central data structure in BullMQ that holds jobs waiting to be processed. Queues are lightweight and can handle both small message-like jobs and larger long-running jobs.
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', {
  connection: {
    host: 'localhost',
    port: 6379,
  },
});
When you instantiate a Queue, BullMQ creates the queue in Redis or connects to it if it already exists. Because queue state is stored in Redis, queues are persistent and survive application restarts.

Adding Jobs

The primary method for interacting with a queue is add(), which adds jobs to the queue:

Basic Job Addition

// Add a job with name and data
await queue.add('paint-car', {
  color: 'red',
  make: 'Tesla',
  model: 'Model 3',
});

Job Options

Customize job behavior with options:
// Process this job after 5 seconds
await queue.add('send-email', 
  { to: 'user@example.com', subject: 'Welcome' },
  { delay: 5000 }
);
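
Because delay is a relative number of milliseconds, scheduling a job for a specific wall-clock time means converting a target date into an offset first. A minimal sketch of that conversion follows; the helper name delayUntil is hypothetical and not part of BullMQ.

```typescript
// Hypothetical helper (not a BullMQ API): converts an absolute target time
// into the relative millisecond value expected by the `delay` option.
function delayUntil(target: Date, now: Date = new Date()): number {
  // A target in the past yields 0, meaning "no delay".
  return Math.max(0, target.getTime() - now.getTime());
}

// Possible usage with the queue from the examples above:
// await queue.add('send-email', payload, { delay: delayUntil(sendAt) });
```

Clamping to zero keeps a target that is already in the past from producing a negative delay.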

Bulk Operations

Add multiple jobs atomically for better performance:
const jobs = await queue.addBulk([
  { name: 'job1', data: { foo: 'bar' }, opts: { priority: 1 } },
  { name: 'job2', data: { qux: 'baz' }, opts: { delay: 2000 } },
  { name: 'job3', data: { hello: 'world' } },
]);

console.log(`Added ${jobs.length} jobs`);
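
For very large batches it can help to split the jobs into smaller addBulk() calls so that no single call grows unbounded. The sketch below is one way to do that under stated assumptions: chunk, addInChunks, BulkJob, BulkQueueLike, and the default chunk size of 500 are all illustrative names and values, and a BullMQ Queue is assumed to satisfy BulkQueueLike through its addBulk() method.

```typescript
// Hypothetical minimal shapes used by this sketch (not BullMQ exports).
interface BulkJob {
  name: string;
  data: Record<string, unknown>;
  opts?: Record<string, unknown>;
}

interface BulkQueueLike {
  addBulk(jobs: BulkJob[]): Promise<unknown[]>;
}

// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Add jobs chunk by chunk and return the total number of jobs created.
async function addInChunks(
  queue: BulkQueueLike,
  jobs: BulkJob[],
  size = 500, // arbitrary illustrative default; tune for your workload
): Promise<number> {
  let added = 0;
  for (const part of chunk(jobs, size)) {
    const created = await queue.addBulk(part);
    added += created.length;
  }
  return added;
}
```

Note the trade-off: with chunking, any atomicity applies to each addBulk() call separately, so a failure midway can leave earlier chunks already added.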

Job Options Reference

JobsOptions Interface

interface JobsOptions {
  // Retry configuration
  attempts?: number;              // Number of retry attempts (default: 0)
  backoff?: BackoffOptions;       // Backoff strategy for retries
  
  // Timing
  delay?: number;                 // Delay in milliseconds before processing
  timestamp?: number;             // Custom creation timestamp
  
  // Priority
  priority?: number;              // Priority (0-2097152, lower = higher priority)
  lifo?: boolean;                 // Process as Last In First Out
  
  // Job identification
  jobId?: string;                 // Custom job ID (cannot be integers or contain ':')
  
  // Cleanup
  removeOnComplete?: boolean | number | KeepJobs;  // Auto-remove on completion
  removeOnFail?: boolean | number | KeepJobs;      // Auto-remove on failure
  
  // Dependencies
  parent?: { id: string; queue: string };  // Parent job for flows
  
  // Advanced
  deduplication?: { id: string };          // Prevent duplicate jobs
  repeat?: RepeatOptions;                  // Repeating job configuration
  stackTraceLimit?: number;                // Stack trace depth for errors
}
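
As an illustration of how the retry and cleanup options combine, the sketch below builds an options object and estimates the wait before each retry. It rests on assumptions that should be checked against the BullMQ version in use: that attempts counts the first run as well, that an exponential backoff doubles the base delay on each retry, and that KeepJobs takes age in seconds. RetryOptions and retryDelays are hypothetical names defined only for this example.

```typescript
// Local stand-in for a subset of the options shown above (hypothetical type).
interface RetryOptions {
  attempts: number;
  backoff: { type: 'fixed' | 'exponential'; delay: number };
  removeOnComplete: { age: number; count: number };
  removeOnFail: { age: number };
}

const retryOpts: RetryOptions = {
  attempts: 4,
  backoff: { type: 'exponential', delay: 1000 }, // base delay in ms
  removeOnComplete: { age: 3600, count: 1000 }, // age assumed to be seconds
  removeOnFail: { age: 24 * 3600 },
};

// Estimate the wait before each retry.
// Assumption: "exponential" doubles the base delay on every retry and
// "fixed" repeats it; real implementations may add jitter.
function retryDelays(
  opts: Pick<RetryOptions, 'attempts' | 'backoff'>,
): number[] {
  const delays: number[] = [];
  // Assumption: attempts includes the first run, so retries = attempts - 1.
  for (let retry = 1; retry < opts.attempts; retry++) {
    delays.push(
      opts.backoff.type === 'exponential'
        ? opts.backoff.delay * 2 ** (retry - 1)
        : opts.backoff.delay,
    );
  }
  return delays;
}

// Possible usage: await queue.add('paint-car', data, retryOpts);
```

Under these assumptions, retryDelays(retryOpts) yields waits of 1000, 2000, and 4000 ms.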

Queue Management

Pausing and Resuming

// Pause the queue globally
// Workers will finish current jobs but won't pick new ones
await queue.pause();

// Check if paused
const isPaused = await queue.isPaused();
console.log('Queue paused:', isPaused);

// Resume processing
await queue.resume();

Draining and Cleaning

// Remove all waiting jobs (active, completed, and failed jobs are kept)
await queue.drain();

// Also remove delayed jobs
await queue.drain(true);

// Clean up to 100 completed jobs older than 1 hour (returns the removed job IDs)
const deletedJobIds = await queue.clean(3600000, 100, 'completed');
console.log(`Cleaned ${deletedJobIds.length} jobs`);

// Clean up to 100 failed jobs older than 1 hour
await queue.clean(3600000, 100, 'failed');

Obliterate

Completely destroy a queue and all its data. This operation is irreversible.
// Obliterate the queue (requires no active jobs)
await queue.obliterate();

// Force obliteration even with active jobs
await queue.obliterate({ force: true });

Rate Limiting

Global Rate Limit

Limit the number of jobs processed across all workers:
// Set rate limit: max 100 jobs per 60 seconds
await queue.setGlobalRateLimit(100, 60000);

// Remove rate limit
await queue.removeGlobalRateLimit();

// Manually trigger rate limit for specific duration
await queue.rateLimit(30000);  // Rate limited for 30 seconds

Global Concurrency

Limit concurrent job processing globally:
// Maximum 5 jobs processed simultaneously across all workers
await queue.setGlobalConcurrency(5);

// Remove concurrency limit
await queue.removeGlobalConcurrency();

Job Schedulers

Create recurring jobs with job schedulers:
// Create a repeating job
await queue.upsertJobScheduler(
  'send-daily-report',
  {
    pattern: '0 9 * * *',  // Cron pattern: 9 AM daily
  },
  {
    name: 'daily-report',
    data: { type: 'summary' },
    opts: { priority: 1 },
  }
);

// Get all job schedulers
const schedulers = await queue.getJobSchedulers();

// Remove a scheduler
await queue.removeJobScheduler('send-daily-report');

Queue Events

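Listen to events emitted by a Queue instance. These listeners are local to that instance; use the QueueEvents class to observe events across processes: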
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', { connection: { /* ... */ } });

queue.on('waiting', (job) => {
  console.log(`Job ${job.id} is waiting`);
});

queue.on('progress', (jobId, progress) => {
  console.log(`Job ${jobId} progress: ${progress}%`);
});

queue.on('removed', (jobId) => {
  console.log(`Job ${jobId} was removed`);
});

queue.on('cleaned', (jobs, type) => {
  console.log(`Cleaned ${jobs.length} ${type} jobs`);
});

queue.on('error', (err) => {
  console.error('Queue error:', err);
});

Retrieving Jobs

Get Individual Jobs

import { Job } from 'bullmq';

// Get job by ID
const job = await Job.fromId(queue, 'job-123');

if (job) {
  console.log('Job data:', job.data);
  console.log('Job state:', await job.getState());
}

Get Job Counts

// Get counts for different job states
const counts = await queue.getJobCounts(
  'waiting',
  'active',
  'completed',
  'failed',
  'delayed',
  'paused'
);

console.log('Active jobs:', counts.active);
console.log('Failed jobs:', counts.failed);
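
The counts object can feed a simple health check. The following sketch derives a backlog size and a failure ratio from it; summarizeCounts, QueueHealth, and both thresholds are hypothetical choices for illustration, not BullMQ features.

```typescript
// Hypothetical result shape for this sketch.
interface QueueHealth {
  backlog: number;      // jobs not yet picked up by a worker
  failureRatio: number; // failed / (completed + failed), 0 when nothing finished
  healthy: boolean;
}

// Summarize an object like the one returned by getJobCounts().
// The thresholds are arbitrary example values.
function summarizeCounts(
  counts: Record<string, number>,
  maxBacklog = 1000,
  maxFailureRatio = 0.05,
): QueueHealth {
  const get = (key: string): number => counts[key] ?? 0;
  const backlog = get('waiting') + get('delayed') + get('paused');
  const finished = get('completed') + get('failed');
  const failureRatio = finished === 0 ? 0 : get('failed') / finished;
  return {
    backlog,
    failureRatio,
    healthy: backlog <= maxBacklog && failureRatio <= maxFailureRatio,
  };
}

// Possible usage: summarizeCounts(await queue.getJobCounts());
```

Keep in mind that completed and failed counts only cover jobs still retained in Redis, so removeOnComplete and removeOnFail settings affect the ratio.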

Get Jobs by State

// Get the first 10 waiting jobs (start and end indices are inclusive)
const waitingJobs = await queue.getWaiting(0, 9);

// Get active jobs
const activeJobs = await queue.getActive();

// Get completed jobs
const completedJobs = await queue.getCompleted(0, 100);

// Get failed jobs
const failedJobs = await queue.getFailed(0, 100);
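
When paging through jobs, the start and end arguments need to be derived from a page number. The sketch below assumes both indices are zero-based and inclusive, which is worth verifying for your BullMQ version; pageRange is a hypothetical helper name.

```typescript
// Hypothetical helper: converts a zero-based page number and a page size
// into start/end indices, assuming the end index is inclusive.
function pageRange(
  page: number,
  pageSize: number,
): { start: number; end: number } {
  if (!Number.isInteger(page) || page < 0) {
    throw new RangeError('page must be a non-negative integer');
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  const start = page * pageSize;
  return { start, end: start + pageSize - 1 };
}

// Possible usage:
// const { start, end } = pageRange(2, 25);
// const jobs = await queue.getFailed(start, end);
```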

Queue Metadata

// Get queue version
const version = await queue.getVersion();
console.log('BullMQ version:', version);

// Check if queue is maxed out (reached concurrency limit)
const isMaxed = await queue.isMaxed();

TypeScript Support

Type-safe queues with generics:
interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

interface EmailResult {
  messageId: string;
  sent: boolean;
}

type EmailJobName = 'welcome' | 'notification' | 'alert';

const emailQueue = new Queue<EmailJob, EmailResult, EmailJobName>('emails', {
  connection: { host: 'localhost', port: 6379 },
});

// Type-safe job addition
await emailQueue.add('welcome', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up',
});
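
Generics are checked only at compile time, while job data is serialized before it reaches a worker, so a runtime check on the receiving side can catch payloads produced by older code or other services. A minimal sketch of such a type guard follows; isEmailJob is a hypothetical helper, and the EmailJob interface is repeated so the block stands alone.

```typescript
// Same shape as the EmailJob interface above, repeated for self-containment.
interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

// Hypothetical runtime guard: narrows unknown data to EmailJob.
function isEmailJob(value: unknown): value is EmailJob {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const v = value as Record<string, unknown>;
  return (
    typeof v.to === 'string' &&
    typeof v.subject === 'string' &&
    typeof v.body === 'string'
  );
}

// Possible usage inside a processor:
// if (!isEmailJob(job.data)) throw new Error('Malformed email job');
```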

Advanced Features

Deduplication

Prevent duplicate jobs based on a unique ID:
await queue.add(
  'process-order',
  { orderId: '12345' },
  {
    deduplication: {
      id: 'order-12345',  // Only one job with this ID can exist
    },
  }
);

// Remove deduplication key manually
await queue.removeDeduplicationKey('order-12345');
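
Deduplication only works if every producer derives the same ID for the same logical job. One way to keep that consistent is a small shared helper, sketched below; dedupIdFor and orderJobOptions are hypothetical names, and the normalization rules (trim and lowercase) are an arbitrary choice for illustration.

```typescript
// Hypothetical helper: builds a deterministic deduplication ID from a job
// kind and a business key, normalizing whitespace and letter case.
function dedupIdFor(kind: string, key: string | number): string {
  return `${kind}-${String(key).trim().toLowerCase()}`;
}

// Hypothetical helper: options object for an order-processing job.
function orderJobOptions(orderId: string): { deduplication: { id: string } } {
  return { deduplication: { id: dedupIdFor('order', orderId) } };
}

// Possible usage:
// await queue.add('process-order', { orderId }, orderJobOptions(orderId));
```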

Job Retry

Retry all failed jobs:
// Retry all failed jobs
await queue.retryJobs({ state: 'failed' });

// Retry only jobs that failed before a cutoff (here: more than an hour ago)
await queue.retryJobs({
  state: 'failed',
  timestamp: Date.now() - 3600000,
});

Job Promotion

Promote delayed jobs to waiting:
// Promote all delayed jobs
await queue.promoteJobs();

// Promote with limit
await queue.promoteJobs({ count: 10 });

Best Practices

Use Bulk Operations

When adding multiple jobs, use addBulk(): the jobs are added atomically in one call instead of one call per job.

Set Job TTL

Use removeOnComplete and removeOnFail to prevent Redis memory bloat.

Monitor Queue Health

Regularly check job counts and clean old jobs to maintain performance.

Handle Errors Gracefully

Always listen to the 'error' event on the queue. In Node.js, an 'error' event emitted with no listener attached is thrown as an exception.

Next Steps

Workers

Learn how to process jobs with workers

Jobs

Deep dive into job lifecycle and options

Events

Monitor queue activity with events
