Overview
FlowJob defines a job that can have child dependencies, creating a tree-like structure where children are processed before their parents.
Interface
interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  prefix?: string;
  opts?: JobsOptions;
  children?: FlowJob[];
}
Properties
name
The name of the job
queueName
The name of the queue where this job will be added
data
The payload data for this job
prefix
Custom prefix for this job’s queue (overrides default)
opts
Job options. For parent jobs, debounce, deduplication, and repeat are excluded.
children
Array of child jobs that must complete before this job can be processed
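Because children is itself an array of jobs, a flow is a recursive tree. The sketch below illustrates that with a local mirror of the interface so it runs without bullmq installed; FlowJobShape and countJobs are hypothetical names, and opts is loosened to a plain record instead of the real JobsOptions type.

```typescript
// Local stand-in for the FlowJob shape described above (assumption:
// `opts` is simplified to a plain record; the real type is JobsOptions).
interface FlowJobShape {
  name: string;
  queueName: string;
  data?: unknown;
  prefix?: string;
  opts?: Record<string, unknown>;
  children?: FlowJobShape[];
}

// Hypothetical helper: counts every job in the tree, the root included.
function countJobs(job: FlowJobShape): number {
  const children = job.children ?? [];
  return 1 + children.reduce((sum, child) => sum + countJobs(child), 0);
}

const exampleFlow: FlowJobShape = {
  name: 'parent-task',
  queueName: 'parentQueue',
  data: { userId: 123 },
  children: [
    { name: 'child-task-1', queueName: 'childQueue', data: { step: 1 } },
    { name: 'child-task-2', queueName: 'childQueue', data: { step: 2 } },
  ],
};

console.log(countJobs(exampleFlow)); // 3
```

A count like this can be a quick sanity check before submitting a large generated flow.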
Examples
Simple Flow
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({
  connection: {
    host: 'localhost',
    port: 6379,
  },
});
const flow = await flowProducer.add({
  name: 'parent-task',
  queueName: 'parentQueue',
  data: { userId: 123 },
  children: [
    {
      name: 'child-task-1',
      queueName: 'childQueue',
      data: { step: 1 },
    },
    {
      name: 'child-task-2',
      queueName: 'childQueue',
      data: { step: 2 },
    },
  ],
});
Nested Flow
const flow = await flowProducer.add({
  name: 'grandparent',
  queueName: 'queue1',
  data: { level: 0 },
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      data: { level: 1 },
      children: [
        {
          name: 'child-1',
          queueName: 'queue3',
          data: { level: 2, id: 1 },
        },
        {
          name: 'child-2',
          queueName: 'queue3',
          data: { level: 2, id: 2 },
        },
      ],
    },
  ],
});
Cross-Queue Flow
const flow = await flowProducer.add({
  name: 'process-video',
  queueName: 'videos',
  data: {
    videoId: 'abc123',
    url: 's3://bucket/video.mp4',
  },
  children: [
    {
      name: 'transcode-1080p',
      queueName: 'encoding',
      data: { resolution: '1080p' },
    },
    {
      name: 'transcode-720p',
      queueName: 'encoding',
      data: { resolution: '720p' },
    },
    {
      name: 'generate-thumbnail',
      queueName: 'images',
      data: { timestamp: 0 },
    },
  ],
});
With Job Options
const flow = await flowProducer.add({
  name: 'parent',
  queueName: 'parentQueue',
  data: { parentData: true },
  opts: {
    priority: 1,
    removeOnComplete: 1000,
  },
  children: [
    {
      name: 'child',
      queueName: 'childQueue',
      data: { childData: true },
      opts: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    },
  ],
});
Multiple Independent Parents
const flows = await flowProducer.addBulk([
  {
    name: 'report-1',
    queueName: 'reports',
    data: { reportId: 1 },
    children: [
      { name: 'fetch-data', queueName: 'data', data: { source: 'db1' } },
      { name: 'process-data', queueName: 'processing', data: {} },
    ],
  },
  {
    name: 'report-2',
    queueName: 'reports',
    data: { reportId: 2 },
    children: [
      { name: 'fetch-data', queueName: 'data', data: { source: 'db2' } },
      { name: 'process-data', queueName: 'processing', data: {} },
    ],
  },
]);
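The two flows above differ only in reportId and source, so the array could also be generated from a list. The following is a minimal sketch under that assumption; ReportFlow, ReportSpec, and buildReportFlows are hypothetical names, and the flow type is a local stand-in so the snippet runs without bullmq.

```typescript
// Local stand-in for the fields of FlowJob used in this example.
interface ReportFlow {
  name: string;
  queueName: string;
  data?: unknown;
  children?: ReportFlow[];
}

// Hypothetical input shape: one entry per report to generate.
interface ReportSpec {
  reportId: number;
  source: string;
}

// Builds one independent parent flow per report, each with the same two children.
function buildReportFlows(specs: ReportSpec[]): ReportFlow[] {
  return specs.map(({ reportId, source }) => ({
    name: `report-${reportId}`,
    queueName: 'reports',
    data: { reportId },
    children: [
      { name: 'fetch-data', queueName: 'data', data: { source } },
      { name: 'process-data', queueName: 'processing', data: {} },
    ],
  }));
}

const reportFlows = buildReportFlows([
  { reportId: 1, source: 'db1' },
  { reportId: 2, source: 'db2' },
]);

console.log(reportFlows.map((flow) => flow.name)); // [ 'report-1', 'report-2' ]
```

The resulting array has the same shape as the literal passed to flowProducer.addBulk in the example above.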
Flow Execution
Processing Order
- All child jobs are added to their respective queues
- The parent job is added in waiting-children state
- Workers process the child jobs
- When all children complete, the parent moves to waiting state
- The parent job is processed
- The parent can access children’s results
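The ordering rule in the steps above (children finish before their parent) corresponds to a post-order walk of the flow tree. The sketch below only illustrates that rule and is not how BullMQ schedules work internally; FlowNode and completionOrder are hypothetical names, and siblings are listed in declaration order even though real workers may process them in any order or in parallel.

```typescript
// Local stand-in for the fields of FlowJob needed here.
interface FlowNode {
  name: string;
  queueName: string;
  children?: FlowNode[];
}

// Returns job names in an order where every child precedes its parent.
function completionOrder(node: FlowNode): string[] {
  const order: string[] = [];
  for (const child of node.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(node.name); // the parent comes only after all of its children
  return order;
}

const nested: FlowNode = {
  name: 'grandparent',
  queueName: 'queue1',
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      children: [
        { name: 'child-1', queueName: 'queue3' },
        { name: 'child-2', queueName: 'queue3' },
      ],
    },
  ],
};

console.log(completionOrder(nested));
// [ 'child-1', 'child-2', 'parent', 'grandparent' ]
```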
Accessing Child Results
In the parent job processor:
import { Worker } from 'bullmq';

const parentWorker = new Worker(
  'parentQueue',
  async (job) => {
    // Get results from completed children, keyed by each child's job key
    const childrenValues = await job.getChildrenValues();
    console.log(childrenValues);
    // {
    //   'bull:childQueue:child-1-id': { result: 'from child 1' },
    //   'bull:childQueue:child-2-id': { result: 'from child 2' }
    // }

    // Use children results in processing
    return {
      processedChildren: Object.keys(childrenValues).length,
    };
  },
  // Same Redis connection settings as the FlowProducer example
  { connection: { host: 'localhost', port: 6379 } },
);
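When children live in several queues, it can help to group their results by queue. The sketch below assumes the key layout seen in the example output above (prefix, then queue name, then job ID, separated by colons) and that queue names and job IDs contain no colons; groupByQueue is a hypothetical helper, and the sample object stands in for what job.getChildrenValues() returned.

```typescript
type ChildrenValues = Record<string, unknown>;

// Groups child results by queue name.
// Assumed key layout: '<prefix>:<queueName>:<jobId>'.
function groupByQueue(values: ChildrenValues): Record<string, unknown[]> {
  const grouped: Record<string, unknown[]> = {};
  for (const [key, value] of Object.entries(values)) {
    // Second-to-last segment is the queue name; fall back to the whole key.
    const queueName = key.split(':').slice(-2, -1)[0] ?? key;
    if (!grouped[queueName]) {
      grouped[queueName] = [];
    }
    grouped[queueName].push(value);
  }
  return grouped;
}

// Sample data in the shape shown in the worker example.
const sampleValues: ChildrenValues = {
  'bull:childQueue:child-1-id': { result: 'from child 1' },
  'bull:childQueue:child-2-id': { result: 'from child 2' },
};

const groupedResults = groupByQueue(sampleValues);
console.log(Object.keys(groupedResults)); // [ 'childQueue' ]
console.log(groupedResults['childQueue'].length); // 2
```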
FlowChildJob
For child jobs, a more restrictive type is used:
type FlowChildJob = Omit<FlowJob, 'opts'> & {
  opts?: Omit<JobsOptions, 'debounce' | 'deduplication' | 'parent' | 'repeat'>;
};
Child jobs cannot have:
- parent option (automatically set)
- repeat option
- debounce or deduplication options
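The type above enforces these restrictions at compile time. When child options come from plain JavaScript or dynamic configuration, a runtime check along the following lines may be useful; it is a sketch only, and CHILD_RESTRICTED and findRestrictedOptions are hypothetical names that are not part of BullMQ.

```typescript
// Option names that the FlowChildJob type omits from child job opts.
const CHILD_RESTRICTED = ['parent', 'repeat', 'debounce', 'deduplication'] as const;

// Returns the restricted option names present in a child's opts object.
function findRestrictedOptions(opts: Record<string, unknown>): string[] {
  return CHILD_RESTRICTED.filter((key) => key in opts);
}

console.log(findRestrictedOptions({ attempts: 3 })); // []
console.log(findRestrictedOptions({ attempts: 3, repeat: { every: 1000 } })); // [ 'repeat' ]
```

An empty result means the options are safe to use on a child job as far as this list is concerned.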