Delayed jobs are placed into a special “delayed set” instead of being processed immediately. After the delay time has passed, the job moves to the waiting queue and is processed as a regular job.
The delay time is quite accurate in most cases, but it’s not guaranteed to be exact. Processing depends on worker availability and the number of other delayed jobs scheduled at that time.
Use the delay option with the amount of time in milliseconds to delay the job:
import { Queue } from 'bullmq';const myQueue = new Queue('Paint');// Add a job that will be delayed by at least 5 secondsawait myQueue.add('house', { color: 'white' }, { delay: 5000 });
From src/classes/job.ts:88:
export class Job { /** * An amount of milliseconds to wait until this job can be processed. * @defaultValue 0 */ delay = 0;}
You can reschedule a delayed job after inserting it using the changeDelay method:
import { Job, Queue } from 'bullmq';const queue = new Queue('Paint');const job = await Job.create(queue, 'test', { foo: 'bar' }, { delay: 2000 });// Reschedule the job to execute 4000ms (4 seconds) from nowawait job.changeDelay(4000);
From src/classes/job.ts:1048:
/** * Change delay of a delayed job. * * Reschedules a delayed job by setting a new delay from the current time. * For example, calling changeDelay(5000) will reschedule the job to execute * 5000 milliseconds (5 seconds) from now, regardless of the original delay. * * @param delay - milliseconds from now when the job should be processed. * @returns void * @throws JobNotExist * This exception is thrown if jobId is missing. * @throws JobNotInState * This exception is thrown if job is not in delayed state. */async changeDelay(delay: number): Promise<void> { await this.scripts.changeDelay(this.id, delay); this.delay = delay;}
Only jobs currently in the delayed state can have their delay changed. Attempting to change the delay of a job in another state will throw an error.
You can manually move an active job to the delayed state:
/** * Moves the job to the delay set. * * @param timestamp - timestamp when the job should be moved back to "wait" * @param token - token to check job is locked by current worker */await job.moveToDelayed(Date.now() + 5000, token);
interface DefaultJobOptions { /** * An amount of milliseconds to wait until this job can be processed. * Note that for accurate delays, worker and producers * should have their clocks synchronized. * @defaultValue 0 */ delay?: number;}