The AgentRuntime (Dora mode) provides a distributed runtime for agents using the Dora-rs dataflow engine for multi-agent coordination and message passing.
This runtime is only available when the dora feature is enabled. For standalone agents, use SimpleRuntime.
Overview
AgentRuntime features:
- Distributed dataflow execution via Dora-rs
- Inter-agent message routing
- Event-driven architecture
- Plugin lifecycle management
- Graceful interruption handling
Creating a Runtime
Create via AgentBuilder:
use mofa_runtime::AgentBuilder;
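// `llm_plugin` and `my_agent` are assumed to be constructed elsewhere.
// The binding is `mut` because methods such as `agent_mut` need mutable access.
let mut runtime = AgentBuilder::new("agent-1", "My Agent")
    .with_input("task_input")
    .with_output("task_output")
    .with_plugin(Box::new(llm_plugin))
    .with_agent(my_agent)
    .await?;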
Runtime API
agent
Get immutable reference to the agent.
let agent_ref = runtime.agent();
println!("Agent type: {}", agent_ref.agent_type());
agent_mut
Get mutable reference to the agent.
let agent_mut = runtime.agent_mut();
agent_mut.update_config(new_config)?;
node
Get reference to the Dora node.
let node = runtime.node();
println!("Node ID: {}", node.config().node_id);
metadata
Get agent metadata.
let metadata = runtime.metadata();
println!("Agent: {} ({})", metadata.name, metadata.id);
println!("State: {:?}", metadata.state);
println!("Capabilities: {:?}", metadata.capabilities);
config
Get agent configuration.
let config = runtime.config();
println!("Agent ID: {}", config.agent_id);
println!("Custom config: {:?}", config.node_config);
interrupt
Get the interrupt handle.
let interrupt = runtime.interrupt();
// In another task:
interrupt.trigger();
Lifecycle Methods
start
Initialize and start the runtime.
runtime.start().await?;
println!("Runtime started");
run_event_loop
Run the main event processing loop.
// Blocks until shutdown event or interruption
runtime.run_event_loop().await?;
The event loop:
- Initializes the agent with AgentContext
- Initializes all plugins
- Processes events from the Dora node
- Checks for interruption signals
- Executes agent for each event
- Handles shutdown gracefully
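The loop behavior listed above can be sketched with the standard library alone. This is a minimal illustration, not MoFA's implementation: `Event`, `run_loop`, and `demo_event_loop` are hypothetical names, a `std::sync::mpsc` channel stands in for the Dora node's event stream, and a shared `AtomicBool` stands in for the interrupt handle.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;

// Hypothetical stand-in for the runtime's event type; not MoFA's AgentEvent.
#[derive(Debug)]
pub enum Event {
    Task(String),
    Shutdown,
}

/// Drains events until a Shutdown event arrives, the interrupt flag is set,
/// or every sender has been dropped. Returns one output per handled task.
pub fn run_loop(events: Receiver<Event>, interrupt: Arc<AtomicBool>) -> Vec<String> {
    let mut handled = Vec::new();
    // Agent and plugin initialization would happen here, before the loop.
    for event in events {
        // Check for an interruption before doing any work on this event.
        if interrupt.load(Ordering::SeqCst) {
            break;
        }
        match event {
            // "Execute the agent" for the event; here we only record a result.
            Event::Task(content) => handled.push(format!("done: {}", content)),
            // Leave the loop cleanly on shutdown.
            Event::Shutdown => break,
        }
    }
    handled
}

/// Feeds two tasks, a shutdown, and one late task through the loop.
pub fn demo_event_loop() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    tx.send(Event::Task("a".to_string())).unwrap();
    tx.send(Event::Task("b".to_string())).unwrap();
    tx.send(Event::Shutdown).unwrap();
    tx.send(Event::Task("ignored".to_string())).unwrap();
    drop(tx);
    run_loop(rx, Arc::new(AtomicBool::new(false)))
}
```

In this sketch, events queued after the shutdown event are never executed, and a flag that is already set stops the loop before the first event is handled.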
stop
Stop the runtime and shut down the agent.
runtime.stop().await?;
println!("Runtime stopped");
Message Operations
send_output
Send a message to an output port.
The first argument is the output port name (configured via with_output); the second is the message to send.
let message = AgentMessage::TaskResponse {
    task_id: "task-1".to_string(),
    result: "Processing complete".to_string(),
};
runtime.send_output("task_output", &message).await?;
inject_event
Inject an event into the runtime for processing.
let event = AgentEvent::TaskReceived(TaskRequest {
    task_id: "task-2".to_string(),
    content: "New task".to_string(),
});
runtime.inject_event(event).await?;
Plugin Management
init_plugins
Initialize all registered plugins.
runtime.init_plugins().await?;
This is automatically called during run_event_loop, but can be called manually if needed.
Complete Example
use mofa_runtime::AgentBuilder;
use mofa_kernel::message::{AgentEvent, AgentMessage, TaskRequest};
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `llm_plugin` and `worker_agent` are assumed to be constructed elsewhere.

    // 1. Build runtime
    let mut runtime = AgentBuilder::new("worker", "Worker Agent")
        .with_input("tasks")
        .with_output("results")
        .with_max_concurrent_tasks(10)
        .with_plugin(Box::new(llm_plugin))
        .with_agent(worker_agent)
        .await?;

    // 2. Start runtime
    runtime.start().await?;
    println!("Agent {} started", runtime.metadata().id);

    // 3. Spawn event loop in background
    let runtime_clone = runtime.clone();
    let event_loop_handle = tokio::spawn(async move {
        if let Err(e) = runtime_clone.run_event_loop().await {
            eprintln!("Event loop error: {}", e);
        }
    });

    // 4. Send some messages
    for i in 1..=5 {
        let message = AgentMessage::TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process item {}", i),
        };
        runtime.send_output("results", &message).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // 5. Wait a bit for processing
    tokio::time::sleep(Duration::from_secs(5)).await;

    // 6. Stop runtime
    runtime.stop().await?;
    println!("Runtime stopped");

    // 7. Wait for event loop to finish
    event_loop_handle.await?;
    Ok(())
}
Event Processing
The runtime processes the following events:
pub enum AgentEvent {
    TaskReceived(TaskRequest),
    Shutdown,
    Custom(String, Vec<u8>),
    // ... other variants
}
Events are converted to AgentInput before execution:
let input = match event {
    AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
    AgentEvent::Custom(data, _) => AgentInput::text(data),
    _ => AgentInput::text(format!("{:?}", event)),
};
runtime.agent_mut().execute(input, &context).await?;
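The conversion above can be exercised in isolation. The following is a self-contained sketch under stated assumptions: `TaskRequest`, `AgentEvent`, and `AgentInput` are simplified local stand-ins that mirror only the shapes shown in this section, and `to_input` is a hypothetical helper name, not part of MoFA.

```rust
// Simplified stand-ins mirroring the shapes shown above; not the real MoFA types.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskRequest {
    pub task_id: String,
    pub content: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AgentEvent {
    TaskReceived(TaskRequest),
    Shutdown,
    Custom(String, Vec<u8>),
}

#[derive(Debug, Clone, PartialEq)]
pub struct AgentInput(pub String);

impl AgentInput {
    /// Wraps any string-like value as a text input.
    pub fn text(s: impl Into<String>) -> Self {
        AgentInput(s.into())
    }
}

/// Hypothetical helper: maps an event to the text input handed to the agent.
pub fn to_input(event: AgentEvent) -> AgentInput {
    match event {
        // A task contributes its content.
        AgentEvent::TaskReceived(task) => AgentInput::text(task.content),
        // A custom event contributes its string; the byte payload is dropped.
        AgentEvent::Custom(data, _) => AgentInput::text(data),
        // Anything else falls back to its Debug representation.
        other => AgentInput::text(format!("{:?}", other)),
    }
}
```

One consequence worth noting: with this mapping, the byte payload of a `Custom` event never reaches the agent, only its string component does.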
Interruption Handling
use mofa_runtime::interrupt::AgentInterrupt;
// Get interrupt handle
let interrupt = runtime.interrupt().clone();
// In signal handler or shutdown task
tokio::spawn(async move {
    tokio::signal::ctrl_c().await.ok();
    interrupt.trigger();
    println!("Shutdown signal sent");
});
// In event loop - automatically checks
runtime.run_event_loop().await?; // Will exit on interrupt
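To show why a cloned handle can stop a loop running elsewhere, here is a minimal sketch of a shareable interrupt flag. It assumes nothing about how MoFA's AgentInterrupt is actually built: `InterruptFlag` and `trigger_from_other_thread` are hypothetical names, and a plain OS thread stands in for the signal-handling task.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for an interrupt handle; the real AgentInterrupt may differ.
#[derive(Clone, Default)]
pub struct InterruptFlag {
    triggered: Arc<AtomicBool>,
}

impl InterruptFlag {
    pub fn new() -> Self {
        Self::default()
    }

    /// Raises the flag; every clone observes the change.
    pub fn trigger(&self) {
        self.triggered.store(true, Ordering::SeqCst);
    }

    /// Returns true once any clone has called `trigger`.
    pub fn is_triggered(&self) -> bool {
        self.triggered.load(Ordering::SeqCst)
    }
}

/// Triggers a clone from another thread and reports what the original sees.
pub fn trigger_from_other_thread() -> bool {
    let flag = InterruptFlag::new();
    let remote = flag.clone();
    let handle = thread::spawn(move || remote.trigger());
    handle.join().expect("trigger thread panicked");
    flag.is_triggered()
}
```

Because the clones share one `Arc<AtomicBool>`, the handle moved into the signal task and the one polled by the loop refer to the same state.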
Multi-Agent Coordination
use mofa_runtime::MoFARuntime;
// Create multi-agent runtime
let multi_runtime = MoFARuntime::new().await;
// Register agents
let agent1_runtime = AgentBuilder::new("agent-1", "Agent 1")
    .with_output("to_agent2")
    .with_agent(agent1)
    .await?;
let agent2_runtime = AgentBuilder::new("agent-2", "Agent 2")
    .with_input("from_agent1")
    .with_output("final_output")
    .with_agent(agent2)
    .await?;
// Register with multi-runtime
multi_runtime.register_agent(agent1_runtime.node().clone(), "worker").await?;
multi_runtime.register_agent(agent2_runtime.node().clone(), "processor").await?;
// Connect agents
multi_runtime.connect_agents(
    "agent-1", "to_agent2",
    "agent-2", "from_agent1"
).await?;
// Start all
multi_runtime.build_and_start().await?;
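Connecting an output port to an input port only makes sense when both ports were declared on their agents. The sketch below illustrates that check with a small in-memory registry. It is an illustration of the idea, not a description of MoFARuntime internals; `PortRegistry` and all of its methods are hypothetical.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical registry of declared ports and links; not MoFARuntime's internals.
#[derive(Default)]
pub struct PortRegistry {
    inputs: HashMap<String, HashSet<String>>,
    outputs: HashMap<String, HashSet<String>>,
    links: Vec<(String, String, String, String)>,
}

impl PortRegistry {
    /// Records the input and output ports an agent declares.
    pub fn register(&mut self, agent: &str, inputs: &[&str], outputs: &[&str]) {
        self.inputs
            .insert(agent.to_string(), inputs.iter().map(|s| s.to_string()).collect());
        self.outputs
            .insert(agent.to_string(), outputs.iter().map(|s| s.to_string()).collect());
    }

    /// Links src.out_port to dst.in_port, rejecting undeclared agents or ports.
    pub fn connect(
        &mut self,
        src: &str,
        out_port: &str,
        dst: &str,
        in_port: &str,
    ) -> Result<(), String> {
        let has_out = self.outputs.get(src).map_or(false, |p| p.contains(out_port));
        if !has_out {
            return Err(format!("{} has no output port {}", src, out_port));
        }
        let has_in = self.inputs.get(dst).map_or(false, |p| p.contains(in_port));
        if !has_in {
            return Err(format!("{} has no input port {}", dst, in_port));
        }
        self.links.push((
            src.to_string(),
            out_port.to_string(),
            dst.to_string(),
            in_port.to_string(),
        ));
        Ok(())
    }

    /// Number of links accepted so far.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }
}
```

Registering "agent-1" with output "to_agent2" and "agent-2" with input "from_agent1" lets the connection from the example succeed, while a misspelled port name is rejected before any link is recorded.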
Dora Node Configuration
The runtime uses DoraNodeConfig:
pub struct DoraNodeConfig {
    pub node_id: String,
    pub name: String,
    pub inputs: Vec<String>,
    pub outputs: Vec<String>,
    pub event_buffer_size: usize,
    pub default_timeout: Duration,
    pub custom_config: HashMap<String, String>,
}
Access via:
let node_config = runtime.node().config();
println!("Inputs: {:?}", node_config.inputs);
println!("Outputs: {:?}", node_config.outputs);
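As a rough mental model, builder calls such as with_input and with_output could end up as entries in the `inputs` and `outputs` lists of the node configuration. The sketch below shows that pattern with hypothetical types (`NodePorts`, `PortsBuilder`); it mirrors only three fields of DoraNodeConfig and makes no claim about how AgentBuilder populates the real struct.

```rust
// Hypothetical mirror of three DoraNodeConfig fields; for illustration only.
#[derive(Debug, Default, PartialEq)]
pub struct NodePorts {
    pub node_id: String,
    pub inputs: Vec<String>,
    pub outputs: Vec<String>,
}

// Hypothetical consuming builder, in the style of the chained calls above.
pub struct PortsBuilder {
    ports: NodePorts,
}

impl PortsBuilder {
    pub fn new(node_id: &str) -> Self {
        PortsBuilder {
            ports: NodePorts {
                node_id: node_id.to_string(),
                ..NodePorts::default()
            },
        }
    }

    /// Appends an input port name, keeping declaration order.
    pub fn with_input(mut self, name: &str) -> Self {
        self.ports.inputs.push(name.to_string());
        self
    }

    /// Appends an output port name, keeping declaration order.
    pub fn with_output(mut self, name: &str) -> Self {
        self.ports.outputs.push(name.to_string());
        self
    }

    /// Finishes the chain and returns the collected configuration.
    pub fn build(self) -> NodePorts {
        self.ports
    }
}
```

Each method takes `self` by value and returns it, which is what allows the calls to be chained in a single expression.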
Error Handling
use mofa_runtime::dora_adapter::DoraError;
match runtime.send_output("output", &message).await {
    Ok(_) => println!("Message sent"),
    Err(DoraError::ChannelNotFound(msg)) => {
        eprintln!("Output channel not found: {}", msg);
    }
    Err(DoraError::SendFailed(msg)) => {
        eprintln!("Failed to send: {}", msg);
    }
    Err(e) => eprintln!("Dora error: {}", e),
}
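The matching pattern above can be tried without a running dataflow. The sketch below uses a local `SendError` enum carrying the two variant names shown in this section and a hypothetical synchronous `send_output` function. Both are assumptions for illustration: the real DoraError may have more variants, and the empty-payload failure is an arbitrary condition chosen only to exercise the second variant.

```rust
use std::fmt;

// Hypothetical stand-in with the two variants named above; the real DoraError may have more.
#[derive(Debug, PartialEq)]
pub enum SendError {
    ChannelNotFound(String),
    SendFailed(String),
}

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SendError::ChannelNotFound(port) => write!(f, "output channel not found: {}", port),
            SendError::SendFailed(reason) => write!(f, "failed to send: {}", reason),
        }
    }
}

impl std::error::Error for SendError {}

/// Hypothetical send: succeeds with the payload length when the port is declared.
pub fn send_output(declared: &[&str], port: &str, payload: &str) -> Result<usize, SendError> {
    // An undeclared port maps to ChannelNotFound.
    if !declared.contains(&port) {
        return Err(SendError::ChannelNotFound(port.to_string()));
    }
    // Arbitrary failure condition, used only to exercise SendFailed.
    if payload.is_empty() {
        return Err(SendError::SendFailed("empty payload".to_string()));
    }
    Ok(payload.len())
}
```

Implementing `Display` and `std::error::Error` lets such an error propagate through `?` in a function returning `Box<dyn std::error::Error>`, as `main` does in the complete example.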