When a worker completes a job, you often need to return data that can be accessed by other parts of your application. BullMQ provides several ways to handle job results.
Basic Return Values
Return data from your processor function:
import { Worker } from 'bullmq';
const worker = new Worker('tasks', async job => {
  // Perform async processing
  const result = await doSomeAsyncProcessing(job.data);
  // Return the result
  return result;
});
The return value can be any JSON-serializable object: strings, numbers, booleans, objects, arrays, or null.
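Because the return value is stored as JSON, consumers read back the JSON round trip of what the processor returned, not the original object. The sketch below illustrates the effect with plain `JSON.stringify` and `JSON.parse`; `roundTrip` is a hypothetical helper for illustration and not part of BullMQ.

```typescript
// Hypothetical helper: approximates what a consumer sees after the
// return value has been serialized to JSON and parsed again.
function roundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

const returned = {
  count: 3,
  finishedAt: new Date(0), // Date becomes an ISO 8601 string
  note: undefined,         // undefined properties are dropped
  ratio: NaN,              // NaN becomes null
  onDone: () => {},        // functions are dropped
};

const seen = roundTrip(returned) as Record<string, unknown>;
console.log(seen);
// { count: 3, finishedAt: '1970-01-01T00:00:00.000Z', ratio: null }
```

If a consumer needs a `Date` or another rich type, it has to rebuild it from the serialized form.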
Accessing Return Values
From Worker Events
Listen to the completed event on the worker:
import { Worker } from 'bullmq';
const worker = new Worker('tasks', async job => {
  const data = await processData(job.data);
  return { success: true, data };
});
worker.on('completed', (job, returnvalue) => {
  console.log(`Job ${job.id} completed with result:`, returnvalue);
  // Output: { success: true, data: ... }
});
From Queue Events
Use QueueEvents to listen across all workers:
import { QueueEvents } from 'bullmq';
const queueEvents = new QueueEvents('tasks');
queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed with:`, returnvalue);
});
QueueEvents provides a global view of all job events, regardless of which worker processed the job.
From the Job Instance
Wait for a job to finish and read its return value:
import { Queue, QueueEvents } from 'bullmq';
const queue = new Queue('tasks');
// waitUntilFinished needs a QueueEvents instance for the same queue
const queueEvents = new QueueEvents('tasks');
// Add a job
const job = await queue.add('process-user', { userId: 123 });
// Wait for completion; the promise resolves with the return value
const result = await job.waitUntilFinished(queueEvents);
console.log('Result:', result);
Using getJob
Fetch the job and its result later:
import { Queue } from 'bullmq';
const queue = new Queue('tasks');
// Add a job and store its ID
const job = await queue.add('process-data', { value: 42 });
const jobId = job.id;
// Later, retrieve the job
const retrievedJob = await queue.getJob(jobId);
if (retrievedJob && await retrievedJob.isCompleted()) {
  console.log('Return value:', retrievedJob.returnvalue);
}
Return Value Examples
Simple Values
// Return a string
const worker1 = new Worker('tasks', async job => {
  return 'Success';
});
// Return a number
const worker2 = new Worker('calculations', async job => {
  return 42;
});
// Return a boolean
const worker3 = new Worker('checks', async job => {
  return true;
});
Complex Objects
import { Worker } from 'bullmq';
const worker = new Worker('user-processing', async job => {
  const user = await fetchUser(job.data.userId);
  const profile = await enrichProfile(user);
  const stats = await calculateStats(user);
  return {
    userId: user.id,
    email: user.email,
    profileComplete: profile.completeness,
    stats: {
      posts: stats.postCount,
      followers: stats.followerCount,
    },
    processedAt: new Date().toISOString(),
  };
});
worker.on('completed', (job, result) => {
  console.log('User processed:', result.userId);
  console.log('Profile completeness:', result.profileComplete);
  console.log('Stats:', result.stats);
});
Arrays
import { Worker } from 'bullmq';
const worker = new Worker('batch-process', async job => {
  const items = job.data.items;
  const results = [];
  for (const item of items) {
    const processed = await processItem(item);
    results.push({
      id: item.id,
      status: processed.success ? 'completed' : 'failed',
      timestamp: Date.now(),
    });
  }
  return results;
});
Storing Results Reliably
Storing data in the completed event handler is less reliable because the event handler could fail while the job still completes successfully.
Not recommended:
// ❌ Less reliable - event handler can fail
worker.on('completed', async (job, result) => {
  // If this fails, the job is still marked completed
  // but the result isn't stored!
  await database.saveResult(result);
});
Recommended:
// ✅ More reliable - part of job processing
const worker = new Worker('tasks', async job => {
  const result = await processData(job.data);
  // Store result as part of the job
  await database.saveResult(result);
  // Return the result as well
  return result;
});
If storing the result fails in the processor, the job will fail and can be retried. If storing fails in an event handler, the job is already completed and the failure goes unnoticed.
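Because a failed processor run can be retried, the save step may execute more than once for the same job. One way to keep that safe is to key the write on the job ID, so a repeat overwrites the earlier row instead of adding a duplicate. The sketch below uses an in-memory `Map` as a stand-in for the database; `FakeJob`, `saveResultOnce`, and `processAndStore` are hypothetical names, not BullMQ APIs.

```typescript
// Minimal stand-in for the parts of a job this sketch needs.
interface FakeJob<D> {
  id: string;
  data: D;
}

// In-memory stand-in for a database table keyed by job ID.
const resultsTable = new Map<string, unknown>();

// Upsert: writing twice for the same job ID leaves exactly one row.
async function saveResultOnce(jobId: string, result: unknown): Promise<void> {
  resultsTable.set(jobId, result);
}

// Processor body: compute, store, then return the same result.
async function processAndStore(
  job: FakeJob<{ value: number }>,
): Promise<{ doubled: number }> {
  const result = { doubled: job.data.value * 2 };
  await saveResultOnce(job.id, result);
  return result;
}

// Simulate a retry by running the processor twice for the same job.
const retriedJob: FakeJob<{ value: number }> = { id: '7', data: { value: 21 } };
processAndStore(retriedJob)
  .then(() => processAndStore(retriedJob))
  .then(result => {
    console.log(result.doubled, resultsTable.size); // prints: 42 1
  });
```

With a real database the same idea is usually expressed as an upsert or a unique constraint on the job ID column.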
Using a Results Queue
For robust microservice architectures, use a dedicated results queue:
import { Queue, Worker } from 'bullmq';
// Processing queue
const taskQueue = new Queue('tasks');
const resultsQueue = new Queue('results');
// Task worker that produces results
const taskWorker = new Worker('tasks', async job => {
  const result = await processTask(job.data);
  // Send result to results queue
  await resultsQueue.add('store-result', {
    taskId: job.id,
    result: result,
    processedAt: new Date().toISOString(),
  });
  return result;
});
// Results worker that stores results
const resultsWorker = new Worker('results', async job => {
  // Reliably store the result
  await database.saveResult(job.data);
  // Notify other services if needed
  await notificationService.send({
    type: 'task-completed',
    taskId: job.data.taskId,
  });
});
Benefits:
Reliability: Results stay in Redis until a results worker processes them successfully
Decoupling: Result storage can be handled by a separate service
Retry logic: Failed storage operations can be retried, provided the results job is added with the attempts option set above 1
Resilience: If the results service is down, results accumulate in the queue until it comes back
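Retries are opt-in: a job is not retried unless it is added with an `attempts` value above 1. The sketch below builds options that could be passed as the third argument to `resultsQueue.add(...)`. The `attempts`, `backoff`, and `jobId` fields are standard BullMQ job options; the `resultJobOptions` helper, the local `ResultJobOptions` type, and the specific numbers are assumptions for illustration.

```typescript
// Local shape mirroring a subset of BullMQ's job options, so the
// sketch compiles without the library installed.
interface ResultJobOptions {
  jobId: string;
  attempts: number;
  backoff: { type: 'exponential' | 'fixed'; delay: number };
}

// Hypothetical helper: derives retry options for a result job.
function resultJobOptions(taskId: string): ResultJobOptions {
  return {
    // A custom ID derived from the task ID; while a job with this ID
    // exists, adding it again does not create a duplicate.
    jobId: `result-${taskId}`,
    // Try up to 5 times in total before the job is marked failed.
    attempts: 5,
    // Wait longer between successive attempts, starting at 1 second.
    backoff: { type: 'exponential', delay: 1000 },
  };
}

console.log(resultJobOptions('42'));
```

In the task worker this might be used as `resultsQueue.add('store-result', payload, resultJobOptions(job.id))`, assuming `job.id` is defined at that point.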
Microservice Pattern
Chain services together using result queues:
import { Queue, Worker } from 'bullmq';
// Service 1: Data Fetching
const fetchQueue = new Queue('fetch-data');
const processQueue = new Queue('process-data');
const fetchWorker = new Worker('fetch-data', async job => {
  const data = await externalAPI.fetch(job.data.url);
  // Send to processing queue
  await processQueue.add('process', {
    originalJobId: job.id,
    data: data,
  });
  return { fetched: true, recordCount: data.length };
});
// Service 2: Data Processing
const storageQueue = new Queue('store-data');
const processWorker = new Worker('process-data', async job => {
  const processed = await transform(job.data.data);
  // Send to storage queue
  await storageQueue.add('store', {
    originalJobId: job.data.originalJobId,
    processed: processed,
  });
  return { processed: true, itemCount: processed.length };
});
// Service 3: Data Storage
const storageWorker = new Worker('store-data', async job => {
  await database.bulkInsert(job.data.processed);
  return {
    stored: true,
    originalJobId: job.data.originalJobId,
  };
});
Combining Return Values with Progress
import { Worker } from 'bullmq';
const worker = new Worker('batch-upload', async job => {
  const files = job.data.files;
  const uploaded = [];
  for (let i = 0; i < files.length; i++) {
    const file = files[i];
    // Report progress
    await job.updateProgress({
      current: i + 1,
      total: files.length,
      currentFile: file.name,
    });
    // Upload and track result
    const result = await uploadFile(file);
    uploaded.push({
      filename: file.name,
      url: result.url,
      size: result.size,
    });
  }
  // Return final results
  return {
    totalFiles: files.length,
    uploaded: uploaded,
    totalSize: uploaded.reduce((sum, f) => sum + f.size, 0),
  };
});
worker.on('progress', (job, progress) => {
  console.log(`Upload progress: ${progress.current}/${progress.total}`);
  console.log(`Current file: ${progress.currentFile}`);
});
worker.on('completed', (job, result) => {
  console.log(`Uploaded ${result.totalFiles} files`);
  console.log(`Total size: ${result.totalSize} bytes`);
});
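In TypeScript, the value delivered to the `progress` handler is typed loosely (it may be a number or an object), so reading `progress.current` directly may not compile without narrowing first. A minimal sketch of a type guard for the progress shape used above; `UploadProgress`, `isUploadProgress`, and `describeProgress` are hypothetical names.

```typescript
// Shape of the object passed to job.updateProgress in the example above.
interface UploadProgress {
  current: number;
  total: number;
  currentFile: string;
}

// Type guard: true only when the value has all three expected fields.
function isUploadProgress(value: unknown): value is UploadProgress {
  if (typeof value !== 'object' || value === null) return false;
  const p = value as Record<string, unknown>;
  return (
    typeof p.current === 'number' &&
    typeof p.total === 'number' &&
    typeof p.currentFile === 'string'
  );
}

// Formats either a structured progress object or any other progress value.
function describeProgress(progress: unknown): string {
  if (isUploadProgress(progress)) {
    return `${progress.current}/${progress.total} (${progress.currentFile})`;
  }
  return `progress: ${String(progress)}`;
}

console.log(describeProgress({ current: 1, total: 3, currentFile: 'a.png' })); // 1/3 (a.png)
console.log(describeProgress(50)); // progress: 50
```

Inside the handler, this could be called as `describeProgress(progress)` before logging.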
TypeScript Type Safety
Define return value types:
import { Worker, Job } from 'bullmq';
interface TaskData {
  userId: string;
  operation: string;
}
interface TaskResult {
  success: boolean;
  message: string;
  data?: any;
  timestamp: number;
}
const worker = new Worker<TaskData, TaskResult>(
  'tasks',
  async (job: Job<TaskData, TaskResult>) => {
    const result: TaskResult = {
      success: true,
      message: `Processed ${job.data.operation} for ${job.data.userId}`,
      timestamp: Date.now(),
    };
    return result; // Type-checked!
  },
);
worker.on('completed', (job, returnvalue: TaskResult) => {
  // returnvalue is typed as TaskResult
  console.log(returnvalue.message);
  console.log(returnvalue.timestamp);
});
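The `data?: any` field opts the payload out of type checking. One alternative, sketched here under the assumption that a job can finish in a small number of known ways, is a discriminated union that consumers can narrow with a `switch`. `TaskOutcome` and `summarize` are hypothetical names; such a union could be supplied as the second type argument to `Worker` in place of `TaskResult`.

```typescript
// Each variant carries only the fields that make sense for that outcome.
type TaskOutcome =
  | { status: 'done'; message: string; timestamp: number; recordCount: number }
  | { status: 'skipped'; message: string; timestamp: number; reason: string };

// The switch narrows the union, so each branch sees its own fields.
function summarize(outcome: TaskOutcome): string {
  switch (outcome.status) {
    case 'done':
      return `${outcome.message}: ${outcome.recordCount} records`;
    case 'skipped':
      return `${outcome.message}: skipped (${outcome.reason})`;
  }
}

const done: TaskOutcome = {
  status: 'done',
  message: 'Processed export',
  timestamp: 0,
  recordCount: 12,
};
console.log(summarize(done)); // Processed export: 12 records
```

Genuine failures are still better signalled by throwing from the processor, so that the job is marked failed and can be retried.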
Best Practices
Return meaningful data
Include useful information like IDs, counts, timestamps, and status indicators.
Keep return values JSON-serializable
Avoid functions, circular references, or non-serializable objects.
Store critical data in the processor
Don’t rely solely on event handlers for important data persistence.
Use results queues for microservices
Decouple services and improve reliability with dedicated results queues.
Include context in return values
Add timestamps, job IDs, and other metadata to make results self-describing.
Handle large results appropriately
For large data, store in external storage (S3, database) and return references.
Handling Large Results
For large data, avoid storing everything in Redis:
import { Worker } from 'bullmq';
import { uploadToS3 } from './storage';
const worker = new Worker('large-data', async job => {
  const largeData = await generateLargeDataset(job.data);
  // Upload to external storage
  const s3Url = await uploadToS3(largeData, {
    bucket: 'my-bucket',
    key: `results/${job.id}.json`,
  });
  // Return reference, not the data itself
  return {
    dataUrl: s3Url,
    size: largeData.length,
    generatedAt: new Date().toISOString(),
  };
});
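Deciding between returning data inline and returning a reference can be based on the serialized size. The sketch below measures the UTF-8 byte length of the JSON form and only uploads when it exceeds a threshold. The 64 KiB limit, `toStoredResult`, `StoredResult`, and the `upload` callback are assumptions for illustration; `upload` stands in for something like the `uploadToS3` helper above.

```typescript
// Assumed threshold; tune it to your Redis memory budget.
const INLINE_LIMIT_BYTES = 64 * 1024;

type StoredResult<T> =
  | { kind: 'inline'; value: T }
  | { kind: 'reference'; url: string; sizeBytes: number };

// Returns the value itself when small, otherwise uploads the JSON
// and returns a reference to it.
async function toStoredResult<T>(
  value: T,
  upload: (json: string) => Promise<string>,
): Promise<StoredResult<T>> {
  const json = JSON.stringify(value);
  // Byte length of the serialized form, not the character count.
  const sizeBytes = new TextEncoder().encode(json).length;
  if (sizeBytes <= INLINE_LIMIT_BYTES) {
    return { kind: 'inline', value };
  }
  const url = await upload(json);
  return { kind: 'reference', url, sizeBytes };
}

// Hypothetical uploader that pretends to store the payload.
const fakeUpload = async (_json: string): Promise<string> =>
  'https://example.invalid/results/1.json';

toStoredResult({ ok: true }, fakeUpload).then(r => console.log(r.kind)); // inline
toStoredResult('x'.repeat(100000), fakeUpload).then(r => console.log(r.kind)); // reference
```

A processor could return the outcome of `toStoredResult(...)` directly, and consumers would branch on `kind`.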