Overview
Throw WaitingChildrenError from a job processor to move the job to the waiting-children state, indicating that it’s waiting for child jobs to complete.
Class
class WaitingChildrenError extends Error {
constructor(message?: string);
}
Usage
This error is typically handled internally by BullMQ when using FlowProducer. You rarely need to throw it manually.
import { WaitingChildrenError } from 'bullmq';
const processor = async (job) => {
// Manually move job to waiting-children state
throw new WaitingChildrenError('Waiting for child jobs');
};
Behavior
When WaitingChildrenError is thrown:
- The job is moved from
active to waiting-children state
- The job waits for its child jobs to complete
- Once all children are completed, the job moves to
waiting state
- The parent job is then processed and can access children results
Common Use Case: FlowProducer
FlowProducer automatically handles this for parent-child jobs:
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({
connection: { host: 'localhost', port: 6379 },
});
const flow = await flowProducer.add({
name: 'parent',
queueName: 'parentQueue',
data: { parentData: true },
children: [
{
name: 'child-1',
queueName: 'childQueue',
data: { childData: 1 },
},
{
name: 'child-2',
queueName: 'childQueue',
data: { childData: 2 },
},
],
});
// Parent job automatically goes to waiting-children state
// WaitingChildrenError is handled internally
Manual Child Job Creation
You can manually create child jobs and use WaitingChildrenError:
import { Queue, Worker, WaitingChildrenError } from 'bullmq';
const parentQueue = new Queue('parentQueue');
const childQueue = new Queue('childQueue');
// Add parent job
const parentJob = await parentQueue.add('parent', { items: [1, 2, 3] });
// Parent processor
const parentWorker = new Worker('parentQueue', async (job) => {
// Check if children have been created
if (!job.data.childrenCreated) {
// Create child jobs
for (const item of job.data.items) {
await childQueue.add(
'child',
{ item },
{
parent: {
id: job.id,
queue: `bull:parentQueue`,
},
}
);
}
// Mark children as created
await job.updateData({
...job.data,
childrenCreated: true,
});
// Move to waiting-children state
throw new WaitingChildrenError();
}
// Children are complete, process results
const childrenValues = await job.getChildrenValues();
console.log('Children results:', childrenValues);
return {
processedChildren: Object.keys(childrenValues).length,
};
});
Accessing Child Results
When children complete, the parent can access their results:
const parentWorker = new Worker('parentQueue', async (job) => {
// This only runs after all children complete
const childrenValues = await job.getChildrenValues();
console.log(childrenValues);
// {
// 'bull:childQueue:child-1-id': { result: 'from child 1' },
// 'bull:childQueue:child-2-id': { result: 'from child 2' }
// }
// Process aggregate results
const total = Object.values(childrenValues).reduce(
(sum, val: any) => sum + val.count,
0
);
return { total };
});
Monitoring Waiting Children
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('parentQueue');
queueEvents.on('waiting-children', ({ jobId }) => {
console.log(`Job ${jobId} is waiting for children`);
});
queueEvents.on('waiting', ({ jobId }) => {
console.log(`Job ${jobId} children completed, now waiting`);
});
Checking Waiting Children State
const job = await queue.getJob('job-id');
const isWaitingChildren = await job.isWaitingChildren();
if (isWaitingChildren) {
console.log('Job is waiting for children');
// Get children status
const deps = await job.getDependencies();
console.log('Processed:', deps.processed);
console.log('Unprocessed:', deps.unprocessed);
}
Child Job Failure Handling
Control how parent jobs handle child failures:
await flowProducer.add({
name: 'parent',
queueName: 'parentQueue',
data: {},
children: [
{
name: 'child',
queueName: 'childQueue',
data: {},
opts: {
// Fail parent if this child fails
failParentOnFailure: true,
},
},
{
name: 'another-child',
queueName: 'childQueue',
data: {},
opts: {
// Ignore this child's failure
ignoreDependencyOnFailure: true,
},
},
],
});
Example: Multi-Stage Processing
import { FlowProducer, Worker } from 'bullmq';
const flowProducer = new FlowProducer({
connection: { host: 'localhost', port: 6379 },
});
// Create a flow with multiple stages
await flowProducer.add({
name: 'generate-report',
queueName: 'reports',
data: { reportId: '123' },
children: [
{
name: 'fetch-data',
queueName: 'data',
data: { source: 'database' },
},
{
name: 'fetch-external',
queueName: 'data',
data: { source: 'api' },
},
],
});
// Worker for data jobs
const dataWorker = new Worker('data', async (job) => {
return await fetchData(job.data.source);
});
// Worker for report jobs (runs after children)
const reportWorker = new Worker('reports', async (job) => {
// Children are complete
const childrenValues = await job.getChildrenValues();
const data = Object.values(childrenValues);
return await generateReport(data);
});
When to Use WaitingChildrenError
- When using FlowProducer (handled automatically)
- Manual parent-child job relationships
- Multi-stage job processing
- Aggregating results from parallel jobs