Skip to main content

Creating a Queue

<?php

use BullMQ\Queue;

// Create a queue with default connection (localhost:6379)
$queue = new Queue('my-queue');

// Or with custom Redis connection
$queue = new Queue('my-queue', [
    'connection' => [
        'host' => 'redis.example.com',
        'port' => 6379,
        'password' => 'your-password',
    ],
]);

Adding Jobs

Basic Job

use BullMQ\Queue;

$queue = new Queue('email-queue');

$job = $queue->add('send-email', [
    'to' => '[email protected]',
    'subject' => 'Welcome!',
    'body' => 'Thanks for signing up.',
]);

echo "Job added with ID: " . $job->id . "\n";

Delayed Job

// Process this job after 60 seconds
$job = $queue->add('reminder', $data, [
    'delay' => 60000, // Delay in milliseconds
]);

Priority Job

// Lower number = higher priority
$job = $queue->add('urgent-task', $data, [
    'priority' => 1,
]);

Custom Job ID

$job = $queue->add('process-order', $orderData, [
    'jobId' => 'order-' . $orderId,
]);

Job with Retry Settings

$job = $queue->add('flaky-operation', $data, [
    'attempts' => 3,
    'backoff' => [
        'type' => 'exponential',
        'delay' => 1000,
    ],
]);

Job with Removal Policy

$job = $queue->add('task', $data, [
    'removeOnComplete' => true,
    'removeOnFail' => 100, // Keep last 100 failed jobs
]);

LIFO (Last In, First Out)

// Process newest jobs first
$job = $queue->add('task', $data, [
    'lifo' => true,
]);

Adding Multiple Jobs

$jobs = $queue->addBulk([
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
    ['name' => 'email', 'data' => ['to' => '[email protected]']],
]);

echo "Added " . count($jobs) . " jobs\n";

Job Options Reference

OptionTypeDescription
jobIdstringCustom job ID
delayintDelay in milliseconds before processing
priorityintPriority (lower = higher priority)
attemptsintNumber of retry attempts
backoffarray/intBackoff strategy for retries
lifoboolProcess newest jobs first
removeOnCompletebool/int/arrayRemove job on completion
removeOnFailbool/int/arrayRemove job on failure
timestampintJob creation timestamp (ms)
parentarrayParent job reference for flows

Getting Job Information

Get a Specific Job

$job = $queue->getJob('job-id');
if ($job) {
    echo "Job name: " . $job->name . "\n";
    echo "Job data: " . json_encode($job->data) . "\n";
    echo "Job state: " . $queue->getJobState($job->id) . "\n";
}

Get Jobs by State

$waitingJobs = $queue->getWaiting(0, 10);
$activeJobs = $queue->getActive(0, 10);
$delayedJobs = $queue->getDelayed(0, 10);
$completedJobs = $queue->getCompleted(0, 10);
$failedJobs = $queue->getFailed(0, 10);

Get Job Counts

$counts = $queue->getJobCounts();
echo "Waiting: " . $counts['waiting'] . "\n";
echo "Active: " . $counts['active'] . "\n";
echo "Delayed: " . $counts['delayed'] . "\n";
echo "Completed: " . $counts['completed'] . "\n";
echo "Failed: " . $counts['failed'] . "\n";

Get Count by Type

$pending = $queue->getJobCountByTypes('waiting', 'delayed');
echo "Pending jobs: " . $pending . "\n";

Get Counts by Priority

$priorityCounts = $queue->getCountsPerPriority([0, 1, 2, 3]);
echo "Priority 0: " . $priorityCounts[0] . "\n";
echo "Priority 1: " . $priorityCounts[1] . "\n";

Queue Management

Pause and Resume

// Pause the queue
$queue->pause();
echo "Queue paused: " . ($queue->isPaused() ? 'yes' : 'no') . "\n";

// Resume the queue
$queue->resume();

Remove a Job

$removed = $queue->remove('job-id');
if ($removed) {
    echo "Job removed\n";
}

Clean Old Jobs

// Clean completed jobs older than 1 hour
$cleaned = $queue->clean(
    grace: 3600000,  // 1 hour in milliseconds
    limit: 100,
    type: 'completed'
);

echo "Cleaned $cleaned jobs\n";

Retry Failed Jobs

$queue->retryJobs([
    'count' => 100,      // Max jobs to retry per iteration
    'state' => 'failed', // State to retry from: 'failed' or 'completed'
    'timestamp' => time() * 1000, // Only retry jobs before this timestamp
]);

Promote Delayed Jobs

// Move delayed jobs to waiting state
$queue->promoteJobs(['count' => 100]);

Drain the Queue

// Remove all waiting jobs
$queue->drain();

Obliterate the Queue

// Remove all queue data (use with caution!)
$queue->obliterate(['force' => true]);

Parent-Child Jobs (Flows)

// Add a child job with a parent reference
$childJob = $queue->add('child-task', $childData, [
    'parent' => [
        'id' => 'parent-job-id',
        'queue' => 'bull:parent-queue',
    ],
]);

Job States

Jobs can be in one of the following states:
  • waiting - Job is waiting to be processed
  • active - Job is currently being processed
  • delayed - Job is delayed and waiting for its delay to expire
  • completed - Job has been successfully processed
  • failed - Job has failed after all retry attempts
  • paused - Job is in a paused queue
  • prioritized - Job is in the prioritized set
  • waiting-children - Parent job waiting for child jobs to complete

Connection Management

Using a Connection Array

$queue = new Queue('my-queue', [
    'connection' => [
        'host' => 'localhost',
        'port' => 6379,
        'database' => 0,
        'password' => null,
        'username' => null,
    ],
]);

Using a Redis URI

$queue = new Queue('my-queue', [
    'connection' => 'redis://user:password@localhost:6379/0',
]);

Sharing a Connection

use BullMQ\RedisConnection;

$connection = new RedisConnection([
    'host' => 'localhost',
    'port' => 6379,
]);

$queue1 = new Queue('queue-1', ['connection' => $connection]);
$queue2 = new Queue('queue-2', ['connection' => $connection]);

Custom Prefix

$queue = new Queue('my-queue', [
    'prefix' => 'myapp',
]);

Close Connection

// Close the queue connection when done
$queue->close();

Complete Example

<?php

require_once __DIR__ . '/vendor/autoload.php';

use BullMQ\Queue;

// Create queue
$queue = new Queue('email-queue', [
    'connection' => [
        'host' => 'localhost',
        'port' => 6379,
    ],
]);

// Add jobs
try {
    // Simple job
    $job1 = $queue->add('welcome-email', [
        'to' => '[email protected]',
        'name' => 'John Doe',
    ]);
    echo "Added job: {$job1->id}\n";

    // Delayed job
    $job2 = $queue->add('reminder-email', [
        'to' => '[email protected]',
        'message' => 'Complete your profile!',
    ], [
        'delay' => 3600000, // 1 hour
    ]);
    echo "Added delayed job: {$job2->id}\n";

    // Priority job
    $job3 = $queue->add('urgent-email', [
        'to' => '[email protected]',
        'message' => 'Server alert!',
    ], [
        'priority' => 1,
    ]);
    echo "Added priority job: {$job3->id}\n";

    // Bulk jobs
    $bulkJobs = $queue->addBulk([
        ['name' => 'newsletter', 'data' => ['to' => '[email protected]']],
        ['name' => 'newsletter', 'data' => ['to' => '[email protected]']],
        ['name' => 'newsletter', 'data' => ['to' => '[email protected]']],
    ]);
    echo "Added " . count($bulkJobs) . " bulk jobs\n";

    // Get queue statistics
    $counts = $queue->getJobCounts();
    echo "Queue statistics:\n";
    echo "  Waiting: {$counts['waiting']}\n";
    echo "  Active: {$counts['active']}\n";
    echo "  Delayed: {$counts['delayed']}\n";

} catch (\Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
} finally {
    // Clean up
    $queue->close();
}

Interoperability with Workers

Jobs added via PHP can be processed by workers in other languages:

Node.js Worker

import { Worker } from 'bullmq';

const worker = new Worker('email-queue', async job => {
  console.log(`Processing ${job.name}:`, job.data);
  // Send email...
  return { success: true };
});

Python Worker

from bullmq import Worker

async def process(job, job_token):
    print(f"Processing {job.name}: {job.data}")
    # Send email...
    return {"success": True}

worker = Worker("email-queue", process)

Elixir Worker

BullMQ.Worker.start_link(
  queue: "email-queue",
  connection: :my_redis,
  processor: fn job -> 
    IO.inspect({job.name, job.data})
    # Send email...
    {:ok, %{success: true}}
  end
)

Best Practices

  1. Reuse connections - Share RedisConnection instances across queues
  2. Close connections - Call $queue->close() when done
  3. Use meaningful job names - Makes debugging easier
  4. Set appropriate retry attempts - Not all jobs should retry infinitely
  5. Use custom job IDs - For idempotency and deduplication
  6. Clean old jobs - Regularly clean completed/failed jobs to save memory
  7. Handle errors gracefully - Wrap queue operations in try-catch blocks
  8. Use bulk operations - When adding multiple jobs, use addBulk()

Error Handling

try {
    $queue = new Queue('my-queue');
    $job = $queue->add('task', $data);
    echo "Job added: {$job->id}\n";
} catch (\RuntimeException $e) {
    echo "Error: " . $e->getMessage() . "\n";
    // Log error, retry, or handle gracefully
} finally {
    if (isset($queue)) {
        $queue->close();
    }
}

View Changelog

See what’s new in the latest version

Build docs developers (and LLMs) love