Overview
JobProgress represents the progress of a job. It can be either a number (percentage) or an object with custom progress data.Type Definition
type JobProgress = number | object
Usage
Number Progress
Use a number (typically 0-100) to represent percentage completion:const processor = async (job: Job) => {
const total = job.data.items.length;
for (let i = 0; i < total; i++) {
await processItem(job.data.items[i]);
// Update progress as percentage
const percentComplete = ((i + 1) / total) * 100;
await job.updateProgress(percentComplete);
}
return { processed: total };
};
Object Progress
Use an object for more detailed progress information:const processor = async (job: Job) => {
const steps = ['download', 'process', 'upload'];
for (let i = 0; i < steps.length; i++) {
await job.updateProgress({
step: steps[i],
stepNumber: i + 1,
totalSteps: steps.length,
percentComplete: ((i + 1) / steps.length) * 100,
});
await performStep(steps[i], job.data);
}
return { completed: true };
};
Examples
Simple Percentage
import { Worker } from 'bullmq';
const worker = new Worker('myQueue', async (job) => {
await job.updateProgress(0);
await step1();
await job.updateProgress(33);
await step2();
await job.updateProgress(66);
await step3();
await job.updateProgress(100);
return { done: true };
});
Detailed Progress Object
interface ProcessingProgress {
currentFile: string;
filesProcessed: number;
totalFiles: number;
percentComplete: number;
errors: string[];
}
const worker = new Worker('fileProcessor', async (job) => {
const files = job.data.files;
const errors: string[] = [];
for (let i = 0; i < files.length; i++) {
const progress: ProcessingProgress = {
currentFile: files[i],
filesProcessed: i,
totalFiles: files.length,
percentComplete: (i / files.length) * 100,
errors,
};
await job.updateProgress(progress);
try {
await processFile(files[i]);
} catch (error) {
errors.push(`${files[i]}: ${error.message}`);
}
}
// Final progress
await job.updateProgress({
currentFile: null,
filesProcessed: files.length,
totalFiles: files.length,
percentComplete: 100,
errors,
});
return { processed: files.length, errors: errors.length };
});
Multi-Stage Progress
interface StageProgress {
stage: string;
stageNumber: number;
totalStages: number;
stageProgress: number;
overallProgress: number;
details: string;
}
const worker = new Worker('complexTask', async (job) => {
const stages = [
{ name: 'validation', weight: 0.1 },
{ name: 'processing', weight: 0.7 },
{ name: 'finalization', weight: 0.2 },
];
let overallProgress = 0;
for (let i = 0; i < stages.length; i++) {
const stage = stages[i];
const progress: StageProgress = {
stage: stage.name,
stageNumber: i + 1,
totalStages: stages.length,
stageProgress: 0,
overallProgress,
details: `Starting ${stage.name}`,
};
await job.updateProgress(progress);
// Simulate stage processing with sub-progress
for (let j = 0; j <= 100; j += 10) {
await new Promise(resolve => setTimeout(resolve, 100));
progress.stageProgress = j;
progress.overallProgress = overallProgress + (j / 100) * stage.weight * 100;
progress.details = `${stage.name}: ${j}% complete`;
await job.updateProgress(progress);
}
overallProgress += stage.weight * 100;
}
return { completed: true };
});
Listening to Progress
Worker Events
const worker = new Worker('myQueue', processor);
worker.on('progress', (job, progress) => {
if (typeof progress === 'number') {
console.log(`Job ${job.id}: ${progress}%`);
} else {
console.log(`Job ${job.id}:`, progress);
}
});
QueueEvents
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('myQueue');
queueEvents.on('progress', ({ jobId, data }) => {
if (typeof data === 'number') {
console.log(`Job ${jobId}: ${data}%`);
} else {
console.log(`Job ${jobId}:`, JSON.stringify(data, null, 2));
}
});
Queue Instance
import { Queue } from 'bullmq';
const queue = new Queue('myQueue');
queue.on('progress', (job, progress) => {
console.log(`Progress update for ${job.id}:`, progress);
});
Best Practices
1. Update Progress Periodically
Don’t update too frequently:const processor = async (job: Job) => {
const items = job.data.items;
let lastUpdate = Date.now();
const UPDATE_INTERVAL = 1000; // 1 second
for (let i = 0; i < items.length; i++) {
await processItem(items[i]);
// Only update every second
const now = Date.now();
if (now - lastUpdate > UPDATE_INTERVAL) {
await job.updateProgress((i / items.length) * 100);
lastUpdate = now;
}
}
// Final update
await job.updateProgress(100);
return { processed: items.length };
};
2. Include Meaningful Information
interface ProgressInfo {
status: string;
current: number;
total: number;
percent: number;
eta?: number; // Estimated time remaining
message?: string;
}
const processor = async (job: Job) => {
const startTime = Date.now();
const total = job.data.items.length;
for (let i = 0; i < total; i++) {
await processItem(job.data.items[i]);
const elapsed = Date.now() - startTime;
const rate = i / elapsed; // items per ms
const remaining = total - i;
const eta = remaining / rate;
const progress: ProgressInfo = {
status: 'processing',
current: i + 1,
total,
percent: ((i + 1) / total) * 100,
eta: Math.round(eta),
message: `Processed ${i + 1} of ${total} items`,
};
await job.updateProgress(progress);
}
return { processed: total };
};
3. Handle Progress Errors Gracefully
const processor = async (job: Job) => {
try {
await job.updateProgress({ status: 'starting' });
} catch (error) {
console.warn('Failed to update progress:', error);
// Continue processing even if progress update fails
}
// ... rest of processing
};
Related
- Job - Job class with updateProgress method
- Worker - Worker class that emits progress events
- QueueEvents - Listen to progress events
