Skip to main content
Workflows in Mastra provide type-safe, composable task execution with built-in error handling, state management, and suspend/resume capabilities. They enable building complex execution flows with branching logic and parallel operations.

Core Concept

Workflows are:
  • Graph-based: Chain steps with .then(), .branch(), .parallel()
  • Type-safe: Full TypeScript inference across step inputs/outputs
  • Resumable: Suspend execution and resume from any step
  • Composable: Nest workflows within workflows
  • Observable: Track execution with observability spans

Basic Workflow

Create a simple sequential workflow:
import { createWorkflow, createStep } from '@mastra/core/workflows';
import { z } from 'zod';

const validateStep = createStep({
  id: 'validate',
  inputSchema: z.object({ email: z.string() }),
  outputSchema: z.object({ valid: z.boolean() }),
  execute: async ({ email }) => {
    return { valid: email.includes('@') };
  }
});

const saveStep = createStep({
  id: 'save',
  inputSchema: z.object({ email: z.string(), valid: z.boolean() }),
  outputSchema: z.object({ saved: z.boolean() }),
  execute: async ({ email, valid }) => {
    if (!valid) throw new Error('Invalid email');
    await db.users.create({ email });
    return { saved: true };
  }
});

const workflow = createWorkflow({
  id: 'user-signup',
  inputSchema: z.object({ email: z.string() })
})
  .then(validateStep)
  .then(saveStep)
  .commit();

// Execute workflow
const run = await workflow.createRun({
  inputData: { email: '[email protected]' }
});
const result = await run.start();
console.log(result.output); // { saved: true }

Workflow Configuration

interface WorkflowConfig<TInputSchema, TOutputSchema> {
  id: string;                    // Unique identifier
  inputSchema: TInputSchema;     // Zod schema for input
  outputSchema?: TOutputSchema;  // Optional output schema
  stateSchema?: ZodSchema;       // Workflow state schema
  type?: 'default' | 'processor'; // Workflow type
  options?: {
    validateInputs?: boolean;    // Validate step inputs
    tracingPolicy?: TracingPolicy; // Observability config
  };
}

Chaining Steps

Sequential Execution

Use .then() to chain steps sequentially:
const workflow = createWorkflow({
  id: 'data-pipeline',
  inputSchema: z.object({ rawData: z.string() })
})
  .then(parseStep)      // Step 1: Parse data
  .then(validateStep)   // Step 2: Validate
  .then(transformStep)  // Step 3: Transform
  .then(saveStep)       // Step 4: Save
  .commit();
Each step receives the output of the previous step.

Dynamic Mapping

Map specific fields from previous steps:
import { mapVariable } from '@mastra/core/workflows';

const workflow = createWorkflow({
  id: 'extract-process',
  inputSchema: z.object({ url: z.string() })
})
  .then(fetchStep)     // Returns { data, metadata }
  .then(
    processStep,       // Only needs 'data' field
    {
      data: mapVariable({ step: fetchStep, path: 'data' })
    }
  )
  .commit();

Branching

Conditionally execute different steps:
const workflow = createWorkflow({
  id: 'user-approval',
  inputSchema: z.object({ userId: z.string() })
})
  .then(checkUserStep)  // Returns { approved: boolean }
  .branch({
    // Condition function
    when: ({ checkUserStep }) => checkUserStep.approved,
    // Steps to run if true
    then: [approveStep, notifyStep],
    // Steps to run if false (optional)
    otherwise: [rejectStep]
  })
  .commit();

Multiple Branches

const workflow = createWorkflow({
  id: 'payment-router',
  inputSchema: z.object({ amount: z.number(), method: z.string() })
})
  .then(validatePaymentStep)
  .branch({
    when: ({ validatePaymentStep }) => 
      validatePaymentStep.method === 'card',
    then: [processCardStep]
  })
  .branch({
    when: ({ validatePaymentStep }) => 
      validatePaymentStep.method === 'bank',
    then: [processBankStep]
  })
  .commit();

Parallel Execution

Run multiple steps concurrently:
const workflow = createWorkflow({
  id: 'data-enrichment',
  inputSchema: z.object({ userId: z.string() })
})
  .then(fetchUserStep)  // Get user data
  .parallel([
    fetchOrdersStep,    // Fetch orders concurrently
    fetchPreferencesStep, // Fetch preferences concurrently
    fetchActivityStep   // Fetch activity concurrently
  ])
  .then(combineDataStep) // Combine results
  .commit();
All parallel steps receive the same input and execute concurrently.

Agent Steps

Wrap agents as workflow steps:
import { Agent } from '@mastra/core/agent';

const summaryAgent = new Agent({
  id: 'summarizer',
  instructions: 'Summarize the content concisely',
  model: 'openai/gpt-5'
});

const workflow = createWorkflow({
  id: 'content-pipeline',
  inputSchema: z.object({ url: z.string() })
})
  .then(fetchContentStep)
  .then(
    createStep(summaryAgent, {
      structuredOutput: {
        schema: z.object({
          summary: z.string(),
          keyPoints: z.array(z.string())
        })
      }
    }),
    { prompt: mapVariable({ step: fetchContentStep, path: 'content' }) }
  )
  .commit();

Tool Steps

Use tools as workflow steps:
import { createTool } from '@mastra/core/tools';

const weatherTool = createTool({
  id: 'get-weather',
  description: 'Get weather for location',
  inputSchema: z.object({ location: z.string() }),
  execute: async ({ location }) => {
    return await fetchWeather(location);
  }
});

const workflow = createWorkflow({
  id: 'weather-report',
  inputSchema: z.object({ city: z.string() })
})
  .then(
    createStep(weatherTool),
    { location: mapVariable({ initData: workflow, path: 'city' }) }
  )
  .commit();

State Management

Maintain workflow state across steps:
const workflow = createWorkflow({
  id: 'stateful-workflow',
  inputSchema: z.object({ items: z.array(z.string()) }),
  stateSchema: z.object({ processed: z.number() })
})
  .then(createStep({
    id: 'process',
    execute: async ({ items }, { state, setState }) => {
      const processed = (state?.processed || 0) + items.length;
      setState({ processed });
      return { processed };
    }
  }))
  .commit();

const run = await workflow.createRun({
  inputData: { items: ['a', 'b', 'c'] },
  initialState: { processed: 0 }
});

Suspend and Resume

Suspend execution for human approval or external input:
const approvalStep = createStep({
  id: 'approval',
  inputSchema: z.object({ amount: z.number() }),
  outputSchema: z.object({ approved: z.boolean() }),
  suspendSchema: z.object({ approvalId: z.string() }),
  resumeSchema: z.object({ approved: z.boolean() }),
  execute: async ({ amount }, { suspend, resumeData }) => {
    // If resuming, use resume data
    if (resumeData) {
      return { approved: resumeData.approved };
    }
    
    // Otherwise suspend for approval
    const approvalId = await createApprovalRequest(amount);
    await suspend({ approvalId });
    
    // This code won't execute until resumed
    return { approved: false };
  }
});

const workflow = createWorkflow({
  id: 'payment-approval',
  inputSchema: z.object({ amount: z.number() })
})
  .then(approvalStep)
  .then(processPaymentStep)
  .commit();

// Start workflow
const run = await workflow.createRun({
  inputData: { amount: 10000 }
});
await run.start(); // Suspends at approval step

// Later, resume with approval decision
await run.resume({
  runId: run.id,
  resumeData: { approved: true }
});

Error Handling

Handle step failures with retries:
const retryableStep = createStep(
  {
    id: 'fetch-api',
    execute: async () => {
      const response = await fetch('https://api.example.com/data');
      if (!response.ok) throw new Error('API failed');
      return response.json();
    }
  },
  { retries: 3 } // Retry up to 3 times
);

const workflow = createWorkflow({
  id: 'api-workflow',
  inputSchema: z.object({})
})
  .then(retryableStep)
  .commit();

try {
  const result = await run.start();
} catch (error) {
  console.error('Workflow failed:', error);
}

Nested Workflows

Compose workflows within workflows:
const dataWorkflow = createWorkflow({
  id: 'data-processing',
  inputSchema: z.object({ data: z.string() })
})
  .then(parseStep)
  .then(validateStep)
  .commit();

const mainWorkflow = createWorkflow({
  id: 'main',
  inputSchema: z.object({ rawData: z.string() })
})
  .then(fetchStep)
  .then(
    dataWorkflow,  // Nest the data workflow
    { data: mapVariable({ step: fetchStep, path: 'content' }) }
  )
  .then(saveStep)
  .commit();

Streaming Output

Stream workflow progress in real-time:
const run = await workflow.createRun({
  inputData: { query: 'search term' }
});

const stream = await run.streamStart();

for await (const chunk of stream) {
  if (chunk.type === 'step-start') {
    console.log(`Starting step: ${chunk.stepId}`);
  }
  if (chunk.type === 'step-finish') {
    console.log(`Finished step: ${chunk.stepId}`, chunk.output);
  }
}

Workflow Runs

Manage workflow execution:
// Create run
const run = await workflow.createRun({
  inputData: { email: '[email protected]' },
  initialState: { count: 0 },
  requestContext: ctx
});

// Start execution
const result = await run.start();

// Get run status
const status = await run.getStatus();
console.log(status); // 'running' | 'completed' | 'failed' | 'suspended'

// Cancel run
await run.cancel();

// Resume suspended run
await run.resume({ resumeData: { approved: true } });

Observability

Workflows automatically create spans for observability:
const workflow = createWorkflow({
  id: 'traced-workflow',
  inputSchema: z.object({ data: z.string() }),
  options: {
    tracingPolicy: {
      internal: false  // Mark spans as user-facing
    }
  }
})
  .then(step1)
  .then(step2)
  .commit();
Spans include:
  • Workflow execution span
  • Step execution spans
  • Step input/output data
  • Timing information

Best Practices

Use clear, descriptive IDs for better observability:
// Good
createStep({ id: 'validate-email', ... })
createStep({ id: 'send-notification', ... })

// Avoid
createStep({ id: 'step1', ... })
createStep({ id: 's2', ... })
Use retries for transient failures, error handlers for expected errors:
const workflow = createWorkflow(...)
  .then(createStep(fetchStep), { retries: 3 })
  .branch({
    when: ({ fetchStep }) => fetchStep.status === 'failed',
    then: [logErrorStep, notifyStep]
  })
  .commit();
Execute independent operations concurrently:
// Good: Parallel execution
workflow.parallel([
  fetchUserStep,
  fetchOrdersStep,
  fetchPreferencesStep
])

// Avoid: Sequential when unnecessary
workflow
  .then(fetchUserStep)
  .then(fetchOrdersStep)
  .then(fetchPreferencesStep)
Store workflow state for multi-step coordination:
const workflow = createWorkflow({
  stateSchema: z.object({
    attempts: z.number(),
    lastError: z.string().optional()
  })
})
  .then(createStep({
    execute: async (input, { state, setState }) => {
      const attempts = (state?.attempts || 0) + 1;
      setState({ attempts });
      // ...
    }
  }))
  .commit();

Build docs developers (and LLMs) love