Workflows in Mastra are created using the createWorkflow function and composed using chainable methods to define the execution graph.
Creating a Workflow
Use createWorkflow to initialize a new workflow:
import { createWorkflow } from '@mastra/core';
import { z } from 'zod';
const workflow = createWorkflow({
id: 'data-processing-workflow',
inputSchema: z.object({
userId: z.string(),
data: z.array(z.string()),
}),
outputSchema: z.object({
processedCount: z.number(),
results: z.array(z.string()),
}),
});
Workflow Configuration
Unique identifier for the workflow
Zod schema defining the workflow’s input structure
Zod schema defining the workflow’s expected output
Zod schema for shared state accessible across all steps
Zod schema for validating request context values
Human-readable description of the workflow’s purpose
Mastra instance for accessing integrations, memory, and other services
Additional configuration options (see below)
Workflow Options
The options parameter accepts:
const workflow = createWorkflow({
id: 'my-workflow',
inputSchema: z.object({ /* ... */ }),
outputSchema: z.object({ /* ... */ }),
options: {
validateInputs: true, // Default: true
shouldPersistSnapshot: ({ stepResults, workflowStatus }) => {
// Custom logic to determine if snapshot should be saved
return workflowStatus !== 'running';
},
onFinish: async (result) => {
// Called when workflow completes (success, failed, or suspended)
console.log('Workflow finished:', result.status);
// Access: result.runId, result.steps, result.error, etc.
},
onError: async (errorInfo) => {
// Called only when workflow fails
console.error('Workflow failed:', errorInfo.error);
// Access: errorInfo.runId, errorInfo.steps, errorInfo.tripwire, etc.
},
},
});
Enable/disable input validation against schemas
Function that determines when to save workflow snapshots. Receives stepResults and workflowStatus.
Callback invoked when workflow execution completes (any status). Runs server-side without requiring client .watch().
Callback invoked only when workflow fails. Runs server-side for error handling and logging.
Committing the Workflow
After defining all steps and control flow, you must call .commit() to finalize the workflow:
const workflow = createWorkflow({ /* config */ })
.then(step1)
.then(step2)
.commit(); // Required before execution
Attempting to execute a workflow without calling .commit() will result in an error. Always commit your workflow after defining its structure.
Workflow State Management
Workflows can maintain shared state across all steps:
const workflow = createWorkflow({
id: 'stateful-workflow',
inputSchema: z.object({ input: z.string() }),
outputSchema: z.object({ output: z.string() }),
stateSchema: z.object({
counter: z.number(),
history: z.array(z.string()),
}),
});
Steps can read and update this state:
const step = createStep({
id: 'increment-counter',
inputSchema: z.object({ input: z.string() }),
outputSchema: z.object({ count: z.number() }),
execute: async ({ state, setState }) => {
const newCount = (state.counter || 0) + 1;
await setState({
...state,
counter: newCount,
history: [...(state.history || []), 'incremented'],
});
return { count: newCount };
},
});
Request Context
Request context allows you to pass request-scoped data (like user IDs, tenant IDs, auth tokens) through the workflow:
const workflow = createWorkflow({
id: 'tenant-aware-workflow',
inputSchema: z.object({ data: z.string() }),
outputSchema: z.object({ result: z.string() }),
requestContextSchema: z.object({
userId: z.string(),
tenantId: z.string(),
authToken: z.string().optional(),
}),
});
Access request context in steps:
const step = createStep({
id: 'use-context',
inputSchema: z.object({ data: z.string() }),
outputSchema: z.object({ result: z.string() }),
execute: async ({ requestContext }) => {
const userId = requestContext.get('userId');
const tenantId = requestContext.get('tenantId');
// Use context values...
return { result: `Processed for ${userId}` };
},
});
Executing Workflows
Once committed, execute the workflow:
import { RequestContext } from '@mastra/core';
const requestContext = new RequestContext();
requestContext.set('userId', 'user-123');
requestContext.set('tenantId', 'tenant-abc');
const run = await workflow.execute({
inputData: { userId: 'user-123', data: ['item1', 'item2'] },
requestContext,
});
const result = await run.result();
if (result.status === 'success') {
console.log('Output:', result.result);
} else if (result.status === 'failed') {
console.error('Error:', result.error);
}
Registering with Mastra
For multi-tenant applications and workflow management features, register workflows with a Mastra instance:
import { Mastra } from '@mastra/core';
const mastra = new Mastra({
workflows: {
myWorkflow: workflow,
// ... other workflows
},
});
// Access registered workflow
const wf = mastra.getWorkflow('myWorkflow');
Next Steps
Defining Steps
Learn how to create workflow steps
Control Flow
Add branching and parallel execution