StateGraph API
The StateGraph API provides a LangGraph-inspired interface for building stateful workflow graphs in MoFA. It enables you to define complex workflows with nodes, edges, and state management.
Overview
StateGraph is the core API for building workflow graphs. It provides:
- Declarative graph building with a fluent API
- State management with customizable reducers
- Multiple edge types: single, conditional, and parallel
- Execution modes: invoke (run to completion), stream (event-based), and step (interactive)
- Fault tolerance with retry policies and circuit breakers
Core Concepts
Special Node IDs
START: Special node ID for the graph entry point ("__START__")
END: Special node ID for the graph exit point ("__END__")
StateGraph Trait
The main trait for building workflow graphs.
Constructor
new
fn(id: impl Into<String>) -> Self
Create a new StateGraph with the given ID.
use mofa_kernel::workflow::{StateGraph, START, END};
use mofa_foundation::workflow::StateGraphImpl;
let mut graph = StateGraphImpl::<MyState>::new("my_workflow");
Node Management
add_node
fn(&mut self, id: impl Into<String>, node: Box<dyn NodeFunc<S>>) -> &mut Self
Add a node to the graph.
Parameters:
id: Unique node identifier
node: Node function implementation
Returns: &mut Self for method chaining
graph.add_node("process", Box::new(ProcessNode));
Edge Management
add_edge
fn(&mut self, from: impl Into<String>, to: impl Into<String>) -> &mut Self
Add a single edge between two nodes.
Parameters:
from: Source node ID (use START for entry edge)
to: Target node ID (use END for exit edge)
graph
    .add_edge(START, "process")
    .add_edge("process", "validate")
    .add_edge("validate", END);
add_conditional_edges
fn(&mut self, from: impl Into<String>, conditions: HashMap<String, String>) -> &mut Self
Add conditional edges from a node based on routing decisions.
Parameters:
from: Source node ID
conditions: Map of route names to target node IDs
use std::collections::HashMap;
graph.add_conditional_edges(
    "classify",
    HashMap::from([
        ("billing".to_string(), "handle_billing".to_string()),
        ("technical".to_string(), "handle_technical".to_string()),
    ])
);
add_parallel_edges
fn(&mut self, from: impl Into<String>, targets: Vec<String>) -> &mut Self
Add parallel edges to execute multiple nodes concurrently.
Parameters:
from: Source node ID
targets: List of target node IDs to execute in parallel
graph.add_parallel_edges(
    "fan_out",
    vec!["task_a".to_string(), "task_b".to_string(), "task_c".to_string()]
);
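To make the fan-out idea concrete, here is a minimal standalone sketch that runs several branches concurrently and collects their results in a fixed order. It uses only the Rust standard library and does not reflect how MoFA schedules parallel nodes internally; `fan_out`, `upper`, `reversed`, and `length` are hypothetical names invented for this illustration.

```rust
use std::thread;

/// Run every branch on its own scoped thread and return `(name, output)`
/// pairs in the order the branches were declared.
/// NOTE: illustrative only; this is not MoFA's scheduler.
pub fn fan_out(input: &str, branches: &[(&str, fn(&str) -> String)]) -> Vec<(String, String)> {
    thread::scope(|scope| {
        // Spawn all branches first so they can run concurrently.
        let mut handles = Vec::new();
        for (name, task) in branches {
            let task = *task; // fn pointers are Copy
            handles.push((name.to_string(), scope.spawn(move || task(input))));
        }
        // Join in declaration order so the merged output is deterministic.
        handles
            .into_iter()
            .map(|(name, handle)| (name, handle.join().expect("branch panicked")))
            .collect()
    })
}

// Hypothetical branch bodies standing in for real nodes.
pub fn upper(s: &str) -> String {
    s.to_uppercase()
}

pub fn reversed(s: &str) -> String {
    s.chars().rev().collect()
}

pub fn length(s: &str) -> String {
    s.len().to_string()
}
```

Joining in declaration order is a design choice of the sketch: the branches may finish in any order, but a downstream merge step sees a stable sequence.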
set_entry_point
fn(&mut self, node: impl Into<String>) -> &mut Self
Set the graph entry point (equivalent to add_edge(START, node)).
graph.set_entry_point("start_node");
set_finish_point
fn(&mut self, node: impl Into<String>) -> &mut Self
Set a finish point (equivalent to add_edge(node, END)).
graph.set_finish_point("final_node");
State Reducers
add_reducer
fn(&mut self, key: impl Into<String>, reducer: Box<dyn Reducer>) -> &mut Self
Add a reducer for a state key to control how updates are merged.
Parameters:
key: State key name
reducer: Reducer implementation (see Reducers)
use mofa_foundation::workflow::AppendReducer;
graph.add_reducer("messages", Box::new(AppendReducer));
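The difference between merge strategies is easiest to see on a toy state. The sketch below is standalone and does not use MoFA's `Reducer` trait, whose signature is not shown in this section. `MergeRule`, `ToyState`, `apply_update`, and `reducer_demo` are hypothetical names, values are plain strings rather than JSON, and defaulting unregistered keys to overwrite is an assumption of the sketch.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a reducer; not MoFA's `Reducer` trait.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MergeRule {
    /// Replace the current value with the update.
    Overwrite,
    /// Keep earlier values and push the update onto the end.
    Append,
}

/// Toy state: every key holds a list of strings.
pub type ToyState = HashMap<String, Vec<String>>;

/// Merge one update into the state using the rule registered for `key`.
/// Keys without a rule fall back to overwrite (an assumption of this sketch).
pub fn apply_update(
    state: &mut ToyState,
    rules: &HashMap<String, MergeRule>,
    key: &str,
    value: &str,
) {
    let rule = rules.get(key).copied().unwrap_or(MergeRule::Overwrite);
    let slot = state.entry(key.to_string()).or_default();
    match rule {
        MergeRule::Overwrite => {
            slot.clear();
            slot.push(value.to_string());
        }
        MergeRule::Append => slot.push(value.to_string()),
    }
}

/// Apply two updates to each key: "messages" accumulates, "result" is replaced.
pub fn reducer_demo() -> ToyState {
    let rules = HashMap::from([
        ("messages".to_string(), MergeRule::Append),
        ("result".to_string(), MergeRule::Overwrite),
    ]);
    let mut state = ToyState::new();
    let updates = [
        ("messages", "hi"),
        ("messages", "there"),
        ("result", "a"),
        ("result", "b"),
    ];
    for (key, value) in updates {
        apply_update(&mut state, &rules, key, value);
    }
    state
}
```

Calling `reducer_demo()` leaves both messages in place while only the last `result` survives, which is the behavior the names `AppendReducer` and `OverwriteReducer` suggest.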
Configuration
with_config
fn(&mut self, config: GraphConfig) -> &mut Self
Set graph configuration, including parallelism limits and recursion depth.
use mofa_kernel::workflow::GraphConfig;
graph.with_config(GraphConfig {
    max_parallelism: 10,
    recursion_limit: 100,
});
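A recursion limit matters mostly for graphs that contain cycles. The following standalone sketch assumes the limit caps the number of node executions in one run, which this section does not state explicitly. `run_with_limit` is a hypothetical helper, and each node is modeled as having at most one outgoing edge.

```rust
use std::collections::HashMap;

/// Walk the graph from `start`, counting executed nodes, and fail once
/// `recursion_limit` nodes have run without reaching a node with no
/// outgoing edge.
/// ASSUMPTION: the limit counts node executions; MoFA may define it differently.
pub fn run_with_limit(
    edges: &HashMap<&str, &str>,
    start: &str,
    recursion_limit: usize,
) -> Result<usize, String> {
    let mut current = start;
    let mut steps = 0;
    loop {
        if steps == recursion_limit {
            return Err(format!(
                "recursion limit of {recursion_limit} reached at node '{current}'"
            ));
        }
        steps += 1; // "Executing" a node is just counting it in this sketch.
        match edges.get(current) {
            Some(next) => current = *next,
            None => return Ok(steps), // No outgoing edge: the run is finished.
        }
    }
}
```

With a linear graph the function returns the number of executed nodes. With a two-node cycle it returns an error instead of looping forever.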
with_policy
fn(&mut self, node_id: impl Into<String>, policy: NodePolicy) -> &mut Self
Attach a fault-tolerance policy to a specific node.
use mofa_foundation::workflow::NodePolicy;
graph.with_policy(
    "api_call",
    NodePolicy {
        max_retries: 3,
        retry_delay_ms: 1000,
        circuit_breaker_threshold: 5,
        fallback_node: Some("fallback_handler".to_string()),
    }
);
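How retries, a circuit breaker, and a fallback node interact can be sketched without any framework. The code below is a standalone illustration, not MoFA's implementation. `ToyPolicy`, `Breaker`, `Outcome`, and `run_with_policy` are hypothetical. The sketch assumes the breaker counts exhausted retry cycles rather than individual attempts, and it omits the delay between retries.

```rust
/// Local mirror of some policy fields shown above; not MoFA's `NodePolicy`.
pub struct ToyPolicy {
    pub max_retries: u32,
    pub circuit_breaker_threshold: u32,
    pub fallback_node: Option<String>,
}

/// Counts consecutive exhausted retry cycles for one node.
#[derive(Default)]
pub struct Breaker {
    pub consecutive_failures: u32,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Success { attempts: u32 },
    Fallback(String),
    Failed,
}

/// Route to the fallback node if one is configured, otherwise report failure.
fn give_up(policy: &ToyPolicy) -> Outcome {
    match &policy.fallback_node {
        Some(node) => Outcome::Fallback(node.clone()),
        None => Outcome::Failed,
    }
}

/// Run `op` under the policy. `op` receives the 1-based attempt number.
pub fn run_with_policy<F>(policy: &ToyPolicy, breaker: &mut Breaker, mut op: F) -> Outcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    // Open circuit: skip the node entirely.
    if breaker.consecutive_failures >= policy.circuit_breaker_threshold {
        return give_up(policy);
    }
    // One initial attempt plus `max_retries` retries.
    for attempt in 1..=policy.max_retries + 1 {
        if op(attempt).is_ok() {
            breaker.consecutive_failures = 0;
            return Outcome::Success { attempts: attempt };
        }
        // A real implementation would wait `retry_delay_ms` before retrying.
    }
    breaker.consecutive_failures += 1;
    give_up(policy)
}
```

Keeping the breaker state outside the function lets it persist across calls, so repeated failures of the same node eventually short-circuit to the fallback.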
Compilation
compile
fn(self) -> AgentResult<CompiledGraph<S>>
Compile the graph into an executable form.
This validates the graph structure and prepares it for execution.
Validation checks:
- Entry point is set
- All nodes are reachable from entry point
- All edge references point to valid nodes
- Fallback nodes in policies exist
let compiled = graph.compile()?;
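The first three validation checks can be illustrated on a toy graph made of node IDs and `(from, to)` edges. This is a standalone sketch, not the validation code MoFA runs. `validate` is a hypothetical function, the error messages are invented, and the fallback-node check is left out.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

pub const START: &str = "__START__";
pub const END: &str = "__END__";

/// Validate a toy graph. Returns the first problem found as a message.
/// NOTE: illustrative only; MoFA's `compile` may check more or report differently.
pub fn validate(nodes: &[&str], edges: &[(&str, &str)]) -> Result<(), String> {
    let known: HashSet<&str> = nodes.iter().copied().collect();

    // Every edge endpoint must be a declared node, START, or END.
    for (from, to) in edges {
        if *from != START && !known.contains(from) {
            return Err(format!("edge source '{from}' is not a node"));
        }
        if *to != END && !known.contains(to) {
            return Err(format!("edge target '{to}' is not a node"));
        }
    }

    // An entry point exists when at least one edge leaves START.
    let mut adjacency: HashMap<&str, Vec<&str>> = HashMap::new();
    for (from, to) in edges {
        adjacency.entry(*from).or_default().push(*to);
    }
    if !adjacency.contains_key(START) {
        return Err("no entry point set".to_string());
    }

    // Breadth-first search from START; unvisited nodes are unreachable.
    let mut seen: HashSet<&str> = HashSet::new();
    let mut queue = VecDeque::from([START]);
    while let Some(current) = queue.pop_front() {
        for next in adjacency.get(current).into_iter().flatten() {
            if seen.insert(*next) {
                queue.push_back(*next);
            }
        }
    }
    match nodes.iter().find(|node| !seen.contains(*node)) {
        Some(node) => Err(format!("node '{node}' is unreachable")),
        None => Ok(()),
    }
}
```

Running the checks once at compile time means structural mistakes such as a misspelled node ID surface before any node executes.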
CompiledGraph Trait
A compiled graph ready for execution.
Execution Methods
invoke
async fn(&self, input: S, config: Option<RuntimeContext>) -> AgentResult<S>
Execute the graph to completion and return the final state. The method is async, but it resolves only after the whole graph has run.
Parameters:
input: Initial state
config: Optional runtime configuration
Returns: Final state after execution completes
let initial_state = JsonState::new();
let final_state = compiled.invoke(initial_state, None).await?;
stream
fn(&self, input: S, config: Option<RuntimeContext>) -> GraphStream<S>
Execute the graph with streaming output.
Returns a stream of events as each node executes.
Stream Events:
NodeStart { node_id, state }: Node started
NodeEnd { node_id, state, command }: Node completed
NodeRetry { node_id, attempt, error }: Node retrying
NodeFallback { from_node, to_node, reason }: Falling back
CircuitOpen { node_id }: Circuit breaker opened
End { final_state }: Execution completed
Error { node_id, error }: Error occurred
use futures::StreamExt;
let mut stream = compiled.stream(initial_state, None);
while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart { node_id, .. } => {
            println!("Started: {}", node_id);
        }
        StreamEvent::NodeEnd { node_id, .. } => {
            println!("Completed: {}", node_id);
        }
        StreamEvent::End { final_state } => {
            println!("Done: {:?}", final_state);
        }
        _ => {}
    }
}
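The event ordering a consumer can expect is shown in the standalone sketch below. It replaces the async stream with a standard-library channel so that it has no dependencies, and it models only three of the event kinds. `ToyEvent` and `stream_events` are hypothetical and are not MoFA's `StreamEvent` or `stream`.

```rust
use std::sync::mpsc;
use std::thread;

/// Reduced, hypothetical version of the event list above.
#[derive(Debug, PartialEq)]
pub enum ToyEvent {
    NodeStart { node_id: String },
    NodeEnd { node_id: String },
    End { final_state: Vec<String> },
}

/// Run `nodes` in order on a worker thread and emit events as each one
/// starts and finishes. The receiver yields events until the run ends.
pub fn stream_events(nodes: Vec<String>) -> mpsc::Receiver<ToyEvent> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut state = Vec::new();
        for node_id in nodes {
            // A send only fails if the receiver was dropped; ignore that case.
            let _ = tx.send(ToyEvent::NodeStart { node_id: node_id.clone() });
            state.push(format!("{node_id} done"));
            let _ = tx.send(ToyEvent::NodeEnd { node_id });
        }
        let _ = tx.send(ToyEvent::End { final_state: state });
        // Dropping `tx` here closes the channel and ends iteration.
    });
    rx
}
```

Iterating the receiver with a `for` loop plays the role of the `while let Some(event)` loop: each node contributes a start and an end event, and a single `End` event closes the run.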
step
async fn(&self, input: S, config: Option<RuntimeContext>) -> AgentResult<StepResult<S>>
Execute a single step of the graph.
Useful for debugging or interactive execution.
Returns: StepResult containing:
state: Current state after the step
node_id: Which node was executed
command: Command returned by the node
is_complete: Whether execution is complete
next_node: Next node to execute (if any)
let mut state = initial_state;
let mut ctx = RuntimeContext::default();
loop {
    let result = compiled.step(state, Some(ctx.clone())).await?;
    println!("Executed: {}", result.node_id);
    // Carry the updated state into the next step (or out of the loop)
    state = result.state;
    if result.is_complete {
        break;
    }
    if let Some(next) = result.next_node {
        ctx.set_current_node(&next).await;
    }
}
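The stepping loop follows a simple pattern: run one node, record what ran, carry the state forward, and stop when the result is marked complete. The standalone sketch below shows that pattern on a fixed two-node pipeline. `ToyStep`, `step_once`, and `run_stepwise` are hypothetical, the state is a single integer, and nothing here calls MoFA.

```rust
/// Hypothetical counterpart of the step result fields listed above.
pub struct ToyStep {
    pub node_id: String,
    pub state: i64,
    pub is_complete: bool,
    pub next_node: Option<String>,
}

/// Execute exactly one node of a fixed pipeline: "double" then "increment".
pub fn step_once(state: i64, current: &str) -> Result<ToyStep, String> {
    let (new_state, next) = match current {
        "double" => (state * 2, Some("increment")),
        "increment" => (state + 1, None),
        other => return Err(format!("unknown node '{other}'")),
    };
    Ok(ToyStep {
        node_id: current.to_string(),
        state: new_state,
        is_complete: next.is_none(),
        next_node: next.map(str::to_string),
    })
}

/// Drive the pipeline one step at a time, returning the final state and
/// the order in which nodes ran.
pub fn run_stepwise(initial: i64) -> Result<(i64, Vec<String>), String> {
    let mut state = initial;
    let mut current = "double".to_string();
    let mut trace = Vec::new();
    loop {
        let result = step_once(state, &current)?;
        trace.push(result.node_id);
        state = result.state;
        match result.next_node {
            Some(next) if !result.is_complete => current = next,
            _ => return Ok((state, trace)),
        }
    }
}
```

Because the caller owns the loop, it can inspect or modify the state between steps, which is what makes this mode useful for debugging.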
Validation
validate_state
fn(&self, state: &S) -> AgentResult<()>
Validate that a state is valid for this graph.
compiled.validate_state(&state)?;
state_schema
fn(&self) -> HashMap<String, String>
Get the graph's state schema, showing the reducer type for each key.
let schema = compiled.state_schema();
// { "messages": "append", "result": "overwrite" }
GraphState Trait
Trait for types that can be used as workflow state.
get_value
fn(&self, key: &str) -> Option<Value>
Get a value from the state by key.
let messages = state.get_value("messages");
apply_update
async fn(&mut self, key: &str, value: Value) -> AgentResult<()>
Apply an update to the state.
state.apply_update("result", json!("success")).await?;
EdgeTarget Types
Single: Single target node
EdgeTarget::Single("next_node".to_string())
Conditional: Conditional edges with route names to node IDs
EdgeTarget::Conditional(HashMap::from([
    ("approve".to_string(), "approved".to_string()),
    ("reject".to_string(), "rejected".to_string()),
]))
Parallel: Multiple parallel target nodes
EdgeTarget::Parallel(vec![
    "task_a".to_string(),
    "task_b".to_string(),
])
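One way to read the three variants is as three answers to the question "which nodes run next?". The standalone sketch below mirrors the variants in a local enum and resolves each to a list of node IDs. `ToyEdgeTarget` and `next_nodes` are hypothetical, and returning no successor for an unknown route is an assumption, not documented MoFA behavior.

```rust
use std::collections::HashMap;

/// Local mirror of the three variants listed above, for illustration only.
/// It is not the `EdgeTarget` type exported by MoFA.
#[derive(Debug, Clone)]
pub enum ToyEdgeTarget {
    Single(String),
    Conditional(HashMap<String, String>),
    Parallel(Vec<String>),
}

/// Decide which node IDs run next.
/// `route` is the routing decision of the node that just finished, if any.
pub fn next_nodes(target: &ToyEdgeTarget, route: Option<&str>) -> Vec<String> {
    match target {
        ToyEdgeTarget::Single(node) => vec![node.clone()],
        // ASSUMPTION: an unknown or missing route yields no successor.
        ToyEdgeTarget::Conditional(routes) => route
            .and_then(|name| routes.get(name))
            .cloned()
            .into_iter()
            .collect(),
        ToyEdgeTarget::Parallel(nodes) => nodes.clone(),
    }
}
```

A conditional target needs the extra `route` argument because it is the only variant whose successor depends on what the previous node decided.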
Complete Example
use mofa_kernel::workflow::{StateGraph, JsonState, START, END};
use mofa_foundation::workflow::{StateGraphImpl, AppendReducer, OverwriteReducer};
use mofa_kernel::workflow::{NodeFunc, Command, RuntimeContext};
use mofa_kernel::agent::error::AgentResult;
use async_trait::async_trait;
use serde_json::json;

// Define a custom node
struct ProcessNode;

#[async_trait]
impl NodeFunc<JsonState> for ProcessNode {
    async fn call(&self, state: &mut JsonState, _ctx: &RuntimeContext) -> AgentResult<Command> {
        // Read the input from state
        let input = state.get_value("input").unwrap_or(json!(null));
        // Return a command carrying the state updates
        Ok(Command::new()
            .update("messages", json!("Processed input"))
            .update("result", json!(format!("Done: {:?}", input)))
            .continue_())
    }

    fn name(&self) -> &str {
        "process"
    }
}

#[tokio::main]
async fn main() -> AgentResult<()> {
    // Build the graph. The builder methods return &mut Self, while compile()
    // takes self by value, so compile is called on the binding afterwards.
    let mut graph = StateGraphImpl::<JsonState>::new("example");
    graph
        // Configure reducers
        .add_reducer("messages", Box::new(AppendReducer))
        .add_reducer("result", Box::new(OverwriteReducer))
        // Add nodes
        .add_node("process", Box::new(ProcessNode))
        // Add edges
        .add_edge(START, "process")
        .add_edge("process", END);

    // Compile
    let compiled = graph.compile()?;

    // Execute
    let mut initial_state = JsonState::new();
    initial_state.apply_update("input", json!("Hello")).await?;
    let final_state = compiled.invoke(initial_state, None).await?;
    println!("Result: {:?}", final_state.get_value("result"));
    Ok(())
}
Conditional Routing Example
use std::collections::HashMap;

// Node that returns a routing decision
struct ClassifierNode;

#[async_trait]
impl NodeFunc<JsonState> for ClassifierNode {
    async fn call(&self, state: &mut JsonState, _ctx: &RuntimeContext) -> AgentResult<Command> {
        let input = state.get_value("query").unwrap_or(json!(null));
        let category = classify(input); // Your classification logic
        Ok(Command::new()
            .route(&category) // Explicit routing
            .update("category", json!(category))
            .continue_())
    }

    fn name(&self) -> &str { "classifier" }
}

// Build graph with conditional edges.
// compile() takes self by value, so it is called after the builder chain.
let mut graph = StateGraphImpl::<JsonState>::new("router");
graph
    .add_node("classify", Box::new(ClassifierNode))
    .add_node("handle_billing", Box::new(BillingNode))
    .add_node("handle_technical", Box::new(TechnicalNode))
    .add_edge(START, "classify")
    .add_conditional_edges(
        "classify",
        HashMap::from([
            ("billing".to_string(), "handle_billing".to_string()),
            ("technical".to_string(), "handle_technical".to_string()),
        ])
    )
    .add_edge("handle_billing", END)
    .add_edge("handle_technical", END);
let compiled = graph.compile()?;
Parallel Execution Example
// Build graph with parallel execution.
// compile() takes self by value, so it is called after the builder chain.
let mut graph = StateGraphImpl::<JsonState>::new("parallel");
graph
    .add_node("fan_out", Box::new(FanOutNode))
    .add_node("task_a", Box::new(TaskA))
    .add_node("task_b", Box::new(TaskB))
    .add_node("task_c", Box::new(TaskC))
    .add_node("merge", Box::new(MergeNode))
    .add_edge(START, "fan_out")
    .add_parallel_edges(
        "fan_out",
        vec!["task_a".to_string(), "task_b".to_string(), "task_c".to_string()]
    )
    .add_edge("task_a", "merge")
    .add_edge("task_b", "merge")
    .add_edge("task_c", "merge")
    .add_edge("merge", END);
let compiled = graph.compile()?;
See Also