Overview

Queue triggers enable asynchronous, event-driven workflows. Steps subscribe to topics and process messages with automatic retry logic, concurrency control, and configurable delivery guarantees.

Basic Configuration

Define a queue trigger in your step config:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})

export const config = {
  name: 'ProcessOrder',
  triggers: [
    {
      type: 'queue',
      topic: 'process-order',
      input: orderSchema,
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  const { email, quantity, petId } = input
  
  ctx.logger.info('Processing order', { email, quantity, petId })
  
  // createOrder is your own application function; it is not provided by Motia.
  const order = await createOrder({ email, quantity, petId })
  await ctx.state.set('orders', order.id, order)
  
  await ctx.enqueue({
    topic: 'order.completed',
    data: { orderId: order.id },
  })
}

Configuration Options

Required Fields

  • type (string, required): Must be "queue".
  • topic (string, required): Topic name to subscribe to. Use dot notation for hierarchical topics (e.g., order.created, user.updated).

Optional Fields

  • input (ZodSchema): Zod schema for message validation. Messages that fail validation are sent to the dead-letter queue.
  • config (object): Queue-specific configuration:
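config: {
  type: 'standard',           // 'standard' (default) or 'fifo'
  maxRetries: 3,              // retries before the message goes to the dead-letter queue
  concurrency: 10,            // maximum number of concurrent handlers
  visibilityTimeout: 30000,   // milliseconds
  delaySeconds: 0,            // seconds
  backoffType: 'exponential', // 'exponential' (default) or 'linear'
  backoffDelayMs: 1000,       // initial retry delay in milliseconds
}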
  • condition (function): Predicate function used to filter messages:
condition: (input, ctx) => {
  return input.amount > 1000
}
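
To make the filtering behavior concrete, here is a minimal sketch in plain TypeScript that applies a condition predicate to a batch of messages. It does not use Motia: `OrderMessage` and `selectMessages` are hypothetical names, and the sketch assumes that a message reaches the handler only when the condition returns true.

```typescript
// Hypothetical message shape, for illustration only.
type OrderMessage = { orderId: string; amount: number }

// Same predicate shape as the `condition` option (ctx is omitted here).
const isHighValue = (input: OrderMessage): boolean => input.amount > 1000

// Assumption: a message is handed to the handler only when the condition returns true.
function selectMessages<T>(messages: T[], condition: (input: T) => boolean): T[] {
  return messages.filter(condition)
}

const incoming: OrderMessage[] = [
  { orderId: 'a', amount: 250 },
  { orderId: 'b', amount: 1000 },
  { orderId: 'c', amount: 4200 },
]

const accepted = selectMessages(incoming, isHighValue)
console.log(accepted.map((m) => m.orderId)) // only 'c' passes; 1000 is not greater than 1000
```

Note the strict comparison: an order of exactly 1000 is filtered out. Use `>=` if the threshold itself should qualify.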

Handler Signature

Queue handlers receive the message payload and context:
type QueueHandler<T> = (
  input: T,
  ctx: HandlerContext
) => Promise<void>

Publishing Messages

Publish messages to topics using enqueue:
export const handler: Handlers<typeof config> = async (request, ctx) => {
  await ctx.enqueue({
    topic: 'process-greeting',
    data: {
      timestamp: new Date().toISOString(),
      appName: 'MyApp',
      requestId: crypto.randomUUID(),
    },
  })
}

Queue Configuration

Queue Types

Standard Queue (default)
  • Best-effort ordering
  • Higher throughput
  • At-least-once delivery
FIFO Queue
  • Guaranteed ordering
  • Exactly-once processing
  • Lower throughput
{
  type: 'queue',
  topic: 'order.created',
  config: {
    type: 'fifo',
  },
}
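
Because standard queues deliver at least once, a handler may see the same message more than once. A common way to cope is to make the handler idempotent. The sketch below is one way to do that in plain TypeScript; the `messageId` field, the `handleOnce` function, and the in-memory `Set` are all assumptions for illustration (a real system would need a durable store, and this page does not say whether Motia exposes a message ID).

```typescript
// Hypothetical message shape; `messageId` is an assumed unique identifier.
type QueueMessage = { messageId: string; orderId: string }

// In-memory stand-in for a durable record of processed message IDs.
const processedIds = new Set<string>()
const shippedOrders: string[] = []

// Returns true if the message did work, false if it was a duplicate delivery.
function handleOnce(message: QueueMessage): boolean {
  if (processedIds.has(message.messageId)) return false
  shippedOrders.push(message.orderId) // the side effect that must not repeat
  processedIds.add(message.messageId)
  return true
}

const deliveries: QueueMessage[] = [
  { messageId: 'm-1', orderId: 'o-1' },
  { messageId: 'm-2', orderId: 'o-2' },
  { messageId: 'm-1', orderId: 'o-1' }, // redelivery of the first message
]

const outcomes = deliveries.map((message) => handleOnce(message))
console.log(outcomes, shippedOrders)
```

The third delivery is recognized as a duplicate, so order `o-1` is shipped only once.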

Retry Configuration

Control retry behavior for failed messages:
{
  type: 'queue',
  topic: 'payment.process',
  config: {
    maxRetries: 5,
    backoffType: 'exponential',
    backoffDelayMs: 2000,
  },
}
Retry Options:
  • maxRetries (number, default: 3): Maximum number of retry attempts before the message is sent to the dead-letter queue.
  • backoffType (string, default: "exponential"): Retry backoff strategy, either exponential or linear.
  • backoffDelayMs (number, default: 1000): Initial delay between retries, in milliseconds.
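
To get a feel for how the two backoff strategies differ, the sketch below computes the delay before each retry. The formulas are assumptions, not confirmed by this page: linear is taken to be `backoffDelayMs * attempt` and exponential to be `backoffDelayMs * 2^(attempt - 1)`, with attempts numbered from 1. `retryDelayMs` is a hypothetical helper; the actual schedule may differ by adapter.

```typescript
type BackoffType = 'exponential' | 'linear'

// Assumed formulas (not confirmed by the Motia docs):
//   linear:      backoffDelayMs * attempt
//   exponential: backoffDelayMs * 2^(attempt - 1)
// `attempt` is 1 for the first retry.
function retryDelayMs(backoffType: BackoffType, backoffDelayMs: number, attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer')
  }
  return backoffType === 'linear'
    ? backoffDelayMs * attempt
    : backoffDelayMs * Math.pow(2, attempt - 1)
}

const attempts = [1, 2, 3, 4, 5]
const exponentialDelays = attempts.map((n) => retryDelayMs('exponential', 2000, n))
const linearDelays = attempts.map((n) => retryDelayMs('linear', 2000, n))

console.log(exponentialDelays) // [2000, 4000, 8000, 16000, 32000]
console.log(linearDelays)      // [2000, 4000, 6000, 8000, 10000]
```

Under these assumptions, the payment.process configuration above (5 retries, exponential, 2000 ms) would spread its retries over roughly one minute in total.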

Concurrency Control

Limit how many messages are processed simultaneously:
{
  type: 'queue',
  topic: 'video.encode',
  config: {
    concurrency: 5, // Process up to 5 messages at once
  },
}
  • concurrency (number, default: 10): Maximum number of concurrent message handlers.
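
As an illustration of what a concurrency limit means in practice, here is a small worker-pool sketch in plain TypeScript. It is not how Motia implements the option; `runWithConcurrency` is a hypothetical helper that keeps at most `limit` handler invocations in flight.

```typescript
// Runs `handler` over `items` with at most `limit` invocations in flight.
async function runWithConcurrency<T>(
  items: T[],
  limit: number,
  handler: (item: T) => Promise<void>,
): Promise<void> {
  let next = 0
  // Each worker claims the next unprocessed index until the list is exhausted.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++]
      await handler(item)
    }
  }
  const workerCount = Math.max(1, Math.min(limit, items.length))
  const workers: Promise<void>[] = []
  for (let i = 0; i < workerCount; i++) workers.push(worker())
  await Promise.all(workers)
}

// Track the peak number of handlers running at the same time.
let inFlight = 0
let peak = 0
const processed: number[] = []

const finished = runWithConcurrency([1, 2, 3, 4, 5, 6, 7, 8], 5, async (n) => {
  inFlight++
  peak = Math.max(peak, inFlight)
  await new Promise<void>((resolve) => setTimeout(resolve, 10)) // simulated work
  inFlight--
  processed.push(n)
})

finished.then(() => console.log({ peak, processedCount: processed.length }))
```

With eight messages and a limit of 5, five handlers start immediately and the remaining three start as earlier ones finish, so `peak` never exceeds the limit.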

Visibility Timeout

Control how long a message is hidden from other consumers:
{
  type: 'queue',
  topic: 'long.process',
  config: {
    visibilityTimeout: 60000, // 60 seconds
  },
}
  • visibilityTimeout (number, default: 30000): Time in milliseconds before a message becomes visible again if not completed.
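
The mechanics of a visibility timeout can be sketched with a tiny in-memory queue. This is an illustration under stated assumptions, not Motia's or any adapter's implementation: `VisibilityQueue` and its methods are hypothetical, and the current time is passed in explicitly so the behavior is deterministic.

```typescript
type Entry<T> = { id: number; body: T; visibleAt: number }

// Hypothetical in-memory queue with a visibility timeout.
class VisibilityQueue<T> {
  private entries: Entry<T>[] = []
  private nextId = 1
  private readonly visibilityTimeoutMs: number

  constructor(visibilityTimeoutMs: number) {
    this.visibilityTimeoutMs = visibilityTimeoutMs
  }

  send(body: T, now: number): number {
    const id = this.nextId++
    this.entries.push({ id, body, visibleAt: now })
    return id
  }

  // Returns the first visible message and hides it for visibilityTimeoutMs.
  receive(now: number): { id: number; body: T } | undefined {
    for (const entry of this.entries) {
      if (entry.visibleAt <= now) {
        entry.visibleAt = now + this.visibilityTimeoutMs
        return { id: entry.id, body: entry.body }
      }
    }
    return undefined
  }

  // Acknowledge: the handler finished, so the message is removed for good.
  complete(id: number): void {
    this.entries = this.entries.filter((entry) => entry.id !== id)
  }
}

const jobs = new VisibilityQueue<string>(30000)
const jobId = jobs.send('encode video 42', 0)

const firstDelivery = jobs.receive(0)      // delivered, hidden until t = 30000
const whileHidden = jobs.receive(10000)    // undefined: other consumers see nothing
const redelivered = jobs.receive(30000)    // never completed, so it reappears
jobs.complete(jobId)
const afterComplete = jobs.receive(90000)  // undefined: the message is gone
```

The practical consequence is that the timeout should be longer than the handler's worst-case run time; otherwise a slow but healthy handler can have its message redelivered to another consumer while it is still working.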

Delayed Messages

Delay message delivery:
{
  type: 'queue',
  topic: 'reminder.send',
  config: {
    delaySeconds: 300, // Delay 5 minutes
  },
}
  • delaySeconds (number, default: 0): Initial delay, in seconds, before messages become available for processing.

Helper Functions

Use the queue() helper for cleaner syntax:
import { queue, step } from 'motia'
import { z } from 'zod'

const orderSchema = z.object({
  email: z.string(),
  quantity: z.number(),
  petId: z.string(),
})

export const stepConfig = {
  name: 'ProcessFoodOrder',
  triggers: [
    queue('process-food-order', { input: orderSchema }),
  ],
}

export const { config, handler } = step(stepConfig, async (input, ctx) => {
  const data = ctx.getData()
  // Handler logic
})

Multi-Trigger with Queue

Combine queue triggers with HTTP:
import { http, queue, step } from 'motia'
// orderSchema is the Zod schema defined in the Helper Functions example.

export const stepConfig = {
  name: 'ProcessOrder',
  triggers: [
    queue('process-food-order', { input: orderSchema }),
    http('POST', '/process-food-order', { bodySchema: orderSchema }),
  ],
}

export const { config, handler } = step(stepConfig, async (input, ctx) => {
  const data = ctx.getData() // Works for both triggers
  
  const order = await createOrder(data)
  await ctx.state.set('orders', order.id, order)
  
  return ctx.match({
    http: async () => ({
      status: 200,
      body: { success: true, order },
    }),
  })
})

Conditional Processing

Filter messages based on content:
import type { StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'HighValueOrders',
  triggers: [
    {
      type: 'queue',
      topic: 'order.created',
      input: z.object({
        amount: z.number(),
        description: z.string(),
      }),
      condition: (input) => input.amount > 1000,
    },
  ],
} as const satisfies StepConfig

Module Configuration

Configure the queue adapter in motia.config.json:
{
  "modules": {
    "queue": {
      "adapter": {
        "type": "rabbitmq",
        "config": {
          "url": "amqp://localhost:5672"
        }
      }
    }
  }
}

Supported Adapters

  • builtin - In-memory queue (development only)
  • redis - Redis-backed queue
  • rabbitmq - RabbitMQ with full retry and DLQ support

Common Patterns

Chain Processing

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Process current step
  const result = await processOrder(input)
  
  // Trigger next step
  await ctx.enqueue({
    topic: 'order.fulfilled',
    data: { orderId: result.id },
  })
}

Fan-out Pattern

export const handler: Handlers<typeof config> = async (input, ctx) => {
  const tasks = ['notify.email', 'notify.sms', 'notify.webhook']
  
  await Promise.all(
    tasks.map(topic => 
      ctx.enqueue({ topic, data: input })
    )
  )
}
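
One thing to keep in mind with the fan-out above: `Promise.all` rejects as soon as any single enqueue fails, even though the other enqueues may already have succeeded. If you would rather know exactly which topics failed, a variant like the following sketch can help. `EnqueueFn`, `fanOut`, and `fakeEnqueue` are hypothetical names; `fakeEnqueue` stands in for `ctx.enqueue` so the example runs on its own.

```typescript
// Assumed shape of an enqueue function, modeled on the ctx.enqueue calls on this page.
type EnqueueFn = (message: { topic: string; data: unknown }) => Promise<void>

// Fans one payload out to several topics and resolves with the topics that failed,
// instead of rejecting on the first failure as a bare Promise.all would.
async function fanOut(enqueue: EnqueueFn, topics: string[], data: unknown): Promise<string[]> {
  const results = await Promise.all(
    topics.map((topic) =>
      enqueue({ topic, data }).then(
        (): string | null => null,
        (): string | null => topic, // remember the topic whose enqueue rejected
      ),
    ),
  )
  return results.filter((topic): topic is string => topic !== null)
}

// Hypothetical stand-in for ctx.enqueue that fails for one topic.
const fakeEnqueue: EnqueueFn = async ({ topic }) => {
  if (topic === 'notify.sms') throw new Error('broker unavailable')
}

const failedTopics = fanOut(
  fakeEnqueue,
  ['notify.email', 'notify.sms', 'notify.webhook'],
  { orderId: 'o-1' },
)

failedTopics.then((failed) => console.log('failed topics:', failed))
```

A handler could then log the failed topics, or throw so that the whole message is retried, depending on whether downstream consumers tolerate duplicates.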

Error Handling

export const handler: Handlers<typeof config> = async (input, ctx) => {
  try {
    await processMessage(input)
  } catch (error) {
    ctx.logger.error('Processing failed', { error, input })
    
    // Message will be retried based on retry config
    throw error
  }
}
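
The retry flow that rethrowing relies on can be simulated without a broker. The sketch below is a simplified, synchronous model, not Motia's implementation: `deliver` is a hypothetical function, backoff delays are left out, and it assumes `maxRetries` counts retries after the first attempt (so a message gets at most `maxRetries + 1` deliveries).

```typescript
type Outcome = { status: 'completed' | 'dead-lettered'; attempts: number }

// Assumption: maxRetries counts retries after the first attempt,
// so a message is delivered at most maxRetries + 1 times in total.
function deliver<T>(message: T, handler: (input: T) => void, maxRetries: number): Outcome {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      handler(message)
      return { status: 'completed', attempts: attempt }
    } catch (error) {
      // A thrown error marks this attempt as failed; loop around to retry.
    }
  }
  return { status: 'dead-lettered', attempts: maxRetries + 1 }
}

// Handler that fails twice and then succeeds.
let calls = 0
const flaky = (): void => {
  calls++
  if (calls <= 2) throw new Error('temporary failure')
}

// Handler that never succeeds.
const alwaysFails = (): void => {
  throw new Error('permanent failure')
}

const recovered = deliver({ id: 1 }, flaky, 3)
const exhausted = deliver({ id: 2 }, alwaysFails, 3)

console.log(recovered) // { status: 'completed', attempts: 3 }
console.log(exhausted) // { status: 'dead-lettered', attempts: 4 }
```

This is also why the catch block in the handler above rethrows: swallowing the error would make the attempt look successful, and the message would never be retried or dead-lettered.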

State Integration

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // Save to state
  await ctx.state.set('orders', input.orderId, {
    status: 'processing',
    timestamp: new Date().toISOString(),
  })
  
  // Process order
  const result = await processOrder(input)
  
  // Update state
  await ctx.state.set('orders', input.orderId, {
    status: 'completed',
    result,
  })
}

Note: Messages that exceed maxRetries are automatically moved to the dead-letter queue for manual inspection.

Tip: Use FIFO queues when order matters (e.g., processing events for a single user in sequence). Use standard queues for higher throughput when order doesn’t matter.
