Control the rate at which jobs are processed in your BullMQ queues
BullMQ provides queue rate limiting capabilities to control how many jobs are processed within a given time period. This is useful for respecting external API limits, protecting downstream services, or managing resource consumption.
The rate limiter is global across all workers for the same queue. For example, if 10 workers each use a limiter with max: 10 and duration: 1000, only 10 jobs in total are processed per second across all of them, not 10 per worker.
Jobs that get rate limited will stay in the waiting state until the rate limit window resets.
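To make the window behaviour concrete, here is a minimal sketch of a simplified fixed-window model. It assumes every job is already waiting at time 0 and that processing time is negligible. It is an illustration only, not BullMQ's internal implementation: `LimiterOptions` mirrors the `max` and `duration` fields used on this page, and `earliestStartMs` is a hypothetical helper name.

```typescript
// Simplified fixed-window model of a queue rate limiter (illustrative only).
interface LimiterOptions {
  max: number;      // jobs allowed per window
  duration: number; // window length in milliseconds
}

// Earliest time (ms after the first job) at which the job at `index` (0-based)
// may start, assuming every job was already waiting at time 0.
function earliestStartMs(index: number, limiter: LimiterOptions): number {
  const windowNumber = Math.floor(index / limiter.max);
  return windowNumber * limiter.duration;
}

const limiter: LimiterOptions = { max: 10, duration: 1000 };
const starts = Array.from({ length: 25 }, (_, i) => earliestStartMs(i, limiter));

// Jobs 0-9 start in the first window, 10-19 in the second, 20-24 in the third.
console.log(starts[9], starts[10], starts[24]); // 0 1000 2000
```

The number of workers does not appear in the model because the limit applies to the queue as a whole: adding workers does not move any job into an earlier window.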
Sometimes you need dynamic rate limiting based on runtime conditions, such as receiving a 429 Too Many Requests response from an API:
import { Worker } from 'bullmq';

const worker = new Worker(
  'myQueue',
  async (job) => {
    let response;
    try {
      response = await fetch(job.data.url);
    } catch (error) {
      // Network-level failure: a genuine job failure, not a rate limit
      const reason = error instanceof Error ? error.message : String(error);
      throw new Error(`Failed to fetch: ${reason}`);
    }

    if (response.status === 429) {
      // Retry-After is read as seconds; fall back to 5 s if absent or not numeric
      const retryAfter = Number.parseInt(response.headers.get('retry-after') ?? '', 10);
      const delayMs = Number.isNaN(retryAfter) ? 5000 : retryAfter * 1000;

      // Apply manual rate limit
      await worker.rateLimit(delayMs);

      // Throw the special error, outside any try/catch, to move the job back to wait.
      // Worker.RateLimitError is a static method that returns the error, not a class,
      // so it cannot be used with instanceof.
      throw Worker.RateLimitError();
    }

    return response.json();
  },
  {
    connection: {
      host: 'localhost',
      port: 6379,
    },
    limiter: {
      max: 1,
      duration: 500,
    },
  },
);
You must include limiter options in your worker configuration for manual rate limiting to work. The limiter.max value determines if rate limit validation is executed.
When using manual rate limiting, you must throw Worker.RateLimitError() to differentiate rate limiting from actual job failures. This ensures the job returns to the waiting state instead of being marked as failed.
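The Retry-After header can carry either a number of seconds or an HTTP-date, so a small parsing helper can be useful before calling worker.rateLimit. The following is a minimal sketch under that assumption: `retryAfterToMs` is a hypothetical helper that is not part of BullMQ, and the 5000 ms fallback simply mirrors the default used in the example above. `Date.parse` leniency varies between JavaScript engines, so treat the date branch as best effort.

```typescript
// Convert a Retry-After header value into a delay in milliseconds.
// The header may hold delay-seconds ("120") or an HTTP-date.
function retryAfterToMs(
  header: string | null,
  fallbackMs: number = 5000,
  now: number = Date.now(),
): number {
  if (header === null || header.trim() === '') {
    return fallbackMs;
  }
  const value = header.trim();

  // Form 1: a non-negative integer number of seconds
  if (/^\d+$/.test(value)) {
    return Number(value) * 1000;
  }

  // Form 2: an HTTP-date; Date.parse returns NaN when it cannot parse the value
  const target = Date.parse(value);
  if (Number.isNaN(target)) {
    return fallbackMs;
  }

  // Never return a negative delay if the date is already in the past
  return Math.max(0, target - now);
}

console.log(retryAfterToMs('120')); // 120000
console.log(retryAfterToMs(null));  // 5000
```

Inside a processor, the result could be passed straight to the worker, for example `await worker.rateLimit(retryAfterToMs(response.headers.get('retry-after')))`, followed by `throw Worker.RateLimitError()`.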
Rate limiting controls how many jobs are processed over a time period:
limiter: { max: 100, duration: 60000 } // 100 jobs per minute
Concurrency controls how many jobs run simultaneously:
concurrency: 10 // 10 jobs at the same time
These work together:
import { Worker } from 'bullmq';

const worker = new Worker(
  'tasks',
  async (job) => {
    return await processJob(job);
  },
  {
    concurrency: 5, // Max 5 jobs running at once
    limiter: {
      max: 100, // Max 100 jobs per minute
      duration: 60000,
    },
  },
);
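As a rough way to reason about how the two settings interact, the sketch below estimates a steady-state upper bound on throughput for a single worker. It rests on simplifying assumptions: jobs take a fixed average time, the queue is never empty, and overhead is ignored. `estimatedJobsPerSecond` and `avgJobMs` are hypothetical names introduced for illustration and are not part of BullMQ.

```typescript
interface WorkerLimits {
  concurrency: number;                        // jobs running at once
  limiter: { max: number; duration: number }; // max jobs per duration (ms)
}

// Steady-state upper bound on jobs per second: whichever setting is tighter wins.
function estimatedJobsPerSecond(limits: WorkerLimits, avgJobMs: number): number {
  const concurrencyBound = limits.concurrency / (avgJobMs / 1000);
  const limiterBound = limits.limiter.max / (limits.limiter.duration / 1000);
  return Math.min(concurrencyBound, limiterBound);
}

const limits: WorkerLimits = {
  concurrency: 5,
  limiter: { max: 100, duration: 60000 },
};

// Fast jobs (200 ms): concurrency would allow 25 jobs/s, so the limiter
// (100 per minute) is the bottleneck.
console.log(estimatedJobsPerSecond(limits, 200));

// Slow jobs (10 s): concurrency allows only 0.5 jobs/s, below the limiter's rate.
console.log(estimatedJobsPerSecond(limits, 10000));
```

With the settings from the example above, short jobs are capped by the limiter and long jobs are capped by concurrency, which is why it usually makes sense to tune both values together.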