Skip to main content
The SimpleRuntime provides a lightweight, standalone runtime that doesn’t depend on Dora-rs. It’s ideal for simpler deployments and when distributed dataflow isn’t required.
This runtime is used when the dora feature is not enabled. For distributed multi-agent systems, use AgentRuntime.

Overview

SimpleRuntime features:
  • No external dependencies (no Dora-rs)
  • In-process message bus
  • Lightweight coordination
  • Simple pub/sub messaging
  • Plugin lifecycle management
  • Multiple agent orchestration

Single Agent Runtime

Create via AgentBuilder:
use mofa_runtime::AgentBuilder;

let mut runtime = AgentBuilder::new("agent-1", "My Agent")
    .with_plugin(Box::new(llm_plugin))
    .with_max_concurrent_tasks(20)
    .with_timeout(Duration::from_secs(60))
    .with_agent(my_agent)
    .await?;

SimpleAgentRuntime API

agent

Get immutable reference to the agent.
let agent_ref = runtime.agent();
println!("Agent: {}", agent_ref.agent_type());

agent_mut

Get mutable reference to the agent.
let agent_mut = runtime.agent_mut();

metadata

Get agent metadata.
let metadata = runtime.metadata();
println!("Agent: {} v{}", metadata.name, metadata.version.unwrap_or_default());

config

Get agent configuration.
let config = runtime.config();
println!("ID: {}", config.agent_id);

interrupt

Get interrupt handle.
let interrupt = runtime.interrupt();
interrupt.trigger(); // Trigger interruption

inputs / outputs

Get input/output port names.
let inputs = runtime.inputs();
let outputs = runtime.outputs();

println!("Inputs: {:?}", inputs);
println!("Outputs: {:?}", outputs);

max_concurrent_tasks

Get maximum concurrent tasks setting.
let max_tasks = runtime.max_concurrent_tasks();
println!("Max concurrent: {}", max_tasks);

default_timeout

Get default timeout duration.
let timeout = runtime.default_timeout();
println!("Timeout: {:?}", timeout);

Lifecycle Methods

start

Initialize and start the runtime.
runtime.start().await?;
println!("Runtime started");
This:
  1. Initializes the agent with AgentContext
  2. Initializes all plugins
  3. Sets up event channels

handle_event

Process a single event.
event
AgentEvent
Event to process
let event = AgentEvent::TaskReceived(TaskRequest {
    task_id: "task-1".to_string(),
    content: "Process this".to_string(),
});

runtime.handle_event(event).await?;

run_with_receiver

Run event loop with external event channel.
event_rx
tokio::sync::mpsc::Receiver<AgentEvent>
Event receiver channel
let (tx, rx) = tokio::sync::mpsc::channel(100);

// Send events from another task
tokio::spawn(async move {
    tx.send(AgentEvent::TaskReceived(task)).await.ok();
});

// Process events
runtime.run_with_receiver(rx).await?;

stop

Stop the runtime and shut down the agent.
runtime.stop().await?;

trigger_interrupt

Trigger an interruption.
runtime.trigger_interrupt();

Plugin Management

init_plugins

Initialize all registered plugins.
runtime.init_plugins().await?;

Complete Example

use mofa_runtime::AgentBuilder;
use mofa_kernel::message::{AgentEvent, TaskRequest};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Build runtime
    let mut runtime = AgentBuilder::new("worker", "Worker Agent")
        .with_plugin(Box::new(llm_plugin))
        .with_max_concurrent_tasks(10)
        .build_and_start(worker_agent)
        .await?;

    println!("Agent started: {}", runtime.metadata().id);

    // 2. Create event channel
    let (tx, rx) = tokio::sync::mpsc::channel(100);

    // 3. Spawn event loop
    let runtime_handle = tokio::spawn(async move {
        runtime.run_with_receiver(rx).await
    });

    // 4. Send events
    for i in 1..=5 {
        let event = AgentEvent::TaskReceived(TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process item {}", i),
        });
        tx.send(event).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // 5. Send shutdown
    tx.send(AgentEvent::Shutdown).await?;

    // 6. Wait for completion
    runtime_handle.await??;

    println!("Runtime finished");
    Ok(())
}

Multi-Agent Coordination

SimpleRuntime

For orchestrating multiple agents:
use mofa_runtime::SimpleRuntime;

let runtime = SimpleRuntime::new();

register_agent

Register an agent with the runtime.
metadata
AgentMetadata
Agent metadata
config
AgentConfig
Agent configuration
role
&str
Agent role identifier
return
Receiver<AgentEvent>
Event receiver for this agent
let metadata = AgentMetadata {
    id: "agent-1".to_string(),
    name: "Agent 1".to_string(),
    // ...
};

let config = AgentConfig {
    agent_id: "agent-1".to_string(),
    name: "Agent 1".to_string(),
    node_config: HashMap::new(),
};

let rx = runtime.register_agent(metadata, config, "worker").await?;

message_bus

Get reference to the message bus.
let bus = runtime.message_bus();

send_to_agent

Send event to specific agent.
target_id
&str
Target agent ID
event
AgentEvent
Event to send
let event = AgentEvent::TaskReceived(task);
runtime.send_to_agent("agent-2", event).await?;

broadcast

Broadcast event to all agents.
let event = AgentEvent::Custom(
    "system_update".to_string(),
    vec![],
);
runtime.broadcast(event).await?;

publish_to_topic

Publish event to a topic.
topic
&str
Topic name
event
AgentEvent
Event to publish
runtime.publish_to_topic("tasks", event).await?;

subscribe_topic

Subscribe agent to a topic.
agent_id
&str
Agent ID to subscribe
topic
&str
Topic name
runtime.subscribe_topic("agent-1", "tasks").await?;
runtime.subscribe_topic("agent-2", "tasks").await?;

get_agents_by_role

Get agent IDs by role.
role
&str
Role identifier
let workers = runtime.get_agents_by_role("worker").await;
println!("Workers: {:?}", workers);

stop_all

Stop all registered agents.
runtime.stop_all().await?;

Multi-Agent Example

use mofa_runtime::SimpleRuntime;
use mofa_kernel::message::{AgentEvent, TaskRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = SimpleRuntime::new();

    // Register multiple agents
    let rx1 = runtime.register_agent(
        create_metadata("worker-1"),
        create_config("worker-1"),
        "worker"
    ).await?;

    let rx2 = runtime.register_agent(
        create_metadata("worker-2"),
        create_config("worker-2"),
        "worker"
    ).await?;

    let rx_manager = runtime.register_agent(
        create_metadata("manager"),
        create_config("manager"),
        "manager"
    ).await?;

    // Subscribe workers to tasks topic
    runtime.subscribe_topic("worker-1", "tasks").await?;
    runtime.subscribe_topic("worker-2", "tasks").await?;

    // Spawn agent event loops
    tokio::spawn(run_agent_loop("worker-1", rx1));
    tokio::spawn(run_agent_loop("worker-2", rx2));
    tokio::spawn(run_agent_loop("manager", rx_manager));

    // Publish tasks to topic
    for i in 1..=10 {
        let event = AgentEvent::TaskReceived(TaskRequest {
            task_id: format!("task-{}", i),
            content: format!("Process {}", i),
        });
        runtime.publish_to_topic("tasks", event).await?;
    }

    // Send direct message to manager
    runtime.send_to_agent(
        "manager",
        AgentEvent::Custom("status_request".to_string(), vec![])
    ).await?;

    // Wait for processing
    tokio::time::sleep(Duration::from_secs(5)).await;

    // Broadcast shutdown
    runtime.broadcast(AgentEvent::Shutdown).await?;

    // Stop all
    runtime.stop_all().await?;

    Ok(())
}

async fn run_agent_loop(
    id: &str,
    mut rx: tokio::sync::mpsc::Receiver<AgentEvent>,
) {
    while let Some(event) = rx.recv().await {
        match event {
            AgentEvent::Shutdown => break,
            AgentEvent::TaskReceived(task) => {
                println!("[{}] Processing {}", id, task.task_id);
                // Process task...
            }
            _ => {}
        }
    }
    println!("[{}] Stopped", id);
}

Message Bus

The SimpleMessageBus handles all message routing:
pub struct SimpleMessageBus {
    // Internal subscribers and topic mappings
}

impl SimpleMessageBus {
    pub async fn register(&self, agent_id: &str, tx: Sender<AgentEvent>);
    pub async fn subscribe(&self, agent_id: &str, topic: &str);
    pub async fn send_to(&self, target_id: &str, event: AgentEvent) -> GlobalResult<()>;
    pub async fn broadcast(&self, event: AgentEvent) -> GlobalResult<()>;
    pub async fn publish(&self, topic: &str, event: AgentEvent) -> GlobalResult<()>;
}

Communication Patterns

Point-to-Point

runtime.send_to_agent("receiver", event).await?;

Broadcast

runtime.broadcast(event).await?;

Pub/Sub

// Subscribe
runtime.subscribe_topic("agent-1", "notifications").await?;
runtime.subscribe_topic("agent-2", "notifications").await?;

// Publish
runtime.publish_to_topic("notifications", event).await?;

Event Types

pub enum AgentEvent {
    TaskReceived(TaskRequest),
    Shutdown,
    Custom(String, Vec<u8>),
    // Other variants...
}

pub struct TaskRequest {
    pub task_id: String,
    pub content: String,
}

Error Handling

use mofa_kernel::agent::error::GlobalError;

match runtime.send_to_agent("agent-1", event).await {
    Ok(_) => println!("Sent successfully"),
    Err(GlobalError::AgentNotFound(id)) => {
        eprintln!("Agent not found: {}", id);
    }
    Err(e) => eprintln!("Error: {}", e),
}

Performance Considerations

  • Uses in-memory channels (no network overhead)
  • Lock-free message passing where possible
  • Bounded channels to prevent memory growth
  • No external runtime dependencies

When to Use SimpleRuntime

Use SimpleRuntime when:
  • Running a single agent or small number of agents
  • Don’t need distributed execution
  • Want minimal dependencies
  • Running in resource-constrained environments
  • Prototyping or development
Use AgentRuntime (Dora) when:
  • Need distributed multi-agent coordination
  • Require complex dataflow graphs
  • Need cross-process or cross-machine communication
  • Building large-scale multi-agent systems

Build docs developers (and LLMs) love