Sometimes it is useful to break a processor function into small pieces, where the piece that runs next depends on the previously executed step. One way to handle this kind of logic is with a switch statement.
Basic Step Pattern
import { Worker } from 'bullmq';

enum Step {
  Initial,
  Second,
  Finish,
}

// `connection`, `doInitialStepStuff` and `doSecondStepStuff` are assumed to be defined elsewhere.
const worker = new Worker(
  'queueName',
  async job => {
    // Resume from the step stored in the job's data.
    let step = job.data.step;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Persist the next step so a retry skips the work already done.
          await job.updateData({
            step: Step.Second,
          });
          step = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.updateData({
            step: Step.Finish,
          });
          step = Step.Finish;
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
By saving the next step value every time we complete the previous step (here, saving it in the job’s data), we can ensure that if the job errors and retries, it does so starting from the correct step.
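The resume-on-retry behaviour can be illustrated without Redis. The following is a minimal in-memory sketch, not BullMQ code: `FakeJob`, `createFakeJob`, `processJob` and `runWithRetries` are hypothetical stand-ins, and the calls are synchronous for brevity, whereas a real processor would await each one. The second step is made to fail once, so the retry should resume at `Step.Second` without repeating the initial step.

```typescript
enum Step {
  Initial,
  Second,
  Finish,
}

// Hypothetical in-memory stand-in for a job; NOT the BullMQ Job class.
interface FakeJob {
  data: { step: Step };
  updateData(data: { step: Step }): void;
}

function createFakeJob(): FakeJob {
  const job: FakeJob = {
    data: { step: Step.Initial },
    updateData(data) {
      job.data = data;
    },
  };
  return job;
}

const log: string[] = [];
let secondStepFailures = 1; // make the second step fail exactly once

function processJob(job: FakeJob): Step {
  // Resume from whatever step was persisted by a previous attempt.
  let step = job.data.step;
  while (step !== Step.Finish) {
    switch (step) {
      case Step.Initial: {
        log.push('initial');
        job.updateData({ step: Step.Second });
        step = Step.Second;
        break;
      }
      case Step.Second: {
        if (secondStepFailures > 0) {
          secondStepFailures--;
          throw new Error('transient failure in second step');
        }
        log.push('second');
        job.updateData({ step: Step.Finish });
        step = Step.Finish;
        break;
      }
      default: {
        throw new Error('invalid step');
      }
    }
  }
  return step;
}

// Hypothetical retry loop standing in for the queue's retry mechanism.
function runWithRetries(job: FakeJob, maxAttempts: number): Step {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return processJob(job);
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}

const job = createFakeJob();
const result = runWithRetries(job, 3);
console.log(log, result === Step.Finish);
```

In this sketch the log ends up as `['initial', 'second']`: the first attempt records the initial step and fails in the second, and the retry starts directly at the second step because that is what was saved in the job's data.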
Delaying Between Steps
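You can delay a job between steps by calling the moveToDelayed method and then throwing DelayedError, which signals to the worker that the job has been moved and should not be completed or failed: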
import { DelayedError, Job, Worker } from 'bullmq';

enum Step {
  Initial,
  Second,
  Finish,
}

// `connection`, `doInitialStepStuff` and `doSecondStepStuff` are assumed to be defined elsewhere.
const worker = new Worker(
  'queueName',
  async (job: Job, token?: string) => {
    let step = job.data.step;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Move the job to the delayed set until the given timestamp (in ms).
          await job.moveToDelayed(Date.now() + 200, token);
          await job.updateData({
            step: Step.Second,
          });
          // Tell the worker the job was delayed rather than completed or failed.
          throw new DelayedError();
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.updateData({
            step: Step.Finish,
          });
          step = Step.Finish;
          // Return here so execution does not fall through to `default`.
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
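Note that the first argument passed to moveToDelayed in the example above is an absolute timestamp in milliseconds (`Date.now() + 200`), not a duration. A small helper can make that conversion explicit. This is only a sketch: `delayUntil` is a hypothetical name, not part of BullMQ, and the optional `now` parameter exists purely to make the function easy to test.

```typescript
// Hypothetical helper (not a BullMQ API): converts a relative delay in
// milliseconds into the absolute timestamp that the example above passes
// to job.moveToDelayed.
function delayUntil(delayMs: number, now: number = Date.now()): number {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError('delayMs must be a non-negative finite number');
  }
  return now + delayMs;
}

// Inside a processor this could be used as:
//   await job.moveToDelayed(delayUntil(200), token);
const exampleTimestamp = delayUntil(200, 1_000);
console.log(exampleTimestamp);
```

With a fixed `now` of 1000 the helper returns 1200. Rejecting negative or non-finite delays is a design choice of this sketch, so that a bad value surfaces immediately instead of producing a timestamp in the past.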
Waiting for Children
A common use case is to add children at runtime and then wait for the children to complete:
import { Job, WaitingChildrenError, Worker } from 'bullmq';

enum Step {
  Initial,
  Second,
  Third,
  Finish,
}

// `childrenQueue` is assumed to be a Queue instance created elsewhere, as are
// `connection`, `doInitialStepStuff` and `doSecondStepStuff`.
const worker = new Worker(
  'parentQueueName',
  async (job: Job, token?: string) => {
    let step = job.data.step;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Attach a child to this job at runtime.
          await childrenQueue.add(
            'child-1',
            { foo: 'bar' },
            {
              parent: {
                id: job.id,
                queue: job.queueQualifiedName,
              },
            },
          );
          await job.updateData({
            step: Step.Second,
          });
          step = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await childrenQueue.add(
            'child-2',
            { foo: 'bar' },
            {
              parent: {
                id: job.id,
                queue: job.queueQualifiedName,
              },
            },
          );
          await job.updateData({
            step: Step.Third,
          });
          step = Step.Third;
          break;
        }
        case Step.Third: {
          // True while there are still pending children to wait for.
          const shouldWait = await job.moveToWaitingChildren(token);
          if (!shouldWait) {
            await job.updateData({
              step: Step.Finish,
            });
            step = Step.Finish;
            return Step.Finish;
          } else {
            throw new WaitingChildrenError();
          }
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
Chaining Flows
Another use case is to add flows at runtime and then wait for the children to complete:
import { FlowProducer, WaitingChildrenError, Worker } from 'bullmq';

enum Step {
  Initial,
  Second,
  Third,
  Finish,
}

// `connection`, `name`, `doInitialStepStuff` and `doSecondStepStuff` are assumed to be defined elsewhere.
const flow = new FlowProducer({ connection });

const worker = new Worker(
  'parentQueueName',
  async (job, token) => {
    let step = job.data.step;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          await doInitialStepStuff();
          // Add a whole flow at runtime and attach its root to this job.
          await flow.add({
            name: 'child-job',
            queueName: 'childrenQueueName',
            data: {},
            children: [
              {
                name,
                data: { idx: 0, foo: 'bar' },
                queueName: 'grandchildrenQueueName',
              },
              {
                name,
                data: { idx: 1, foo: 'baz' },
                queueName: 'grandchildrenQueueName',
              },
            ],
            opts: {
              parent: {
                id: job.id,
                queue: job.queueQualifiedName,
              },
            },
          });
          await job.updateData({
            step: Step.Second,
          });
          step = Step.Second;
          break;
        }
        case Step.Second: {
          await doSecondStepStuff();
          await job.updateData({
            step: Step.Third,
          });
          step = Step.Third;
          break;
        }
        case Step.Third: {
          // True while there are still pending children to wait for.
          const shouldWait = await job.moveToWaitingChildren(token);
          if (!shouldWait) {
            await job.updateData({
              step: Step.Finish,
            });
            step = Step.Finish;
            return Step.Finish;
          } else {
            throw new WaitingChildrenError();
          }
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);
Manually moving jobs using special errors does not increment the attemptsMade property. This property is incremented on regular job completion or failure (this includes retries using a backoff strategy). To control how many times a job is allowed to start processing, use the maxStartedAttempts option.
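The accounting described above can be pictured with a toy model. This is only an illustration of the rule as stated here, not BullMQ's implementation: `PauseSignal` stands in for special errors such as DelayedError and WaitingChildrenError, and the `started` and `made` counters are hypothetical names for "times processing began" and the attemptsMade property.

```typescript
// Stand-in for special errors such as DelayedError or WaitingChildrenError.
class PauseSignal extends Error {
  constructor() {
    super('job was moved manually');
    this.name = 'PauseSignal';
  }
}

interface Counters {
  started: number; // how many times processing began
  made: number; // toy equivalent of attemptsMade
}

type Outcome = 'completed' | 'failed' | 'paused';

// Toy runner: every run counts as started, but only a regular completion
// or failure counts as an attempt made.
function runOnce(counters: Counters, processor: () => void): Outcome {
  counters.started++;
  try {
    processor();
    counters.made++;
    return 'completed';
  } catch (err) {
    if (err instanceof Error && err.name === 'PauseSignal') {
      return 'paused'; // manual move: `made` is left untouched
    }
    counters.made++;
    return 'failed';
  }
}

const counters: Counters = { started: 0, made: 0 };
const outcomes: Outcome[] = [
  runOnce(counters, () => {
    throw new PauseSignal();
  }),
  runOnce(counters, () => {
    throw new Error('boom');
  }),
  runOnce(counters, () => {}),
];
console.log(outcomes, counters);
```

After these three runs the toy counters read `started: 3` and `made: 2`. A job that repeatedly moves itself with special errors therefore never approaches a limit based on attempts made, which is why a separate cap on started attempts is useful.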
Flows: Learn about parent-child job dependencies
Job Data: Updating job data during processing
Move To Delayed: API reference for moveToDelayed
Move To Waiting Children: API reference for moveToWaitingChildren