Skip to main content

Workflow Nodes

Workflow nodes are the building blocks of MoFA workflows. This page documents all node types, reducer implementations, and execution policies.

NodeFunc Trait

The core trait for implementing custom workflow nodes.

Trait Definition

#[async_trait]
pub trait NodeFunc<S: GraphState>: Send + Sync {
    async fn call(&self, state: &mut S, ctx: &RuntimeContext) -> AgentResult<Command>;
    fn name(&self) -> &str;
    fn description(&self) -> Option<&str> { None }
}

Parameters

state
&mut S
Mutable reference to the current workflow stateThe node can read from and modify the state directly, but state updates returned in the Command are the recommended approach.
ctx
&RuntimeContext
Runtime context with execution metadataAvailable information:
  • execution_id: Unique execution identifier
  • graph_id: Graph identifier
  • current_node: Current node being executed
  • remaining_steps: Steps remaining before recursion limit
  • config: Graph configuration
  • metadata: Custom metadata
  • tags: Execution tags

Return Value

Command
AgentResult<Command>
Returns a Command containing:
  • State updates to apply
  • Control flow directive (continue, goto, return, send)
  • Optional routing decision for conditional edges
See Command API for details.

Node Types

MoFA provides several built-in node types for common workflow patterns.

Start Node

NodeType::Start
enum variant
Entry point of the workflowCharacteristics:
  • No execution logic
  • Passes input through unchanged
  • Must exist exactly once per workflow
use mofa_foundation::workflow::WorkflowNode;

let start = WorkflowNode::start("start");

End Node

NodeType::End
enum variant
Exit point of the workflowCharacteristics:
  • No execution logic
  • Passes input through unchanged
  • Marks workflow completion
  • Can have multiple end nodes
let end = WorkflowNode::end("end");

Task Node

NodeType::Task
enum variant
Executes custom logicCharacteristics:
  • Accepts async closure for execution
  • Supports retry policy
  • Configurable timeout
let task = WorkflowNode::task(
    "process",
    "Process Data",
    |_ctx, input| async move {
        let value = input.as_i64().unwrap_or(0);
        Ok(WorkflowValue::Int(value * 2))
    }
);

Agent Node

NodeType::Agent
enum variant
Invokes an LLM agentCharacteristics:
  • Integrates LLM agents into workflows
  • Supports prompt templates
  • Automatic input/output conversion
use std::sync::Arc;

let agent = WorkflowNode::llm_agent(
    "classify",
    "Classify Input",
    Arc::new(llm_agent)
);

// With prompt template
let agent = WorkflowNode::llm_agent_with_template(
    "analyze",
    "Analyze",
    Arc::new(llm_agent),
    "Analyze this: {{ input }}".to_string()
);

Condition Node

NodeType::Condition
enum variant
Evaluates a boolean condition for branchingCharacteristics:
  • Returns “true” or “false” as string
  • Used with conditional edges
  • Evaluates synchronously
let condition = WorkflowNode::condition(
    "check_value",
    "Check Value",
    |_ctx, input| async move {
        input.as_i64().unwrap_or(0) > 50
    }
);

Parallel Node

NodeType::Parallel
enum variant
Fan-out marker for parallel executionCharacteristics:
  • Dispatches to multiple parallel branches
  • Each branch receives isolated state snapshot
  • Execution controlled by add_parallel_edges
let parallel = WorkflowNode::parallel(
    "fork",
    "Fork Tasks",
    vec!["task_a", "task_b", "task_c"]
);

Join Node

NodeType::Join
enum variant
Waits for multiple branches to complete and aggregates resultsCharacteristics:
  • Waits for all specified nodes to complete
  • Optional transform function to combine outputs
  • Default behavior merges into Map
// Simple join (merges to Map)
let join = WorkflowNode::join(
    "merge",
    "Merge Results",
    vec!["task_a", "task_b", "task_c"]
);

// Join with custom transform
let join = WorkflowNode::join_with_transform(
    "combine",
    "Combine",
    vec!["task_a", "task_b"],
    |results| async move {
        let a = results.get("task_a").cloned().unwrap_or(WorkflowValue::Null);
        let b = results.get("task_b").cloned().unwrap_or(WorkflowValue::Null);
        WorkflowValue::String(format!("Combined: {:?} + {:?}", a, b))
    }
);

Loop Node

NodeType::Loop
enum variant
Executes logic repeatedly while condition is trueCharacteristics:
  • Loop body executor
  • Loop condition evaluator
  • Maximum iteration limit
  • State carried between iterations
let loop_node = WorkflowNode::loop_node(
    "counter",
    "Count Loop",
    // Body: increment value
    |_ctx, input| async move {
        let n = input.as_i64().unwrap_or(0);
        Ok(WorkflowValue::Int(n + 1))
    },
    // Condition: continue while < 10
    |_ctx, input| async move {
        input.as_i64().unwrap_or(0) < 10
    },
    100  // max iterations
);

Transform Node

NodeType::Transform
enum variant
Transforms data without side effectsCharacteristics:
  • Pure transformation function
  • Receives map of inputs
  • Returns transformed value
let transform = WorkflowNode::transform(
    "format",
    "Format Output",
    |inputs| async move {
        let input = inputs.get("input").cloned()
            .unwrap_or(WorkflowValue::Null);
        WorkflowValue::String(format!("Formatted: {:?}", input))
    }
);

SubWorkflow Node

NodeType::SubWorkflow
enum variant
Invokes another workflow as a sub-workflowCharacteristics:
  • References workflow by ID
  • Passes input to sub-workflow
  • Returns sub-workflow output
let sub = WorkflowNode::sub_workflow(
    "sub_process",
    "Sub-Process",
    "child_workflow_id"
);

Wait Node

NodeType::Wait
enum variant
Waits for external eventCharacteristics:
  • Pauses execution
  • Waits for specified event type
  • Resumes when event received
let wait = WorkflowNode::wait(
    "wait_approval",
    "Wait for Approval",
    "approval_event"
);

Reducers

Reducers control how state updates are merged with existing values.

Reducer Trait

#[async_trait]
pub trait Reducer: Send + Sync {
    async fn reduce(&self, current: Option<&Value>, update: &Value) -> AgentResult<Value>;
    fn name(&self) -> &str;
    fn reducer_type(&self) -> ReducerType;
}

Built-in Reducers

OverwriteReducer
struct
Replaces current value with update (default behavior)
use mofa_foundation::workflow::OverwriteReducer;

graph.add_reducer("result", Box::new(OverwriteReducer));

// Before: { "result": "old" }
// Update: { "result": "new" }
// After:  { "result": "new" }
AppendReducer
struct
Appends update to a list (creates list if doesn’t exist)
use mofa_foundation::workflow::AppendReducer;

graph.add_reducer("messages", Box::new(AppendReducer));

// Before: { "messages": ["hello"] }
// Update: { "messages": "world" }
// After:  { "messages": ["hello", "world"] }
ExtendReducer
struct
Extends list with items from another list
use mofa_foundation::workflow::ExtendReducer;

graph.add_reducer("items", Box::new(ExtendReducer));

// Before: { "items": [1, 2] }
// Update: { "items": [3, 4, 5] }
// After:  { "items": [1, 2, 3, 4, 5] }
MergeReducer
struct
Merges objects (shallow or deep)
use mofa_foundation::workflow::MergeReducer;

// Shallow merge
graph.add_reducer("config", Box::new(MergeReducer::shallow()));

// Deep merge
graph.add_reducer("settings", Box::new(MergeReducer::deep()));

// Before: { "config": { "a": 1, "b": 2 } }
// Update: { "config": { "b": 3, "c": 4 } }
// After:  { "config": { "a": 1, "b": 3, "c": 4 } }
LastNReducer
struct
Keeps only the last N items in a list
use mofa_foundation::workflow::LastNReducer;

graph.add_reducer("history", Box::new(LastNReducer::new(3)));

// Before: { "history": [1, 2, 3, 4, 5] }
// Update: { "history": 6 }
// After:  { "history": [4, 5, 6] }
FirstReducer
struct
Keeps the first non-null value (ignores subsequent updates)
use mofa_foundation::workflow::FirstReducer;

graph.add_reducer("initial", Box::new(FirstReducer));

// Before: null
// Update: "first"
// After:  "first"
// Update: "second"
// After:  "first" (unchanged)
LastReducer
struct
Keeps the last non-null value
use mofa_foundation::workflow::LastReducer;

graph.add_reducer("current", Box::new(LastReducer));

// Before: "first"
// Update: "second"
// After:  "second"
CustomReducer
struct
Custom reducer using a closure
use mofa_foundation::workflow::CustomReducer;

let sum_reducer = CustomReducer::new("sum", |current, update| {
    let curr = current.and_then(|v| v.as_i64()).unwrap_or(0);
    let upd = update.as_i64().unwrap_or(0);
    Ok(json!(curr + upd))
});

graph.add_reducer("total", Box::new(sum_reducer));

Retry Policies

Configure retry behavior for fault tolerance.

RetryPolicy

RetryPolicy
struct
Defines retry behavior for node executionFields:
max_retries
u32
default:"3"
Maximum number of retry attempts
retry_delay_ms
u64
default:"1000"
Delay between retries in milliseconds
exponential_backoff
bool
default:"true"
Whether to use exponential backoff (doubles delay each retry)
max_delay_ms
u64
default:"30000"
Maximum delay cap in milliseconds
use mofa_foundation::workflow::node::RetryPolicy;

// Default policy
let policy = RetryPolicy::default();

// No retry
let policy = RetryPolicy::no_retry();

// Custom retry
let policy = RetryPolicy {
    max_retries: 5,
    retry_delay_ms: 2000,
    exponential_backoff: true,
    max_delay_ms: 60000,
};

let node = WorkflowNode::task("api_call", "API Call", executor)
    .with_retry(policy);

Node Configuration

Configure individual node behavior.

NodeConfig

NodeConfig
struct
Per-node configurationFields:
id
String
Unique node identifier
name
String
Human-readable node name
node_type
NodeType
Node type (Start, End, Task, Agent, etc.)
description
String
Node description
retry_policy
RetryPolicy
Retry policy for this node
timeout
TimeoutConfig
Timeout configuration
metadata
HashMap<String, String>
Custom metadata key-value pairs
use mofa_foundation::workflow::node::{NodeConfig, NodeType, RetryPolicy, TimeoutConfig};

let config = NodeConfig::new("my_node", "My Node", NodeType::Task)
    .with_description("Processes data")
    .with_retry_policy(RetryPolicy::with_retries(5))
    .with_timeout(TimeoutConfig {
        execution_timeout_ms: 30000,
        cancel_on_timeout: true,
    })
    .with_metadata("priority", "high");

TimeoutConfig

TimeoutConfig
struct
Timeout configuration for node execution
execution_timeout_ms
u64
default:"60000"
Execution timeout in milliseconds (default 1 minute)
cancel_on_timeout
bool
default:"true"
Whether to cancel execution on timeout
use mofa_foundation::workflow::node::TimeoutConfig;

let timeout = TimeoutConfig {
    execution_timeout_ms: 120000,  // 2 minutes
    cancel_on_timeout: true,
};

let node = WorkflowNode::task("long_task", "Long Task", executor)
    .with_timeout(120000);

Complete Example

use mofa_kernel::workflow::{StateGraph, JsonState, START, END};
use mofa_foundation::workflow::{
    StateGraphImpl, AppendReducer, OverwriteReducer, LastNReducer,
    WorkflowNode, RetryPolicy,
};
use mofa_foundation::workflow::node::NodeType;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build workflow
    let mut graph = StateGraphImpl::<JsonState>::new("example")
        // Configure reducers
        .add_reducer("messages", Box::new(AppendReducer))
        .add_reducer("result", Box::new(OverwriteReducer))
        .add_reducer("history", Box::new(LastNReducer::new(5)));
    
    // Add task node with retry
    let fetch_node = WorkflowNode::task(
        "fetch",
        "Fetch Data",
        |_ctx, _input| async move {
            // Simulate API call
            Ok(WorkflowValue::String("data".to_string()))
        }
    ).with_retry(RetryPolicy::with_retries(3))
     .with_timeout(30000);
    
    graph.add_node("fetch", Box::new(fetch_node));
    
    // Add condition node
    let check_node = WorkflowNode::condition(
        "check",
        "Check Data",
        |_ctx, input| async move {
            input.as_str().map(|s| !s.is_empty()).unwrap_or(false)
        }
    );
    
    graph.add_node("check", Box::new(check_node));
    
    // Add processing branches
    let process_node = WorkflowNode::task(
        "process",
        "Process Data",
        |_ctx, input| async move {
            Ok(WorkflowValue::String(format!("Processed: {:?}", input)))
        }
    );
    
    graph.add_node("process", Box::new(process_node));
    
    // Build graph
    graph
        .add_edge(START, "fetch")
        .add_edge("fetch", "check")
        .add_conditional_edges(
            "check",
            HashMap::from([
                ("true".to_string(), "process".to_string()),
                ("false".to_string(), END.to_string()),
            ])
        )
        .add_edge("process", END);
    
    // Compile and execute
    let compiled = graph.compile()?;
    let initial_state = JsonState::new();
    let final_state = compiled.invoke(initial_state, None).await?;
    
    println!("Result: {:?}", final_state.get_value("result"));
    
    Ok(())
}

See Also

Build docs developers (and LLMs) love