
Overview

The Workflow module provides a declarative workflow engine that orchestrates multi-step agent processes. It supports sequential, parallel (fanout), collect, conditional, and loop execution modes, with per-step error handling and retry logic.

Core Functions

workflow::create

Register a workflow definition with steps and execution modes.
Parameters:
  • workflow (Workflow, required), with fields:
      • id (string) - Optional workflow ID (auto-generated if not provided)
      • name (string, required) - Human-readable workflow name
      • description (string, required) - Workflow description
      • steps (WorkflowStep[], required) - Array of workflow steps (see WorkflowStep structure below)
Returns:
  • id (string) - The created workflow’s unique identifier
const { id } = await trigger('workflow::create', {
  name: 'Document Analysis Pipeline',
  description: 'Extract, analyze, and summarize documents',
  steps: [
    {
      name: 'extract',
      functionId: 'tool::file_read',
      mode: 'sequential',
      timeoutMs: 30000,
      errorMode: 'fail',
      outputVar: 'document'
    },
    {
      name: 'analyze',
      functionId: 'agent::chat',
      promptTemplate: 'Analyze this document: {{document}}',
      mode: 'sequential',
      timeoutMs: 120000,
      errorMode: 'retry',
      maxRetries: 3,
      outputVar: 'analysis'
    },
    {
      name: 'summarize',
      functionId: 'agent::chat',
      promptTemplate: 'Summarize: {{analysis}}',
      mode: 'sequential',
      timeoutMs: 60000,
      errorMode: 'fail',
      outputVar: 'summary'
    }
  ]
});

console.log(`Created workflow: ${id}`);
Location: workflow.ts:42
HTTP Endpoint: POST /api/workflows (requires authentication)

workflow::run

Execute a workflow with input data.
Parameters:
  • workflowId (string, required) - The workflow identifier to execute
  • input (any, required) - Initial input data for the workflow
  • agentId (string) - Optional agent ID for capability enforcement
Returns:
  • result (object), with fields:
      • runId (string) - Unique identifier for this workflow run
      • results (StepResult[]) - Array of results from each step execution
      • vars (Record<string, unknown>) - Final variable state after workflow completion
const result = await trigger('workflow::run', {
  workflowId: 'wf-123',
  input: {
    path: 'documents/report.pdf'
  },
  agentId: 'agent-456'
});

console.log(`Run ID: ${result.runId}`);
result.results.forEach(step => {
  console.log(`${step.stepName}: ${step.durationMs}ms`);
  if (step.error) {
    console.error(`  Error: ${step.error}`);
  }
});

console.log('Final summary:', result.vars.summary);
Security: If agentId is provided, the engine validates that the agent has capabilities for all workflow steps before execution.
Location: workflow.ts:61
HTTP Endpoint: POST /api/workflows/run (requires authentication)

workflow::list

List all registered workflow definitions.
Returns:
  • workflows (array) - Array of workflow definitions
const workflows = await trigger('workflow::list', {});

workflows.forEach(wf => {
  console.log(`${wf.name} (${wf.id}): ${wf.steps.length} steps`);
});
Location: workflow.ts:302
HTTP Endpoint: GET /api/workflows (requires authentication)

workflow::runs

List execution history for a specific workflow with pagination.
Parameters:
  • workflowId (string, required) - The workflow to query runs for
  • limit (number) - Maximum results to return (default from safePagination)
  • offset (number) - Offset for pagination (default: 0)
Returns:
  • result (object), with fields:
      • runs (array) - Array of workflow run records
      • total (number) - Total number of runs for this workflow
      • limit (number) - Applied limit value
      • offset (number) - Applied offset value
const result = await trigger('workflow::runs', {
  workflowId: 'wf-123',
  limit: 50,
  offset: 0
});

console.log(`Showing ${result.runs.length} of ${result.total} runs`);
result.runs.forEach(run => {
  console.log(`${run.runId}: ${run.status} - ${run.startedAt}`);
});
Location: workflow.ts:314
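
To read a workflow's full history, the limit/offset/total fields documented above can drive a simple paging loop. The following is a minimal sketch rather than part of the module: fetchAllRuns, RunsPage, and TriggerFn are hypothetical names, and the trigger function is passed in as a parameter so the sketch does not assume how a real client is obtained.

```typescript
// Shape of one page, taken from the return fields documented above.
interface RunsPage {
  runs: unknown[];
  total: number;
  limit: number;
  offset: number;
}

// Assumption: `trigger` is injected so the sketch does not depend on a real client.
type TriggerFn = (
  functionId: string,
  payload: Record<string, unknown>
) => Promise<RunsPage>;

async function fetchAllRuns(
  trigger: TriggerFn,
  workflowId: string,
  pageSize = 50
): Promise<unknown[]> {
  const all: unknown[] = [];
  let offset = 0;
  for (;;) {
    const page = await trigger('workflow::runs', { workflowId, limit: pageSize, offset });
    all.push(...page.runs);
    offset += page.runs.length;
    // Stop on an empty page or once every run has been seen.
    if (page.runs.length === 0 || offset >= page.total) break;
  }
  return all;
}
```

Advancing the offset by the number of runs actually returned, rather than by pageSize, keeps the loop correct if the server applies a smaller limit than the one requested.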

Workflow Step Modes

Workflow steps support different execution modes for complex orchestration patterns.

Sequential Mode

Executes steps one at a time, passing output to the next step as input.
{
  name: 'process_data',
  functionId: 'agent::chat',
  mode: 'sequential',
  promptTemplate: 'Process this data: {{input}}',
  timeoutMs: 60000,
  errorMode: 'fail',
  outputVar: 'processed'
}
Behavior:
  • Executes function with current vars.input
  • If outputVar is set, stores result in vars[outputVar]
  • Sets vars.input to the output for next step
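
The variable handling listed above can be pictured as a small pure function. This is an illustrative sketch, not the engine's code: applySequentialOutput and the Vars alias are hypothetical names.

```typescript
// Hypothetical sketch of how a sequential step's output flows into vars.
type Vars = Record<string, unknown>;

function applySequentialOutput(vars: Vars, output: unknown, outputVar?: string): Vars {
  const next: Vars = { ...vars };
  if (outputVar) {
    next[outputVar] = output; // keep the result under its named variable
  }
  next['input'] = output; // the next step reads this as its input
  return next;
}

const afterStep = applySequentialOutput({ input: 'raw text' }, 'PROCESSED', 'processed');
console.log(afterStep); // { input: 'PROCESSED', processed: 'PROCESSED' }
```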

Fanout Mode

Executes multiple steps in parallel for concurrent processing.
steps: [
  {
    name: 'analyze_sentiment',
    functionId: 'agent::chat',
    mode: 'fanout',
    promptTemplate: 'Analyze sentiment: {{input}}',
    timeoutMs: 30000,
    errorMode: 'skip',
    outputVar: 'sentiment'
  },
  {
    name: 'extract_entities',
    functionId: 'agent::chat',
    mode: 'fanout',
    promptTemplate: 'Extract entities: {{input}}',
    timeoutMs: 30000,
    errorMode: 'skip',
    outputVar: 'entities'
  },
  {
    name: 'summarize',
    functionId: 'agent::chat',
    mode: 'fanout',
    promptTemplate: 'Summarize: {{input}}',
    timeoutMs: 30000,
    errorMode: 'skip',
    outputVar: 'summary'
  }
]
Behavior:
  • Collects consecutive fanout steps into a batch
  • Executes all fanout steps in parallel using Promise.all()
  • Stores array of results in vars.__fanout
  • Individual results stored in their respective outputVar fields
Use Case: Run multiple analyses on the same input simultaneously.
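
The batching behavior above can be sketched in two small functions: one that gathers the run of consecutive fanout steps, and one that executes the batch with Promise.all(). The names takeFanoutBatch, runFanout, FanoutStep, and InvokeFn are hypothetical, and the sketch leaves out timeouts and error modes.

```typescript
// Hypothetical, trimmed-down step shape; only the fields this sketch needs.
interface FanoutStep {
  name: string;
  mode: 'sequential' | 'fanout' | 'collect' | 'conditional' | 'loop';
  outputVar?: string;
}

type InvokeFn = (step: FanoutStep, input: unknown) => Promise<unknown>;

// Returns the run of consecutive fanout steps beginning at index `start`.
function takeFanoutBatch(steps: FanoutStep[], start: number): FanoutStep[] {
  const batch: FanoutStep[] = [];
  for (const step of steps.slice(start)) {
    if (step.mode !== 'fanout') break;
    batch.push(step);
  }
  return batch;
}

async function runFanout(
  batch: FanoutStep[],
  vars: Record<string, unknown>,
  invoke: InvokeFn
): Promise<void> {
  // Every step in the batch receives the same input and runs concurrently.
  const outputs = await Promise.all(batch.map(step => invoke(step, vars['input'])));
  vars['__fanout'] = outputs; // results keep the order of the steps
  batch.forEach((step, i) => {
    if (step.outputVar) vars[step.outputVar] = outputs[i];
  });
}
```

Promise.all() preserves the order of its input array, so vars.__fanout lines up with the order in which the fanout steps were declared even when they finish at different times.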

Collect Mode

Collects and processes results from a previous fanout operation.
{
  name: 'merge_results',
  functionId: 'agent::chat',
  mode: 'collect',
  promptTemplate: 'Merge these analyses: {{__fanout}}',
  timeoutMs: 60000,
  errorMode: 'fail',
  outputVar: 'merged'
}
Behavior:
  • Receives vars.__fanout array from previous fanout steps
  • Passes fanout results to function via fanoutResults parameter
  • Typically used to aggregate/merge parallel results

Conditional Mode

Executes step only if a condition is met (simple substring matching).
{
  name: 'handle_error',
  functionId: 'agent::chat',
  mode: 'conditional',
  condition: 'error',  // Execute only if previous output contains "error"
  promptTemplate: 'Handle this error: {{input}}',
  timeoutMs: 30000,
  errorMode: 'skip',
  outputVar: 'error_handled'
}
Behavior:
  • Checks if previous step’s output contains condition string (case-insensitive)
  • If condition not met, step is skipped (result marked as “skipped”)
  • If condition met, executes normally
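
The condition check above amounts to a case-insensitive substring test. A minimal sketch follows; conditionMet is a hypothetical name, and serializing non-string outputs with JSON.stringify before matching is an assumption, since the section does not say how such outputs are compared.

```typescript
// Hypothetical helper: decides whether a conditional step should run.
function conditionMet(previousOutput: unknown, condition: string): boolean {
  // Assumption: non-string outputs are serialized before matching.
  const text =
    typeof previousOutput === 'string'
      ? previousOutput
      : JSON.stringify(previousOutput) ?? '';
  return text.toLowerCase().includes(condition.toLowerCase());
}

console.log(conditionMet('Request failed with ERROR 500', 'error')); // true
console.log(conditionMet('All checks passed', 'error'));             // false
```

Because the match is a plain substring test, a condition such as 'error' also fires on output like "no errors found", so distinctive marker strings make safer conditions.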

Loop Mode

Repeats a step until a condition is met or max iterations reached.
{
  name: 'refine_output',
  functionId: 'agent::chat',
  mode: 'loop',
  maxIterations: 5,
  until: 'complete',  // Stop when output contains "complete"
  promptTemplate: 'Refine this result: {{input}}',
  timeoutMs: 60000,
  errorMode: 'fail',
  outputVar: 'refined'
}
Behavior:
  • Executes function repeatedly up to maxIterations (default: 10)
  • Passes current iteration number to function
  • Updates vars.input and outputVar on each iteration
  • Stops early if output contains until string (case-insensitive)
Use Case: Iterative refinement, polling for completion, retries with feedback.
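
The loop behavior above can be sketched as follows. runLoop and LoopFn are hypothetical names, and the sketch assumes that iteration numbers start at 1 and that non-string outputs are serialized before the until check; neither detail is stated in this section.

```typescript
// Hypothetical sketch of loop-mode control flow.
type LoopFn = (input: unknown, iteration: number) => Promise<unknown>;

async function runLoop(
  fn: LoopFn,
  input: unknown,
  until?: string,
  maxIterations = 10 // default taken from the behavior list above
): Promise<{ output: unknown; iterations: number }> {
  let current = input;
  let iterations = 0;
  while (iterations < maxIterations) {
    iterations++; // assumption: 1-based iteration numbers
    current = await fn(current, iterations); // each pass consumes the previous output
    const text = typeof current === 'string' ? current : JSON.stringify(current) ?? '';
    if (until && text.toLowerCase().includes(until.toLowerCase())) break;
  }
  return { output: current, iterations };
}
```

When until is omitted, the loop in this sketch always runs for the full maxIterations.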

Error Handling Modes

Each step can specify how to handle errors:

Fail Mode (default)

{ errorMode: 'fail' }
  • Stops workflow execution immediately on error
  • Marks workflow run as “failed”
  • Throws the error

Skip Mode

{ errorMode: 'skip' }
  • Logs error in step result but continues workflow
  • Sets step output to null
  • Useful for optional steps or graceful degradation

Retry Mode

{ errorMode: 'retry', maxRetries: 3 }
  • Retries step up to maxRetries times (default: 3)
  • If all retries fail, marks workflow as failed
  • Useful for transient failures (network errors, rate limits)
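
The three error modes can be compared side by side in one wrapper. This is a sketch under stated assumptions: runWithErrorMode and StepOutcome are hypothetical names, and 'retry' is interpreted as one initial attempt plus up to maxRetries further attempts, which the section does not spell out.

```typescript
type ErrorMode = 'fail' | 'skip' | 'retry';

// Hypothetical result shape for this sketch.
interface StepOutcome {
  output: unknown;
  error?: string;
  attempts: number;
}

async function runWithErrorMode(
  fn: () => Promise<unknown>,
  errorMode: ErrorMode = 'fail',
  maxRetries = 3
): Promise<StepOutcome> {
  // Assumption: 'retry' allows the first attempt plus maxRetries retries.
  const maxAttempts = errorMode === 'retry' ? maxRetries + 1 : 1;
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return { output: await fn(), attempts: attempt };
    } catch (err) {
      lastError = err; // remember the failure and try again if attempts remain
    }
  }
  if (errorMode === 'skip') {
    const message = lastError instanceof Error ? lastError.message : String(lastError);
    return { output: null, error: message, attempts: 1 }; // workflow continues
  }
  throw lastError; // 'fail', or 'retry' with every attempt exhausted
}
```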

WorkflowStep Interface

interface WorkflowStep {
  name: string;                // Step identifier
  functionId: string;          // Function to invoke (e.g., "agent::chat")
  promptTemplate?: string;     // Template with {{variable}} interpolation
  mode: StepMode;              // "sequential" | "fanout" | "collect" | "conditional" | "loop"
  timeoutMs: number;           // Execution timeout in milliseconds
  errorMode: ErrorMode;        // "fail" | "skip" | "retry"
  maxRetries?: number;         // Retry limit for "retry" error mode
  outputVar?: string;          // Variable name to store result
  condition?: string;          // Substring to check for "conditional" mode
  maxIterations?: number;      // Loop limit for "loop" mode
  until?: string;              // Exit condition for "loop" mode
}

StepResult Interface

interface StepResult {
  stepName: string;       // Step that executed
  output: unknown;        // Step output (or "skipped" for conditional skip)
  durationMs: number;     // Execution time in milliseconds
  error?: string;         // Error message if step failed
}
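
As a usage illustration, a run's results array can be reduced to a short summary. The StepResult interface is repeated so the block stands alone; summarizeResults and RunSummary are hypothetical names, not part of the module.

```typescript
interface StepResult {
  stepName: string;
  output: unknown;
  durationMs: number;
  error?: string;
}

// Hypothetical summary shape for this sketch.
interface RunSummary {
  failed: string[];
  skipped: string[];
  totalMs: number;
}

function summarizeResults(results: StepResult[]): RunSummary {
  const summary: RunSummary = { failed: [], skipped: [], totalMs: 0 };
  for (const r of results) {
    summary.totalMs += r.durationMs;
    if (r.error) {
      summary.failed.push(r.stepName);
    } else if (r.output === 'skipped') {
      // Conditional steps that did not run report "skipped" as their output.
      summary.skipped.push(r.stepName);
    }
  }
  return summary;
}
```

A step whose genuine output is the string "skipped" cannot be told apart from a skipped conditional step by this check.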

Template Interpolation

Workflow steps support variable interpolation in promptTemplate using {{variable}} syntax:
interpolate('Process {{input}} and output to {{outputVar}}', {
  input: 'data.json',
  outputVar: 'result'
})
// Returns: "Process data.json and output to result"
Available Variables:
  • {{input}} - Current input value
  • {{__fanout}} - Array of fanout results
  • {{variableName}} - Any variable stored via outputVar
  • Non-existent variables remain as {{name}}
Location: workflow.ts:355
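
The substitution rules above fit in a single regular-expression replace. The function below is an approximation named interpolateSketch to keep it distinct from the module's own interpolate; rendering non-string values with JSON.stringify is an assumption.

```typescript
// Approximate re-implementation for illustration only.
function interpolateSketch(template: string, vars: Record<string, unknown>): string {
  return template.replace(/\{\{(\w+)\}\}/g, (match: string, name: string) => {
    // Unknown variables are left untouched, as {{name}}.
    if (!Object.prototype.hasOwnProperty.call(vars, name)) return match;
    const value = vars[name];
    // Assumption: non-string values are rendered as JSON.
    return typeof value === 'string' ? value : JSON.stringify(value) ?? '';
  });
}

console.log(
  interpolateSketch('Process {{input}} and output to {{outputVar}}', {
    input: 'data.json',
    outputVar: 'result'
  })
); // Process data.json and output to result
```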

Complete Workflow Example

const { id } = await trigger('workflow::create', {
  name: 'Content Analysis Pipeline',
  description: 'Extract, analyze in parallel, and merge results',
  steps: [
    // Step 1: Extract content
    {
      name: 'fetch_content',
      functionId: 'tool::web_fetch',
      mode: 'sequential',
      timeoutMs: 30000,
      errorMode: 'retry',
      maxRetries: 3,
      outputVar: 'content'
    },
    
    // Steps 2-4: Parallel analysis (fanout)
    {
      name: 'analyze_sentiment',
      functionId: 'agent::chat',
      mode: 'fanout',
      promptTemplate: 'Analyze sentiment of: {{content}}',
      timeoutMs: 60000,
      errorMode: 'skip',
      outputVar: 'sentiment'
    },
    {
      name: 'extract_keywords',
      functionId: 'agent::chat',
      mode: 'fanout',
      promptTemplate: 'Extract keywords from: {{content}}',
      timeoutMs: 60000,
      errorMode: 'skip',
      outputVar: 'keywords'
    },
    {
      name: 'categorize',
      functionId: 'agent::chat',
      mode: 'fanout',
      promptTemplate: 'Categorize this content: {{content}}',
      timeoutMs: 60000,
      errorMode: 'skip',
      outputVar: 'category'
    },
    
    // Step 5: Collect and merge results
    {
      name: 'merge_analysis',
      functionId: 'agent::chat',
      mode: 'collect',
      promptTemplate: 'Merge these analyses into a report: {{__fanout}}',
      timeoutMs: 90000,
      errorMode: 'fail',
      outputVar: 'report'
    },
    
    // Step 6: Conditional quality check
    {
      name: 'quality_check',
      functionId: 'agent::chat',
      mode: 'conditional',
      condition: 'low confidence',
      promptTemplate: 'Review and improve this report: {{report}}',
      timeoutMs: 60000,
      errorMode: 'skip',
      outputVar: 'reviewed_report'
    },
    
    // Step 7: Loop to refine
    {
      name: 'refine',
      functionId: 'agent::chat',
      mode: 'loop',
      maxIterations: 3,
      until: 'approved',
      promptTemplate: 'Refine report (iteration {{iteration}}): {{input}}',
      timeoutMs: 60000,
      errorMode: 'fail',
      outputVar: 'final_report'
    }
  ]
});

// Execute the workflow
const result = await trigger('workflow::run', {
  workflowId: id,
  input: { url: 'https://example.com/article' },
  agentId: 'agent-123'
});

console.log('Final report:', result.vars.final_report);

HTTP Endpoints

  • POST /api/workflows → workflow::create (requires auth)
  • GET /api/workflows → workflow::list (requires auth)
  • POST /api/workflows/run → workflow::run (requires auth)
All endpoints use the requireAuth() middleware for security.
