The MessageBus provides communication infrastructure for agents to send messages using point-to-point, broadcast, and pub/sub patterns.
Overview
MoFA provides two message bus implementations:
- AgentBus - Kernel-level bus with serialization
- SimpleMessageBus - Runtime-level bus for SimpleRuntime
AgentBus (Kernel)
The AgentBus from mofa-kernel provides low-level message routing with serialization support.
Creating a Bus
use mofa_kernel::bus::AgentBus;
let bus = AgentBus::new();
Communication Modes
pub enum CommunicationMode {
PointToPoint(String), // Target agent ID
Broadcast, // All agents
PubSub(String), // Topic name
}
register_channel
Register a communication channel for an agent.
Communication mode to register
use mofa_kernel::bus::CommunicationMode;
// Register for point-to-point
bus.register_channel(
&agent_metadata,
CommunicationMode::PointToPoint("sender-id".to_string())
).await?;
// Register for pub/sub
bus.register_channel(
&agent_metadata,
CommunicationMode::PubSub("events".to_string())
).await?;
// Broadcast doesn't need registration
send_message
Send a message through the bus.
use mofa_kernel::message::AgentMessage;
// Point-to-point
let message = AgentMessage::TaskRequest {
task_id: "task-1".to_string(),
content: "Process this".to_string(),
};
bus.send_message(
"sender-1",
CommunicationMode::PointToPoint("receiver-1".to_string()),
&message
).await?;
// Broadcast
bus.send_message(
"sender-1",
CommunicationMode::Broadcast,
&message
).await?;
// Pub/sub
bus.send_message(
"sender-1",
CommunicationMode::PubSub("tasks".to_string()),
&message
).await?;
receive_message
Receive a message from the bus.
Communication mode to receive from
// Point-to-point receive
if let Some(message) = bus.receive_message(
"receiver-1",
CommunicationMode::PointToPoint("sender-1".to_string())
).await? {
println!("Received: {:?}", message);
}
// Broadcast receive
if let Some(message) = bus.receive_message(
"receiver-1",
CommunicationMode::Broadcast
).await? {
println!("Broadcast: {:?}", message);
}
// Topic receive
if let Some(message) = bus.receive_message(
"subscriber-1",
CommunicationMode::PubSub("tasks".to_string())
).await? {
println!("Topic message: {:?}", message);
}
unsubscribe_topic
Unsubscribe from a topic.
bus.unsubscribe_topic("subscriber-1", "tasks").await?;
AgentBus Example
use mofa_kernel::bus::{AgentBus, CommunicationMode};
use mofa_kernel::message::AgentMessage;
use mofa_kernel::agent::{AgentMetadata, AgentCapabilities, AgentState};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bus = AgentBus::new();
// Create agent metadata
let sender_meta = AgentMetadata {
id: "sender".to_string(),
name: "Sender Agent".to_string(),
description: None,
version: None,
capabilities: AgentCapabilities::default(),
state: AgentState::Ready,
};
let receiver_meta = AgentMetadata {
id: "receiver".to_string(),
name: "Receiver Agent".to_string(),
description: None,
version: None,
capabilities: AgentCapabilities::default(),
state: AgentState::Ready,
};
// Register receiver for point-to-point
bus.register_channel(
&receiver_meta,
CommunicationMode::PointToPoint("sender".to_string())
).await?;
// Send message
let message = AgentMessage::TaskRequest {
task_id: "task-1".to_string(),
content: "Hello!".to_string(),
};
bus.send_message(
"sender",
CommunicationMode::PointToPoint("receiver".to_string()),
&message
).await?;
// Receive message
if let Some(received) = bus.receive_message(
"receiver",
CommunicationMode::PointToPoint("sender".to_string())
).await? {
println!("Received: {:?}", received);
}
Ok(())
}
SimpleMessageBus (Runtime)
The SimpleMessageBus is used by SimpleRuntime and works with AgentEvent.
Creating a Bus
use mofa_runtime::SimpleMessageBus;
let bus = SimpleMessageBus::new();
register
Register an agent with event sender.
let (tx, rx) = tokio::sync::mpsc::channel(100);
bus.register("agent-1", tx).await;
subscribe
Subscribe agent to a topic.
bus.subscribe("agent-1", "notifications").await;
send_to
Send event to specific agent.
let event = AgentEvent::TaskReceived(task);
bus.send_to("agent-2", event).await?;
broadcast
Broadcast event to all registered agents.
let event = AgentEvent::Shutdown;
bus.broadcast(event).await?;
publish
Publish event to topic subscribers.
let event = AgentEvent::Custom("update".to_string(), vec![]);
bus.publish("notifications", event).await?;
SimpleMessageBus Example
use mofa_runtime::SimpleMessageBus;
use mofa_kernel::message::{AgentEvent, TaskRequest};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bus = Arc::new(SimpleMessageBus::new());
// Create agents
let (tx1, mut rx1) = tokio::sync::mpsc::channel(100);
let (tx2, mut rx2) = tokio::sync::mpsc::channel(100);
let (tx3, mut rx3) = tokio::sync::mpsc::channel(100);
// Register agents
bus.register("agent-1", tx1).await;
bus.register("agent-2", tx2).await;
bus.register("agent-3", tx3).await;
// Subscribe to topics
bus.subscribe("agent-1", "tasks").await;
bus.subscribe("agent-2", "tasks").await;
bus.subscribe("agent-3", "notifications").await;
// Spawn receivers
tokio::spawn(async move {
while let Some(event) = rx1.recv().await {
println!("[Agent 1] Received: {:?}", event);
}
});
tokio::spawn(async move {
while let Some(event) = rx2.recv().await {
println!("[Agent 2] Received: {:?}", event);
}
});
tokio::spawn(async move {
while let Some(event) = rx3.recv().await {
println!("[Agent 3] Received: {:?}", event);
}
});
// Point-to-point
bus.send_to(
"agent-1",
AgentEvent::Custom("direct".to_string(), vec![])
).await?;
// Publish to topic (agent-1 and agent-2 receive)
let task_event = AgentEvent::TaskReceived(TaskRequest {
task_id: "task-1".to_string(),
content: "Work".to_string(),
});
bus.publish("tasks", task_event).await?;
// Broadcast (all agents receive)
bus.broadcast(AgentEvent::Shutdown).await?;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Ok(())
}
Message Serialization
AgentBus uses bincode for message serialization:
// Serialization happens automatically
let message_bytes = bincode::serialize(&message)?;
// Deserialization happens automatically
let message: AgentMessage = bincode::deserialize(&bytes)?;
Error Handling
use mofa_kernel::bus::BusError;
match bus.send_message(sender_id, mode, &message).await {
Ok(_) => println!("Sent"),
Err(BusError::AgentNotRegistered(id)) => {
eprintln!("Agent {} not registered", id);
}
Err(BusError::ChannelNotFound(msg)) => {
eprintln!("Channel error: {}", msg);
}
Err(BusError::SendFailed(msg)) => {
eprintln!("Send failed: {}", msg);
}
Err(BusError::Serialization(msg)) => {
eprintln!("Serialization error: {}", msg);
}
}
Best Practices
Channel Sizing
// Choose appropriate buffer size
let (tx, rx) = tokio::sync::mpsc::channel(1000); // Large buffer
let (tx, rx) = tokio::sync::mpsc::channel(10); // Small buffer
Error Recovery
// Retry on failure
for attempt in 1..=3 {
match bus.send_message(sender, mode, &message).await {
Ok(_) => break,
Err(e) if attempt < 3 => {
eprintln!("Attempt {} failed: {}", attempt, e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => return Err(e.into()),
}
}
Topic Management
// Subscribe
bus.subscribe("agent-1", "events").await;
// Use topic
bus.publish("events", event).await?;
// Cleanup when done
bus.unsubscribe_topic("agent-1", "events").await?;
- AgentBus: Adds serialization overhead, suitable for cross-process communication
- SimpleMessageBus: Zero-copy in-process messaging, faster for single-process
- Use bounded channels to prevent memory issues
- Consider batch operations for high-throughput scenarios