Skip to main content

Distributed Runtime with Dora-rs

MoFA integrates with dora-rs, a high-performance dataflow runtime for distributed agent systems. This enables scaling agents across processes and machines while maintaining low-latency communication.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    MoFA + Dora-rs Integration                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   MoFA Layer                             │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │   │
│  │  │ DoraAgent   │ │DoraOperator │ │ DoraDataflow        │ │   │
│  │  │   Node      │ │  (Plugin)   │ │   Builder           │ │   │
│  │  └─────────────┘ └─────────────┘ └─────────────────────┘ │   │
│  └──────────────────────────────────────────────────────────┘   │
│                               │                                 │
│                               ▼                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                   Dora-rs Runtime                        │   │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │   │
│  │  │   Dataflow  │ │   Nodes     │ │    Operators        │ │   │
│  │  │   Executor  │ │  (Processes)│ │   (Zero-copy)       │ │   │
│  │  └─────────────┘ └─────────────┘ └─────────────────────┘ │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Enable Dora Support

Add the dora feature flag:
[dependencies]
mofa-runtime = { version = "*", features = ["dora"] }

Runtime Modes

Embedded Mode

Run dataflows in the current process:
use mofa_runtime::dora_adapter::{
    DoraRuntime, RuntimeMode, EmbeddedConfig
};

let config = EmbeddedConfig {
    working_directory: "./dataflows".into(),
    log_level: "info".to_string(),
};

let runtime = DoraRuntime::new(RuntimeMode::Embedded(config))?;

Distributed Mode

Run dataflows across multiple machines:
use mofa_runtime::dora_adapter::{
    DoraRuntime, RuntimeMode, DistributedConfig
};

let config = DistributedConfig {
    coordinator_addr: "http://coordinator:4000".to_string(),
    machine_id: "worker-01".to_string(),
    listen_addr: "0.0.0.0:5000".to_string(),
    working_directory: "./dataflows".into(),
    log_level: "info".to_string(),
};

let runtime = DoraRuntime::new(RuntimeMode::Distributed(config))?;

Dataflow Builder

Define dataflow graphs declaratively:
use mofa_runtime::dora_adapter::{
    DataflowBuilder, NodeConnection, ChannelConfig
};

let dataflow = DataflowBuilder::new("agent_pipeline")
    .with_description("Multi-agent processing pipeline")
    // Add agent nodes
    .add_agent_node("input_agent", input_agent, vec!["raw_data".to_string()])
    .add_agent_node("processor", processor_agent, vec!["processed".to_string()])
    .add_agent_node("output_agent", output_agent, vec![])
    // Connect nodes
    .add_connection(NodeConnection {
        from_node: "input_agent".to_string(),
        from_output: "raw_data".to_string(),
        to_node: "processor".to_string(),
        to_input: "input".to_string(),
        channel_config: ChannelConfig::default(),
    })
    .add_connection(NodeConnection {
        from_node: "processor".to_string(),
        from_output: "processed".to_string(),
        to_node: "output_agent".to_string(),
        to_input: "final".to_string(),
        channel_config: ChannelConfig::default(),
    })
    .build()?;

Agent Nodes

Wrap MoFA agents as Dora nodes:
use mofa_runtime::dora_adapter::{
    DoraAgentNode, DoraNodeConfig, NodeEventLoop
};
use mofa_sdk::agent::Agent;

// Create your MoFA agent
let agent = create_my_agent()?;

// Wrap as Dora node
let node_config = DoraNodeConfig {
    id: "my_agent".to_string(),
    inputs: vec!["input_data".to_string()],
    outputs: vec!["result".to_string()],
    ..Default::default()
};

let dora_node = DoraAgentNode::new(agent, node_config);

// Run event loop
let event_loop = NodeEventLoop::new(dora_node);
event_loop.run().await?;

Custom Node Logic

impl DoraAgentNode {
    /// Dispatches an incoming Dora message to the wrapped agent based on the
    /// input channel it arrived on.
    ///
    /// - `"task_queue"`: payload is a bincode-encoded `Task`; the agent
    ///   processes it and the result is forwarded on the `"results"` output.
    /// - `"control"`: payload is a bincode-encoded `Command` handled by
    ///   `handle_command`.
    /// - anything else is logged and ignored (best-effort; not an error).
    ///
    /// Returns `Err` if deserialization, agent processing, or output sending
    /// fails; the `?` operator propagates the first failure.
    async fn on_input(&mut self, input_id: &str, data: Vec<u8>) -> Result<()> {
        match input_id {
            "task_queue" => {
                // NOTE(review): assumes the sender bincode-encodes `Task` —
                // confirm against the producing node's schema.
                let task: Task = bincode::deserialize(&data)?;
                let result = self.agent.process_task(task).await?;
                self.send_output("results", &result).await?;
            }
            "control" => {
                let cmd: Command = bincode::deserialize(&data)?;
                self.handle_command(cmd).await?;
            }
            // Unknown channels are tolerated so dataflow wiring changes
            // don't crash the node.
            _ => warn!("Unknown input: {}", input_id),
        }
        Ok(())
    }
}

Operators

Use operators for lightweight processing:
use mofa_runtime::dora_adapter::{
    MoFAOperator, OperatorInput, OperatorOutput
};

/// Lightweight operator that routes numeric samples by a threshold:
/// values strictly greater than `threshold` pass through unchanged,
/// everything else is replaced by an empty `"filtered"` marker message.
struct FilterOperator {
    // Exclusive cutoff: only values strictly greater than this pass.
    threshold: f64,
}

impl MoFAOperator for FilterOperator {
    /// Deserializes the payload as a bincode-encoded `f64` and emits either
    /// a `"passed"` output carrying the original bytes (no re-serialization)
    /// or an empty `"filtered"` output.
    ///
    /// Returns `Err` if the payload is not a valid bincode `f64`.
    /// The `input_id` is intentionally unused: this operator has a single
    /// logical input.
    fn on_input(
        &mut self,
        input_id: &str,
        data: Vec<u8>,
    ) -> Result<Vec<OperatorOutput>> {
        let value: f64 = bincode::deserialize(&data)?;

        if value > self.threshold {
            // Forward the original bytes untouched — avoids a redundant
            // serialize round-trip. Field-init shorthand (was `data: data`).
            Ok(vec![OperatorOutput {
                id: "passed".to_string(),
                data,
            }])
        } else {
            // Empty payload signals "value was dropped" to downstream nodes.
            Ok(vec![OperatorOutput {
                id: "filtered".to_string(),
                data: vec![],
            }])
        }
    }
}

Plugin Integration

Wrap MoFA plugins as Dora operators:
use mofa_runtime::dora_adapter::{
    DoraPluginOperator, PluginOperatorAdapter
};

let plugin = create_my_plugin()?;

let operator = PluginOperatorAdapter::new(
    plugin,
    vec!["input"],
    vec!["output"],
);

let dora_operator = DoraPluginOperator::from_adapter(operator);

Channel Configuration

Control communication between nodes:
use mofa_runtime::dora_adapter::ChannelConfig;

let config = ChannelConfig {
    buffer_size: 1000,           // Message buffer
    shared_memory: true,          // Use zero-copy shared memory
    qos: QoS::BestEffort,        // Quality of service
    priority: 5,                 // Channel priority
};

let connection = NodeConnection {
    from_node: "producer".to_string(),
    from_output: "data".to_string(),
    to_node: "consumer".to_string(),
    to_input: "stream".to_string(),
    channel_config: config,
};

Running Dataflows

Simple Execution

use mofa_runtime::dora_adapter::run_dataflow;

let result = run_dataflow(dataflow, runtime).await?;

match result {
    DataflowResult::Success => println!("Dataflow completed"),
    DataflowResult::Failed(err) => eprintln!("Failed: {}", err),
    DataflowResult::Cancelled => println!("Cancelled by user"),
}

With Logging

use mofa_runtime::dora_adapter::{
    run_dataflow_with_logs, LogDestination
};

let log_dest = LogDestination::File {
    path: "./logs/dataflow.log".into(),
    rotation: Some(LogRotation::Daily),
};

run_dataflow_with_logs(dataflow, runtime, log_dest).await?;

Background Execution

let runtime = DoraRuntime::new(mode)?;
let handle = runtime.spawn_dataflow(dataflow);

// Do other work...

// Wait for completion
let result = handle.await?;

Message Passing

Zero-copy Messaging

Dora uses shared memory for efficient communication:
use mofa_runtime::dora_adapter::MessageEnvelope;

// Send large data without copying
let large_data = vec![0u8; 10_000_000];  // 10MB
let envelope = MessageEnvelope::new("image_data", large_data);

node.send_output("processed_image", envelope).await?;

Serialization

use mofa_runtime::dora_adapter::DoraChannel;

// Send structured data
let result = ProcessingResult {
    status: "completed".to_string(),
    metrics: metrics_data,
};

let serialized = bincode::serialize(&result)?;
node.send_output("results", serialized).await?;

Fault Tolerance

Node Restarts

let node_config = DoraNodeConfig {
    id: "resilient_agent".to_string(),
    restart_policy: RestartPolicy::Always,
    max_restarts: 3,
    restart_delay_ms: 5000,
    ..Default::default()
};

Health Checks

impl DoraAgentNode {
    /// Reports this node's health by consulting the wrapped agent.
    ///
    /// Maps a healthy agent to `HealthStatus::Healthy`; otherwise reports
    /// `HealthStatus::Degraded` with a fixed reason string. Always returns
    /// `Ok` — the `Result` exists only to match the health-check contract.
    async fn health_check(&self) -> Result<HealthStatus> {
        let status = if self.agent.is_healthy().await {
            HealthStatus::Healthy
        } else {
            HealthStatus::Degraded {
                reason: "High error rate".to_string(),
            }
        };
        Ok(status)
    }
}

Monitoring

Integrate with MoFA monitoring:
use mofa_monitoring::MetricsCollector;

let collector = Arc::new(MetricsCollector::new(config));

let node = DoraAgentNode::new(agent, node_config)
    .with_metrics_collector(collector);

// Metrics are automatically collected
node.run().await?;

Distributed Coordination

Coordinator Setup

# Start coordinator
dora coordinator --addr 0.0.0.0:4000

Worker Registration

let config = DistributedConfig {
    coordinator_addr: "http://coordinator:4000".to_string(),
    machine_id: "worker-01".to_string(),
    listen_addr: "0.0.0.0:5000".to_string(),
    capabilities: vec![
        "gpu".to_string(),
        "large-memory".to_string(),
    ],
    ..Default::default()
};

let runtime = DoraRuntime::new(RuntimeMode::Distributed(config))?;
runtime.register_worker().await?;

Best Practices

  1. Use operators for simple logic: Reserve agent nodes for complex reasoning
  2. Enable shared memory: Use zero-copy for large data transfers
  3. Set appropriate buffer sizes: Balance latency and memory usage
  4. Implement health checks: Enable automatic recovery
  5. Monitor dataflow performance: Track message rates and latency
  6. Handle backpressure: Implement flow control for slow consumers
  7. Use typed messages: Define clear data schemas
  8. Test locally first: Debug in embedded mode before deploying

Performance Tuning

Buffer Sizing

let config = ChannelConfig {
    buffer_size: 10000,      // Larger buffer for bursty traffic
    shared_memory: true,     // Zero-copy for large messages
    ..Default::default()
};

Concurrency

let node_config = DoraNodeConfig {
    id: "parallel_agent".to_string(),
    concurrency: 4,          // Process 4 messages concurrently
    ..Default::default()
};

Backpressure

impl DoraAgentNode {
    /// Handles an incoming message with backpressure signaling.
    ///
    /// Before processing, checks the local pending-task queue; if it exceeds
    /// 1000 entries, a `"slow_down"` control message is sent so upstream
    /// producers can throttle. The message is still processed afterwards —
    /// backpressure here is advisory, not a drop.
    ///
    /// Returns the result of `process_input`, or the first error from the
    /// control-message send.
    async fn on_input(&mut self, input_id: &str, data: Vec<u8>) -> Result<()> {
        // Check queue depth — 1000 is the backpressure high-water mark.
        if self.pending_tasks.len() > 1000 {
            // Signal backpressure to upstream producers.
            self.send_control_message("slow_down").await?;
        }
        
        self.process_input(input_id, data).await
    }
}

Deployment

Docker Compose

version: '3'
services:
  coordinator:
    image: dora-coordinator
    ports:
      - "4000:4000"
  
  worker-1:
    image: mofa-agent
    environment:
      - DORA_COORDINATOR=http://coordinator:4000
      - MACHINE_ID=worker-1
    depends_on:
      - coordinator
  
  worker-2:
    image: mofa-agent
    environment:
      - DORA_COORDINATOR=http://coordinator:4000
      - MACHINE_ID=worker-2
    depends_on:
      - coordinator

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mofa-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mofa-worker
  template:
    metadata:
      labels:
        app: mofa-worker
    spec:
      containers:
      - name: worker
        image: mofa-agent:latest
        env:
        - name: DORA_COORDINATOR
          value: "http://coordinator-service:4000"
        - name: MACHINE_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name

Troubleshooting

Connection Issues

// Enable debug logging
let config = DistributedConfig {
    log_level: "debug".to_string(),
    ..config
};

// Check connectivity
runtime.ping_coordinator().await?;

Message Drops

// Monitor channel statistics
let stats = node.get_channel_stats("output").await?;
println!("Dropped: {}", stats.dropped_messages);
println!("Backpressure events: {}", stats.backpressure_count);

Performance Debugging

// Enable profiling
let node = DoraAgentNode::new(agent, config)
    .with_profiling(true);

let profile = node.get_profile_data().await?;
println!("Avg processing time: {}ms", profile.avg_processing_ms);

See Also

Build docs developers (and LLMs) love