import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'
const inputSchema = z.object({
workflowId: z.string(),
stepIndex: z.number(),
waitTime: z.number().optional(),
timestamp: z.number(),
})
export const config = {
name: 'process-workflow-step',
description: 'Processes a single step in the parallel workflow',
triggers: [
{
type: 'queue',
topic: 'workflow.step.process',
input: inputSchema,
},
],
flows: ['workflow-orchestration'],
} as const satisfies StepConfig
export const handler: Handlers<typeof config> = async (
input,
{ logger, state }
) => {
const { workflowId, stepIndex, waitTime } = input
logger.info('Processing workflow step', { workflowId, stepIndex })
// Simulate task processing with optional wait time
if (waitTime) {
logger.info(`Step ${stepIndex} waiting ${waitTime}ms`, { workflowId })
await new Promise((resolve) => setTimeout(resolve, waitTime))
}
// Perform actual work here
const result = await performStepWork(stepIndex)
// Update workflow state with completed step
await state.update<WorkflowResult>('workflows', workflowId, [
{ type: 'increment', path: 'completedSteps', by: 1 },
{
type: 'set',
path: `steps.${stepIndex}`,
value: {
result,
completedAt: Date.now(),
duration: waitTime,
},
},
])
logger.info('Workflow step completed', {
workflowId,
stepIndex,
result,
})
}
async function performStepWork(stepIndex: number) {
// Simulate different types of work
const workTypes = [
{ type: 'data_fetch', value: Math.random() * 100 },
{ type: 'computation', value: Math.random() * 1000 },
{ type: 'api_call', value: Math.random() * 50 },
]
const work = workTypes[stepIndex % workTypes.length]
return {
type: work.type,
value: work.value,
stepIndex,
message: `Step ${stepIndex} completed ${work.type}`,
}
}