Skip to main content

Quick Start

Add Jobs to a Queue

The simplest way to add jobs is using the stateless API:
# Add a job
{:ok, job} = BullMQ.Queue.add("emails", "send-welcome", %{
  to: "[email protected]",
  template: "welcome"
}, connection: :my_redis)

# Add a delayed job (delay in milliseconds)
{:ok, job} = BullMQ.Queue.add("emails", "reminder", %{message: "Hello!"},
  connection: :my_redis,
  delay: 60_000  # 1 minute
)

# Add a prioritized job (lower number = higher priority)
{:ok, job} = BullMQ.Queue.add("emails", "urgent", %{},
  connection: :my_redis,
  priority: 1
)

Process Jobs with a Worker

Create a processor function and start a worker:
defmodule MyApp.EmailWorker do
  def process(%BullMQ.Job{name: "send-welcome", data: data}) do
    MyApp.Mailer.send_welcome(data["to"], data["template"])
    {:ok, %{sent: true}}
  end

  def process(%BullMQ.Job{name: name}) do
    {:error, "Unknown job type: #{name}"}
  end
end

# Start a worker
{:ok, worker} = BullMQ.Worker.start_link(
  queue: "emails",
  connection: :my_redis,
  processor: &MyApp.EmailWorker.process/1,
  concurrency: 5
)

Adding to Supervision Tree

For production use, add workers to your application’s supervision tree:
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # Redis connection
      {Redix, name: :my_redis, host: "localhost", port: 6379},

      # BullMQ Worker
      {BullMQ.Worker,
        queue: "emails",
        connection: :my_redis,
        processor: &MyApp.EmailWorker.process/1,
        concurrency: 5
      }
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Job Options

Basic Options

BullMQ.Queue.add("tasks", "process-data", %{data: "..."},
  connection: :my_redis,
  job_id: "custom-id-123",      # Custom job ID
  priority: 1,                   # Lower = higher priority
  delay: 60_000,                 # Delay 60 seconds
  attempts: 5,                   # Retry up to 5 times
  backoff: %{
    type: "exponential",
    delay: 1000
  },
  remove_on_complete: true,      # Auto-remove on success
  remove_on_fail: 100            # Keep last 100 failed jobs
)

Job Deduplication

# Prevent duplicate jobs
BullMQ.Queue.add("tasks", "send-notification", %{user_id: 123},
  connection: :my_redis,
  deduplication: %{
    id: "notify-user-123",
    ttl: 60_000  # Deduplicate for 60 seconds
  }
)

Worker Configuration

Worker with Event Callbacks

{:ok, worker} = BullMQ.Worker.start_link(
  queue: "tasks",
  connection: :my_redis,
  processor: &process/1,
  concurrency: 10,
  on_completed: fn job, result ->
    IO.puts("Job #{job.id} completed: #{inspect(result)}")
  end,
  on_failed: fn job, reason ->
    IO.puts("Job #{job.id} failed: #{reason}")
  end,
  on_active: fn job ->
    IO.puts("Job #{job.id} started")
  end,
  on_stalled: fn job_id ->
    IO.puts("Job #{job_id} stalled")
  end
)

Rate Limiting

{:ok, worker} = BullMQ.Worker.start_link(
  queue: "api-calls",
  connection: :my_redis,
  processor: &process/1,
  limiter: %{
    max: 100,        # Max jobs
    duration: 60_000  # Per 60 seconds (100 jobs/minute)
  }
)

Job Processing

Basic Processor

def process(%BullMQ.Job{data: data} = job) do
  # Do work
  result = do_some_work(data)
  
  # Return success
  {:ok, result}
end

With Progress Updates

def process(%BullMQ.Job{} = job) do
  total = 100
  
  Enum.each(1..total, fn i ->
    do_work_step(i)
    
    # Update progress (0-100)
    BullMQ.Worker.update_progress(job, i)
  end)
  
  {:ok, "completed"}
end

Error Handling

def process(%BullMQ.Job{data: data}) do
  case risky_operation(data) do
    {:ok, result} -> {:ok, result}
    {:error, :temporary} -> {:error, "Temporary failure"}  # Will retry
    {:error, :invalid} -> {:error, :unrecoverable}  # Won't retry
  end
end

Queue Events

Subscribe to queue-level events using QueueEvents:
# Start QueueEvents
{:ok, events} = BullMQ.QueueEvents.start_link(
  queue: "tasks",
  connection: :my_redis
)

# Subscribe to events
BullMQ.QueueEvents.subscribe(events)

# Handle events
receive do
  {:bullmq_event, :completed, %{"jobId" => id}} ->
    IO.puts("Job #{id} completed!")
  
  {:bullmq_event, :failed, %{"jobId" => id, "failedReason" => reason}} ->
    IO.puts("Job #{id} failed: #{reason}")
  
  {:bullmq_event, :progress, %{"jobId" => id, "data" => progress}} ->
    IO.puts("Job #{id} progress: #{progress}%")
end

Job Schedulers (Repeatable Jobs)

Cron-based Scheduling

# Run every hour
{:ok, job} = BullMQ.JobScheduler.upsert(
  :my_redis,
  "maintenance",
  "cleanup",
  %{pattern: "0 * * * *"},  # Cron pattern
  "cleanup-job",
  %{type: "hourly"},
  prefix: "bull"
)

Interval-based Scheduling

# Run every minute
{:ok, job} = BullMQ.JobScheduler.upsert(
  :my_redis,
  "heartbeats",
  "ping",
  %{every: 60_000},  # Every 60 seconds
  "heartbeat",
  %{},
  prefix: "bull"
)

Managing Schedulers

# List all schedulers
{:ok, schedulers} = BullMQ.JobScheduler.list(
  :my_redis,
  "maintenance",
  prefix: "bull"
)

# Remove a scheduler
{:ok, removed} = BullMQ.JobScheduler.remove(
  :my_redis,
  "maintenance",
  "cleanup",
  prefix: "bull"
)

Queue Operations

Get Queue Information

# Get job counts
{:ok, counts} = BullMQ.Queue.get_counts("emails", connection: :my_redis)
# => %{waiting: 10, active: 2, delayed: 5, completed: 100, failed: 3, ...}

# Get jobs in a specific state
{:ok, jobs} = BullMQ.Queue.get_jobs("emails", [:waiting, :delayed],
  connection: :my_redis,
  start: 0,
  end: 9
)

# Get a specific job
{:ok, job} = BullMQ.Queue.get_job("emails", "job-id-123",
  connection: :my_redis
)

# Get job state
{:ok, state} = BullMQ.Queue.get_job_state("emails", "job-id-123",
  connection: :my_redis
)
# => :waiting | :active | :delayed | :completed | :failed

Pause and Resume

# Pause the queue
:ok = BullMQ.Queue.pause("emails", connection: :my_redis)

# Resume the queue
:ok = BullMQ.Queue.resume("emails", connection: :my_redis)

# Check if paused
{:ok, is_paused} = BullMQ.Queue.paused?("emails", connection: :my_redis)

Clean Up Jobs

# Drain the queue (remove all waiting jobs)
:ok = BullMQ.Queue.drain("emails", connection: :my_redis)

# Remove a specific job
:ok = BullMQ.Queue.remove_job("emails", "job-id-123",
  connection: :my_redis
)

# Retry a failed job
:ok = BullMQ.Queue.retry_job("emails", "job-id-123",
  connection: :my_redis
)

Parent-Child Jobs (Flows)

Create job dependencies using FlowProducer:
{:ok, flow} = BullMQ.FlowProducer.start_link(
  connection: :my_redis,
  prefix: "bull"
)

# Create a flow with parent and children
{:ok, flow_job} = BullMQ.FlowProducer.add(flow, %{
  name: "process-order",
  queue_name: "orders",
  data: %{order_id: 123},
  children: [
    %{
      name: "send-email",
      queue_name: "emails",
      data: %{order_id: 123}
    },
    %{
      name: "update-inventory",
      queue_name: "inventory",
      data: %{order_id: 123}
    }
  ]
})

Graceful Shutdown

# Close worker and wait for active jobs to finish
:ok = BullMQ.Worker.close(worker)

# Force close without waiting
:ok = BullMQ.Worker.close(worker, force: true)

Interoperability

Jobs added in Elixir can be processed by workers written in other languages. For example, a Node.js worker:
import { Worker } from 'bullmq';

const worker = new Worker('emails', async job => {
  console.log('Processing:', job.data);
  return { success: true };
});
Python Worker:
from bullmq import Worker

async def process(job, job_token):
    print(f"Processing: {job.data}")
    return {"success": True}

worker = Worker("emails", process)

Best Practices

  1. Use supervision trees - Always supervise workers for automatic restarts
  2. Set appropriate concurrency - Balance between throughput and resource usage
  3. Use named connections - Makes it easier to manage multiple Redis instances
  4. Handle errors gracefully - Return {:error, reason} for retryable failures; return {:error, :unrecoverable} (or raise) for permanent failures that should not be retried
  5. Monitor with Telemetry - Set up telemetry handlers for observability
  6. Use rate limiting - Prevent overwhelming external services
  7. Clean up old jobs - Use remove_on_complete and remove_on_fail options

View Changelog

See what’s new in the latest version

Build docs developers (and LLMs) love