Skip to main content

Creating a Queue

Queues are used to add jobs that will be processed by workers:
from bullmq import Queue

# With no connection options, Queue uses the local Redis default
# (localhost:6379) — see the Connection Options section.
queue = Queue("myQueue")

# Add a job with data (first argument is the job name, second is the payload)
await queue.add("myJob", {"foo": "bar"})

# Close when done to release the Redis connection
await queue.close()

Adding Jobs

Basic Job

from bullmq import Queue

queue = Queue("emailQueue")

# Add a simple job: the first argument is the job name, the second is the
# data made available to the worker as job.data
job = await queue.add("sendEmail", {
    "to": "[email protected]",
    "subject": "Welcome!",
    "body": "Thanks for signing up."
})

# add() returns the created job, which carries an assigned id
print(f"Job added with ID: {job.id}")

Delayed Job

# Process this job after 60 seconds
# (the third argument to add() is the per-job options dict)
job = await queue.add(
    "reminder",
    {"message": "Don't forget!"},
    {"delay": 60000}  # Delay in milliseconds
)

Job with Options

# Common per-job options, passed as the third argument to add()
job = await queue.add(
    "processData",
    {"data": "..."},
    {
        "attempts": 3,  # Retry up to 3 times
        "backoff": {
            "type": "exponential",  # Grow the wait between retries
            "delay": 1000  # Base backoff delay in milliseconds
        },
        "removeOnComplete": True,  # Drop the job record once it succeeds
        "removeOnFail": 100  # Keep last 100 failed jobs
    }
)

Job Deduplication

Prevent duplicate jobs from being added to the queue:

Simple Deduplication

# Deduplicates until job completes or fails
job = await queue.add(
    "paint",
    {"color": "white"},
    {
        "deduplication": {
            # With no ttl, further adds using the same id are ignored
            # while a job with this id is still active
            "id": "custom-dedup-id"
        }
    }
)

Throttle Mode

# Deduplicates for a specific time window
job = await queue.add(
    "paint",
    {"color": "white"},
    {
        "deduplication": {
            # Duplicate adds with this id are ignored until the ttl elapses
            "id": "custom-dedup-id",
            "ttl": 5000  # 5 seconds
        }
    }
)

Debounce Mode

# Replaces pending job with latest data
# ("debounce": each duplicate within the TTL window extends the window
# and swaps in the newest payload)
job = await queue.add(
    "paint",
    {"color": "white"},
    {
        "deduplication": {
            "id": "custom-dedup-id",
            "ttl": 5000,
            "extend": True,  # Extend TTL on each duplicate
            "replace": True  # Replace job data with latest
        },
        "delay": 5000  # Must be delayed for replace to work
    }
)

Creating a Worker

Workers process jobs from the queue using a processor function:
from bullmq import Worker
import asyncio
import signal

async def process(job, job_token):
    """Processor callback invoked by the Worker for each job.

    Args:
        job: The job being processed; job.data holds the payload passed
            to queue.add(), job.id is the job's identifier.
        job_token: Opaque token supplied by the worker (unused here).

    Returns:
        The value to record as the job's result.
    """
    # job.data contains the data passed to queue.add()
    print(f"Processing job {job.id} with data: {job.data}")
    
    # Do some work
    result = await do_something_async(job.data)
    
    # Return the result
    return result

async def main():
    """Run a worker until SIGTERM/SIGINT arrives, then shut it down cleanly."""
    # Event set by the signal handler to unblock the wait below
    shutdown_event = asyncio.Event()

    # NOTE: the parameter is named `sig` (not `signal`) so it does not
    # shadow the imported `signal` module inside the handler.
    def signal_handler(sig, frame):
        print("Signal received, shutting down.")
        shutdown_event.set()

    # Handle shutdown signals
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    # Create worker; it starts pulling jobs from "myQueue" immediately
    worker = Worker(
        "myQueue",
        process,
        {"connection": "rediss://<user>:<password>@<host>:<port>"}
    )

    # Wait for shutdown signal
    await shutdown_event.wait()

    # Clean up: close the worker so in-flight jobs and the Redis
    # connection are released before the loop exits
    print("Cleaning up worker...")
    await worker.close()
    print("Worker shut down successfully.")

# Entry point: start the event loop and run until shutdown
if __name__ == "__main__":
    asyncio.run(main())

Worker Options

# Worker options are passed as the third constructor argument
worker = Worker(
    "myQueue",
    process_function,
    {
        "connection": "redis://localhost:6379",
        "concurrency": 5,  # Process up to 5 jobs concurrently
        "maxStalledCount": 1,  # Max times a job can be stalled
        "stalledInterval": 30000,  # Check for stalled jobs every 30s
        "lockDuration": 30000,  # Lock duration in ms
    }
)

Job Progress

Update job progress from within the processor:
async def process(job, job_token):
    """Run 100 work steps, reporting percentage progress after each one."""
    step_count = 100

    for step in range(step_count):
        # Perform one unit of work
        await do_work_step(step)

        # Report progress as a percentage in the 0-100 range
        done_fraction = (step + 1) / step_count
        await job.update_progress(done_fraction * 100)

    return {"completed": True}

Error Handling

Recoverable Errors

async def process(job, job_token):
    """Let transient failures propagate so BullMQ can retry the job.

    Raising any ordinary exception marks this attempt as failed; the job
    is then retried according to its attempts/backoff options.
    """
    try:
        result = await risky_operation(job.data)
        return result
    except TemporaryError:
        # Job will be retried based on attempts configuration.
        # Bare `raise` re-raises with the original traceback intact
        # (preferred over `raise e`, which appends an extra frame).
        raise

Unrecoverable Errors

from bullmq.custom_errors import UnrecoverableError

async def process(job, job_token):
    """Process valid payloads; fail permanently on invalid ones."""
    if is_valid(job.data):
        return await process_data(job.data)

    # UnrecoverableError makes the job fail at once, skipping all retries
    raise UnrecoverableError("Invalid job data")

Connection Options

Using Connection String

from bullmq import Queue, Worker

# For Queue
queue = Queue(
    "myQueue",
    {"connection": "rediss://<user>:<password>@<host>:<port>"}
)

# For Worker
worker = Worker(
    "myQueue",
    process,
    {"connection": "rediss://<user>:<password>@<host>:<port>"}
)

Local Redis

# Connect to local Redis (default: localhost:6379)
queue = Queue("myQueue")
worker = Worker("myQueue", process)

Complete Example

Here’s a complete example combining queue and worker:
import asyncio
import signal
from bullmq import Queue, Worker

# Processor function
async def send_email(job, job_token):
    """Simulate sending an email to job.data['to'] and return a receipt dict."""
    print(f"Sending email to {job.data['to']}")
    await asyncio.sleep(1)  # Simulate sending
    return {"sent": True, "messageId": "12345"}

async def add_jobs():
    """Enqueue a few demo email jobs, then release the queue connection."""
    queue = Queue("emailQueue")

    # Enqueue one "sendEmail" job per recipient, in order
    recipients = ("[email protected]", "[email protected]", "[email protected]")
    for address in recipients:
        await queue.add("sendEmail", {"to": address})

    await queue.close()
    print("Jobs added!")

async def run_worker():
    """Run the email worker until SIGTERM or SIGINT arrives."""
    stop = asyncio.Event()

    def on_signal(sig, frame):
        stop.set()

    # Register the same handler for both termination signals
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, on_signal)

    worker = Worker("emailQueue", send_email)
    print("Worker started. Press Ctrl+C to stop.")

    await stop.wait()
    await worker.close()
    print("Worker stopped.")

if __name__ == "__main__":
    # Run add_jobs() to add jobs, or run_worker() to process them —
    # comment/uncomment the lines below to switch roles
    asyncio.run(add_jobs())
    # asyncio.run(run_worker())

Interoperability

Jobs added with Python can be processed by workers written in other languages. For example, a Node.js worker:
import { Worker } from 'bullmq';

// The processor receives the same job.data payload that the Python
// Queue added; its return value becomes the job's result.
const worker = new Worker('myQueue', async job => {
  console.log('Processing:', job.data);
  return { success: true };
});
Elixir Worker:
# Start an Elixir worker on the same queue; the processor function
# receives each job and returns an {:ok, result} tuple.
BullMQ.Worker.start_link(
  queue: "myQueue",
  connection: :my_redis,
  processor: fn job -> 
    IO.inspect(job.data)
    {:ok, %{success: true}}
  end
)

Best Practices

  1. Always close connections - Call await queue.close() and await worker.close() when done
  2. Use meaningful job names - Makes debugging and monitoring easier
  3. Set appropriate retry attempts - Not all jobs should retry infinitely
  4. Use deduplication - Prevent duplicate jobs when idempotency is important
  5. Handle errors gracefully - Distinguish between recoverable and unrecoverable errors
  6. Monitor job progress - Use progress updates for long-running jobs

View Changelog

See what’s new in the latest version

Build docs developers (and LLMs) love