Skip to main content

Overview

Throw WaitingChildrenError from a job processor to move the job to the waiting-children state, indicating that it’s waiting for child jobs to complete.

Class

class WaitingChildrenError extends Error {
  constructor(message?: string);
}

Usage

This error is typically handled internally by BullMQ when using FlowProducer. You rarely need to throw it manually.
import { WaitingChildrenError } from 'bullmq';

const processor = async (job) => {
  // Manually move job to waiting-children state
  throw new WaitingChildrenError('Waiting for child jobs');
};

Behavior

When WaitingChildrenError is thrown:
  1. The job is moved from active to waiting-children state
  2. The job waits for its child jobs to complete
  3. Once all children are completed, the job moves to waiting state
  4. The parent job is then processed and can access children results

Common Use Case: FlowProducer

FlowProducer automatically handles this for parent-child jobs:
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: { host: 'localhost', port: 6379 },
});

const flow = await flowProducer.add({
  name: 'parent',
  queueName: 'parentQueue',
  data: { parentData: true },
  children: [
    {
      name: 'child-1',
      queueName: 'childQueue',
      data: { childData: 1 },
    },
    {
      name: 'child-2',
      queueName: 'childQueue',
      data: { childData: 2 },
    },
  ],
});

// Parent job automatically goes to waiting-children state
// WaitingChildrenError is handled internally

Manual Child Job Creation

You can manually create child jobs and use WaitingChildrenError:
import { Queue, Worker, WaitingChildrenError } from 'bullmq';

const parentQueue = new Queue('parentQueue');
const childQueue = new Queue('childQueue');

// Add parent job
const parentJob = await parentQueue.add('parent', { items: [1, 2, 3] });

// Parent processor
const parentWorker = new Worker('parentQueue', async (job) => {
  // Check if children have been created
  if (!job.data.childrenCreated) {
    // Create child jobs
    for (const item of job.data.items) {
      await childQueue.add(
        'child',
        { item },
        {
          parent: {
            id: job.id,
            queue: `bull:parentQueue`,
          },
        }
      );
    }
    
    // Mark children as created
    await job.updateData({
      ...job.data,
      childrenCreated: true,
    });
    
    // Move to waiting-children state
    throw new WaitingChildrenError();
  }
  
  // Children are complete, process results
  const childrenValues = await job.getChildrenValues();
  console.log('Children results:', childrenValues);
  
  return {
    processedChildren: Object.keys(childrenValues).length,
  };
});

Accessing Child Results

When children complete, the parent can access their results:
const parentWorker = new Worker('parentQueue', async (job) => {
  // This only runs after all children complete
  const childrenValues = await job.getChildrenValues();
  
  console.log(childrenValues);
  // {
  //   'bull:childQueue:child-1-id': { result: 'from child 1' },
  //   'bull:childQueue:child-2-id': { result: 'from child 2' }
  // }
  
  // Process aggregate results
  const total = Object.values(childrenValues).reduce(
    (sum, val: any) => sum + val.count,
    0
  );
  
  return { total };
});

Monitoring Waiting Children

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('parentQueue');

queueEvents.on('waiting-children', ({ jobId }) => {
  console.log(`Job ${jobId} is waiting for children`);
});

queueEvents.on('waiting', ({ jobId }) => {
  console.log(`Job ${jobId} children completed, now waiting`);
});

Checking Waiting Children State

const job = await queue.getJob('job-id');

const isWaitingChildren = await job.isWaitingChildren();
if (isWaitingChildren) {
  console.log('Job is waiting for children');
  
  // Get children status
  const deps = await job.getDependencies();
  console.log('Processed:', deps.processed);
  console.log('Unprocessed:', deps.unprocessed);
}

Child Job Failure Handling

Control how parent jobs handle child failures:
await flowProducer.add({
  name: 'parent',
  queueName: 'parentQueue',
  data: {},
  children: [
    {
      name: 'child',
      queueName: 'childQueue',
      data: {},
      opts: {
        // Fail parent if this child fails
        failParentOnFailure: true,
      },
    },
    {
      name: 'another-child',
      queueName: 'childQueue',
      data: {},
      opts: {
        // Ignore this child's failure
        ignoreDependencyOnFailure: true,
      },
    },
  ],
});

Example: Multi-Stage Processing

import { FlowProducer, Worker } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: { host: 'localhost', port: 6379 },
});

// Create a flow with multiple stages
await flowProducer.add({
  name: 'generate-report',
  queueName: 'reports',
  data: { reportId: '123' },
  children: [
    {
      name: 'fetch-data',
      queueName: 'data',
      data: { source: 'database' },
    },
    {
      name: 'fetch-external',
      queueName: 'data',
      data: { source: 'api' },
    },
  ],
});

// Worker for data jobs
const dataWorker = new Worker('data', async (job) => {
  return await fetchData(job.data.source);
});

// Worker for report jobs (runs after children)
const reportWorker = new Worker('reports', async (job) => {
  // Children are complete
  const childrenValues = await job.getChildrenValues();
  
  const data = Object.values(childrenValues);
  return await generateReport(data);
});

When to Use WaitingChildrenError

  • When using FlowProducer (handled automatically)
  • Manual parent-child job relationships
  • Multi-stage job processing
  • Aggregating results from parallel jobs

Build docs developers (and LLMs) love