Overview
WorkerOptions extends QueueBaseOptions and provides configuration for Worker instances.
Interface
interface WorkerOptions extends QueueBaseOptions {
  name?: string;
  autorun?: boolean;
  concurrency?: number;
  limiter?: RateLimiterOptions;
  metrics?: MetricsOptions;
  maximumRateLimitDelay?: number;
  maxStartedAttempts?: number;
  maxStalledCount?: number;
  stalledInterval?: number;
  removeOnComplete?: KeepJobs;
  removeOnFail?: KeepJobs;
  skipStalledCheck?: boolean;
  skipLockRenewal?: boolean;
  drainDelay?: number;
  lockDuration?: number;
  lockRenewTime?: number;
  runRetryDelay?: number;
  settings?: AdvancedOptions;
  useWorkerThreads?: boolean;
}
Properties
connection
ConnectionOptions
required
Options for connecting to a Redis instance
name
Optional worker name stored on every job processed by this worker
autorun
Whether to start processing jobs automatically on instantiation
concurrency
Number of jobs that a single worker can process in parallel
limiter
Rate limiter configuration
metrics
Metrics collection configuration
maximumRateLimitDelay
Maximum time in milliseconds that a job can remain idle while it is rate limited
maxStartedAttempts
Maximum number of times a job can start processing before being moved to failed
maxStalledCount
Number of times a job can be recovered from stalled state before moving to failed
stalledInterval
Number of milliseconds between checks for stalled jobs
removeOnComplete
Specifies max age and/or count of jobs to keep when completed
removeOnFail
Specifies max age and/or count of jobs to keep when failed
skipStalledCheck
Skip stalled check for this worker
skipLockRenewal
Skip automatic lock renewal for this worker
drainDelay
Number of seconds to long poll for jobs when the queue is empty
lockDuration
Duration of the lock for the job in milliseconds
lockRenewTime
Time in milliseconds before the lock is automatically renewed (default: lockDuration / 2)
runRetryDelay
Internal option controlling the retry delay; not intended to be changed by applications
settings
Advanced worker settings including custom backoff strategies
useWorkerThreads
Use Worker Threads instead of Child Processes for sandboxed processors
Examples
Basic Worker
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job) => {
  // Process job
  return { processed: true };
}, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
});
With Concurrency
const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  concurrency: 10, // Process up to 10 jobs concurrently
});
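To make the meaning of a concurrency limit concrete, here is a minimal standalone sketch of running async tasks with at most `limit` in flight at once. It is not BullMQ's implementation; `runWithConcurrency` is a hypothetical helper introduced only for illustration.

```typescript
// Hypothetical helper: runs `tasks` with at most `limit` in flight at once
// and resolves with the results in input order.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  const results: T[] = new Array(tasks.length);
  let next = 0; // index of the next task that has not been started yet

  // Each lane pulls the next unstarted task until none remain.
  async function lane(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const laneCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

With `limit` set to 10, an eleventh task starts only after one of the first ten has finished, which is the behavior the `concurrency` option describes for jobs.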
With Rate Limiter
const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  limiter: {
    max: 100, // Max 100 jobs
    duration: 60000, // Per 60 seconds
  },
});
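When choosing `max` and `duration`, it can help to translate them into an average throughput. The sketch below does only that arithmetic; `LimiterSketch`, `jobsPerSecond`, and `windowsNeeded` are hypothetical names, not part of BullMQ, and the window count is a rough estimate that ignores processing time.

```typescript
// Local stand-in for the limiter shape used above (assumption: max jobs per
// duration, with duration in milliseconds).
interface LimiterSketch {
  max: number;
  duration: number;
}

// Average sustained throughput implied by a limiter, in jobs per second.
function jobsPerSecond(limiter: LimiterSketch): number {
  if (limiter.max <= 0 || limiter.duration <= 0) {
    throw new RangeError('max and duration must be positive');
  }
  return limiter.max / (limiter.duration / 1000);
}

// Number of limiter windows needed to admit a backlog of `backlog` jobs.
function windowsNeeded(backlog: number, limiter: LimiterSketch): number {
  if (backlog < 0) {
    throw new RangeError('backlog must not be negative');
  }
  return Math.ceil(backlog / limiter.max);
}
```

For the configuration above, 100 jobs per 60000 ms works out to about 1.67 jobs per second on average.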
With Auto-Cleanup
const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  removeOnComplete: {
    age: 3600, // Keep completed jobs for 1 hour
    count: 1000, // Keep max 1000 completed jobs
  },
  removeOnFail: {
    age: 86400, // Keep failed jobs for 24 hours
  },
});
Named Worker
const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  name: 'worker-1', // Name appears in job.processedBy
});
With Sandboxed Processor
const worker = new Worker(
  'myQueue',
  './processor.js', // Path to processor file
  {
    connection: {
      host: 'localhost',
      port: 6379,
    },
    useWorkerThreads: true, // Use worker threads
    concurrency: 4,
  }
);
With Custom Stalled Settings
const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  maxStalledCount: 3, // Allow job to stall 3 times
  stalledInterval: 60000, // Check for stalled jobs every minute
  lockDuration: 60000, // Lock jobs for 1 minute
});
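The lock options interact: per the property description, `lockRenewTime` defaults to `lockDuration / 2`. The following sketch computes that effective value and flags a renewal interval that is not shorter than the lock itself. `LockTimingSketch`, `effectiveLockRenewTime`, and `lockTimingWarnings` are hypothetical helpers for illustration, not BullMQ APIs.

```typescript
// Local stand-in for the two lock-related options (both in milliseconds).
interface LockTimingSketch {
  lockDuration: number;
  lockRenewTime?: number;
}

// Applies the documented default: lockRenewTime falls back to lockDuration / 2.
function effectiveLockRenewTime(opts: LockTimingSketch): number {
  return opts.lockRenewTime ?? opts.lockDuration / 2;
}

// Returns human-readable warnings for settings that cannot work as intended.
function lockTimingWarnings(opts: LockTimingSketch): string[] {
  const warnings: string[] = [];
  const renew = effectiveLockRenewTime(opts);
  if (renew >= opts.lockDuration) {
    // A lock renewed no sooner than it expires would lapse before renewal.
    warnings.push('lockRenewTime should be shorter than lockDuration');
  }
  return warnings;
}
```

With `lockDuration: 60000` and no explicit `lockRenewTime`, the effective renewal interval is 30000 ms.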
Manual Run
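const worker = new Worker('myQueue', processor, {
  connection: {
    host: 'localhost',
    port: 6379,
  },
  autorun: false, // Don't start automatically
});

// Start manually later.
// Note: the promise returned by run() settles only once the worker stops,
// so awaiting it blocks the code that follows until then.
await worker.run();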
KeepJobs Type
type KeepJobs = boolean | number | {
  age?: number; // Max age in seconds
  count?: number; // Max count to keep
};
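The three forms of `KeepJobs` can be folded into a single object shape. The sketch below assumes the usual reading of the shorthand values (`true` keeps no jobs, `false` keeps all of them, a number is a maximum count); that reading is an assumption here, and `KeepJobsSketch`, `normalizeKeepJobs`, and `isRetained` are hypothetical names rather than BullMQ exports.

```typescript
// Local copy of the type above so the sketch is self-contained.
type KeepJobsSketch = boolean | number | { age?: number; count?: number };

// Normalized policy: an undefined field means "no limit on this dimension".
interface RetentionPolicy {
  age?: number; // max age in seconds
  count?: number; // max number of jobs to keep
}

// Assumption: true -> keep nothing, false -> keep everything, number -> max count.
function normalizeKeepJobs(keep: KeepJobsSketch): RetentionPolicy {
  if (typeof keep === 'boolean') {
    return keep ? { count: 0 } : {};
  }
  if (typeof keep === 'number') {
    return { count: keep };
  }
  return { age: keep.age, count: keep.count };
}

// Decides whether a finished job stays. `position` is 0 for the most recently
// finished job, 1 for the one before it, and so on.
function isRetained(policy: RetentionPolicy, ageSeconds: number, position: number): boolean {
  const youngEnough = policy.age === undefined || ageSeconds <= policy.age;
  const withinCount = policy.count === undefined || position < policy.count;
  return youngEnough && withinCount;
}
```

Under these assumptions, `{ age: 3600, count: 1000 }` retains a job only while it is both at most one hour old and among the 1000 most recently finished.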