
Workflow DSL

The Workflow DSL (Domain Specific Language) allows you to define workflows declaratively using YAML or TOML configuration files. This provides a more accessible way to build workflows without writing Rust code.

Overview

The Workflow DSL supports:
  • Declarative configuration in YAML or TOML
  • Agent registry for reusable LLM agents
  • Environment variable substitution
  • Multiple node types (start, end, task, llm_agent, condition, parallel, join, loop, transform, sub_workflow, wait)
  • Conditional and parallel edges
  • Retry policies and timeouts
  • Validation before execution

Basic Structure

metadata:
  id: workflow_id
  name: Workflow Name
  description: What this workflow does
  version: "1.0.0"

config:
  max_parallel: 10
  default_timeout_ms: 60000

agents:
  agent_name:
    model: gpt-4
    system_prompt: "System instructions"
    temperature: 0.7

nodes:
  - type: start
    id: start
  
  - type: llm_agent
    id: process
    name: Process Input
    agent:
      agent_id: agent_name
  
  - type: end
    id: end

edges:
  - from: start
    to: process
  - from: process
    to: end
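
The same skeleton can be written in TOML. The following is a sketch of the equivalent structure, using the [[nodes]]/[[edges]] array-of-tables shape shown in the from_toml example later on this page; the inline agent table is an assumption based on the standard TOML mapping of nested fields.

[metadata]
id = "workflow_id"
name = "Workflow Name"
description = "What this workflow does"
version = "1.0.0"

[config]
max_parallel = 10
default_timeout_ms = 60000

[agents.agent_name]
model = "gpt-4"
system_prompt = "System instructions"
temperature = 0.7

[[nodes]]
type = "start"
id = "start"

[[nodes]]
type = "llm_agent"
id = "process"
name = "Process Input"
agent = { agent_id = "agent_name" }  # assumed inline-table mapping of the agent reference

[[nodes]]
type = "end"
id = "end"

[[edges]]
from = "start"
to = "process"

[[edges]]
from = "process"
to = "end"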

WorkflowDefinition Schema

Metadata

metadata (WorkflowMetadata, required): Workflow metadata and identification.
  • id (string, required): Unique workflow identifier
  • name (string, required): Human-readable workflow name
  • description (string): Workflow description
  • version (string): Workflow version (e.g., "1.0.0")
  • author (string): Workflow author
  • tags (array<string>): Tags for categorization
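
For instance, a metadata block using all of the optional fields (values are illustrative):

metadata:
  id: data_pipeline
  name: Data Pipeline
  description: Transforms and loads incoming data
  version: "1.0.0"
  author: Platform Team
  tags:
    - etl
    - data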

Configuration

config (WorkflowConfig): Workflow-level configuration.
  • max_parallel (number, default: 10): Maximum parallel node executions
  • default_timeout_ms (number, default: 60000): Default timeout for nodes in milliseconds
  • enable_checkpoints (boolean, default: false): Enable execution checkpoints
  • retry_policy (RetryPolicy): Default retry policy for all nodes
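
Combining these fields, a workflow-level config with a default retry policy might look like this (values are illustrative; the retry_policy fields follow the Node Configuration section below):

config:
  max_parallel: 5
  default_timeout_ms: 30000
  enable_checkpoints: true
  retry_policy:
    max_retries: 3
    retry_delay_ms: 1000
    exponential_backoff: true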

Agents

agents (map<string, LlmAgentConfig>): Registry of reusable LLM agents.
agents:
  classifier:
    model: gpt-4
    system_prompt: "You are a classifier"
    temperature: 0.3
    max_tokens: 50
    context_window_size: 10
    user_id: "user123"
    tenant_id: "tenant456"
  • model (string, required): Model identifier (e.g., "gpt-4", "gpt-3.5-turbo")
  • system_prompt (string): System prompt for the agent
  • temperature (number): Temperature parameter (0.0 - 2.0)
  • max_tokens (number): Maximum tokens to generate
  • context_window_size (number): Number of conversation rounds to keep in context
  • user_id (string): User ID for persistence
  • tenant_id (string): Tenant ID for multi-tenancy

Node Definitions

Start Node

The start node is the workflow entry point:
- type: start
  id: start
  name: Start  # optional
  • id (string, required): Node identifier
  • name (string): Human-readable name

End Node

The end node is the workflow exit point:
- type: end
  id: end
  name: End  # optional

Task Node

The task node executes a custom task:
- type: task
  id: process
  name: Process Data
  executor_type: none  # or function, http, script
  config:
    timeout_ms: 30000
    retry_policy:
      max_retries: 3
      retry_delay_ms: 1000
    metadata:
      priority: high
  • id (string, required): Node identifier
  • name (string, required): Human-readable name
  • executor_type (enum, required): Executor type:
      • none: No-op (pass-through)
      • function: Function executor (code-defined)
      • http: HTTP request executor
      • script: Rhai script executor
  • config (NodeConfigDef): Node configuration (see Node Configuration)

LLM Agent Node

The llm_agent node invokes an LLM agent:
- type: llm_agent
  id: classify
  name: Classify Query
  agent:
    agent_id: classifier  # reference from registry
  prompt_template: "Query: {{ input }}"  # optional
  config:
    timeout_ms: 30000
Agent reference (registry):
agent:
  agent_id: classifier
Agent reference (inline):
agent:
  model: gpt-4
  system_prompt: "..."
  temperature: 0.7
  • prompt_template (string): Optional prompt template with an {{ input }} placeholder

Condition Node

The condition node evaluates a boolean condition:
- type: condition
  id: check_value
  name: Check Value
  condition:
    condition_type: expression
    expr: "value > 50"
Condition types:

expression: Expression-based condition
condition:
  condition_type: expression
  expr: "value > 50"

value: Value comparison
condition:
  condition_type: value
  field: status
  operator: eq
  value: "active"

Parallel Node

The parallel node is a fan-out marker for parallel execution:
- type: parallel
  id: fork
  name: Fork Tasks
  config:
    metadata:
      max_concurrent: 5
Use with parallel edges to define branches.
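
A minimal fork/join pattern combines a parallel node, one edge per branch, and a join node. This sketch uses the node and edge shapes documented on this page; the task bodies are illustrative no-ops:

nodes:
  - type: parallel
    id: fork
    name: Fork Tasks
  - type: task
    id: task_a
    name: Task A
    executor_type: none
  - type: task
    id: task_b
    name: Task B
    executor_type: none
  - type: join
    id: merge
    name: Merge Results
    wait_for:
      - task_a
      - task_b

edges:
  - from: fork
    to: task_a
  - from: fork
    to: task_b
  - from: task_a
    to: merge
  - from: task_b
    to: merge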

Join Node

The join node aggregates results from parallel branches:
- type: join
  id: merge
  name: Merge Results
  wait_for:
    - task_a
    - task_b
    - task_c
  config:
    timeout_ms: 60000
  • wait_for (array<string>): Node IDs that must complete before the join proceeds

Loop Node

The loop node executes its body repeatedly while a condition holds:
- type: loop
  id: iterate
  name: Iterate Until Done
  executor_type: none
  condition:
    condition_type: while
    expr: "count < 10"
  max_iterations: 100
  config:
    timeout_ms: 120000
Loop condition types:
  • while: Continue while expression is true
  • until: Continue until expression is true
  • count: Fixed number of iterations
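
For example, an until loop can be sketched with the same expr field as the while example above, inverting the test. The field names follow the while example; the expression itself is illustrative:

- type: loop
  id: retry_fetch
  name: Retry Until Done
  executor_type: none
  condition:
    condition_type: until
    expr: "status == 'done'"
  max_iterations: 20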

Transform Node

The transform node performs a data transformation:
- type: transform
  id: format
  name: Format Output
  transform_type: template
  template: |
    Result: {{ input }}
Transform types:

template: Jinja-style template
transform_type: template
template: "Formatted: {{ input }}"

expression: JavaScript expression
transform_type: expression
expr: "input.toUpperCase()"

map_reduce: Map/reduce operation
transform_type: map_reduce
map: "x => x * 2"
reduce: "(acc, x) => acc + x"

SubWorkflow Node

The sub_workflow node invokes another workflow:
- type: sub_workflow
  id: sub_process
  name: Sub-Process
  workflow_id: child_workflow
  config:
    timeout_ms: 300000
  • workflow_id (string, required): ID of the workflow to invoke

Wait Node

The wait node waits for an external event:
- type: wait
  id: wait_approval
  name: Wait for Approval
  event_type: approval_event
  config:
    timeout_ms: 86400000  # 24 hours
  • event_type (string, required): Type of event to wait for

Edge Definitions

edges (array<EdgeDefinition>, required): List of edges connecting nodes.
  • from (string, required): Source node ID
  • to (string, required): Target node ID
  • condition (string): Optional condition for conditional routing
  • label (string): Optional edge label

Simple Edges

edges:
  - from: start
    to: process
  - from: process
    to: end

Conditional Edges

edges:
  - from: classify
    to: handle_billing
    condition: "category == 'billing'"
    label: billing
  
  - from: classify
    to: handle_technical
    condition: "category == 'technical'"
    label: technical

Parallel Edges

edges:
  # Fork to parallel branches
  - from: fork
    to: task_a
  - from: fork
    to: task_b
  - from: fork
    to: task_c
  
  # Branches to merge
  - from: task_a
    to: merge
  - from: task_b
    to: merge
  - from: task_c
    to: merge

Node Configuration

config (NodeConfigDef): Per-node configuration.

retry_policy (RetryPolicy): Retry configuration
retry_policy:
  max_retries: 3
  retry_delay_ms: 1000
  exponential_backoff: true
  max_delay_ms: 30000
timeout_ms (number): Execution timeout in milliseconds

metadata (map<string, string>): Custom metadata key-value pairs
metadata:
  priority: high
  team: backend

WorkflowDslParser API

Parsing Methods

from_yaml(content: &str) -> DslResult<WorkflowDefinition>
Parses a workflow from a YAML string.
use mofa_foundation::workflow::dsl::WorkflowDslParser;

let yaml = r#"
metadata:
  id: example
  name: Example Workflow
nodes:
  - type: start
    id: start
  - type: end
    id: end
edges:
  - from: start
    to: end
"#;

let definition = WorkflowDslParser::from_yaml(yaml)?;
from_toml(content: &str) -> DslResult<WorkflowDefinition>
Parses a workflow from a TOML string.
let toml = r#"
[metadata]
id = "example"
name = "Example Workflow"

[[nodes]]
type = "start"
id = "start"

[[nodes]]
type = "end"
id = "end"

[[edges]]
from = "start"
to = "end"
"#;

let definition = WorkflowDslParser::from_toml(toml)?;
from_file(path: impl AsRef<Path>) -> DslResult<WorkflowDefinition>
Parses a workflow from a file, auto-detecting the format by file extension. Supports .yaml, .yml, and .toml files.
let definition = WorkflowDslParser::from_file("workflow.yaml")?;
async build_with_agents(definition: WorkflowDefinition, agent_registry: &HashMap<String, Arc<LLMAgent>>) -> DslResult<WorkflowGraph>
Builds an executable workflow from a definition. Requires a registry of pre-built LLMAgent instances.
use std::collections::HashMap;
use std::sync::Arc;
use mofa_sdk::llm::{LLMAgent, LLMAgentBuilder, openai_from_env};

// Build agent registry
let mut agent_registry = HashMap::new();

let provider = Arc::new(openai_from_env()?);
let classifier = Arc::new(
    LLMAgentBuilder::new()
        .with_provider(provider)
        .with_system_prompt("You are a classifier")
        .build()
);

agent_registry.insert("classifier".to_string(), classifier);

// Build workflow
let workflow = WorkflowDslParser::build_with_agents(
    definition,
    &agent_registry
).await?;

Environment Variables

The DSL parser supports environment variable substitution using ${VAR_NAME} syntax.
metadata:
  id: ${WORKFLOW_ID}
  name: ${WORKFLOW_NAME}

agents:
  gpt_agent:
    model: ${MODEL_NAME}
    temperature: ${TEMPERATURE}

nodes:
  - type: llm_agent
    id: process
    name: Process
    agent:
      agent_id: gpt_agent
    prompt_template: "${PROMPT_PREFIX} {{ input }}"

Complete Examples

Customer Support Workflow

metadata:
  id: customer_support
  name: Customer Support Workflow
  description: Routes customer queries to appropriate handlers
  version: "1.0.0"

agents:
  classifier:
    model: gpt-4
    system_prompt: "Classify queries into: billing, technical, general"
    temperature: 0.3
    max_tokens: 50
  
  billing_handler:
    model: gpt-4
    system_prompt: "Help customers with billing inquiries"
    temperature: 0.7
  
  technical_handler:
    model: gpt-4
    system_prompt: "Help customers with technical issues"
    temperature: 0.7

nodes:
  - type: start
    id: start
  
  - type: llm_agent
    id: classify
    name: Classify Query
    agent:
      agent_id: classifier
    config:
      timeout_ms: 30000
  
  - type: llm_agent
    id: handle_billing
    name: Handle Billing
    agent:
      agent_id: billing_handler
    prompt_template: "Customer query: {{ input }}"
  
  - type: llm_agent
    id: handle_technical
    name: Handle Technical
    agent:
      agent_id: technical_handler
    prompt_template: "Customer query: {{ input }}"
  
  - type: end
    id: end

edges:
  - from: start
    to: classify
  
  - from: classify
    to: handle_billing
    condition: "category == 'billing'"
  
  - from: classify
    to: handle_technical
    condition: "category == 'technical'"
  
  - from: handle_billing
    to: end
  
  - from: handle_technical
    to: end

Parallel Analysis Workflow

metadata:
  id: parallel_analysis
  name: Parallel Analysis Workflow
  description: Analyzes content using multiple agents in parallel

agents:
  sentiment_analyzer:
    model: gpt-4
    system_prompt: "Analyze sentiment: positive, negative, or neutral"
    temperature: 0.3
  
  keyword_extractor:
    model: gpt-4
    system_prompt: "Extract top 5 keywords"
    temperature: 0.3
  
  summary_generator:
    model: gpt-4
    system_prompt: "Generate a brief summary"
    temperature: 0.5

nodes:
  - type: start
    id: start
  
  - type: parallel
    id: fork
    name: Fork Analysis
  
  - type: llm_agent
    id: sentiment
    name: Sentiment Analysis
    agent:
      agent_id: sentiment_analyzer
  
  - type: llm_agent
    id: keywords
    name: Keyword Extraction
    agent:
      agent_id: keyword_extractor
  
  - type: llm_agent
    id: summary
    name: Summary Generation
    agent:
      agent_id: summary_generator
  
  - type: join
    id: merge
    name: Merge Results
    wait_for:
      - sentiment
      - keywords
      - summary
  
  - type: transform
    id: format
    name: Format Output
    transform_type: template
    template: |
      Analysis Results:
      - Sentiment: {{ sentiment }}
      - Keywords: {{ keywords }}
      - Summary: {{ summary }}
  
  - type: end
    id: end

edges:
  - from: start
    to: fork
  
  - from: fork
    to: sentiment
  - from: fork
    to: keywords
  - from: fork
    to: summary
  
  - from: sentiment
    to: merge
  - from: keywords
    to: merge
  - from: summary
    to: merge
  
  - from: merge
    to: format
  
  - from: format
    to: end

Usage Example (Rust)

use mofa_sdk::workflow::{WorkflowDslParser, WorkflowExecutor, ExecutorConfig, WorkflowValue};
use mofa_sdk::llm::{LLMAgent, LLMAgentBuilder, openai_from_env};
use std::collections::HashMap;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse workflow from file
    let definition = WorkflowDslParser::from_file("customer_support.yaml")?;
    
    // Build agent registry
    let mut agent_registry = HashMap::new();
    let provider = Arc::new(openai_from_env()?);
    
    for (agent_id, config) in &definition.agents {
        let agent = Arc::new(
            LLMAgentBuilder::new()
                .with_id(agent_id)
                .with_provider(provider.clone())
                .with_model(&config.model)
                .with_system_prompt(
                    config.system_prompt.as_deref().unwrap_or("")
                )
                .with_temperature(config.temperature.unwrap_or(0.7))
                .build()
        );
        agent_registry.insert(agent_id.clone(), agent);
    }
    
    // Build workflow
    let workflow = WorkflowDslParser::build_with_agents(
        definition,
        &agent_registry
    ).await?;
    
    // Execute workflow
    let executor = WorkflowExecutor::new(ExecutorConfig::default());
    let input = WorkflowValue::String(
        "I was charged twice for my subscription".to_string()
    );
    
    let result = executor.execute(&workflow, input).await?;
    println!("Result: {:?}", result);
    
    Ok(())
}

Error Handling

DslError (enum): Errors that can occur during DSL parsing and building.
  • Io: IO error reading a file
  • YamlParse: YAML parsing error
  • TomlParse: TOML parsing error
  • Validation: Workflow validation error (missing start/end, invalid references, etc.)
  • AgentNotFound: Referenced agent not found in the registry
  • InvalidNodeType: Invalid node type specified
  • InvalidEdge: Invalid edge reference
  • Build: Error building the workflow from its definition
