import { type Handlers, http, type StepConfig } from 'motia'
import OpenAI from 'openai'
// Module-level OpenAI client, shared across invocations of this step.
// NOTE(review): assumes OPENAI_API_KEY is set in the environment — verify deployment config.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })
/**
 * Step configuration for the streaming chat endpoint.
 *
 * Exposes `POST /chat/stream` within the `ai-chatbot` flow and declares
 * that this step may enqueue `chat.message.completed` events for
 * downstream consumers. `as const satisfies StepConfig` keeps literal
 * types for topic/route strings while still validating the shape.
 */
export const config = {
  name: 'AI Chat Stream',
  description: 'Streams AI chat responses using SSE',
  triggers: [http('POST', '/chat/stream')],
  enqueues: ['chat.message.completed'],
  flows: ['ai-chatbot'],
} as const satisfies StepConfig
/** A single chat turn as stored in conversation state. */
type ChatMessage = {
  role: 'user' | 'assistant'
  content: string
  timestamp: number
}

/** Conversation record kept under the 'conversations' state scope. */
type Conversation = {
  messages: ChatMessage[]
  createdAt: number
  lastMessageAt?: number
  messageCount?: number
}

/** Builds one Server-Sent Events frame: `event: <name>\ndata: <json>\n\n`. */
const sseEvent = (event: string, data: unknown): string =>
  `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`

/**
 * Streams an AI chat response over SSE.
 *
 * Flow: validate input → load conversation history from state → stream the
 * OpenAI completion chunk-by-chunk to the client → persist the updated
 * history → emit a `complete` event → enqueue an analytics event.
 * On failure an `error` SSE event is written instead of crashing; the
 * connection is always closed in `finally`.
 */
export const handler: Handlers<typeof config> = async (
  { request, response },
  { logger, state, enqueue }
) => {
  const { message, sessionId, conversationId } = request.body

  // Validate BEFORE committing to an SSE response: once the event-stream
  // headers are written we can no longer return a clean 4xx. Without this
  // guard a missing `message` would fail mid-stream (and `message.length`
  // below would throw on undefined).
  if (typeof message !== 'string' || message.length === 0 || !conversationId) {
    response.status(400)
    response.headers({ 'content-type': 'application/json' })
    response.stream.write(
      JSON.stringify({ error: 'message and conversationId are required' })
    )
    response.close()
    return
  }

  logger.info('Chat request received', { sessionId, conversationId })

  // SSE headers must go out before the first event frame.
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    // Load prior turns for this conversation, or start a fresh record.
    // `??` (not `||`) so only a genuinely absent record triggers the default.
    const history: Conversation =
      (await state.get('conversations', conversationId)) ?? {
        messages: [],
        createdAt: Date.now(),
      }

    // Record the user's message before calling the model so it is part of
    // the prompt context.
    history.messages.push({ role: 'user', content: message, timestamp: Date.now() })

    // Request a streamed completion over the full conversation history.
    // Timestamps are stripped — the API accepts only { role, content }.
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: history.messages.map((m) => ({ role: m.role, content: m.content })),
      stream: true,
      temperature: 0.7,
      max_tokens: 2000,
    })

    let fullResponse = ''
    let chunkCount = 0

    // Relay each delta to the client as it arrives.
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content ?? ''
      if (content) {
        fullResponse += content
        chunkCount++
        response.stream.write(
          sseEvent('chunk', { content, index: chunkCount, conversationId })
        )
      }
    }

    // Persist the completed exchange back to state.
    history.messages.push({
      role: 'assistant',
      content: fullResponse,
      timestamp: Date.now(),
    })
    await state.set('conversations', conversationId, {
      ...history,
      lastMessageAt: Date.now(),
      messageCount: history.messages.length,
    })

    // Tell the client the stream finished cleanly.
    response.stream.write(
      sseEvent('complete', {
        conversationId,
        messageCount: history.messages.length,
        responseLength: fullResponse.length,
      })
    )

    // Fan out an analytics event for downstream steps.
    await enqueue({
      topic: 'chat.message.completed',
      data: {
        conversationId,
        sessionId,
        messageLength: message.length,
        responseLength: fullResponse.length,
        chunkCount,
        timestamp: Date.now(),
      },
    })

    logger.info('Chat response completed', {
      conversationId,
      chunks: chunkCount,
      responseLength: fullResponse.length,
    })
  } catch (error) {
    // Headers are already committed, so report failure as an SSE event
    // rather than an HTTP error status.
    logger.error('Chat streaming error', { error, conversationId })
    response.stream.write(
      sseEvent('error', { error: 'Failed to generate response', conversationId })
    )
  } finally {
    // Always terminate the SSE connection, success or failure.
    response.close()
  }
}