Workflows in Mastra are created using the createWorkflow function and composed using chainable methods to define the execution graph.

Creating a Workflow

Use createWorkflow to initialize a new workflow:
import { createWorkflow } from '@mastra/core/workflows';
import { z } from 'zod';

const workflow = createWorkflow({
  id: 'data-processing-workflow',
  inputSchema: z.object({
    userId: z.string(),
    data: z.array(z.string()),
  }),
  outputSchema: z.object({
    processedCount: z.number(),
    results: z.array(z.string()),
  }),
});

Workflow Configuration

id (string, required): Unique identifier for the workflow
inputSchema (ZodSchema, required): Zod schema defining the workflow’s input structure
outputSchema (ZodSchema, required): Zod schema defining the workflow’s expected output
stateSchema (ZodSchema): Zod schema for shared state accessible across all steps
requestContextSchema (ZodSchema): Zod schema for validating request context values
description (string): Human-readable description of the workflow’s purpose
mastra (Mastra): Mastra instance for accessing integrations, memory, and other services
options (WorkflowOptions): Additional configuration options (see below)

Workflow Options

The options parameter accepts:
const workflow = createWorkflow({
  id: 'my-workflow',
  inputSchema: z.object({ /* ... */ }),
  outputSchema: z.object({ /* ... */ }),
  options: {
    validateInputs: true, // Default: true
    shouldPersistSnapshot: ({ stepResults, workflowStatus }) => {
      // Custom logic to determine if snapshot should be saved
      return workflowStatus !== 'running';
    },
    onFinish: async (result) => {
      // Called when workflow completes (success, failed, or suspended)
      console.log('Workflow finished:', result.status);
      // Access: result.runId, result.steps, result.error, etc.
    },
    onError: async (errorInfo) => {
      // Called only when workflow fails
      console.error('Workflow failed:', errorInfo.error);
      // Access: errorInfo.runId, errorInfo.steps, errorInfo.tripwire, etc.
    },
  },
});
validateInputs (boolean, default: true): Enables or disables input validation against schemas
shouldPersistSnapshot (function): Determines when to save workflow snapshots. Receives stepResults and workflowStatus.
onFinish (function): Callback invoked when workflow execution completes with any status. Runs server-side without requiring client .watch().
onError (function): Callback invoked only when the workflow fails. Runs server-side for error handling and logging.
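
Because shouldPersistSnapshot is a plain predicate, it can be written and tested on its own before it is passed in options. The following is a minimal sketch, not Mastra's implementation: the WorkflowStatus and SnapshotDecisionInput types are hypothetical stand-ins, and it assumes stepResults is an object keyed by step ID.

```typescript
// Hypothetical local types; the real Mastra types may differ.
type WorkflowStatus = 'running' | 'success' | 'failed' | 'suspended';

interface SnapshotDecisionInput {
  // Assumption: one entry per step that has produced a result so far.
  stepResults: Record<string, unknown>;
  workflowStatus: WorkflowStatus;
}

// Persist whenever the run is no longer running (finished, failed, or
// suspended), and otherwise only after every third recorded step result.
function shouldPersistSnapshot({
  stepResults,
  workflowStatus,
}: SnapshotDecisionInput): boolean {
  if (workflowStatus !== 'running') return true;
  const recordedSteps = Object.keys(stepResults).length;
  return recordedSteps > 0 && recordedSteps % 3 === 0;
}
```

A policy like this trades recovery granularity for fewer storage writes; returning true unconditionally would persist at every opportunity.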

Committing the Workflow

After defining all steps and control flow, you must call .commit() to finalize the workflow:
const workflow = createWorkflow({ /* config */ })
  .then(step1)
  .then(step2)
  .commit(); // Required before execution
Attempting to execute a workflow without calling .commit() will result in an error. Always commit your workflow after defining its structure.
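
To illustrate why a commit step exists, here is a toy builder that freezes its step list on commit() and refuses to run before that. It is a conceptual sketch only and says nothing about how Mastra implements this internally; TinyWorkflow, next, and run are hypothetical names, and next is used in place of .then so the toy class is not mistaken for a promise-like object.

```typescript
// Toy model only: NOT Mastra's implementation.
type StepFn = (input: number) => number;

class TinyWorkflow {
  private steps: StepFn[] = [];
  private committed = false;

  // Appends a step and returns `this`, which is what makes calls chainable.
  next(step: StepFn): this {
    if (this.committed) {
      throw new Error('Cannot add steps after commit()');
    }
    this.steps.push(step);
    return this;
  }

  // Marks the execution graph as final.
  commit(): this {
    this.committed = true;
    return this;
  }

  // Runs the steps in order, feeding each output into the next step.
  run(input: number): number {
    if (!this.committed) {
      throw new Error('Workflow must be committed before execution');
    }
    return this.steps.reduce((value, step) => step(value), input);
  }
}

const tiny = new TinyWorkflow()
  .next((n) => n + 1)
  .next((n) => n * 2)
  .commit();

const tinyResult = tiny.run(3); // (3 + 1) * 2 = 8
```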

Workflow State Management

Workflows can maintain shared state across all steps:
const workflow = createWorkflow({
  id: 'stateful-workflow',
  inputSchema: z.object({ input: z.string() }),
  outputSchema: z.object({ output: z.string() }),
  stateSchema: z.object({
    counter: z.number(),
    history: z.array(z.string()),
  }),
});
Steps can read and update this state:
const step = createStep({
  id: 'increment-counter',
  inputSchema: z.object({ input: z.string() }),
  outputSchema: z.object({ count: z.number() }),
  execute: async ({ state, setState }) => {
    const newCount = (state.counter || 0) + 1;
    await setState({ 
      ...state,
      counter: newCount,
      history: [...(state.history || []), 'incremented'],
    });
    return { count: newCount };
  },
});
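
The update inside execute builds a new state object instead of mutating the existing one. Pulling that logic into a pure function makes it easy to unit test without running a workflow. This is a sketch under that assumption; CounterState and incrementCounter are hypothetical names that mirror the stateSchema above.

```typescript
// Mirrors the stateSchema above as a plain TypeScript type.
interface CounterState {
  counter: number;
  history: string[];
}

// Returns a new state object; the input state is left untouched.
function incrementCounter(state: Partial<CounterState>): CounterState {
  return {
    ...state,
    counter: (state.counter ?? 0) + 1,
    history: [...(state.history ?? []), 'incremented'],
  };
}

// Simulate two steps updating the shared state one after another.
const afterFirst = incrementCounter({});
const afterSecond = incrementCounter(afterFirst);
```

Inside a step, the returned object could be passed to setState as in the example above.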

Request Context

Request context allows you to pass request-scoped data (like user IDs, tenant IDs, auth tokens) through the workflow:
const workflow = createWorkflow({
  id: 'tenant-aware-workflow',
  inputSchema: z.object({ data: z.string() }),
  outputSchema: z.object({ result: z.string() }),
  requestContextSchema: z.object({
    userId: z.string(),
    tenantId: z.string(),
    authToken: z.string().optional(),
  }),
});
Access request context in steps:
const step = createStep({
  id: 'use-context',
  inputSchema: z.object({ data: z.string() }),
  outputSchema: z.object({ result: z.string() }),
  execute: async ({ requestContext }) => {
    const userId = requestContext.get('userId');
    const tenantId = requestContext.get('tenantId');
    // Use context values...
    return { result: `Processed for ${userId}` };
  },
});
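
A step that depends on a context value usually needs to fail clearly when that value is absent. The helper below is a hypothetical sketch, not part of Mastra: ContextLike is a minimal stand-in for any object with a get method, and a plain Map is used here in place of a real request context.

```typescript
// Minimal stand-in for the request context: anything with a get() method.
interface ContextLike {
  get(key: string): unknown;
}

// Hypothetical helper: returns a non-empty string value or throws a
// descriptive error naming the missing key.
function requireString(ctx: ContextLike, key: string): string {
  const value = ctx.get(key);
  if (typeof value !== 'string' || value.length === 0) {
    throw new Error(`Missing request context value: ${key}`);
  }
  return value;
}

// A Map satisfies ContextLike, which keeps this example self-contained.
const demoContext = new Map<string, unknown>([['userId', 'user-123']]);
const demoUserId = requireString(demoContext, 'userId');
```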

Executing Workflows

Once committed, execute the workflow:
import { RequestContext } from '@mastra/core/request-context';

const requestContext = new RequestContext();
requestContext.set('userId', 'user-123');
requestContext.set('tenantId', 'tenant-abc');

// Create a run, then start it with the input data and request context
const run = await workflow.createRun();
const result = await run.start({
  inputData: { userId: 'user-123', data: ['item1', 'item2'] },
  requestContext,
});

if (result.status === 'success') {
  console.log('Output:', result.result);
} else if (result.status === 'failed') {
  console.error('Error:', result.error);
}
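
The status check above can be made exhaustive with a discriminated union, so the compiler flags any status that is not handled. This is a sketch only: RunResult is a hypothetical type built from the statuses named on this page, and the actual result object may carry more fields.

```typescript
// Hypothetical result shape based on the statuses named in this page.
type RunResult<T> =
  | { status: 'success'; result: T }
  | { status: 'failed'; error: Error }
  | { status: 'suspended' };

// Each branch narrows `result` to the matching variant of the union.
function summarize<T>(result: RunResult<T>): string {
  switch (result.status) {
    case 'success':
      return `ok: ${JSON.stringify(result.result)}`;
    case 'failed':
      return `failed: ${result.error.message}`;
    case 'suspended':
      return 'suspended';
  }
}

const okSummary = summarize({ status: 'success', result: { processedCount: 2 } });
const failedSummary = summarize({ status: 'failed', error: new Error('boom') });
```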

Registering with Mastra

For multi-tenant applications and workflow management features, register workflows with a Mastra instance:
import { Mastra } from '@mastra/core';

const mastra = new Mastra({
  workflows: {
    myWorkflow: workflow,
    // ... other workflows
  },
});

// Access registered workflow
const wf = mastra.getWorkflow('myWorkflow');

Next Steps

Defining Steps

Learn how to create workflow steps

Control Flow

Add branching and parallel execution
