BullMQ provides built-in support for OpenTelemetry, the industry-standard observability framework. Telemetry enables you to trace jobs through their complete lifecycle, gain insights into system performance, and debug complex distributed applications.
Overview
Telemetry helps you:

- Track job lifecycle: Observe jobs from creation through completion
- Trace distributed systems: Follow jobs across multiple services
- Monitor performance: Measure processing times and identify bottlenecks
- Debug issues: Understand what happened when things go wrong
- Correlate events: Connect jobs with external API calls and database queries
OpenTelemetry Support
BullMQ implements the OpenTelemetry specification, which provides:

- Traces: Follow the path of jobs through your system
- Spans: Measure the duration of specific operations
- Metrics: Track job counts, durations, and rates (see Metrics)
- Context propagation: Link related operations across services
BullMQ’s telemetry interface is flexible enough to support other telemetry backends in the future.
Installation
Install the BullMQ OpenTelemetry package:
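The install command appears to be missing here. A typical setup, assuming npm (any package manager works) and the bullmq-otel package imported in the examples below:

```shell
# Core packages
npm install bullmq bullmq-otel

# OpenTelemetry SDK and exporters used in the complete examples below
npm install @opentelemetry/api @opentelemetry/sdk-trace-node \
  @opentelemetry/sdk-trace-base @opentelemetry/sdk-metrics \
  @opentelemetry/exporter-trace-otlp-http \
  @opentelemetry/exporter-metrics-otlp-http \
  @opentelemetry/resources @opentelemetry/semantic-conventions
```

The @opentelemetry packages are only needed if you configure your own exporters and providers, as the later examples do.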
Basic Setup
Adding Telemetry to Queues
import { Queue } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';

const telemetry = new BullMQOtel('my-app');

const queue = new Queue('myQueue', {
  connection: {
    host: '127.0.0.1',
    port: 6379,
  },
  telemetry,
});

// Jobs added to this queue will be traced
await queue.add('task', { userId: 123 });
Adding Telemetry to Workers
import { Worker } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';

const telemetry = new BullMQOtel('my-app');

const worker = new Worker(
  'myQueue',
  async job => {
    // Job processing is automatically traced
    return await processJob(job.data);
  },
  {
    connection: {
      host: '127.0.0.1',
      port: 6379,
    },
    telemetry,
  },
);
Configuration Options
Basic Configuration
import { BullMQOtel } from 'bullmq-otel';

const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  meterName: 'my-app',
  version: '1.0.0',
});
tracerName: Name for the tracer. Use your application name for easier filtering.
meterName: Name for the meter (used with metrics).
version: Version string for both tracer and meter. Useful for tracking changes over time.
Enabling Metrics
const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  version: '1.0.0',
  enableMetrics: true, // Enable OpenTelemetry metrics
});
enableMetrics: Enable OpenTelemetry metrics collection. When enabled, BullMQ automatically records job counts, durations, and other metrics.
See the OpenTelemetry Metrics section below for details on available metrics.
Backward Compatibility
The original constructor is still supported:
// Old style (traces only)
const telemetry = new BullMQOtel('my-app', '1.0.0');

// New style (traces + optional metrics)
const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  version: '1.0.0',
  enableMetrics: true,
});
Running Jaeger Locally
For local development, use Jaeger to visualize traces:
# Using Docker
docker run -d --name jaeger \
-p 16686:16686 \
-p 4318:4318 \
jaegertracing/all-in-one:latest
Access the Jaeger UI at: http://localhost:16686
Complete Example
import { Queue, Worker } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure OpenTelemetry
const resource = Resource.default().merge(
  new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-app',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
  }),
);

const provider = new NodeTracerProvider({ resource });
const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
});
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
provider.register();

// Create BullMQ instances with telemetry
const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  version: '1.0.0',
  enableMetrics: true,
});

const queue = new Queue('tasks', {
  connection: { host: '127.0.0.1', port: 6379 },
  telemetry,
});

const worker = new Worker(
  'tasks',
  async job => {
    console.log(`Processing job ${job.id}`);
    // Your processing logic
    await new Promise(resolve => setTimeout(resolve, 1000));
    return { processed: true };
  },
  {
    connection: { host: '127.0.0.1', port: 6379 },
    telemetry,
  },
);

// Add jobs
await queue.add('task1', { userId: 123 });
await queue.add('task2', { userId: 456 });

console.log('Jobs added - view traces at http://localhost:16686');
OpenTelemetry Metrics
When enableMetrics: true is set, BullMQ automatically records the following metrics:
Counters
| Metric Name | Description |
| --- | --- |
| bullmq.jobs.completed | Number of jobs that completed successfully |
| bullmq.jobs.failed | Number of jobs that failed (after all retries exhausted) |
| bullmq.jobs.delayed | Number of jobs moved to delayed state (including retry delays) |
| bullmq.jobs.retried | Number of jobs that were retried immediately |
| bullmq.jobs.waiting | Number of jobs moved back to waiting state |
| bullmq.jobs.waiting_children | Number of jobs moved to waiting-children state |
Histograms
| Metric Name | Description | Unit |
| --- | --- | --- |
| bullmq.job.duration | Job processing duration | milliseconds |
Metric Attributes
All metrics include these attributes for filtering and grouping:
| Attribute | Description |
| --- | --- |
| bullmq.queue.name | Name of the queue |
| bullmq.job.name | Name of the job |
| bullmq.job.status | Status of the job (completed, failed, delayed, etc.) |
Configuring Metrics Export
import {
  MeterProvider,
  PeriodicExportingMetricReader,
} from '@opentelemetry/sdk-metrics';
import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http';
import { metrics } from '@opentelemetry/api';
import { BullMQOtel } from 'bullmq-otel';

// Configure the metrics exporter
const metricExporter = new OTLPMetricExporter({
  url: 'http://localhost:4318/v1/metrics',
});

const meterProvider = new MeterProvider({
  readers: [
    new PeriodicExportingMetricReader({
      exporter: metricExporter,
      exportIntervalMillis: 10000, // Export every 10 seconds
    }),
  ],
});

// Set the global meter provider
metrics.setGlobalMeterProvider(meterProvider);

// Now create BullMQ instances with metrics enabled
const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  enableMetrics: true,
});
Set up the meter provider before creating BullMQ instances with telemetry enabled.
Custom Metric Options
You can pre-configure metrics with custom options:
import { BullMQOtel } from 'bullmq-otel';

const telemetry = new BullMQOtel({
  tracerName: 'my-app',
  meterName: 'my-app',
  version: '1.0.0',
  enableMetrics: true,
});

// Pre-configure a counter with custom options
telemetry.meter.createCounter('bullmq.jobs.completed', {
  description: 'Custom description for completed jobs',
  unit: '1',
});

// Pre-configure the duration histogram
telemetry.meter.createHistogram('bullmq.job.duration', {
  description: 'Custom job processing duration',
  unit: 's', // Using seconds instead of milliseconds
});
The BullMQOtelMeter caches all created counters and histograms by name. When BullMQ internally calls createCounter or createHistogram with the same name, the cached instance is returned, effectively using your custom options.
Tracing Custom Operations
Add custom spans within your job processor:
import { Worker } from 'bullmq';
import { trace } from '@opentelemetry/api';
import { BullMQOtel } from 'bullmq-otel';

const telemetry = new BullMQOtel('my-app');
const tracer = trace.getTracer('my-app');

const worker = new Worker(
  'tasks',
  async job => {
    // BullMQ automatically creates a span for the job
    // Add custom spans for specific operations

    // Span 1: Fetch user data
    await tracer.startActiveSpan('fetch-user', async span => {
      try {
        const user = await db.findUser(job.data.userId);
        span.setAttribute('user.id', user.id);
        span.setAttribute('user.email', user.email);
        return user;
      } finally {
        span.end();
      }
    });

    // Span 2: Call external API
    await tracer.startActiveSpan('external-api-call', async span => {
      try {
        const response = await fetch('https://api.example.com/data');
        span.setAttribute('http.status_code', response.status);
        return response.json();
      } finally {
        span.end();
      }
    });

    return { completed: true };
  },
  {
    telemetry,
  },
);
Distributed Tracing
Trace jobs across multiple services:
import { Queue, Worker } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';
import { trace } from '@opentelemetry/api';

// Service 1: Receives HTTP request and creates job
const telemetry1 = new BullMQOtel('service-1');
const queue = new Queue('tasks', { telemetry: telemetry1 });

app.post('/process', async (req, res) => {
  const tracer = trace.getTracer('service-1');
  await tracer.startActiveSpan('http-request', async span => {
    try {
      // Create job - trace context is automatically propagated
      const job = await queue.add('task', req.body);
      span.setAttribute('job.id', job.id);
      res.json({ jobId: job.id });
    } finally {
      span.end();
    }
  });
});

// Service 2: Processes the job
const telemetry2 = new BullMQOtel('service-2');
const worker = new Worker(
  'tasks',
  async job => {
    // This span is automatically linked to the HTTP request span
    console.log('Processing job with trace context');
    return await processData(job.data);
  },
  { telemetry: telemetry2 },
);
Benefits for Large Applications
Telemetry is especially valuable in large, distributed systems:
Track Job Sources
import { trace } from '@opentelemetry/api';

const tracer = trace.getTracer('my-app');

await tracer.startActiveSpan('create-user-job', async span => {
  span.setAttribute('source', 'web-api');
  span.setAttribute('user.id', userId);
  span.setAttribute('endpoint', '/api/users');
  await queue.add('create-user', { userId });
  span.end();
});
Monitor Job Interactions
const worker = new Worker('tasks', async job => {
  const tracer = trace.getTracer('my-app');

  // Trace database operations
  await tracer.startActiveSpan('db-query', async span => {
    const result = await db.query('SELECT * FROM users WHERE id = ?', [job.data.userId]);
    span.setAttribute('db.rows_returned', result.length);
    span.end();
    return result;
  });

  // Trace external API calls
  await tracer.startActiveSpan('external-api', async span => {
    const response = await fetch('https://api.example.com/data');
    span.setAttribute('http.url', response.url);
    span.setAttribute('http.status', response.status);
    span.end();
    return response.json();
  });
}, {
  telemetry,
});
Observability Backends
OpenTelemetry integrates with many observability platforms:
- Jaeger (open source, local development)
- Grafana Tempo (open source)
- Datadog
- New Relic
- Honeycomb
- Lightstep
- AWS X-Ray
- Google Cloud Trace
Refer to the OpenTelemetry documentation for backend-specific configuration.
Best Practices
Use consistent naming
Use the same tracerName across all services for easier filtering in your observability backend.
Include version information
Set the version parameter to track changes and correlate issues with deployments.
Add custom attributes
Use span.setAttribute() to add context-specific information to your traces.
Trace external calls
Create spans for database queries, API calls, and other I/O operations to identify bottlenecks.
Start with sampling in production
Use sampling to reduce overhead and costs in high-volume production environments.
Enable metrics for production
Use enableMetrics: true to collect quantitative data alongside traces.
Sampling Configuration
For high-volume production systems, use sampling:
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { TraceIdRatioBasedSampler } from '@opentelemetry/sdk-trace-base';

const provider = new NodeTracerProvider({
  sampler: new TraceIdRatioBasedSampler(0.1), // Sample 10% of traces
  resource,
});
Performance Considerations
- Telemetry adds minimal overhead (typically < 1ms per job)
- Metrics are aggregated efficiently in memory
- Spans are batched before export
- Sampling reduces data volume in high-traffic systems
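To get span batching in production, the SimpleSpanProcessor shown in the complete example can be swapped for a BatchSpanProcessor, which buffers spans and exports them in batches rather than one request per span. A minimal sketch, assuming the same local OTLP endpoint (the tuning values shown are illustrative, not recommendations):

```typescript
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

const exporter = new OTLPTraceExporter({
  url: 'http://localhost:4318/v1/traces',
});

const provider = new NodeTracerProvider();

// Buffer spans in memory and flush them in batches
provider.addSpanProcessor(
  new BatchSpanProcessor(exporter, {
    maxQueueSize: 2048,         // Spans buffered before new ones are dropped
    maxExportBatchSize: 512,    // Spans per export request
    scheduledDelayMillis: 5000, // Flush interval
  }),
);
provider.register();
```

Everything else (the BullMQOtel instance, queues, and workers) stays exactly as in the earlier examples; only the span processor changes.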