Skip to main content

Streaming

Motia provides built-in real-time streaming capabilities through WebSockets, allowing you to push live updates from long-running or asynchronous workflows to clients without polling.

Stream Configuration

Define streams using a StreamConfig file:
// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
  dueDate: z.string().optional(),
  completedAt: z.string().optional(),
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,
}

export type Todo = z.infer<typeof todoSchema>

Stream Operations

All steps can access streams via the streams context:

Set, Update, Delete

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const bodySchema = z.object({
  description: z.string(),
  dueDate: z.string().optional(),
})

export const config = {
  name: 'CreateTodo',
  triggers: [{ type: 'http', method: 'POST', path: '/todos', bodySchema }],
  flows: ['todo'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { streams, logger }) => {
  const todo = {
    id: crypto.randomUUID(),
    description: request.body.description,
    dueDate: request.body.dueDate,
    createdAt: new Date().toISOString(),
  }

  // Set stream item - broadcasts to all subscribers
  await streams.todo.set('todos', todo.id, todo)

  logger.info('Todo created and broadcast', { todoId: todo.id })

  return {
    status: 201,
    body: { todo },
  }
}

Atomic Updates

import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'CompleteTodo',
  triggers: [{ type: 'http', method: 'PATCH', path: '/todos/:id/complete' }],
  flows: ['todo'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
  const { id } = request.params

  // Atomic update - broadcasts to subscribers
  await streams.todo.update('todos', id, [
    { type: 'set', path: 'completedAt', value: new Date().toISOString() },
    { type: 'merge', path: 'metadata', value: { completedBy: 'user-123' } },
  ])

  return { status: 200, body: { success: true } }
}

Delete Items

export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
  const { id } = request.params

  // Delete and broadcast removal
  await streams.todo.delete('todos', id)

  return { status: 204 }
}

List Operations

export const handler: Handlers<typeof config> = async (_, { streams }) => {
  // List all items in a group
  const todos = await streams.todo.list('todos')

  // List all groups in a stream
  const groups = await streams.todo.listGroups()

  return { status: 200, body: { todos, groups } }
}

Stream Lifecycle Hooks

React to client connections and disconnections:
// streams/todo.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const todoSchema = z.object({
  id: z.string(),
  description: z.string(),
  createdAt: z.string(),
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'todo',
  schema: todoSchema,

  onJoin: async (subscription, context, authContext) => {
    // Track active watchers
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'increment', path: 'watching', by: 1 },
    ])

    context.logger.info('Todo stream joined', { 
      groupId: subscription.groupId,
      authContext 
    })

    // Return authorization decision
    return { unauthorized: false }
  },

  onLeave: async (subscription, context, authContext) => {
    // Decrement watcher count
    await context.streams.inbox.update('watching', subscription.groupId, [
      { type: 'decrement', path: 'watching', by: 1 },
    ])

    context.logger.info('Todo stream left', { groupId: subscription.groupId })
  },
}

export type Todo = z.infer<typeof todoSchema>

Stream Client Usage

Connect to streams from the browser or Node.js:

Browser Client

import { StreamClient } from '@motiadev/stream-client-browser'

const client = new StreamClient({
  url: 'ws://localhost:3222',
})

await client.connect()

// Subscribe to a stream group
const todoSub = client.group('todo', 'todos')

// Listen for all events
todoSub.on('*', (state) => {
  console.log('Todos updated:', state)
  renderTodoList(state)
})

// Listen for specific events
todoSub.on('create', (newTodo) => {
  console.log('New todo created:', newTodo)
})

todoSub.on('update', (updatedTodo) => {
  console.log('Todo updated:', updatedTodo)
})

todoSub.on('delete', (deletedTodo) => {
  console.log('Todo deleted:', deletedTodo)
})

// Get current state
const currentState = todoSub.getState()
console.log('Current todos:', currentState)

React Integration

import { StreamClient } from '@motiadev/stream-client-react'
import { useEffect, useState } from 'react'

interface Todo {
  id: string
  description: string
  completedAt?: string
}

function TodoList() {
  const [todos, setTodos] = useState<Todo[]>([])
  const [client] = useState(() => new StreamClient({ url: 'ws://localhost:3222' }))

  useEffect(() => {
    client.connect()

    const subscription = client.group<Todo>('todo', 'todos')

    // Update state on any change
    subscription.on('*', (state) => {
      setTodos(state)
    })

    return () => {
      subscription.close()
      client.disconnect()
    }
  }, [])

  return (
    <div>
      <h1>Todos ({todos.length})</h1>
      <ul>
        {todos.map((todo) => (
          <li key={todo.id}>
            <span style={{ textDecoration: todo.completedAt ? 'line-through' : 'none' }}>
              {todo.description}
            </span>
          </li>
        ))}
      </ul>
    </div>
  )
}

Node.js Client

import { StreamClient } from '@motiadev/stream-client-node'

const client = new StreamClient({
  url: 'ws://localhost:3222',
})

await client.connect()

const subscription = client.group('metrics', 'system-metrics')

subscription.on('update', (metrics) => {
  console.log('System metrics updated:', metrics)
  // Process metrics, send alerts, etc.
})

Parallel Merge with Streams

Coordinate parallel operations with real-time progress updates:

Stream Definition

// streams/parallel-merge.stream.ts
import type { StreamConfig } from 'motia'
import { z } from 'zod'

const parallelMergeSchema = z.object({
  startedAt: z.number(),
  totalSteps: z.number(),
  completedSteps: z.number(),
})

export const config: StreamConfig = {
  baseConfig: { storageType: 'default' },
  name: 'parallelMerge',
  schema: parallelMergeSchema,
}

export type ParallelMergeStreamItem = z.infer<typeof parallelMergeSchema>

Initiate Parallel Work

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const bodySchema = z.object({
  traceId: z.string().default(() => crypto.randomUUID()),
  totalSteps: z.number().default(3),
})

export const config = {
  name: 'StartStreamParallelMerge',
  triggers: [{ type: 'http', method: 'POST', path: '/stream-parallel-merge', bodySchema }],
  enqueues: ['step.process'],
  flows: ['stream-parallel-merge'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { streams, enqueue, logger }) => {
  const { traceId, totalSteps } = request.body

  logger.info('Starting stream parallel merge', { traceId, totalSteps })

  // Initialize stream state
  await streams.parallelMerge.set('merge-groups', traceId, {
    totalSteps,
    startedAt: Date.now(),
    completedSteps: 0,
  })

  // Enqueue parallel work
  await Promise.all(
    Array.from({ length: totalSteps }, (_, stepIndex) =>
      enqueue({
        topic: 'step.process',
        data: { traceId, stepIndex },
      })
    )
  )

  return {
    status: 200,
    body: { traceId, totalSteps },
  }
}

Process Steps and Update Stream

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'StreamStepProcess',
  triggers: [
    {
      type: 'queue',
      topic: 'step.process',
      input: z.object({ traceId: z.string(), stepIndex: z.number() }),
    },
  ],
  flows: ['stream-parallel-merge'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (request, { streams, logger }) => {
  const { traceId, stepIndex } = request

  logger.info('Processing step', { traceId, stepIndex })

  // Simulate work
  await new Promise((resolve) => setTimeout(resolve, 1000))

  // Atomically increment - broadcasts to subscribers
  await streams.parallelMerge.update('merge-groups', traceId, [
    { type: 'increment', path: 'completedSteps', by: 1 },
  ])

  logger.info('Step completed', { traceId, stepIndex })
}

Stream Triggers

React to stream events automatically:
import type { Handlers, StepConfig, StreamTriggerInput } from 'motia'
import type { ParallelMergeStreamItem } from './parallel-merge.stream'

export const config = {
  name: 'StreamMergeComplete',
  triggers: [
    {
      type: 'stream',
      streamName: 'parallelMerge',
      groupId: 'merge-groups',
      condition: (input: StreamTriggerInput<ParallelMergeStreamItem>) => {
        return (
          input.event.type === 'update' &&
          input.event.data.completedSteps === input.event.data.totalSteps
        )
      },
    },
  ],
  flows: ['stream-parallel-merge'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (request, { logger }) => {
  const result = request.event.data
  const duration = Date.now() - result.startedAt

  logger.info('Parallel merge completed', {
    totalSteps: result.totalSteps,
    duration,
  })
}

Custom Stream Events

Send custom events to stream subscribers:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'SendCustomEvent',
  triggers: [{ type: 'http', method: 'POST', path: '/notify' }],
  flows: ['notifications'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async ({ request }, { streams }) => {
  const { groupId, message } = request.body

  // Send custom event to all subscribers
  await streams.notifications.send(
    { groupId },
    {
      type: 'custom',
      data: {
        message,
        timestamp: Date.now(),
      },
    }
  )

  return { status: 200, body: { success: true } }
}

Authentication

Secure stream connections with authentication:
// motia.config.ts
import type { AuthenticateStream } from 'motia'

export const authenticateStream: AuthenticateStream = async (req, context) => {
  const token = req.headers.authorization?.replace('Bearer ', '')

  if (!token) {
    context.logger.warn('No token provided')
    return { unauthorized: true }
  }

  try {
    const user = await verifyToken(token)
    context.logger.info('Stream authenticated', { userId: user.id })

    return {
      unauthorized: false,
      context: { userId: user.id, role: user.role },
    }
  } catch (error) {
    context.logger.error('Authentication failed', { error })
    return { unauthorized: true }
  }
}

Stream Client API Reference

interface StreamClient {
  // Connect to WebSocket server
  connect(): Promise<void>

  // Disconnect from server
  disconnect(): void

  // Subscribe to a stream group
  group<T>(streamName: string, groupId: string): StreamGroupSubscription<T>

  // Subscribe to a single item
  item<T>(streamName: string, groupId: string, itemId: string): StreamItemSubscription<T>
}

interface StreamGroupSubscription<T> {
  // Listen for events
  on(event: '*' | 'create' | 'update' | 'delete' | 'sync' | 'event', callback: Function): void

  // Get current state
  getState(): T[]

  // Close subscription
  close(): void
}

Best Practices

1. Use Atomic Updates

// Atomic updates prevent race conditions and ensure consistency
await streams.counter.update('global', 'count', [
  { type: 'increment', path: 'value', by: 1 },
])

2. Handle Connection Lifecycle

const client = new StreamClient({ url: 'ws://localhost:3222' })

client.on('connected', () => {
  console.log('Connected to stream server')
})

client.on('disconnected', () => {
  console.log('Disconnected from stream server')
})

await client.connect()

3. Clean Up Subscriptions

useEffect(() => {
  const subscription = client.group('todos', 'my-todos')
  
  subscription.on('*', handleUpdate)

  return () => {
    subscription.close()
  }
}, [])

4. Type Safety

interface Todo {
  id: string
  description: string
}

const subscription = client.group<Todo>('todo', 'todos')

subscription.on('*', (todos: Todo[]) => {
  // Fully typed
  console.log(todos[0].description)
})

Next Steps

Build docs developers (and LLMs) love