The continueParentOnFailure option allows a parent job to start processing as soon as a child job fails, while the removeUnprocessedChildren method enables dynamic cleanup of unprocessed child jobs. Additionally, you can use the getFailedChildrenValues() method to determine whether the parent is processing due to a child failure or because all children completed successfully.
Key Features
- continueParentOnFailure: trigger parent processing on child failure
- removeUnprocessedChildren: clean up remaining unprocessed children
- getFailedChildrenValues: determine why the parent is processing
continueParentOnFailure Option
When set to true on a child job, the continueParentOnFailure option causes the parent job to begin processing immediately if that child fails. This contrasts with the default behavior, where the parent waits for all children to finish.
Key Behavior
- The parent moves to the active state as soon as a child with this option fails
- Other children may still be running or unprocessed
- Ideal for scenarios requiring immediate parent intervention
Basic Usage
```typescript
import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection });

const originalTree = await flow.add({
  name: 'root-job',
  queueName: 'topQueueName',
  data: {},
  children: [
    {
      name: 'child-job-1',
      data: { idx: 0, foo: 'bar' },
      queueName: 'childrenQueueName',
      opts: { continueParentOnFailure: true }, // Parent processes if this child fails
    },
    {
      name: 'child-job-2',
      data: { idx: 1, foo: 'baz' },
      queueName: 'childrenQueueName',
    },
    {
      name: 'child-job-3',
      data: { idx: 2, foo: 'qux' },
      queueName: 'childrenQueueName',
    },
  ],
});
```
removeUnprocessedChildren Method
This method, available on a job instance, removes all unprocessed child jobs (those in waiting or delayed states) from the queue. It’s particularly useful when paired with continueParentOnFailure to clean up remaining children after a failure.
Key Behavior
- Only affects children that haven't started processing
- Active, completed, or failed children remain intact
- Call within the parent's processor for dynamic cleanup
```typescript
await job.removeUnprocessedChildren();
```
getFailedChildrenValues Method
The getFailedChildrenValues() method returns an object mapping the IDs of failed child jobs to their failure error messages. This allows the parent job to determine why it’s processing—whether due to a child failure or because all children completed successfully.
Return Value
- An object where keys are job IDs and values are error messages
- Example: { "job-id-1": "Upload failed" }
- Empty object if no children failed
```typescript
const failedChildren = await job.getFailedChildrenValues();
```
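Because the return value is a plain object, the branching logic can live in a small pure function and be unit-tested without a Redis connection. The sketch below assumes the return shape described above; summarizeFailures is a hypothetical helper, not part of the BullMQ API:

```typescript
// Hypothetical helper: classify why the parent is processing, based on the
// object returned by job.getFailedChildrenValues().
function summarizeFailures(failedChildren: Record<string, string>) {
  const ids = Object.keys(failedChildren);
  if (ids.length === 0) {
    // Empty object: all children completed successfully
    return { triggeredByFailure: false, summary: 'all children completed' };
  }
  // Build a readable "id: message" summary for logging or notifications
  const details = ids.map((id) => `${id}: ${failedChildren[id]}`).join('; ');
  return { triggeredByFailure: true, summary: `failed children: ${details}` };
}
```

Keeping this decision out of the processor body makes the success/failure split easy to test in isolation.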
Complete Example
Here’s a comprehensive example combining all three features:
```typescript
import { FlowProducer, Worker } from 'bullmq';

const flow = new FlowProducer({ connection });

// Define the flow
const originalTree = await flow.add({
  name: 'root-job',
  queueName: 'topQueueName',
  data: {},
  children: [
    {
      name: 'child-job-1',
      data: { idx: 0, foo: 'bar' },
      queueName: 'childrenQueueName',
      opts: { continueParentOnFailure: true }, // Parent processes if this child fails
    },
    {
      name: 'child-job-2',
      data: { idx: 1, foo: 'baz' },
      queueName: 'childrenQueueName',
    },
    {
      name: 'child-job-3',
      data: { idx: 2, foo: 'qux' },
      queueName: 'childrenQueueName',
    },
  ],
});

// Processor for the parent job
const processor = async (job) => {
  // Check if any children failed
  const failedChildren = await job.getFailedChildrenValues();
  const hasFailedChildren = Object.keys(failedChildren).length > 0;

  if (hasFailedChildren) {
    // Path 1: A child failed, triggering continueParentOnFailure
    console.log(`Parent job ${job.name} triggered by child failure(s):`, failedChildren);

    // Remove unprocessed children
    await job.removeUnprocessedChildren();
    console.log('Unprocessed child jobs have been removed.');

    // Additional cleanup or error handling can go here
    await performCleanup(job.data);

    // Optionally throw an error to mark the parent as failed
    throw new Error('Workflow aborted due to child failure');
  } else {
    // Path 2: All children completed successfully
    console.log(`Parent job ${job.name} processing after all children completed successfully.`);

    // Proceed with normal parent logic (e.g., aggregating results)
    const childrenValues = await job.getChildrenValues();
    return processResults(childrenValues);
  }
};

const parentWorker = new Worker('topQueueName', processor, { connection });
```
Execution Flow
1. Child with continueParentOnFailure fails: child-job-1 encounters an error and moves to the failed state.
2. Parent immediately becomes active: the parent job (root-job) is moved to the active state, even though child-job-2 and child-job-3 may still be waiting.
3. Parent processor runs: the parent worker picks up the job and executes the processor.
4. Check failed children: getFailedChildrenValues() returns the failed child information.
5. Clean up unprocessed children: removeUnprocessedChildren() removes child-job-2 and child-job-3 from the queue.
6. Handle failure path: the parent performs cleanup, notification, or other failure handling logic.
Use Cases
Consider a workflow where child jobs upload files to different servers. If one upload fails, the parent can react immediately:

```typescript
const uploadFlow = await flowProducer.add({
  name: 'upload-batch',
  queueName: 'uploads',
  data: { batchId: '123' },
  children: [
    {
      name: 'upload-to-server-1',
      queueName: 'file-uploads',
      data: { server: 'server-1' },
      opts: { continueParentOnFailure: true },
    },
    {
      name: 'upload-to-server-2',
      queueName: 'file-uploads',
      data: { server: 'server-2' },
      opts: { continueParentOnFailure: true },
    },
    {
      name: 'upload-to-server-3',
      queueName: 'file-uploads',
      data: { server: 'server-3' },
      opts: { continueParentOnFailure: true },
    },
  ],
});

const uploadWorker = new Worker('uploads', async (job) => {
  const failedChildren = await job.getFailedChildrenValues();

  if (Object.keys(failedChildren).length > 0) {
    // One upload failed, cancel remaining uploads
    await job.removeUnprocessedChildren();

    // Clean up successfully uploaded files
    const childrenValues = await job.getChildrenValues();
    for (const [childKey, result] of Object.entries(childrenValues)) {
      await deleteFile(result.fileUrl);
    }

    throw new Error('Upload batch failed');
  }

  // All uploads succeeded
  return { status: 'success', batchId: job.data.batchId };
});
```
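The cleanup loop in the upload worker can be isolated into a pure function so it can be unit-tested without a queue. This is a sketch under the assumption that each successful child returns an object with a fileUrl field, as the example implies; collectFileUrls is a hypothetical helper:

```typescript
// Hypothetical helper: given the map returned by job.getChildrenValues(),
// collect the fileUrl of every completed child, skipping results that
// do not carry one.
function collectFileUrls(childrenValues: Record<string, any>): string[] {
  return Object.values(childrenValues)
    .filter((result) => result && typeof result.fileUrl === 'string')
    .map((result) => result.fileUrl);
}
```

The worker would then delete each URL in the returned array instead of iterating the raw map, which guards against children that return a different shape.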
Run tests in parallel and stop remaining tests if any critical test fails:

```typescript
const testFlow = await flowProducer.add({
  name: 'test-suite',
  queueName: 'test-results',
  data: { suite: 'integration' },
  children: [
    {
      name: 'auth-tests',
      queueName: 'test-runner',
      data: { type: 'auth' },
      opts: { continueParentOnFailure: true }, // Critical test
    },
    {
      name: 'api-tests',
      queueName: 'test-runner',
      data: { type: 'api' },
    },
    {
      name: 'ui-tests',
      queueName: 'test-runner',
      data: { type: 'ui' },
    },
  ],
});

const resultsWorker = new Worker('test-results', async (job) => {
  const failedChildren = await job.getFailedChildrenValues();

  if (Object.keys(failedChildren).length > 0) {
    // Critical test failed, stop remaining tests
    await job.removeUnprocessedChildren();

    return {
      status: 'failed',
      failedTests: failedChildren,
      message: 'Test suite aborted due to critical failure',
    };
  }

  // All tests passed
  const testResults = await job.getChildrenValues();
  return {
    status: 'passed',
    results: testResults,
  };
});
```
Deploy to multiple regions and roll back if any deployment fails:

```typescript
const deploymentFlow = await flowProducer.add({
  name: 'deploy-global',
  queueName: 'deployments',
  data: { version: 'v2.0.0' },
  children: [
    {
      name: 'deploy-us-east',
      queueName: 'deploy-region',
      data: { region: 'us-east-1' },
      opts: { continueParentOnFailure: true },
    },
    {
      name: 'deploy-eu-west',
      queueName: 'deploy-region',
      data: { region: 'eu-west-1' },
      opts: { continueParentOnFailure: true },
    },
    {
      name: 'deploy-ap-south',
      queueName: 'deploy-region',
      data: { region: 'ap-south-1' },
      opts: { continueParentOnFailure: true },
    },
  ],
});

const deploymentWorker = new Worker('deployments', async (job) => {
  const failedChildren = await job.getFailedChildrenValues();

  if (Object.keys(failedChildren).length > 0) {
    // One deployment failed: cancel pending deployments and roll back successful ones
    await job.removeUnprocessedChildren();

    // Roll back successful deployments
    const successfulDeployments = await job.getChildrenValues();
    for (const [childKey, result] of Object.entries(successfulDeployments)) {
      await rollbackDeployment(result.region);
    }

    throw new Error(`Deployment failed in: ${Object.keys(failedChildren).join(', ')}`);
  }

  return { status: 'deployed', version: job.data.version };
});
```
Decision Logic Example
Here’s a pattern for handling both success and failure paths:
```typescript
const processor = async (job) => {
  const failedChildren = await job.getFailedChildrenValues();
  const hasFailedChildren = Object.keys(failedChildren).length > 0;

  if (hasFailedChildren) {
    // Failure path: triggered by continueParentOnFailure
    console.error('Child failures detected:', failedChildren);

    // Get counts of the different child states
    const { processed, unprocessed, failed } = await job.getDependenciesCount();
    console.log(`Status - Processed: ${processed}, Unprocessed: ${unprocessed}, Failed: ${failed}`);

    // Remove any unprocessed children
    if (unprocessed > 0) {
      await job.removeUnprocessedChildren();
      console.log(`Removed ${unprocessed} unprocessed children`);
    }

    // Perform cleanup for processed children
    const childrenValues = await job.getChildrenValues();
    await cleanupSuccessfulChildren(childrenValues);

    // Notify about the failure
    await notifyFailure(job.data, failedChildren);

    throw new Error('Workflow failed due to child failures');
  } else {
    // Success path: all children completed successfully
    console.log('All children completed successfully');

    const childrenValues = await job.getChildrenValues();
    const result = await aggregateResults(childrenValues);
    return result;
  }
};
```
Monitoring and Debugging
You can monitor the behavior using queue events:
```typescript
import { Queue, QueueEvents } from 'bullmq';

// A Queue instance is needed to fetch the job inside the event handler
const queue = new Queue('topQueueName', { connection });
const queueEvents = new QueueEvents('topQueueName', { connection });

queueEvents.on('active', ({ jobId }) => {
  console.log(`Parent job ${jobId} became active`);
});

queueEvents.on('completed', async ({ jobId }) => {
  const job = await queue.getJob(jobId);
  const failedChildren = await job.getFailedChildrenValues();

  if (Object.keys(failedChildren).length > 0) {
    console.log(`Parent ${jobId} completed after handling child failures`);
  } else {
    console.log(`Parent ${jobId} completed with all children successful`);
  }
});
```
Best Practices
Always Check Failed Children Use getFailedChildrenValues() to distinguish between success and failure paths in your parent processor.
Clean Up Resources Call removeUnprocessedChildren() to prevent unnecessary processing of remaining children.
Handle Both Paths Implement logic for both successful completion and failure-triggered processing.
Notify Stakeholders Send appropriate notifications based on whether processing was triggered by success or failure.
Be careful when using continueParentOnFailure with expensive child jobs. Unprocessed children will remain in the queue until explicitly removed, potentially consuming resources.
Combine continueParentOnFailure with removeUnprocessedChildren() to implement fail-fast patterns where you want to abort the entire workflow as soon as any critical child fails.
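The fail-fast decision itself can be reduced to a small pure function that is easy to unit-test; the counts object mirrors the shape destructured from getDependenciesCount() earlier in this page, and shouldFailFast is a hypothetical helper, not a BullMQ API:

```typescript
// Hypothetical helper implementing the fail-fast decision: abort as soon as
// any child has failed, and remove pending children only when some remain.
function shouldFailFast(counts: { failed: number; unprocessed: number }) {
  const abort = counts.failed > 0;
  return {
    abort,                                            // throw to mark the parent as failed
    removeUnprocessed: abort && counts.unprocessed > 0, // call removeUnprocessedChildren()
  };
}
```

Inside the parent processor, the two flags map directly onto throwing an error and calling removeUnprocessedChildren(), keeping the side effects separate from the decision logic.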
API Reference