
Overview

Stream triggers enable real-time, reactive workflows based on stream events. React to subscriptions, item updates, and stream operations with full access to subscription context and authentication data.

Stream Configuration

First, define a stream in a .stream.ts file:
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
  dueDate: z.string().optional(),
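  completedAt: z.string().optional(),
  // Written by the "Update Items" example and read by the TodoUpdated condition
  completed: z.boolean().optional(),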
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,
  
  onJoin: async (subscription, context, authContext) => {
    context.logger.info('User joined stream', {
      groupId: subscription.groupId,
      authContext,
    })
    
    // Track watchers
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'increment', path: 'watching', by: 1 },
    ])
    
    return { unauthorized: false }
  },
  
  onLeave: async (subscription, context, authContext) => {
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'decrement', path: 'watching', by: 1 },
    ])
    
    context.logger.info('User left stream', { groupId: subscription.groupId })
  },
}

export type Todo = z.infer<typeof todoSchema>

Stream Trigger Types

Motia provides three stream trigger types:

Stream Item Changes

React to stream item updates:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'TodoUpdated',
  triggers: [
    {
      type: 'stream',
      stream_name: 'todo',
      group_id: 'active-todos',
      condition: (input) => {
        return input.new_value?.completed === true
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Todo completed', {
    itemId: input.item_id,
    todo: input.new_value,
  })
  
  await ctx.enqueue({
    topic: 'analytics.todo-completed',
    data: { todoId: input.item_id },
  })
}

Join Triggers

React when users join a stream:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'UserJoinedChat',
  triggers: [
    {
      type: 'stream:join',
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('User joined', { subscription: input })
  
  // Send welcome message
  await ctx.streams.chat.push('messages', input.groupId, {
    type: 'system',
    message: `User ${input.userId} joined`,
    timestamp: new Date().toISOString(),
  })
}

Leave Triggers

React when users leave a stream:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'UserLeftChat',
  triggers: [
    {
      type: 'stream:leave',
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('User left', { subscription: input })
  
  // Cleanup user presence
  await ctx.state.delete('presence', input.userId)
}

Configuration Options

Stream Item Trigger

  • type (string, required) - Must be "stream"
  • stream_name (string) - Name of the stream to monitor
  • group_id (string) - Specific group to monitor (optional)
  • item_id (string) - Specific item to monitor (optional)
  • condition (function) - Filter function for stream updates:
condition: (input) => {
  return input.new_value?.status === 'active'
}

Join/Leave Triggers

  • type (string, required) - Either "stream:join" or "stream:leave"

No additional configuration is needed. These triggers fire for all join/leave events.

Handler Signatures

Stream Update Handler

type StreamUpdateInput<T> = {
  stream_name: string
  group_id: string
  item_id: string
  old_value: T | null
  new_value: T | null
}

type StreamUpdateHandler<T> = (
  input: StreamUpdateInput<T>,
  ctx: HandlerContext
) => Promise<void>
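Because both old_value and new_value are nullable, a handler usually needs to know which kind of change it is looking at. The sketch below is a minimal, self-contained illustration. It redeclares the input type locally and assumes (this is not stated above) that a null old_value means the item was just created and a null new_value means it was deleted. The classifyChange helper and the ChangeKind type are hypothetical names, not part of Motia.

```typescript
// Local copy of the input shape documented above.
type StreamUpdateInput<T> = {
  stream_name: string
  group_id: string
  item_id: string
  old_value: T | null
  new_value: T | null
}

type ChangeKind = 'created' | 'updated' | 'deleted' | 'noop'

// Assumption: null old_value = item did not exist before,
// null new_value = item was removed.
function classifyChange<T>(input: StreamUpdateInput<T>): ChangeKind {
  if (input.old_value === null && input.new_value !== null) return 'created'
  if (input.old_value !== null && input.new_value === null) return 'deleted'
  if (input.old_value !== null && input.new_value !== null) return 'updated'
  return 'noop'
}

// Example input, shaped like the todo stream used in this page.
const sampleChange: StreamUpdateInput<{ description: string }> = {
  stream_name: 'todo',
  group_id: 'active-todos',
  item_id: 'todo-1',
  old_value: null,
  new_value: { description: 'Write docs' },
}

const sampleKind = classifyChange(sampleChange)
```

A handler could branch on the returned kind before touching new_value, which avoids null dereferences when an item is removed.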

Join/Leave Handler

type SubscriptionInput = {
  streamName: string
  groupId: string
  userId: string
  connectionId: string
}

type SubscriptionHandler = (
  input: SubscriptionInput,
  ctx: HandlerContext
) => Promise<void>

Stream Operations

Use ctx.streams to interact with streams in handlers:

Push Updates

await ctx.streams.todo.push('todos', groupId, {
  id: todoId,
  description: 'New todo',
  createdAt: new Date().toISOString(),
})

Update Items

await ctx.streams.todo.update('todos', todoId, [
  { type: 'set', path: 'completed', value: true },
  { type: 'set', path: 'completedAt', value: new Date().toISOString() },
])

Increment/Decrement

await ctx.streams.stats.update('counters', groupId, [
  { type: 'increment', path: 'views', by: 1 },
])
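To make the operation lists above easier to reason about, here is a small in-memory model of what set, increment, and decrement could do to an item. This is an illustrative sketch only, not Motia's implementation: applyOps and UpdateOp are hypothetical names, paths are assumed to be top-level keys, and a missing counter is assumed to start at 0.

```typescript
// Operation shapes copied from the examples on this page.
type UpdateOp =
  | { type: 'set'; path: string; value: unknown }
  | { type: 'increment'; path: string; by: number }
  | { type: 'decrement'; path: string; by: number }

// Apply the operations in order and return a new object (the input is not mutated).
function applyOps(
  item: Record<string, unknown>,
  ops: UpdateOp[],
): Record<string, unknown> {
  const next: Record<string, unknown> = { ...item }
  for (const op of ops) {
    if (op.type === 'set') {
      next[op.path] = op.value
      continue
    }
    // Assumption: a missing or non-numeric field counts as 0.
    const current = typeof next[op.path] === 'number' ? (next[op.path] as number) : 0
    next[op.path] = op.type === 'increment' ? current + op.by : current - op.by
  }
  return next
}

const updatedCounters = applyOps({ views: 2 }, [
  { type: 'increment', path: 'views', by: 1 },
  { type: 'decrement', path: 'watching', by: 1 },
  { type: 'set', path: 'completed', value: true },
])
```

A model like this is handy in unit tests, where a condition function can be checked against the item state that a given operation list would produce.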

Delete Items

await ctx.streams.todo.delete('todos', todoId)

Common Patterns

Parallel Processing Coordination

Track completion of parallel tasks:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'ProcessParallelStep',
  triggers: [
    {
      type: 'queue',
      topic: 'parallel.step.process',
      input: z.object({
        traceId: z.string(),
        stepIndex: z.number(),
      }),
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Processing step', { stepIndex: input.stepIndex })
  
  // Do work (processStep is an application-specific helper, not shown)
  await processStep(input.stepIndex)
  
  // Update completion counter in stream
  await ctx.streams.parallelMerge.update('merge-groups', input.traceId, [
    { type: 'increment', path: 'completedSteps', by: 1 },
  ])
}

Then, in a second step file, react when all steps complete:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ParallelComplete',
  triggers: [
    {
      type: 'stream',
      stream_name: 'parallelMerge',
      group_id: 'merge-groups',
      condition: (input) => {
        return (
          !!input.new_value &&
          input.new_value.totalSteps === input.new_value.completedSteps
        )
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('All parallel steps completed', {
    traceId: input.item_id,
    total: input.new_value?.totalSteps,
  })
}
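The completion condition only works if totalSteps is already present on the item before the first increment arrives. The page does not show where that value is written, so the sketch below assumes it is seeded up front. It replays the increments in memory to show that the condition matches once, on the final step. MergeState, allStepsDone, and simulateMerge are hypothetical names used for illustration.

```typescript
type MergeState = { totalSteps: number; completedSteps: number }

type MergeInput = {
  item_id: string
  old_value: MergeState | null
  new_value: MergeState | null
}

// Same predicate as the trigger condition above.
const allStepsDone = (input: MergeInput): boolean =>
  !!input.new_value &&
  input.new_value.totalSteps === input.new_value.completedSteps

// Replay one increment per step and record the steps at which the condition matches.
function simulateMerge(totalSteps: number): number[] {
  // Assumption: the item is seeded with totalSteps before any step runs.
  let state: MergeState = { totalSteps, completedSteps: 0 }
  const firedAt: number[] = []
  for (let step = 1; step <= totalSteps; step++) {
    const next: MergeState = { ...state, completedSteps: state.completedSteps + 1 }
    if (allStepsDone({ item_id: 'trace-1', old_value: state, new_value: next })) {
      firedAt.push(step)
    }
    state = next
  }
  return firedAt
}

const mergeFiredAt = simulateMerge(3)
```

If totalSteps were missing, the comparison would be undefined === number and the trigger would never match, so seeding the item is the step most worth verifying.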

Presence Tracking

Track active users:
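// Stream config with join/leave hooks
// messageSchema is the zod schema for chat messages (defined elsewhere, not shown)
import type { StreamConfig } from 'motia'

export const config: StreamConfig = {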
  name: 'chat',
  schema: messageSchema,
  
  onJoin: async (subscription, context, authContext) => {
    await context.state.set('presence', subscription.userId, {
      roomId: subscription.groupId,
      joinedAt: new Date().toISOString(),
      userId: subscription.userId,
    })
    
    return { unauthorized: false }
  },
  
  onLeave: async (subscription, context) => {
    await context.state.delete('presence', subscription.userId)
  },
}
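The config above keys presence by userId, so a leave event clears presence even if the same user still has another tab open. Whether one user can hold several connections at once is an assumption here. If that applies, counting connections per user is one way to avoid marking someone offline too early. The PresenceTracker class below is a hypothetical in-memory sketch that uses the SubscriptionInput fields documented earlier. It does not call any Motia API.

```typescript
type SubscriptionInput = {
  streamName: string
  groupId: string
  userId: string
  connectionId: string
}

class PresenceTracker {
  // userId -> set of open connection ids
  private connections = new Map<string, Set<string>>()

  // Returns true when this join brings the user online.
  join(sub: SubscriptionInput): boolean {
    const open = this.connections.get(sub.userId) ?? new Set<string>()
    const wasOffline = open.size === 0
    open.add(sub.connectionId)
    this.connections.set(sub.userId, open)
    return wasOffline
  }

  // Returns true when this leave closes the user's last connection.
  leave(sub: SubscriptionInput): boolean {
    const open = this.connections.get(sub.userId)
    if (!open) return false
    open.delete(sub.connectionId)
    if (open.size > 0) return false
    this.connections.delete(sub.userId)
    return true
  }

  isOnline(userId: string): boolean {
    return this.connections.has(userId)
  }
}

const tracker = new PresenceTracker()
const tabA: SubscriptionInput = { streamName: 'chat', groupId: 'room-1', userId: 'u1', connectionId: 'c1' }
const tabB: SubscriptionInput = { ...tabA, connectionId: 'c2' }
```

In a real deployment the map would need to live in shared storage rather than process memory. The sketch only shows the counting logic.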

Real-time Analytics

Update analytics on stream events:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'ViewCounterUpdated',
  triggers: [
    {
      type: 'stream',
      stream_name: 'analytics',
      condition: (input) => {
        return input.new_value?.views !== input.old_value?.views
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  // new_value is null when the item was deleted, so guard both sides
  const delta = (input.new_value?.views ?? 0) - (input.old_value?.views ?? 0)
  
  await ctx.enqueue({
    topic: 'analytics.record',
    data: {
      metric: 'views',
      delta,
      timestamp: new Date().toISOString(),
    },
  })
}

Notification System

Notify users on stream changes:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'NotifyWatchers',
  triggers: [
    {
      type: 'stream',
      stream_name: 'todo',
      condition: (input) => {
        return (
          input.old_value?.status !== 'completed' &&
          input.new_value?.status === 'completed'
        )
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, ctx) => {
  const watchers = await ctx.state.list('watchers')
  
  for (const watcher of watchers) {
    await ctx.enqueue({
      topic: 'notification.send',
      data: {
        userId: watcher.userId,
        message: `Todo ${input.item_id} completed`,
      },
    })
  }
}
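The condition in this pattern matches the transition into 'completed' rather than every update to an already completed item, which keeps watchers from being notified twice. The sketch below isolates that predicate so it can be checked without a running stream. TodoChange and becameCompleted are hypothetical names, and the status field is taken from the condition above.

```typescript
type TodoStatus = { status?: string }

type TodoChange = {
  old_value: TodoStatus | null
  new_value: TodoStatus | null
}

// True only when the item moves from "not completed" to "completed".
const becameCompleted = (input: TodoChange): boolean =>
  input.old_value?.status !== 'completed' &&
  input.new_value?.status === 'completed'

const transitions: Array<[string, TodoChange]> = [
  ['created as completed', { old_value: null, new_value: { status: 'completed' } }],
  ['active to completed', { old_value: { status: 'active' }, new_value: { status: 'completed' } }],
  ['edited while completed', { old_value: { status: 'completed' }, new_value: { status: 'completed' } }],
  ['deleted', { old_value: { status: 'completed' }, new_value: null }],
]

// Collect the labels of the changes that would trigger a notification.
const notified = transitions
  .filter(([, change]) => becameCompleted(change))
  .map(([label]) => label)
```

Only the first two changes pass the filter. The edit to an already completed item and the deletion are ignored.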

Module Configuration

Configure the stream module in motia.config.json:
{
  "modules": {
    "stream": {
      "port": 3112,
      "host": "0.0.0.0",
      "auth_function": "authenticate-stream",
      "adapter": {
        "type": "redis",
        "config": {
          "url": "redis://localhost:6379"
        }
      }
    }
  }
}

Module Options

  • port (number, default: 3112) - WebSocket server port
  • host (string, default: "0.0.0.0") - Host address to bind to
  • auth_function (string) - Function to authenticate stream connections
  • adapter (object) - Storage adapter configuration

Supported Adapters

  • kv_store - Local key-value store (development only)
  • redis - Redis-backed streams (production)

Authentication

Define an auth function for stream connections:
import type { StepConfig } from 'motia'

export const config = {
  name: 'AuthenticateStream',
  // Auth functions are called internally, not triggered
} as const satisfies StepConfig

export const handler = async (authContext: any, ctx: any) => {
  const token = authContext.token
  
  if (!token) {
    return { authorized: false, reason: 'No token provided' }
  }
  
  // validateToken is an application-specific helper (not shown)
  const user = await validateToken(token)
  
  if (!user) {
    return { authorized: false, reason: 'Invalid token' }
  }
  
  return {
    authorized: true,
    userId: user.id,
    metadata: { role: user.role },
  }
}
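The handler above depends on a validateToken helper that this page does not define. The sketch below fills it in with a hypothetical in-memory token table so the three outcomes (no token, invalid token, authorized) can be exercised end to end. A real helper would more likely verify a signed token or look up a session. AuthUser, AuthResult, demoTokens, and authenticate are illustrative names, and the result shape simply mirrors the return values shown above.

```typescript
type AuthUser = { id: string; role: string }

type AuthResult =
  | { authorized: false; reason: string }
  | { authorized: true; userId: string; metadata: { role: string } }

// Hypothetical token table, for illustration only.
const demoTokens = new Map<string, AuthUser>([
  ['demo-token', { id: 'user-1', role: 'member' }],
])

// Hypothetical stand-in for the validateToken helper used above.
async function validateToken(token: string): Promise<AuthUser | null> {
  return demoTokens.get(token) ?? null
}

// Same decision flow as the handler above, with explicit types instead of any.
async function authenticate(authContext: { token?: string }): Promise<AuthResult> {
  const token = authContext.token
  if (!token) return { authorized: false, reason: 'No token provided' }

  const user = await validateToken(token)
  if (!user) return { authorized: false, reason: 'Invalid token' }

  return { authorized: true, userId: user.id, metadata: { role: user.role } }
}
```

Typing the result as a discriminated union lets callers check authorized first and then read userId or reason without casts.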

The stream module listens on port 3112 by default. Connect clients via WebSocket at ws://localhost:3112.

Use stream triggers for real-time coordination and state triggers for persistent state changes. Streams are optimized for ephemeral, real-time data.

Keep stream items small, because they are pushed to all connected clients. For large data, store the payload in state and push only a reference.
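The advice about large data can be sketched as follows: keep the heavy payload in a store and push only a small reference through the stream. The code uses plain in-memory stand-ins (stateStore, pushedItems) instead of Motia's state and stream APIs, and ReportRef, publishLargeReport, and the key format are hypothetical.

```typescript
// Small item that would travel over the stream to every connected client.
type ReportRef = { id: string; stateKey: string; charLength: number }

// In-memory stand-ins for persistent state and for items pushed to a stream.
const stateStore = new Map<string, string>()
const pushedItems: ReportRef[] = []

function publishLargeReport(id: string, body: string): ReportRef {
  // 1. Store the large payload where clients fetch it on demand.
  const stateKey = `reports/${id}`
  stateStore.set(stateKey, body)

  // 2. Push only a small reference to subscribers.
  const ref: ReportRef = { id, stateKey, charLength: body.length }
  pushedItems.push(ref)
  return ref
}

const largeBody = 'x'.repeat(50_000)
const reportRef = publishLargeReport('r1', largeBody)
```

Clients that need the full report can resolve stateKey on demand, so subscribers that never open it do not pay for the transfer.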
