Overview
The FlowProducer class lets you add jobs with dependencies between them in a tree-like structure called a flow. Once all children of a given parent have completed, the parent is processed.
Constructor
new FlowProducer(opts?: QueueBaseOptions)
opts: Configuration options for the flow producer
Example
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: {
    host: 'localhost',
    port: 6379,
  },
});

const flow = await flowProducer.add({
  name: 'parent-job',
  queueName: 'parentQueue',
  data: { foo: 'bar' },
  children: [
    {
      name: 'child-job-1',
      queueName: 'childQueue',
      data: { baz: 'qux' },
    },
    {
      name: 'child-job-2',
      queueName: 'childQueue',
      data: { hello: 'world' },
    },
  ],
});
Methods
add
Adds a flow (a tree of jobs). Each job is added to the queue named by its queueName.
add(flow: FlowJob, opts?: FlowOpts): Promise<JobNode>
flow: An object with a tree-like structure in which child jobs are processed before their parents
opts: Options that will be applied to the flow
Returns: The job tree structure with the created job instances
FlowJob Structure
interface FlowJob {
  name: string; // Job name
  queueName: string; // Queue name where job will be added
  data?: any; // Job data
  prefix?: string; // Custom prefix for this job's queue
  opts?: JobsOptions; // Job options
  children?: FlowJob[]; // Child jobs (processed before parent)
}
addBulk
Adds multiple flows atomically.
addBulk(flows: FlowJob[]): Promise<JobNode[]>
Returns: An array of job tree structures with the created job instances
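As a rough illustration of where addBulk fits, the sketch below turns a list of records into one flow per record and hands them over in a single call. The Order type, the queue names, and the buildOrderFlows and submitOrders helpers are hypothetical. FlowJobSketch and BulkProducer are simplified local stand-ins for the library types, used here as an assumption so the helper can be exercised without a Redis connection; real code would use the types exported by bullmq.

```typescript
// Simplified local mirror of the FlowJob shape documented above
// (assumption: real code would use the FlowJob type from 'bullmq').
interface FlowJobSketch {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJobSketch[];
}

// Minimal structural view of a producer: anything with a compatible addBulk.
// It is intended to be satisfied by a FlowProducer instance.
interface BulkProducer<TNode> {
  addBulk(flows: FlowJobSketch[]): Promise<TNode[]>;
}

// Hypothetical input record used only for this example.
interface Order {
  id: string;
  items: string[];
}

// Build one flow per order: a parent 'ship-order' job that waits for
// one 'reserve-item' child per item.
function buildOrderFlows(orders: Order[]): FlowJobSketch[] {
  return orders.map((order) => ({
    name: 'ship-order',
    queueName: 'shipping',
    data: { orderId: order.id },
    children: order.items.map((item) => ({
      name: 'reserve-item',
      queueName: 'inventory',
      data: { orderId: order.id, item },
    })),
  }));
}

// Submit all flows with a single addBulk call.
async function submitOrders<TNode>(
  producer: BulkProducer<TNode>,
  orders: Order[],
): Promise<TNode[]> {
  return producer.addBulk(buildOrderFlows(orders));
}
```

Keeping the tree construction in a pure function separate from the call to addBulk makes the flow shape easy to inspect before anything is written to Redis.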
getFlow
Retrieves a flow by its root job.
getFlow(opts: NodeOpts): Promise<JobNode>
opts: Options for getting a flow
NodeOpts
interface NodeOpts {
  queueName: string; // Root job queue name
  id: string; // Root job ID
  prefix?: string; // Prefix included in job key
  depth?: number; // Maximum depth/levels to visit (default: 10)
  maxChildren?: number; // Maximum children per type (default: 20)
}
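A minimal sketch of how these options might be assembled from a root job that was added earlier. The nodeOptsFor helper and the RootJobRef type are hypothetical, and NodeOptsSketch is a local copy of the interface above so the snippet runs without the library installed.

```typescript
// Local mirror of the NodeOpts interface documented above.
interface NodeOptsSketch {
  queueName: string;
  id: string;
  prefix?: string;
  depth?: number;
  maxChildren?: number;
}

// Minimal view of a root job: only the fields needed to look the flow up again.
interface RootJobRef {
  id?: string;
  queueName: string;
}

// Build getFlow options from a root job, optionally limiting the traversal.
// Limits that are not given are left out so the documented defaults apply.
function nodeOptsFor(
  root: RootJobRef,
  limits: { depth?: number; maxChildren?: number } = {},
): NodeOptsSketch {
  if (root.id === undefined) {
    throw new Error('Root job has no id; it must be added before lookup');
  }
  return { queueName: root.queueName, id: root.id, ...limits };
}
```

With a producer and a previously added flow in hand, the call would look like `await flowProducer.getFlow(nodeOptsFor(flow.job, { depth: 2 }))`, assuming the Job instance exposes `id` and `queueName` as used here.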
close
Closes the connection.
close(): Promise<void>
disconnect
Forcibly disconnects the connection.
disconnect(): Promise<void>
Flow Concepts
Job Dependencies
In a flow, jobs can have parent-child relationships:
- Child jobs are processed first
- Parent jobs wait for all children to complete
- Parent jobs can access children’s return values
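The last point can be sketched as a parent processor that aggregates its children's results. In BullMQ, a job reads them with job.getChildrenValues(), which resolves to an object keyed by child job key. The ChildResult shape and the totalBytes and processParent helpers are hypothetical, and ParentJobLike is a local stand-in for the Job type (an assumption made so the logic can run without Redis).

```typescript
// Hypothetical return value produced by each child job's processor.
interface ChildResult {
  bytes: number;
}

// Local stand-in for the part of a BullMQ Job used here.
interface ParentJobLike {
  getChildrenValues(): Promise<Record<string, ChildResult>>;
}

// Sum one field across all children results, ignoring the job keys.
function totalBytes(values: Record<string, ChildResult>): number {
  let sum = 0;
  for (const value of Object.values(values)) {
    sum += value.bytes;
  }
  return sum;
}

// Processor for the parent job. It runs only after the children have
// completed, so their return values are available at this point.
async function processParent(job: ParentJobLike): Promise<{ total: number }> {
  const values = await job.getChildrenValues();
  return { total: totalBytes(values) };
}
```

A function like processParent would be passed as the processor of a Worker on the parent's queue; the children need their own workers on their respective queues.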
Cross-Queue Flows
Jobs in a flow can be in different queues:
const flow = await flowProducer.add({
  name: 'process-video',
  queueName: 'videos',
  data: { videoId: '123' },
  children: [
    {
      name: 'transcode',
      queueName: 'encoding',
      data: { format: 'mp4' },
    },
    {
      name: 'generate-thumbnail',
      queueName: 'images',
      data: { timestamp: 0 },
    },
  ],
});
Nested Dependencies
Flows can be nested to create complex dependency trees:
const flow = await flowProducer.add({
  name: 'grandparent',
  queueName: 'queue1',
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      children: [
        { name: 'child1', queueName: 'queue3' },
        { name: 'child2', queueName: 'queue3' },
      ],
    },
  ],
});
JobNode Type
interface JobNode {
  job: Job; // The job instance
  children?: JobNode[]; // Child job nodes
}
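Because a JobNode is a recursive structure, the result of add or getFlow can be walked with a small recursive function. The sketch below is illustrative only: listJobs and flowDepth are hypothetical helpers, and JobNodeSketch is a local reduction of JobNode to the two job fields that are read here.

```typescript
// Local mirror of the JobNode shape above, reduced to the fields read here.
interface JobNodeSketch {
  job: { name: string; queueName: string };
  children?: JobNodeSketch[];
}

// Depth-first walk returning "queueName/name" for every job in the tree,
// with each parent listed before its children.
function listJobs(node: JobNodeSketch): string[] {
  const result = [`${node.job.queueName}/${node.job.name}`];
  for (const child of node.children ?? []) {
    result.push(...listJobs(child));
  }
  return result;
}

// Number of levels in the tree; a single job without children has depth 1.
function flowDepth(node: JobNodeSketch): number {
  let deepestChild = 0;
  for (const child of node.children ?? []) {
    deepestChild = Math.max(deepestChild, flowDepth(child));
  }
  return 1 + deepestChild;
}
```

Applied to the nested flow from the previous section, listJobs would list the grandparent first and the two queue3 children last, and flowDepth would report three levels.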
Events
The FlowProducer class extends EventEmitter and emits the following events:
error
Emitted when an error occurs.
flowProducer.on('error', (error: Error) => {
  console.error('Flow producer error:', error);
});