
Overview

DefaultEventBus is the synchronous implementation of the EventBus trait. Events are queued when published and processed in batch when drain() is called.

Location: crates/oneclaw-core/src/event_bus/bus.rs:19

Design Pattern

The drain pattern separates event publishing from event processing:
  1. Publish: Events are added to a queue
  2. Drain: All pending events are dispatched to matching handlers
  3. History: Processed events are stored in a ring buffer
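Because no handler runs until drain() is called, the caller decides exactly when events are processed. This synchronous design fits the current OneClaw Runtime event loop.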

Usage

Basic Example

use oneclaw_core::event_bus::{DefaultEventBus, Event, EventBus};

let bus = DefaultEventBus::new();

// Subscribe to sensor events
bus.subscribe("sensor.*", Box::new(|event| {
    println!("Sensor reading: {:?}", event.data);
    None
}))?;

// Publish events (queued, not processed yet)
bus.publish(Event::new("sensor.temperature", "sensor-1")
    .with_data("value", "42.5"))?;

assert_eq!(bus.pending_count(), 1);

// Process all pending events
let processed = bus.drain()?;
assert_eq!(processed, 1);
assert_eq!(bus.pending_count(), 0);

Configuration

Location: crates/oneclaw-core/src/event_bus/bus.rs:32
let bus = DefaultEventBus::new()
    .with_max_history(200);  // Default is 100

The Drain Pattern

Location: crates/oneclaw-core/src/event_bus/bus.rs:106

How Drain Works

  1. Take events: Atomically drain the queue
  2. Match handlers: Find subscriptions matching each event’s topic
  3. Execute handlers: Run handler callbacks, collecting response events
  4. Store history: Add processed events to ring buffer
  5. Publish responses: Queue any events returned by handlers
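
The five steps above can be sketched with only the standard library. This is a minimal illustration, not the code in bus.rs: `SketchBus`, its simplified `Event`, the `Handler` alias, and `history_len` are hypothetical stand-ins, topics are compared for exact equality only, and the sketch assumes an event with no matching handler still counts as processed.

```rust
use std::sync::Mutex;

// Hypothetical stand-ins for illustration; these are not the oneclaw-core types.
#[derive(Clone, Debug)]
struct Event {
    topic: String,
}

type Handler = Box<dyn Fn(&Event) -> Option<Event> + Send>;

struct SketchBus {
    queue: Mutex<Vec<Event>>,
    subscriptions: Mutex<Vec<(String, Handler)>>,
    history: Mutex<Vec<Event>>,
}

impl SketchBus {
    fn new() -> Self {
        Self {
            queue: Mutex::new(Vec::new()),
            subscriptions: Mutex::new(Vec::new()),
            history: Mutex::new(Vec::new()),
        }
    }

    fn subscribe(&self, topic: &str, handler: Handler) {
        self.subscriptions.lock().unwrap().push((topic.to_string(), handler));
    }

    fn publish(&self, event: Event) {
        self.queue.lock().unwrap().push(event);
    }

    fn pending_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    fn history_len(&self) -> usize {
        self.history.lock().unwrap().len()
    }

    fn drain(&self) -> usize {
        // 1. Take events: swap the queue for an empty Vec while holding the lock.
        //    The guard is dropped at the end of this statement.
        let batch = std::mem::take(&mut *self.queue.lock().unwrap());
        let mut responses = Vec::new();
        {
            let subscriptions = self.subscriptions.lock().unwrap();
            for event in &batch {
                for (topic, handler) in subscriptions.iter() {
                    // 2. Match handlers (exact topic equality in this sketch).
                    if *topic == event.topic {
                        // 3. Execute handlers, collecting any response events.
                        if let Some(response) = handler(event) {
                            responses.push(response);
                        }
                    }
                }
            }
        }
        let processed = batch.len();
        // 4. Store history (unbounded here; the real bus caps it at max_history).
        self.history.lock().unwrap().extend(batch);
        // 5. Publish responses: they wait in the queue for the next drain.
        self.queue.lock().unwrap().extend(responses);
        processed
    }
}
```

Taking the whole queue in one step keeps the queue lock short-lived, so events published while handlers run land in the fresh queue instead of the batch being processed.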

Response Events

Handlers can generate new events by returning Some(Event):
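// Note: the `if let ... && ...` let-chain requires the Rust 2024 edition (Rust 1.88+).
// EventPriority must be in scope.
bus.subscribe("sensor.*", Box::new(|event| {
    // A missing or unparsable "value" never triggers an alert
    if let Some(value) = event.data.get("value")
        && value.parse::<f64>().unwrap_or(0.0) > 100.0
    {
        // Reading is above 100.0: return an alert event
        return Some(Event::new("alert.threshold", "pipeline")
            .with_data("device", event.data.get("device").cloned().unwrap_or_default())
            .with_priority(EventPriority::Critical));
    }
    // No response event for this reading
    None
}))?;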
Response events are not dispatched within the drain() call that produced them. They are queued and processed on the next drain() call.
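
Since each drain() only handles events that were already queued, a chain of N response events needs N further drains to settle. One way to handle this is a bounded loop, sketched below. `drain_until_idle` is a hypothetical helper, not part of oneclaw-core; it takes any closure that performs one drain and returns the number of events processed, and it stops after `max_rounds` so that handlers which keep producing responses cannot loop forever.

```rust
/// Hypothetical helper (not part of oneclaw-core).
/// Calls `drain_once` until it reports zero processed events or until
/// `max_rounds` drains have run. Returns (total events processed, rounds run).
fn drain_until_idle(mut drain_once: impl FnMut() -> usize, max_rounds: usize) -> (usize, usize) {
    let mut total = 0;
    let mut rounds = 0;
    while rounds < max_rounds {
        let processed = drain_once();
        if processed == 0 {
            // Nothing was pending, so no handler could have queued a response.
            break;
        }
        total += processed;
        rounds += 1;
    }
    (total, rounds)
}
```

With the bus from the examples above, a call might look like `drain_until_idle(|| bus.drain().unwrap_or(0), 8)`, assuming drain() returns a Result wrapping the processed count as the Basic Example suggests.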

Topic Matching

Location: crates/oneclaw-core/src/event_bus/bus.rs:49
// Wildcard - matches everything
bus.subscribe("*", handler)?;

// Prefix with .* - matches "sensor.temperature", "sensor.humidity"
bus.subscribe("sensor.*", handler)?;

// Prefix with * - matches "sensor.temp", "sensor.temperature"
bus.subscribe("sensor*", handler)?;

// Exact match - only "sensor.temp"
bus.subscribe("sensor.temp", handler)?;
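
The four pattern forms above are consistent with a simple rule: a trailing `*` turns the pattern into a prefix match, and anything else must match exactly. The function below is a sketch under that assumption. `topic_matches` is a hypothetical name, and the actual matcher in bus.rs may treat edge cases differently (for example, whether "sensor.*" matches the bare topic "sensor").

```rust
/// Hypothetical matcher illustrating the documented pattern forms.
/// Assumption: a trailing `*` means "any topic starting with the text before it".
fn topic_matches(pattern: &str, topic: &str) -> bool {
    // "*" on its own matches every topic.
    if pattern == "*" {
        return true;
    }
    match pattern.strip_suffix('*') {
        // "sensor.*" -> prefix "sensor.", "sensor*" -> prefix "sensor"
        Some(prefix) => topic.starts_with(prefix),
        // No wildcard: the topic must be identical to the pattern.
        None => pattern == topic,
    }
}
```

Under this rule "sensor*" is slightly broader than "sensor.*": it would also match a topic such as "sensors.temp", because it does not require the dot separator.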

Event History

Location: crates/oneclaw-core/src/event_bus/bus.rs:160

Processed events are stored in a ring buffer for debugging:
// Get last 10 events
let recent = bus.recent_events(10)?;
for event in recent {
    println!("{}: {} from {}", event.timestamp, event.topic, event.source);
}
History is limited by max_history (default 100). Oldest events are removed when capacity is reached.
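
The eviction behaviour can be illustrated with a bounded VecDeque. This is a sketch only: `History` is a hypothetical type, the Thread Safety section shows the real bus keeping its history in a Vec, and the ordering returned by `recent` here (oldest first) is an assumption rather than documented behaviour of recent_events().

```rust
use std::collections::VecDeque;

/// Hypothetical bounded history buffer, for illustration only.
struct History<T> {
    items: VecDeque<T>,
    max_history: usize,
}

impl<T: Clone> History<T> {
    fn new(max_history: usize) -> Self {
        Self {
            items: VecDeque::with_capacity(max_history),
            max_history,
        }
    }

    /// Appends an item, evicting the oldest one when the buffer is full.
    fn push(&mut self, item: T) {
        if self.max_history == 0 {
            return; // A zero-sized history retains nothing.
        }
        if self.items.len() == self.max_history {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Returns up to `n` of the most recent items, oldest first.
    fn recent(&self, n: usize) -> Vec<T> {
        let skip = self.items.len().saturating_sub(n);
        self.items.iter().skip(skip).cloned().collect()
    }
}
```

A VecDeque makes both the append and the eviction constant-time, whereas removing the first element of a Vec shifts every remaining element.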

Synchronous Event Processing

When to Use DefaultEventBus

  • Control flow: You need explicit control over when events are processed
  • Testing: Handlers run only when drain() is called, so event processing in tests is predictable
  • Batch operations: Events accumulate and are processed together in a single drain() call
  • Event loop integration: One drain() per tick fits well with tick-based runtime loops

Runtime Integration

Typical integration with OneClaw Runtime:
loop {
    // 1. Execute commands, publish events
    runtime.execute()?;
    
    // 2. Process all events at end of tick
    runtime.event_bus.drain()?;
    
    // 3. Sleep or wait for next tick
    std::thread::sleep(tick_duration);
}

Thread Safety

Location: crates/oneclaw-core/src/event_bus/bus.rs:19

All mutable internal state (queue, subscriptions, and history) is guarded by a Mutex for thread-safe access:
struct DefaultEventBus {
    queue: Mutex<Vec<Event>>,
    subscriptions: Mutex<Vec<Subscription>>,
    history: Mutex<Vec<Event>>,
    max_history: usize,
}
Mutex poisoning is handled gracefully: a poisoned lock is recovered rather than treated as a fatal error.

Use Cases

Device Event Processing

// Collect sensor readings
for sensor in sensors {
    let reading = sensor.read()?;
    bus.publish(Event::new("sensor.reading", sensor.id())
        .with_data("value", reading.to_string()))?;
}

// Process all readings in batch
bus.drain()?;

Alert Generation

bus.subscribe("sensor.*", Box::new(|event| {
    if is_threshold_violated(event) {
        Some(Event::new("alert.critical", "monitor")
            .with_data("reason", "threshold_exceeded")
            .with_priority(EventPriority::Critical))
    } else {
        None
    }
}))?;

Performance Considerations

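  • Queue overhead: Events are stored in a Vec, so publishing is a cheap append
  • Drain cost: Each drained event is tested against every subscription, so a drain performs O(events × subscriptions) pattern matches, plus the running time of the matching handlers
  • History limit: Keep max_history reasonable (default 100), since every retained event stays in memory
  • Handler efficiency: Handlers run synchronously inside drain(), so avoid blocking operations that would delay every event behind them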
