Overview
Stream triggers enable real-time, reactive workflows based on stream events. React to subscriptions, item updates, and stream operations with full access to subscription context and authentication data.
Stream Configuration
First, define a stream in a .stream.ts file:
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const todoSchema = z.object({
id: z.string(),
description: z.string(),
createdAt: z.string(),
dueDate: z.string().optional(),
completedAt: z.string().optional(),
})
export const config: StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'todo',
schema: todoSchema,
onJoin: async (subscription, context, authContext) => {
context.logger.info('User joined stream', {
groupId: subscription.groupId,
authContext,
})
// Track watchers
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'increment', path: 'watching', by: 1 },
])
return { unauthorized: false }
},
onLeave: async (subscription, context, authContext) => {
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'decrement', path: 'watching', by: 1 },
])
context.logger.info('User left stream', { groupId: subscription.groupId })
},
}
export type Todo = z.infer<typeof todoSchema>
Stream Trigger Types
Motia provides three stream trigger types:
Stream Item Changes
React to stream item updates:
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'TodoUpdated',
triggers: [
{
type: 'stream',
stream_name: 'todo',
group_id: 'active-todos',
condition: (input) => {
return input.new_value?.completed === true
},
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
ctx.logger.info('Todo completed', {
itemId: input.item_id,
todo: input.new_value,
})
await ctx.enqueue({
topic: 'analytics.todo-completed',
data: { todoId: input.item_id },
})
}
Join Triggers
React when users join a stream:
export const config = {
name: 'UserJoinedChat',
triggers: [
{
type: 'stream:join',
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
ctx.logger.info('User joined', { subscription: input })
// Send welcome message
await ctx.streams.chat.push('messages', input.groupId, {
type: 'system',
message: `User ${input.userId} joined`,
timestamp: new Date().toISOString(),
})
}
Leave Triggers
React when users leave a stream:
export const config = {
name: 'UserLeftChat',
triggers: [
{
type: 'stream:leave',
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
ctx.logger.info('User left', { subscription: input })
// Cleanup user presence
await ctx.state.delete('presence', input.userId)
}
Configuration Options
Stream Item Trigger
Name of the stream to monitor
Specific group to monitor (optional)
Specific item to monitor (optional)
Filter function for stream updates:condition: (input) => {
return input.new_value?.status === 'active'
}
Join/Leave Triggers
Either "stream:join" or "stream:leave"
No additional configuration needed - these triggers fire for all join/leave events.
Handler Signatures
Stream Update Handler
type StreamUpdateInput<T> = {
stream_name: string
group_id: string
item_id: string
old_value: T | null
new_value: T | null
}
type StreamUpdateHandler<T> = (
input: StreamUpdateInput<T>,
ctx: HandlerContext
) => Promise<void>
Join/Leave Handler
type SubscriptionInput = {
streamName: string
groupId: string
userId: string
connectionId: string
}
type SubscriptionHandler = (
input: SubscriptionInput,
ctx: HandlerContext
) => Promise<void>
Stream Operations
Use ctx.streams to interact with streams in handlers:
Push Updates
await ctx.streams.todo.push('todos', groupId, {
id: todoId,
description: 'New todo',
createdAt: new Date().toISOString(),
})
Update Items
await ctx.streams.todo.update('todos', todoId, [
{ type: 'set', path: 'completed', value: true },
{ type: 'set', path: 'completedAt', value: new Date().toISOString() },
])
Increment/Decrement
await ctx.streams.stats.update('counters', groupId, [
{ type: 'increment', path: 'views', by: 1 },
])
Delete Items
await ctx.streams.todo.delete('todos', todoId)
Common Patterns
Parallel Processing Coordination
Track completion of parallel tasks:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'ProcessParallelStep',
triggers: [
{
type: 'queue',
topic: 'parallel.step.process',
input: z.object({
traceId: z.string(),
stepIndex: z.number(),
}),
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
ctx.logger.info('Processing step', { stepIndex: input.stepIndex })
// Do work
await processStep(input.stepIndex)
// Update completion counter in stream
await ctx.streams.parallelMerge.update('merge-groups', input.traceId, [
{ type: 'increment', path: 'completedSteps', by: 1 },
])
}
Then react when all complete:
export const config = {
name: 'ParallelComplete',
triggers: [
{
type: 'stream',
stream_name: 'parallelMerge',
group_id: 'merge-groups',
condition: (input) => {
return (
!!input.new_value &&
input.new_value.totalSteps === input.new_value.completedSteps
)
},
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
ctx.logger.info('All parallel steps completed', {
traceId: input.item_id,
total: input.new_value.totalSteps,
})
}
Presence Tracking
Track active users:
// Stream config with join/leave
export const config: StreamConfig = {
name: 'chat',
schema: messageSchema,
onJoin: async (subscription, context, authContext) => {
await context.state.set('presence', subscription.userId, {
roomId: subscription.groupId,
joinedAt: new Date().toISOString(),
userId: subscription.userId,
})
return { unauthorized: false }
},
onLeave: async (subscription, context) => {
await context.state.delete('presence', subscription.userId)
},
}
Real-time Analytics
Update analytics on stream events:
export const config = {
name: 'ViewCounterUpdated',
triggers: [
{
type: 'stream',
stream_name: 'analytics',
condition: (input) => {
return input.new_value?.views !== input.old_value?.views
},
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
const delta = input.new_value.views - (input.old_value?.views || 0)
await ctx.enqueue({
topic: 'analytics.record',
data: {
metric: 'views',
delta,
timestamp: new Date().toISOString(),
},
})
}
Notification System
Notify users on stream changes:
export const config = {
name: 'NotifyWatchers',
triggers: [
{
type: 'stream',
stream_name: 'todo',
condition: (input) => {
return (
input.old_value?.status !== 'completed' &&
input.new_value?.status === 'completed'
)
},
},
],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (input, ctx) => {
const watchers = await ctx.state.list('watchers')
for (const watcher of watchers) {
await ctx.enqueue({
topic: 'notification.send',
data: {
userId: watcher.userId,
message: `Todo ${input.item_id} completed`,
},
})
}
}
Module Configuration
Configure the stream module in motia.config.json:
{
"modules": {
"stream": {
"port": 3112,
"host": "0.0.0.0",
"auth_function": "authenticate-stream",
"adapter": {
"type": "redis",
"config": {
"url": "redis://localhost:6379"
}
}
}
}
}
Module Options
Function to authenticate stream connections
Storage adapter configuration
Supported Adapters
- kv_store - Local key-value store (development only)
- redis - Redis-backed streams (production)
Authentication
Define an auth function for stream connections:
export const config = {
name: 'AuthenticateStream',
// Auth functions are called internally, not triggered
} as const satisfies StepConfig
export const handler = async (authContext: any, ctx: any) => {
const token = authContext.token
if (!token) {
return { authorized: false, reason: 'No token provided' }
}
const user = await validateToken(token)
if (!user) {
return { authorized: false, reason: 'Invalid token' }
}
return {
authorized: true,
userId: user.id,
metadata: { role: user.role },
}
}
Stream triggers run on port 3112 by default. Connect clients via WebSocket at ws://localhost:3112
Use stream triggers for real-time coordination and state triggers for persistent state changes. Streams are optimized for ephemeral, real-time data.
Keep stream items small - they’re pushed to all connected clients. For large data, store in state and push only references.