Overview

The FlowProducer class lets you add jobs that depend on one another, arranged in a tree-like structure called a flow. A parent job is processed only after all of its children have completed.

Constructor

new FlowProducer(opts?: QueueBaseOptions)
  • opts: QueueBaseOptions (optional). Configuration options for the flow producer.

Example

import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: {
    host: 'localhost',
    port: 6379,
  },
});

const flow = await flowProducer.add({
  name: 'parent-job',
  queueName: 'parentQueue',
  data: { foo: 'bar' },
  children: [
    {
      name: 'child-job-1',
      queueName: 'childQueue',
      data: { baz: 'qux' },
    },
    {
      name: 'child-job-2',
      queueName: 'childQueue',
      data: { hello: 'world' },
    },
  ],
});

Methods

add

Adds a flow (a tree of jobs). Each job is added to the queue named by its queueName, so a single flow can span several queues.
add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>
  • flow: FlowJob (required). A tree-like object in which child jobs are processed before their parents.
  • opts: FlowOpts (optional). Options applied to the flow.
  • Returns jobNode: JobNode. The job tree with the created job instances.

FlowJob Structure

interface FlowJob {
  name: string;              // Job name
  queueName: string;         // Queue name where job will be added
  data?: any;                // Job data
  prefix?: string;           // Custom prefix for this job's queue
  opts?: JobsOptions;        // Job options
  children?: FlowJob[];      // Child jobs (processed before parent)
}
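
Because every job in a flow names its own queue, it can help to list the queues a flow definition touches, for instance to check that each one has a worker. The following is a plain-TypeScript sketch: FlowJobLike is a local mirror of the interface above (with opts left out), and collectQueueNames is a hypothetical helper, not part of BullMQ.

```typescript
// Local mirror of the FlowJob shape documented above (opts omitted for brevity).
interface FlowJobLike {
  name: string;
  queueName: string;
  data?: unknown;
  prefix?: string;
  children?: FlowJobLike[];
}

// Hypothetical helper: distinct queue names in the tree, in first-seen order.
function collectQueueNames(flow: FlowJobLike): string[] {
  const names: string[] = [];
  const visit = (node: FlowJobLike): void => {
    if (names.indexOf(node.queueName) === -1) {
      names.push(node.queueName);
    }
    for (const child of node.children ?? []) {
      visit(child);
    }
  };
  visit(flow);
  return names;
}

const queuesInFlow = collectQueueNames({
  name: 'parent-job',
  queueName: 'parentQueue',
  children: [
    { name: 'child-job-1', queueName: 'childQueue' },
    { name: 'child-job-2', queueName: 'childQueue' },
  ],
});
console.log(queuesInFlow); // ['parentQueue', 'childQueue']
```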

addBulk

Adds multiple flows atomically.
addBulk(flows: FlowJob[]): Promise<JobNode[]>
  • flows: FlowJob[] (required). An array of flow objects.
  • Returns jobNodes: JobNode[]. An array of job trees with the created job instances.
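
As a rough sketch of how the array for addBulk might be assembled, the helper below builds one flow per video ID. The helper name buildVideoFlows, the job and queue names, and the local FlowJobLike type are illustrative assumptions; only the overall FlowJob shape comes from this page.

```typescript
// Local mirror of the FlowJob shape (opts and prefix omitted for brevity).
interface FlowJobLike {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJobLike[];
}

// Hypothetical helper: one parent job with two children for each video ID.
function buildVideoFlows(videoIds: string[]): FlowJobLike[] {
  return videoIds.map(
    (videoId): FlowJobLike => ({
      name: 'process-video',
      queueName: 'videos',
      data: { videoId },
      children: [
        { name: 'transcode', queueName: 'encoding', data: { videoId, format: 'mp4' } },
        { name: 'generate-thumbnail', queueName: 'images', data: { videoId, timestamp: 0 } },
      ],
    }),
  );
}

const videoFlows = buildVideoFlows(['123', '456']);
// With a FlowProducer instance, the array would then be passed on:
//   const jobNodes = await flowProducer.addBulk(videoFlows);
```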

getFlow

Retrieves a flow, given the queue name and ID of its root job.
getFlow(opts: NodeOpts): Promise<JobNode>
  • opts: NodeOpts (required). Options for getting a flow.

NodeOpts

interface NodeOpts {
  queueName: string;      // Root job queue name
  id: string;             // Root job ID
  prefix?: string;        // Prefix included in job key
  depth?: number;         // Maximum depth/levels to visit (default: 10)
  maxChildren?: number;   // Maximum children per type (default: 20)
}
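
The queueName and id that getFlow needs can usually be read from the root job of the JobNode that add returned. A minimal sketch, assuming the job exposes id and queueName; NodeOptsLike and RootNodeLike are local stand-in types and toNodeOpts is a hypothetical helper. The depth and maxChildren values are arbitrary and set only to show where the limits go.

```typescript
// Local mirror of the NodeOpts shape documented above.
interface NodeOptsLike {
  queueName: string;
  id: string;
  prefix?: string;
  depth?: number;
  maxChildren?: number;
}

// Minimal stand-in for the root of a JobNode (assumed fields: id, queueName).
interface RootNodeLike {
  job: { id?: string; queueName: string };
}

// Hypothetical helper: derive getFlow options from a previously added flow.
function toNodeOpts(root: RootNodeLike, depth = 2, maxChildren = 50): NodeOptsLike {
  if (root.job.id === undefined) {
    throw new Error('Root job has no id, so the flow cannot be looked up');
  }
  return { queueName: root.job.queueName, id: root.job.id, depth, maxChildren };
}

// With a FlowProducer instance and a JobNode returned by add():
//   const tree = await flowProducer.getFlow(toNodeOpts(flow));
```

Lowering depth or maxChildren keeps the lookup small for large flows, at the cost of a truncated tree.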

close

Closes the connection.
close(): Promise<void>

disconnect

Force disconnects the connection.
disconnect(): Promise<void>

Flow Concepts

Job Dependencies

In a flow, jobs can have parent-child relationships:
  • Child jobs are processed first
  • Parent jobs wait for all children to complete
  • Parent jobs can access children’s return values
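
In BullMQ, a parent's processor can read its children's results with job.getChildrenValues(), which resolves to an object keyed by child job key. The sketch below types the job structurally so that it stands alone: ParentJobLike is a stand-in for the real Job class, and sumChildResults is a hypothetical aggregation that assumes the children return numbers.

```typescript
// Structural stand-in for the part of BullMQ's Job used here.
interface ParentJobLike {
  getChildrenValues(): Promise<Record<string, unknown>>;
}

// Hypothetical aggregation: add up the numeric child results, ignore the rest.
function sumChildResults(values: Record<string, unknown>): number {
  let total = 0;
  for (const key of Object.keys(values)) {
    const value = values[key];
    if (typeof value === 'number') {
      total += value;
    }
  }
  return total;
}

// Processor for the parent's queue. By the time it runs, every child has
// completed, so all of their return values are available.
async function processParent(job: ParentJobLike): Promise<number> {
  const childValues = await job.getChildrenValues();
  return sumChildResults(childValues);
}
```

A function like processParent would be passed as the processor when creating the Worker for the parent's queue.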

Cross-Queue Flows

Jobs in a flow can be in different queues:
const flow = await flowProducer.add({
  name: 'process-video',
  queueName: 'videos',
  data: { videoId: '123' },
  children: [
    {
      name: 'transcode',
      queueName: 'encoding',
      data: { format: 'mp4' },
    },
    {
      name: 'generate-thumbnail',
      queueName: 'images',
      data: { timestamp: 0 },
    },
  ],
});

Nested Dependencies

Flows can be nested to create complex dependency trees:
const flow = await flowProducer.add({
  name: 'grandparent',
  queueName: 'queue1',
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      children: [
        { name: 'child1', queueName: 'queue3' },
        { name: 'child2', queueName: 'queue3' },
      ],
    },
  ],
});
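
Since each parent waits for its children, the deepest jobs in a nested flow become processable first. The sketch below derives one order consistent with that rule by walking the definition depth-first (post-order). It is an illustration only: sibling jobs may run concurrently or finish in a different order, and completionOrder and FlowJobLike are hypothetical names.

```typescript
// Local mirror of the FlowJob shape (only the fields needed here).
interface FlowJobLike {
  name: string;
  queueName: string;
  children?: FlowJobLike[];
}

// Post-order walk: every child is listed before its parent.
function completionOrder(flow: FlowJobLike): string[] {
  const order: string[] = [];
  for (const child of flow.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(flow.name);
  return order;
}

const nestedOrder = completionOrder({
  name: 'grandparent',
  queueName: 'queue1',
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      children: [
        { name: 'child1', queueName: 'queue3' },
        { name: 'child2', queueName: 'queue3' },
      ],
    },
  ],
});
console.log(nestedOrder); // ['child1', 'child2', 'parent', 'grandparent']
```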

JobNode Type

interface JobNode {
  job: Job;              // The job instance
  children?: JobNode[];  // Child job nodes
}
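
A JobNode tree, whether returned by add or getFlow, can be walked recursively. The sketch below flattens a tree into rows that record each job's depth, which is handy for logging. JobNodeLike and JobLike are local stand-ins carrying only the fields used here, and flattenTree is a hypothetical helper.

```typescript
// Stand-ins for Job and JobNode with only the fields this example reads.
interface JobLike {
  id?: string;
  name: string;
}

interface JobNodeLike {
  job: JobLike;
  children?: JobNodeLike[];
}

interface TreeRow {
  depth: number;
  id: string | undefined;
  name: string;
}

// Pre-order walk: each parent row is followed by the rows of its subtree.
function flattenTree(node: JobNodeLike, depth = 0): TreeRow[] {
  const rows: TreeRow[] = [{ depth, id: node.job.id, name: node.job.name }];
  for (const child of node.children ?? []) {
    rows.push(...flattenTree(child, depth + 1));
  }
  return rows;
}
```

With a real result, a call such as flattenTree(flow) would produce one row per job, with the root at depth 0.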

Events

The FlowProducer class extends EventEmitter and emits the following events:

error

Emitted when an error occurs.
flowProducer.on('error', (error: Error) => {
  console.error('Flow producer error:', error);
});
