Job producers add jobs to queues. In NestJS, a producer is typically an application service (a Nest provider) that injects a queue and calls its methods to add jobs.

Basic Producer

1. Inject the queue into your service

Use the @InjectQueue() decorator to inject a queue by name:
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
}
The @InjectQueue() decorator identifies the queue by its name, as provided in registerQueue().
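Because the name passed to @InjectQueue() has to match the registered name exactly, one common way to avoid typos is to keep queue names in shared constants. A minimal sketch; QUEUE_NAMES, QueueName, and isQueueName are hypothetical helpers, not part of @nestjs/bullmq:

```typescript
// Hypothetical shared constants: use the same value when registering
// the queue and when injecting it.
const QUEUE_NAMES = {
  audio: 'audio',
  video: 'video',
} as const;

// Union of the known queue names: 'audio' | 'video'.
type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES];

// Narrow an arbitrary string (for example from configuration) to a known queue name.
function isQueueName(value: string): value is QueueName {
  return Object.keys(QUEUE_NAMES).some(
    (key) => QUEUE_NAMES[key as keyof typeof QUEUE_NAMES] === value,
  );
}
```

With this in place, the constructor above could use @InjectQueue(QUEUE_NAMES.audio) instead of a string literal.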
2. Add jobs to the queue

Call the queue's add() method with a job name and a data payload:
async convertAudio(file: string, format: string) {
  const job = await this.audioQueue.add('convert', {
    file,
    format,
  });
  
  return job.id;
}

Complete Producer Example

audio.service.ts
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
  
  async convertAudio(file: string, format: string) {
    const job = await this.audioQueue.add('convert', {
      file,
      format,
    });
    
    return {
      jobId: job.id,
      status: 'queued',
    };
  }
  
  async convertWithOptions(file: string, format: string) {
    const job = await this.audioQueue.add(
      'convert',
      {
        file,
        format,
      },
      {
        // Job options
        priority: 1,
        delay: 5000, // Delay 5 seconds
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      }
    );
    
    return job.id;
  }
  
  async getJobStatus(jobId: string) {
    const job = await this.audioQueue.getJob(jobId);
    
    if (!job) {
      return null;
    }
    
    return {
      id: job.id,
      state: await job.getState(),
      progress: job.progress,
      data: job.data,
    };
  }
}
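
A producer like this can be unit-tested without Redis by depending on a narrow interface and passing an in-memory fake. The sketch below is an illustration under assumptions: QueueLike, AudioProducer, and FakeQueue are hypothetical names, and QueueLike mirrors only the single add() call used here, not the full BullMQ Queue API.

```typescript
// Minimal structural stand-in for the part of a queue this producer uses.
// Hypothetical type for illustration; a real Queue has many more members.
interface QueueLike<T> {
  add(name: string, data: T): Promise<{ id?: string }>;
}

interface ConvertJobData {
  file: string;
  format: string;
}

// Same logic as convertAudio() above, written against the narrow interface.
class AudioProducer {
  private readonly queue: QueueLike<ConvertJobData>;

  constructor(queue: QueueLike<ConvertJobData>) {
    this.queue = queue;
  }

  async convertAudio(file: string, format: string) {
    const job = await this.queue.add('convert', { file, format });
    return { jobId: job.id, status: 'queued' };
  }
}

// In-memory fake that records added jobs instead of talking to Redis.
class FakeQueue<T> implements QueueLike<T> {
  readonly added: Array<{ id: string; name: string; data: T }> = [];

  async add(name: string, data: T) {
    const job = { id: String(this.added.length + 1), name, data };
    this.added.push(job);
    return job;
  }
}
```

In a test, construct the producer with new FakeQueue<ConvertJobData>() and assert on the fake's added array.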

Job Options

Pass an options object as the third argument to add() to customize job behavior, for example to set a priority:
await this.audioQueue.add('convert', data, {
  priority: 1, // Lower number = higher priority
});
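
Since a lower number means a higher priority, it can help to derive the options from a named urgency level rather than scattering raw numbers through the code. A small sketch, assuming a hypothetical Urgency type and mapping; ConvertJobOptions is a local mirror of the few options used on this page, not the library's own type:

```typescript
// Local mirror of the job options used here (assumed subset of BullMQ's options).
interface ConvertJobOptions {
  priority: number;
  attempts: number;
  backoff: { type: 'exponential' | 'fixed'; delay: number };
}

type Urgency = 'high' | 'normal' | 'low';

// Hypothetical mapping: lower numbers are processed first.
const PRIORITY_BY_URGENCY: Record<Urgency, number> = {
  high: 1,
  normal: 5,
  low: 10,
};

// Build one consistent options object per urgency level.
function buildConvertOptions(urgency: Urgency): ConvertJobOptions {
  return {
    priority: PRIORITY_BY_URGENCY[urgency],
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  };
}
```

The result can be passed as the third argument, as in add('convert', data, buildConvertOptions('high')).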

Flow Producers

For complex workflows with parent-child job relationships, use FlowProducer:
1. Register the FlowProducer

import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.registerFlowProducer({
      name: 'videoflow',
    }),
  ],
})
export class VideoModule {}
2. Inject the FlowProducer

import { Injectable } from '@nestjs/common';
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';

@Injectable()
export class VideoService {
  constructor(
    @InjectFlowProducer('videoflow') private flowProducer: FlowProducer,
  ) {}
}
The @InjectFlowProducer() decorator identifies the flow producer by its name, as provided in registerFlowProducer().
3. Create a flow

async processVideo(videoId: string) {
  const flow = await this.flowProducer.add({
    name: 'process-video',
    queueName: 'video',
    data: { videoId },
    children: [
      {
        name: 'extract-audio',
        data: { videoId },
        queueName: 'audio',
      },
      {
        name: 'generate-thumbnail',
        data: { videoId },
        queueName: 'image',
      },
      {
        name: 'transcode',
        data: { videoId, format: 'mp4' },
        queueName: 'video',
      },
    ],
  });
  
  return flow;
}
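
In BullMQ, a parent job is processed only after all of its children have completed, so the tree passed to add() also describes a dependency order. Building that tree in a pure function keeps it easy to inspect and test. The sketch below is illustrative: FlowNode is a local mirror of the fields used above (an assumed subset of the library's flow type), and buildVideoFlow and countJobs are hypothetical helpers.

```typescript
// Local mirror of the flow node shape used above.
interface FlowNode {
  name: string;
  queueName: string;
  data: Record<string, unknown>;
  children?: FlowNode[];
}

// Build the same parent/children tree as processVideo(), without touching Redis.
function buildVideoFlow(videoId: string): FlowNode {
  return {
    name: 'process-video',
    queueName: 'video',
    data: { videoId },
    children: [
      { name: 'extract-audio', queueName: 'audio', data: { videoId } },
      { name: 'generate-thumbnail', queueName: 'image', data: { videoId } },
      { name: 'transcode', queueName: 'video', data: { videoId, format: 'mp4' } },
    ],
  };
}

// Count every job in the tree (the node itself plus all descendants).
function countJobs(node: FlowNode): number {
  const children = node.children ?? [];
  return 1 + children.reduce((total, child) => total + countJobs(child), 0);
}
```

The returned object has the same shape as the literal passed to this.flowProducer.add() above, so it should be usable in its place.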

Bulk Job Operations

Use addBulk() to add multiple jobs in a single call:
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class BatchService {
  constructor(@InjectQueue('batch') private batchQueue: Queue) {}
  
  async processBatch(items: any[]) {
    const jobs = items.map((item, index) => ({
      name: 'process',
      data: item,
      opts: {
        jobId: `batch-${Date.now()}-${index}`,
      },
    }));
    
    const results = await this.batchQueue.addBulk(jobs);
    
    return {
      total: results.length,
      jobIds: results.map(job => job.id),
    };
  }
}
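
For very large inputs, it may be worth splitting the job list into smaller chunks so that each addBulk() call stays bounded. The helper below is a generic sketch; chunk is a hypothetical utility, and any particular chunk size is an assumption to tune for your workload.

```typescript
// Split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}
```

Inside processBatch(), the single addBulk(jobs) call could then become a loop over chunk(jobs, 500), awaiting addBulk() for each part.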

Controller Example

Integrate producers with NestJS controllers:
audio.controller.ts
import { Controller, Post, Body, Get, Param } from '@nestjs/common';
import { AudioService } from './audio.service';

@Controller('audio')
export class AudioController {
  constructor(private readonly audioService: AudioService) {}
  
  @Post('convert')
  async convert(
    @Body() body: { file: string; format: string }
  ) {
    // convertAudio() returns { jobId, status }, so pick out the ID.
    const { jobId } = await this.audioService.convertAudio(
      body.file,
      body.format
    );
    
    return {
      message: 'Job queued successfully',
      jobId,
    };
  }
  
  @Get('status/:jobId')
  async getStatus(@Param('jobId') jobId: string) {
    const status = await this.audioService.getJobStatus(jobId);
    
    if (!status) {
      return { error: 'Job not found' };
    }
    
    return status;
  }
}

Best Practices

Job IDs

Use custom job IDs for deduplication. A job whose ID already exists in the queue is not added again:
await queue.add('task', data, {
  jobId: `unique-${userId}`,
});
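
Deduplication only works if the same logical task always maps to the same ID. A minimal sketch of a deterministic ID builder; dedupJobId is a hypothetical helper. Replacing ':' is a precaution based on the assumption that BullMQ uses it as a Redis key separator and may reject custom IDs containing it, so check this against the version you use.

```typescript
// Build a stable job ID from the task name and the entity it concerns.
// Colons and whitespace are replaced with '-' (assumption: ':' is unsafe in custom IDs).
function dedupJobId(task: string, userId: string | number): string {
  const raw = `${task}-${userId}`;
  return raw.replace(/[:\s]+/g, '-');
}
```

For example, dedupJobId('welcome-email', 42) yields 'welcome-email-42' on every call, so repeated requests for the same user map to one job.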

Error Handling

Always handle errors when adding jobs:
try {
  await queue.add('task', data);
} catch (error) {
  // Handle error
}
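
One way to keep this handling in a single place is a small wrapper that turns a failed add() into a result value the caller can inspect. This is a sketch under assumptions: EnqueueTarget, EnqueueResult, and safeEnqueue are hypothetical names, and EnqueueTarget mirrors only the add() call used here.

```typescript
// Narrow stand-in for the queue method used by the wrapper.
interface EnqueueTarget<T> {
  add(name: string, data: T): Promise<{ id?: string }>;
}

type EnqueueResult =
  | { ok: true; jobId: string | undefined }
  | { ok: false; reason: string };

// Add a job and report failure as a value instead of letting the error propagate.
async function safeEnqueue<T>(
  queue: EnqueueTarget<T>,
  name: string,
  data: T,
): Promise<EnqueueResult> {
  try {
    const job = await queue.add(name, data);
    return { ok: true, jobId: job.id };
  } catch (error) {
    // Errors here typically mean the job was not stored (for example, a connection failure).
    const reason = error instanceof Error ? error.message : String(error);
    return { ok: false, reason };
  }
}
```

Whether to return a result like this or rethrow a domain-specific exception is a design choice; the important part is that a failed add() never goes unnoticed.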

Job Options

Set appropriate retry attempts and backoff:
await queue.add('task', data, {
  attempts: 3,
  backoff: 1000, // A plain number means a fixed delay (in ms) between retries
});
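
To get a feel for how long retries take, the delay before each retry can be estimated. The sketch below follows the formulas described in the BullMQ retry documentation (fixed: always `delay`; exponential: 2^(attempts - 1) * delay) and ignores optional jitter. It is an approximation for planning, not the library's implementation, and retryDelayMs is a hypothetical helper.

```typescript
type Backoff = number | { type: 'fixed' | 'exponential'; delay: number };

// Estimate the wait (in ms) before the next retry.
// attemptsMade is the number of attempts already made (1 after the first failure).
function retryDelayMs(backoff: Backoff, attemptsMade: number): number {
  // A plain number is treated as a fixed backoff.
  const normalized =
    typeof backoff === 'number' ? { type: 'fixed' as const, delay: backoff } : backoff;

  return normalized.type === 'exponential'
    ? Math.pow(2, attemptsMade - 1) * normalized.delay
    : normalized.delay;
}
```

With { type: 'exponential', delay: 1000 }, this gives 1000 ms after the first failure, 2000 ms after the second, and 4000 ms after the third.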

Auto-removal

Clean up completed jobs automatically:
await queue.add('task', data, {
  removeOnComplete: true,
});
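
Instead of removing every completed job immediately, BullMQ also accepts a number or a { age, count } object for removeOnComplete and removeOnFail, which keeps a bounded history for debugging. A sketch with assumed limits; retentionOptions is a hypothetical helper and the types are local mirrors of the option shapes:

```typescript
// age is in seconds; count is the maximum number of jobs to keep.
interface KeepPolicy {
  age?: number;
  count?: number;
}

interface RetentionOptions {
  removeOnComplete: boolean | number | KeepPolicy;
  removeOnFail: boolean | number | KeepPolicy;
}

// Keep the most recent completed jobs by count, and failed jobs by age.
function retentionOptions(keepCompleted: number, keepFailedSeconds: number): RetentionOptions {
  return {
    removeOnComplete: { count: keepCompleted },
    removeOnFail: { age: keepFailedSeconds },
  };
}
```

For example, retentionOptions(1000, 24 * 3600) keeps the latest 1000 completed jobs and failed jobs from the last 24 hours; the result can be spread into the options passed to add().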

NestJS Integration

Learn about NestJS integration basics

Queue Events

Listen to queue events in NestJS

External Documentation

NestJS Queues Documentation

Official NestJS documentation for queue integration
