Overview
Queue triggers enable asynchronous, event-driven workflows. Steps subscribe to topics and process messages with automatic retry logic, concurrency control, and configurable delivery guarantees.
Basic Configuration
Define a queue trigger in your step config:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})

export const config = {
  name: 'ProcessOrder',
  triggers: [
    {
      type: 'queue',
      topic: 'process-order',
      input: orderSchema,
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { email, quantity, petId } = input
  ctx.logger.info('Processing order', { email, quantity, petId })

  // createOrder is a placeholder for your own order-creation logic (not defined on this page).
  const order = await createOrder({ email, quantity, petId })
  await ctx.state.set('orders', order.id, order)

  // Publish a follow-up message for downstream steps.
  await ctx.enqueue({
    topic: 'order.completed',
    data: { orderId: order.id },
  })
}
Configuration Options
Required Fields
topic (string): Topic name to subscribe to. Use dot notation for hierarchical topics (e.g., order.created, user.updated).
Optional Fields
input: Zod schema for message validation. Messages that fail validation are sent to the dead-letter queue.
config: Queue-specific configuration:
config: {
  type: 'fifo' | 'standard',
  maxRetries: 3,
  concurrency: 10,
  visibilityTimeout: 30000,
  delaySeconds: 0,
  backoffType: 'exponential' | 'linear',
  backoffDelayMs: 1000,
}
condition: Conditional function to filter messages:
condition: (input, ctx) => {
  return input.amount > 1000
}
Handler Signature
Queue handlers receive the message payload and context:
type QueueHandler<T> = (
  input: T,
  ctx: HandlerContext
) => Promise<void>
Publishing Messages
Publish messages to topics using enqueue:
export const handler: Handlers<typeof config> = async (request, ctx) => {
  await ctx.enqueue({
    topic: 'process-greeting',
    data: {
      timestamp: new Date().toISOString(),
      appName: 'MyApp',
      requestId: crypto.randomUUID(),
    },
  })
}
Queue Configuration
Queue Types
Standard Queue (default)
- Best-effort ordering
- Higher throughput
- At-least-once delivery
FIFO Queue
- Guaranteed ordering
- Exactly-once processing
- Lower throughput
{
  type: 'queue',
  topic: 'order.created',
  config: {
    type: 'fifo',
  },
}
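Because standard queues deliver at least once, a handler may see the same message more than once. The following is a minimal sketch of idempotent processing, not Motia's own mechanism. It assumes each message carries a unique id (a hypothetical field) and uses an in-memory Set where a real system would need a durable store.

```typescript
// Hypothetical message shape: `id` is an assumed unique identifier.
interface Message<T> {
  id: string
  data: T
}

// Wraps a handler so that repeated deliveries of the same id are skipped.
// The in-memory Set stands in for a durable store.
function makeIdempotent<T>(handle: (data: T) => void): (msg: Message<T>) => boolean {
  const seen = new Set<string>()
  return (msg) => {
    if (seen.has(msg.id)) return false // duplicate delivery, skip it
    handle(msg.data)
    seen.add(msg.id) // record the id only after the handler succeeded
    return true
  }
}

const processed: number[] = []
const consume = makeIdempotent<number>((n) => { processed.push(n) })
consume({ id: 'a', data: 1 })
consume({ id: 'a', data: 1 }) // redelivery is ignored
consume({ id: 'b', data: 2 })
```

Recording the id after the handler returns means a crash mid-handler leads to a retry rather than a lost message.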
Retry Configuration
Control retry behavior for failed messages:
{
  type: 'queue',
  topic: 'payment.process',
  config: {
    maxRetries: 5,
    backoffType: 'exponential',
    backoffDelayMs: 2000,
  },
}
Retry Options:
maxRetries (number): Maximum number of retry attempts before the message is sent to the dead-letter queue.
backoffType (string, default: "exponential"): Retry backoff strategy, either exponential or linear.
backoffDelayMs (number): Initial delay between retries, in milliseconds.
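To make the difference between the two backoffType values concrete, here is a rough sketch of how the delay could grow per retry. The exact formulas Motia uses are not stated on this page; the doubling and additive formulas below are assumptions, and retryDelayMs is a hypothetical helper.

```typescript
type BackoffType = 'exponential' | 'linear'

// Delay before retry number `attempt` (1-based).
// Assumed formulas: exponential doubles on each retry,
// linear grows by one base delay on each retry.
function retryDelayMs(backoffType: BackoffType, backoffDelayMs: number, attempt: number): number {
  if (attempt < 1) throw new RangeError('attempt must be >= 1')
  return backoffType === 'exponential'
    ? backoffDelayMs * 2 ** (attempt - 1)
    : backoffDelayMs * attempt
}

const attempts = [1, 2, 3, 4, 5]
const exponential = attempts.map((n) => retryDelayMs('exponential', 2000, n))
const linear = attempts.map((n) => retryDelayMs('linear', 2000, n))
// exponential: 2000, 4000, 8000, 16000, 32000
// linear:      2000, 4000, 6000, 8000, 10000
```

Under these assumptions, five exponential retries with a 2000 ms base spread over about a minute, while linear retries finish in half that time.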
Concurrency Control
Limit how many messages are processed simultaneously:
{
  type: 'queue',
  topic: 'video.encode',
  config: {
    concurrency: 5, // Process up to 5 messages at once
  },
}
concurrency (number): Maximum number of message handlers that run at the same time.
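The effect of the concurrency cap on throughput can be estimated with a small simulation. This is an illustrative model only: simulateBatchMs is a hypothetical helper, and the per-message durations are made-up inputs rather than measurements.

```typescript
// Simulates how long a batch takes when at most `concurrency` handlers run at once.
// `durationsMs` holds each message's processing time, in arrival order.
function simulateBatchMs(durationsMs: number[], concurrency: number): number {
  if (concurrency < 1) throw new RangeError('concurrency must be >= 1')
  // Each slot holds the time at which that worker becomes free.
  const slots: number[] = new Array(concurrency).fill(0)
  for (const duration of durationsMs) {
    // The next message goes to whichever worker frees up first.
    const next = slots.indexOf(Math.min(...slots))
    slots[next] += duration
  }
  return Math.max(...slots)
}

const tenJobs: number[] = new Array(10).fill(1000)
const withFive = simulateBatchMs(tenJobs, 5) // 2000
const withOne = simulateBatchMs(tenJobs, 1) // 10000
```

A higher limit shortens the batch but puts more simultaneous load on whatever the handler calls, so the right value depends on downstream capacity.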
Visibility Timeout
Control how long a message is hidden from other consumers:
{
  type: 'queue',
  topic: 'long.process',
  config: {
    visibilityTimeout: 60000, // 60 seconds
  },
}
visibilityTimeout (number): Time in milliseconds before a message becomes visible to consumers again if its handler has not completed.
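The following toy queue sketches the general idea of a visibility timeout: a received message is hidden for a while, and reappears if it is not completed in time. It is an in-memory model with an explicit clock, not Motia's implementation; VisibilityQueue and its methods are hypothetical names.

```typescript
interface Entry {
  id: string
  visibleAt: number // timestamp (ms) from which the message may be delivered
}

class VisibilityQueue {
  private entries: Entry[] = []
  private readonly visibilityTimeoutMs: number

  constructor(visibilityTimeoutMs: number) {
    this.visibilityTimeoutMs = visibilityTimeoutMs
  }

  send(id: string, now: number): void {
    this.entries.push({ id, visibleAt: now })
  }

  // Delivers the first visible message and hides it for the timeout window.
  receive(now: number): string | undefined {
    const entry = this.entries.find((e) => e.visibleAt <= now)
    if (!entry) return undefined
    entry.visibleAt = now + this.visibilityTimeoutMs
    return entry.id
  }

  // Completing a message removes it, so it is never redelivered.
  complete(id: string): void {
    this.entries = this.entries.filter((e) => e.id !== id)
  }
}

const q = new VisibilityQueue(60000)
q.send('job-1', 0)
const first = q.receive(0) // 'job-1'
const hidden = q.receive(30000) // undefined: still inside the timeout
const redelivered = q.receive(60000) // 'job-1' again: it was never completed
q.complete('job-1')
const afterComplete = q.receive(200000) // undefined
```

In this model, a timeout shorter than the handler's real running time causes the same message to be processed twice, which is why long-running steps need a larger value.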
Delayed Messages
Delay message delivery:
{
  type: 'queue',
  topic: 'reminder.send',
  config: {
    delaySeconds: 300, // Delay 5 minutes
  },
}
delaySeconds (number): Initial delay, in seconds, before a message becomes available for processing.
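Note that delaySeconds is expressed in seconds, while visibilityTimeout and backoffDelayMs are in milliseconds. A small sketch of the arithmetic, using a hypothetical availableAt helper:

```typescript
// Earliest time at which a delayed message can be processed.
function availableAt(enqueuedAt: Date, delaySeconds: number): Date {
  if (delaySeconds < 0) throw new RangeError('delaySeconds must be >= 0')
  return new Date(enqueuedAt.getTime() + delaySeconds * 1000)
}

const enqueued = new Date('2024-01-01T12:00:00.000Z')
const ready = availableAt(enqueued, 300)
// ready.toISOString() is '2024-01-01T12:05:00.000Z'
```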
Helper Functions
Use the queue() helper for cleaner syntax:
import { queue, step } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})

export const stepConfig = {
  name: 'ProcessFoodOrder',
  triggers: [
    queue('process-food-order', { input: orderSchema }),
  ],
}

export const { config, handler } = step(stepConfig, async (input, ctx) => {
  const data = ctx.getData()
  // Handler logic
})
Multi-Trigger with Queue
Combine queue triggers with HTTP:
import { http, queue, step } from 'motia'

// orderSchema is the Zod schema from the Helper Functions example;
// createOrder is a placeholder for your own order-creation logic.
export const stepConfig = {
  name: 'ProcessOrder',
  triggers: [
    queue('process-food-order', { input: orderSchema }),
    http('POST', '/process-food-order', { bodySchema: orderSchema }),
  ],
}

export const { config, handler } = step(stepConfig, async (input, ctx) => {
  const data = ctx.getData() // Works for both triggers
  const order = await createOrder(data)
  await ctx.state.set('orders', order.id, order)

  // Only the HTTP trigger needs a response.
  return ctx.match({
    http: async () => ({
      status: 200,
      body: { success: true, order },
    }),
  })
})
Conditional Processing
Filter messages based on content:
export const config = {
  name: 'HighValueOrders',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: z.object({
        amount: z.number(),
        description: z.string(),
      }),
      condition: (input) => input.amount > 1000,
    },
  ],
} as const satisfies StepConfig
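A condition is an ordinary predicate over the message payload, so it can be checked in isolation. The sketch below applies the same predicate as the config above to a few made-up messages. It assumes that messages for which the predicate returns false simply do not reach this step's handler, which this page does not spell out.

```typescript
interface OrderCreated {
  amount: number
  description: string
}

// Same predicate as the `condition` in the trigger config.
const isHighValue = (input: OrderCreated): boolean => input.amount > 1000

// Made-up sample messages for illustration.
const incoming: OrderCreated[] = [
  { amount: 250, description: 'cat food' },
  { amount: 1000, description: 'aquarium' },
  { amount: 4200, description: 'horse trailer' },
]

// Only messages for which the predicate returns true would be handled.
const handled = incoming.filter(isHighValue).map((o) => o.description)
```

The comparison is strict, so an order of exactly 1000 is filtered out; use >= if the boundary value should pass.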
Module Configuration
Configure the queue adapter in motia.config.json:
{
  "modules": {
    "queue": {
      "adapter": {
        "type": "rabbitmq",
        "config": {
          "url": "amqp://localhost:5672"
        }
      }
    }
  }
}
Supported Adapters
- builtin - In-memory queue (development only)
- redis - Redis-backed queue
- rabbitmq - RabbitMQ with full retry and DLQ support
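A typo in the adapter type is easy to miss in JSON. The following is a hedged sketch of a startup check against the three adapter names listed above; checkQueueAdapter is a hypothetical helper, not a Motia function, and it validates only the type field because the per-adapter config options are not documented on this page.

```typescript
const ADAPTER_TYPES = ['builtin', 'redis', 'rabbitmq'] as const
type AdapterType = (typeof ADAPTER_TYPES)[number]

function isAdapterType(value: unknown): value is AdapterType {
  return typeof value === 'string' &&
    (ADAPTER_TYPES as readonly string[]).indexOf(value) !== -1
}

// Returns a list of problems found in a parsed motia.config.json object.
function checkQueueAdapter(config: unknown): string[] {
  const adapter = (config as any)?.modules?.queue?.adapter
  if (adapter == null) return ['modules.queue.adapter is missing']
  if (!isAdapterType(adapter.type)) {
    return [`unknown adapter type: ${String(adapter.type)}`]
  }
  return []
}

const valid = checkQueueAdapter({
  modules: { queue: { adapter: { type: 'rabbitmq', config: { url: 'amqp://localhost:5672' } } } },
})
const typo = checkQueueAdapter({
  modules: { queue: { adapter: { type: 'rabitmq' } } },
})
```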
Common Patterns
Chain Processing
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Process current step
  const result = await processOrder(input)

  // Trigger next step
  await ctx.enqueue({
    topic: 'order.fulfilled',
    data: { orderId: result.id },
  })
}
Fan-out Pattern
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const tasks = ['notify.email', 'notify.sms', 'notify.webhook']
  await Promise.all(
    tasks.map(topic =>
      ctx.enqueue({ topic, data: input })
    )
  )
}
Error Handling
export const handler: Handlers<typeof config> = async (input, ctx) => {
  try {
    await processMessage(input)
  } catch (error) {
    ctx.logger.error('Processing failed', { error, input })
    // Message will be retried based on retry config
    throw error
  }
}
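The handler above rethrows every error, so every failure is retried. Some failures, such as a malformed payload, will not succeed on a later attempt. Below is a minimal sketch of separating the two cases. It assumes, without confirmation from this page, that a handler which returns without throwing is treated as finished; the permanent flag is a hypothetical convention, and attempt is a synchronous stand-in for a handler body.

```typescript
type Outcome = 'done' | 'retry' | 'dropped'

// Hypothetical convention: errors tagged `permanent: true` should not be retried.
function isPermanent(error: unknown): boolean {
  return typeof error === 'object' && error !== null &&
    (error as { permanent?: unknown }).permanent === true
}

// Runs `work` and reports what would happen to the message.
function attempt(work: () => void): Outcome {
  try {
    work()
    return 'done'
  } catch (error) {
    // A real handler would log here, then swallow permanent failures
    // and rethrow everything else so the retry config applies.
    return isPermanent(error) ? 'dropped' : 'retry'
  }
}

const ok = attempt(() => {})
const transient = attempt(() => { throw new Error('connection reset') })
const permanent = attempt(() => {
  throw Object.assign(new Error('invalid payload'), { permanent: true })
})
```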
State Integration
export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Save to state
  await ctx.state.set('orders', input.orderId, {
    status: 'processing',
    timestamp: new Date().toISOString(),
  })

  // Process order
  const result = await processOrder(input)

  // Update state
  await ctx.state.set('orders', input.orderId, {
    status: 'completed',
    result,
  })
}
Messages that exceed maxRetries are automatically moved to the dead-letter queue for manual inspection.
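The retry-then-dead-letter flow can be modelled in a few lines. This is a simplified, synchronous sketch with no backoff, not the actual delivery loop. It assumes maxRetries counts retries after the first attempt, which this page does not state explicitly; deliver is a hypothetical helper.

```typescript
interface DeliveryResult {
  attempts: number
  deadLettered: boolean
}

// Runs `handler` until it succeeds or the retry budget is spent.
// Assumption: a message is tried at most maxRetries + 1 times.
function deliver(handler: () => void, maxRetries: number): DeliveryResult {
  let attempts = 0
  while (attempts <= maxRetries) {
    attempts += 1
    try {
      handler()
      return { attempts, deadLettered: false }
    } catch {
      // A thrown error marks this attempt as failed; loop to retry.
    }
  }
  return { attempts, deadLettered: true }
}

const alwaysFails = deliver(() => { throw new Error('boom') }, 3)
// alwaysFails: { attempts: 4, deadLettered: true }

let calls = 0
const succeedsOnThird = deliver(() => {
  calls += 1
  if (calls < 3) throw new Error('transient')
}, 3)
// succeedsOnThird: { attempts: 3, deadLettered: false }
```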
Use FIFO queues when order matters (e.g., processing events for a single user in sequence). Use standard queues for higher throughput when order doesn’t matter.