Skip to main content

Overview

JobProgress represents the progress of a job. It can be either a number (percentage) or an object with custom progress data.

Type Definition

// Progress reported by a job: either a number (percentage) or an object with custom progress data.
type JobProgress = number | object

Usage

Number Progress

Use a number (typically 0-100) to represent percentage completion:
/**
 * Processes every entry of `job.data.items` one after another and, after each
 * entry, reports the share of completed entries as a percentage.
 *
 * @returns the number of entries that were processed
 */
const processor = async (job: Job) => {
  const { items } = job.data;
  const total = items.length;

  for (const [index, item] of items.entries()) {
    await processItem(item);

    // index is zero-based, so index + 1 entries are finished at this point
    await job.updateProgress(((index + 1) / total) * 100);
  }

  return { processed: total };
};

Object Progress

Use an object for more detailed progress information:
/**
 * Runs the download, process and upload steps in order, publishing a detailed
 * progress object before each step and one final object once all steps are done.
 *
 * `percentComplete` only counts steps that have actually finished, so 100% is
 * never reported while a step is still pending.
 *
 * @returns `{ completed: true }` once every step has run
 */
const processor = async (job: Job) => {
  const steps = ['download', 'process', 'upload'];

  for (let i = 0; i < steps.length; i++) {
    // This update is sent before steps[i] runs, so only i steps are finished.
    await job.updateProgress({
      step: steps[i],
      stepNumber: i + 1,
      totalSteps: steps.length,
      percentComplete: (i / steps.length) * 100,
    });

    await performStep(steps[i], job.data);
  }

  // Final update: the last step has finished, so the job is genuinely at 100%.
  await job.updateProgress({
    step: steps[steps.length - 1],
    stepNumber: steps.length,
    totalSteps: steps.length,
    percentComplete: 100,
  });

  return { completed: true };
};

Examples

Simple Percentage

import { Worker } from 'bullmq';

// Reports 0% up front, then a fixed percentage checkpoint after each step finishes.
const worker = new Worker('myQueue', async (job) => {
  const checkpoints = [
    { run: step1, percent: 33 },
    { run: step2, percent: 66 },
    { run: step3, percent: 100 },
  ];

  await job.updateProgress(0);

  for (const { run, percent } of checkpoints) {
    await run();
    await job.updateProgress(percent);
  }

  return { done: true };
});

Detailed Progress Object

/** Progress payload published while a batch of files is being processed. */
interface ProcessingProgress {
  /** File about to be processed, or `null` once every file has been handled. */
  currentFile: string | null;
  /** Number of files already handled (whether they succeeded or failed). */
  filesProcessed: number;
  /** Total number of files in the job. */
  totalFiles: number;
  /** Share of files already handled, from 0 to 100. */
  percentComplete: number;
  /** One message per failed file, in the order the failures occurred. */
  errors: string[];
}

// Processes each file in `job.data.files`, publishing progress before every
// file and once more at the end. A failing file is recorded and skipped rather
// than failing the whole job.
const worker = new Worker('fileProcessor', async (job) => {
  const files = job.data.files;
  const errors: string[] = [];

  for (let i = 0; i < files.length; i++) {
    const progress: ProcessingProgress = {
      currentFile: files[i],
      filesProcessed: i,
      totalFiles: files.length,
      percentComplete: (i / files.length) * 100,
      errors,
    };

    await job.updateProgress(progress);

    try {
      await processFile(files[i]);
    } catch (error) {
      // A caught value is `unknown` under strict mode and may not be an Error
      // at all, so narrow it before reading `.message`.
      const reason = error instanceof Error ? error.message : String(error);
      errors.push(`${files[i]}: ${reason}`);
    }
  }

  // Final progress: annotated so it is checked against the same shape as the
  // per-file updates.
  const finalProgress: ProcessingProgress = {
    currentFile: null,
    filesProcessed: files.length,
    totalFiles: files.length,
    percentComplete: 100,
    errors,
  };
  await job.updateProgress(finalProgress);

  return { processed: files.length, errors: errors.length };
});

Multi-Stage Progress

/** Progress payload for a job that runs as a sequence of stages. */
interface StageProgress {
  /** Name of the stage currently running. */
  stage: string;
  /** 1-based position of the current stage. */
  stageNumber: number;
  /** Total number of stages in the job. */
  totalStages: number;
  /** Completion of the current stage as a percentage (0-100). */
  stageProgress: number;
  /** Completion of the whole job as a percentage (0-100). */
  overallProgress: number;
  /** Human-readable description of what is currently happening. */
  details: string;
}

// Runs three weighted stages in sequence. Each stage publishes a "starting"
// update followed by sub-progress updates in 10% increments; a stage's weight
// determines how much it contributes to the overall percentage.
const worker = new Worker('complexTask', async (job) => {
  const stages = [
    { name: 'validation', weight: 0.1 },
    { name: 'processing', weight: 0.7 },
    { name: 'finalization', weight: 0.2 },
  ];

  // Overall percentage contributed by the stages that have already finished.
  let completedPercent = 0;

  for (const [index, { name, weight }] of stages.entries()) {
    const progress: StageProgress = {
      stage: name,
      stageNumber: index + 1,
      totalStages: stages.length,
      stageProgress: 0,
      overallProgress: completedPercent,
      details: `Starting ${name}`,
    };

    await job.updateProgress(progress);

    // Simulated work: advance the stage by 10% roughly every 100 ms.
    for (let stagePercent = 0; stagePercent <= 100; stagePercent += 10) {
      await new Promise(resolve => setTimeout(resolve, 100));

      Object.assign(progress, {
        stageProgress: stagePercent,
        overallProgress: completedPercent + (stagePercent / 100) * weight * 100,
        details: `${name}: ${stagePercent}% complete`,
      });

      await job.updateProgress(progress);
    }

    completedPercent += weight * 100;
  }

  return { completed: true };
});

Listening to Progress

Worker Events

const worker = new Worker('myQueue', processor);

// Numeric progress is printed as a percentage; object progress is printed as-is.
worker.on('progress', (job, progress) => {
  if (typeof progress !== 'number') {
    console.log(`Job ${job.id}:`, progress);
    return;
  }

  console.log(`Job ${job.id}: ${progress}%`);
});

QueueEvents

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('myQueue');

// Numeric progress is printed as a percentage; anything else is pretty-printed as JSON.
queueEvents.on('progress', ({ jobId, data }) => {
  if (typeof data === 'number') {
    console.log(`Job ${jobId}: ${data}%`);
    return;
  }

  const formatted = JSON.stringify(data, null, 2);
  console.log(`Job ${jobId}:`, formatted);
});

Queue Instance

import { Queue } from 'bullmq';

const queue = new Queue('myQueue');

// Log each progress update received through the queue's 'progress' event.
queue.on('progress', (job, progress) => {
  const prefix = `Progress update for ${job.id}:`;
  console.log(prefix, progress);
});

Best Practices

1. Update Progress Periodically

Avoid updating progress too frequently; throttle updates to a fixed interval instead:
/**
 * Processes `job.data.items` sequentially while throttling progress updates to
 * at most one per `UPDATE_INTERVAL` milliseconds, then reports 100% at the end.
 *
 * @returns the number of items that were processed
 */
const processor = async (job: Job) => {
  const items = job.data.items;
  let lastUpdate = Date.now();
  const UPDATE_INTERVAL = 1000; // 1 second

  for (let i = 0; i < items.length; i++) {
    await processItem(items[i]);

    // Only update every second
    const now = Date.now();
    if (now - lastUpdate > UPDATE_INTERVAL) {
      // items[i] has just finished, so i + 1 items are complete at this point.
      await job.updateProgress(((i + 1) / items.length) * 100);
      lastUpdate = now;
    }
  }

  // Final update: guarantees 100% is reported even if the last item fell
  // inside the throttle window.
  await job.updateProgress(100);

  return { processed: items.length };
};

2. Include Meaningful Information

/** Progress payload carrying a status, item counts, and optional ETA and message. */
interface ProgressInfo {
  /** Short label for what the job is currently doing. */
  status: string;
  /** Number of items handled so far. */
  current: number;
  /** Total number of items in the job. */
  total: number;
  /** Completion as a percentage (0-100). */
  percent: number;
  eta?: number;  // Estimated time remaining, in milliseconds
  /** Optional human-readable progress message. */
  message?: string;
}

/**
 * Processes `job.data.items` sequentially and, after each item, publishes a
 * progress object with counts, percentage, a message and an ETA in milliseconds.
 *
 * The ETA is the average time per finished item multiplied by the number of
 * items still to do. At least one item is always finished when it is computed,
 * so it is always a finite number.
 *
 * @returns the number of items that were processed
 */
const processor = async (job: Job) => {
  const startTime = Date.now();
  const total = job.data.items.length;

  for (let i = 0; i < total; i++) {
    await processItem(job.data.items[i]);

    // items[i] has just finished, so i + 1 items are complete.
    const completed = i + 1;
    const elapsed = Date.now() - startTime;
    const msPerItem = elapsed / completed; // completed >= 1, so never divides by zero
    const eta = msPerItem * (total - completed);

    const progress: ProgressInfo = {
      status: 'processing',
      current: completed,
      total,
      percent: (completed / total) * 100,
      eta: Math.round(eta),
      message: `Processed ${completed} of ${total} items`,
    };

    await job.updateProgress(progress);
  }

  return { processed: total };
};

3. Handle Progress Errors Gracefully

/**
 * Publishes an initial progress update on a best-effort basis: a failed update
 * is logged as a warning and never aborts the job.
 */
const processor = async (job: Job) => {
  // Wraps a single progress update so that a failure is logged, not thrown.
  const reportSafely = async (progress: object) => {
    try {
      await job.updateProgress(progress);
    } catch (failure) {
      console.warn('Failed to update progress:', failure);
      // Continue processing even if progress update fails
    }
  };

  await reportSafely({ status: 'starting' });

  // ... rest of processing
};

Related

  • Job - Job class with the updateProgress method
  • Worker - Worker class that emits progress events
  • QueueEvents - Class for listening to progress events

Build docs developers (and LLMs) love