Skip to main content
The AgentRuntime (Dora mode) provides a distributed runtime for agents using the Dora-rs dataflow engine for multi-agent coordination and message passing.
This runtime is only available when the dora feature is enabled. For standalone agents, use SimpleRuntime.

Overview

AgentRuntime features:
  • Distributed dataflow execution via Dora-rs
  • Inter-agent message routing
  • Event-driven architecture
  • Plugin lifecycle management
  • Graceful interruption handling

Creating a Runtime

Create via AgentBuilder:
use mofa_runtime::AgentBuilder;

let runtime = AgentBuilder::new("agent-1", "My Agent")
    .with_input("task_input")
    .with_output("task_output")
    .with_plugin(Box::new(llm_plugin))
    .with_agent(my_agent)
    .await?;

Runtime API

agent

Get immutable reference to the agent.
let agent_ref = runtime.agent();
println!("Agent type: {}", agent_ref.agent_type());

agent_mut

Get mutable reference to the agent.
let agent_mut = runtime.agent_mut();
agent_mut.update_config(new_config)?;

node

Get reference to the Dora node.
let node = runtime.node();
println!("Node ID: {}", node.config().node_id);

metadata

Get agent metadata.
let metadata = runtime.metadata();
println!("Agent: {} ({})", metadata.name, metadata.id);
println!("State: {:?}", metadata.state);
println!("Capabilities: {:?}", metadata.capabilities);

config

Get agent configuration.
let config = runtime.config();
println!("Agent ID: {}", config.agent_id);
println!("Custom config: {:?}", config.node_config);

interrupt

Get the interrupt handle.
let interrupt = runtime.interrupt();

// In another task:
interrupt.trigger();

Lifecycle Methods

start

Initialize and start the runtime.
runtime.start().await?;
println!("Runtime started");

run_event_loop

Run the main event processing loop.
// Blocks until shutdown event or interruption
runtime.run_event_loop().await?;
The event loop:
  1. Initializes the agent with AgentContext
  2. Initializes all plugins
  3. Processes events from the Dora node
  4. Checks for interruption signals
  5. Executes agent for each event
  6. Handles shutdown gracefully

stop

Stop the runtime and shut down the agent.
runtime.stop().await?;
println!("Runtime stopped");

Message Operations

send_output

Send a message to an output port.
output_id
&str
Output port name (configured via with_output)
message
&AgentMessage
Message to send
let message = AgentMessage::TaskResponse {
    task_id: "task-1".to_string(),
    result: "Processing complete".to_string(),
};

runtime.send_output("task_output", &message).await?;

inject_event

Inject an event into the runtime for processing.
event
AgentEvent
Event to inject
let event = AgentEvent::TaskReceived(TaskRequest {
    task_id: "task-2".to_string(),
    content: "New task".to_string(),
});

runtime.inject_event(event).await?;

Plugin Management

init_plugins

Initialize all registered plugins.
runtime.init_plugins().await?;
This is automatically called during run_event_loop, but can be called manually if needed.

Complete Example

use mofa_runtime::AgentBuilder;
use mofa_kernel::message::{AgentEvent, AgentMessage, TaskRequest};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Build runtime
    let mut runtime = AgentBuilder::new("worker", "Worker Agent")
        .with_input("tasks")
        .with_output("results")
        .with_max_concurrent_tasks(10)
        .with_plugin(Box::new(llm_plugin))
        .with_agent(worker_agent)
        .await?;

    // 2. Start runtime
    runtime.start().await?;
    println!("Agent {} started", runtime.metadata().id);

    // 3. Spawn event loop in background
    let runtime_clone = runtime.clone();
    let event_loop_handle = tokio::spawn(async move {
        if let Err(e) = runtime_clone.run_event_loop().await {
            eprintln!("Event loop error: {}", e);
        }
    });

    // 4. Send some messages
    for i in 1..=5 {
        let message = AgentMessage::TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process item {}", i),
        };
        runtime.send_output("results", &message).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // 5. Wait a bit for processing
    tokio::time::sleep(Duration::from_secs(5)).await;

    // 6. Stop runtime
    runtime.stop().await?;
    println!("Runtime stopped");

    // 7. Wait for event loop to finish
    event_loop_handle.await?;

    Ok(())
}

Event Processing

The runtime processes the following events:
pub enum AgentEvent {
    TaskReceived(TaskRequest),
    Shutdown,
    Custom(String, Vec<u8>),
    // ... other variants
}
Events are converted to AgentInput before execution:
let input = match event {
    AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
    AgentEvent::Custom(data, _) => AgentInput::text(data),
    _ => AgentInput::text(format!("{:?}", event)),
};

runtime.agent_mut().execute(input, &context).await?;

Interruption Handling

use mofa_runtime::interrupt::AgentInterrupt;

// Get interrupt handle
let interrupt = runtime.interrupt().clone();

// In signal handler or shutdown task
tokio::spawn(async move {
    tokio::signal::ctrl_c().await.ok();
    interrupt.trigger();
    println!("Shutdown signal sent");
});

// In event loop - automatically checks
runtime.run_event_loop().await?; // Will exit on interrupt

Multi-Agent Coordination

use mofa_runtime::MoFARuntime;

// Create multi-agent runtime
let multi_runtime = MoFARuntime::new().await;

// Register agents
let agent1_runtime = AgentBuilder::new("agent-1", "Agent 1")
    .with_output("to_agent2")
    .with_agent(agent1)
    .await?;

let agent2_runtime = AgentBuilder::new("agent-2", "Agent 2")
    .with_input("from_agent1")
    .with_output("final_output")
    .with_agent(agent2)
    .await?;

// Register with multi-runtime
multi_runtime.register_agent(agent1_runtime.node().clone(), "worker").await?;
multi_runtime.register_agent(agent2_runtime.node().clone(), "processor").await?;

// Connect agents
multi_runtime.connect_agents(
    "agent-1", "to_agent2",
    "agent-2", "from_agent1"
).await?;

// Start all
multi_runtime.build_and_start().await?;

Dora Node Configuration

The runtime uses DoraNodeConfig:
pub struct DoraNodeConfig {
    pub node_id: String,
    pub name: String,
    pub inputs: Vec<String>,
    pub outputs: Vec<String>,
    pub event_buffer_size: usize,
    pub default_timeout: Duration,
    pub custom_config: HashMap<String, String>,
}
Access via:
let node_config = runtime.node().config();
println!("Inputs: {:?}", node_config.inputs);
println!("Outputs: {:?}", node_config.outputs);

Error Handling

use mofa_runtime::dora_adapter::DoraError;

match runtime.send_output("output", &message).await {
    Ok(_) => println!("Message sent"),
    Err(DoraError::ChannelNotFound(msg)) => {
        eprintln!("Output channel not found: {}", msg);
    }
    Err(DoraError::SendFailed(msg)) => {
        eprintln!("Failed to send: {}", msg);
    }
    Err(e) => eprintln!("Dora error: {}", e),
}

Build docs developers (and LLMs) love