Orchestrate sophisticated workflows that execute multiple tasks in parallel, aggregate results, and coordinate complex business processes. This example demonstrates Motia’s powerful workflow capabilities for building scalable, production-ready automation systems.

Overview

This example shows how to:
  • Execute multiple tasks in parallel for maximum throughput
  • Coordinate workflow steps with queue-based messaging
  • Merge parallel results using state and stream triggers
  • Build fan-out/fan-in patterns for distributed processing
  • Handle workflow completion and error recovery

Architecture

The workflow orchestration system uses a fan-out/fan-in pattern:
1. Workflow Initiator: receives workflow requests and fans out to multiple parallel tasks.
2. Parallel Task Processors: execute independent tasks concurrently using queue triggers.
3. Result Aggregator: collects and merges results using state or stream triggers.
4. Completion Handler: executes final actions when all parallel tasks complete.

Implementation

Step 1: Start Parallel Workflow

Initiate a workflow with multiple parallel tasks:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const bodySchema = z.object({
  totalSteps: z.number().optional().default(5),
  waitTime: z.boolean().optional().default(false),
  waitTimeMin: z.number().optional().default(1000),
  waitTimeMax: z.number().optional().default(3000),
})

export const config = {
  name: 'start-parallel-workflow',
  description: 'Initiates a parallel workflow with multiple tasks',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/api/workflow/start',
      bodySchema,
    },
  ],
  enqueues: ['workflow.step.process'],
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

interface WorkflowResult {
  totalSteps: number
  startedAt: number
  completedSteps: number
  steps?: Record<string, any>
}

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, enqueue, state }
) => {
  const body = bodySchema.parse(request.body ?? {})
  const workflowId = crypto.randomUUID()

  logger.info('Starting parallel workflow', { workflowId, totalSteps: body.totalSteps })

  // Initialize workflow state
  await state.set<WorkflowResult>('workflows', workflowId, {
    totalSteps: body.totalSteps,
    startedAt: Date.now(),
    completedSteps: 0,
    steps: {},
  })

  // Calculate wait time if enabled
  const getWaitTime = () => {
    if (!body.waitTime) return undefined
    return body.waitTimeMin + Math.random() * (body.waitTimeMax - body.waitTimeMin)
  }

  // Fan-out: Enqueue all parallel tasks
  await Promise.all(
    Array.from({ length: body.totalSteps }, (_, stepIndex) =>
      enqueue({
        topic: 'workflow.step.process',
        data: {
          workflowId,
          stepIndex,
          waitTime: getWaitTime(),
          timestamp: Date.now(),
        },
      })
    )
  )

  logger.info('Parallel workflow started', {
    workflowId,
    steps: body.totalSteps,
  })

  return {
    status: 200,
    body: {
      message: 'Workflow started successfully',
      workflowId,
      totalSteps: body.totalSteps,
    },
  }
}

Step 2: Process Parallel Tasks

Execute individual workflow steps in parallel:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const inputSchema = z.object({
  workflowId: z.string(),
  stepIndex: z.number(),
  waitTime: z.number().optional(),
  timestamp: z.number(),
})

// Same shape as the state record initialized by start-parallel-workflow
interface WorkflowResult {
  totalSteps: number
  startedAt: number
  completedSteps: number
  steps?: Record<string, any>
}

export const config = {
  name: 'process-workflow-step',
  description: 'Processes a single step in the parallel workflow',
  triggers: [
    {
      type: 'queue',
      topic: 'workflow.step.process',
      input: inputSchema,
    },
  ],
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state }
) => {
  const { workflowId, stepIndex, waitTime } = input

  logger.info('Processing workflow step', { workflowId, stepIndex })

  // Simulate task processing with optional wait time
  if (waitTime) {
    logger.info(`Step ${stepIndex} waiting ${waitTime}ms`, { workflowId })
    await new Promise((resolve) => setTimeout(resolve, waitTime))
  }

  // Perform actual work here
  const result = await performStepWork(stepIndex)

  // Update workflow state with completed step
  await state.update<WorkflowResult>('workflows', workflowId, [
    { type: 'increment', path: 'completedSteps', by: 1 },
    {
      type: 'set',
      path: `steps.${stepIndex}`,
      value: {
        result,
        completedAt: Date.now(),
        duration: waitTime,
      },
    },
  ])

  logger.info('Workflow step completed', {
    workflowId,
    stepIndex,
    result,
  })
}

async function performStepWork(stepIndex: number) {
  // Simulate different types of work
  const workTypes = [
    { type: 'data_fetch', value: Math.random() * 100 },
    { type: 'computation', value: Math.random() * 1000 },
    { type: 'api_call', value: Math.random() * 50 },
  ]

  const work = workTypes[stepIndex % workTypes.length]
  
  return {
    type: work.type,
    value: work.value,
    stepIndex,
    message: `Step ${stepIndex} completed ${work.type}`,
  }
}

Step 3: Aggregate Results (State Trigger)

Detect workflow completion using state triggers:
import type { Handlers, StateTriggerInput, StepConfig } from 'motia'

interface WorkflowResult {
  totalSteps: number
  completedSteps: number
  startedAt: number
  steps: Record<string, any>
}

export const config = {
  name: 'workflow-completion',
  description: 'Handles workflow completion when all steps finish',
  triggers: [
    {
      type: 'state',
      condition: (input: StateTriggerInput<WorkflowResult>) => {
        return (
          input.group_id === 'workflows' &&
          !!input.new_value &&
          input.new_value.totalSteps === input.new_value.completedSteps
        )
      },
    },
  ],
  enqueues: ['workflow.completed'],
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { logger, enqueue }
) => {
  const workflowId = request.item_id
  const result = request.new_value as WorkflowResult
  const endTime = Date.now()
  const duration = endTime - result.startedAt

  logger.info('Workflow completed!', {
    workflowId,
    totalSteps: result.totalSteps,
    duration,
  })

  // Aggregate all step results
  const stepResults = Object.values(result.steps)
  const totalValue = stepResults.reduce((sum: number, step: any) => sum + step.result.value, 0)
  const aggregatedResults = {
    totalValue,
    averageValue: totalValue / stepResults.length,
    completionRate: 100,
    duration,
  }

  logger.info('Aggregated results', {
    workflowId,
    ...aggregatedResults,
  })

  // Enqueue completion event for further processing
  await enqueue({
    topic: 'workflow.completed',
    data: {
      workflowId,
      startedAt: result.startedAt,
      completedAt: endTime,
      duration,
      totalSteps: result.totalSteps,
      results: aggregatedResults,
      steps: result.steps,
    },
  })

  // Print success banner (inner width is 56 characters)
  console.log('╔════════════════════════════════════════════════════════╗')
  console.log('║' + '         WORKFLOW COMPLETED SUCCESSFULLY!'.padEnd(56) + '║')
  console.log('╠════════════════════════════════════════════════════════╣')
  console.log('║' + `  Workflow ID: ${workflowId}`.padEnd(56) + '║')
  console.log('║' + `  Total Steps: ${result.totalSteps}`.padEnd(56) + '║')
  console.log('║' + `  Duration: ${duration}ms`.padEnd(56) + '║')
  console.log('║' + `  Total Value: ${aggregatedResults.totalValue.toFixed(2)}`.padEnd(56) + '║')
  console.log('╚════════════════════════════════════════════════════════╝')
}

Step 4: Stream-Based Workflow (Alternative)

Use streams for real-time workflow tracking:
import type { Handlers, StepConfig } from 'motia'
import { z } from 'zod'

const bodySchema = z.object({
  totalSteps: z.number().default(5),
})

export const config = {
  name: 'start-stream-workflow',
  description: 'Initiates workflow using streams for real-time tracking',
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/api/workflow/stream/start',
      bodySchema,
    },
  ],
  enqueues: ['workflow.stream.step.process'],
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, enqueue, streams }
) => {
  const body = bodySchema.parse(request.body ?? {})
  const workflowId = crypto.randomUUID()

  logger.info('Starting stream workflow', { workflowId })

  // Initialize workflow in stream
  await streams.workflows.set('active-workflows', workflowId, {
    totalSteps: body.totalSteps,
    startedAt: Date.now(),
    completedSteps: 0,
  })

  // Fan-out to parallel processors
  await Promise.all(
    Array.from({ length: body.totalSteps }, (_, stepIndex) =>
      enqueue({
        topic: 'workflow.stream.step.process',
        data: { workflowId, stepIndex },
      })
    )
  )

  return {
    status: 200,
    body: { workflowId, totalSteps: body.totalSteps },
  }
}

Step 5: Stream-Based Completion

Handle completion with stream triggers:
import type { Handlers, StepConfig, StreamTriggerInput } from 'motia'

interface WorkflowStreamItem {
  totalSteps: number
  completedSteps: number
  startedAt: number
}

export const config = {
  name: 'stream-workflow-completion',
  description: 'Handles stream workflow completion',
  triggers: [
    {
      type: 'stream',
      streamName: 'workflows',
      groupId: 'active-workflows',
      condition: (input: StreamTriggerInput<WorkflowStreamItem>) => {
        return (
          input.event.type === 'update' &&
          input.event.data.completedSteps === input.event.data.totalSteps
        )
      },
    },
  ],
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  request,
  { logger }
) => {
  const workflowId = request.id
  const data = request.event.data as WorkflowStreamItem
  const duration = Date.now() - data.startedAt

  logger.info('Stream workflow completed!', {
    workflowId,
    totalSteps: data.totalSteps,
    duration,
  })

  // Print success banner (inner width is 56 characters)
  console.log('╔════════════════════════════════════════════════════════╗')
  console.log('║' + '      STREAM WORKFLOW COMPLETED SUCCESSFULLY!'.padEnd(56) + '║')
  console.log('╠════════════════════════════════════════════════════════╣')
  console.log('║' + `  Workflow ID: ${workflowId}`.padEnd(56) + '║')
  console.log('║' + `  Total Steps: ${data.totalSteps}`.padEnd(56) + '║')
  console.log('║' + `  Duration: ${duration}ms`.padEnd(56) + '║')
  console.log('╚════════════════════════════════════════════════════════╝')
}

Key Features

Parallel Execution

Execute hundreds of tasks concurrently with automatic queue distribution and load balancing.
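For very large fan-outs you may want to bound how many enqueue calls (or other async operations) run at once on the initiating side. A generic concurrency limiter, sketched here as a standalone helper (`mapWithConcurrency` is an illustrative name, not a Motia API):

```typescript
// Run `fn` over `items`, keeping at most `limit` calls in flight.
// Safe without locks: `next++` happens synchronously between awaits,
// so workers never claim the same index.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let next = 0

  async function worker() {
    while (next < items.length) {
      const i = next++
      results[i] = await fn(items[i])
    }
  }

  // Spawn `limit` workers (or fewer, if there are fewer items)
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker)
  )
  return results
}
```

You could wrap the fan-out `enqueue` calls from Step 1 in this helper instead of a bare `Promise.all` when the step count is large.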

State-Based Coordination

Use atomic state updates to track progress and trigger completion handlers automatically.

Stream-Based Tracking

Real-time workflow monitoring with stream triggers for instant status updates.

Error Recovery

Automatic retry logic and dead-letter queues for failed tasks.
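One way to sketch this with the primitives shown above: catch failures in the queue handler, re-enqueue the message with an attempt counter, and divert to a dead-letter topic once attempts are exhausted. The topic name `workflow.step.dead-letter` and the `retryOrDeadLetter` helper are illustrative, not built-in Motia features:

```typescript
// Minimal signature matching the enqueue() calls used in the steps above
type Enqueue = (msg: { topic: string; data: Record<string, unknown> }) => Promise<void>

// Re-enqueue a failed step, or route it to a dead-letter topic after maxAttempts.
async function retryOrDeadLetter(
  enqueue: Enqueue,
  data: { workflowId: string; stepIndex: number; attempt?: number },
  maxAttempts = 3
): Promise<'retried' | 'dead-lettered'> {
  const attempt = (data.attempt ?? 0) + 1
  if (attempt < maxAttempts) {
    await enqueue({ topic: 'workflow.step.process', data: { ...data, attempt } })
    return 'retried'
  }
  await enqueue({ topic: 'workflow.step.dead-letter', data: { ...data, attempt } })
  return 'dead-lettered'
}

// Inside the queue handler from Step 2, you might use it like:
//   try { await performStepWork(input.stepIndex) }
//   catch (err) { await retryOrDeadLetter(enqueue, input) }
```

A separate step subscribed to the dead-letter topic can then alert, log, or mark the workflow as failed.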

Flexible Merge Strategies

Choose between state triggers, stream triggers, or custom merge logic based on your needs.
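If neither trigger style fits, the merge itself is just a pure function over the step map. A minimal sketch mirroring the shape that `process-workflow-step` writes into state (`StepEntry` and `mergeStepResults` are illustrative names):

```typescript
// Shape of one entry under `steps` in the workflow state record
interface StepEntry {
  result: { value: number }
}

// Pure merge over completed steps: easy to unit-test in isolation,
// then call from whichever completion handler you choose.
function mergeStepResults(steps: Record<string, StepEntry>) {
  const values = Object.values(steps).map((s) => s.result.value)
  const totalValue = values.reduce((sum, v) => sum + v, 0)
  return {
    totalValue,
    averageValue: values.length > 0 ? totalValue / values.length : 0,
  }
}
```

Keeping the merge pure means the aggregation logic can be tested without a running queue or state store.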

Testing

Test the parallel workflow:
# Start a workflow with 10 parallel steps
curl -X POST http://localhost:3111/api/workflow/start \
  -H "Content-Type: application/json" \
  -d '{
    "totalSteps": 10,
    "waitTime": true,
    "waitTimeMin": 500,
    "waitTimeMax": 2000
  }'

# Start stream-based workflow
curl -X POST http://localhost:3111/api/workflow/stream/start \
  -H "Content-Type: application/json" \
  -d '{"totalSteps": 5}'
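Based on the return value of the `start-parallel-workflow` handler, the first call responds with a body shaped like this (the `workflowId` below is a placeholder; yours will differ):

```json
{
  "message": "Workflow started successfully",
  "workflowId": "3f9d2c4a-1b7e-4c8d-9e2f-0a1b2c3d4e5f",
  "totalSteps": 10
}
```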

Advanced Patterns

Dynamic Fan-Out

Determine task count at runtime:
const tasks = await fetchTasksFromDatabase()

await Promise.all(
  tasks.map((task) =>
    enqueue({
      topic: 'workflow.step.process',
      data: { workflowId, task },
    })
  )
)
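Note that with a runtime-determined task count, the completion condition still works as long as the initiator records the count before fanning out, e.g. (a sketch reusing the state shape from Step 1):

```typescript
// Record how many tasks were fanned out so the completion
// condition (completedSteps === totalSteps) remains valid.
function buildWorkflowRecord(taskCount: number) {
  return {
    totalSteps: taskCount,
    startedAt: Date.now(),
    completedSteps: 0,
    steps: {} as Record<string, unknown>,
  }
}

// e.g. await state.set('workflows', workflowId, buildWorkflowRecord(tasks.length))
```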

Conditional Workflows

Route based on step results:
if (result.value > threshold) {
  await enqueue({ topic: 'workflow.high.priority', data: result })
} else {
  await enqueue({ topic: 'workflow.normal', data: result })
}

Nested Workflows

Orchestrate workflows within workflows:
const childWorkflowId = await startChildWorkflow(parentWorkflowId, config)

await state.update('workflows', parentWorkflowId, [
  { type: 'set', path: `children.${childWorkflowId}`, value: { status: 'running' } },
])

Timeout Handling

Detect and handle stuck workflows:
import type { Handlers, StepConfig } from 'motia'

export const config = {
  name: 'workflow-timeout-check',
  description: 'Detects workflows that have been running too long',
  triggers: [{ type: 'cron', expression: '*/5 * * * *' }], // Every 5 minutes
  flows: ['workflow-orchestration'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  _,
  { logger, state }
) => {
  const workflows = await state.list('workflows')
  const timeout = 30 * 60 * 1000 // 30 minutes

  for (const { id, data } of workflows) {
    if (data.completedSteps < data.totalSteps) {
      const age = Date.now() - data.startedAt

      if (age > timeout) {
        logger.error('Workflow timeout detected', { workflowId: id, age })
        // Handle timeout: retry, cancel, or alert
      }
    }
  }
}

Production Considerations

  1. Scalability: Distribute workers across multiple instances
  2. Error Handling: Implement retry logic with exponential backoff
  3. Monitoring: Track workflow success rates and duration metrics
  4. Resource Limits: Control concurrent task execution to prevent overload
  5. State Management: Use PostgreSQL or Redis for production state storage
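For point 2, the delay schedule can be a small pure function. A sketch (illustrative names; Motia may also offer retry configuration of its own) that doubles the delay per attempt and caps it:

```typescript
// Exponential backoff: 500ms, 1s, 2s, 4s, ... capped at maxMs.
function backoffDelay(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs)
}
```

In practice you would also add random jitter to the computed delay so that many failed tasks retrying at once do not all hit the system at the same moment.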

Use Cases

  • Data Processing: Process large datasets in parallel chunks
  • ETL Pipelines: Extract, transform, and load data from multiple sources
  • Report Generation: Generate reports from multiple data sources concurrently
  • Image Processing: Resize, optimize, and transform images in parallel
  • Email Campaigns: Send bulk emails with parallel execution
  • Batch Jobs: Process nightly batch operations efficiently

Next Steps

  • Add workflow visualization and monitoring
  • Implement priority queues for urgent tasks
  • Build workflow templates and reusable patterns
  • Add distributed tracing for workflow debugging
  • Integrate with scheduling systems (Temporal, Airflow)
