Overview

FlowJob defines a job that can have child dependencies, creating a tree-like structure where children are processed before their parents.

Interface

interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  prefix?: string;
  opts?: JobsOptions;
  children?: FlowJob[];
}
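
To illustrate the children-before-parents ordering, the following is a minimal sketch that walks a flow tree in post-order. It does not use BullMQ at all: `FlowJobNode` and `completionOrder` are hypothetical names, and `FlowJobNode` is a local stand-in for the interface above. Sibling jobs may be picked up by workers in parallel, so the order among siblings shown here is only one possible outcome.

```typescript
// Local stand-in for the FlowJob shape shown above (opts and prefix omitted for brevity).
interface FlowJobNode {
  name: string;
  queueName: string;
  data?: unknown;
  children?: FlowJobNode[];
}

// Post-order walk: every child is listed before its parent,
// mirroring the rule that children are processed first.
function completionOrder(job: FlowJobNode): string[] {
  const order: string[] = [];
  for (const child of job.children ?? []) {
    order.push(...completionOrder(child));
  }
  order.push(job.name);
  return order;
}

const tree: FlowJobNode = {
  name: 'grandparent',
  queueName: 'queue1',
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      children: [
        { name: 'child-1', queueName: 'queue3' },
        { name: 'child-2', queueName: 'queue3' },
      ],
    },
  ],
};

console.log(completionOrder(tree));
// [ 'child-1', 'child-2', 'parent', 'grandparent' ]
```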

Properties

name

Type: string. Required.
The name of the job.

queueName

Type: string. Required.
The name of the queue where this job will be added.

data

Type: any. Optional.
The payload data for this job.

prefix

Type: string. Optional.
Custom key prefix for this job’s queue; overrides the default prefix.

opts

Type: JobsOptions. Optional.
Job options. For parent jobs, the debounce, deduplication, and repeat options are excluded.

children

Type: FlowJob[]. Optional.
Array of child jobs that must complete before this job can be processed.

Examples

Simple Flow

import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer({
  connection: {
    host: 'localhost',
    port: 6379,
  },
});

const flow = await flowProducer.add({
  name: 'parent-task',
  queueName: 'parentQueue',
  data: { userId: 123 },
  children: [
    {
      name: 'child-task-1',
      queueName: 'childQueue',
      data: { step: 1 },
    },
    {
      name: 'child-task-2',
      queueName: 'childQueue',
      data: { step: 2 },
    },
  ],
});

Nested Flow

const flow = await flowProducer.add({
  name: 'grandparent',
  queueName: 'queue1',
  data: { level: 0 },
  children: [
    {
      name: 'parent',
      queueName: 'queue2',
      data: { level: 1 },
      children: [
        {
          name: 'child-1',
          queueName: 'queue3',
          data: { level: 2, id: 1 },
        },
        {
          name: 'child-2',
          queueName: 'queue3',
          data: { level: 2, id: 2 },
        },
      ],
    },
  ],
});

Cross-Queue Flow

const flow = await flowProducer.add({
  name: 'process-video',
  queueName: 'videos',
  data: {
    videoId: 'abc123',
    url: 's3://bucket/video.mp4',
  },
  children: [
    {
      name: 'transcode-1080p',
      queueName: 'encoding',
      data: { resolution: '1080p' },
    },
    {
      name: 'transcode-720p',
      queueName: 'encoding',
      data: { resolution: '720p' },
    },
    {
      name: 'generate-thumbnail',
      queueName: 'images',
      data: { timestamp: 0 },
    },
  ],
});

With Job Options

const flow = await flowProducer.add({
  name: 'parent',
  queueName: 'parentQueue',
  data: { parentData: true },
  opts: {
    priority: 1,
    removeOnComplete: 1000,
  },
  children: [
    {
      name: 'child',
      queueName: 'childQueue',
      data: { childData: true },
      opts: {
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    },
  ],
});

Multiple Independent Parents

const flows = await flowProducer.addBulk([
  {
    name: 'report-1',
    queueName: 'reports',
    data: { reportId: 1 },
    children: [
      { name: 'fetch-data', queueName: 'data', data: { source: 'db1' } },
      { name: 'process-data', queueName: 'processing', data: {} },
    ],
  },
  {
    name: 'report-2',
    queueName: 'reports',
    data: { reportId: 2 },
    children: [
      { name: 'fetch-data', queueName: 'data', data: { source: 'db2' } },
      { name: 'process-data', queueName: 'processing', data: {} },
    ],
  },
]);

Flow Execution

Processing Order

  1. All child jobs are added to their respective queues
  2. The parent job is added in waiting-children state
  3. Workers process the child jobs
  4. When all children complete, the parent moves to waiting state
  5. A worker processes the parent job
  6. While it is being processed, the parent can read its children’s results
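
The state transitions in the steps above can be sketched with a small in-memory simulation. This is not how BullMQ implements flows internally; `SimJob`, `addJob`, and `complete` are hypothetical names used only to show when a parent moves from waiting-children to waiting.

```typescript
type SimState = 'waiting-children' | 'waiting' | 'completed';

interface SimJob {
  name: string;
  state: SimState;
  pendingChildren: number;
  parent?: SimJob;
}

// Add a job: it waits on its children if it has any, otherwise it is ready.
function addJob(name: string, childCount: number, parent?: SimJob): SimJob {
  return {
    name,
    state: childCount > 0 ? 'waiting-children' : 'waiting',
    pendingChildren: childCount,
    parent,
  };
}

// Complete a job and, if it was the last pending child, release its parent.
function complete(job: SimJob): void {
  if (job.state !== 'waiting') {
    throw new Error(`${job.name} is not ready to be processed`);
  }
  job.state = 'completed';
  const parent = job.parent;
  if (parent) {
    parent.pendingChildren -= 1;
    if (parent.pendingChildren === 0) {
      parent.state = 'waiting';
    }
  }
}

const parentJob = addJob('parent-task', 2);
const child1 = addJob('child-task-1', 0, parentJob);
const child2 = addJob('child-task-2', 0, parentJob);

complete(child1);
console.log(parentJob.state); // waiting-children (one child still pending)
complete(child2);
console.log(parentJob.state); // waiting (all children completed)
complete(parentJob);
console.log(parentJob.state); // completed
```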

Accessing Child Results

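In the parent job processor:

import { Worker } from 'bullmq';

const parentWorker = new Worker(
  'parentQueue',
  async (job) => {
    // Collect the return values of all completed children,
    // keyed by each child's fully qualified job key.
    const childrenValues = await job.getChildrenValues();

    console.log(childrenValues);
    // {
    //   'bull:childQueue:child-1-id': { result: 'from child 1' },
    //   'bull:childQueue:child-2-id': { result: 'from child 2' }
    // }

    // The parent's own return value can be derived from the children's results
    return {
      processedChildren: Object.keys(childrenValues).length,
    };
  },
  // Same connection settings as the FlowProducer in the Simple Flow example
  { connection: { host: 'localhost', port: 6379 } },
);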
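
The keys in the sample output above appear to follow a `<prefix>:<queueName>:<jobId>` pattern. Assuming that pattern holds, and that the prefix and job ID contain no colons, a parent processor could group child results by queue with helpers like the ones below. `parseChildKey` and `groupByQueue` are hypothetical names, not part of BullMQ.

```typescript
interface ChildKey {
  prefix: string;
  queueName: string;
  jobId: string;
}

// Assumption: keys look like `<prefix>:<queueName>:<jobId>`,
// with no colon inside the prefix or the job ID.
function parseChildKey(key: string): ChildKey {
  const first = key.indexOf(':');
  const last = key.lastIndexOf(':');
  if (first === -1 || first === last) {
    throw new Error(`Unexpected child key format: ${key}`);
  }
  return {
    prefix: key.slice(0, first),
    queueName: key.slice(first + 1, last),
    jobId: key.slice(last + 1),
  };
}

// Group child results by the queue that produced them.
function groupByQueue<T>(values: Record<string, T>): Record<string, T[]> {
  const grouped: Record<string, T[]> = {};
  for (const [key, value] of Object.entries(values)) {
    const { queueName } = parseChildKey(key);
    const list = grouped[queueName] ?? [];
    list.push(value);
    grouped[queueName] = list;
  }
  return grouped;
}

const grouped = groupByQueue({
  'bull:childQueue:child-1-id': { result: 'from child 1' },
  'bull:childQueue:child-2-id': { result: 'from child 2' },
});
console.log(Object.keys(grouped)); // [ 'childQueue' ]
console.log(grouped['childQueue']?.length); // 2
```

Inside a real processor, the object returned by `job.getChildrenValues()` would be passed to `groupByQueue` in place of the literal used here.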

FlowChildJob

For child jobs, a more restrictive type is used:

type FlowChildJob = Omit<FlowJob, 'opts'> & {
  opts?: Omit<JobsOptions, 'debounce' | 'deduplication' | 'parent' | 'repeat'>;
};

Child jobs cannot have:
  • parent option (automatically set)
  • repeat option
  • debounce/deduplication option