Streaming
Motia provides built-in real-time streaming capabilities through WebSockets, allowing you to push live updates from long-running or asynchronous workflows to clients without polling.Stream Configuration
Define streams using aStreamConfig file:
// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const todoSchema = z.object({
id: z.string(),
description: z.string(),
createdAt: z.string(),
dueDate: z.string().optional(),
completedAt: z.string().optional(),
})
export const config: StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'todo',
schema: todoSchema,
}
export type Todo = z.infer<typeof todoSchema>
Stream Operations
All steps can access streams via thestreams context:
Set, Update, Delete
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
const bodySchema = z.object({
description: z.string(),
dueDate: z.string().optional(),
})
export const config = {
name: 'CreateTodo',
triggers: [{ type: 'http', method: 'POST', path: '/todos', bodySchema }],
flows: ['todo'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { streams, logger }) => {
const todo = {
id: crypto.randomUUID(),
description: request.body.description,
dueDate: request.body.dueDate,
createdAt: new Date().toISOString(),
}
// Set stream item - broadcasts to all subscribers
await streams.todo.set('todos', todo.id, todo)
logger.info('Todo created and broadcast', { todoId: todo.id })
return {
status: 201,
body: { todo },
}
}
Atomic Updates
import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'CompleteTodo',
triggers: [{ type: 'http', method: 'PATCH', path: '/todos/:id/complete' }],
flows: ['todo'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
const { id } = request.params
// Atomic update - broadcasts to subscribers
await streams.todo.update('todos', id, [
{ type: 'set', path: 'completedAt', value: new Date().toISOString() },
{ type: 'merge', path: 'metadata', value: { completedBy: 'user-123' } },
])
return { status: 200, body: { success: true } }
}
Delete Items
export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
const { id } = request.params
// Delete and broadcast removal
await streams.todo.delete('todos', id)
return { status: 204 }
}
List Operations
export const handler: Handlers<typeof config> = async (_, { streams }) => {
// List all items in a group
const todos = await streams.todo.list('todos')
// List all groups in a stream
const groups = await streams.todo.listGroups()
return { status: 200, body: { todos, groups } }
}
Stream Lifecycle Hooks
React to client connections and disconnections:// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const todoSchema = z.object({
id: z.string(),
description: z.string(),
createdAt: z.string(),
})
export const config: StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'todo',
schema: todoSchema,
onJoin: async (subscription, context, authContext) => {
// Track active watchers
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'increment', path: 'watching', by: 1 },
])
context.logger.info('Todo stream joined', {
groupId: subscription.groupId,
authContext
})
// Return authorization decision
return { unauthorized: false }
},
onLeave: async (subscription, context, authContext) => {
// Decrement watcher count
await context.streams.inbox.update('watching', subscription.groupId, [
{ type: 'decrement', path: 'watching', by: 1 },
])
context.logger.info('Todo stream left', { groupId: subscription.groupId })
},
}
export type Todo = z.infer<typeof todoSchema>
Stream Client Usage
Connect to streams from the browser or Node.js:Browser Client
import { StreamClient } from '@motiadev/stream-client-browser'
const client = new StreamClient({
url: 'ws://localhost:3222',
})
await client.connect()
// Subscribe to a stream group
const todoSub = client.group('todo', 'todos')
// Listen for all events
todoSub.on('*', (state) => {
console.log('Todos updated:', state)
renderTodoList(state)
})
// Listen for specific events
todoSub.on('create', (newTodo) => {
console.log('New todo created:', newTodo)
})
todoSub.on('update', (updatedTodo) => {
console.log('Todo updated:', updatedTodo)
})
todoSub.on('delete', (deletedTodo) => {
console.log('Todo deleted:', deletedTodo)
})
// Get current state
const currentState = todoSub.getState()
console.log('Current todos:', currentState)
React Integration
import { StreamClient } from '@motiadev/stream-client-react'
import { useEffect, useState } from 'react'
interface Todo {
id: string
description: string
completedAt?: string
}
function TodoList() {
const [todos, setTodos] = useState<Todo[]>([])
const [client] = useState(() => new StreamClient({ url: 'ws://localhost:3222' }))
useEffect(() => {
client.connect()
const subscription = client.group<Todo>('todo', 'todos')
// Update state on any change
subscription.on('*', (state) => {
setTodos(state)
})
return () => {
subscription.close()
client.disconnect()
}
}, [])
return (
<div>
<h1>Todos ({todos.length})</h1>
<ul>
{todos.map((todo) => (
<li key={todo.id}>
<span style={{ textDecoration: todo.completedAt ? 'line-through' : 'none' }}>
{todo.description}
</span>
</li>
))}
</ul>
</div>
)
}
Node.js Client
import { StreamClient } from '@motiadev/stream-client-node'
const client = new StreamClient({
url: 'ws://localhost:3222',
})
await client.connect()
const subscription = client.group('metrics', 'system-metrics')
subscription.on('update', (metrics) => {
console.log('System metrics updated:', metrics)
// Process metrics, send alerts, etc.
})
Parallel Merge with Streams
Coordinate parallel operations with real-time progress updates:Stream Definition
// streams/parallel-merge.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'
const parallelMergeSchema = z.object({
startedAt: z.number(),
totalSteps: z.number(),
completedSteps: z.number(),
})
export const config: StreamConfig = {
baseConfig: { storageType: 'default' },
name: 'parallelMerge',
schema: parallelMergeSchema,
}
export type ParallelMergeStreamItem = z.infer<typeof parallelMergeSchema>
Initiate Parallel Work
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
const bodySchema = z.object({
traceId: z.string().default(() => crypto.randomUUID()),
totalSteps: z.number().default(3),
})
export const config = {
name: 'StartStreamParallelMerge',
triggers: [{ type: 'http', method: 'POST', path: '/stream-parallel-merge', bodySchema }],
enqueues: ['step.process'],
flows: ['stream-parallel-merge'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { streams, enqueue, logger }) => {
const { traceId, totalSteps } = request.body
logger.info('Starting stream parallel merge', { traceId, totalSteps })
// Initialize stream state
await streams.parallelMerge.set('merge-groups', traceId, {
totalSteps,
startedAt: Date.now(),
completedSteps: 0,
})
// Enqueue parallel work
await Promise.all(
Array.from({ length: totalSteps }, (_, stepIndex) =>
enqueue({
topic: 'step.process',
data: { traceId, stepIndex },
})
)
)
return {
status: 200,
body: { traceId, totalSteps },
}
}
Process Steps and Update Stream
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
export const config = {
name: 'StreamStepProcess',
triggers: [
{
type: 'queue',
topic: 'step.process',
input: z.object({ traceId: z.string(), stepIndex: z.number() }),
},
],
flows: ['stream-parallel-merge'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (request, { streams, logger }) => {
const { traceId, stepIndex } = request
logger.info('Processing step', { traceId, stepIndex })
// Simulate work
await new Promise((resolve) => setTimeout(resolve, 1000))
// Atomically increment - broadcasts to subscribers
await streams.parallelMerge.update('merge-groups', traceId, [
{ type: 'increment', path: 'completedSteps', by: 1 },
])
logger.info('Step completed', { traceId, stepIndex })
}
Stream Triggers
React to stream events automatically:import type { Handlers, StepConfig, StreamTriggerInput } from 'motia'
import type { ParallelMergeStreamItem } from './parallel-merge.stream'
export const config = {
name: 'StreamMergeComplete',
triggers: [
{
type: 'stream',
streamName: 'parallelMerge',
groupId: 'merge-groups',
condition: (input: StreamTriggerInput<ParallelMergeStreamItem>) => {
return (
input.event.type === 'update' &&
input.event.data.completedSteps === input.event.data.totalSteps
)
},
},
],
flows: ['stream-parallel-merge'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (request, { logger }) => {
const result = request.event.data
const duration = Date.now() - result.startedAt
logger.info('Parallel merge completed', {
totalSteps: result.totalSteps,
duration,
})
}
Custom Stream Events
Send custom events to stream subscribers:import type { Handlers, StepConfig } from 'motia'
export const config = {
name: 'SendCustomEvent',
triggers: [{ type: 'http', method: 'POST', path: '/notify' }],
flows: ['notifications'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
const { groupId, message } = request.body
// Send custom event to all subscribers
await streams.notifications.send(
{ groupId },
{
type: 'custom',
data: {
message,
timestamp: Date.now(),
},
}
)
return { status: 200, body: { success: true } }
}
Authentication
Secure stream connections with authentication:// motia.config.ts
import type { AuthenticateStream } from 'motia'
export const authenticateStream: AuthenticateStream = async (req, context) => {
const token = req.headers.authorization?.replace('Bearer ', '')
if (!token) {
context.logger.warn('No token provided')
return { unauthorized: true }
}
try {
const user = await verifyToken(token)
context.logger.info('Stream authenticated', { userId: user.id })
return {
unauthorized: false,
context: { userId: user.id, role: user.role },
}
} catch (error) {
context.logger.error('Authentication failed', { error })
return { unauthorized: true }
}
}
Stream Client API Reference
interface StreamClient {
// Connect to WebSocket server
connect(): Promise<void>
// Disconnect from server
disconnect(): void
// Subscribe to a stream group
group<T>(streamName: string, groupId: string): StreamGroupSubscription<T>
// Subscribe to a single item
item<T>(streamName: string, groupId: string, itemId: string): StreamItemSubscription<T>
}
interface StreamGroupSubscription<T> {
// Listen for events
on(event: '*' | 'create' | 'update' | 'delete' | 'sync' | 'event', callback: Function): void
// Get current state
getState(): T[]
// Close subscription
close(): void
}
Best Practices
1. Use Atomic Updates
// Atomic updates prevent race conditions and ensure consistency
await streams.counter.update('global', 'count', [
{ type: 'increment', path: 'value', by: 1 },
])
2. Handle Connection Lifecycle
const client = new StreamClient({ url: 'ws://localhost:3222' })
client.on('connected', () => {
console.log('Connected to stream server')
})
client.on('disconnected', () => {
console.log('Disconnected from stream server')
})
await client.connect()
3. Clean Up Subscriptions
useEffect(() => {
const subscription = client.group('todos', 'my-todos')
subscription.on('*', handleUpdate)
return () => {
subscription.close()
}
}, [])
4. Type Safety
interface Todo {
id: string
description: string
}
const subscription = client.group<Todo>('todo', 'todos')
subscription.on('*', (todos: Todo[]) => {
// Fully typed
console.log(todos[0].description)
})
Next Steps
- Learn about AI Agents for intelligent workflows
- Explore Workflows for multi-step orchestration
- Check out Observability for stream monitoring