SimpleRuntime is a lightweight, standalone runtime that doesn’t depend on Dora-rs. It suits simpler deployments where distributed dataflow isn’t required.
This runtime is used when the dora feature is not enabled. For distributed multi-agent systems, use AgentRuntime.
Overview
SimpleRuntime features:
- No external dependencies (no Dora-rs)
- In-process message bus
- Lightweight coordination
- Simple pub/sub messaging
- Plugin lifecycle management
- Multiple agent orchestration
Single Agent Runtime
Create via AgentBuilder:
use mofa_runtime::AgentBuilder;
use std::time::Duration;

// `llm_plugin` and `my_agent` are assumed to be constructed earlier.
let mut runtime = AgentBuilder::new("agent-1", "My Agent")
    .with_plugin(Box::new(llm_plugin))
    .with_max_concurrent_tasks(20)
    .with_timeout(Duration::from_secs(60))
    .with_agent(my_agent)
    .await?;
SimpleAgentRuntime API
agent
Get immutable reference to the agent.
let agent_ref = runtime.agent();
println!("Agent: {}", agent_ref.agent_type());
agent_mut
Get mutable reference to the agent.
let agent_mut = runtime.agent_mut();
metadata
Get agent metadata.
let metadata = runtime.metadata();
println!("Agent: {} v{}", metadata.name, metadata.version.unwrap_or_default());
config
Get agent configuration.
let config = runtime.config();
println!("ID: {}", config.agent_id);
interrupt
Get interrupt handle.
let interrupt = runtime.interrupt();
interrupt.trigger(); // Trigger interruption
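As a rough illustration of how an interrupt handle of this kind can work, the sketch below shares an atomic flag between a worker and whoever wants to stop it. `InterruptFlag` and `run_steps` are hypothetical names invented for this example; they are not the mofa types, and the real handle may be implemented differently.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for an interrupt handle (not the mofa type).
/// Clones share the same underlying flag.
#[derive(Clone, Default)]
pub struct InterruptFlag {
    flag: Arc<AtomicBool>,
}

impl InterruptFlag {
    pub fn new() -> Self {
        Self::default()
    }

    /// Request interruption; visible to every clone of the handle.
    pub fn trigger(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    pub fn is_triggered(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }
}

/// Runs up to `total` steps, checking the flag before each one.
/// Returns the number of steps that actually ran.
pub fn run_steps(
    total: usize,
    interrupt: &InterruptFlag,
    mut on_step: impl FnMut(usize),
) -> usize {
    let mut done = 0;
    for step in 0..total {
        if interrupt.is_triggered() {
            break;
        }
        on_step(step);
        done += 1;
    }
    done
}

fn main() {
    let interrupt = InterruptFlag::new();
    let remote = interrupt.clone();
    // The closure plays the role of an external party that triggers mid-run.
    let done = run_steps(10, &interrupt, |step| {
        if step == 2 {
            remote.trigger();
        }
    });
    println!("completed {done} steps before interruption");
}
```

The point of the design is that triggering is cooperative: the worker only stops at the places where it checks the flag.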
inputs / outputs
Get input and output port names.
let inputs = runtime.inputs();
let outputs = runtime.outputs();
println!("Inputs: {:?}", inputs);
println!("Outputs: {:?}", outputs);
max_concurrent_tasks
Get maximum concurrent tasks setting.
let max_tasks = runtime.max_concurrent_tasks();
println!("Max concurrent: {}", max_tasks);
default_timeout
Get default timeout duration.
let timeout = runtime.default_timeout();
println!("Timeout: {:?}", timeout);
Lifecycle Methods
start
Initialize and start the runtime.
runtime.start().await?;
println!("Runtime started");
This:
- Initializes the agent with AgentContext
- Initializes all plugins
- Sets up event channels
handle_event
Process a single event.
let event = AgentEvent::TaskReceived(TaskRequest {
    task_id: "task-1".to_string(),
    content: "Process this".to_string(),
});
runtime.handle_event(event).await?;
run_with_receiver
Run event loop with external event channel.
Parameter: event_rx (tokio::sync::mpsc::Receiver<AgentEvent>), the event receiver channel.
let (tx, rx) = tokio::sync::mpsc::channel(100);

// Send events from another task (`task` is a TaskRequest built elsewhere)
tokio::spawn(async move {
    tx.send(AgentEvent::TaskReceived(task)).await.ok();
});

// Process events
runtime.run_with_receiver(rx).await?;
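The shape of a receiver-driven event loop can be sketched without the runtime itself. The example below is an assumption-laden model: it uses the blocking `std::sync::mpsc` channel in place of tokio's async channel, and `Event` and `run_loop` are hypothetical names, not mofa APIs. It shows the loop draining events until it sees a shutdown event.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, simplified event type for this sketch.
#[derive(Debug)]
enum Event {
    Task(String),
    Shutdown,
}

/// Drains the channel until `Shutdown` arrives or all senders are dropped.
/// Returns the IDs of the tasks that were handled, in arrival order.
fn run_loop(rx: mpsc::Receiver<Event>) -> Vec<String> {
    let mut handled = Vec::new();
    while let Ok(event) = rx.recv() {
        match event {
            Event::Task(id) => handled.push(id),
            Event::Shutdown => break,
        }
    }
    handled
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Producer: sends three tasks, then asks the loop to stop.
    let producer = thread::spawn(move || {
        for i in 1..=3 {
            tx.send(Event::Task(format!("task-{i}"))).unwrap();
        }
        tx.send(Event::Shutdown).unwrap();
    });

    let handled = run_loop(rx);
    producer.join().unwrap();
    println!("handled: {handled:?}");
}
```

Events sent after the shutdown event are never processed, so the sender decides when the loop ends.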
stop
Stop the runtime and shut down the agent.
trigger_interrupt
Trigger an interruption.
runtime.trigger_interrupt();
Plugin Management
init_plugins
Initialize all registered plugins.
runtime.init_plugins().await?;
Complete Example
use mofa_runtime::AgentBuilder;
use mofa_kernel::message::{AgentEvent, TaskRequest};
use std::time::Duration;

// `llm_plugin` and `worker_agent` are assumed to be constructed elsewhere.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Build runtime
    let mut runtime = AgentBuilder::new("worker", "Worker Agent")
        .with_plugin(Box::new(llm_plugin))
        .with_max_concurrent_tasks(10)
        .build_and_start(worker_agent)
        .await?;
    println!("Agent started: {}", runtime.metadata().id);

    // 2. Create event channel
    let (tx, rx) = tokio::sync::mpsc::channel(100);

    // 3. Spawn event loop
    let runtime_handle = tokio::spawn(async move {
        runtime.run_with_receiver(rx).await
    });

    // 4. Send events
    for i in 1..=5 {
        let event = AgentEvent::TaskReceived(TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process item {}", i),
        });
        tx.send(event).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // 5. Send shutdown
    tx.send(AgentEvent::Shutdown).await?;

    // 6. Wait for completion
    runtime_handle.await??;
    println!("Runtime finished");
    Ok(())
}
Multi-Agent Coordination
SimpleRuntime
For orchestrating multiple agents:
use mofa_runtime::SimpleRuntime;
let runtime = SimpleRuntime::new();
register_agent
Register an agent with the runtime.
Returns: the event receiver for this agent.
let metadata = AgentMetadata {
    id: "agent-1".to_string(),
    name: "Agent 1".to_string(),
    // ...
};
let config = AgentConfig {
    agent_id: "agent-1".to_string(),
    name: "Agent 1".to_string(),
    node_config: HashMap::new(),
};
let rx = runtime.register_agent(metadata, config, "worker").await?;
message_bus
Get reference to the message bus.
let bus = runtime.message_bus();
send_to_agent
Send event to specific agent.
let event = AgentEvent::TaskReceived(task);
runtime.send_to_agent("agent-2", event).await?;
broadcast
Broadcast event to all agents.
let event = AgentEvent::Custom(
    "system_update".to_string(),
    vec![],
);
runtime.broadcast(event).await?;
publish_to_topic
Publish event to a topic.
runtime.publish_to_topic("tasks", event).await?;
subscribe_topic
Subscribe agent to a topic.
runtime.subscribe_topic("agent-1", "tasks").await?;
runtime.subscribe_topic("agent-2", "tasks").await?;
get_agents_by_role
Get agent IDs by role.
let workers = runtime.get_agents_by_role("worker").await;
println!("Workers: {:?}", workers);
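One plausible way to support role lookups is to record the role at registration time and filter on it later. The sketch below is only a model of that idea: `RoleRegistry` and its methods are hypothetical, and nothing here is taken from the SimpleRuntime implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical registry mapping agent ID -> role.
/// A BTreeMap keeps lookups in a stable (sorted) order.
#[derive(Default)]
struct RoleRegistry {
    roles: BTreeMap<String, String>,
}

impl RoleRegistry {
    /// Records (or overwrites) the role for an agent.
    fn register(&mut self, agent_id: &str, role: &str) {
        self.roles.insert(agent_id.to_string(), role.to_string());
    }

    /// Returns the IDs of all agents registered with `role`, sorted by ID.
    fn agents_by_role(&self, role: &str) -> Vec<String> {
        self.roles
            .iter()
            .filter(|(_, r)| r.as_str() == role)
            .map(|(id, _)| id.clone())
            .collect()
    }
}

fn main() {
    let mut registry = RoleRegistry::default();
    registry.register("worker-1", "worker");
    registry.register("manager", "manager");
    registry.register("worker-2", "worker");
    println!("Workers: {:?}", registry.agents_by_role("worker"));
}
```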
stop_all
Stop all registered agents.
runtime.stop_all().await?;
Multi-Agent Example
use mofa_runtime::SimpleRuntime;
use mofa_kernel::message::{AgentEvent, TaskRequest};
use std::time::Duration;

// `create_metadata` and `create_config` are user-defined helpers (not shown).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = SimpleRuntime::new();

    // Register multiple agents
    let rx1 = runtime.register_agent(
        create_metadata("worker-1"),
        create_config("worker-1"),
        "worker"
    ).await?;
    let rx2 = runtime.register_agent(
        create_metadata("worker-2"),
        create_config("worker-2"),
        "worker"
    ).await?;
    let rx_manager = runtime.register_agent(
        create_metadata("manager"),
        create_config("manager"),
        "manager"
    ).await?;

    // Subscribe workers to tasks topic
    runtime.subscribe_topic("worker-1", "tasks").await?;
    runtime.subscribe_topic("worker-2", "tasks").await?;

    // Spawn agent event loops
    tokio::spawn(run_agent_loop("worker-1", rx1));
    tokio::spawn(run_agent_loop("worker-2", rx2));
    tokio::spawn(run_agent_loop("manager", rx_manager));

    // Publish tasks to topic
    for i in 1..=10 {
        let event = AgentEvent::TaskReceived(TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process {}", i),
        });
        runtime.publish_to_topic("tasks", event).await?;
    }

    // Send direct message to manager
    runtime.send_to_agent(
        "manager",
        AgentEvent::Custom("status_request".to_string(), vec![])
    ).await?;

    // Wait for processing
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Broadcast shutdown
    runtime.broadcast(AgentEvent::Shutdown).await?;

    // Stop all
    runtime.stop_all().await?;
    Ok(())
}

async fn run_agent_loop(
    id: &str,
    mut rx: tokio::sync::mpsc::Receiver<AgentEvent>,
) {
    while let Some(event) = rx.recv().await {
        match event {
            AgentEvent::Shutdown => break,
            AgentEvent::TaskReceived(task) => {
                println!("[{}] Processing {}", id, task.task_id);
                // Process task...
            }
            _ => {}
        }
    }
    println!("[{}] Stopped", id);
}
Message Bus
The SimpleMessageBus handles all message routing:
pub struct SimpleMessageBus {
    // Internal subscribers and topic mappings
}

// Method signatures only; bodies omitted.
impl SimpleMessageBus {
    pub async fn register(&self, agent_id: &str, tx: Sender<AgentEvent>);
    pub async fn subscribe(&self, agent_id: &str, topic: &str);
    pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> GlobalResult<()>;
    pub async fn broadcast(&self, event: AgentEvent) -> GlobalResult<()>;
    pub async fn publish(&self, topic: &str, event: AgentEvent) -> GlobalResult<()>;
}
Communication Patterns
Point-to-Point
runtime.send_to_agent("receiver", event).await?;
Broadcast
runtime.broadcast(event).await?;
Pub/Sub
// Subscribe
runtime.subscribe_topic("agent-1", "notifications").await?;
runtime.subscribe_topic("agent-2", "notifications").await?;
// Publish
runtime.publish_to_topic("notifications", event).await?;
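To make the difference between the three patterns concrete, here is a small self-contained model of an in-process bus. It is a sketch under stated assumptions, not the SimpleMessageBus implementation: `ToyBus` and `Event` are hypothetical, the methods are synchronous, and `std::sync::mpsc` stands in for tokio channels.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical, simplified event type for this sketch.
#[derive(Clone, Debug, PartialEq)]
enum Event {
    Custom(String),
    Shutdown,
}

/// Toy in-process bus: one channel per agent plus topic subscriptions.
#[derive(Default)]
struct ToyBus {
    agents: BTreeMap<String, Sender<Event>>,
    topics: BTreeMap<String, BTreeSet<String>>,
}

impl ToyBus {
    /// Registers an agent and hands back its receiving end.
    fn register(&mut self, agent_id: &str) -> Receiver<Event> {
        let (tx, rx) = channel();
        self.agents.insert(agent_id.to_string(), tx);
        rx
    }

    fn subscribe(&mut self, agent_id: &str, topic: &str) {
        self.topics
            .entry(topic.to_string())
            .or_default()
            .insert(agent_id.to_string());
    }

    /// Point-to-point: false if the agent is unknown or its receiver is gone.
    fn send_to(&self, agent_id: &str, event: Event) -> bool {
        self.agents
            .get(agent_id)
            .map_or(false, |tx| tx.send(event).is_ok())
    }

    /// Broadcast: every registered agent gets a copy. Returns agents reached.
    fn broadcast(&self, event: &Event) -> usize {
        self.agents
            .values()
            .filter(|tx| tx.send(event.clone()).is_ok())
            .count()
    }

    /// Pub/sub: only subscribers of `topic` get a copy. Returns agents reached.
    fn publish(&self, topic: &str, event: &Event) -> usize {
        match self.topics.get(topic) {
            Some(subs) => subs
                .iter()
                .filter(|id| self.send_to(id.as_str(), event.clone()))
                .count(),
            None => 0,
        }
    }
}

fn main() {
    let mut bus = ToyBus::default();
    let rx1 = bus.register("agent-1");
    let rx2 = bus.register("agent-2");
    let rx3 = bus.register("agent-3");
    bus.subscribe("agent-1", "notifications");
    bus.subscribe("agent-2", "notifications");

    bus.send_to("agent-3", Event::Custom("direct".into()));
    bus.publish("notifications", &Event::Custom("news".into()));
    bus.broadcast(&Event::Shutdown);

    for (id, rx) in [("agent-1", rx1), ("agent-2", rx2), ("agent-3", rx3)] {
        let received: Vec<Event> = rx.try_iter().collect();
        println!("{id}: {received:?}");
    }
}
```

In this model the direct message reaches only agent-3, the published message reaches only the two subscribers, and the shutdown event reaches all three.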
Event Types
pub enum AgentEvent {
    TaskReceived(TaskRequest),
    Shutdown,
    Custom(String, Vec<u8>),
    // Other variants...
}

pub struct TaskRequest {
    pub task_id: String,
    pub content: String,
}
Error Handling
use mofa_kernel::agent::error::GlobalError;
match runtime.send_to_agent("agent-1", event).await {
    Ok(_) => println!("Sent successfully"),
    Err(GlobalError::AgentNotFound(id)) => {
        eprintln!("Agent not found: {}", id);
    }
    Err(e) => eprintln!("Error: {}", e),
}
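The matching pattern above, one arm for the not-found case and a catch-all for everything else, can be tried in isolation. The sketch below uses a hypothetical `ToyError` enum and `send` function invented for this example; the variants and messages of the real `GlobalError` may differ.

```rust
use std::collections::HashMap;
use std::fmt;

/// Hypothetical error type for this sketch (not mofa's GlobalError).
#[derive(Debug, PartialEq)]
enum ToyError {
    AgentNotFound(String),
    QueueFull(String),
}

impl fmt::Display for ToyError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ToyError::AgentNotFound(id) => write!(f, "agent not found: {id}"),
            ToyError::QueueFull(id) => write!(f, "queue full for agent: {id}"),
        }
    }
}

impl std::error::Error for ToyError {}

/// Appends `msg` to the agent's queue, failing if the agent is unknown
/// or its queue already holds `capacity` messages.
fn send(
    queues: &mut HashMap<String, Vec<String>>,
    capacity: usize,
    agent_id: &str,
    msg: &str,
) -> Result<(), ToyError> {
    let queue = queues
        .get_mut(agent_id)
        .ok_or_else(|| ToyError::AgentNotFound(agent_id.to_string()))?;
    if queue.len() >= capacity {
        return Err(ToyError::QueueFull(agent_id.to_string()));
    }
    queue.push(msg.to_string());
    Ok(())
}

/// Mirrors the match in the text: a specific arm, then a catch-all.
fn describe(result: &Result<(), ToyError>) -> String {
    match result {
        Ok(()) => "sent successfully".to_string(),
        Err(ToyError::AgentNotFound(id)) => format!("agent not found: {id}"),
        Err(other) => format!("error: {other}"),
    }
}

fn main() {
    let mut queues = HashMap::new();
    queues.insert("agent-1".to_string(), Vec::new());
    println!("{}", describe(&send(&mut queues, 1, "agent-1", "hello")));
    println!("{}", describe(&send(&mut queues, 1, "agent-1", "again")));
    println!("{}", describe(&send(&mut queues, 1, "agent-9", "hello")));
}
```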
Performance Characteristics
- Uses in-memory channels (no network overhead)
- Lock-free message passing where possible
- Bounded channels to prevent memory growth
- No external runtime dependencies
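The effect of a bounded channel can be seen with the standard library alone. In the sketch below, `std::sync::mpsc::sync_channel` stands in for the tokio channel used elsewhere on this page, and `fill` is a hypothetical helper. Once the buffer holds `capacity` undelivered messages, further non-blocking sends are rejected instead of growing memory.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Attempts `attempts` non-blocking sends into a channel with the given
/// capacity while nothing is being received.
/// Returns (accepted, rejected) counts.
fn fill(capacity: usize, attempts: usize) -> (usize, usize) {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full: the message is handed back, not queued.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}

fn main() {
    let (accepted, rejected) = fill(2, 5);
    println!("accepted {accepted}, rejected {rejected}");
}
```

With an async channel the usual behaviour is that a sender waits for free capacity rather than failing, which applies backpressure to fast producers.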
When to Use SimpleRuntime
Use SimpleRuntime when you:
- Run a single agent or a small number of agents
- Don’t need distributed execution
- Want minimal dependencies
- Run in resource-constrained environments
- Are prototyping or developing
Use AgentRuntime (Dora) when you:
- Need distributed multi-agent coordination
- Require complex dataflow graphs
- Need cross-process or cross-machine communication
- Are building large-scale multi-agent systems