Sometimes it is necessary to add flows in bulk atomically. For example, you may require that either all of the flows are created or none of them are. Adding flows in bulk can also be faster, since it reduces the number of roundtrips to Redis.

Using addBulk

The addBulk method on FlowProducer allows you to add multiple flows in a single atomic operation:
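import { FlowProducer } from 'bullmq';

// Assumed local Redis connection settings; adjust for your environment.
const connection = { host: 'localhost', port: 6379 };

const flow = new FlowProducer({ connection });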

const trees = await flow.addBulk([
  {
    name: 'root-job-1',
    queueName: 'rootQueueName-1',
    data: {},
    children: [
      {
        name: 'child-job-1', // illustrative name; any string works
        data: { idx: 0, foo: 'bar' },
        queueName: 'childrenQueueName-1',
      },
    ],
  },
  {
    name: 'root-job-2',
    queueName: 'rootQueueName-2',
    data: {},
    children: [
      {
        name: 'child-job-2', // illustrative name; any string works
        data: { idx: 1, foo: 'baz' },
        queueName: 'childrenQueueName-2',
      },
    ],
  },
]);

Atomic Execution

This call succeeds or fails as a whole: either all of the jobs are added or none are. This ensures data consistency when adding multiple related flows.
The addBulk method returns an array of JobNode objects, one for each flow added.
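
As a rough illustration of working with that return value, the sketch below walks an array of job trees and lists every job in it. It assumes each node exposes a `job` and an optional `children` array. The `JobLike` and `JobNodeLike` types and the `collectJobLabels` helper are hypothetical local stand-ins so the snippet runs without Redis; real code would use the types exported by bullmq instead.

```typescript
// Minimal local stand-ins for the shapes returned by addBulk, so the sketch
// runs without Redis. These are assumptions, not the library's own types.
interface JobLike {
  id?: string;
  name: string;
}

interface JobNodeLike {
  job: JobLike;
  children?: JobNodeLike[];
}

// Hypothetical helper: depth-first walk returning "name:id" for every job.
function collectJobLabels(nodes: JobNodeLike[]): string[] {
  const labels: string[] = [];
  for (const node of nodes) {
    labels.push(`${node.job.name}:${node.job.id ?? 'no-id'}`);
    if (node.children) {
      labels.push(...collectJobLabels(node.children));
    }
  }
  return labels;
}

// Hand-written sample shaped like an addBulk result (the IDs are made up).
const sampleTrees: JobNodeLike[] = [
  {
    job: { id: '1', name: 'root-job-1' },
    children: [{ job: { id: '2', name: 'child-job-1' } }],
  },
  { job: { id: '3', name: 'root-job-2' } },
];

console.log(collectJobLabels(sampleTrees));
```

Each root in the returned array corresponds to one flow passed in, so the same walk could be used to log or persist the created job IDs.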

FlowJob Interface

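Each flow in the array uses the same FlowJob interface that the add method accepts:
interface FlowJobBase<T> {
  name: string; // job name
  queueName: string; // queue the job is added to
  data?: any; // job payload
  prefix?: string; // key prefix of the queue
  opts?: Omit<T, 'debounce' | 'deduplication' | 'repeat'>; // job options, minus the excluded keys
  children?: FlowChildJob[]; // child jobs of this job
}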

type FlowJob = FlowJobBase<JobsOptions>;

Example Use Cases

Use addBulk when you need to create multiple independent workflows that must all be created together or not at all:
const flows = await flowProducer.addBulk([
  {
    name: 'process-order-1',
    queueName: 'orders',
    data: { orderId: 1 },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 1 } },
      { name: 'charge', queueName: 'payment', data: { orderId: 1 } },
    ],
  },
  {
    name: 'process-order-2',
    queueName: 'orders',
    data: { orderId: 2 },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId: 2 } },
      { name: 'charge', queueName: 'payment', data: { orderId: 2 } },
    ],
  },
]);
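
When the flows share a shape, as the two order flows above do, the array can be generated rather than written by hand. The following is a minimal sketch under that assumption. `FlowJobLike` is a hypothetical local stand-in for the subset of FlowJob fields used here, and `buildOrderFlows` is an illustrative helper, not part of BullMQ.

```typescript
// Local stand-in for the subset of FlowJob fields used in this sketch, so it
// runs without the bullmq package. Real code would use the FlowJob type.
interface FlowJobLike {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJobLike[];
}

// Hypothetical helper: builds one flow per order ID, with the same
// "validate" and "charge" children as the hand-written example.
function buildOrderFlows(orderIds: number[]): FlowJobLike[] {
  return orderIds.map((orderId) => ({
    name: `process-order-${orderId}`,
    queueName: 'orders',
    data: { orderId },
    children: [
      { name: 'validate', queueName: 'validation', data: { orderId } },
      { name: 'charge', queueName: 'payment', data: { orderId } },
    ],
  }));
}

const orderFlows = buildOrderFlows([1, 2]);
console.log(orderFlows.map((flow) => flow.name));
```

The resulting array has the same structure as the literal passed to addBulk above, so it could be handed to the same call.
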
To reduce Redis roundtrips, add multiple flows at once instead of calling add repeatedly:
// Less efficient - multiple Redis roundtrips
for (const flowData of flowsArray) {
  await flowProducer.add(flowData);
}

// More efficient - single Redis roundtrip
await flowProducer.addBulk(flowsArray);

Benefits

Atomicity

All flows are added or none are - ensures data consistency

Performance

Reduces Redis roundtrips for better performance

Error Handling

Simpler error handling - one call either succeeds or fails, so there is a single place to catch errors

Transactional

Guarantees all-or-nothing behavior
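
The error-handling benefit listed above can be sketched as a single try/catch around the whole batch. `BulkProducerLike` is a hypothetical structural stand-in for the one producer method used, so the example runs without Redis. `tryAddBulk` and the two stub producers are illustrative names, not BullMQ APIs.

```typescript
// Structural stand-in for the single producer method used here. Any object
// with a matching addBulk method can be passed in.
interface BulkProducerLike<F, N> {
  addBulk(flows: F[]): Promise<N[]>;
}

type BulkResult<N> =
  | { ok: true; nodes: N[] }
  | { ok: false; error: unknown };

// Hypothetical wrapper: one try/catch covers the entire batch.
async function tryAddBulk<F, N>(
  producer: BulkProducerLike<F, N>,
  flows: F[],
): Promise<BulkResult<N>> {
  try {
    return { ok: true, nodes: await producer.addBulk(flows) };
  } catch (error) {
    // With an all-or-nothing addBulk, a failure here means that no flow
    // from this batch was added.
    return { ok: false, error };
  }
}

// Stub producers used only to exercise the wrapper.
const echoProducer: BulkProducerLike<string, string> = {
  addBulk: async (flows) => flows.map((flow) => `added:${flow}`),
};

const failingProducer: BulkProducerLike<string, string> = {
  addBulk: async () => {
    throw new Error('simulated failure');
  },
};

void tryAddBulk(echoProducer, ['flow-a', 'flow-b']).then((result) => {
  console.log(result.ok ? result.nodes : result.error);
});
```

Because the batch is atomic, the failure branch does not need to work out which flows were created before the error, which is what a loop of individual add calls would require.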

