Overview

The EventBus trait defines the interface for OneClaw’s Layer 3 event system, providing publish-subscribe functionality with topic-based routing and event history.
Location: crates/oneclaw-core/src/event_bus/traits.rs

Event Structure

Event

Events flowing through the bus contain:
pub struct Event {
    pub id: String,              // Unique event ID (UUID)
    pub topic: String,           // "sensor.temperature", "device.status"
    pub data: HashMap<String, String>,  // Flexible key-value payload
    pub source: String,          // Source of the event
    pub priority: EventPriority, // Low, Normal, High, Critical
    pub timestamp: DateTime<Utc>, // When event was created
}

Event Priority

pub enum EventPriority {
    Low,
    Normal,    // default
    High,
    Critical,
}

EventBus Trait

Location: crates/oneclaw-core/src/event_bus/traits.rs:71
pub trait EventBus: Send + Sync {
    fn publish(&self, event: Event) -> Result<()>;
    fn subscribe(&self, topic_pattern: &str, handler: EventHandler) -> Result<String>;
    fn unsubscribe(&self, subscription_id: &str) -> Result<bool>;
    fn pending_count(&self) -> usize;
    fn drain(&self) -> Result<usize>;
    fn recent_events(&self, limit: usize) -> Result<Vec<Event>>;
}

Methods

publish()

Publish an event to the bus.
bus.publish(Event::new("sensor.temperature", "sensor-1")
    .with_data("value", "42.5")
    .with_priority(EventPriority::High))?;

subscribe()

Subscribe a handler to a topic pattern. Returns a subscription ID, which can later be passed to unsubscribe(). Pattern matching:
  • "*" - matches all events
  • "sensor.*" - matches "sensor.temperature", "sensor.humidity"
  • "sensor.temp" - exact match; matches only "sensor.temp"
let sub_id = bus.subscribe("sensor.*", Box::new(|event| {
    println!("Sensor event: {}", event.topic);
    None  // No response event
}))?;
Handler type:
pub type EventHandler = Box<dyn Fn(&Event) -> Option<Event> + Send + Sync>;
Handlers can return Some(Event) to publish a response event.
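
The pattern rules listed above can be illustrated with a small matcher. This is only a sketch of one plausible reading: topic_matches is a hypothetical helper, not part of the EventBus trait, and the choice to let "sensor.*" also match deeper topics such as "sensor.a.b" is an assumption the source does not confirm.

```rust
/// Hypothetical helper (not part of the documented API): returns true when
/// `topic` matches `pattern` under the three rules listed in this section.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    // "*" matches every topic.
    if pattern == "*" {
        return true;
    }
    // "prefix.*" matches any topic that starts with "prefix." and has
    // at least one character after the dot.
    if let Some(prefix) = pattern.strip_suffix(".*") {
        return topic
            .strip_prefix(prefix)
            .map_or(false, |rest| rest.starts_with('.') && rest.len() > 1);
    }
    // Anything else is treated as an exact match.
    pattern == topic
}
```

Checking for the dot separator keeps "sensor.*" from matching an unrelated topic such as "sensors.temperature".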

unsubscribe()

Remove a subscription by ID. Returns true if the subscription was found and removed.
bus.unsubscribe(&sub_id)?;

pending_count()

Get the number of events pending in the queue (relevant for synchronous implementations).
let count = bus.pending_count();

drain()

Process all pending events (for synchronous implementations). Returns the number of events processed.
let processed = bus.drain()?;
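
To show how publish(), pending_count(), and drain() could fit together in a synchronous implementation, here is a minimal queue-backed sketch. QueueBus is a hypothetical type, not OneClaw's implementation. It uses exact topic matching and plain return values instead of Result to stay short, and it assumes that response events returned by handlers are re-queued and counted by drain().

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

#[derive(Debug, Clone)]
struct Event {
    topic: String,
}

type EventHandler = Box<dyn Fn(&Event) -> Option<Event> + Send + Sync>;

/// Hypothetical queue-backed bus; exact-topic matching only.
struct QueueBus {
    queue: Mutex<VecDeque<Event>>,
    handlers: Mutex<Vec<(String, EventHandler)>>,
}

impl QueueBus {
    fn new() -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            handlers: Mutex::new(Vec::new()),
        }
    }

    fn subscribe(&self, topic: &str, handler: EventHandler) {
        self.handlers.lock().unwrap().push((topic.to_string(), handler));
    }

    /// Publishing only enqueues; no handler runs until `drain` is called.
    fn publish(&self, event: Event) {
        self.queue.lock().unwrap().push_back(event);
    }

    fn pending_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Processes events until the queue is empty and returns how many ran.
    fn drain(&self) -> usize {
        let mut processed = 0;
        loop {
            // Lock the queue only long enough to pop one event.
            let next = self.queue.lock().unwrap().pop_front();
            let Some(event) = next else { break };
            let mut responses = Vec::new();
            for (topic, handler) in self.handlers.lock().unwrap().iter() {
                if *topic == event.topic {
                    // A handler may return a response event (assumed to be re-queued).
                    responses.extend(handler(&event));
                }
            }
            self.queue.lock().unwrap().extend(responses);
            processed += 1;
        }
        processed
    }
}
```

In this sketch a handler that responds on its own topic would keep drain() looping, and a handler that calls subscribe() would deadlock on the handler lock; a real implementation would need to guard against both.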

recent_events()

Get recent event history for debugging.
let events = bus.recent_events(10)?;  // Last 10 events
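
One way an implementation might back recent_events() is a bounded buffer that evicts the oldest entry when full. The sketch below is an assumption for illustration only: History is a hypothetical type, and the source does not specify the capacity, the eviction policy, or the order in which events are returned.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded history: keeps at most `capacity` items, oldest first.
struct History<T> {
    capacity: usize,
    items: VecDeque<T>,
}

impl<T: Clone> History<T> {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            items: VecDeque::with_capacity(capacity),
        }
    }

    /// Records an item, evicting the oldest one when the buffer is full.
    fn record(&mut self, item: T) {
        if self.capacity == 0 {
            return;
        }
        if self.items.len() == self.capacity {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Returns up to `limit` of the most recent items, oldest first.
    fn recent(&self, limit: usize) -> Vec<T> {
        let skip = self.items.len().saturating_sub(limit);
        self.items.iter().skip(skip).cloned().collect()
    }
}
```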

Event Builder

Location: crates/oneclaw-core/src/event_bus/traits.rs:40-64
let event = Event::new("sensor.temperature", "sensor-1")
    .with_data("value", "42.5")
    .with_data("unit", "celsius")
    .with_priority(EventPriority::High);
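
For readers unfamiliar with the consuming-builder style used above, the following std-only sketch shows how such methods are commonly written. It is not the code at the cited location: the id and timestamp fields are omitted to avoid external crates, and the impl Into<String> parameter types are an assumption.

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum EventPriority {
    Low,
    #[default]
    Normal,
    High,
    Critical,
}

#[derive(Debug, Clone)]
struct Event {
    topic: String,
    data: HashMap<String, String>,
    source: String,
    priority: EventPriority,
}

impl Event {
    /// Starts an event with an empty payload and the default priority.
    fn new(topic: impl Into<String>, source: impl Into<String>) -> Self {
        Self {
            topic: topic.into(),
            data: HashMap::new(),
            source: source.into(),
            priority: EventPriority::default(),
        }
    }

    /// Adds one key-value pair; takes and returns `self` so calls can chain.
    fn with_data(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.data.insert(key.into(), value.into());
        self
    }

    /// Overrides the default priority.
    fn with_priority(mut self, priority: EventPriority) -> Self {
        self.priority = priority;
        self
    }
}
```

Because each method takes self by value and returns it, the chain builds a single Event without intermediate clones, and an event created without with_priority() keeps the Normal default.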

NoopEventBus

Location: crates/oneclaw-core/src/event_bus/traits.rs:92
A no-operation implementation for testing without event infrastructure.
let bus = NoopEventBus::new();
// All operations succeed but do nothing
bus.publish(event)?;  // OK, ignored
assert_eq!(bus.pending_count(), 0);
assert_eq!(bus.drain()?, 0);

Use Cases

  • Sensor data routing: Temperature, motion, humidity events
  • Device status monitoring: Connection, health, error events
  • Alert generation: Critical threshold violations
  • System telemetry: Performance, diagnostics
