Parallel and Sequential Patterns

MoFA provides two complementary patterns for task orchestration: Parallel for concurrent execution of independent tasks, and Sequential for pipeline processing of dependent task chains.

Pattern Comparison

Parallel Pattern

Simultaneous execution of independent tasks with result aggregation

Sequential Pattern

Step-by-step execution where each task depends on previous results

Parallel Pattern

The Parallel pattern enables concurrent task execution across multiple agents, maximizing throughput and resource utilization.

Architecture

In the Parallel pattern, a coordinator agent splits the incoming task into independent sub-tasks, dispatches them to worker agents concurrently (four workers by default), and aggregates the individual results into a single response.

When to Use Parallel

  • Data Analysis: Process multiple datasets simultaneously
  • Batch Processing: Execute independent operations in parallel
  • Distributed Search: Query multiple sources concurrently
  • Map-Reduce: Parallel data processing followed by aggregation

Basic Parallel Usage

use mofa_sdk::collaboration::{
    ParallelProtocol,
    CollaborationMessage,
    CollaborationMode,
    CollaborationContent
};
use std::sync::Arc;

// Create parallel protocol (default: 4 workers)
let protocol = Arc::new(ParallelProtocol::new("coordinator"));

// Create parallel task
let task = CollaborationMessage::new(
    "coordinator",
    CollaborationContent::Data(serde_json::json!({
        "action": "analyze",
        "data_chunks": [
            {"id": 1, "data": "chunk_1"},
            {"id": 2, "data": "chunk_2"},
            {"id": 3, "data": "chunk_3"},
            {"id": 4, "data": "chunk_4"}
        ]
    })),
    CollaborationMode::Parallel
);

protocol.send_message(task).await?;

if let Some(msg) = protocol.receive_message().await? {
    let result = protocol.process_message(msg).await?;
    println!("Parallel processing completed in {}ms", result.duration_ms);
}

LLM-Enhanced Parallel Processing

use mofa_sdk::collaboration::{
    ParallelProtocol,
    CollaborationMessage,
    CollaborationMode,
    CollaborationContent
};
use mofa_sdk::llm::{LLMClient, openai_from_env};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let provider = openai_from_env()?;
    let llm_client = Arc::new(LLMClient::new(Arc::new(provider)));
    
    // Create LLM-enabled parallel protocol
    let protocol = Arc::new(ParallelProtocol::with_llm(
        "parallel_coordinator",
        llm_client.clone()
    ));
    
    // LLM will intelligently distribute and process tasks
    let documents = vec![
        "Document 1: Customer feedback about product quality...",
        "Document 2: Market analysis report...",
        "Document 3: Competitor pricing strategy...",
        "Document 4: Internal team feedback..."
    ];
    
    let task = CollaborationMessage::new(
        "coordinator",
        CollaborationContent::Data(serde_json::json!({
            "action": "analyze_sentiment",
            "documents": documents
        })),
        CollaborationMode::Parallel
    );
    
    protocol.send_message(task).await?;
    
    if let Some(msg) = protocol.receive_message().await? {
        let result = protocol.process_message(msg).await?;
        
        if let Some(data) = result.data {
            println!("Analysis results: {}", data.to_text());
        }
    }
    
    Ok(())
}

Real-World Parallel Examples

Data Processing Pipeline

use mofa_sdk::collaboration::{
    ParallelProtocol,
    CollaborationMessage,
    CollaborationContent,
    CollaborationMode
};
use std::sync::Arc;

async fn parallel_data_processing() -> Result<(), Box<dyn std::error::Error>> {
    let protocol = Arc::new(ParallelProtocol::new("data_processor"));
    
    // Simulate large dataset split into chunks
    let data_chunks: Vec<_> = (0..10)
        .map(|i| serde_json::json!({
            "chunk_id": i,
            "data": format!("data_chunk_{}", i),
            "size_mb": 100
        }))
        .collect();
    
    let task = CollaborationMessage::new(
        "coordinator",
        CollaborationContent::Data(serde_json::json!({
            "operation": "transform_and_validate",
            "chunks": data_chunks
        })),
        CollaborationMode::Parallel
    );
    
    let start = std::time::Instant::now();
    
    protocol.send_message(task).await?;
    
    if let Some(msg) = protocol.receive_message().await? {
        let result = protocol.process_message(msg).await?;
        
        let duration = start.elapsed();
        println!("Processed 10 chunks in parallel");
        println!("Total time: {:.2}s", duration.as_secs_f64());
        println!("Success: {}", result.success);
    }
    
    Ok(())
}

Distributed Search

use mofa_sdk::collaboration::{
    ParallelProtocol,
    CollaborationMessage,
    CollaborationContent,
    CollaborationMode
};
use std::sync::Arc;

async fn parallel_search() -> Result<(), Box<dyn std::error::Error>> {
    let protocol = Arc::new(ParallelProtocol::new("search_coordinator"));
    
    let search_sources = vec![
        ("database", "SELECT * FROM docs WHERE title LIKE '%rust%'"),
        ("elasticsearch", "{ \"query\": { \"match\": { \"content\": \"rust\" } } }"),
        ("filesystem", "find . -name '*.rs' -exec grep 'async' {} \\;"),
        ("api", "GET /search?q=rust&type=code")
    ];
    
    let task = CollaborationMessage::new(
        "search_coordinator",
        CollaborationContent::Data(serde_json::json!({
            "query": "rust programming",
            "sources": search_sources
        })),
        CollaborationMode::Parallel
    );
    
    protocol.send_message(task).await?;
    
    if let Some(msg) = protocol.receive_message().await? {
        let result = protocol.process_message(msg).await?;
        
        println!("Search completed across {} sources", search_sources.len());
        println!("Participants: {:?}", result.participants);
    }
    
    Ok(())
}

Sequential Pattern

The Sequential pattern implements pipeline processing where tasks execute in order, with each task receiving the output of the previous one.

Architecture

In the Sequential pattern, a coordinator passes the task through an ordered chain of stages; each stage receives the output of the previous stage, processes it, and forwards its result to the next until the final stage produces the pipeline output.

When to Use Sequential

  • Data Pipelines: Transform data through multiple stages
  • Workflow Orchestration: Execute dependent tasks in order
  • Request Processing: Chain of processing steps
  • Validation Chains: Multi-stage validation and enrichment

Basic Sequential Usage

use mofa_sdk::collaboration::{
    SequentialProtocol,
    CollaborationMessage,
    CollaborationMode,
    CollaborationContent
};
use std::sync::Arc;

// Create sequential protocol
let protocol = Arc::new(SequentialProtocol::new("pipeline_coordinator"));

// Create sequential task
let task = CollaborationMessage::new(
    "coordinator",
    CollaborationContent::Data(serde_json::json!({
        "pipeline": "data_processing",
        "stages": [
            {"name": "validate", "agent": "validator"},
            {"name": "transform", "agent": "transformer"},
            {"name": "enrich", "agent": "enricher"},
            {"name": "store", "agent": "storage"}
        ],
        "input_data": {"user_id": 123, "action": "purchase"}
    })),
    CollaborationMode::Sequential
);

protocol.send_message(task).await?;

if let Some(msg) = protocol.receive_message().await? {
    let result = protocol.process_message(msg).await?;
    println!("Pipeline completed in {}ms", result.duration_ms);
}

LLM-Enhanced Sequential Processing

use mofa_sdk::collaboration::{
    SequentialProtocol,
    CollaborationMessage,
    CollaborationMode,
    CollaborationContent
};
use mofa_sdk::llm::{LLMClient, openai_from_env};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let provider = openai_from_env()?;
    let llm_client = Arc::new(LLMClient::new(Arc::new(provider)));
    
    // Create LLM-enabled sequential protocol
    let protocol = Arc::new(SequentialProtocol::with_llm(
        "intelligent_pipeline",
        llm_client.clone()
    ));
    
    // LLM will process each stage intelligently
    let content = "User feedback: The app crashes when I try to upload large files.";
    
    let task = CollaborationMessage::new(
        "coordinator",
        CollaborationContent::Mixed {
            text: "Process customer feedback through pipeline".to_string(),
            data: serde_json::json!({
                "stages": [
                    "classify_issue_type",
                    "extract_key_details",
                    "determine_severity",
                    "route_to_team",
                    "generate_response"
                ],
                "content": content
            })
        },
        CollaborationMode::Sequential
    );
    
    protocol.send_message(task).await?;
    
    if let Some(msg) = protocol.receive_message().await? {
        let result = protocol.process_message(msg).await?;
        
        if let Some(data) = result.data {
            println!("Pipeline output: {}", data.to_text());
        }
    }
    
    Ok(())
}

Real-World Sequential Examples

ETL Pipeline

use mofa_sdk::collaboration::{
    SequentialProtocol,
    CollaborationMessage,
    CollaborationContent,
    CollaborationMode
};

async fn extract_stage() -> Result<CollaborationContent, Box<dyn std::error::Error>> {
    println!("Stage 1: Extract data from source");
    
    // Simulate data extraction
    let extracted_data = serde_json::json!({
        "records": [
            {"id": 1, "name": "Alice", "status": "active"},
            {"id": 2, "name": "Bob", "status": "inactive"},
            {"id": 3, "name": "Charlie", "status": "active"}
        ],
        "metadata": {
            "source": "production_db",
            "timestamp": "2024-01-15T10:30:00Z",
            "count": 3
        }
    });
    
    Ok(CollaborationContent::Data(extracted_data))
}
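
The extract stage above is only the first link in the chain. Below is a minimal sketch of a follow-on transform stage, assuming it keeps the same signature and receives the extract stage's output as its input; the "active-only" filter and name normalization are purely illustrative:

async fn transform_stage(
    input: CollaborationContent,
) -> Result<CollaborationContent, Box<dyn std::error::Error>> {
    println!("Stage 2: Transform extracted records");

    // Unwrap the data produced by the extract stage
    let data = match input {
        CollaborationContent::Data(value) => value,
        _ => return Err("expected data content from the extract stage".into()),
    };

    // Illustrative transform: keep active records and normalize names
    let records = data["records"].as_array().cloned().unwrap_or_default();
    let transformed: Vec<serde_json::Value> = records
        .iter()
        .filter(|r| r["status"] == "active")
        .map(|r| serde_json::json!({
            "id": r["id"],
            "name": r["name"].as_str().unwrap_or("").to_uppercase()
        }))
        .collect();

    let count = transformed.len();
    Ok(CollaborationContent::Data(serde_json::json!({
        "records": transformed,
        "metadata": { "stage": "transform", "count": count }
    })))
}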

Content Moderation Pipeline

use mofa_sdk::collaboration::{
    SequentialProtocol,
    CollaborationMessage,
    CollaborationContent,
    CollaborationMode
};
use mofa_sdk::llm::{LLMClient, openai_from_env};
use std::sync::Arc;

async fn content_moderation_pipeline() -> Result<(), Box<dyn std::error::Error>> {
    let provider = openai_from_env()?;
    let llm_client = Arc::new(LLMClient::new(Arc::new(provider)));
    
    let protocol = Arc::new(SequentialProtocol::with_llm(
        "moderation_pipeline",
        llm_client.clone()
    ));
    
    let user_content = "This is a sample user-generated comment...";
    
    let task = CollaborationMessage::new(
        "moderator",
        CollaborationContent::Data(serde_json::json!({
            "content": user_content,
            "stages": [
                {
                    "name": "spam_detection",
                    "description": "Check for spam patterns"
                },
                {
                    "name": "toxicity_analysis",
                    "description": "Analyze for toxic language"
                },
                {
                    "name": "pii_detection",
                    "description": "Detect personally identifiable information"
                },
                {
                    "name": "policy_compliance",
                    "description": "Check against community guidelines"
                },
                {
                    "name": "final_decision",
                    "description": "Approve, flag, or reject content"
                }
            ]
        })),
        CollaborationMode::Sequential
    );
    
    protocol.send_message(task).await?;
    
    if let Some(msg) = protocol.receive_message().await? {
        let result = protocol.process_message(msg).await?;
        
        println!("Moderation complete:");
        println!("  Success: {}", result.success);
        println!("  Duration: {}ms", result.duration_ms);
        
        if let Some(data) = result.data {
            println!("  Decision: {}", data.to_text());
        }
    }
    
    Ok(())
}

Combining Parallel and Sequential

Map-Reduce Pattern

use mofa_sdk::collaboration::{
    ParallelProtocol,
    SequentialProtocol,
    CollaborationMessage,
    CollaborationContent,
    CollaborationMode
};
use std::sync::Arc;

async fn map_reduce() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: Parallel map phase
    let parallel_protocol = Arc::new(ParallelProtocol::new("mapper"));
    
    let map_task = CollaborationMessage::new(
        "coordinator",
        CollaborationContent::Data(serde_json::json!({
            "operation": "map",
            "chunks": [
                {"id": 1, "data": [1, 2, 3, 4, 5]},
                {"id": 2, "data": [6, 7, 8, 9, 10]},
                {"id": 3, "data": [11, 12, 13, 14, 15]}
            ],
            "map_fn": "square" // x => x * x
        })),
        CollaborationMode::Parallel
    );
    
    parallel_protocol.send_message(map_task).await?;
    
    let map_results = if let Some(msg) = parallel_protocol.receive_message().await? {
        let result = parallel_protocol.process_message(msg).await?;
        result.data
    } else {
        None
    };
    
    // Step 2: Sequential reduce phase
    let sequential_protocol = Arc::new(SequentialProtocol::new("reducer"));
    
    let reduce_task = CollaborationMessage::new(
        "coordinator",
        CollaborationContent::Data(serde_json::json!({
            "operation": "reduce",
            "map_results": map_results,
            "reduce_fn": "sum" // (acc, x) => acc + x
        })),
        CollaborationMode::Sequential
    );
    
    sequential_protocol.send_message(reduce_task).await?;
    
    if let Some(msg) = sequential_protocol.receive_message().await? {
        let result = sequential_protocol.process_message(msg).await?;
        
        println!("Map-Reduce completed:");
        println!("  Duration: {}ms", result.duration_ms);
        if let Some(data) = result.data {
            println!("  Final result: {}", data.to_text());
        }
    }
    
    Ok(())
}
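
For this input the expected answer is easy to verify by hand: squaring 1 through 15 and summing the results gives 1240. A plain-Rust sanity check, independent of the protocols, makes a handy regression test for the example:

// Sum of squares of 1..=15; the reduce phase should report this value.
fn expected_map_reduce_result() -> i64 {
    (1..=15).map(|x: i64| x * x).sum()
}

#[test]
fn map_reduce_expected_value() {
    assert_eq!(expected_map_reduce_result(), 1240);
}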

Using with Collaboration Manager

use mofa_sdk::collaboration::{
    LLMDrivenCollaborationManager,
    ParallelProtocol,
    SequentialProtocol
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let manager = LLMDrivenCollaborationManager::new("orchestrator");
    
    // Register both protocols
    manager.register_protocol(Arc::new(
        ParallelProtocol::new("orchestrator")
    )).await?;
    
    manager.register_protocol(Arc::new(
        SequentialProtocol::new("orchestrator")
    )).await?;
    
    // Execute parallel task
    let parallel_result = manager.execute_task_with_protocol(
        "parallel",
        "Analyze multiple data sources simultaneously"
    ).await?;
    
    println!("Parallel execution: {}ms", parallel_result.duration_ms);
    
    // Execute sequential task
    let sequential_result = manager.execute_task_with_protocol(
        "sequential",
        "Process data through validation pipeline"
    ).await?;
    
    println!("Sequential execution: {}ms", sequential_result.duration_ms);
    
    // Get statistics
    let stats = manager.stats().await;
    println!("Total tasks: {}", stats.total_tasks);
    println!("Mode usage: {:?}", stats.mode_usage);
    
    Ok(())
}

Best Practices

1. Choose the Right Pattern

  • Use Parallel for independent tasks
  • Use Sequential for dependent tasks
  • Combine both for complex workflows
2. Handle Failures Gracefully

Parallel: Continue processing other tasks if one fails. Sequential: Implement rollback or retry logic (a retry sketch follows the snippet below).
if !result.success {
    if let Some(error) = result.error {
        eprintln!("Stage failed: {}", error);
        // Implement retry or rollback
    }
}
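
A bounded retry loop is usually enough for transient stage failures. The sketch below wraps the send/receive/process cycle from the examples above and retries a failed pipeline a fixed number of times; the attempt limit and backoff are illustrative choices, not SDK defaults, and the task is rebuilt through a closure because the message is consumed on send:

use mofa_sdk::collaboration::{CollaborationMessage, SequentialProtocol};
use std::time::Duration;

async fn run_with_retries(
    protocol: &SequentialProtocol,
    make_task: impl Fn() -> CollaborationMessage,
    max_attempts: u32,
) -> Result<(), Box<dyn std::error::Error>> {
    for attempt in 1..=max_attempts {
        protocol.send_message(make_task()).await?;

        if let Some(msg) = protocol.receive_message().await? {
            let result = protocol.process_message(msg).await?;
            if result.success {
                return Ok(());
            }
            if let Some(error) = result.error {
                eprintln!("Attempt {attempt} failed: {error}");
            }
        }

        // Simple linear backoff before the next attempt
        tokio::time::sleep(Duration::from_millis(250 * attempt as u64)).await;
    }

    Err("pipeline failed after all retry attempts".into())
}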
3. Monitor Performance

Track execution times for optimization:
println!("Participants: {:?}", result.participants);
println!("Duration: {}ms", result.duration_ms);
4. Optimize Resource Usage

Parallel: Set appropriate worker limits (see the concurrency sketch below). Sequential: Minimize data copying between stages.
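
The ParallelProtocol coordinator runs four workers by default. If you additionally fan out work yourself, for example one spawned task per chunk, a tokio Semaphore is a simple way to cap how much of it runs at once; the limit of 4 below is an arbitrary illustration and independent of the protocol's own pool:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Cap concurrent chunk processing at 4, regardless of how many chunks exist.
async fn process_chunks_bounded(chunks: Vec<serde_json::Value>) {
    let permits = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();

    for chunk in chunks {
        let permits = Arc::clone(&permits);
        handles.push(tokio::spawn(async move {
            // Wait for a free slot before starting this chunk
            let _permit = permits.acquire_owned().await.expect("semaphore closed");
            println!("processing chunk {chunk}");
            // ... hand the chunk to a worker or protocol here ...
        }));
    }

    for handle in handles {
        let _ = handle.await;
    }
}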

See Also

Publish-Subscribe

For event-driven parallel processing

Request-Response

For simple sequential chains

Collaboration Overview

Return to patterns overview

Workflow Orchestration

Learn about advanced workflow patterns
