Overview

Throw RateLimitError from a job processor to signal that the job hit a rate limit and should be moved back to the wait state to be retried later, without consuming one of its attempts.

Class

class RateLimitError extends Error {
  constructor(message?: string);
}
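The error's default message is the sentinel string BullMQ uses to recognize it internally ('bullmq:rateLimitExceeded'). A minimal sketch of how such a class behaves (illustrative only, not BullMQ's actual source):

```typescript
// Illustrative sketch of a RateLimitError-style class.
// The real class ships in the 'bullmq' package.
class RateLimitErrorSketch extends Error {
  constructor(message = 'bullmq:rateLimitExceeded') {
    super(message);
    this.name = 'RateLimitError';
  }
}

// Default message is the sentinel string
const err = new RateLimitErrorSketch();

// A custom message overrides the default
const custom = new RateLimitErrorSketch('GitHub API quota hit');
```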

Creating Rate Limit Errors

Use the static method on the Worker class:
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  // Set the rate-limit TTL, then throw the special error so the
  // job is moved back to wait without consuming an attempt
  await worker.rateLimit(60000);
  throw Worker.RateLimitError();
});
Or import the class directly:
import { RateLimitError } from 'bullmq';

const processor = async (job) => {
  throw new RateLimitError();
};

Usage

import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  // Check rate limit
  const canProceed = await checkRateLimit(job.data.userId);

  if (!canProceed) {
    // Rate limited: set the TTL, then throw the special error
    // so the job moves back to wait until the limit expires
    await worker.rateLimit(60000);
    throw Worker.RateLimitError();
  }

  // Process the job
  return await processJob(job.data);
});

Examples

API Rate Limiting

const processor = async (job) => {
  try {
    return await callExternalAPI(job.data);
  } catch (error) {
    if (error.statusCode === 429) {
      // Too Many Requests
      throw Worker.RateLimitError();
    }
    throw error;
  }
};

Custom Rate Limiter

import { RateLimiter } from 'some-rate-limiter';

const limiter = new RateLimiter({
  max: 100,
  duration: 60000, // 100 requests per minute
});

const processor = async (job) => {
  const allowed = await limiter.consume(job.data.userId);
  
  if (!allowed) {
    throw Worker.RateLimitError();
  }
  
  return await processForUser(job.data.userId);
};

Service-Specific Rate Limits

const RATE_LIMITS = {
  'github': { max: 5000, duration: 3600000 },  // 5000/hour
  'twitter': { max: 300, duration: 900000 },   // 300/15min
};

const processor = async (job) => {
  const { service, endpoint } = job.data;
  const limit = RATE_LIMITS[service];
  
  const count = await getRequestCount(service);
  
  if (count >= limit.max) {
    throw Worker.RateLimitError();
  }
  
  await incrementRequestCount(service);
  return await callAPI(service, endpoint);
};
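The getRequestCount/incrementRequestCount helpers above imply a fixed-window counter. An in-memory sketch of that idea follows; a production version would keep the counter in Redis (INCR + PEXPIRE) so every worker shares the same window, and the injectable time parameters here exist only to make the example deterministic:

```typescript
// In-memory fixed-window counter. A production version would use
// Redis (INCR + PEXPIRE) so that every worker shares the window.
class FixedWindowCounter {
  private count = 0;
  private windowStart: number;
  private readonly durationMs: number;

  constructor(durationMs: number, start: number = Date.now()) {
    this.durationMs = durationMs;
    this.windowStart = start;
  }

  // Returns the request count within the current window,
  // including this call. `now` is injectable for determinism.
  increment(now: number = Date.now()): number {
    if (now - this.windowStart >= this.durationMs) {
      this.windowStart = now; // window elapsed: start a new one
      this.count = 0;
    }
    return ++this.count;
  }
}
```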

Token Bucket Algorithm

class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;
    this.tokens = capacity;
    this.refillRate = refillRate;
    this.lastRefill = Date.now();
  }
  
  consume() {
    this.refill();
    // Require a whole token; refill() can leave a fractional balance
    if (this.tokens >= 1) {
      this.tokens--;
      return true;
    }
    return false;
  }
  
  refill() {
    const now = Date.now();
    const timePassed = now - this.lastRefill;
    const tokensToAdd = (timePassed / 1000) * this.refillRate;
    this.tokens = Math.min(this.capacity, this.tokens + tokensToAdd);
    this.lastRefill = now;
  }
}

const bucket = new TokenBucket(100, 10); // 100 capacity, 10/second

const processor = async (job) => {
  if (!bucket.consume()) {
    throw Worker.RateLimitError();
  }
  
  return await processJob(job.data);
};

Behavior

When RateLimitError is thrown:
  1. The job is moved back to the wait state
  2. If the processor first called worker.rateLimit(duration), that duration is stored as a rate limit TTL on the queue
  3. Workers wait for the TTL to expire before fetching new jobs
  4. The job’s attemptsMade counter is not incremented
  5. The job will be retried once the rate limit expires
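Step 4 (no attempt consumed) is the key difference from a normal failure. A toy simulation of that accounting in plain TypeScript (illustrative only, not BullMQ's internals):

```typescript
// Toy model: a rate-limit signal returns the job to the wait list
// without touching attemptsMade, while an ordinary error consumes
// an attempt. Illustrative only -- not BullMQ's internals.
class RateLimitSignal extends Error {}

interface Job {
  id: number;
  attemptsMade: number;
}

function runOnce(job: Job, processor: (j: Job) => void, wait: Job[]): void {
  try {
    processor(job);
  } catch (err) {
    if (err instanceof RateLimitSignal) {
      wait.push(job); // back to wait; no attempt consumed
    } else {
      job.attemptsMade += 1; // a real failure consumes an attempt
      wait.push(job); // retried subject to the job's attempts setting
    }
  }
}

const wait: Job[] = [];
const job: Job = { id: 1, attemptsMade: 0 };

runOnce(job, () => { throw new RateLimitSignal(); }, wait);
const attemptsAfterRateLimit = job.attemptsMade; // unchanged

runOnce(job, () => { throw new Error('boom'); }, wait);
const attemptsAfterFailure = job.attemptsMade; // one attempt consumed
```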

Global Rate Limiting

BullMQ does not have a separate per-queue rate-limit setter; the limiter option passed to a Worker (shown in the next section) is stored in Redis, which makes it global to the queue: every worker processing the queue shares the same limit.

An active rate limit can also be cleared manually from the queue:
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', {
  connection: { host: 'localhost', port: 6379 },
});

await queue.removeRateLimitKey();

Worker-Level Rate Limiting

const worker = new Worker('myQueue', processor, {
  connection: { host: 'localhost', port: 6379 },
  limiter: {
    max: 10,        // Max 10 jobs
    duration: 1000, // Per 1 second
  },
});
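A limiter config caps sustained throughput at max jobs per duration milliseconds. A small helper (hypothetical, not part of BullMQ's API) makes it easy to sanity-check a config:

```typescript
interface LimiterOpts {
  max: number;      // jobs allowed per window
  duration: number; // window length in milliseconds
}

// Sustained jobs-per-second ceiling implied by a limiter config.
function maxJobsPerSecond({ max, duration }: LimiterOpts): number {
  return (max / duration) * 1000;
}

const perSecond = maxJobsPerSecond({ max: 10, duration: 1000 });   // 10 jobs/s
const perMinute = maxJobsPerSecond({ max: 100, duration: 60000 }); // ~1.67 jobs/s
```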

Combining with Other Limits

const processor = async (job) => {
  // Check external API rate limit
  const externalLimit = await checkExternalAPILimit();
  if (!externalLimit) {
    throw Worker.RateLimitError();
  }
  
  // Check user-specific rate limit
  const userLimit = await checkUserLimit(job.data.userId);
  if (!userLimit) {
    throw Worker.RateLimitError();
  }
  
  // Process the job
  return await callAPI(job.data);
};

Monitoring Rate Limits

Because a rate-limited job is moved back to the wait state rather than failed, the 'failed' event does not fire when RateLimitError is thrown. Track rate-limit hits at the point where you throw (recordRateLimitHit below is a placeholder for your own metrics call):

import { Worker } from 'bullmq';

const processor = async (job) => {
  const allowed = await checkRateLimit(job.data.userId);

  if (!allowed) {
    recordRateLimitHit(job.queueName, job.id); // e.g. increment a counter
    throw Worker.RateLimitError();
  }

  return await processJob(job.data);
};

Dynamic Rate Limits

const getRateLimit = async (userId) => {
  const user = await getUser(userId);
  
  // Different limits for different user tiers
  if (user.tier === 'premium') {
    return { max: 1000, duration: 60000 };
  } else if (user.tier === 'basic') {
    return { max: 100, duration: 60000 };
  } else {
    return { max: 10, duration: 60000 };
  }
};

const processor = async (job) => {
  const limit = await getRateLimit(job.data.userId);
  const allowed = await checkLimit(job.data.userId, limit);
  
  if (!allowed) {
    throw Worker.RateLimitError();
  }
  
  return await processJob(job.data);
};

When to Use RateLimitError

Use RateLimitError when:
  • An external API returns 429 (Too Many Requests)
  • An internal rate limit is exceeded
  • A service quota has been reached
  • You need to throttle job processing
  • You are protecting downstream services
Don’t use RateLimitError for:
  • Validation errors or other permanent failures - use UnrecoverableError
  • Deferring a single job - use DelayedError
  • Ordinary transient failures - throw a regular Error and rely on the job’s attempts/backoff settings
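Choosing between these error types often comes down to classifying a failure. A sketch using stand-in classes (in real code, import RateLimitError and UnrecoverableError from 'bullmq'; the classifier itself is hypothetical):

```typescript
// Stand-ins for BullMQ's error classes, for illustration only.
class RateLimitError extends Error {}
class UnrecoverableError extends Error {}

// Hypothetical classifier: map an HTTP status from a downstream
// call to the error that should be thrown from the processor.
function classifyFailure(status: number): Error {
  if (status === 429) {
    return new RateLimitError('rate limited'); // retry when the limit clears
  }
  if (status === 400 || status === 422) {
    return new UnrecoverableError('bad request'); // never retry
  }
  return new Error(`HTTP ${status}`); // regular error: normal attempts/backoff
}

const rateLimited = classifyFailure(429);
const permanent = classifyFailure(422);
const transient = classifyFailure(503);
```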
