
Overview

This comprehensive example demonstrates MoFA’s graph-based workflow orchestration system. It includes:
  • Linear, conditional, and parallel workflow patterns
  • ETL data pipelines
  • LLM agent workflow integration (Dify-style)
  • Event listening and monitoring
  • ReAct agent workflow integration
  • Multi-agent parallel analysis
  • Intelligent data pipelines

What You’ll Learn

  • Using WorkflowBuilder for fluent workflow construction
  • Implementing conditional branching
  • Parallel execution and result aggregation
  • State management and data passing
  • LLM integration in workflows
  • Event monitoring and checkpoints

Prerequisites

  • Rust 1.75 or higher
  • OpenAI API key (for LLM examples)
  • Understanding of async/await and graph concepts

Architecture

Source Code Overview

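The example is 1061 lines long and covers 10 workflow scenarios. The first, a linear workflow, looks like this:
// Assumption: WorkflowExecutor and ExecutorConfig are exported from the same
// module as WorkflowBuilder; adjust the path if your SDK version differs.
use mofa_sdk::workflow::{ExecutorConfig, WorkflowBuilder, WorkflowExecutor, WorkflowValue};
// Assumption: the info! macro comes from the tracing crate.
use tracing::info;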

let graph = WorkflowBuilder::new("linear", "Linear Workflow")
    .description("Simple linear data processing")
    .start()
    .task("fetch", "Fetch Data", |_ctx, input| async move {
        info!("Fetching data...");
        let data = format!("Data: {}", input.as_str().unwrap_or("default"));
        Ok(WorkflowValue::String(data))
    })
    .task("process", "Process Data", |_ctx, input| async move {
        info!("Processing: {:?}", input);
        let processed = format!("Processed - {}", input.as_str().unwrap_or(""));
        Ok(WorkflowValue::String(processed))
    })
    .task("save", "Save Result", |_ctx, input| async move {
        info!("Saving: {:?}", input);
        Ok(WorkflowValue::String("Save successful".to_string()))
    })
    .end()
    .build();

let executor = WorkflowExecutor::new(ExecutorConfig::default());
let result = executor.execute(&graph, WorkflowValue::String("API".into())).await?;

Running the Example

1. Run Basic Examples (1-5)

cd examples/workflow_orchestration
cargo run

2. Run with LLM Examples (6-10)

export OPENAI_API_KEY="your-api-key"
cargo run

Workflow Examples

Pattern: Start → Fetch → Process → Save → End
Simple sequential processing pipeline.
Use Case: ETL, data transformation

Key Concepts

WorkflowBuilder API

WorkflowBuilder::new(id, name)
    .description(desc)              // Workflow description
    .start()                        // Add start node
    .task(id, name, handler)        // Add task node
    .condition(id, name, predicate) // Add conditional node
    .parallel(id, name)             // Start parallel section
    .branch(id, name, handler)      // Add parallel branch
    .join(id, name)                 // Join parallel branches
    .llm_agent(id, name, agent)     // Add LLM node
    .end()                          // Add end node
    .build()                        // Build graph

Node Types

Entry point of the workflow:
WorkflowNode::start("start")
Executes custom logic:
WorkflowNode::task("task_id", "Task Name", |ctx, input| async move {
    // Your logic
    Ok(WorkflowValue::String("result".to_string()))
})
Routes based on predicate:
WorkflowNode::condition("check", "Check", |ctx, input| async move {
    input.as_i64().unwrap_or(0) > 50
})
Integrates LLM agent:
WorkflowNode::llm_agent("llm", "LLM Node", agent)
Workflow termination:
WorkflowNode::end("end")

Context and State

// Access workflow context
.task("process", "Process", |ctx, input| async move {
    // Set variable
    ctx.set_variable("count", WorkflowValue::Int(10)).await;
    
    // Get variable
    let count = ctx.get_variable("count").await
        .and_then(|v| v.as_i64())
        .unwrap_or(0);
    
    Ok(WorkflowValue::Int(count + 1))
})
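
To make the data-passing model concrete, here is a minimal std-only sketch of a shared variable store. It is not MoFA's implementation: `Context`, its field, and `increment` are hypothetical stand-ins, the methods are synchronous rather than async, and values are plain `i64` instead of `WorkflowValue`. The point it illustrates is that every clone of the context handle sees the same variables, so a value set in one task is visible to later tasks.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the workflow context: a cheaply cloneable handle
// to one shared variable map.
#[derive(Clone, Default)]
struct Context {
    variables: Arc<Mutex<HashMap<String, i64>>>,
}

impl Context {
    // Stores (or overwrites) a variable.
    fn set_variable(&self, name: &str, value: i64) {
        self.variables.lock().unwrap().insert(name.to_string(), value);
    }

    // Returns a copy of the variable, or None if it was never set.
    fn get_variable(&self, name: &str) -> Option<i64> {
        self.variables.lock().unwrap().get(name).copied()
    }
}

// A "task" that reads a counter, adds one, and writes it back.
fn increment(ctx: &Context, name: &str) -> i64 {
    let next = ctx.get_variable(name).unwrap_or(0) + 1;
    ctx.set_variable(name, next);
    next
}

fn main() {
    let ctx = Context::default();
    ctx.set_variable("count", 10);

    // A second handle, as a later task would receive it.
    let later_task_ctx = ctx.clone();
    let updated = increment(&later_task_ctx, "count");

    println!("count after increment: {updated}");
    println!("seen through the original handle: {:?}", ctx.get_variable("count"));
}
```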

Event Monitoring

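// Assumption: WorkflowExecutor is exported from the same module as ExecutionEvent.
use mofa_sdk::workflow::{ExecutionEvent, WorkflowExecutor};
// The bounded channel and recv().await below match tokio's mpsc channel.
use tokio::sync::mpsc;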

let (event_tx, mut event_rx) = mpsc::channel(100);

let executor = WorkflowExecutor::new(config)
    .with_event_sender(event_tx);

tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        match event {
            ExecutionEvent::WorkflowStarted { workflow_id, .. } => {
                info!("Started: {}", workflow_id);
            }
            ExecutionEvent::NodeCompleted { node_id, result } => {
                info!("Completed: {} - {:?}", node_id, result.status);
            }
            _ => {}
        }
    }
});
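
The listener pattern above can be tried without the SDK. The following sketch swaps in `std::sync::mpsc` and an OS thread for the tokio channel and task, and uses a hypothetical `Event` enum in place of `ExecutionEvent`; `run_with_events` only simulates an executor by sending events in order. It shows the same shape: the executor side owns the sender, the listener drains the receiver until the sender is dropped.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for ExecutionEvent with only the fields used here.
#[derive(Debug)]
enum Event {
    WorkflowStarted { workflow_id: String },
    NodeCompleted { node_id: String },
    WorkflowFinished { workflow_id: String },
}

// Drains the channel until every sender is dropped; returns one line per
// event the listener cares about.
fn collect_log(rx: mpsc::Receiver<Event>) -> Vec<String> {
    let mut log = Vec::new();
    for event in rx {
        match event {
            Event::WorkflowStarted { workflow_id } => log.push(format!("Started: {workflow_id}")),
            Event::NodeCompleted { node_id } => log.push(format!("Completed: {node_id}")),
            _ => {} // other events are ignored, as in the snippet above
        }
    }
    log
}

// Simulates an executor run: emits events for each node while a listener
// thread consumes them concurrently.
fn run_with_events(workflow_id: &str, nodes: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let listener = thread::spawn(move || collect_log(rx));

    tx.send(Event::WorkflowStarted { workflow_id: workflow_id.to_string() }).unwrap();
    for node in nodes {
        tx.send(Event::NodeCompleted { node_id: node.to_string() }).unwrap();
    }
    tx.send(Event::WorkflowFinished { workflow_id: workflow_id.to_string() }).unwrap();

    drop(tx); // closing the channel lets the listener loop end
    listener.join().unwrap()
}

fn main() {
    for line in run_with_events("linear", &["fetch", "process", "save"]) {
        println!("{line}");
    }
}
```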

Expected Output

=== MoFA Workflow Orchestration Example ===

--- Example 1: Linear Workflow ---
  [fetch_data] Fetching data...
  [process] Processing data
  [save] Saving result
  Workflow status: Completed
  Number of executed nodes: 5

--- Example 2: Conditional Branch Workflow ---
  [check_value] Checking value: 75 (threshold: 50)
  [high_path] Executing high-value path
  Workflow status: Completed

--- Example 3: Parallel Execution Workflow ---
  [task_a] Starting Task A...
  [task_b] Starting Task B...
  [task_c] Starting Task C...
  [task_b] Task B complete
  [task_c] Task C complete
  [task_a] Task A complete
  [join] Aggregating all results
  Workflow status: Completed

=== Dify-style LLM/Agent Workflow Examples ===

--- Example 6: ReAct Agent Decision Workflow ---
  [gather_context] Gathering context information...
  [react_agent] Starting ReAct reasoning...
  [react_agent] Reasoning complete, iterations: 3
  [final_synthesis] LLM synthesizing results...
  Workflow status: Completed

=== All examples have finished executing ===

Advanced Features

Checkpoints and Recovery

let executor = WorkflowExecutor::new(ExecutorConfig {
    enable_checkpoints: true,
    checkpoint_interval: 3,  // Checkpoint every 3 nodes
    ..Default::default()
});

// Resume from checkpoint
let result = executor.resume_from_checkpoint(
    &checkpoint_data,
    &graph
).await?;
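
What a checkpoint interval implies for recovery can be sketched in a few lines. This is an illustration under assumed semantics, not MoFA's checkpoint format: `Checkpoint`, `Run`, and `execute` are hypothetical, a checkpoint is taken after every `interval` completed nodes, and resuming restarts at the first node after the last checkpoint. Under those assumptions, any node that completed after the last checkpoint runs again on resume, which is why node handlers benefit from being idempotent.

```rust
// Hypothetical checkpoint record: how many nodes had completed when it was taken.
#[derive(Debug, Clone, PartialEq)]
struct Checkpoint {
    completed: usize,
}

#[derive(Debug)]
struct Run {
    executed: Vec<String>,
    last_checkpoint: Option<Checkpoint>,
    finished: bool,
}

// Runs nodes in order from index `start`. A checkpoint is recorded after every
// `interval` completed nodes. `fail_at` simulates a crash before that index.
fn execute(nodes: &[&str], start: usize, interval: usize, fail_at: Option<usize>) -> Run {
    let mut executed = Vec::new();
    let mut last_checkpoint = None;

    for (index, node) in nodes.iter().enumerate().skip(start) {
        if fail_at == Some(index) {
            return Run { executed, last_checkpoint, finished: false };
        }
        executed.push(node.to_string());

        let completed = index + 1;
        if interval > 0 && completed % interval == 0 {
            last_checkpoint = Some(Checkpoint { completed });
        }
    }
    Run { executed, last_checkpoint, finished: true }
}

fn main() {
    let nodes = ["start", "fetch", "process", "save", "end"];

    // First attempt crashes before the last node.
    let first = execute(&nodes, 0, 3, Some(4));
    println!("first run: {first:?}");

    // Resume from the last checkpoint, or from the beginning if none exists.
    let resume_at = first.last_checkpoint.map_or(0, |c| c.completed);
    let resumed = execute(&nodes, resume_at, 3, None);
    println!("resumed run: {resumed:?}");
}
```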

Timeout and Retries

use std::time::Duration;

let config = ExecutorConfig {
    max_execution_time: Duration::from_secs(300),
    retry_on_failure: true,
    max_retries: 3,
    ..Default::default()
};
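
How `max_retries` and `max_execution_time` might interact is easiest to see in a small retry loop. The sketch below is an assumption about reasonable semantics, not the executor's actual logic: `run_with_retries` and `RunError` are hypothetical names, the task is a synchronous closure, and the deadline is only checked between attempts (it does not interrupt an attempt that is already running).

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum RunError<E> {
    // The overall time budget ran out before an attempt could start.
    TimedOut,
    // Every allowed attempt failed; `last` is the final error.
    Failed { attempts: u32, last: E },
}

// Calls `task` until it succeeds, the retry budget is spent, or the deadline
// passes. `task` receives the zero-based attempt number.
fn run_with_retries<T, E, F>(
    max_retries: u32,
    max_execution_time: Duration,
    mut task: F,
) -> Result<T, RunError<E>>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let deadline = Instant::now() + max_execution_time;
    let mut attempt = 0;
    loop {
        if Instant::now() >= deadline {
            return Err(RunError::TimedOut);
        }
        match task(attempt) {
            Ok(value) => return Ok(value),
            Err(error) if attempt >= max_retries => {
                return Err(RunError::Failed { attempts: attempt + 1, last: error });
            }
            Err(_) => attempt += 1, // retry budget remains, try again
        }
    }
}

fn main() {
    // Fails twice, then succeeds on the third attempt.
    let result = run_with_retries(3, Duration::from_secs(300), |attempt| {
        if attempt < 2 {
            Err("transient failure")
        } else {
            Ok(format!("succeeded on attempt {}", attempt + 1))
        }
    });
    println!("{result:?}");
}
```

With `max_retries: 3` a task may run up to four times in total: the first attempt plus three retries.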

Custom Aggregation

.join_with_transform("join", "Aggregate", |results| async move {
    // Custom aggregation logic
    let sum: i64 = results.values()
        .filter_map(|v| v.as_i64())
        .sum();
    WorkflowValue::Int(sum)
})
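
The fan-out and join behind this transform can be sketched with the standard library alone. The code below is an illustration, not the SDK's executor: `Value`, `Branch`, `run_parallel`, and `sum_ints` are hypothetical, branches run on scoped OS threads instead of async tasks, and results are keyed by branch id. The aggregation mirrors the snippet above: non-integer results are skipped by `filter_map`.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical, reduced stand-in for WorkflowValue.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    String(String),
}

impl Value {
    fn as_i64(&self) -> Option<i64> {
        match self {
            Value::Int(n) => Some(*n),
            _ => None,
        }
    }
}

// A branch handler takes the shared input and produces one result.
type Branch = fn(i64) -> Value;

fn double(x: i64) -> Value { Value::Int(x * 2) }
fn square(x: i64) -> Value { Value::Int(x * x) }
fn label(x: i64) -> Value { Value::String(format!("input was {x}")) }

// Runs every branch on its own thread and collects results keyed by branch id.
fn run_parallel(input: i64, branches: &[(&str, Branch)]) -> HashMap<String, Value> {
    thread::scope(|scope| {
        // Spawn all branches first so they actually run concurrently.
        let handles: Vec<_> = branches
            .iter()
            .map(|(id, handler)| {
                let handler = *handler;
                (id.to_string(), scope.spawn(move || handler(input)))
            })
            .collect();
        // Join: wait for every branch before aggregating.
        handles
            .into_iter()
            .map(|(id, handle)| (id, handle.join().expect("branch panicked")))
            .collect()
    })
}

// Same aggregation as the join transform above: sum all integer results.
fn sum_ints(results: &HashMap<String, Value>) -> Value {
    let sum: i64 = results.values().filter_map(|v| v.as_i64()).sum();
    Value::Int(sum)
}

fn main() {
    let branches = [
        ("double", double as Branch),
        ("square", square as Branch),
        ("label", label as Branch),
    ];
    let results = run_parallel(5, &branches);
    println!("aggregated: {:?}", sum_ints(&results));
}
```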

Common Use Cases

ETL Pipelines

Extract, transform, load data workflows

ML Pipelines

Training, validation, deployment flows

CI/CD

Build, test, deploy automation

Business Process

Multi-stage approval workflows

Troubleshooting

Problem: Node returns error
Solution: Add error handling:
.task("task", "Task", |ctx, input| async move {
    match process(input).await {
        Ok(result) => Ok(result),
        Err(e) => {
            // Log and return default
            warn!("Task failed: {}", e);
            Ok(WorkflowValue::String("default".into()))
        }
    }
})
Problem: Workflow hangs
Solution: Check for circular dependencies and add a timeout:
let config = ExecutorConfig {
    max_execution_time: Duration::from_secs(60),
    ..Default::default()
};
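Checking for circular dependencies can be done before execution if you have the workflow's edges as a list. The sketch below uses Kahn's topological sort over `(from, to)` node-id pairs; `has_cycle` and the edge-list representation are assumptions for illustration, not an SDK API. If some nodes never reach an in-degree of zero, they sit on or behind a cycle.

```rust
use std::collections::{HashMap, VecDeque};

// Returns true if the directed graph described by `edges` contains a cycle.
fn has_cycle(edges: &[(&str, &str)]) -> bool {
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    let mut successors: HashMap<&str, Vec<&str>> = HashMap::new();

    for &(from, to) in edges {
        indegree.entry(from).or_insert(0);
        *indegree.entry(to).or_insert(0) += 1;
        successors.entry(from).or_default().push(to);
    }

    // Start with every node that has no incoming edge.
    let mut ready: VecDeque<&str> = indegree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(node, _)| *node)
        .collect();

    let mut visited = 0;
    while let Some(node) = ready.pop_front() {
        visited += 1;
        if let Some(nexts) = successors.get(node) {
            for &next in nexts {
                let degree = indegree
                    .get_mut(next)
                    .expect("every edge target has an in-degree entry");
                *degree -= 1;
                if *degree == 0 {
                    ready.push_back(next);
                }
            }
        }
    }

    // Nodes that were never released belong to, or depend on, a cycle.
    visited != indegree.len()
}

fn main() {
    let linear = [("start", "fetch"), ("fetch", "process"), ("process", "end")];
    let looping = [("start", "a"), ("a", "b"), ("b", "a"), ("b", "end")];
    println!("linear has cycle: {}", has_cycle(&linear));
    println!("looping has cycle: {}", has_cycle(&looping));
}
```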
Problem: Large workflows consume too much memory
Solution: Use streaming or chunking:
.task("process", "Process", |ctx, input| async move {
    // Process in chunks
    for chunk in input.chunks(1000) {
        process_chunk(chunk).await?;
    }
    Ok(WorkflowValue::Null)
})
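Chunking only saves memory when the input is consumed lazily rather than loaded up front. As a std-only sketch of that idea (the function name and the summing "work" are placeholders, not part of the SDK), the following pulls records from an iterator in fixed-size batches, so at most one batch is held in memory at a time.

```rust
// Consumes `records` lazily in batches of `batch_size` and returns the sum of
// all records together with the number of batches processed.
fn sum_in_batches(records: impl Iterator<Item = u64>, batch_size: usize) -> (u64, usize) {
    assert!(batch_size > 0, "batch_size must be at least 1");

    let mut records = records.peekable();
    let mut total = 0;
    let mut batches = 0;

    // peek() tells us whether another batch exists without consuming an item.
    while records.peek().is_some() {
        // Only this batch is materialized; earlier batches are already dropped.
        let batch: Vec<u64> = records.by_ref().take(batch_size).collect();
        total += batch.iter().sum::<u64>();
        batches += 1;
    }
    (total, batches)
}

fn main() {
    let (total, batches) = sum_in_batches(1..=2500u64, 1000);
    println!("processed {batches} batches, total = {total}");
}
```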

Next Steps

ReAct Agent

Integrate reasoning agents

Multi-Agent

Multi-agent workflows

Workflow Guide

Advanced workflow patterns

Workflow API

Complete API reference
