Mastra workflows support sophisticated control flow patterns using chainable methods like .then(), .parallel(), and .branch().

Sequential Execution with .then()

The .then() method chains steps sequentially. Each step receives the output of the previous step as input:
import { createWorkflow, createStep } from '@mastra/core';
import { z } from 'zod';

const step1 = createStep({
  id: 'fetch-user',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ name: z.string(), email: z.string() }),
  execute: async ({ inputData }) => {
    return { name: 'John Doe', email: 'john.doe@example.com' };
  },
});

const step2 = createStep({
  id: 'send-email',
  inputSchema: z.object({ name: z.string(), email: z.string() }),
  outputSchema: z.object({ sent: z.boolean() }),
  execute: async ({ inputData }) => {
    console.log(`Sending email to ${inputData.email}`);
    return { sent: true };
  },
});

const workflow = createWorkflow({
  id: 'user-notification',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ sent: z.boolean() }),
})
  .then(step1)
  .then(step2)
  .commit();
The output schema of each step must match the input schema of the next step for type safety.
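As a rough mental model, chaining behaves like feeding each step's resolved output into the next step's execute function. The sketch below is plain TypeScript, not Mastra's implementation; the `SketchStep` type, the `runSequential` helper, and the two sample steps are hypothetical names introduced only for illustration.

```typescript
// Hypothetical minimal step shape; real Mastra steps also carry schemas and context.
type SketchStep<I, O> = {
  id: string;
  execute: (args: { inputData: I }) => Promise<O>;
};

// Run two steps in order: the first step's output becomes the second step's input.
// The shared type parameter M expresses "output schema must match next input schema".
async function runSequential<I, M, O>(
  first: SketchStep<I, M>,
  second: SketchStep<M, O>,
  inputData: I,
): Promise<O> {
  const intermediate = await first.execute({ inputData });
  return second.execute({ inputData: intermediate });
}

const fetchUserSketch: SketchStep<{ userId: string }, { name: string; email: string }> = {
  id: 'fetch-user',
  execute: async ({ inputData }) => ({
    name: `User ${inputData.userId}`,
    email: `${inputData.userId}@example.com`,
  }),
};

const sendEmailSketch: SketchStep<{ name: string; email: string }, { sent: boolean }> = {
  id: 'send-email',
  // Pretend the email was sent whenever an address is present.
  execute: async ({ inputData }) => ({ sent: inputData.email.length > 0 }),
};
```

Calling `runSequential(fetchUserSketch, sendEmailSketch, { userId: 'u1' })` compiles only because the middle type lines up; swapping the two steps would be rejected by the type checker, which is the kind of mismatch the schema rule is meant to catch.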

Parallel Execution with .parallel()

Execute multiple steps concurrently with .parallel(). All steps receive the same input and their outputs are combined:
const fetchUserStep = createStep({
  id: 'fetch-user',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ user: z.string() }),
  execute: async ({ inputData }) => {
    return { user: `User ${inputData.userId}` };
  },
});

const fetchOrdersStep = createStep({
  id: 'fetch-orders',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ orders: z.array(z.string()) }),
  execute: async ({ inputData }) => {
    return { orders: ['order-1', 'order-2'] };
  },
});

const fetchPreferencesStep = createStep({
  id: 'fetch-preferences',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({ preferences: z.record(z.any()) }),
  execute: async ({ inputData }) => {
    return { preferences: { theme: 'dark' } };
  },
});

const workflow = createWorkflow({
  id: 'fetch-user-data',
  inputSchema: z.object({ userId: z.string() }),
  outputSchema: z.object({
    'fetch-user': z.object({ user: z.string() }),
    'fetch-orders': z.object({ orders: z.array(z.string()) }),
    'fetch-preferences': z.object({ preferences: z.record(z.any()) }),
  }),
})
  .parallel([fetchUserStep, fetchOrdersStep, fetchPreferencesStep])
  .commit();

Parallel Output Structure

The output of .parallel() is an object where keys are step IDs and values are their outputs:
{
  'fetch-user': { user: 'User 123' },
  'fetch-orders': { orders: ['order-1', 'order-2'] },
  'fetch-preferences': { preferences: { theme: 'dark' } }
}
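One way to picture how that object is assembled is a `Promise.all` over the steps followed by keying each result by its step ID. This is a plain TypeScript sketch under that assumption, not Mastra's internals; `ParallelSketchStep`, `runParallel`, and `parallelSketchSteps` are hypothetical names.

```typescript
// Hypothetical minimal step shape used only for this sketch.
type ParallelSketchStep<I> = {
  id: string;
  execute: (args: { inputData: I }) => Promise<unknown>;
};

// Start every step with the same input, wait for all of them,
// then collect the results into an object keyed by step ID.
async function runParallel<I>(
  steps: ParallelSketchStep<I>[],
  inputData: I,
): Promise<Record<string, unknown>> {
  const results = await Promise.all(
    steps.map((step) => step.execute({ inputData })),
  );
  const combined: Record<string, unknown> = {};
  steps.forEach((step, index) => {
    combined[step.id] = results[index];
  });
  return combined;
}

const parallelSketchSteps: ParallelSketchStep<{ userId: string }>[] = [
  {
    id: 'fetch-user',
    execute: async ({ inputData }) => ({ user: `User ${inputData.userId}` }),
  },
  {
    id: 'fetch-orders',
    execute: async () => ({ orders: ['order-1', 'order-2'] }),
  },
];
```

Because the keys come from step IDs, two steps with the same ID would overwrite each other in this sketch, so unique IDs matter inside a parallel group.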

Conditional Execution with .branch()

Execute different steps based on runtime conditions using .branch():
const premiumStep = createStep({
  id: 'premium-flow',
  inputSchema: z.object({ userType: z.string() }),
  outputSchema: z.object({ message: z.string() }),
  execute: async () => {
    return { message: 'Premium features enabled' };
  },
});

const freeStep = createStep({
  id: 'free-flow',
  inputSchema: z.object({ userType: z.string() }),
  outputSchema: z.object({ message: z.string() }),
  execute: async () => {
    return { message: 'Free tier features' };
  },
});

const workflow = createWorkflow({
  id: 'user-onboarding',
  inputSchema: z.object({ userType: z.string() }),
  outputSchema: z.object({
    'premium-flow': z.object({ message: z.string() }).optional(),
    'free-flow': z.object({ message: z.string() }).optional(),
  }),
})
  .branch([
    [
      // Condition function
      async ({ inputData }) => inputData.userType === 'premium',
      premiumStep,
    ],
    [
      async ({ inputData }) => inputData.userType === 'free',
      freeStep,
    ],
  ])
  .commit();

Branch Conditions

Condition functions receive the execution context and return a boolean:
async ({
  inputData,      // Input to this branch point
  state,          // Workflow state
  runId,          // Workflow run ID
  getStepResult,  // Access previous step outputs
  getInitData,    // Get workflow input data
  // ... other context
}) => boolean
Multiple conditions can evaluate to true, and all matching branches will execute. The output will contain results from all executed branches.
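The "all matching branches execute" rule can be sketched in plain TypeScript as follows. This is an illustration under stated assumptions, not Mastra's code: `BranchCondition`, `BranchSketchStep`, `runBranch`, and the sample branches are hypothetical, and the sketch runs matching branches one after another because the text above does not say whether Mastra runs them concurrently.

```typescript
// Hypothetical shapes for this sketch; real conditions receive a richer context.
type BranchCondition<I> = (ctx: { inputData: I }) => Promise<boolean>;
type BranchSketchStep<I> = {
  id: string;
  execute: (args: { inputData: I }) => Promise<unknown>;
};

// Evaluate every condition and run each step whose condition is true.
// Branches that did not match are simply absent from the result object.
async function runBranch<I>(
  branches: Array<[BranchCondition<I>, BranchSketchStep<I>]>,
  inputData: I,
): Promise<Record<string, unknown>> {
  const output: Record<string, unknown> = {};
  for (const [condition, step] of branches) {
    if (await condition({ inputData })) {
      output[step.id] = await step.execute({ inputData });
    }
  }
  return output;
}

// Two overlapping conditions: a large order matches both branches.
const branchSketch: Array<
  [BranchCondition<{ total: number }>, BranchSketchStep<{ total: number }>]
> = [
  [
    async ({ inputData }) => inputData.total > 100,
    { id: 'free-shipping', execute: async () => ({ applied: true }) },
  ],
  [
    async ({ inputData }) => inputData.total > 50,
    {
      id: 'loyalty-points',
      execute: async ({ inputData }) => ({ points: Math.floor(inputData.total) }),
    },
  ],
];
```

With a total of 120 both keys appear in the result, with 60 only `loyalty-points` appears, and with 10 the result is an empty object, which is why the branch results in a workflow output schema are usually declared optional.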

Loops with .dowhile() and .dountil()

Repeat a step until a condition is met:
const retryStep = createStep({
  id: 'fetch-with-retry',
  inputSchema: z.object({ url: z.string() }),
  outputSchema: z.object({ success: z.boolean(), data: z.any() }),
  execute: async ({ inputData }) => {
    const response = await fetch(inputData.url);
    return {
      success: response.ok,
      data: response.ok ? await response.json() : null,
    };
  },
});

const workflow = createWorkflow({
  id: 'retry-workflow',
  inputSchema: z.object({ url: z.string() }),
  outputSchema: z.object({ success: z.boolean(), data: z.any() }),
})
  .dountil(
    retryStep,
    async ({ inputData, iterationCount }) => {
      // Stop when success or max retries reached
      return inputData.success || iterationCount >= 3;
    }
  )
  .commit();

Loop Types

// .dowhile(): execute the step, then check the condition
// and continue while the condition is TRUE.
// .dountil() is the inverse: continue until the condition is TRUE.
workflow.dowhile(
  step,
  async ({ inputData, iterationCount }) => {
    return iterationCount < 5; // Continue while true
  }
)
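The difference between the two loop methods can be sketched in plain TypeScript. The helpers below are hypothetical and synchronous for brevity, and they assume two things the text does not confirm: that `iterationCount` is 1 after the first execution, and that each iteration receives the previous iteration's output as its input.

```typescript
// Hypothetical condition shape mirroring the { inputData, iterationCount } context.
type LoopCondition<T> = (ctx: { inputData: T; iterationCount: number }) => boolean;

// do-while: run the step, then repeat as long as the condition is TRUE.
function doWhileSketch<T>(
  step: (input: T) => T,
  condition: LoopCondition<T>,
  input: T,
): T {
  let current = input;
  let iterationCount = 0; // assumption: becomes 1 after the first run
  do {
    current = step(current);
    iterationCount += 1;
  } while (condition({ inputData: current, iterationCount }));
  return current;
}

// do-until: run the step, then repeat as long as the condition is FALSE.
function doUntilSketch<T>(
  step: (input: T) => T,
  condition: LoopCondition<T>,
  input: T,
): T {
  return doWhileSketch(step, (ctx) => !condition(ctx), input);
}

const increment = (n: number): number => n + 1;

// Runs five times, because the condition first fails when iterationCount is 5.
const whileResult = doWhileSketch(increment, ({ iterationCount }) => iterationCount < 5, 0);

// Stops as soon as the step output reaches 3.
const untilResult = doUntilSketch(increment, ({ inputData }) => inputData >= 3, 0);
```

In both variants the step runs at least once, since the condition is only checked after the step has produced an output.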

Mapping Data with .map()

Transform data between steps using .map():
const workflow = createWorkflow({
  id: 'data-transform',
  inputSchema: z.object({ value: z.number() }),
  outputSchema: z.object({ doubled: z.number() }),
})
  .then(step1)
  .map(async ({ inputData }) => {
    // Transform the previous output
    return {
      doubled: inputData.value * 2,
    };
  })
  .commit();
You can also map specific step outputs:
const workflow = createWorkflow({ /* ... */ })
  .then(step1)
  .then(step2)
  .map({
    // Map specific fields from previous steps
    userId: { step: step1, path: 'userId' },
    result: { step: step2, path: 'output.data' },
    // Add constant values
    timestamp: { value: new Date().toISOString(), schema: z.string() },
  })
  .commit();
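To make the object form of .map() easier to reason about, here is a plain TypeScript sketch of how such a mapping could be resolved. It assumes that `path` is a dot-separated property path and looks up step results by ID rather than by step object; `getByPath`, `MapEntrySketch`, and `resolveMapping` are hypothetical helpers, not Mastra functions.

```typescript
// Hypothetical helper: walk a dot-separated path through nested objects.
function getByPath(source: unknown, path: string): unknown {
  let current: unknown = source;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) {
      return undefined; // path runs past a non-object value
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Either pull a value out of a previous step's result or use a constant.
type MapEntrySketch =
  | { stepId: string; path: string }
  | { value: unknown };

function resolveMapping(
  config: Record<string, MapEntrySketch>,
  stepResults: Record<string, unknown>,
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const [field, entry] of Object.entries(config)) {
    mapped[field] =
      'value' in entry
        ? entry.value
        : getByPath(stepResults[entry.stepId], entry.path);
  }
  return mapped;
}

const mappedSketch = resolveMapping(
  {
    userId: { stepId: 'step-1', path: 'userId' },
    result: { stepId: 'step-2', path: 'output.data' },
    source: { value: 'api' },
  },
  {
    'step-1': { userId: 'u1' },
    'step-2': { output: { data: 42 } },
  },
);
```

A path that does not resolve yields `undefined` in this sketch; how Mastra treats a missing path is not stated in the text above.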

Sleep and Delays

Add delays between steps:
const workflow = createWorkflow({ /* ... */ })
  .then(step1)
  .sleep(5000) // Wait 5 seconds
  .then(step2)
  .sleepUntil(new Date('2024-12-31T00:00:00Z')) // Wait until specific time
  .then(step3)
  .commit();
You can also use dynamic delays:
workflow
  .sleep(async ({ inputData }) => {
    // Calculate delay based on data
    return inputData.priority === 'high' ? 1000 : 5000;
  })
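A fixed delay and a computed delay can be handled by the same code path. The sketch below shows one way to do that in plain TypeScript; `SleepDuration`, `resolveSleepMs`, and `sleepSketch` are hypothetical names, and nothing here is taken from Mastra's implementation.

```typescript
// A delay is either a number of milliseconds or an async function that computes one.
type SleepDuration<I> = number | ((ctx: { inputData: I }) => Promise<number>);

// Normalize both forms to a concrete number of milliseconds.
async function resolveSleepMs<I>(
  duration: SleepDuration<I>,
  inputData: I,
): Promise<number> {
  return typeof duration === 'number' ? duration : duration({ inputData });
}

// Wait for the resolved delay and report how long the wait was.
async function sleepSketch<I>(
  duration: SleepDuration<I>,
  inputData: I,
): Promise<number> {
  const ms = await resolveSleepMs(duration, inputData);
  await new Promise<void>((resolve) => setTimeout(resolve, ms));
  return ms;
}

// Same rule as the dynamic example: high priority waits 1 second, everything else 5.
const byPriority: SleepDuration<{ priority: string }> = async ({ inputData }) =>
  inputData.priority === 'high' ? 1000 : 5000;
```

Keeping the delay calculation separate from the actual wait makes the rule easy to unit test without pausing the test run.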

Complex Control Flow Example

Combine multiple patterns for sophisticated workflows:
const workflow = createWorkflow({
  id: 'order-processing',
  inputSchema: z.object({
    orderId: z.string(),
    userId: z.string(),
    priority: z.string(),
  }),
  outputSchema: z.object({ status: z.string() }),
})
  // Fetch order and user in parallel
  .parallel([fetchOrderStep, fetchUserStep])
  
  // Map to next step's input
  .map(async ({ inputData }) => ({
    orderId: inputData['fetch-order'].orderId,
    userEmail: inputData['fetch-user'].email,
    // Carry priority forward so the branch conditions can read it
    priority: inputData['fetch-order'].priority,
  }))
  
  // Branch based on order priority (inputData is now the mapped object)
  .branch([
    [
      async ({ inputData }) => inputData.priority === 'high',
      expressShippingStep,
    ],
    [
      async ({ inputData }) => inputData.priority === 'normal',
      standardShippingStep,
    ],
  ])
  
  // Send notification
  .then(sendNotificationStep)
  
  .commit();

Next Steps

Suspend & Resume

Add human-in-the-loop capabilities

Creating Workflows

Learn workflow configuration options
