
Event Bus Usage

OneClaw’s event bus provides pub/sub messaging for reactive event processing. Two implementations are available: DefaultEventBus (synchronous, drain-based) and AsyncEventBus (real-time, tokio-based).

Event Bus Overview

The event bus acts as the “nervous system” of OneClaw, enabling:
  • Decoupled components: Publishers don’t know about subscribers
  • Event pipelines: Chain handlers to create processing workflows
  • Real-time reactions: Respond to sensor data, alerts, state changes
  • Event history: Replay recent events for debugging

Event Structure

use oneclaw_core::event_bus::traits::{Event, EventPriority};
use std::collections::HashMap;

let event = Event::new("sensor.temperature", "device_01")
    .with_data("value", "42.5")
    .with_data("unit", "celsius")
    .with_priority(EventPriority::High);

println!("Event ID: {}", event.id);
println!("Topic: {}", event.topic);
println!("Data: {:?}", event.data);
Event fields:
pub struct Event {
    /// Unique event ID
    pub id: String,
    /// Topic/channel: "sensor.temperature", "device.status", "alert.critical"
    pub topic: String,
    /// Event payload as key-value pairs (flexible schema)
    pub data: HashMap<String, String>,
    /// Source of the event
    pub source: String,
    /// Priority level
    pub priority: EventPriority,
    /// When the event was created
    pub timestamp: DateTime<Utc>,
}
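
The examples in this guide pass both string literals and owned String values to with_data(). The following is a minimal sketch of a builder that allows this, assuming the real methods accept impl Into<String>; that signature is not confirmed by this page. SketchEvent and SketchPriority are hypothetical stand-ins that omit the id and timestamp fields, and the Normal default priority is also an assumption.

```rust
use std::collections::HashMap;

/// Stand-in for EventPriority; only the variants named in this guide are listed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchPriority {
    Normal,
    High,
    Critical,
}

/// Stand-in for Event, without the id and timestamp fields.
#[derive(Debug, Clone)]
struct SketchEvent {
    topic: String,
    source: String,
    data: HashMap<String, String>,
    priority: SketchPriority,
}

impl SketchEvent {
    fn new(topic: impl Into<String>, source: impl Into<String>) -> Self {
        SketchEvent {
            topic: topic.into(),
            source: source.into(),
            data: HashMap::new(),
            // Assumption: events start at Normal priority unless overridden.
            priority: SketchPriority::Normal,
        }
    }

    /// Consumes and returns self so calls can be chained.
    fn with_data(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.data.insert(key.into(), value.into());
        self
    }

    fn with_priority(mut self, priority: SketchPriority) -> Self {
        self.priority = priority;
        self
    }
}

/// Builds an event using both an owned String and a &str literal as values.
fn sample_event() -> SketchEvent {
    let reading = 42.5_f64;
    SketchEvent::new("sensor.temperature", "device_01")
        .with_data("value", format!("{reading:.1}")) // owned String
        .with_data("unit", "celsius") // &str literal
        .with_priority(SketchPriority::High)
}
```

Because every payload value is a String, numeric readings have to be formatted on the way in and parsed on the way out, as the alert pipeline later in this guide does with value.parse::<f64>().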

DefaultEventBus (Sync)

DefaultEventBus is the default, synchronous bus. publish() only queues an event; subscribed handlers run when you call drain().

Creating the Bus

use oneclaw_core::event_bus::{
    bus::DefaultEventBus,
    traits::EventBus,
};

let bus = DefaultEventBus::new();

// With custom history size
let bus = DefaultEventBus::new().with_max_history(500);

Publishing Events

use oneclaw_core::event_bus::traits::{Event, EventPriority};

let event = Event::new("sensor.temperature", "temp_sensor_01")
    .with_data("value", "42.5")
    .with_data("device_id", "rpi4_living_room")
    .with_priority(EventPriority::High);

bus.publish(event)?;

println!("Pending events: {}", bus.pending_count());

Subscribing to Events

use oneclaw_core::event_bus::traits::{EventHandler, Event};

// Subscribe to all temperature sensor events
let sub_id = bus.subscribe("sensor.temperature", Box::new(|event| {
    println!("Temperature: {}", event.data.get("value").unwrap_or(&"unknown".into()));
    None  // No response event
}))?;

// Subscribe with pattern matching
let sub_id = bus.subscribe("sensor.*", Box::new(|event| {
    println!("Sensor event: {}", event.topic);
    None
}))?;

// Wildcard: all events
let sub_id = bus.subscribe("*", Box::new(|event| {
    println!("Any event: {}", event.topic);
    None
}))?;
Pattern matching rules:
fn topic_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        return topic.starts_with(prefix);
    }
    if let Some(prefix) = pattern.strip_suffix('*') {
        return topic.starts_with(prefix);
    }
    pattern == topic
}
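
To make the rules concrete, the sketch below copies the function above unchanged and evaluates it against a few topics. The sample topics and the sample_cases helper are illustrative, not part of OneClaw. Note that the ".*" branch is a plain prefix check, so "sensor.*" also matches "sensor" itself and "sensors.humidity".

```rust
/// Copy of the matching rules shown above, reproduced so the cases below run standalone.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        return topic.starts_with(prefix);
    }
    if let Some(prefix) = pattern.strip_suffix('*') {
        return topic.starts_with(prefix);
    }
    pattern == topic
}

/// Evaluates a handful of (pattern, topic) pairs and returns each with its result.
fn sample_cases() -> Vec<(&'static str, &'static str, bool)> {
    let cases = [
        ("*", "device.status"),
        ("sensor.*", "sensor.temperature"),
        ("sensor.*", "sensor.temperature.living_room"),
        ("sensor.*", "sensor"),           // the prefix alone also matches
        ("sensor.*", "sensors.humidity"), // no check for a '.' boundary
        ("sensor.t*", "sensor.temperature"),
        ("sensor.temperature", "sensor.humidity"),
        ("alert.*", "sensor.alert"),
    ];
    cases
        .iter()
        .map(|&(pattern, topic)| (pattern, topic, topic_matches(pattern, topic)))
        .collect()
}
```

If a subscription must not see sibling prefixes such as "sensors.", choose topic names whose first segment is not a prefix of another.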

Draining Events

// Process all pending events
let processed = bus.drain()?;
println!("Processed {} events", processed);
How drain works:
fn drain(&self) -> Result<usize> {
    // Take all pending events
    let events: Vec<Event> = {
        let mut queue = self.queue.lock()
            .unwrap_or_else(|e| { tracing::warn!("Queue mutex poisoned, recovering"); e.into_inner() });
        std::mem::take(&mut *queue)
    };

    let count = events.len();
    if count == 0 {
        return Ok(0);
    }

    debug!(count = count, "Draining events");

    // Collect new events generated by handlers
    let mut new_events: Vec<Event> = Vec::new();

    let subs = self.subscriptions.lock()
        .unwrap_or_else(|e| { tracing::warn!("Subscriptions mutex poisoned, recovering"); e.into_inner() });

    for event in &events {
        for sub in subs.iter() {
            if Self::topic_matches(&sub.pattern, &event.topic)
                && let Some(response_event) = (sub.handler)(event)
            {
                new_events.push(response_event);
            }
        }
    }

    // Add to history
    {
        let mut history = self.history.lock()
            .unwrap_or_else(|e| { tracing::warn!("History mutex poisoned, recovering"); e.into_inner() });
        for event in events {
            history.push(event);
        }
        // Trim history to max size
        while history.len() > self.max_history {
            history.remove(0);
        }
    }

    // Publish any response events (from handlers)
    drop(subs);
    for new_event in new_events {
        self.publish(new_event)?;
    }

    Ok(count)
}
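
The key consequence of the listing above is that response events returned by handlers are re-queued and handled by the next drain() call, not the current one. The sketch below reproduces only that behavior with a stripped-down, single-threaded stand-in. MiniBus is hypothetical: it uses bare topic strings instead of Event, exact-match subscriptions, and no mutexes or history.

```rust
/// A handler receives a topic and may return the topic of a follow-up event.
type Handler = Box<dyn Fn(&str) -> Option<String>>;

/// Hypothetical, simplified stand-in for a drain-based bus.
struct MiniBus {
    queue: Vec<String>,
    subs: Vec<(String, Handler)>, // (exact topic, handler)
}

impl MiniBus {
    fn new() -> Self {
        MiniBus { queue: Vec::new(), subs: Vec::new() }
    }

    fn subscribe(&mut self, topic: &str, handler: Handler) {
        self.subs.push((topic.to_string(), handler));
    }

    fn publish(&mut self, topic: &str) {
        self.queue.push(topic.to_string());
    }

    fn pending_count(&self) -> usize {
        self.queue.len()
    }

    /// Processes only the events that were queued when drain() was called.
    fn drain(&mut self) -> usize {
        let batch = std::mem::take(&mut self.queue);
        let mut responses = Vec::new();
        for topic in &batch {
            for (pattern, handler) in &self.subs {
                if pattern == topic {
                    if let Some(next) = handler(topic.as_str()) {
                        responses.push(next);
                    }
                }
            }
        }
        // Responses go back on the queue and wait for the next drain.
        self.queue.extend(responses);
        batch.len()
    }
}

/// Returns (first drain count, pending after it, second drain count, third drain count).
fn two_stage_demo() -> (usize, usize, usize, usize) {
    let mut bus = MiniBus::new();
    bus.subscribe(
        "sensor.reading",
        Box::new(|_topic: &str| Some("alert.threshold".to_string())),
    );
    bus.subscribe("alert.threshold", Box::new(|_topic: &str| None));

    bus.publish("sensor.reading");
    let first = bus.drain(); // handles sensor.reading, queues alert.threshold
    let pending = bus.pending_count();
    let second = bus.drain(); // handles alert.threshold
    let third = bus.drain(); // nothing left
    (first, pending, second, third)
}
```

This is why a pipeline with N chained stages needs N drain() calls before the last handler has run.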

Unsubscribing

let sub_id = bus.subscribe("test", Box::new(|_| None))?;

// Later...
if bus.unsubscribe(&sub_id)? {
    println!("Unsubscribed successfully");
}

AsyncEventBus (Real-time)

Tokio-based event bus for real-time event processing without drain loops.

Creating the Async Bus

use oneclaw_core::event_bus::async_bus::AsyncEventBus;

let bus = AsyncEventBus::new(256); // channel capacity

Publishing (Immediate Processing)

// Events are processed immediately on publish (no drain needed)
bus.publish(Event::new("sensor.temp", "device_01")
    .with_data("value", "42.5"))?;

// pending_count() always returns 0 (events processed immediately)
assert_eq!(bus.pending_count(), 0);

Async Channel Subscription

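use oneclaw_core::event_bus::async_bus::AsyncEventBus;
use oneclaw_core::event_bus::traits::Event;

#[tokio::main]
async fn main() -> Result<()> {
    let bus = AsyncEventBus::new(256);
    let mut receiver = bus.subscribe_channel();

    // Spawn async listener
    tokio::spawn(async move {
        // The loop ends on the first recv() error
        while let Ok(event) = receiver.recv().await {
            println!("Received: {} from {}", event.topic, event.source);
            // Process event in real-time
        }
    });

    // Publish events (delivered to channel listeners without a drain call)
    bus.publish(Event::new("test", "src"))?;

    // Note: main returns right after publishing, which shuts down the runtime.
    // In a real program, keep the runtime alive long enough for the listener to run.
    Ok(())
}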

Multiple Async Receivers

let bus = AsyncEventBus::new(256);
let mut rx1 = bus.subscribe_channel();
let mut rx2 = bus.subscribe_channel();

// Both receivers get the same event
bus.publish(Event::new("broadcast", "src"))?;

let event1 = rx1.recv().await?;
let event2 = rx2.recv().await?;
assert_eq!(event1.topic, event2.topic);

Sync Subscriptions (EventBus trait compat)

use oneclaw_core::event_bus::traits::EventBus;

let bus = AsyncEventBus::new(256);

// Works with EventBus trait (for compatibility)
bus.subscribe("sensor.*", Box::new(|event| {
    println!("Sensor event: {}", event.topic);
    None
}))?;

bus.publish(Event::new("sensor.temp", "src"))?;
// Handler called immediately on publish

Event Pipelines

Chain handlers to create processing workflows.

Example: Alert Pipeline

use oneclaw_core::event_bus::{
    bus::DefaultEventBus,
    traits::{Event, EventPriority},
};

let bus = DefaultEventBus::new();

// Step 1: Monitor sensor data
bus.subscribe("sensor.*", Box::new(|event| {
    if let Some(value) = event.data.get("value")
        && value.parse::<f64>().unwrap_or(0.0) > 100.0
    {
        // Generate alert event
        return Some(Event::new("alert.threshold", "pipeline")
            .with_data("source_topic", event.topic.clone())
            .with_data("value", value.clone())
            .with_priority(EventPriority::Critical));
    }
    None
}))?;

// Step 2: Handle alerts
bus.subscribe("alert.*", Box::new(|event| {
    println!("ALERT: {} from {}", 
        event.data.get("source_topic").unwrap_or(&"unknown".into()),
        event.source
    );
    // Could generate notification event here
    None
}))?;

// Publish sensor reading
bus.publish(Event::new("sensor.temperature", "temp_sensor")
    .with_data("value", "105.5"))?;

// Drain to process pipeline
bus.drain()?; // Processes sensor event → generates alert
bus.drain()?; // Processes alert event

Multi-Stage Pipeline

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

let bus = DefaultEventBus::new();

// Stage 1: Data validation
bus.subscribe("data.raw", Box::new(|event| {
    if event.data.contains_key("value") {
        Some(Event::new("data.validated", "validator")
            .with_data("value", event.data.get("value").unwrap().clone()))
    } else {
        Some(Event::new("data.invalid", "validator"))
    }
}))?;

// Stage 2: Data processing
bus.subscribe("data.validated", Box::new(|event| {
    let value = event.data.get("value").unwrap();
    let processed = format!("processed_{}", value);
    Some(Event::new("data.processed", "processor")
        .with_data("result", processed))
}))?;

// Stage 3: Storage
let counter = Arc::new(AtomicUsize::new(0));
let c = counter.clone();
bus.subscribe("data.processed", Box::new(move |_event| {
    c.fetch_add(1, Ordering::SeqCst);
    None
}))?;

// Run pipeline
bus.publish(Event::new("data.raw", "source")
    .with_data("value", "test"))?;
bus.drain()?; // Stage 1: raw → validated
bus.drain()?; // Stage 2: validated → processed
bus.drain()?; // Stage 3: processed → stored

assert_eq!(counter.load(Ordering::SeqCst), 1);
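
Calling drain() once per stage works when the pipeline depth is known. When it is not, one option is to drain until nothing is pending, with a cap on the number of rounds so that a handler loop cannot spin forever. The sketch below is a suggestion, not part of OneClaw: Drain is a hypothetical stand-in for the drain() and pending_count() methods of the EventBus trait, with String as a placeholder error type, and FakePipeline is a toy bus used only to exercise the helper.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for the two EventBus methods this helper needs.
trait Drain {
    fn drain(&self) -> Result<usize, String>;
    fn pending_count(&self) -> usize;
}

/// Calls drain() until no events are pending, for at most `max_rounds` rounds.
/// Returns the total number of processed events, or an error if events are
/// still pending once the cap is reached.
fn drain_until_idle<B: Drain>(bus: &B, max_rounds: usize) -> Result<usize, String> {
    let mut total = 0;
    let mut rounds = 0;
    while bus.pending_count() > 0 {
        if rounds == max_rounds {
            return Err(format!("events still pending after {max_rounds} drain rounds"));
        }
        total += bus.drain()?;
        rounds += 1;
    }
    Ok(total)
}

/// Toy bus: each drain processes one event and leaves one fewer stage pending.
struct FakePipeline {
    stages_left: Cell<usize>,
}

impl FakePipeline {
    fn new(stages: usize) -> Self {
        FakePipeline { stages_left: Cell::new(stages) }
    }
}

impl Drain for FakePipeline {
    fn drain(&self) -> Result<usize, String> {
        let left = self.stages_left.get();
        if left == 0 {
            return Ok(0);
        }
        self.stages_left.set(left - 1);
        Ok(1)
    }

    fn pending_count(&self) -> usize {
        self.stages_left.get()
    }
}
```

With a bus whose pending_count() is always 0, as this guide states for AsyncEventBus, the helper returns Ok(0) without calling drain() at all.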

Real-Time Event Processing

AsyncEventBus processes events immediately without drain loops.

Example: Live Sensor Monitoring

use oneclaw_core::event_bus::async_bus::AsyncEventBus;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() -> Result<()> {
    let bus = AsyncEventBus::new(256);
    let mut rx = bus.subscribe_channel();

    // Spawn real-time event processor
    tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            if event.topic.starts_with("sensor.") {
                if let Some(value) = event.data.get("value") {
                    println!("[{}] {}: {}", 
                        event.timestamp.format("%H:%M:%S"),
                        event.topic,
                        value
                    );
                }
            }
        }
    });

    // Simulate sensor data stream
    for i in 0..10 {
        bus.publish(Event::new("sensor.temperature", "temp_01")
            .with_data("value", format!("{:.1}", 20.0 + i as f64)))?;
        sleep(Duration::from_millis(100)).await;
    }

    Ok(())
}

Latency Test

From the test suite:
#[tokio::test]
async fn test_async_bus_realtime_latency() {
    let bus = AsyncEventBus::new(256);
    let mut rx = bus.subscribe_channel();

    let start = std::time::Instant::now();
    bus.publish(test_event("test", "latency")).unwrap();
    let _ = rx.recv().await.unwrap();
    let elapsed = start.elapsed();

    assert!(elapsed.as_millis() < 10, "Event latency too high: {:?}", elapsed);
}
Result: the test asserts that publish-to-receive latency stays under 10 ms.

Event History

Retrieving Recent Events

use oneclaw_core::event_bus::traits::EventBus;

// Publish some events
for i in 0..10 {
    bus.publish(Event::new(format!("topic.{}", i), "src"))?;
}
bus.drain()?;

// Get last 5 events
let recent = bus.recent_events(5)?;

for event in recent {
    println!("[{}] {}", event.timestamp, event.topic);
}

History Ring Buffer

// Configure history size
let bus = DefaultEventBus::new().with_max_history(100);

// After 200 events, only last 100 are kept
for _ in 0..200 {
    bus.publish(Event::new("test", "src"))?;
}
bus.drain()?;

let recent = bus.recent_events(200)?;
assert_eq!(recent.len(), 100); // Only last 100 kept
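
The drain listing earlier trims history with remove(0), which shifts every remaining element each time an old event is dropped. A bounded history can also be kept in a VecDeque, where removing the oldest entry is cheap. The sketch below illustrates that alternative; it is not a description of OneClaw's internals. The History type is hypothetical, and returning the newest items in oldest-first order is an assumption about what a caller would want.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded history that keeps only the newest `max` items.
struct History<T> {
    items: VecDeque<T>,
    max: usize,
}

impl<T: Clone> History<T> {
    fn new(max: usize) -> Self {
        History { items: VecDeque::with_capacity(max), max }
    }

    /// Appends an item, evicting the oldest one if the buffer is full.
    fn push(&mut self, item: T) {
        if self.max == 0 {
            return; // a zero-sized history keeps nothing
        }
        if self.items.len() == self.max {
            self.items.pop_front();
        }
        self.items.push_back(item);
    }

    /// Returns up to `limit` of the newest items, oldest first.
    fn recent(&self, limit: usize) -> Vec<T> {
        let skip = self.items.len().saturating_sub(limit);
        self.items.iter().skip(skip).cloned().collect()
    }
}
```

Asking for more items than are stored simply returns everything that is left, which mirrors the recent_events(200) call above returning 100 events.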

Choosing the Right Bus

Use DefaultEventBus when:

  • Running in sync context (no tokio runtime)
  • Want explicit control over event processing (drain loops)
  • Building batch processing systems
  • Need deterministic event ordering
  • Working with simple CLI tools

Use AsyncEventBus when:

  • Running in async context (tokio runtime)
  • Need real-time event processing
  • Building websocket servers, IoT dashboards
  • Want concurrent event consumers
  • Processing high-throughput event streams

Sync vs Async Comparison

| Feature | DefaultEventBus | AsyncEventBus |
| --- | --- | --- |
| Processing | Drain-based | Immediate |
| Runtime | Sync | Tokio |
| Concurrency | Single-threaded drain | Multi-receiver broadcast |
| Latency | Drain interval | <10ms |
| Use case | CLI, batch jobs | Real-time, IoT |
| drain() | Processes events | No-op |
| pending_count() | Queue size | Always 0 |

Best Practices

  1. Use topic hierarchies: sensor.temperature.living_room is better than sensor_temp_lr, because prefix patterns such as sensor.* can then match it
  2. Set appropriate priorities: Critical = immediate action, High = important, Normal = info
  3. Keep handlers fast: Long-running handlers block event processing
  4. Avoid infinite loops: Don’t publish events that trigger the same handler
  5. Use event history for debugging: Check recent_events() when things go wrong
  6. Clean up subscriptions: Call unsubscribe() when handlers are no longer needed
  7. Monitor history size: Tune with_max_history() based on your needs
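
For practice 4, one possible guard against handler loops is to carry a hop counter in the event payload and stop forwarding once it reaches a limit. This is a sketch of one approach, not an OneClaw feature: the "hops" key, the forward_with_hop_limit function, and the hops_until_stop helper are all hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical payload key used to count how many times an event was forwarded.
const HOPS_KEY: &str = "hops";

/// Returns the payload for a follow-up event with the hop counter incremented,
/// or None once `max_hops` has been reached. A missing or unparsable counter
/// is treated as 0.
fn forward_with_hop_limit(
    data: &HashMap<String, String>,
    max_hops: u32,
) -> Option<HashMap<String, String>> {
    let hops: u32 = data
        .get(HOPS_KEY)
        .and_then(|h| h.parse().ok())
        .unwrap_or(0);
    if hops >= max_hops {
        return None; // budget spent: the handler should return None too
    }
    let mut next = data.clone();
    next.insert(HOPS_KEY.to_string(), (hops + 1).to_string());
    Some(next)
}

/// Simulates a handler that keeps re-triggering itself and counts the forwards.
fn hops_until_stop(max_hops: u32) -> u32 {
    let mut data = HashMap::new();
    let mut forwarded = 0;
    while let Some(next) = forward_with_hop_limit(&data, max_hops) {
        data = next;
        forwarded += 1;
    }
    forwarded
}
```

A handler would copy the returned map into its response event and return None when the function does, so a self-triggering chain ends after a bounded number of events.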

EventBus Trait

Both implementations satisfy this trait:
pub trait EventBus: Send + Sync {
    /// Publish an event to the bus
    fn publish(&self, event: Event) -> Result<()>;

    /// Subscribe a handler to a topic pattern
    /// Pattern supports prefix matching: "sensor.*" matches "sensor.temperature", "sensor.humidity"
    fn subscribe(&self, topic_pattern: &str, handler: EventHandler) -> Result<String>; // returns subscription ID

    /// Unsubscribe by subscription ID
    fn unsubscribe(&self, subscription_id: &str) -> Result<bool>;

    /// Get count of pending events (for monitoring)
    fn pending_count(&self) -> usize;

    /// Process all pending events (synchronous drain)
    fn drain(&self) -> Result<usize>; // returns number of events processed

    /// Get event history (recent events for debugging)
    fn recent_events(&self, limit: usize) -> Result<Vec<Event>>;
}
