BullMQ integrates with NestJS through the official @nestjs/bullmq package, which provides modules and decorators for registering queues, defining processors, and injecting queues into services.

Installation

npm install @nestjs/bullmq bullmq

Quick Start

1. Import BullModule in your root module

Configure the root BullModule with your Redis connection:
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}
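
Hardcoded connection values are fine for local development, but deployments usually read them from the environment. The following is a minimal sketch of such a helper. The variable names REDIS_HOST and REDIS_PORT and the function redisConnectionFromEnv are assumptions made for illustration and are not part of @nestjs/bullmq.

```typescript
// Minimal shape of the connection object passed to BullModule.forRoot().
interface RedisConnection {
  host: string;
  port: number;
}

// Build the connection from environment variables, falling back to the
// defaults used above. REDIS_HOST and REDIS_PORT are assumed names.
function redisConnectionFromEnv(
  env: Record<string, string | undefined>,
): RedisConnection {
  const rawPort = env['REDIS_PORT'];
  const port = Number(rawPort ?? 6379);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }
  return { host: env['REDIS_HOST'] ?? 'localhost', port };
}

// Usage inside the root module (not executed here):
// BullModule.forRoot({ connection: redisConnectionFromEnv(process.env) })
```

Validating the port at startup surfaces a misconfigured environment immediately instead of as a connection error later.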

2. Register a queue

Register queues using BullModule.registerQueue():
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
    }),
  ],
})
export class AudioModule {}

3. Create a processor

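Extend WorkerHost, implement its process() method, and annotate the class with @Processor, passing the queue name. Use @OnWorkerEvent to handle worker events such as completed and failed: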
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('audio')
export class AudioProcessor extends WorkerHost {
  async process(job: Job<any, any, string>): Promise<any> {
    // Process your job
    console.log('Processing job:', job.id);
    console.log('Job data:', job.data);
    
    // Return result
    return { processed: true };
  }
  
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    console.log(`Job ${job.id} completed`);
  }
  
  @OnWorkerEvent('failed')
  onFailed(job: Job | undefined, error: Error) {
    // BullMQ types the job as possibly undefined for this event, so guard the access
    console.log(`Job ${job?.id} failed:`, error);
  }
}
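
Because the processor class has a single process() method, every job in the queue reaches it regardless of its name. If one queue carries several kinds of jobs, process() can route on job.name. The sketch below shows that routing with a minimal stand-in for the Job type. The job names 'transcode' and 'normalize' and the function handleAudioJob are hypothetical.

```typescript
// Minimal stand-in for the parts of BullMQ's Job used here.
interface JobLike {
  name: string;
  data: { file: string };
}

// Route a job to a handler based on job.name.
// 'transcode' and 'normalize' are hypothetical job names.
function handleAudioJob(job: JobLike): string {
  switch (job.name) {
    case 'transcode':
      return `transcoded ${job.data.file}`;
    case 'normalize':
      return `normalized ${job.data.file}`;
    default:
      // Failing loudly makes an unexpected job name visible in the 'failed' event.
      throw new Error(`Unknown job name: ${job.name}`);
  }
}

// Inside a WorkerHost subclass (not executed here):
// async process(job: Job) { return handleAudioJob(job); }
```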

4. Register the processor as a provider

Add the processor to your module’s providers:
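import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { AudioProcessor } from './audio.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
    }),
  ],
  providers: [AudioProcessor],
})
export class AudioModule {}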

Complete Example

Here’s a complete example with a module, processor, and service:
audio.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { AudioProcessor } from './audio.processor';
import { AudioService } from './audio.service';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'audio',
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
  providers: [AudioProcessor, AudioService],
  exports: [AudioService],
})
export class AudioModule {}
audio.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('audio')
export class AudioProcessor extends WorkerHost {
  async process(job: Job<any, any, string>): Promise<any> {
    // Simulate audio processing
    const { file, format } = job.data;
    
    console.log(`Converting ${file} to ${format}`);
    
    // Process audio file
    await this.convertAudio(file, format);
    
    return {
      success: true,
      outputFile: `${file}.${format}`,
    };
  }
  
  private async convertAudio(file: string, format: string) {
    // Your audio conversion logic
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
  
  @OnWorkerEvent('completed')
  onCompleted(job: Job) {
    console.log(`Audio conversion completed: ${job.id}`);
  }
  
  @OnWorkerEvent('failed')
  onFailed(job: Job | undefined, error: Error) {
    // BullMQ types the job as possibly undefined for this event, so guard the access
    console.error(`Audio conversion failed: ${job?.id}`, error);
  }
  
  @OnWorkerEvent('progress')
  onProgress(job: Job, progress: number) {
    console.log(`Job ${job.id} progress: ${progress}%`);
  }
}
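
The progress handler above runs only when the processor reports progress, which the example's process() method never does. The sketch below shows one way to report it, assuming the job exposes updateProgress() as BullMQ's Job does. The helper runWithProgress and the ProgressJob interface are hypothetical names used so the snippet runs without a Redis connection.

```typescript
// Minimal stand-in for the part of BullMQ's Job used here.
interface ProgressJob {
  updateProgress(progress: number): Promise<void>;
}

// Run `totalSteps` units of work, reporting a whole-number percentage
// after each completed step.
async function runWithProgress(
  job: ProgressJob,
  totalSteps: number,
  step: (index: number) => Promise<void>,
): Promise<void> {
  for (let i = 0; i < totalSteps; i++) {
    await step(i);
    await job.updateProgress(Math.round(((i + 1) / totalSteps) * 100));
  }
}

// Inside process() (not executed here):
// await runWithProgress(job, 4, async () => { /* convert one chunk */ });
```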
audio.service.ts
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class AudioService {
  constructor(@InjectQueue('audio') private audioQueue: Queue) {}
  
  async convertAudio(file: string, format: string) {
    const job = await this.audioQueue.add('convert', {
      file,
      format,
    });
    
    return job.id;
  }
}
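
The add() call above uses default job options. BullMQ also accepts an options object as a third argument, for example to retry failed jobs. The sketch below builds such an object. The RetryOptions interface is a local subset of BullMQ's job options written out for illustration, and the helper name retryOptions and its default values are assumptions.

```typescript
// Local subset of BullMQ's job options used in this sketch.
interface RetryOptions {
  attempts: number;
  backoff: { type: 'exponential' | 'fixed'; delay: number };
  removeOnComplete: boolean;
}

// Build retry settings: total attempts, a backoff strategy with a base
// delay in milliseconds, and cleanup of jobs that complete successfully.
function retryOptions(attempts = 3, baseDelayMs = 1000): RetryOptions {
  if (!Number.isInteger(attempts) || attempts < 1) {
    throw new Error('attempts must be a positive integer');
  }
  return {
    attempts,
    backoff: { type: 'exponential', delay: baseDelayMs },
    removeOnComplete: true,
  };
}

// Inside AudioService.convertAudio (not executed here):
// await this.audioQueue.add('convert', { file, format }, retryOptions());
```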

Flow Producer

For workflows in which a parent job depends on child jobs, register a FlowProducer:
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.registerFlowProducer({
      name: 'myflow',
    }),
  ],
})
export class AppModule {}
Inject and use the FlowProducer in your service:
import { Injectable } from '@nestjs/common';
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';

@Injectable()
export class WorkflowService {
  constructor(
    @InjectFlowProducer('myflow') private flowProducer: FlowProducer,
  ) {}
  
  async createWorkflow() {
    const flow = await this.flowProducer.add({
      name: 'root-job',
      queueName: 'topQueue',
      data: {},
      children: [
        {
          name: 'child-job-1',
          data: { idx: 0, foo: 'bar' },
          queueName: 'childrenQueue',
        },
        {
          name: 'child-job-2',
          data: { idx: 1, foo: 'baz' },
          queueName: 'childrenQueue',
        },
      ],
    });
    
    return flow;
  }
}
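
The two children above are written out by hand. When the number of children depends on input, the same tree can be generated from a list. The sketch below builds an object with the same shape as the argument to flowProducer.add() shown above. The FlowNode interface and the buildFlow helper are hypothetical names, and the queue and job names are copied from the example.

```typescript
// Local description of the flow tree shape used in the example above.
interface FlowNode {
  name: string;
  queueName: string;
  data: Record<string, unknown>;
  children?: FlowNode[];
}

// Build a root job with one child per input value.
function buildFlow(values: string[]): FlowNode {
  return {
    name: 'root-job',
    queueName: 'topQueue',
    data: {},
    children: values.map((foo, idx) => ({
      name: `child-job-${idx + 1}`,
      queueName: 'childrenQueue',
      data: { idx, foo },
    })),
  };
}

// Inside WorkflowService (not executed here):
// await this.flowProducer.add(buildFlow(['bar', 'baz']));
```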

Configuration Options

Override connection settings for specific queues:
BullModule.registerQueue({
  name: 'audio',
  connection: {
    host: 'localhost', // replace with the host of the Redis instance for this queue
    port: 6380,
  },
})
Pass worker options, such as concurrency and a rate limiter, as the second argument to @Processor:
@Processor('audio', {
  concurrency: 5,
  limiter: {
    max: 10,
    duration: 1000,
  },
})
export class AudioProcessor extends WorkerHost {
  // ...
}
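
In the options above, the limiter allows at most max jobs per duration milliseconds, while concurrency caps how many jobs this worker runs at the same time. As a rough sanity check, the limiter can be converted into a jobs-per-second ceiling. The helper below is a hypothetical illustration of that arithmetic, not part of BullMQ.

```typescript
// Shape of the limiter option used in the example above.
interface Limiter {
  max: number;      // jobs allowed per window
  duration: number; // window length in milliseconds
}

// Upper bound on throughput implied by the limiter, in jobs per second.
function maxJobsPerSecond({ max, duration }: Limiter): number {
  if (duration <= 0) {
    throw new Error('duration must be positive');
  }
  return (max * 1000) / duration;
}

// The settings above allow at most 10 jobs per second.
const ceiling = maxJobsPerSecond({ max: 10, duration: 1000 });
console.log(ceiling); // 10
```

If individual jobs take long enough, concurrency rather than the limiter becomes the effective bound, so both values are worth tuning together.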

NestJS Producers

Learn how to add jobs to queues in NestJS

Queue Events

Listen to queue events in NestJS

External Documentation

NestJS Queues Documentation

Official NestJS documentation for queue integration
