Skip to main content

Overview

Motia is designed for building AI-powered applications. This guide shows you how to integrate LLMs, build multi-agent systems, and create sophisticated AI workflows.

Basic LLM integration

Start by creating a simple AI-powered endpoint:
1

Install your LLM SDK

npm install openai
# or
pip install openai
2

Create an AI endpoint

steps/ai-chat.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const config = {
  name: 'AI Chat',
  description: 'Process chat messages with GPT-4',
  flows: ['ai-chat'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/chat',
      bodySchema: z.object({
        message: z.string(),
        conversationId: z.string().optional(),
      }),
      responseSchema: {
        200: z.object({
          response: z.string(),
          conversationId: z.string(),
        }),
      },
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state }
) => {
  const { message, conversationId } = request.body
  const convId = conversationId || `conv-${Date.now()}`

  logger.info('Processing chat message', { conversationId: convId })

  // Retrieve conversation history
  const history = await state.get<Array<{role: string, content: string}>>(
    'conversations',
    convId
  ) || []

  // Add user message
  history.push({ role: 'user', content: message })

  // Call OpenAI
  const completion = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: history,
  })

  const response = completion.choices[0].message.content || ''

  // Add assistant response to history
  history.push({ role: 'assistant', content: response })

  // Save conversation
  await state.set('conversations', convId, history)

  logger.info('Chat response generated', { 
    conversationId: convId,
    messageLength: response.length,
  })

  return {
    status: 200,
    body: {
      response,
      conversationId: convId,
    },
  }
}
3

Test your AI endpoint

curl -X POST http://localhost:3000/chat \
  -H "Content-Type: application/json" \
  -d '{
    "message": "What is Motia?"
  }'

Streaming AI responses

For better UX, stream AI responses as they’re generated:
steps/ai-stream.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

export const config = {
  name: 'AI Chat Stream',
  description: 'Stream AI responses in real-time',
  flows: ['ai-chat'],
  triggers: [
    http('POST', '/chat/stream', {
      bodySchema: z.object({
        message: z.string(),
        conversationId: z.string().optional(),
      }),
    }),
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request, response },
  { logger, state }
) => {
  const { message, conversationId } = request.body
  const convId = conversationId || `conv-${Date.now()}`

  // Set up SSE
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    // Get conversation history
    const history = await state.get<Array<{role: string, content: string}>>(
      'conversations',
      convId
    ) || []

    history.push({ role: 'user', content: message })

    // Stream from OpenAI
    const stream = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: history,
      stream: true,
    })

    let fullResponse = ''

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      
      if (content) {
        fullResponse += content
        
        // Stream each token to client
        response.stream.write(
          `event: token\ndata: ${JSON.stringify({ content })}\n\n`
        )
      }
    }

    // Save complete response
    history.push({ role: 'assistant', content: fullResponse })
    await state.set('conversations', convId, history)

    // Send completion event
    response.stream.write(
      `event: done\ndata: ${JSON.stringify({
        conversationId: convId,
      })}\n\n`
    )

    logger.info('Streaming complete', { conversationId: convId })

  } catch (error) {
    logger.error('AI streaming failed', { error })
    response.stream.write(
      `event: error\ndata: ${JSON.stringify({
        error: 'Failed to generate response',
      })}\n\n`
    )
  }

  response.close()
}

Multi-agent orchestration

Build sophisticated workflows where multiple AI agents collaborate:
steps/multi-agent.step.ts
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

export const config = {
  name: 'Research Orchestrator',
  description: 'Coordinate multiple AI agents for research tasks',
  flows: ['ai-research'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/research',
      bodySchema: z.object({
        topic: z.string(),
      }),
    },
  ],
  enqueues: ['research-complete'],
} as const satisfies StepConfig

const AGENTS = {
  researcher: {
    role: 'You are a research specialist. Gather key facts about the topic.',
    model: 'gpt-4',
  },
  analyst: {
    role: 'You are an analyst. Analyze the research and identify patterns.',
    model: 'gpt-4',
  },
  writer: {
    role: 'You are a writer. Create a clear summary from the analysis.',
    model: 'gpt-4',
  },
}

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state, enqueue }
) => {
  const { topic } = request.body
  const researchId = `research-${Date.now()}`

  logger.info('Starting multi-agent research', { topic, researchId })

  try {
    // Agent 1: Research
    logger.info('Agent 1: Researching')
    const researchResponse = await openai.chat.completions.create({
      model: AGENTS.researcher.model,
      messages: [
        { role: 'system', content: AGENTS.researcher.role },
        { role: 'user', content: `Research this topic: ${topic}` },
      ],
    })
    const research = researchResponse.choices[0].message.content

    await state.set('research', `${researchId}:research`, {
      agent: 'researcher',
      content: research,
      timestamp: new Date().toISOString(),
    })

    // Agent 2: Analysis
    logger.info('Agent 2: Analyzing')
    const analysisResponse = await openai.chat.completions.create({
      model: AGENTS.analyst.model,
      messages: [
        { role: 'system', content: AGENTS.analyst.role },
        { role: 'user', content: `Analyze this research: ${research}` },
      ],
    })
    const analysis = analysisResponse.choices[0].message.content

    await state.set('research', `${researchId}:analysis`, {
      agent: 'analyst',
      content: analysis,
      timestamp: new Date().toISOString(),
    })

    // Agent 3: Writing
    logger.info('Agent 3: Writing summary')
    const summaryResponse = await openai.chat.completions.create({
      model: AGENTS.writer.model,
      messages: [
        { role: 'system', content: AGENTS.writer.role },
        { 
          role: 'user', 
          content: `Create a summary from this analysis: ${analysis}` 
        },
      ],
    })
    const summary = summaryResponse.choices[0].message.content

    // Store final result
    const result = {
      topic,
      research,
      analysis,
      summary,
      createdAt: new Date().toISOString(),
    }

    await state.set('research', researchId, result)

    // Notify completion
    await enqueue({
      topic: 'research-complete',
      data: { researchId, topic },
    })

    logger.info('Multi-agent research complete', { researchId })

    return {
      status: 200,
      body: {
        researchId,
        summary,
      },
    }

  } catch (error) {
    logger.error('Multi-agent research failed', { error, researchId })
    
    return {
      status: 500,
      body: {
        error: 'Research failed',
        researchId,
      },
    }
  }
}

Background AI processing

Process AI tasks in the background to avoid blocking API responses:
steps/ai-background.step.ts
import { queue, step } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

const analysisSchema = z.object({
  contentId: z.string(),
  content: z.string(),
  analysisType: z.enum(['sentiment', 'summary', 'classification']),
})

export const stepConfig = {
  name: 'AI Content Analyzer',
  description: 'Analyze content with AI in the background',
  flows: ['ai-analysis'],
  triggers: [
    queue('analyze-content', { input: analysisSchema }),
  ],
  enqueues: ['analysis-complete'],
}

export const { config, handler } = step(stepConfig, async (_input, ctx) => {
  const { contentId, content, analysisType } = ctx.getData()

  ctx.logger.info('Starting AI analysis', { contentId, analysisType })

  try {
    let prompt = ''
    
    switch (analysisType) {
      case 'sentiment':
        prompt = `Analyze the sentiment of this text. Respond with only: positive, negative, or neutral.\n\n${content}`
        break
      case 'summary':
        prompt = `Provide a concise summary of this text in 2-3 sentences:\n\n${content}`
        break
      case 'classification':
        prompt = `Classify this text into categories. List up to 3 categories:\n\n${content}`
        break
    }

    const response = await openai.chat.completions.create({
      model: 'gpt-4',
      messages: [{ role: 'user', content: prompt }],
    })

    const result = response.choices[0].message.content

    // Store analysis result
    await ctx.state.set('analysis', contentId, {
      contentId,
      type: analysisType,
      result,
      analyzedAt: new Date().toISOString(),
    })

    // Notify completion
    await ctx.enqueue({
      topic: 'analysis-complete',
      data: { contentId, type: analysisType, result },
    })

    ctx.logger.info('Analysis complete', { contentId, analysisType })

  } catch (error) {
    ctx.logger.error('Analysis failed', { contentId, error })
    throw error // Will trigger retry
  }
})
Trigger the analysis from an API:
steps/trigger-analysis.step.ts
export const handler: Handlers<typeof config> = async (
  { request },
  { enqueue }
) => {
  const { contentId, content, analysisType } = request.body

  // Enqueue for background processing
  await enqueue({
    topic: 'analyze-content',
    data: { contentId, content, analysisType },
  })

  return {
    status: 202,
    body: {
      message: 'Analysis started',
      contentId,
    },
  }
}

Using different LLMs

Motia works with any LLM provider. Here’s an example with Anthropic’s Claude:
import Anthropic from '@anthropic-ai/sdk'

const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
})

export const handler: Handlers<typeof config> = async (
  { request },
  { logger }
) => {
  const { message } = request.body

  const response = await anthropic.messages.create({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 1024,
    messages: [{ role: 'user', content: message }],
  })

  const reply = response.content[0].text

  logger.info('Claude response generated')

  return {
    status: 200,
    body: { response: reply },
  }
}

RAG (Retrieval Augmented Generation)

Build RAG systems to enhance AI responses with your own data:
import { embed, search } from './vector-db'

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state }
) => {
  const { question } = request.body

  // 1. Embed the question
  const questionEmbedding = await embed(question)

  // 2. Search for relevant documents
  const relevantDocs = await search(questionEmbedding, { limit: 3 })

  // 3. Build context from documents
  const context = relevantDocs
    .map(doc => doc.content)
    .join('\n\n')

  // 4. Generate response with context
  const response = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [
      {
        role: 'system',
        content: 'Answer questions based on the provided context.',
      },
      {
        role: 'user',
        content: `Context:\n${context}\n\nQuestion: ${question}`,
      },
    ],
  })

  const answer = response.choices[0].message.content

  logger.info('RAG response generated', {
    docsUsed: relevantDocs.length,
  })

  return {
    status: 200,
    body: {
      answer,
      sources: relevantDocs.map(d => d.id),
    },
  }
}

Best practices

Handle rate limits

Implement exponential backoff and retry logic for API rate limits.

Cache responses

Cache AI responses when appropriate to reduce costs and latency.

Monitor costs

Track token usage and costs. Set budgets and alerts.

Use background jobs

Process long-running AI tasks in background queues, not inline.

Next steps

Multi-language

Mix TypeScript and Python in one project

Building APIs

Learn more about API patterns and validation

Build docs developers (and LLMs) love