Skip to main content
Build an intelligent chatbot that streams AI responses in real-time, providing instant user feedback and a seamless conversational experience. This example demonstrates how to integrate OpenAI’s streaming API with Motia’s SSE capabilities for production-grade AI applications.

Overview

This example shows how to:
  • Stream AI responses in real-time using Server-Sent Events (SSE)
  • Handle concurrent chat sessions with state management
  • Implement conversation history and context retention
  • Process streaming data with proper error handling
  • Build interactive chat experiences with immediate feedback

Architecture

The chatbot workflow consists of three main components:
1
1. Chat Message Handler
2
Receives user messages via HTTP POST and initiates the streaming response.
3
2. AI Response Streamer
4
Streams OpenAI responses in real-time using SSE, providing immediate visual feedback.
5
3. Conversation Manager
6
Maintains chat history and context using Motia’s state management.

Implementation

Step 1: Create the Chat Endpoint

Create a step that receives chat messages and streams AI responses:
import { type Handlers, http, type StepConfig } from 'motia'
import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const config = {
  name: 'AI Chat Stream',
  description: 'Streams AI chat responses using SSE',
  flows: ['ai-chatbot'],
  triggers: [http('POST', '/chat/stream')],
  enqueues: ['chat.message.completed'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request, response },
  { logger, state, enqueue }
) => {
  const { message, sessionId, conversationId } = request.body

  logger.info('Chat request received', { sessionId, conversationId })

  // Set SSE headers for real-time streaming
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    // Retrieve conversation history from state
    const history = await state.get('conversations', conversationId) || {
      messages: [],
      createdAt: Date.now(),
    }

    // Add user message to history
    const userMessage = { role: 'user', content: message, timestamp: Date.now() }
    history.messages.push(userMessage)

    // Stream response from OpenAI
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: history.messages.map(m => ({
        role: m.role,
        content: m.content,
      })),
      stream: true,
      temperature: 0.7,
      max_tokens: 2000,
    })

    let fullResponse = ''
    let chunkCount = 0

    // Stream each chunk to the client
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      
      if (content) {
        fullResponse += content
        chunkCount++

        // Send SSE event with chunk
        response.stream.write(
          `event: chunk\ndata: ${JSON.stringify({ 
            content, 
            index: chunkCount,
            conversationId 
          })}\n\n`
        )
      }
    }

    // Add assistant response to history
    const assistantMessage = {
      role: 'assistant',
      content: fullResponse,
      timestamp: Date.now(),
    }
    history.messages.push(assistantMessage)

    // Update conversation state
    await state.set('conversations', conversationId, {
      ...history,
      lastMessageAt: Date.now(),
      messageCount: history.messages.length,
    })

    // Send completion event
    response.stream.write(
      `event: complete\ndata: ${JSON.stringify({ 
        conversationId,
        messageCount: history.messages.length,
        responseLength: fullResponse.length,
      })}\n\n`
    )

    // Enqueue analytics event
    await enqueue({
      topic: 'chat.message.completed',
      data: {
        conversationId,
        sessionId,
        messageLength: message.length,
        responseLength: fullResponse.length,
        chunkCount,
        timestamp: Date.now(),
      },
    })

    logger.info('Chat response completed', {
      conversationId,
      chunks: chunkCount,
      responseLength: fullResponse.length,
    })

  } catch (error) {
    logger.error('Chat streaming error', { error, conversationId })
    
    response.stream.write(
      `event: error\ndata: ${JSON.stringify({ 
        error: 'Failed to generate response',
        conversationId 
      })}\n\n`
    )
  } finally {
    response.close()
  }
}

Step 2: Analytics & Message Logging

Process completed chat messages for analytics:
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'

const messageSchema = z.object({
  conversationId: z.string(),
  sessionId: z.string(),
  messageLength: z.number(),
  responseLength: z.number(),
  chunkCount: z.number(),
  timestamp: z.number(),
})

export const config = {
  name: 'Chat Analytics',
  description: 'Processes chat message analytics',
  flows: ['ai-chatbot'],
  triggers: [
    {
      type: 'queue',
      topic: 'chat.message.completed',
      input: messageSchema,
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { conversationId, sessionId, responseLength, chunkCount } = input

  logger.info('Processing chat analytics', { conversationId, sessionId })

  // Update session statistics
  const sessionStats = await state.get('session-stats', sessionId) || {
    totalMessages: 0,
    totalTokens: 0,
    averageResponseTime: 0,
    conversationCount: new Set(),
  }

  await state.update('session-stats', sessionId, [
    { type: 'increment', path: 'totalMessages', by: 1 },
    { type: 'increment', path: 'totalTokens', by: Math.ceil(responseLength / 4) },
    { type: 'set', path: `conversations.${conversationId}`, value: Date.now() },
  ])

  logger.info('Analytics processed', {
    conversationId,
    sessionId,
    chunks: chunkCount,
  })
}

Step 3: Conversation Cleanup

Automatically clean up old conversations:
import { type Handlers, cron, type StepConfig } from 'motia'

export const config = {
  name: 'Conversation Cleanup',
  description: 'Removes stale conversations older than 24 hours',
  flows: ['ai-chatbot'],
  triggers: [cron('0 */6 * * *')], // Every 6 hours
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  _,
  { logger, state }
) => {
  logger.info('Starting conversation cleanup')

  const cutoffTime = Date.now() - 24 * 60 * 60 * 1000 // 24 hours ago
  let deletedCount = 0

  // Get all conversations (in production, use pagination)
  const conversations = await state.list('conversations')

  for (const { id, data } of conversations) {
    if (data.lastMessageAt < cutoffTime) {
      await state.delete('conversations', id)
      deletedCount++
    }
  }

  logger.info('Conversation cleanup completed', {
    deleted: deletedCount,
    cutoffTime,
  })
}

Client Integration

JavaScript/TypeScript Client

class ChatClient {
  private eventSource: EventSource | null = null
  private conversationId: string
  private sessionId: string

  constructor(sessionId: string, conversationId?: string) {
    this.sessionId = sessionId
    this.conversationId = conversationId || crypto.randomUUID()
  }

  async sendMessage(
    message: string,
    onChunk: (chunk: string) => void,
    onComplete: () => void,
    onError: (error: string) => void
  ) {
    const response = await fetch('http://localhost:3111/chat/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        message,
        sessionId: this.sessionId,
        conversationId: this.conversationId,
      }),
    })

    const reader = response.body!.getReader()
    const decoder = new TextDecoder()
    let buffer = ''

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (line.startsWith('event: chunk')) {
          const data = line.split('data: ')[1]
          const { content } = JSON.parse(data)
          onChunk(content)
        } else if (line.startsWith('event: complete')) {
          onComplete()
        } else if (line.startsWith('event: error')) {
          const data = line.split('data: ')[1]
          const { error } = JSON.parse(data)
          onError(error)
        }
      }
    }
  }

  getConversationId(): string {
    return this.conversationId
  }
}

// Usage
const chat = new ChatClient('session-123')

chat.sendMessage(
  'What is Motia?',
  (chunk) => console.log('Chunk:', chunk),
  () => console.log('Complete!'),
  (error) => console.error('Error:', error)
)

React Component Example

import { useState, useCallback } from 'react'

function ChatInterface() {
  const [messages, setMessages] = useState<Array<{ role: string; content: string }>>([])
  const [currentResponse, setCurrentResponse] = useState('')
  const [isStreaming, setIsStreaming] = useState(false)

  const sendMessage = useCallback(async (message: string) => {
    // Add user message
    setMessages(prev => [...prev, { role: 'user', content: message }])
    setIsStreaming(true)
    setCurrentResponse('')

    const chat = new ChatClient('session-id', 'conversation-id')

    await chat.sendMessage(
      message,
      (chunk) => {
        setCurrentResponse(prev => prev + chunk)
      },
      () => {
        setMessages(prev => [...prev, { role: 'assistant', content: currentResponse }])
        setCurrentResponse('')
        setIsStreaming(false)
      },
      (error) => {
        console.error('Chat error:', error)
        setIsStreaming(false)
      }
    )
  }, [currentResponse])

  return (
    <div className="chat-container">
      {messages.map((msg, i) => (
        <div key={i} className={`message ${msg.role}`}>
          {msg.content}
        </div>
      ))}
      {isStreaming && (
        <div className="message assistant streaming">
          {currentResponse}
          <span className="cursor"></span>
        </div>
      )}
    </div>
  )
}

Key Features

Real-Time Streaming

Stream AI responses token-by-token for immediate user feedback, creating a natural conversational experience.

Conversation Context

Maintain full conversation history using Motia’s state management, enabling context-aware responses across multiple turns.

Concurrent Sessions

Handle multiple chat sessions simultaneously with isolated state and proper resource management.

Error Recovery

Graceful error handling with fallback responses and automatic retry logic for resilient chat experiences.

Analytics & Monitoring

Track message counts, response times, and token usage for optimization and cost management.

Environment Setup

# .env
OPENAI_API_KEY=sk-...

Testing

Test the streaming chat endpoint:
curl -X POST http://localhost:3111/chat/stream \
  -H "Content-Type: application/json" \
  -d '{
    "message": "What is Motia and how does it work?",
    "sessionId": "test-session",
    "conversationId": "test-conversation"
  }'

Advanced Features

Custom System Prompts

Add specialized AI behavior:
const systemPrompt = {
  role: 'system',
  content: 'You are a helpful assistant specialized in software development.',
}

history.messages = [systemPrompt, ...history.messages]

Function Calling

Integrate tools and external APIs:
const stream = await openai.chat.completions.create({
  model: 'gpt-4',
  messages: history.messages,
  tools: [
    {
      type: 'function',
      function: {
        name: 'search_documentation',
        description: 'Search Motia documentation',
        parameters: {
          type: 'object',
          properties: {
            query: { type: 'string' },
          },
        },
      },
    },
  ],
  stream: true,
})

Rate Limiting

Implement per-user rate limits:
const rateLimitKey = `rate-limit:${sessionId}`
const count = await state.get('rate-limits', rateLimitKey) || 0

if (count > 10) {
  throw new Error('Rate limit exceeded')
}

await state.update('rate-limits', rateLimitKey, [
  { type: 'increment', path: 'count', by: 1 },
  { type: 'set', path: 'expiresAt', value: Date.now() + 60000 },
])

Production Considerations

  1. Cost Management: Track token usage and implement spending limits
  2. Content Moderation: Filter inappropriate requests and responses
  3. Privacy: Implement conversation encryption and data retention policies
  4. Scalability: Use Redis or PostgreSQL for state storage in production
  5. Monitoring: Track response times, error rates, and user satisfaction

Next Steps

  • Add multi-modal support (images, audio)
  • Implement conversation branching and editing
  • Build knowledge base integration
  • Add voice input/output capabilities
  • Integrate with customer support systems

Build docs developers (and LLMs) love