Skip to main content

AI Agents

Motia provides a unified platform for building AI agents and multi-agent systems. Combine LLM calls, event-driven orchestration, state management, and streaming to create powerful AI workflows.

Simple AI Step

Integrate LLMs directly into your steps:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

// Schema for the POST body: an object with a single string field, `question`.
const querySchema = z.object({
  question: z.string(),
})

// Step definition: one HTTP trigger (POST /ask) that declares `querySchema`
// as its `bodySchema`, registered under the 'ai-agent' flow.
// `as const satisfies StepConfig` keeps the literal types while still
// checking the object against `StepConfig`.
export const config = {
  name: 'SimpleAIAgent',
  triggers: [{ type: 'http', method: 'POST', path: '/ask', bodySchema: querySchema }],
  flows: ['ai-agent'],
} as const satisfies StepConfig

// OpenAI client constructed once at module load; the key is read from the
// OPENAI_API_KEY environment variable.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

/**
 * Answers a single question with one chat completion.
 *
 * Reads `question` from the request body, sends it to `gpt-4o` behind a fixed
 * system prompt, logs before and after the call, and responds with status 200
 * and a body containing the question together with the first choice's content.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  const userQuestion = request.body.question

  logger.info('Processing AI query', { question: userQuestion })

  const chatResult = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: 'You are a helpful assistant.' },
      { role: 'user', content: userQuestion },
    ],
  })

  // Only the first choice is used.
  const [firstChoice] = chatResult.choices
  const reply = firstChoice.message.content

  logger.info('AI response generated', { question: userQuestion, answer: reply })

  return {
    status: 200,
    body: { question: userQuestion, answer: reply },
  }
}

Multi-Agent Financial Analysis

Build a financial analysis system with specialized agents:

1. Query Coordinator

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Schema for the POST body: string fields `symbol` and `question`.
const querySchema = z.object({
  symbol: z.string(),
  question: z.string(),
})

// Step definition: HTTP trigger (POST /finance/query) with `querySchema` as
// its `bodySchema`. `enqueues` lists the two topics the handler publishes to;
// the step is registered under the 'finance-agent' flow.
export const config = {
  name: 'FinanceQueryAPI',
  triggers: [{ type: 'http', method: 'POST', path: '/finance/query', bodySchema: querySchema }],
  enqueues: ['finance.search', 'finance.data'],
  flows: ['finance-agent'],
} as const satisfies StepConfig

/**
 * Entry point of the finance pipeline.
 *
 * Creates a new query id, stores an initial 'processing' record under the
 * 'queries' state group, enqueues one message on 'finance.search' and one on
 * 'finance.data', and replies 202 with the id so the caller can poll later.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state, logger }) => {
  const { symbol, question } = request.body
  const queryId = crypto.randomUUID()

  logger.info('Finance query received', { queryId, symbol, question })

  // Seed the record that the downstream agents will add their results to.
  const createdAt = new Date().toISOString()
  await state.set('queries', queryId, { queryId, symbol, question, status: 'processing', createdAt })

  // Both messages are dispatched before either is awaited, so the two
  // gathering steps are enqueued concurrently.
  const searchJob = enqueue({ topic: 'finance.search', data: { queryId, symbol, question } })
  const dataJob = enqueue({ topic: 'finance.data', data: { queryId, symbol } })
  await Promise.all([searchJob, dataJob])

  return {
    status: 202,
    body: { queryId, status: 'processing' },
  }
}

2. Web Search Agent

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Schema for the queue message: string fields `queryId`, `symbol`, `question`.
const inputSchema = z.object({
  queryId: z.string(),
  symbol: z.string(),
  question: z.string(),
})

// Step definition: queue trigger on the 'finance.search' topic with
// `inputSchema` as its `input`. `enqueues` declares 'finance.analyze' as the
// topic the handler publishes to; registered under the 'finance-agent' flow.
export const config = {
  name: 'WebSearchAgent',
  triggers: [{ type: 'queue', topic: 'finance.search', input: inputSchema }],
  enqueues: ['finance.analyze'],
  flows: ['finance-agent'],
} as const satisfies StepConfig

/**
 * Runs a web search for the query and stores the organic results.
 *
 * Posts `"<symbol> <question> stock market analysis"` to the Serper search
 * endpoint, writes the `organic` array (or an empty array when absent) to
 * `searchResults` on the stored query, then enqueues 'finance.analyze'.
 *
 * @throws Error when SERPER_API_KEY is unset or the search endpoint answers
 *   with a non-2xx status, so a failed search is reported as a failure
 *   instead of being recorded as "no results".
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { queryId, symbol, question } = input

  // Fail fast with a clear message rather than sending an undefined header.
  const apiKey = process.env.SERPER_API_KEY
  if (!apiKey) {
    throw new Error('SERPER_API_KEY is not set')
  }

  logger.info('Searching web for financial data', { queryId, symbol })

  // Call search API (e.g., SerperDev, Brave Search)
  const response = await fetch('https://api.serper.dev/search', {
    method: 'POST',
    headers: {
      'X-API-KEY': apiKey,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      q: `${symbol} ${question} stock market analysis`,
      num: 5,
    }),
  })

  // fetch only rejects on network errors; HTTP errors must be checked by hand.
  if (!response.ok) {
    throw new Error(`Search request failed with status ${response.status}`)
  }

  const payload = (await response.json()) as { organic?: unknown } | null
  const organicResults: unknown[] = Array.isArray(payload?.organic) ? payload.organic : []

  // Store search results
  await state.update('queries', queryId, [
    {
      type: 'set',
      path: 'searchResults',
      value: organicResults,
    },
  ])

  // Trigger analysis
  await enqueue({
    topic: 'finance.analyze',
    data: { queryId },
  })

  logger.info('Web search completed', { queryId, resultCount: organicResults.length })
}

3. Financial Data Agent

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Schema for the queue message: string fields `queryId` and `symbol`.
const inputSchema = z.object({
  queryId: z.string(),
  symbol: z.string(),
})

// Step definition: queue trigger on the 'finance.data' topic with
// `inputSchema` as its `input`. `enqueues` declares 'finance.analyze' as the
// topic the handler publishes to; registered under the 'finance-agent' flow.
export const config = {
  name: 'FinancialDataAgent',
  triggers: [{ type: 'queue', topic: 'finance.data', input: inputSchema }],
  enqueues: ['finance.analyze'],
  flows: ['finance-agent'],
} as const satisfies StepConfig

/**
 * Calls one Alpha Vantage `query` function for a symbol and returns the parsed
 * JSON body.
 *
 * The query string is built with URLSearchParams so `symbol` is
 * percent-encoded and cannot inject extra parameters.
 *
 * @throws Error when the endpoint answers with a non-2xx status.
 */
const fetchAlphaVantage = async (
  fn: 'GLOBAL_QUOTE' | 'OVERVIEW',
  symbol: string,
  apiKey: string
): Promise<unknown> => {
  const params = new URLSearchParams({ function: fn, symbol, apikey: apiKey })
  const response = await fetch(`https://www.alphavantage.co/query?${params.toString()}`)

  if (!response.ok) {
    throw new Error(`Alpha Vantage ${fn} request failed with status ${response.status}`)
  }

  return response.json()
}

/**
 * Fetches the quote and the company overview for the symbol and stores both.
 *
 * Writes the two responses to `stockData` and `companyData` on the stored
 * query, then enqueues 'finance.analyze'.
 *
 * @throws Error when ALPHA_VANTAGE_API_KEY is unset or either request fails.
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { queryId, symbol } = input

  // Without this guard the literal text "undefined" would be sent as the key.
  const apiKey = process.env.ALPHA_VANTAGE_API_KEY
  if (!apiKey) {
    throw new Error('ALPHA_VANTAGE_API_KEY is not set')
  }

  logger.info('Fetching financial data', { queryId, symbol })

  // The two lookups are independent, so run them concurrently.
  const [stockData, companyData] = await Promise.all([
    fetchAlphaVantage('GLOBAL_QUOTE', symbol, apiKey),
    fetchAlphaVantage('OVERVIEW', symbol, apiKey),
  ])

  // Store financial data
  await state.update('queries', queryId, [
    { type: 'set', path: 'stockData', value: stockData },
    { type: 'set', path: 'companyData', value: companyData },
  ])

  // Trigger analysis
  await enqueue({
    topic: 'finance.analyze',
    data: { queryId },
  })

  logger.info('Financial data fetched', { queryId, symbol })
}

4. AI Analysis Agent

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'

// Schema for the queue message: a single string field, `queryId`.
const inputSchema = z.object({
  queryId: z.string(),
})

// Step definition: queue trigger on the 'finance.analyze' topic with
// `inputSchema` as its `input`; registered under the 'finance-agent' flow.
// No `enqueues` are declared for this step.
export const config = {
  name: 'AIAnalysisAgent',
  triggers: [{ type: 'queue', topic: 'finance.analyze', input: inputSchema }],
  flows: ['finance-agent'],
} as const satisfies StepConfig

// OpenAI client constructed once at module load; the key is read from the
// OPENAI_API_KEY environment variable.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})

/** Fields of a stored query record that the analysis step reads. */
interface FinanceQueryState {
  question: string
  status?: string
  searchResults?: unknown
  stockData?: unknown
  companyData?: unknown
}

/**
 * Produces the final analysis once every input has been gathered.
 *
 * 'finance.analyze' is enqueued by both the web-search step and the
 * financial-data step, so this handler runs at least twice per query. It
 * returns early when the record is missing, when the query is already
 * completed, or when any of `searchResults`, `stockData`, `companyData` is
 * still absent. Otherwise it asks `gpt-4o` for an analysis and stores
 * `analysis`, `status: 'completed'` and `completedAt` on the record.
 */
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  const { queryId } = input

  // Get all gathered data
  const query = await state.get<FinanceQueryState>('queries', queryId)

  if (!query) {
    logger.error('Query not found', { queryId })
    return
  }

  // A second delivery may arrive after the first one already finished the
  // analysis; skip it so the LLM call is not repeated. (This read-then-act
  // check is not atomic, so it narrows the duplicate window without closing it.)
  if (query.status === 'completed') {
    logger.info('Analysis already completed', { queryId })
    return
  }

  // Check if all data is ready
  if (!query.searchResults || !query.stockData || !query.companyData) {
    logger.info('Waiting for more data', { queryId })
    return
  }

  logger.info('Analyzing with AI', { queryId })

  // Prepare context for AI
  const context = `
Stock Data: ${JSON.stringify(query.stockData, null, 2)}
Company Data: ${JSON.stringify(query.companyData, null, 2)}
Recent News: ${JSON.stringify(query.searchResults, null, 2)}
  `

  // Generate analysis
  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: 'You are a financial analyst. Provide clear, actionable insights based on the data.',
      },
      {
        role: 'user',
        content: `Question: ${query.question}\n\nContext:\n${context}`,
      },
    ],
  })

  // An empty `choices` array yields null instead of a TypeError.
  const analysis = completion.choices[0]?.message.content ?? null

  // Store final analysis
  await state.update('queries', queryId, [
    { type: 'set', path: 'analysis', value: analysis },
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'completedAt', value: new Date().toISOString() },
  ])

  logger.info('Analysis completed', { queryId })
}

5. Results API

import type { Handlers, StepConfig } from 'motia'

// Step definition: HTTP trigger (GET /finance/result/:queryId), registered
// under the 'finance-agent' flow. The handler reads the `:queryId` path
// segment from `request.params`.
export const config = {
  name: 'FinanceResultAPI',
  triggers: [{ type: 'http', method: 'GET', path: '/finance/result/:queryId' }],
  flows: ['finance-agent'],
} as const satisfies StepConfig

/**
 * Returns the stored record for a finance query.
 *
 * Looks up `request.params.queryId` in the 'queries' state group and replies
 * 200 with the record as the body, or 404 with an error body when the lookup
 * yields nothing.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  const stored = await state.get<any>('queries', request.params.queryId)

  return stored
    ? { status: 200, body: stored }
    : { status: 404, body: { error: 'Query not found' } }
}

Deep Research Agent

Build an iterative research agent with multiple analysis layers:

Research Coordinator

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

// Schema for the POST body: a string `query` and a numeric `depth` that the
// schema defaults to 2.
const bodySchema = z.object({
  query: z.string(),
  depth: z.number().default(2),
})

// Step definition: HTTP trigger (POST /research) with `bodySchema` attached.
// `enqueues` declares 'research.generate-queries' as the topic the handler
// publishes to; registered under the 'research-agent' flow.
export const config = {
  name: 'ResearchAPI',
  triggers: [{ type: 'http', method: 'POST', path: '/research', bodySchema }],
  enqueues: ['research.generate-queries'],
  flows: ['research-agent'],
} as const satisfies StepConfig

/**
 * Starts a research run.
 *
 * Generates a research id, stores an initial record (requested `depth`,
 * `currentDepth: 0`, empty `findings`, status 'processing') in the 'research'
 * state group, enqueues 'research.generate-queries' with `depth: 0`, and
 * replies 202 with the id.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state }) => {
  const { body } = request
  const researchId = crypto.randomUUID()

  // Initial record; the requested depth is kept separately from the depth
  // reached so far.
  await state.set('research', researchId, {
    researchId,
    query: body.query,
    depth: body.depth,
    currentDepth: 0,
    status: 'processing',
    findings: [],
    createdAt: new Date().toISOString(),
  })

  // The first round is enqueued with depth 0.
  await enqueue({
    topic: 'research.generate-queries',
    data: { researchId, query: body.query, depth: 0 },
  })

  return {
    status: 202,
    body: { researchId, status: 'processing' },
  }
}

Query Generation Agent

import type { Handlers, StepConfig } from 'motia'
import OpenAI from 'openai'

// OpenAI client constructed once at module load; the key is read from the
// OPENAI_API_KEY environment variable.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

// Step definition: queue trigger on the 'research.generate-queries' topic.
// `enqueues` declares 'research.search' as the topic the handler publishes
// to; registered under the 'research-agent' flow.
// NOTE(review): this trigger declares no `input` schema, although the handler
// destructures `researchId`, `query` and `depth` from its input — confirm
// whether a schema should be added here.
export const config = {
  name: 'GenerateQueries',
  triggers: [{ type: 'queue', topic: 'research.generate-queries' }],
  enqueues: ['research.search'],
  flows: ['research-agent'],
} as const satisfies StepConfig

// Upper bound on searches enqueued per round; the prompt asks for 3-5 queries.
const MAX_SEARCH_QUERIES = 5

/**
 * Turns the model's free-form reply into clean search strings.
 *
 * Splits on newlines, strips a leading list marker ("1. ", "2) ", "- ", "* ",
 * "• ") when it is followed by whitespace, trims, drops blank lines, and keeps
 * at most MAX_SEARCH_QUERIES entries.
 */
const parseSearchQueries = (text: string | null | undefined): string[] =>
  (text ?? '')
    .split('\n')
    // Whitespace is required after the marker so "3.5% inflation" stays intact.
    .map((line) => line.replace(/^\s*(?:[-*•]|\d+[.)])\s+/, '').trim())
    .filter((line) => line.length > 0)
    .slice(0, MAX_SEARCH_QUERIES)

/**
 * Asks the model for search queries on the research topic and enqueues one
 * 'research.search' message per query.
 *
 * Each message carries `researchId`, the `searchQuery`, and the incoming
 * `depth`. All messages are enqueued concurrently.
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  const { researchId, query, depth } = input

  logger.info('Generating search queries', { researchId, depth })

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: 'Generate 3-5 specific search queries to research this topic deeply.',
      },
      { role: 'user', content: query },
    ],
  })

  // An empty `choices` array or null content yields no queries.
  const queries = parseSearchQueries(completion.choices[0]?.message.content)

  // Enqueue parallel searches
  await Promise.all(
    queries.map((searchQuery) =>
      enqueue({
        topic: 'research.search',
        data: { researchId, searchQuery, depth },
      })
    )
  )

  logger.info('Search queries generated', { researchId, queryCount: queries.length })
}

Streaming AI Responses

Stream AI responses to clients in real time:
import type { Handlers, StepConfig } from 'motia'
import OpenAI from 'openai'

// OpenAI client constructed once at module load; the key is read from the
// OPENAI_API_KEY environment variable.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

// Step definition: HTTP trigger (POST /ai/stream), registered under the
// 'ai-streaming' flow.
// NOTE(review): no `bodySchema` is declared, although the handler reads
// `request.body.question` — confirm whether a schema should be added here.
export const config = {
  name: 'StreamingAI',
  triggers: [{ type: 'http', method: 'POST', path: '/ai/stream' }],
  flows: ['ai-streaming'],
} as const satisfies StepConfig

/**
 * Streams a chat completion to the client as server-sent events.
 *
 * Sets status 200 and event-stream headers, then writes one
 * `data: {"content": …}` event per non-empty delta followed by
 * `data: [DONE]`. If the OpenAI call or the stream fails, the error is
 * logged and a single `data: {"error": …}` event is written instead, because
 * the status line has already been set. The response is closed in every case.
 */
export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  const { question } = request.body

  logger.info('Streaming AI response', { question })

  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: question }],
      stream: true,
    })

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content
      // Skip empty deltas.
      if (content) {
        response.stream.write(`data: ${JSON.stringify({ content })}\n\n`)
      }
    }

    response.stream.write('data: [DONE]\n\n')
  } catch (error: unknown) {
    logger.error('AI stream failed', {
      error: error instanceof Error ? error.message : String(error),
    })
    response.stream.write(`data: ${JSON.stringify({ error: 'Failed to generate response' })}\n\n`)
  } finally {
    // Always release the connection, including when the upstream call throws.
    response.close()
  }
}

AI Agent Patterns

Chain of Thought

/**
 * Solves a problem with a step-by-step reasoning prompt.
 *
 * Sends `request.body.problem` to `gpt-4o` behind a system prompt that asks
 * for visible reasoning, and replies 200 with the first choice's content as
 * `solution` (null when the model returns no choice or no content).
 */
export const handler: Handlers<typeof config> = async ({ request }) => {
  const { problem } = request.body

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: 'Think step by step to solve this problem. Show your reasoning.',
      },
      { role: 'user', content: problem },
    ],
  })

  // An empty `choices` array yields null instead of a TypeError.
  const solution = completion.choices[0]?.message.content ?? null

  return { status: 200, body: { solution } }
}

Retrieval-Augmented Generation (RAG)

/** Minimal shape of a knowledge-base entry as this handler uses it. */
interface KnowledgeDocument {
  content: string
}

// Cap on documents placed in the prompt, to keep the context bounded.
const MAX_CONTEXT_DOCUMENTS = 5

/** Lower-cases the text and splits it into words of three or more characters. */
const extractKeywords = (text: string): string[] =>
  text
    .toLowerCase()
    .split(/[^\p{L}\p{N}]+/u)
    .filter((word) => word.length > 2)

/**
 * Picks the documents that share the most keywords with the question.
 *
 * Documents with no keyword in common are dropped; the rest are ordered by
 * number of matching keywords and truncated to MAX_CONTEXT_DOCUMENTS.
 */
const selectRelevantDocuments = (
  documents: readonly KnowledgeDocument[],
  question: string
): KnowledgeDocument[] => {
  const keywords = extractKeywords(question)

  return documents
    .map((doc) => {
      const haystack = doc.content.toLowerCase()
      return { doc, score: keywords.filter((keyword) => haystack.includes(keyword)).length }
    })
    .filter(({ score }) => score > 0)
    .sort((a, b) => b.score - a.score)
    .slice(0, MAX_CONTEXT_DOCUMENTS)
    .map(({ doc }) => doc)
}

/**
 * Answers a question using documents from the 'knowledge-base' state group
 * as context.
 *
 * Matching is by keyword overlap rather than by requiring the whole question
 * to appear verbatim in a document, which would almost never match.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  const { question } = request.body

  // Retrieve relevant documents from state
  const documents = await state.list<KnowledgeDocument>('knowledge-base')

  // Find relevant docs (simple example - use vector DB in production)
  const relevant = selectRelevantDocuments(documents, question)

  const context = relevant.map((doc) => doc.content).join('\n\n')

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: `Answer based on this context:\n${context}`,
      },
      { role: 'user', content: question },
    ],
  })

  // An empty `choices` array yields null instead of a TypeError.
  return { status: 200, body: { answer: completion.choices[0]?.message.content ?? null } }
}

Tool Use / Function Calling

// Tool definitions offered to the model: a single `get_weather` function that
// takes a required string `location`.
const tools = [
  {
    // `as const` keeps the literal type 'function'; without it the property
    // widens to `string`, which does not satisfy the SDK's tool type.
    type: 'function' as const,
    function: {
      name: 'get_weather',
      description: 'Get the current weather in a location',
      parameters: {
        type: 'object',
        properties: {
          location: { type: 'string', description: 'City name' },
        },
        required: ['location'],
      },
    },
  },
]

/**
 * Answers a question, letting the model call `get_weather` when it wants to.
 *
 * Round one sends the question with the tool definitions. If the model asks
 * for tool calls, each one is executed and its result (or an error payload)
 * is sent back as a `tool` message in a second request, so the reply returned
 * to the client is the model's final answer rather than the bare tool-call
 * request. Without tool calls the first message is returned as is.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  const userMessage = { role: 'user' as const, content: request.body.question }

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [userMessage],
    tools,
    tool_choice: 'auto',
  })

  const assistantMessage = completion.choices[0]?.message
  const toolCalls = assistantMessage?.tool_calls ?? []

  if (!assistantMessage || toolCalls.length === 0) {
    return { status: 200, body: { response: assistantMessage } }
  }

  const toolMessages: { role: 'tool'; tool_call_id: string; content: string }[] = []

  for (const toolCall of toolCalls) {
    if (toolCall.type !== 'function') continue

    let content: string
    try {
      if (toolCall.function.name !== 'get_weather') {
        throw new Error(`Unknown tool: ${toolCall.function.name}`)
      }

      // The arguments are model-generated text: parse and validate them.
      const parsed: unknown = JSON.parse(toolCall.function.arguments)
      const location = (parsed as { location?: unknown } | null)?.location
      if (typeof location !== 'string') {
        throw new Error('get_weather requires a string "location"')
      }

      const weather = await getWeather(location)
      logger.info('Weather retrieved', { location, weather })
      content = JSON.stringify(weather ?? null)
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error)
      logger.error('Tool call failed', { tool: toolCall.function.name, error: message })
      // Report the failure to the model instead of aborting the request.
      content = JSON.stringify({ error: message })
    }

    toolMessages.push({ role: 'tool', tool_call_id: toolCall.id, content })
  }

  // Second round: the model sees the tool output. `tool_choice: 'none'` makes
  // it answer in text rather than request further calls.
  const followUp = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [userMessage, assistantMessage, ...toolMessages],
    tools,
    tool_choice: 'none',
  })

  return { status: 200, body: { response: followUp.choices[0]?.message } }
}

Best Practices

1. Error Handling

/**
 * Answers a question and converts any failure of the OpenAI call into a
 * generic 500 response.
 *
 * The error's name and message are logged explicitly: an `Error` passed to a
 * JSON-based logger can serialise to `{}` because its fields are not
 * enumerable. Error details are never returned to the client.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  try {
    const completion = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: request.body.question }],
    })

    // An empty `choices` array yields null instead of a TypeError.
    return { status: 200, body: { answer: completion.choices[0]?.message.content ?? null } }
  } catch (error: unknown) {
    logger.error('OpenAI API error', {
      error: error instanceof Error ? { name: error.name, message: error.message } : String(error),
    })
    return {
      status: 500,
      body: { error: 'Failed to generate response' },
    }
  }
}

2. Rate Limiting

// Maximum number of requests a single user id may make.
const RATE_LIMIT_MAX_REQUESTS = 100

/**
 * Per-user rate limiting backed by the 'rate-limits' state group.
 *
 * Rejects requests without an `x-user-id` header (400), rejects users whose
 * stored count has reached RATE_LIMIT_MAX_REQUESTS (429), and otherwise
 * increments the user's counter before the request is processed.
 *
 * The check and the increment are separate state calls, so concurrent
 * requests from one user can slightly overshoot the limit.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  // A header value may arrive as a string or a list; use the first entry.
  const headerValue = request.headers['x-user-id']
  const userId = Array.isArray(headerValue) ? headerValue[0] : headerValue

  // Without an id there is no key to count against.
  if (!userId) {
    return {
      status: 400,
      body: { error: 'Missing x-user-id header' },
    }
  }

  // Check rate limit
  const usage = await state.get<{ count: number }>('rate-limits', userId)

  // `>=` so that exactly RATE_LIMIT_MAX_REQUESTS requests are allowed.
  if (usage && usage.count >= RATE_LIMIT_MAX_REQUESTS) {
    return {
      status: 429,
      body: { error: 'Rate limit exceeded' },
    }
  }

  // Increment counter
  await state.update('rate-limits', userId, [
    { type: 'increment', path: 'count', by: 1 },
  ])

  // Process request...
}

3. Prompt Engineering

// System prompt for the financial-analyst persona: one intro line, a blank
// line, then a bulleted list of guidelines, joined with newlines.
const SYSTEM_PROMPT = [
  'You are a helpful financial analyst.',
  '',
  'Guidelines:',
  '- Provide data-driven insights',
  '- Cite your sources',
  '- Be clear about uncertainty',
  '- Use specific numbers and dates',
  '- Consider multiple perspectives',
].join('\n')

/**
 * Answers a question under the shared SYSTEM_PROMPT.
 *
 * Calls `gpt-4o` with temperature 0.7 and a 1000-token cap, and replies 200
 * with the first choice's content as `answer`.
 */
export const handler: Handlers<typeof config> = async ({ request }) => {
  const userQuestion = request.body.question

  const { choices } = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      { role: 'system', content: SYSTEM_PROMPT },
      { role: 'user', content: userQuestion },
    ],
    temperature: 0.7,
    max_tokens: 1000,
  })

  return { status: 200, body: { answer: choices[0].message.content } }
}

Next Steps

Build docs developers (and LLMs) love