
Workflow Orchestration with StateGraph

MoFA provides a powerful workflow orchestration system inspired by LangGraph, enabling you to build complex, stateful workflows with conditional routing, parallel execution, and fault tolerance.

Core Concepts

StateGraph Architecture

The StateGraph implementation provides:
  • Stateful execution: Maintain state across workflow steps
  • Conditional routing: Dynamic path selection based on state
  • Parallel execution: Fan-out/fan-in patterns for concurrent processing
  • Fault tolerance: Built-in retry, circuit breakers, and fallback nodes
  • Streaming support: Real-time updates via stream() API

A minimal graph definition:
use mofa_foundation::workflow::{StateGraphImpl, AppendReducer, OverwriteReducer};
use mofa_kernel::workflow::{StateGraph, START, END};

let graph = StateGraphImpl::<MyState>::build("my_workflow")
    .add_reducer("messages", Box::new(AppendReducer))
    .add_node("process", Box::new(ProcessNode))
    .add_edge(START, "process")
    .add_edge("process", END)
    .compile()?;

Building Workflows

Linear Workflow

A simple sequential workflow for data processing:
let graph = WorkflowBuilder::new("linear_workflow", "Data Processing")
    .description("Extract -> Transform -> Load pipeline")
    .start()
    .task("fetch_data", "Fetch Data", |_ctx, input| async move {
        let data = format!("Data from: {}", input.as_str().unwrap_or("default"));
        Ok(WorkflowValue::String(data))
    })
    .task("process", "Process Data", |_ctx, input| async move {
        let processed = format!("Processed - {}", input.as_str().unwrap_or(""));
        Ok(WorkflowValue::String(processed))
    })
    .task("save", "Save Result", |_ctx, input| async move {
        println!("Saving: {:?}", input);
        Ok(WorkflowValue::String("Save successful".to_string()))
    })
    .end()
    .build();

Conditional Routing

Dynamic path selection based on workflow state:
let mut graph = WorkflowGraph::new("conditional_workflow", "Conditional Branching");

graph.add_node(WorkflowNode::start("start"));
graph.add_node(WorkflowNode::condition(
    "check_value", 
    "Evaluate Threshold",
    |_ctx, input| async move {
        let value = input.as_i64().unwrap_or(0);
        value > 50  // Returns true/false for routing
    }
));

graph.add_node(WorkflowNode::task(
    "high_path", 
    "High Value Handler",
    |_ctx, input| async move {
        Ok(WorkflowValue::String(format!("High: {}", input.as_i64().unwrap_or(0))))
    }
));

graph.add_node(WorkflowNode::task(
    "low_path",
    "Low Value Handler", 
    |_ctx, input| async move {
        Ok(WorkflowValue::String(format!("Low: {}", input.as_i64().unwrap_or(0))))
    }
));

graph.add_node(WorkflowNode::end("end"));

// Connect nodes with conditional edges
graph.connect("start", "check_value");
graph.connect_conditional("check_value", "high_path", "true");
graph.connect_conditional("check_value", "low_path", "false");
graph.connect("high_path", "end");
graph.connect("low_path", "end");

Parallel Execution

Fan-out/fan-in pattern for concurrent task processing:
let graph = WorkflowBuilder::new("parallel_workflow", "Parallel Processing")
    .start()
    .parallel("fork", "Dispatch Tasks")
    .branch("task_a", "Task A", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(WorkflowValue::String("Result A".to_string()))
    })
    .branch("task_b", "Task B", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(WorkflowValue::String("Result B".to_string()))
    })
    .branch("task_c", "Task C", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(75)).await;
        Ok(WorkflowValue::String("Result C".to_string()))
    })
    .join_with_transform("join", "Aggregate", |results| async move {
        let combined: Vec<String> = results.values()
            .filter_map(|v| v.as_str().map(|s| s.to_string()))
            .collect();
        WorkflowValue::String(format!("Aggregated: {:?}", combined))
    })
    .end()
    .build();

Parallel nodes run without per-node retry or circuit-breaker protection, trading fault isolation for throughput. Each branch executes against an isolated snapshot of the state. If a branch needs retry protection, place it in a single-node step before or after the parallel fan-out.
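Conceptually, fan-out/fan-in dispatches every branch concurrently and then aggregates the results in a join step. A minimal standard-library sketch of the same idea (this is illustrative only; `std::thread` stands in for MoFA's async executor, and `fan_out_fan_in` is a hypothetical helper):

```rust
use std::thread;

// Fan-out: run each branch on its own thread.
// Fan-in: join in dispatch order so aggregation is deterministic.
fn fan_out_fan_in(branches: Vec<fn() -> String>) -> Vec<String> {
    let handles: Vec<_> = branches
        .into_iter()
        .map(|branch| thread::spawn(branch))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let results = fan_out_fan_in(vec![
        || "Result A".to_string(),
        || "Result B".to_string(),
        || "Result C".to_string(),
    ]);
    // Aggregation sees all branch results, regardless of which finished first.
    println!("Aggregated: {:?}", results);
}
```

Joining in dispatch order is one simple way to make the aggregate deterministic even though the branches finish in arbitrary order.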

State Management

Reducers

Reducers control how state updates are merged:
use mofa_foundation::workflow::{AppendReducer, OverwriteReducer};

let graph = StateGraphImpl::<JsonState>::build("my_workflow")
    // Append new messages to the list
    .add_reducer("messages", Box::new(AppendReducer))
    // Overwrite the current status
    .add_reducer("status", Box::new(OverwriteReducer))
    .add_node("process", Box::new(node))
    .compile()?;
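The two built-in reducers differ only in their merge rule: AppendReducer concatenates the update onto the existing value, while OverwriteReducer replaces it. A self-contained sketch of those semantics over a list-of-strings channel (the trait shape here is illustrative, not the actual mofa_foundation definition):

```rust
// Illustrative reducer trait over one state channel holding Vec<String>.
trait Reducer {
    fn reduce(&self, current: Vec<String>, update: Vec<String>) -> Vec<String>;
}

struct AppendReducer;
impl Reducer for AppendReducer {
    // Keep history: concatenate the update onto the existing list.
    fn reduce(&self, mut current: Vec<String>, update: Vec<String>) -> Vec<String> {
        current.extend(update);
        current
    }
}

struct OverwriteReducer;
impl Reducer for OverwriteReducer {
    // Keep only the latest value.
    fn reduce(&self, _current: Vec<String>, update: Vec<String>) -> Vec<String> {
        update
    }
}

fn main() {
    let state = vec!["hello".to_string()];
    let update = vec!["world".to_string()];
    println!("{:?}", AppendReducer.reduce(state.clone(), update.clone()));
    println!("{:?}", OverwriteReducer.reduce(state, update));
}
```

This is why "messages" channels typically get AppendReducer (conversation history accumulates) while "status" gets OverwriteReducer (only the latest value matters).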

Context Variables

Pass metadata between workflow steps:
.task("extract", "Extract Data", |ctx, _input| async move {
    let raw_data = vec![1, 2, 3, 4, 5];
    ctx.set_variable("record_count", WorkflowValue::Int(raw_data.len() as i64)).await;
    Ok(WorkflowValue::List(raw_data.into_iter().map(WorkflowValue::Int).collect()))
})
.task("load", "Load Data", |ctx, input| async move {
    let original_count = ctx.get_variable("record_count").await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    println!("Processed {} records", original_count);
    Ok(input)
})

Fault Tolerance

Node Policies

Configure retry and circuit breaker policies per node:
use mofa_foundation::workflow::fault_tolerance::NodePolicy;

let policy = NodePolicy {
    max_retries: 3,
    retry_delay_ms: 1000,
    circuit_breaker_threshold: 5,
    circuit_breaker_timeout_ms: 30000,
    fallback_node: Some("error_handler".to_string()),
};

let graph = StateGraphImpl::<MyState>::build("fault_tolerant")
    .add_node("api_call", Box::new(ApiNode))
    .with_policy("api_call", policy)
    .compile()?;
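The circuit_breaker_threshold / circuit_breaker_timeout_ms pair describes a standard circuit breaker: after N consecutive failures the node stops being executed (circuit open) until the timeout elapses, at which point one attempt is allowed through again. A minimal sketch of that state machine (illustrative only, not the mofa_foundation implementation):

```rust
use std::time::{Duration, Instant};

// Minimal circuit breaker: opens after `threshold` consecutive failures,
// and permits a half-open retry once `timeout` has elapsed.
struct CircuitBreaker {
    threshold: u32,
    timeout: Duration,
    consecutive_failures: u32,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, timeout: Duration) -> Self {
        Self { threshold, timeout, consecutive_failures: 0, opened_at: None }
    }

    // May the node execute right now?
    fn allow(&self) -> bool {
        match self.opened_at {
            None => true,
            Some(opened) => opened.elapsed() >= self.timeout, // half-open after timeout
        }
    }

    fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.opened_at = None; // close the circuit
    }

    fn record_failure(&mut self) {
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.threshold {
            self.opened_at = Some(Instant::now()); // trip the breaker
        }
    }
}

fn main() {
    // Mirrors circuit_breaker_threshold: 5, circuit_breaker_timeout_ms: 30000.
    let mut cb = CircuitBreaker::new(5, Duration::from_millis(30_000));
    for _ in 0..5 { cb.record_failure(); }
    println!("allowed after 5 failures: {}", cb.allow());
}
```

When the breaker is open and a fallback_node is configured, execution would route there instead of attempting the failing node.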

Fallback Nodes

Route to alternative nodes on failure:
let graph = StateGraphImpl::<MyState>::build("with_fallback")
    .add_node("primary", Box::new(PrimaryNode))
    .add_node("fallback", Box::new(FallbackNode))
    .with_policy("primary", NodePolicy {
        max_retries: 2,
        fallback_node: Some("fallback".to_string()),
        ..Default::default()
    })
    .compile()?;

LLM/Agent Integration

ReAct Agent Workflow

Integrate ReAct agents for reasoning within workflows:
let react_agent = create_react_agent(
    "decision-agent",
    "Professional decision-making assistant"
).await?;

let mut graph = WorkflowGraph::new("react_workflow", "ReAct Decision");

graph.add_node(WorkflowNode::task("react_agent", "Reasoning", {
    let agent = Arc::clone(&react_agent);
    move |_ctx, input| {
        let agent = Arc::clone(&agent);
        async move {
            let task = input.as_str().unwrap_or("");
            match agent.run(task).await {
                Ok(result) => Ok(WorkflowValue::String(result.answer)),
                Err(e) => Err(format!("Reasoning failed: {}", e)),
            }
        }
    }
}));

Multi-Agent Parallel Analysis

Run multiple expert agents in parallel:
let technical_agent = create_llm_agent("technical-expert", TECHNICAL_PROMPT);
let business_agent = create_llm_agent("business-expert", BUSINESS_PROMPT);
let synthesis_agent = create_llm_agent("synthesis", SYNTHESIS_PROMPT);

let graph = WorkflowBuilder::new("multi_agent", "Parallel Analysis")
    .start()
    .parallel("fork", "Distribute")
    .llm_agent_branch("technical", "Technical Analysis", technical_agent)
    .llm_agent_branch("business", "Business Analysis", business_agent)
    .join_with_transform("join", "Aggregate", |results| async move {
        WorkflowValue::Map(results)
    })
    .llm_agent_with_template(
        "synthesis",
        "Final Recommendation",
        synthesis_agent,
        "Synthesize: {{technical}} and {{business}}".to_string()
    )
    .end()
    .build();
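The {{technical}} and {{business}} placeholders in the synthesis template are filled from the joined branch results before the prompt reaches the synthesis agent. A plain-Rust sketch of that substitution (render_template is a hypothetical helper, not the MoFA template engine):

```rust
use std::collections::HashMap;

// Replace each {{key}} placeholder with the matching branch result.
fn render_template(template: &str, results: &HashMap<String, String>) -> String {
    let mut out = template.to_string();
    for (key, value) in results {
        out = out.replace(&format!("{{{{{}}}}}", key), value);
    }
    out
}

fn main() {
    let mut results = HashMap::new();
    results.insert("technical".to_string(), "feasible".to_string());
    results.insert("business".to_string(), "profitable".to_string());
    let prompt = render_template("Synthesize: {{technical}} and {{business}}", &results);
    println!("{}", prompt);
}
```

The placeholder names must match the branch node IDs ("technical", "business"), since those are the keys under which the join step stores each result.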

Execution & Streaming

Invoke (Blocking)

Execute and wait for completion:
let executor = WorkflowExecutor::new(ExecutorConfig::default());
let result = executor.execute(&graph, WorkflowValue::Null).await?;

match result.status {
    WorkflowStatus::Completed => println!("Success!"),
    WorkflowStatus::Failed(err) => eprintln!("Failed: {}", err),
    _ => {},
}

Stream (Non-blocking)

Receive real-time updates:
use futures::StreamExt;

let compiled = graph.compile()?;
let mut stream = compiled.stream(initial_state, None);

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamEvent::NodeStart { node_id, .. }) => {
            println!("Starting: {}", node_id);
        }
        Ok(StreamEvent::NodeEnd { node_id, state, .. }) => {
            println!("Completed: {} - {:?}", node_id, state);
        }
        Ok(StreamEvent::End { final_state }) => {
            println!("Workflow complete: {:?}", final_state);
            break;
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
        _ => {}
    }
}

Event Listening

Monitor workflow execution:
use tokio::sync::mpsc;

let (event_tx, mut event_rx) = mpsc::channel::<ExecutionEvent>(100);

let executor = WorkflowExecutor::new(ExecutorConfig {
    enable_checkpoints: true,
    checkpoint_interval: 2,
    ..Default::default()
})
.with_event_sender(event_tx);

tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        match event {
            ExecutionEvent::NodeStarted { node_id } => {
                println!("[EVENT] Node started: {}", node_id);
            }
            ExecutionEvent::NodeCompleted { node_id, .. } => {
                println!("[EVENT] Node completed: {}", node_id);
            }
            ExecutionEvent::CheckpointCreated { label } => {
                println!("[EVENT] Checkpoint: {}", label);
            }
            _ => {}
        }
    }
});

Advanced Patterns

ETL Pipeline

let graph = WorkflowBuilder::new("etl", "ETL Pipeline")
    .start()
    .task("extract", "Extract", |ctx, _| async move {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        ctx.set_variable("record_count", WorkflowValue::Int(data.len() as i64)).await;
        Ok(WorkflowValue::List(data.into_iter().map(WorkflowValue::Int).collect()))
    })
    .task("transform", "Transform", |ctx, input| async move {
        if let Some(list) = input.as_list() {
            let transformed: Vec<WorkflowValue> = list.iter()
                .filter_map(|v| v.as_i64())
                .filter(|&n| n % 2 == 0)  // Keep evens
                .map(|n| WorkflowValue::Int(n * 10))
                .collect();
            ctx.set_variable("transformed_count", 
                WorkflowValue::Int(transformed.len() as i64)).await;
            Ok(WorkflowValue::List(transformed))
        } else {
            Err("Invalid input".to_string())
        }
    })
    .task("load", "Load", |ctx, input| async move {
        let original = ctx.get_variable("record_count").await
            .and_then(|v| v.as_i64()).unwrap_or(0);
        let transformed = ctx.get_variable("transformed_count").await
            .and_then(|v| v.as_i64()).unwrap_or(0);
        println!("ETL: {} -> {} records", original, transformed);
        Ok(input)
    })
    .end()
    .build();

Loop Workflows

let graph = WorkflowBuilder::new("loop", "Iterative Processing")
    .start()
    .loop_node(
        "process_batch",
        "Process Batch",
        |_ctx, input| async move {
            // Process one batch
            Ok(input)
        },
        |ctx, _input| async move {
            // Continue condition
            let iteration = ctx.get_variable("iteration").await
                .and_then(|v| v.as_i64()).unwrap_or(0);
            iteration < 10
        },
        10  // Max iterations
    )
    .end()
    .build();
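The loop node re-runs its body while the continue condition holds, with the max-iteration count as a hard stop even if the condition never turns false. A synchronous sketch of that control flow (illustrative; run_loop is a hypothetical stand-in for the builder's loop semantics):

```rust
// Re-run `body` while `should_continue` holds, bounded by `max_iterations`.
fn run_loop<T>(
    mut value: T,
    body: impl Fn(T) -> T,
    should_continue: impl Fn(u32) -> bool,
    max_iterations: u32,
) -> (T, u32) {
    let mut iteration = 0;
    while iteration < max_iterations && should_continue(iteration) {
        value = body(value);
        iteration += 1;
    }
    (value, iteration)
}

fn main() {
    // Body increments a counter; condition mirrors `iteration < 10` above.
    let (result, iterations) = run_loop(0, |n| n + 1, |i| i < 10, 10);
    println!("result={}, iterations={}", result, iterations);
}
```

Note the two exits: the condition closure is the intended termination, and max_iterations is the safety net against runaway loops.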

Configuration

Executor Config

let config = ExecutorConfig {
    max_parallelism: 10,           // Max concurrent branches
    stop_on_failure: true,         // Stop on first error
    enable_checkpoints: true,      // Save state snapshots
    checkpoint_interval: 5,        // Checkpoint every N nodes
    execution_timeout_ms: Some(30000), // 30s timeout
};

let executor = WorkflowExecutor::new(config);
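With checkpoint_interval: 5, a snapshot is taken after every fifth completed node. Expressed as a predicate (the 1-based counting rule here is an assumption, for illustration):

```rust
// A checkpoint fires after every `interval`-th completed node (1-based count).
fn should_checkpoint(completed_nodes: u32, interval: u32) -> bool {
    interval > 0 && completed_nodes % interval == 0
}

fn main() {
    // For a 12-node run with interval 5, checkpoints land after nodes 5 and 10.
    let fired: Vec<u32> = (1..=12).filter(|&n| should_checkpoint(n, 5)).collect();
    println!("checkpoints after nodes: {:?}", fired);
}
```

A smaller interval shortens recovery replay at the cost of more frequent state serialization.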

Graph Config

let graph = StateGraphImpl::<MyState>::build("my_workflow")
    .with_config(GraphConfig {
        max_parallelism: 5,
        recursion_limit: 100,
        ..Default::default()
    })
    .compile()?;

Best Practices

  1. Use reducers deliberately: Append for accumulating channels like message histories, overwrite for scalar fields like status
  2. Handle errors gracefully: Implement fallback nodes for critical paths
  3. Limit parallelism: Set max_parallelism to prevent resource exhaustion
  4. Use checkpoints: Enable checkpoints for long-running workflows
  5. Monitor execution: Use event listeners or streaming API for observability
  6. Test edge cases: Verify conditional routing with various inputs
  7. Isolate LLM calls: Wrap LLM nodes with retry policies
