Overview

Jobs are the fundamental units of work in BullMQ. Each job contains data to be processed and metadata about its execution.
import { Queue } from 'bullmq';

const queue = new Queue('myQueue');

// Jobs are typically created via Queue.add()
const job = await queue.add('process-order', {
  orderId: '12345',
  customerId: 'user-456',
  items: ['item1', 'item2'],
});

console.log('Job ID:', job.id);
console.log('Job name:', job.name);
console.log('Job data:', job.data);

Job Properties

Core Properties

interface Job<DataType, ResultType, NameType> {
  // Identification
  id: string;                      // Unique job identifier
  name: NameType;                  // Job name/type
  queueName: string;               // Name of the queue
  queueQualifiedName: string;      // Fully qualified name with prefix
  
  // Data
  data: DataType;                  // Job payload
  returnvalue: ResultType;         // Result after completion
  
  // Status
  progress: number | object;       // Current progress (0-100 or custom object)
  attemptsMade: number;            // Number of completed attempts
  attemptsStarted: number;         // Number of times job started processing
  stalledCounter: number;          // Times job has stalled
  
  // Timing
  timestamp: number;               // Creation timestamp
  processedOn?: number;            // When job started processing
  finishedOn?: number;             // When job completed/failed
  delay: number;                   // Delay before processing (ms)
  
  // Failure
  failedReason?: string;           // Reason for failure
  stacktrace: string[];            // Error stack traces
  
  // Configuration
  opts: JobsOptions;               // Job options
  priority: number;                // Priority (0-2097152)
  
  // Processing
  token?: string;                  // Worker lock token
  processedBy?: string;            // Worker name that processed this job
  
  // Relationships
  parent?: ParentKeys;             // Parent job for flows
  parentKey?: string;              // Fully qualified parent key
}
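
The timing fields can be combined to derive simple latency figures. The sketch below assumes `timestamp`, `processedOn`, and `finishedOn` are all epoch values in milliseconds; `JobTimingFields`, `JobTimings`, and `jobTimings` are hypothetical names introduced for illustration and are not part of BullMQ.

```typescript
// Minimal structural type holding only the timing fields used here (hypothetical).
interface JobTimingFields {
  timestamp: number;
  processedOn?: number;
  finishedOn?: number;
}

interface JobTimings {
  waitMs?: number;       // creation until processing started
  processingMs?: number; // processing start until completion or failure
}

// Derive durations; a field stays undefined until the job reaches that stage.
function jobTimings(job: JobTimingFields): JobTimings {
  const timings: JobTimings = {};
  if (job.processedOn !== undefined) {
    timings.waitMs = job.processedOn - job.timestamp;
    if (job.finishedOn !== undefined) {
      timings.processingMs = job.finishedOn - job.processedOn;
    }
  }
  return timings;
}

const sampleTimings = jobTimings({ timestamp: 1000, processedOn: 1250, finishedOn: 2000 });
console.log(sampleTimings); // { waitMs: 250, processingMs: 750 }
```

Because the helper only reads three numeric fields, any object with those fields can be passed to it, including a job instance.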

Job States

A job progresses through various states:
type JobState = 
  | 'waiting'           // In queue, waiting to be processed
  | 'active'            // Being processed by a worker
  | 'completed'         // Successfully completed
  | 'failed'            // Failed after all attempts
  | 'delayed'           // Scheduled for future processing
  | 'paused'            // Queue is paused
  | 'waiting-children'  // Waiting for child jobs to complete
  | 'prioritized';      // Waiting with priority

Check Job State

const state = await job.getState();
console.log('Job state:', state);

// Or use specific methods
if (await job.isWaiting()) {
  console.log('Job is waiting');
}
if (await job.isActive()) {
  console.log('Job is being processed');
}
if (await job.isCompleted()) {
  console.log('Job completed successfully');
}
if (await job.isFailed()) {
  console.log('Job failed');
}
if (await job.isDelayed()) {
  console.log('Job is delayed');
}
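
Each `is*()` helper above is a separate asynchronous call, so when several outcomes matter it may be simpler to call `getState()` once and branch on the result. A minimal sketch, using a local copy of the state union listed earlier; `JobStateName`, `isFinished`, and `describeState` are hypothetical names, not BullMQ exports.

```typescript
// Local copy of the states listed in this guide (hypothetical type name).
type JobStateName =
  | 'waiting' | 'active' | 'completed' | 'failed'
  | 'delayed' | 'paused' | 'waiting-children' | 'prioritized';

// True once the job has reached a terminal state.
function isFinished(state: JobStateName): boolean {
  return state === 'completed' || state === 'failed';
}

// Map a state to a short human-readable message.
function describeState(state: JobStateName): string {
  switch (state) {
    case 'waiting':
    case 'prioritized':
      return 'queued';
    case 'delayed':
      return 'scheduled for later';
    case 'paused':
      return 'queue paused';
    case 'waiting-children':
      return 'blocked on child jobs';
    case 'active':
      return 'in progress';
    case 'completed':
      return 'done';
    case 'failed':
      return 'failed';
  }
}

console.log(describeState('waiting-children'), isFinished('failed'));
// blocked on child jobs true
```

In application code the argument would come from `await job.getState()`.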

Job Lifecycle

Creation

import { Queue } from 'bullmq';

const queue = new Queue('myQueue');

// Create a job
const job = await queue.add('task-name', 
  { userId: '123', action: 'notify' },
  {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
  }
);

Retrieval

import { Job } from 'bullmq';

// Get job by ID (resolves to undefined if no such job exists)
const job = await Job.fromId(queue, 'job-123');

if (job) {
  console.log('Found job:', job.data);
} else {
  console.log('Job not found');
}

Processing

import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  console.log('Processing:', job.id);
  
  // Update progress
  await job.updateProgress(50);
  
  // Do work (doWork stands in for your own application logic)
  const result = await doWork(job.data);
  
  // The returned value is stored on the job as job.returnvalue
  return result;
});

Job Manipulation

Update Job Data

// Update job data
await job.updateData({
  userId: '123',
  action: 'updated-action',
  extra: 'new-field',
});

Update Progress

// Update progress as percentage
await job.updateProgress(75);
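
Since `progress` may be a number or a custom object, a small helper can produce either form from item counts. This is a sketch only; `progressPercent` and `buildProgress` are hypothetical helpers, and the object shape is an arbitrary choice for illustration.

```typescript
// Convert "done out of total" into a whole percentage clamped to 0-100.
function progressPercent(done: number, total: number): number {
  if (total <= 0) return 0;
  const pct = Math.round((done / total) * 100);
  return Math.min(100, Math.max(0, pct));
}

// Object form carrying the raw counts alongside the percentage.
function buildProgress(done: number, total: number): { percent: number; done: number; total: number } {
  return { percent: progressPercent(done, total), done, total };
}

console.log(progressPercent(3, 4)); // 75
console.log(buildProgress(3, 4));   // { percent: 75, done: 3, total: 4 }
```

Either return value could then be passed to `job.updateProgress(...)` inside a processor.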

Change Delay

Reschedule a delayed job:
// Change delay to 10 seconds from now
await job.changeDelay(10000);

Change Priority

// Set higher priority (lower number = higher priority)
await job.changePriority({
  priority: 1,
  lifo: false,
});
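
Validating a priority before calling `changePriority` can surface mistakes early. The sketch below uses the 0-2097152 range given for the `priority` property and additionally assumes priorities must be whole numbers; `MAX_PRIORITY` and `isValidPriority` are hypothetical names.

```typescript
// Upper bound taken from the priority range listed in this guide.
const MAX_PRIORITY = 2097152;

// Accept only whole numbers within the documented range (integer rule is an assumption).
function isValidPriority(priority: number): boolean {
  return Number.isInteger(priority) && priority >= 0 && priority <= MAX_PRIORITY;
}

console.log(isValidPriority(1), isValidPriority(1.5), isValidPriority(3000000));
// true false false
```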

Remove Job

// Remove job and its children
await job.remove();

// Remove job but keep children
await job.remove({ removeChildren: false });

Retry Job

// Retry a failed job
await job.retry();

// Retry a completed job
await job.retry('completed');

// Retry with reset attempts
await job.retry('failed', {
  resetAttemptsMade: true,
  resetAttemptsStarted: true,
});

Promote Delayed Job

Move a delayed job to waiting immediately:
await job.promote();

Job Logging

Add Logs

// Inside a processor
const worker = new Worker('myQueue', async (job) => {
  await job.log('Started processing');
  
  await job.log('Processing step 1');
  await processStep1(job.data);
  
  await job.log('Processing step 2');
  await processStep2(job.data);
  
  await job.log('Completed successfully');
  
  return { status: 'done' };
});

Retrieve Logs

const logs = await queue.getJobLogs('job-123');
console.log('Job logs:', logs);

// Get logs with pagination
const logsPage = await queue.getJobLogs('job-123', 0, 10);
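
For paging through logs, the start and end arguments can be computed from a page number. This sketch assumes both indices are zero-based and inclusive, in the manner of a Redis `LRANGE`; that is an assumption worth verifying against your BullMQ version. `logRange` is a hypothetical helper.

```typescript
// Compute inclusive start/end indices for a zero-based page number.
function logRange(page: number, pageSize: number): { start: number; end: number } {
  const start = page * pageSize;
  return { start, end: start + pageSize - 1 };
}

const thirdPage = logRange(2, 10);
console.log(thirdPage.start, thirdPage.end); // 20 29
```

The two values would then be passed as the second and third arguments of `queue.getJobLogs(jobId, start, end)`.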

Clear Logs

// Clear all logs
await job.clearLogs();

// Keep last 10 log entries
await job.clearLogs(10);

Job Dependencies (Flows)

Parent-Child Relationships

import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection: { ... } });

// Create a flow with parent and children
const flowJob = await flow.add({
  name: 'parent-job',
  queueName: 'parentQueue',
  data: { type: 'aggregate' },
  children: [
    {
      name: 'child-job-1',
      queueName: 'childQueue',
      data: { task: 'subtask-1' },
    },
    {
      name: 'child-job-2',
      queueName: 'childQueue',
      data: { task: 'subtask-2' },
    },
  ],
});

Get Child Dependencies

// Get all children dependencies
const deps = await job.getDependencies();

console.log('Processed children:', deps.processed);
console.log('Unprocessed children:', deps.unprocessed);
console.log('Failed children:', deps.failed);

// Get children values (results)
const childValues = await job.getChildrenValues();
console.log('Child results:', childValues);

// Get dependency counts
const counts = await job.getDependenciesCount();
console.log('Processed count:', counts.processed);
console.log('Failed count:', counts.failed);
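
A parent processor often needs to combine its children's results. The sketch below assumes `getChildrenValues()` resolves to a plain object mapping each child's job key to its return value; the sample keys are made up, and `ChildValues` and `sumChildValues` are hypothetical names.

```typescript
// Assumed shape: child job key -> child return value.
type ChildValues = Record<string, unknown>;

// Sum every numeric child result, ignoring anything that is not a number.
function sumChildValues(values: ChildValues): number {
  let total = 0;
  for (const value of Object.values(values)) {
    if (typeof value === 'number') {
      total += value;
    }
  }
  return total;
}

// Illustrative sample only; real keys are generated by BullMQ.
const sampleValues: ChildValues = {
  'bull:childQueue:1': 40,
  'bull:childQueue:2': 2,
  'bull:childQueue:3': 'not-a-number',
};
console.log(sumChildValues(sampleValues)); // 42
```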

Remove Child Dependency

// Remove relationship to parent
const removed = await job.removeChildDependency();
if (removed) {
  console.log('Child dependency removed');
}

Advanced Job Options

Backoff Strategies

await queue.add('task', { data: 'value' }, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000,  // Retry delays: 1s, 2s, 4s, 8s (5 attempts allow 4 retries)
  },
});
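
To see which delays a given configuration yields, the schedule can be computed by hand. This sketch assumes the built-in exponential strategy waits `delay * 2^(attemptsMade - 1)` milliseconds before each retry and ignores any jitter; `exponentialSchedule` is a hypothetical helper, not a BullMQ function.

```typescript
// List the wait before each retry for a job allowed `attempts` attempts in total.
function exponentialSchedule(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  // A job with N attempts is retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(delay * 2 ** (attemptsMade - 1));
  }
  return delays;
}

console.log(exponentialSchedule(5, 1000)); // [ 1000, 2000, 4000, 8000 ]
```

Summing the array gives a rough lower bound on how long a job can stay in the retry cycle before it fails for good.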

Auto-Removal

await queue.add('task', { data: 'value' }, {
  removeOnComplete: {
    count: 100,  // Keep last 100 completed jobs
  },
  removeOnFail: {
    count: 50,   // Keep last 50 failed jobs
  },
});

Stack Trace Limit

Control how many stack traces are kept:
await queue.add('task', { data: 'value' }, {
  attempts: 5,
  stackTraceLimit: 3,  // Keep last 3 stack traces
});

// Disable stack traces
await queue.add('task', { data: 'value' }, {
  attempts: 5,
  stackTraceLimit: 0,  // No stack traces saved
});

Job Deduplication

Prevent duplicate jobs from being added:
// Add job with deduplication ID
await queue.add('process-order', 
  { orderId: '12345' },
  {
    deduplication: {
      id: 'order-12345',  // Only one job with this ID can exist
    },
  }
);

// Adding again before the first job finishes is ignored; no error is thrown
await queue.add('process-order',
  { orderId: '12345' },
  {
    deduplication: {
      id: 'order-12345',
    },
  }
);
// This will not create a new job

// Manually remove deduplication key
await job.removeDeduplicationKey();
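
Building the deduplication options in one place keeps IDs consistent across call sites. This is a sketch under assumptions: it presumes `deduplication` also accepts an optional `ttl` in milliseconds, and `DedupOptions` and `orderDedupOptions` are hypothetical names.

```typescript
// Local description of the options fragment built here (hypothetical type).
interface DedupOptions {
  deduplication: { id: string; ttl?: number };
}

// Derive a stable deduplication ID from the order ID; ttl is added only when given.
function orderDedupOptions(orderId: string, ttlMs?: number): DedupOptions {
  const deduplication: { id: string; ttl?: number } = { id: `order-${orderId}` };
  if (ttlMs !== undefined) {
    deduplication.ttl = ttlMs;
  }
  return { deduplication };
}

console.log(orderDedupOptions('12345').deduplication.id); // order-12345
```

The returned object could be passed as the third argument of `queue.add(...)`.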

Job Serialization

Export Job as JSON

const jobJson = job.asJSON();
console.log(JSON.stringify(jobJson, null, 2));

Create Job from JSON

const job = Job.fromJSON(queue, jobJson, 'job-123');

Waiting for Completion

Wait for a job to complete from outside a worker:
import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('myQueue', {
  connection: { host: 'localhost', port: 6379 },
});

const job = await queue.add('task', { data: 'value' });

try {
  // Wait for job to complete (with 30 second timeout)
  const result = await job.waitUntilFinished(queueEvents, 30000);
  console.log('Job completed with result:', result);
} catch (err) {
  // Reached when the job fails or the timeout elapses
  console.error('Job did not complete:', (err as Error).message);
}

await queueEvents.close();

Error Handling

Unrecoverable Errors

Mark a job as unrecoverable (won’t retry):
import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  if (job.data.invalidData) {
    // This job will not be retried
    throw new UnrecoverableError('Invalid data format');
  }
  
  return { success: true };
});
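
Deciding whether data is invalid usually comes down to a payload check. A minimal sketch of such a check as a type guard; the `OrderPayload` shape and `isOrderPayload` are hypothetical and stand in for whatever your jobs actually carry.

```typescript
// Hypothetical payload shape, used only for this illustration.
interface OrderPayload {
  orderId: string;
  items: string[];
}

// Type guard: true only when the value has the expected fields and types.
function isOrderPayload(value: unknown): value is OrderPayload {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.orderId === 'string' &&
    Array.isArray(candidate.items) &&
    candidate.items.every((item) => typeof item === 'string')
  );
}

console.log(isOrderPayload({ orderId: '12345', items: ['item1'] })); // true
console.log(isOrderPayload({ orderId: 12345 }));                     // false
```

Inside a processor, a failed check is a natural place to throw `UnrecoverableError`, since retrying cannot repair a malformed payload.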

TypeScript Support

Fully typed jobs:
interface OrderData {
  orderId: string;
  items: string[];
  total: number;
}

interface OrderResult {
  processed: boolean;
  confirmationId: string;
}

type OrderJobName = 'create-order' | 'update-order' | 'cancel-order';

// Type parameters are declared on the Queue; add() then uses them
const orderQueue = new Queue<OrderData, OrderResult, OrderJobName>('orders');

const job = await orderQueue.add('create-order', {
  orderId: '12345',
  items: ['item1', 'item2'],
  total: 99.99,
});

// job.data is typed as OrderData
// job.name is typed as OrderJobName

Best Practices

Set Job IDs

Use custom job IDs for idempotency and easier tracking.

Update Progress

Report progress for long-running jobs to provide visibility.

Use Deduplication

Prevent duplicate jobs with deduplication IDs.

Clean Up Jobs

Use removeOnComplete and removeOnFail to keep finished jobs from accumulating in Redis.

Log Important Events

Add logs at key checkpoints for debugging.

Handle Errors Gracefully

Use UnrecoverableError for jobs that shouldn’t retry.
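
For the custom job ID practice above, one option is to derive the ID deterministically from the job's subject so that repeated submissions map to the same ID. This sketch assumes custom IDs are passed through the `jobId` option of `queue.add` and that `:` should be avoided because it is used as a separator in Redis keys; `buildJobId` is a hypothetical helper.

```typescript
// Build a deterministic ID such as "process-order-12345", replacing any ':' characters.
function buildJobId(kind: string, entityId: string): string {
  const clean = (part: string): string => part.replace(/:/g, '_');
  return `${clean(kind)}-${clean(entityId)}`;
}

console.log(buildJobId('process-order', '12345')); // process-order-12345
```

The result would be supplied as `{ jobId: buildJobId(...) }` in the job options.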

Next Steps

Workers

Learn how workers process jobs

Events

Monitor job lifecycle with events

Queues

Understand queue operations
