Skip to main content

StateGraph API

The StateGraph API provides a LangGraph-inspired interface for building stateful workflow graphs in MoFA. It enables you to define complex workflows with nodes, edges, and state management.

Overview

StateGraph is the core API for building workflow graphs. It provides:
  • Declarative graph building with a fluent API
  • State management with customizable reducers
  • Multiple edge types: single, conditional, and parallel
  • Execution modes: invoke (synchronous), stream (event-based), and step (interactive)
  • Fault tolerance with retry policies and circuit breakers

Core Concepts

Special Node IDs

START
const
Special node ID for the graph entry point ("__START__")
END
const
Special node ID for the graph exit point ("__END__")

StateGraph Trait

The main trait for building workflow graphs.

Constructor

new
fn(id: impl Into<String>) -> Self
Create a new StateGraph with the given ID
use mofa_kernel::workflow::{StateGraph, START, END};
use mofa_foundation::workflow::StateGraphImpl;

let mut graph = StateGraphImpl::<MyState>::new("my_workflow");

Node Management

add_node
fn(&mut self, id: impl Into<String>, node: Box<dyn NodeFunc<S>>) -> &mut Self
Add a node to the graphParameters:
  • id: Unique node identifier
  • node: Node function implementation
Returns: &mut Self for method chaining
graph.add_node("process", Box::new(ProcessNode));

Edge Management

add_edge
fn(&mut self, from: impl Into<String>, to: impl Into<String>) -> &mut Self
Add a single edge between two nodesParameters:
  • from: Source node ID (use START for entry edge)
  • to: Target node ID (use END for exit edge)
graph
    .add_edge(START, "process")
    .add_edge("process", "validate")
    .add_edge("validate", END);
add_conditional_edges
fn(&mut self, from: impl Into<String>, conditions: HashMap<String, String>) -> &mut Self
Add conditional edges from a node based on routing decisionsParameters:
  • from: Source node ID
  • conditions: Map of route names to target node IDs
use std::collections::HashMap;

graph.add_conditional_edges(
    "classify",
    HashMap::from([
        ("billing".to_string(), "handle_billing".to_string()),
        ("technical".to_string(), "handle_technical".to_string()),
    ])
);
add_parallel_edges
fn(&mut self, from: impl Into<String>, targets: Vec<String>) -> &mut Self
Add parallel edges to execute multiple nodes concurrentlyParameters:
  • from: Source node ID
  • targets: List of target node IDs to execute in parallel
graph.add_parallel_edges(
    "fan_out",
    vec!["task_a".to_string(), "task_b".to_string(), "task_c".to_string()]
);
set_entry_point
fn(&mut self, node: impl Into<String>) -> &mut Self
Set the graph entry point (equivalent to add_edge(START, node))
graph.set_entry_point("start_node");
set_finish_point
fn(&mut self, node: impl Into<String>) -> &mut Self
Set a finish point (equivalent to add_edge(node, END))
graph.set_finish_point("final_node");

State Reducers

add_reducer
fn(&mut self, key: impl Into<String>, reducer: Box<dyn Reducer>) -> &mut Self
Add a reducer for a state key to control how updates are mergedParameters:
  • key: State key name
  • reducer: Reducer implementation (see Reducers)
use mofa_foundation::workflow::AppendReducer;

graph.add_reducer("messages", Box::new(AppendReducer));

Configuration

with_config
fn(&mut self, config: GraphConfig) -> &mut Self
Set graph configuration including parallelism limits and recursion depth
use mofa_kernel::workflow::GraphConfig;

graph.with_config(GraphConfig {
    max_parallelism: 10,
    recursion_limit: 100,
});
with_policy
fn(&mut self, node_id: impl Into<String>, policy: NodePolicy) -> &mut Self
Attach fault-tolerance policy to a specific node
use mofa_foundation::workflow::NodePolicy;

graph.with_policy(
    "api_call",
    NodePolicy {
        max_retries: 3,
        retry_delay_ms: 1000,
        circuit_breaker_threshold: 5,
        fallback_node: Some("fallback_handler".to_string()),
    }
);

Compilation

compile
fn(self) -> AgentResult<CompiledGraph<S>>
Compile the graph into an executable formThis validates the graph structure and prepares it for execution.Validation checks:
  • Entry point is set
  • All nodes are reachable from entry point
  • All edge references point to valid nodes
  • Fallback nodes in policies exist
let compiled = graph.compile()?;

CompiledGraph Trait

A compiled graph ready for execution.

Execution Methods

invoke
async fn(&self, input: S, config: Option<RuntimeContext>) -> AgentResult<S>
Execute the graph synchronously and return the final stateParameters:
  • input: Initial state
  • config: Optional runtime configuration
Returns: Final state after execution completes
let initial_state = JsonState::new();
let final_state = compiled.invoke(initial_state, None).await?;
stream
fn(&self, input: S, config: Option<RuntimeContext>) -> GraphStream<S>
Execute the graph with streaming outputReturns a stream of events as each node executes.Stream Events:
  • NodeStart { node_id, state }: Node started
  • NodeEnd { node_id, state, command }: Node completed
  • NodeRetry { node_id, attempt, error }: Node retrying
  • NodeFallback { from_node, to_node, reason }: Falling back
  • CircuitOpen { node_id }: Circuit breaker opened
  • End { final_state }: Execution completed
  • Error { node_id, error }: Error occurred
use futures::StreamExt;

let mut stream = compiled.stream(initial_state, None);

while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::NodeStart { node_id, .. } => {
            println!("Started: {}", node_id);
        }
        StreamEvent::NodeEnd { node_id, .. } => {
            println!("Completed: {}", node_id);
        }
        StreamEvent::End { final_state } => {
            println!("Done: {:?}", final_state);
        }
        _ => {}
    }
}
step
async fn(&self, input: S, config: Option<RuntimeContext>) -> AgentResult<StepResult<S>>
Execute a single step of the graphUseful for debugging or interactive execution.Returns: StepResult containing:
  • state: Current state after the step
  • node_id: Which node was executed
  • command: Command returned by the node
  • is_complete: Whether execution is complete
  • next_node: Next node to execute (if any)
let mut state = initial_state;
let mut ctx = RuntimeContext::default();

loop {
    let result = compiled.step(state, Some(ctx.clone())).await?;
    println!("Executed: {}", result.node_id);
    
    if result.is_complete {
        state = result.state;
        break;
    }
    
    state = result.state;
    if let Some(next) = result.next_node {
        ctx.set_current_node(&next).await;
    }
}

Validation

validate_state
fn(&self, state: &S) -> AgentResult<()>
Validate that a state is valid for this graph
compiled.validate_state(&state)?;
state_schema
fn(&self) -> HashMap<String, String>
Get the graph’s state schema showing reducer types for each key
let schema = compiled.state_schema();
// { "messages": "append", "result": "overwrite" }

GraphState Trait

Trait for types that can be used as workflow state.
get_value
fn(&self, key: &str) -> Option<Value>
Get a value from the state by key
let messages = state.get_value("messages");
apply_update
async fn(&mut self, key: &str, value: Value) -> AgentResult<()>
Apply an update to the state
state.apply_update("result", json!("success")).await?;

EdgeTarget Types

EdgeTarget::Single
enum variant
Single target node
EdgeTarget::Single("next_node".to_string())
EdgeTarget::Conditional
enum variant
Conditional edges with route names to node IDs
EdgeTarget::Conditional(HashMap::from([
    ("approve".to_string(), "approved".to_string()),
    ("reject".to_string(), "rejected".to_string()),
]))
EdgeTarget::Parallel
enum variant
Multiple parallel target nodes
EdgeTarget::Parallel(vec![
    "task_a".to_string(),
    "task_b".to_string(),
])

Complete Example

use mofa_kernel::workflow::{StateGraph, JsonState, START, END};
use mofa_foundation::workflow::{StateGraphImpl, AppendReducer, OverwriteReducer};
use mofa_kernel::workflow::{NodeFunc, Command, RuntimeContext};
use mofa_kernel::agent::error::AgentResult;
use async_trait::async_trait;
use serde_json::json;

// Define a custom node
struct ProcessNode;

#[async_trait]
impl NodeFunc<JsonState> for ProcessNode {
    async fn call(&self, state: &mut JsonState, _ctx: &RuntimeContext) -> AgentResult<Command> {
        // Process state
        let input = state.get_value("input").unwrap_or(json!(null));
        
        // Return command with updates
        Ok(Command::new()
            .update("messages", json!("Processed input"))
            .update("result", json!(format!("Done: {:?}", input)))
            .continue_())
    }
    
    fn name(&self) -> &str {
        "process"
    }
}

#[tokio::main]
async fn main() -> AgentResult<()> {
    // Build graph
    let mut graph = StateGraphImpl::<JsonState>::new("example")
        // Configure reducers
        .add_reducer("messages", Box::new(AppendReducer))
        .add_reducer("result", Box::new(OverwriteReducer))
        // Add nodes
        .add_node("process", Box::new(ProcessNode))
        // Add edges
        .add_edge(START, "process")
        .add_edge("process", END)
        // Compile
        .compile()?;
    
    // Execute
    let mut initial_state = JsonState::new();
    initial_state.apply_update("input", json!("Hello")).await?;
    
    let final_state = graph.invoke(initial_state, None).await?;
    
    println!("Result: {:?}", final_state.get_value("result"));
    
    Ok(())
}

Conditional Routing Example

use std::collections::HashMap;

// Node that returns routing decision
struct ClassifierNode;

#[async_trait]
impl NodeFunc<JsonState> for ClassifierNode {
    async fn call(&self, state: &mut JsonState, _ctx: &RuntimeContext) -> AgentResult<Command> {
        let input = state.get_value("query").unwrap_or(json!(null));
        let category = classify(input); // Your classification logic
        
        Ok(Command::new()
            .route(&category)  // Explicit routing
            .update("category", json!(category))
            .continue_())
    }
    
    fn name(&self) -> &str { "classifier" }
}

// Build graph with conditional edges
let mut graph = StateGraphImpl::<JsonState>::new("router")
    .add_node("classify", Box::new(ClassifierNode))
    .add_node("handle_billing", Box::new(BillingNode))
    .add_node("handle_technical", Box::new(TechnicalNode))
    .add_edge(START, "classify")
    .add_conditional_edges(
        "classify",
        HashMap::from([
            ("billing".to_string(), "handle_billing".to_string()),
            ("technical".to_string(), "handle_technical".to_string()),
        ])
    )
    .add_edge("handle_billing", END)
    .add_edge("handle_technical", END)
    .compile()?;

Parallel Execution Example

// Build graph with parallel execution
let mut graph = StateGraphImpl::<JsonState>::new("parallel")
    .add_node("fan_out", Box::new(FanOutNode))
    .add_node("task_a", Box::new(TaskA))
    .add_node("task_b", Box::new(TaskB))
    .add_node("task_c", Box::new(TaskC))
    .add_node("merge", Box::new(MergeNode))
    .add_edge(START, "fan_out")
    .add_parallel_edges(
        "fan_out",
        vec!["task_a".to_string(), "task_b".to_string(), "task_c".to_string()]
    )
    .add_edge("task_a", "merge")
    .add_edge("task_b", "merge")
    .add_edge("task_c", "merge")
    .add_edge("merge", END)
    .compile()?;

See Also

Build docs developers (and LLMs) love