Skip to main content
BullMQ supports parent-child relationships between jobs. The basic idea is that a parent job will not be moved to the wait status (i.e. where it could be picked up by a worker) until all its children jobs have been processed successfully. Apart from that, a parent or a child job are no different from regular jobs. This functionality enables the creation of flows where jobs are the node of trees of arbitrary depth.
Flows are added to a queue using the FlowProducer class.

FlowJob Interface

In order to create flows, you must use the FlowProducer class. The add method accepts an object with the following interface:
interface FlowJobBase<T> {
  name: string;
  queueName: string;
  data?: any;
  prefix?: string;
  opts?: Omit<T, 'debounce' | 'deduplication' | 'repeat'>;
  children?: FlowChildJob[];
}

type FlowChildJob = FlowJobBase<
  Omit<JobsOptions, 'debounce' | 'deduplication' | 'parent' | 'repeat'>
>;

type FlowJob = FlowJobBase<JobsOptions>;

Adding a Flow

import { FlowProducer } from 'bullmq';

// A FlowProducer constructor takes an optional "connection"
// object otherwise it connects to a local redis instance.
const flowProducer = new FlowProducer();

const flow = await flowProducer.add({
  name: 'renovate-interior',
  queueName: 'renovate',
  children: [
    { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
    { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
    { name: 'fix', data: { place: 'floor' }, queueName: 'steps' },
  ],
});
The above code will atomically add 4 jobs: one to the “renovate” queue, and 3 to the “steps” queue. When the 3 jobs in the “steps” queue are completed, the parent job in the “renovate” queue will be processed as a regular job.
Note that the parent queue does not need to be the same queue as the one used for the children.
If a jobId option is provided, make sure that it does not contain a colon : as this is considered a separator.

Accessing Child Results

When the parent job is processed, it is possible to access the results generated by its child jobs. For example, let’s assume the following worker for the child jobs:
import { Worker } from 'bullmq';

const stepsWorker = new Worker('steps', async job => {
  await performStep(job.data);

  if (job.name === 'paint') {
    return 2500;
  } else if (job.name === 'fix') {
    return 1750;
  }
});
We can implement a parent worker that sums the costs of the children’s jobs using the getChildrenValues method:
import { Worker } from 'bullmq';

const renovateWorker = new Worker('renovate', async job => {
  const childrenValues = await job.getChildrenValues();

  const totalCosts = Object.values(childrenValues).reduce(
    (prev, cur) => prev + cur,
    0,
  );

  await sendInvoice(totalCosts);
});

Deep Hierarchies

It is possible to add as deep job hierarchies as needed. Here’s an example where jobs depend on each other, allowing serial execution:
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer();

const queueName = 'assembly-line';
const chain = await flowProducer.add({
  name: 'car',
  data: { step: 'engine' },
  queueName,
  children: [
    {
      name: 'car',
      data: { step: 'wheels' },
      queueName,
      children: [{ name: 'car', data: { step: 'chassis' }, queueName }],
    },
  ],
});
The order of processing would be: chassis, wheels and finally engine.

Job Getters

There are several methods available to get jobs related to a flow:

Get Dependencies

Get all the direct dependencies (children) of a given job:
const dependencies = await job.getDependencies();
Or get specific types of children with pagination:
const { processed, nextProcessedCursor } = await job.getDependencies({
  processed: {
    count: 5,
    cursor: 0,
  },
});

const { unprocessed, nextUnprocessedCursor } = await job.getDependencies({
  unprocessed: {
    count: 5,
    cursor: 0,
  },
});

const { failed, nextFailedCursor } = await job.getDependencies({
  failed: {
    count: 5,
    cursor: 0,
  },
});

const { ignored, nextIgnoredCursor } = await job.getDependencies({
  ignored: {
    count: 5,
    cursor: 0,
  },
});

Get Dependencies Count

Get all the different counts of children by type:
const { failed, ignored, processed, unprocessed } =
  await job.getDependenciesCount();
Or be specific:
const { failed } = await job.getDependenciesCount({
  failed: true,
});

const { ignored, processed } = await job.getDependenciesCount({
  ignored: true,
  processed: true,
});

Get Children Values

Get all the values produced by the children of a given job:
const values = await job.getChildrenValues();

Parent Key Property

A new property is available in the Job class, parentKey, with a fully qualified key for the job parent.

Waiting Children State

Parent jobs that have not yet had their children completed will be in the “waiting-children” state:
const state = await job.getState();
// state will be "waiting-children"

Queue Options

You can provide queue-specific options for each queue used in the flow:
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer();

const queueName = 'assembly-line';
const chain = await flowProducer.add(
  {
    name: 'car',
    data: { step: 'engine' },
    queueName,
    children: [
      {
        name: 'car',
        data: { step: 'wheels' },
        queueName,
      },
    ],
  },
  {
    queuesOptions: {
      [queueName]: {
        defaultJobOptions: {
          removeOnComplete: true,
        },
      },
    },
  },
);
Queue options are defined in the context of their instances. You should provide your configurations in the second parameter to avoid unexpected behaviors.

Jobs Removal

BullMQ provides seamless removal functionality for jobs that are part of a flow. When removing a job that is part of a flow, there are several important considerations:
  1. If a parent job is removed, all its children will also be removed.
  2. If a child job is removed, its parent dependency to said child is also removed, and if the child was the last child in the dependencies list, the parent job will be completed.
  3. Since a job can be both a parent and a child in a large flow, both 1 and 2 will occur if removing such a job.
  4. If any of the jobs that would be removed happen to be locked, none of the jobs will be removed, and an exception will be thrown.
Removing a job can be done using either the Job or the Queue class:
await job.remove();
// or
await queue.remove(job.id);

Add Flows in Bulk

Atomically add multiple flows at once

Get Flow Tree

Retrieve job hierarchies with children and grandchildren

Parent Dependencies

Manage complex parent-child job relationships

Fail Parent

Make parent jobs fail when children fail

API Reference

Further Reading

Build docs developers (and LLMs) love