Overview
Processor is the async function type that receives jobs and processes them. It’s the core of a Worker’s job processing logic.
Type Definition
type Processor<T = any, R = any, N extends string = string> = (
job: Job<T, R, N>,
token?: string,
signal?: AbortSignal
) => Promise<R>
Parameters
job
token
Worker token for the job (used internally for locking)
signal
Abort signal for job cancellation
Return Value
The result value that will be stored as the job’s return value
Examples
Basic Processor
import { Worker, Job } from 'bullmq';
const processor = async (job: Job) => {
console.log('Processing job', job.id);
console.log('Job data:', job.data);
// Do some work
await someAsyncOperation(job.data);
return { success: true };
};
const worker = new Worker('myQueue', processor, {
connection: { host: 'localhost', port: 6379 },
});
Typed Processor
interface EmailData {
to: string;
subject: string;
body: string;
}
interface EmailResult {
messageId: string;
sent: boolean;
}
const emailProcessor = async (
job: Job<EmailData, EmailResult>
): Promise<EmailResult> => {
const { to, subject, body } = job.data;
// Send email
const messageId = await sendEmail(to, subject, body);
return {
messageId,
sent: true,
};
};
const worker = new Worker<EmailData, EmailResult>(
'emails',
emailProcessor,
{
connection: { host: 'localhost', port: 6379 },
}
);
With Progress Updates
const processor = async (job: Job) => {
const items = job.data.items;
const total = items.length;
for (let i = 0; i < total; i++) {
await processItem(items[i]);
// Update progress
await job.updateProgress((i + 1) / total * 100);
}
return { processed: total };
};
With Logging
const processor = async (job: Job) => {
await job.log('Starting job processing');
try {
const result = await doSomeWork(job.data);
await job.log('Job completed successfully');
return result;
} catch (error) {
await job.log(`Error: ${error.message}`);
throw error;
}
};
With Abort Signal
const processor = async (
job: Job,
token?: string,
signal?: AbortSignal
) => {
// Check if job was cancelled
if (signal?.aborted) {
throw new Error('Job was cancelled');
}
// Long-running operation that respects abort signal
const result = await fetch('https://api.example.com/data', {
signal, // Pass abort signal to fetch
});
return result.json();
};
const worker = new Worker('myQueue', processor, {
connection: { host: 'localhost', port: 6379 },
});
// Cancel a specific job
worker.cancelJob('job-id', 'User requested cancellation');
Error Handling
const processor = async (job: Job) => {
try {
return await riskyOperation(job.data);
} catch (error) {
if (error.code === 'RATE_LIMIT') {
// Throw rate limit error to delay retry
throw Worker.RateLimitError();
}
if (error.code === 'VALIDATION_ERROR') {
// Throw unrecoverable error to fail immediately
throw new UnrecoverableError(error.message);
}
// Re-throw other errors for normal retry behavior
throw error;
}
};
Accessing Job Properties
const processor = async (job: Job) => {
console.log('Job ID:', job.id);
console.log('Job name:', job.name);
console.log('Attempt:', job.attemptsMade);
console.log('Max attempts:', job.opts.attempts);
console.log('Data:', job.data);
console.log('Timestamp:', job.timestamp);
return { processed: true };
};
Parent-Child Jobs
const parentProcessor = async (job: Job) => {
// Access children results
const childrenValues = await job.getChildrenValues();
console.log('Children results:', childrenValues);
// {
// 'bull:childQueue:child-1': { result: 'data' },
// 'bull:childQueue:child-2': { result: 'data' }
// }
// Process based on children results
const total = Object.values(childrenValues).reduce(
(sum, val: any) => sum + val.count,
0
);
return { totalFromChildren: total };
};
Sandboxed Processors
Processors can also be defined in separate files:
// processor.ts
import { Job } from 'bullmq';
module.exports = async (job: Job) => {
// Process job
return { processed: true };
};
// main.ts
import { Worker } from 'bullmq';
const worker = new Worker(
'myQueue',
'./processor.js', // Path to processor file
{
connection: { host: 'localhost', port: 6379 },
useWorkerThreads: true,
}
);
Best Practices
1. Keep Processors Idempotent
const processor = async (job: Job) => {
// Check if already processed
const existing = await db.findProcessed(job.id);
if (existing) {
return existing.result;
}
// Process and save result
const result = await doWork(job.data);
await db.saveProcessed(job.id, result);
return result;
};
2. Use Proper Error Types
import { UnrecoverableError } from 'bullmq';
const processor = async (job: Job) => {
if (!job.data.requiredField) {
// Don't retry validation errors
throw new UnrecoverableError('Missing required field');
}
return await processData(job.data);
};
3. Handle Long-Running Operations
const processor = async (job: Job) => {
const chunks = splitIntoChunks(job.data.items);
for (const chunk of chunks) {
await processChunk(chunk);
// Update progress periodically
await job.updateProgress(
(chunks.indexOf(chunk) + 1) / chunks.length * 100
);
}
return { chunksProcessed: chunks.length };
};
- Job - The job class passed to processors
- Worker - The worker class that runs processors