Workflow Orchestration with StateGraph
MoFA provides a powerful workflow orchestration system inspired by LangGraph, enabling you to build complex, stateful workflows with conditional routing, parallel execution, and fault tolerance.
Core Concepts
StateGraph Architecture
The StateGraph implementation provides:
- Stateful execution: Maintain state across workflow steps
- Conditional routing: Dynamic path selection based on state
- Parallel execution: Fan-out/fan-in patterns for concurrent processing
- Fault tolerance: Built-in retry, circuit breakers, and fallback nodes
- Streaming support: Real-time updates via the stream() API
use mofa_foundation::workflow::{StateGraphImpl, AppendReducer, OverwriteReducer};
use mofa_kernel::workflow::{StateGraph, START, END};

let graph = StateGraphImpl::<MyState>::build("my_workflow")
    .add_reducer("messages", Box::new(AppendReducer))
    .add_node("process", Box::new(ProcessNode))
    .add_edge(START, "process")
    .add_edge("process", END)
    .compile()?;
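The builder above registers node values like `ProcessNode` without showing what a node looks like. The concrete node trait lives in mofa_kernel and its exact shape is not reproduced here; as a rough standalone model (an assumption, not the real trait), a node receives the current state and returns a state update that the graph merges via the registered reducers:

```rust
use std::collections::HashMap;

// Illustrative stand-in for the framework's state type.
type State = HashMap<String, String>;

// Hypothetical simplification of the node contract: state in, update out.
trait Node {
    fn execute(&self, state: &State) -> Result<State, String>;
}

struct ProcessNode;

impl Node for ProcessNode {
    fn execute(&self, state: &State) -> Result<State, String> {
        let input = state.get("input").cloned().unwrap_or_default();
        let mut update = State::new();
        // Return only the fields this node changed; the graph merges them.
        update.insert("output".to_string(), format!("processed: {}", input));
        Ok(update)
    }
}

fn main() {
    let mut state = State::new();
    state.insert("input".to_string(), "hello".to_string());
    let update = ProcessNode.execute(&state).unwrap();
    println!("{}", update["output"]); // prints "processed: hello"
}
```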
Building Workflows
Linear Workflow
A simple sequential workflow for data processing:
let graph = WorkflowBuilder::new("linear_workflow", "Data Processing")
    .description("Extract -> Transform -> Load pipeline")
    .start()
    .task("fetch_data", "Fetch Data", |_ctx, input| async move {
        let data = format!("Data from: {}", input.as_str().unwrap_or("default"));
        Ok(WorkflowValue::String(data))
    })
    .task("process", "Process Data", |_ctx, input| async move {
        let processed = format!("Processed - {}", input.as_str().unwrap_or(""));
        Ok(WorkflowValue::String(processed))
    })
    .task("save", "Save Result", |_ctx, input| async move {
        println!("Saving: {:?}", input);
        Ok(WorkflowValue::String("Save successful".to_string()))
    })
    .end()
    .build();
Conditional Routing
Dynamic path selection based on workflow state:
let mut graph = WorkflowGraph::new("conditional_workflow", "Conditional Branching");

graph.add_node(WorkflowNode::start("start"));
graph.add_node(WorkflowNode::condition(
    "check_value",
    "Evaluate Threshold",
    |_ctx, input| async move {
        let value = input.as_i64().unwrap_or(0);
        value > 50 // Returns true/false for routing
    },
));
graph.add_node(WorkflowNode::task(
    "high_path",
    "High Value Handler",
    |_ctx, input| async move {
        Ok(WorkflowValue::String(format!("High: {}", input.as_i64().unwrap_or(0))))
    },
));
graph.add_node(WorkflowNode::task(
    "low_path",
    "Low Value Handler",
    |_ctx, input| async move {
        Ok(WorkflowValue::String(format!("Low: {}", input.as_i64().unwrap_or(0))))
    },
));
graph.add_node(WorkflowNode::end("end"));

// Connect nodes with conditional edges
graph.connect("start", "check_value");
graph.connect_conditional("check_value", "high_path", "true");
graph.connect_conditional("check_value", "low_path", "false");
graph.connect("high_path", "end");
graph.connect("low_path", "end");
Parallel Execution
Fan-out/fan-in pattern for concurrent task processing:
use std::time::Duration;

let graph = WorkflowBuilder::new("parallel_workflow", "Parallel Processing")
    .start()
    .parallel("fork", "Dispatch Tasks")
    .branch("task_a", "Task A", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(WorkflowValue::String("Result A".to_string()))
    })
    .branch("task_b", "Task B", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        Ok(WorkflowValue::String("Result B".to_string()))
    })
    .branch("task_c", "Task C", |_ctx, _input| async move {
        tokio::time::sleep(Duration::from_millis(75)).await;
        Ok(WorkflowValue::String("Result C".to_string()))
    })
    .join_with_transform("join", "Aggregate", |results| async move {
        let combined: Vec<String> = results.values()
            .filter_map(|v| v.as_str().map(|s| s.to_string()))
            .collect();
        WorkflowValue::String(format!("Aggregated: {:?}", combined))
    })
    .end()
    .build();
Parallel nodes run without per-node retry/circuit-breaker protection for maximum throughput. Each node executes against an isolated state snapshot. If retry is needed, place a single-node step before/after the parallel fan-out.
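The isolated-snapshot contract can be pictured with a standalone sketch (illustrative names only, not MoFA APIs): each branch works on its own clone of the state, so branches never observe each other's writes, and only the join sees the collected results.

```rust
use std::thread;

// Standalone model of fan-out/fan-in: every branch gets an isolated
// snapshot (a clone) of the state; the join merges the branch results.
fn fan_out(state: &[String]) -> Vec<String> {
    let handles: Vec<_> = ["task_a", "task_b", "task_c"]
        .iter()
        .map(|name| {
            let snapshot = state.to_vec(); // isolated snapshot per branch
            let name = name.to_string();
            thread::spawn(move || format!("{} saw {} item(s)", name, snapshot.len()))
        })
        .collect();
    // Fan-in: collect branch results and order them deterministically.
    let mut results: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    results.sort();
    results
}

fn main() {
    println!("{:?}", fan_out(&["seed".to_string()]));
}
```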
State Management
Reducers
Reducers control how state updates are merged:
use mofa_foundation::workflow::{AppendReducer, OverwriteReducer};

let graph = StateGraphImpl::<JsonState>::build("my_workflow")
    // Append new messages to the list
    .add_reducer("messages", Box::new(AppendReducer))
    // Overwrite the current status
    .add_reducer("status", Box::new(OverwriteReducer))
    .add_node("process", Box::new(node))
    .compile()?;
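The merge semantics of the two built-in reducers can be modeled in isolation (a simplified standalone sketch over `Vec<String>`; MoFA's actual Reducer trait operates on the framework's state values):

```rust
// Simplified stand-in for the framework's reducer abstraction.
trait Reducer {
    fn reduce(&self, current: Vec<String>, update: Vec<String>) -> Vec<String>;
}

struct AppendReducer;
struct OverwriteReducer;

impl Reducer for AppendReducer {
    // Append: the update is concatenated onto the existing value,
    // suiting an ever-growing message history.
    fn reduce(&self, mut current: Vec<String>, update: Vec<String>) -> Vec<String> {
        current.extend(update);
        current
    }
}

impl Reducer for OverwriteReducer {
    // Overwrite: the update replaces the existing value entirely,
    // suiting a status field where only the latest value matters.
    fn reduce(&self, _current: Vec<String>, update: Vec<String>) -> Vec<String> {
        update
    }
}

fn main() {
    let merged = AppendReducer.reduce(vec!["hi".into()], vec!["there".into()]);
    println!("{:?}", merged); // prints ["hi", "there"]
}
```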
Context Variables
Pass metadata between workflow steps:
.task("extract", "Extract Data", |ctx, _input| async move {
    let raw_data = vec![1, 2, 3, 4, 5];
    ctx.set_variable("record_count", WorkflowValue::Int(raw_data.len() as i64)).await;
    Ok(WorkflowValue::List(raw_data.into_iter().map(WorkflowValue::Int).collect()))
})
.task("load", "Load Data", |ctx, input| async move {
    let original_count = ctx.get_variable("record_count").await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    println!("Processed {} records", original_count);
    Ok(input)
})
Fault Tolerance
Node Policies
Configure retry and circuit breaker policies per node:
use mofa_foundation::workflow::fault_tolerance::NodePolicy;

let policy = NodePolicy {
    max_retries: 3,
    retry_delay_ms: 1000,
    circuit_breaker_threshold: 5,
    circuit_breaker_timeout_ms: 30000,
    fallback_node: Some("error_handler".to_string()),
};

let graph = StateGraphImpl::<MyState>::build("fault_tolerant")
    .add_node("api_call", Box::new(ApiNode))
    .with_policy("api_call", policy)
    .compile()?;
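The retry loop a NodePolicy configures can be sketched in isolation. One assumption to verify against the source: this sketch treats max_retries as the number of re-attempts after the first failure, so max_retries = 3 allows up to four attempts total.

```rust
// Standalone sketch of per-node retry semantics; illustrative only.
fn run_with_retry<T, E>(
    max_retries: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Transient failure with budget left: retry (backoff elided here;
            // the real policy sleeps retry_delay_ms between attempts).
            Err(_) if attempt < max_retries => attempt += 1,
            // Budget exhausted: a real policy would route to fallback_node.
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    // Fails twice, then succeeds on the third attempt, within max_retries = 3.
    let result = run_with_retry(3, |attempt| {
        if attempt < 2 { Err("transient") } else { Ok("ok") }
    });
    println!("{:?}", result); // prints Ok("ok")
}
```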
Fallback Nodes
Route to alternative nodes on failure:
let graph = StateGraphImpl::<MyState>::build("with_fallback")
    .add_node("primary", Box::new(PrimaryNode))
    .add_node("fallback", Box::new(FallbackNode))
    .with_policy("primary", NodePolicy {
        max_retries: 2,
        fallback_node: Some("fallback".to_string()),
        ..Default::default()
    })
    .compile()?;
LLM/Agent Integration
ReAct Agent Workflow
Integrate ReAct agents for reasoning within workflows:
use std::sync::Arc;

let react_agent = create_react_agent(
    "decision-agent",
    "Professional decision-making assistant",
).await?;

let mut graph = WorkflowGraph::new("react_workflow", "ReAct Decision");
graph.add_node(WorkflowNode::task("react_agent", "Reasoning", {
    let agent = Arc::clone(&react_agent);
    move |_ctx, input| {
        let agent = Arc::clone(&agent);
        async move {
            let task = input.as_str().unwrap_or("");
            match agent.run(task).await {
                Ok(result) => Ok(WorkflowValue::String(result.answer)),
                Err(e) => Err(format!("Reasoning failed: {}", e)),
            }
        }
    }
}));
Multi-Agent Parallel Analysis
Run multiple expert agents in parallel:
let technical_agent = create_llm_agent("technical-expert", TECHNICAL_PROMPT);
let business_agent = create_llm_agent("business-expert", BUSINESS_PROMPT);
let synthesis_agent = create_llm_agent("synthesis", SYNTHESIS_PROMPT);

let graph = WorkflowBuilder::new("multi_agent", "Parallel Analysis")
    .start()
    .parallel("fork", "Distribute")
    .llm_agent_branch("technical", "Technical Analysis", technical_agent)
    .llm_agent_branch("business", "Business Analysis", business_agent)
    .join_with_transform("join", "Aggregate", |results| async move {
        WorkflowValue::Map(results)
    })
    .llm_agent_with_template(
        "synthesis",
        "Final Recommendation",
        synthesis_agent,
        "Synthesize: {{technical}} and {{business}}".to_string(),
    )
    .end()
    .build();
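The template string above splices each joined branch result into the synthesis prompt by key. A hypothetical sketch of that {{key}} substitution (illustrative only, not MoFA's implementation):

```rust
use std::collections::HashMap;

// Hypothetical model of llm_agent_with_template's placeholder filling:
// every {{key}} in the template is replaced by the joined result for `key`.
fn fill_template(template: &str, values: &HashMap<&str, &str>) -> String {
    let mut prompt = template.to_string();
    for (key, value) in values {
        // Build the literal "{{key}}" token and substitute its value.
        prompt = prompt.replace(&format!("{{{{{}}}}}", key), value);
    }
    prompt
}

fn main() {
    let mut results = HashMap::new();
    results.insert("technical", "feasible with the current stack");
    results.insert("business", "positive ROI within a year");
    println!(
        "{}",
        fill_template("Synthesize: {{technical}} and {{business}}", &results)
    );
}
```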
Execution & Streaming
Invoke (Blocking)
Execute and wait for completion:
let executor = WorkflowExecutor::new(ExecutorConfig::default());
let result = executor.execute(&graph, WorkflowValue::Null).await?;

match result.status {
    WorkflowStatus::Completed => println!("Success!"),
    WorkflowStatus::Failed(err) => eprintln!("Failed: {}", err),
    _ => {}
}
Stream (Non-blocking)
Receive real-time updates:
use futures::StreamExt;

let compiled = graph.compile()?;
let mut stream = compiled.stream(initial_state, None);

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamEvent::NodeStart { node_id, .. }) => {
            println!("Starting: {}", node_id);
        }
        Ok(StreamEvent::NodeEnd { node_id, state, .. }) => {
            println!("Completed: {} - {:?}", node_id, state);
        }
        Ok(StreamEvent::End { final_state }) => {
            println!("Workflow complete: {:?}", final_state);
            break;
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
        _ => {}
    }
}
Event Listening
Monitor workflow execution:
use tokio::sync::mpsc;

let (event_tx, mut event_rx) = mpsc::channel::<ExecutionEvent>(100);

let executor = WorkflowExecutor::new(ExecutorConfig {
    enable_checkpoints: true,
    checkpoint_interval: 2,
    ..Default::default()
})
.with_event_sender(event_tx);

tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        match event {
            ExecutionEvent::NodeStarted { node_id } => {
                println!("[EVENT] Node started: {}", node_id);
            }
            ExecutionEvent::NodeCompleted { node_id, .. } => {
                println!("[EVENT] Node completed: {}", node_id);
            }
            ExecutionEvent::CheckpointCreated { label } => {
                println!("[EVENT] Checkpoint: {}", label);
            }
            _ => {}
        }
    }
});
Advanced Patterns
ETL Pipeline
let graph = WorkflowBuilder::new("etl", "ETL Pipeline")
    .start()
    .task("extract", "Extract", |ctx, _| async move {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        ctx.set_variable("record_count", WorkflowValue::Int(data.len() as i64)).await;
        Ok(WorkflowValue::List(data.into_iter().map(WorkflowValue::Int).collect()))
    })
    .task("transform", "Transform", |ctx, input| async move {
        if let Some(list) = input.as_list() {
            let transformed: Vec<WorkflowValue> = list.iter()
                .filter_map(|v| v.as_i64())
                .filter(|&n| n % 2 == 0) // Keep evens
                .map(|n| WorkflowValue::Int(n * 10))
                .collect();
            ctx.set_variable("transformed_count",
                WorkflowValue::Int(transformed.len() as i64)).await;
            Ok(WorkflowValue::List(transformed))
        } else {
            Err("Invalid input".to_string())
        }
    })
    .task("load", "Load", |ctx, input| async move {
        let original = ctx.get_variable("record_count").await
            .and_then(|v| v.as_i64()).unwrap_or(0);
        let transformed = ctx.get_variable("transformed_count").await
            .and_then(|v| v.as_i64()).unwrap_or(0);
        println!("ETL: {} -> {} records", original, transformed);
        Ok(input)
    })
    .end()
    .build();
Loop Workflows
let graph = WorkflowBuilder::new("loop", "Iterative Processing")
    .start()
    .loop_node(
        "process_batch",
        "Process Batch",
        |_ctx, input| async move {
            // Process one batch
            Ok(input)
        },
        |ctx, _input| async move {
            // Continue condition
            let iteration = ctx.get_variable("iteration").await
                .and_then(|v| v.as_i64()).unwrap_or(0);
            iteration < 10
        },
        10, // Max iterations
    )
    .end()
    .build();
Configuration
Executor Config
let config = ExecutorConfig {
    max_parallelism: 10,               // Max concurrent branches
    stop_on_failure: true,             // Stop on first error
    enable_checkpoints: true,          // Save state snapshots
    checkpoint_interval: 5,            // Checkpoint every N nodes
    execution_timeout_ms: Some(30000), // 30s timeout
};
let executor = WorkflowExecutor::new(config);
Graph Config
let graph = StateGraphImpl::<MyState>::build("my_workflow")
    .with_config(GraphConfig {
        max_parallelism: 5,
        recursion_limit: 100,
        ..Default::default()
    })
    .compile()?;
Best Practices
- Use reducers wisely: Choose appropriate reducers for your state merge strategy
- Handle errors gracefully: Implement fallback nodes for critical paths
- Limit parallelism: Set max_parallelism to prevent resource exhaustion
- Use checkpoints: Enable checkpoints for long-running workflows
- Monitor execution: Use event listeners or streaming API for observability
- Test edge cases: Verify conditional routing with various inputs
- Isolate LLM calls: Wrap LLM nodes with retry policies
See Also