AI Agents
Motia provides a unified platform for building AI agents and multi-agent systems. Combine LLM calls, event-driven orchestration, state management, and streaming to create powerful AI workflows.

Simple AI Step
Integrate LLMs directly into your steps:

import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
// Request body contract: POST /ask expects { question: string }.
const querySchema = z.object({
  question: z.string(),
})

// Step config: a single HTTP trigger. `as const satisfies StepConfig`
// keeps the literal types (so Handlers<typeof config> can infer the body
// shape from querySchema) while still validating against StepConfig.
export const config = {
  name: 'SimpleAIAgent',
  triggers: [{ type: 'http', method: 'POST', path: '/ask', bodySchema: querySchema }],
  flows: ['ai-agent'],
} as const satisfies StepConfig

// Module-level client: created once per process and reused across invocations.
// NOTE(review): assumes OPENAI_API_KEY is set in the environment.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})
/**
 * HTTP handler for POST /ask: forwards the user's question to GPT-4o and
 * returns the model's answer alongside the original question.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  const { question } = request.body

  logger.info('Processing AI query', { question })

  // Single-turn chat completion: fixed system prompt plus the user's question.
  const messages = [
    { role: 'system' as const, content: 'You are a helpful assistant.' },
    { role: 'user' as const, content: question },
  ]
  const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages })

  const answer = completion.choices[0].message.content
  logger.info('AI response generated', { question, answer })

  return { status: 200, body: { question, answer } }
}
Multi-Agent Financial Analysis
Build a financial analysis system with specialized agents:

1. Query Coordinator
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// POST /finance/query body: a stock ticker plus a free-form question.
const querySchema = z.object({
  symbol: z.string(),
  question: z.string(),
})

// Coordinator step: accepts the query over HTTP and fans out to the
// search and market-data agents via the queue topics listed in `enqueues`.
export const config = {
  name: 'FinanceQueryAPI',
  triggers: [{ type: 'http', method: 'POST', path: '/finance/query', bodySchema: querySchema }],
  enqueues: ['finance.search', 'finance.data'],
  flows: ['finance-agent'],
} as const satisfies StepConfig
/**
 * Coordinator handler: records the query in state, kicks off the web-search
 * and financial-data agents in parallel, and immediately returns 202 with a
 * queryId the client can poll for results.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state, logger }) => {
  const { symbol, question } = request.body
  const queryId = crypto.randomUUID()

  logger.info('Finance query received', { queryId, symbol, question })

  // Initialize query state
  const initialRecord = {
    queryId,
    symbol,
    question,
    status: 'processing',
    createdAt: new Date().toISOString(),
  }
  await state.set('queries', queryId, initialRecord)

  // Trigger parallel data gathering
  const searchJob = enqueue({ topic: 'finance.search', data: { queryId, symbol, question } })
  const dataJob = enqueue({ topic: 'finance.data', data: { queryId, symbol } })
  await Promise.all([searchJob, dataJob])

  return { status: 202, body: { queryId, status: 'processing' } }
}
2. Web Search Agent
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Payload delivered on the 'finance.search' queue topic.
const inputSchema = z.object({
  queryId: z.string(),
  symbol: z.string(),
  question: z.string(),
})

// Queue-triggered agent: runs a web search for the query, then hands off
// to the analysis agent via 'finance.analyze'.
export const config = {
  name: 'WebSearchAgent',
  triggers: [{ type: 'queue', topic: 'finance.search', input: inputSchema }],
  enqueues: ['finance.analyze'],
  flows: ['finance-agent'],
} as const satisfies StepConfig
/**
 * Web search agent: queries the Serper search API for recent market context
 * on the symbol, stores the organic results on the query record, and
 * triggers the analysis step.
 *
 * Fix over the original: the fetch response is checked with `res.ok` before
 * parsing, and `await` is used throughout instead of mixing `await` with
 * `.then()`, so an API failure surfaces as a thrown Error (and a queue retry)
 * rather than silently storing an error payload as "search results".
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { queryId, symbol, question } = input

  logger.info('Searching web for financial data', { queryId, symbol })

  // Call search API (e.g., SerperDev, Brave Search)
  const res = await fetch('https://api.serper.dev/search', {
    method: 'POST',
    headers: {
      'X-API-KEY': process.env.SERPER_API_KEY!,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      q: `${symbol} ${question} stock market analysis`,
      num: 5,
    }),
  })
  if (!res.ok) {
    throw new Error(`Search API request failed with status ${res.status}`)
  }
  const searchResults = await res.json()

  // Store search results (organic results may be absent on an empty query)
  await state.update('queries', queryId, [
    {
      type: 'set',
      path: 'searchResults',
      value: searchResults.organic ?? [],
    },
  ])

  // Trigger analysis
  await enqueue({
    topic: 'finance.analyze',
    data: { queryId },
  })

  logger.info('Web search completed', { queryId, resultCount: searchResults.organic?.length })
}
3. Financial Data Agent
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// Payload delivered on the 'finance.data' queue topic.
const inputSchema = z.object({
  queryId: z.string(),
  symbol: z.string(),
})

// Queue-triggered agent: fetches market data for the symbol, then hands
// off to the analysis agent via 'finance.analyze'.
export const config = {
  name: 'FinancialDataAgent',
  triggers: [{ type: 'queue', topic: 'finance.data', input: inputSchema }],
  enqueues: ['finance.analyze'],
  flows: ['finance-agent'],
} as const satisfies StepConfig
/**
 * Financial data agent: pulls the latest quote and the company overview
 * from Alpha Vantage, attaches both to the query record, and triggers the
 * analysis step.
 *
 * Fixes over the original: the two independent Alpha Vantage calls run in
 * parallel (Promise.all) instead of sequentially, the symbol is URL-encoded
 * before interpolation into the query string, and non-2xx responses throw
 * instead of being parsed and stored as data.
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, state, logger }) => {
  const { queryId, symbol } = input

  logger.info('Fetching financial data', { queryId, symbol })

  // Local helper: GET a URL and parse JSON, failing loudly on HTTP errors.
  const fetchJson = async (url: string) => {
    const res = await fetch(url)
    if (!res.ok) {
      throw new Error(`Alpha Vantage request failed with status ${res.status}`)
    }
    return res.json()
  }

  const encodedSymbol = encodeURIComponent(symbol)
  const apiKey = process.env.ALPHA_VANTAGE_API_KEY

  // The two endpoints are independent — fetch them in parallel.
  const [stockData, companyData] = await Promise.all([
    fetchJson(`https://www.alphavantage.co/query?function=GLOBAL_QUOTE&symbol=${encodedSymbol}&apikey=${apiKey}`),
    fetchJson(`https://www.alphavantage.co/query?function=OVERVIEW&symbol=${encodedSymbol}&apikey=${apiKey}`),
  ])

  // Store financial data
  await state.update('queries', queryId, [
    { type: 'set', path: 'stockData', value: stockData },
    { type: 'set', path: 'companyData', value: companyData },
  ])

  // Trigger analysis
  await enqueue({
    topic: 'finance.analyze',
    data: { queryId },
  })

  logger.info('Financial data fetched', { queryId, symbol })
}
4. AI Analysis Agent
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
import OpenAI from 'openai'
// Payload on the 'finance.analyze' topic: only the queryId — all gathered
// data is read back from state.
const inputSchema = z.object({
  queryId: z.string(),
})

// Terminal agent of the flow: fired once per upstream agent completion;
// produces the final analysis once all inputs have arrived.
export const config = {
  name: 'AIAnalysisAgent',
  triggers: [{ type: 'queue', topic: 'finance.analyze', input: inputSchema }],
  flows: ['finance-agent'],
} as const satisfies StepConfig

// Module-level OpenAI client, created once per process.
const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
})
/**
 * Analysis agent: triggered once per upstream agent. It proceeds only after
 * the search results, quote, and company overview are all present on the
 * query record; the last trigger performs the actual analysis, stores the
 * model's answer, and marks the query completed.
 */
export const handler: Handlers<typeof config> = async (input, { state, logger }) => {
  const { queryId } = input

  // Get all gathered data
  const query = await state.get<any>('queries', queryId)
  if (!query) {
    logger.error('Query not found', { queryId })
    return
  }

  // Bail out until every upstream agent has written its piece of data.
  const ready = Boolean(query.searchResults && query.stockData && query.companyData)
  if (!ready) {
    logger.info('Waiting for more data', { queryId })
    return
  }

  logger.info('Analyzing with AI', { queryId })

  // Prepare context for AI
  const context = `
Stock Data: ${JSON.stringify(query.stockData, null, 2)}
Company Data: ${JSON.stringify(query.companyData, null, 2)}
Recent News: ${JSON.stringify(query.searchResults, null, 2)}
`

  // Generate analysis
  const messages = [
    {
      role: 'system' as const,
      content: 'You are a financial analyst. Provide clear, actionable insights based on the data.',
    },
    {
      role: 'user' as const,
      content: `Question: ${query.question}\n\nContext:\n${context}`,
    },
  ]
  const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages })
  const analysis = completion.choices[0].message.content

  // Store final analysis
  await state.update('queries', queryId, [
    { type: 'set', path: 'analysis', value: analysis },
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'completedAt', value: new Date().toISOString() },
  ])

  logger.info('Analysis completed', { queryId })
}
5. Results API
import type { Handlers, StepConfig } from 'motia'
// Polling endpoint for finance query results; `:queryId` is a path parameter.
export const config = {
  name: 'FinanceResultAPI',
  triggers: [{ type: 'http', method: 'GET', path: '/finance/result/:queryId' }],
  flows: ['finance-agent'],
} as const satisfies StepConfig
/**
 * Poll endpoint: returns the current query record (including `status` and,
 * once finished, `analysis`), or 404 for an unknown queryId.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  const { queryId } = request.params

  const result = await state.get<any>('queries', queryId)
  return result
    ? { status: 200, body: result }
    : { status: 404, body: { error: 'Query not found' } }
}
Deep Research Agent
Build an iterative research agent with multiple analysis layers:

Research Coordinator
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
// POST /research body: the topic plus how many iterative layers to run.
// `depth` defaults to 2 via the schema when omitted by the caller.
const bodySchema = z.object({
  query: z.string(),
  depth: z.number().default(2),
})

// Entry point of the research flow: kicks off query generation at depth 0.
export const config = {
  name: 'ResearchAPI',
  triggers: [{ type: 'http', method: 'POST', path: '/research', bodySchema }],
  enqueues: ['research.generate-queries'],
  flows: ['research-agent'],
} as const satisfies StepConfig
/**
 * Research coordinator: persists the research job, enqueues the first round
 * of query generation (depth 0), and responds 202 with a researchId the
 * client can use to poll for results.
 */
export const handler: Handlers<typeof config> = async ({ request }, { enqueue, state }) => {
  const { query, depth } = request.body
  const researchId = crypto.randomUUID()

  // Job record: `depth` is the target, `currentDepth` tracks progress.
  const job = {
    researchId,
    query,
    depth,
    currentDepth: 0,
    status: 'processing',
    findings: [],
    createdAt: new Date().toISOString(),
  }
  await state.set('research', researchId, job)

  await enqueue({
    topic: 'research.generate-queries',
    data: { researchId, query, depth: 0 },
  })

  return {
    status: 202,
    body: { researchId, status: 'processing' },
  }
}
Query Generation Agent
import type { Handlers, StepConfig } from 'motia'
import OpenAI from 'openai'
// Module-level OpenAI client, reused across invocations.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

// Queue-triggered step that expands a research topic into search queries.
// NOTE(review): unlike the other queue triggers in this guide, no `input`
// schema is declared here, so the handler's `input` is untyped — consider
// adding one for consistency.
export const config = {
  name: 'GenerateQueries',
  triggers: [{ type: 'queue', topic: 'research.generate-queries' }],
  enqueues: ['research.search'],
  flows: ['research-agent'],
} as const satisfies StepConfig
/**
 * Asks GPT-4o for 3-5 focused search queries on the topic and enqueues one
 * 'research.search' job per query, all in parallel.
 *
 * Fix over the original: the model typically answers with a numbered or
 * bulleted list, so each line is stripped of leading list markers
 * ("1.", "2)", "-", "*") and surrounding whitespace before being used as a
 * search query; blank lines are still discarded.
 */
export const handler: Handlers<typeof config> = async (input, { enqueue, logger }) => {
  const { researchId, query, depth } = input

  logger.info('Generating search queries', { researchId, depth })

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: 'Generate 3-5 specific search queries to research this topic deeply.',
      },
      { role: 'user', content: query },
    ],
  })

  // One query per line; strip list numbering/bullets so the raw marker text
  // does not pollute the downstream search.
  const queries = (completion.choices[0].message.content ?? '')
    .split('\n')
    .map((line) => line.replace(/^\s*(?:[-*•]|\d+[.)])\s*/, '').trim())
    .filter(Boolean)

  // Enqueue parallel searches
  await Promise.all(
    queries.map((searchQuery) =>
      enqueue({
        topic: 'research.search',
        data: { researchId, searchQuery, depth },
      })
    )
  )

  logger.info('Search queries generated', { researchId, queryCount: queries.length })
}
Streaming AI Responses
Stream AI responses in real-time to clients:

import type { Handlers, StepConfig } from 'motia'
import OpenAI from 'openai'
// Module-level OpenAI client, reused across requests.
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

// HTTP step that streams the model's answer back to the caller as
// server-sent events.
export const config = {
  name: 'StreamingAI',
  triggers: [{ type: 'http', method: 'POST', path: '/ai/stream' }],
  flows: ['ai-streaming'],
} as const satisfies StepConfig
/**
 * SSE endpoint: proxies an OpenAI streaming completion to the client as
 * `data:` events, terminated by a `data: [DONE]` sentinel.
 *
 * Fix over the original: the streaming loop is wrapped in try/finally so
 * `response.close()` always runs — otherwise a mid-stream OpenAI error
 * would leave the client connection hanging open.
 */
export const handler: Handlers<typeof config> = async ({ request, response }, { logger }) => {
  const { question } = request.body

  logger.info('Streaming AI response', { question })

  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })

  try {
    const stream = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: question }],
      stream: true,
    })

    // Forward each non-empty delta as its own SSE event.
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || ''
      if (content) {
        response.stream.write(`data: ${JSON.stringify({ content })}\n\n`)
      }
    }

    response.stream.write('data: [DONE]\n\n')
  } finally {
    // Always release the connection, even on mid-stream failure.
    response.close()
  }
}
AI Agent Patterns
Chain of Thought
/**
 * Chain-of-thought pattern: the system prompt instructs the model to reason
 * step by step; the full reasoning is returned as the solution.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  const { problem } = request.body

  const messages = [
    {
      role: 'system' as const,
      content: 'Think step by step to solve this problem. Show your reasoning.',
    },
    { role: 'user' as const, content: problem },
  ]
  const completion = await openai.chat.completions.create({ model: 'gpt-4o', messages })

  const solution = completion.choices[0].message.content
  return { status: 200, body: { solution } }
}
Retrieval Augmented Generation (RAG)
/**
 * RAG pattern: retrieves documents from the knowledge base, keeps the ones
 * mentioning the question text, and answers using only that context.
 *
 * Fix over the original: the question is lowercased once before the filter
 * instead of on every document (loop-invariant hoisting).
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  const { question } = request.body

  // Retrieve relevant documents from state
  const documents = await state.list<Document>('knowledge-base')

  // Find relevant docs (simple example - use vector DB in production)
  const needle = question.toLowerCase()
  const relevant = documents.filter((doc) => doc.content.toLowerCase().includes(needle))

  const context = relevant.map((doc) => doc.content).join('\n\n')

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [
      {
        role: 'system',
        content: `Answer based on this context:\n${context}`,
      },
      { role: 'user', content: question },
    ],
  })

  return { status: 200, body: { answer: completion.choices[0].message.content } }
}
Tool Use / Function Calling
// Tool definitions exposed to the model.
// Fix over the original: `type: 'function' as const` keeps the literal type
// 'function' instead of widening to `string`, which the OpenAI SDK's
// `tools` parameter (ChatCompletionTool[]) requires to type-check.
const tools = [
  {
    type: 'function' as const,
    function: {
      name: 'get_weather',
      description: 'Get the current weather in a location',
      // JSON Schema describing the tool's arguments.
      parameters: {
        type: 'object',
        properties: {
          location: { type: 'string', description: 'City name' },
        },
        required: ['location'],
      },
    },
  },
]
/**
 * Function-calling pattern: lets the model request the get_weather tool,
 * executes any requested calls, then feeds the results back to the model
 * for a final, user-facing answer.
 *
 * Fix over the original: tool results were only logged and never returned
 * to the model, so the client received a message containing raw tool_calls
 * instead of an answer. Results are now appended as `role: 'tool'` messages
 * and a second completion produces the final response.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  const userMessage = { role: 'user' as const, content: request.body.question }

  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [userMessage],
    tools,
    tool_choice: 'auto',
  })

  const assistantMessage = completion.choices[0].message
  const toolCalls = assistantMessage.tool_calls

  // No tool requested: the first reply already answers the user.
  if (!toolCalls || toolCalls.length === 0) {
    return { status: 200, body: { response: assistantMessage } }
  }

  // Execute each requested tool and record its result for the model.
  const toolMessages = []
  for (const toolCall of toolCalls) {
    if (toolCall.function.name === 'get_weather') {
      const args = JSON.parse(toolCall.function.arguments)
      const weather = await getWeather(args.location)
      logger.info('Weather retrieved', { location: args.location, weather })
      toolMessages.push({
        role: 'tool' as const,
        tool_call_id: toolCall.id,
        content: JSON.stringify(weather),
      })
    }
  }

  // Second round trip: the model now has the tool output and can answer.
  const followUp = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [userMessage, assistantMessage, ...toolMessages],
  })

  return { status: 200, body: { response: followUp.choices[0].message } }
}
Best Practices
1. Error Handling
/**
 * Error-handling pattern: wrap the OpenAI call so API failures are logged
 * with context and surfaced to the client as a generic 500 rather than an
 * unhandled exception.
 */
export const handler: Handlers<typeof config> = async ({ request }, { logger }) => {
  try {
    const completion = await openai.chat.completions.create({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: request.body.question }],
    })
    const answer = completion.choices[0].message.content
    return { status: 200, body: { answer } }
  } catch (error) {
    logger.error('OpenAI API error', { error })
    return { status: 500, body: { error: 'Failed to generate response' } }
  }
}
2. Rate Limiting
/**
 * Rate-limiting pattern: a per-user request counter kept in state.
 *
 * Fix over the original: requests without an `x-user-id` header are rejected
 * with 400 instead of being keyed under `undefined`, which would have lumped
 * every anonymous caller into one shared counter.
 */
export const handler: Handlers<typeof config> = async ({ request }, { state }) => {
  const userId = request.headers['x-user-id']

  // A missing header would otherwise become the state key "undefined".
  if (!userId) {
    return {
      status: 400,
      body: { error: 'Missing x-user-id header' },
    }
  }

  // Check rate limit
  const usage = await state.get<{ count: number }>('rate-limits', userId)
  if (usage && usage.count > 100) {
    return {
      status: 429,
      body: { error: 'Rate limit exceeded' },
    }
  }

  // Increment counter
  await state.update('rate-limits', userId, [
    { type: 'increment', path: 'count', by: 1 },
  ])

  // Process request...
}
3. Prompt Engineering
// System prompt shared by every request to this step. Kept as a module-level
// constant so the prompt text is versioned with the code and easy to review.
const SYSTEM_PROMPT = `You are a helpful financial analyst.
Guidelines:
- Provide data-driven insights
- Cite your sources
- Be clear about uncertainty
- Use specific numbers and dates
- Consider multiple perspectives`
/**
 * Prompt-engineering pattern: a reviewed, reusable system prompt plus
 * bounded sampling settings (temperature 0.7, max 1000 tokens).
 */
export const handler: Handlers<typeof config> = async ({ request }) => {
  const messages = [
    { role: 'system' as const, content: SYSTEM_PROMPT },
    { role: 'user' as const, content: request.body.question },
  ]
  const completion = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages,
    temperature: 0.7,
    max_tokens: 1000,
  })
  return { status: 200, body: { answer: completion.choices[0].message.content } }
}
Next Steps
- Learn about Workflows for orchestrating agents
- Explore Streaming for real-time AI updates
- Check out Observability for monitoring AI agents