Skip to main content

Overview

Processor is the async function type that receives jobs and processes them. It’s the core of a Worker’s job processing logic.

Type Definition

type Processor<T = any, R = any, N extends string = string> = (
  job: Job<T, R, N>,
  token?: string,
  signal?: AbortSignal
) => Promise<R>

Parameters

job

job
Job<T, R, N>
required
The job to be processed

token

token
string
Worker token for the job (used internally for locking)

signal

signal
AbortSignal
Abort signal for job cancellation

Return Value

result
R
The result value that will be stored as the job’s return value

Examples

Basic Processor

import { Worker, Job } from 'bullmq';

const processor = async (job: Job) => {
  console.log('Processing job', job.id);
  console.log('Job data:', job.data);
  
  // Do some work
  await someAsyncOperation(job.data);
  
  return { success: true };
};

const worker = new Worker('myQueue', processor, {
  connection: { host: 'localhost', port: 6379 },
});

Typed Processor

interface EmailData {
  to: string;
  subject: string;
  body: string;
}

interface EmailResult {
  messageId: string;
  sent: boolean;
}

const emailProcessor = async (
  job: Job<EmailData, EmailResult>
): Promise<EmailResult> => {
  const { to, subject, body } = job.data;
  
  // Send email
  const messageId = await sendEmail(to, subject, body);
  
  return {
    messageId,
    sent: true,
  };
};

const worker = new Worker<EmailData, EmailResult>(
  'emails',
  emailProcessor,
  {
    connection: { host: 'localhost', port: 6379 },
  }
);

With Progress Updates

const processor = async (job: Job) => {
  const items = job.data.items;
  const total = items.length;
  
  for (let i = 0; i < total; i++) {
    await processItem(items[i]);
    
    // Update progress
    await job.updateProgress((i + 1) / total * 100);
  }
  
  return { processed: total };
};

With Logging

const processor = async (job: Job) => {
  await job.log('Starting job processing');
  
  try {
    const result = await doSomeWork(job.data);
    await job.log('Job completed successfully');
    return result;
  } catch (error) {
    await job.log(`Error: ${error.message}`);
    throw error;
  }
};

With Abort Signal

const processor = async (
  job: Job,
  token?: string,
  signal?: AbortSignal
) => {
  // Check if job was cancelled
  if (signal?.aborted) {
    throw new Error('Job was cancelled');
  }
  
  // Long-running operation that respects abort signal
  const result = await fetch('https://api.example.com/data', {
    signal,  // Pass abort signal to fetch
  });
  
  return result.json();
};

const worker = new Worker('myQueue', processor, {
  connection: { host: 'localhost', port: 6379 },
});

// Cancel a specific job
worker.cancelJob('job-id', 'User requested cancellation');

Error Handling

const processor = async (job: Job) => {
  try {
    return await riskyOperation(job.data);
  } catch (error) {
    if (error.code === 'RATE_LIMIT') {
      // Throw rate limit error to delay retry
      throw Worker.RateLimitError();
    }
    
    if (error.code === 'VALIDATION_ERROR') {
      // Throw unrecoverable error to fail immediately
      throw new UnrecoverableError(error.message);
    }
    
    // Re-throw other errors for normal retry behavior
    throw error;
  }
};

Accessing Job Properties

const processor = async (job: Job) => {
  console.log('Job ID:', job.id);
  console.log('Job name:', job.name);
  console.log('Attempt:', job.attemptsMade);
  console.log('Max attempts:', job.opts.attempts);
  console.log('Data:', job.data);
  console.log('Timestamp:', job.timestamp);
  
  return { processed: true };
};

Parent-Child Jobs

const parentProcessor = async (job: Job) => {
  // Access children results
  const childrenValues = await job.getChildrenValues();
  
  console.log('Children results:', childrenValues);
  // {
  //   'bull:childQueue:child-1': { result: 'data' },
  //   'bull:childQueue:child-2': { result: 'data' }
  // }
  
  // Process based on children results
  const total = Object.values(childrenValues).reduce(
    (sum, val: any) => sum + val.count,
    0
  );
  
  return { totalFromChildren: total };
};

Sandboxed Processors

Processors can also be defined in separate files:
// processor.ts
import { Job } from 'bullmq';

module.exports = async (job: Job) => {
  // Process job
  return { processed: true };
};
// main.ts
import { Worker } from 'bullmq';

const worker = new Worker(
  'myQueue',
  './processor.js',  // Path to processor file
  {
    connection: { host: 'localhost', port: 6379 },
    useWorkerThreads: true,
  }
);

Best Practices

1. Keep Processors Idempotent

const processor = async (job: Job) => {
  // Check if already processed
  const existing = await db.findProcessed(job.id);
  if (existing) {
    return existing.result;
  }
  
  // Process and save result
  const result = await doWork(job.data);
  await db.saveProcessed(job.id, result);
  
  return result;
};

2. Use Proper Error Types

import { UnrecoverableError } from 'bullmq';

const processor = async (job: Job) => {
  if (!job.data.requiredField) {
    // Don't retry validation errors
    throw new UnrecoverableError('Missing required field');
  }
  
  return await processData(job.data);
};

3. Handle Long-Running Operations

const processor = async (job: Job) => {
  const chunks = splitIntoChunks(job.data.items);
  
  for (const chunk of chunks) {
    await processChunk(chunk);
    
    // Update progress periodically
    await job.updateProgress(
      (chunks.indexOf(chunk) + 1) / chunks.length * 100
    );
  }
  
  return { chunksProcessed: chunks.length };
};
  • Job - The job class passed to processors
  • Worker - The worker class that runs processors

Build docs developers (and LLMs) love