
Overview

AsyncEventBus is an asynchronous implementation of the EventBus trait using tokio broadcast channels. Events are processed immediately upon publish with real-time delivery to subscribers.
Location: crates/oneclaw-core/src/event_bus/async_bus.rs:32

Key Differences from DefaultEventBus

| Feature | AsyncEventBus | DefaultEventBus |
| --- | --- | --- |
| Processing | Immediate on publish | Deferred until drain() |
| Concurrency | Multiple async subscribers | Sequential handler execution |
| drain() | No-op (returns 0) | Processes pending queue |
| pending_count() | Always returns 0 | Returns queue length |
| Latency | < 10 ms | Depends on drain interval |
| Use case | Real-time event streaming | Batch processing, event loops |

Design

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:1-7
Events are published to a tokio broadcast::Sender. All subscribers receive events concurrently via their own Receiver.
┌─────────┐
│ publish │──> broadcast::Sender ──┬──> Receiver 1
└─────────┘                         ├──> Receiver 2
                                    └──> Receiver N
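
The fan-out in the diagram can be modeled without tokio. The sketch below is a simplified, std-only stand-in: `FanOut` is a hypothetical type (not part of oneclaw-core) that keeps one `std::sync::mpsc` channel per subscriber, whereas tokio's broadcast channel shares a single bounded buffer among all receivers. It only illustrates that every subscriber sees every published event.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the bus: one std channel per subscriber.
struct FanOut {
    subscribers: Vec<Sender<String>>,
}

impl FanOut {
    fn new() -> Self {
        FanOut { subscribers: Vec::new() }
    }

    /// Register a subscriber and hand back its receiving end.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send a copy of the topic to every live subscriber.
    /// Returns how many subscribers accepted the event.
    fn publish(&mut self, topic: &str) -> usize {
        // A failed send means the receiver was dropped, so forget that subscriber.
        self.subscribers.retain(|tx| tx.send(topic.to_string()).is_ok());
        self.subscribers.len()
    }
}
```

Publishing once and then reading from each receiver yields the same topic on all of them, which is the property the real bus provides through broadcast::Receiver.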

Usage

Basic Example

use oneclaw_core::event_bus::{AsyncEventBus, Event, EventBus};

let bus = AsyncEventBus::new(256);  // capacity: 256 events

// Get a receiver channel BEFORE boxing as EventBus
let mut rx = bus.subscribe_channel();

// Publish event (processed immediately)
bus.publish(Event::new("sensor.temperature", "sensor-1")
    .with_data("value", "42.5"))?;

// Receive event asynchronously
tokio::spawn(async move {
    while let Ok(event) = rx.recv().await {
        println!("Received: {}", event.topic);
    }
});

Creating the Bus

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:42
// capacity: max events a receiver can lag before dropping
let bus = AsyncEventBus::new(256);  // Good for most edge use cases
If a subscriber falls behind by more than capacity events, the oldest events are dropped for that subscriber.
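
To make the dropping behavior concrete, here is a small std-only model of a bounded broadcast buffer with a per-receiver cursor. `Ring` and `Poll` are hypothetical names for this sketch and are not the tokio or oneclaw-core types. The model mirrors how tokio's broadcast channel reports lag: a receiver that has fallen too far behind is told how many events it missed and then resumes from the oldest retained event.

```rust
use std::collections::VecDeque;

/// Hypothetical model of a bounded broadcast buffer (not the tokio type).
struct Ring {
    capacity: usize,
    buf: VecDeque<u64>, // retained events, oldest first
    next_seq: u64,      // sequence number the next published event will get
}

/// What a receiver sees when it polls the buffer.
#[derive(Debug, PartialEq)]
enum Poll {
    Event(u64),
    Lagged(u64), // number of events this receiver missed
    Empty,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        Ring { capacity, buf: VecDeque::new(), next_seq: 0 }
    }

    fn publish(&mut self, value: u64) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // the oldest event is overwritten
        }
        self.buf.push_back(value);
        self.next_seq += 1;
    }

    /// `cursor` is the sequence number the receiver wants next.
    fn poll(&self, cursor: &mut u64) -> Poll {
        let oldest = self.next_seq - self.buf.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip ahead to the oldest retained event
            return Poll::Lagged(missed);
        }
        match self.buf.get((*cursor - oldest) as usize) {
            Some(value) => {
                *cursor += 1;
                Poll::Event(*value)
            }
            None => Poll::Empty,
        }
    }
}
```

With the real channel, the lag shows up as `RecvError::Lagged(n)` from `recv()`. A `while let Ok(event) = rx.recv().await` loop exits on that error, so long-running consumers may prefer to match on the error and keep receiving.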

Async Channel Subscription

subscribe_channel()

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:60
Get a broadcast receiver for async event consumption. This is the preferred way for consumer apps to receive events.
let mut rx = bus.subscribe_channel();

tokio::spawn(async move {
    while let Ok(event) = rx.recv().await {
        match event.topic.as_str() {
            "alert.critical" => handle_critical_alert(&event).await,
            "sensor.temperature" => log_temperature(&event),
            _ => {},
        }
    }
});
Important: Call subscribe_channel() BEFORE boxing the bus as Box<dyn EventBus>.
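
The ordering requirement presumably exists because subscribe_channel() is a method on the concrete type rather than on the EventBus trait, so it is no longer reachable once the type is erased. That is an assumption about the crate, not something this page states. The std-only sketch below shows the general Rust mechanics with hypothetical stand-ins (`Bus`, `ChannelBus`, `build`).

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical trait standing in for EventBus.
trait Bus {
    fn publish(&self, topic: &str);
}

/// Hypothetical concrete bus with one extra inherent method.
struct ChannelBus {
    subscribers: Mutex<Vec<Sender<String>>>,
}

impl ChannelBus {
    fn new() -> Self {
        ChannelBus { subscribers: Mutex::new(Vec::new()) }
    }

    /// Inherent method: NOT part of the `Bus` trait.
    fn subscribe_channel(&self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }
}

impl Bus for ChannelBus {
    fn publish(&self, topic: &str) {
        for tx in self.subscribers.lock().unwrap().iter() {
            let _ = tx.send(topic.to_string()); // ignore closed receivers
        }
    }
}

/// Take the receiver first, then erase the concrete type.
fn build() -> (Box<dyn Bus>, Receiver<String>) {
    let bus = ChannelBus::new();
    let rx = bus.subscribe_channel();
    let boxed: Box<dyn Bus> = Box::new(bus);
    // `boxed.subscribe_channel()` would not compile: `dyn Bus` has no such method.
    (boxed, rx)
}
```

The receiver obtained before boxing keeps working afterwards, because it is connected to the same underlying channel.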

sender()

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:68
Get a clone of the broadcast sender to create receivers later:
let sender = bus.sender();

// Later, create a new receiver
let mut rx = sender.subscribe();

Multiple Concurrent Subscribers

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:362
All receivers receive the same events concurrently:
let bus = AsyncEventBus::new(256);
let mut rx1 = bus.subscribe_channel();
let mut rx2 = bus.subscribe_channel();

bus.publish(Event::new("test", "source"))?;

// Both receivers get the event
let e1 = rx1.recv().await?;
let e2 = rx2.recv().await?;
assert_eq!(e1.topic, e2.topic);

Real-Time Event Delivery

Low Latency

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:377
Events are delivered with under 10 ms between publish() and the matching recv():
let start = std::time::Instant::now();
bus.publish(Event::new("test", "source"))?;
let event = rx.recv().await?;
let latency = start.elapsed();

assert!(latency.as_millis() < 10);

No drain() Required

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:169
Unlike DefaultEventBus, events are processed immediately:
bus.publish(event)?;  // Immediately broadcast to all receivers

// drain() is a no-op
let processed = bus.drain()?;
assert_eq!(processed, 0);

// No pending queue
assert_eq!(bus.pending_count(), 0);

EventBus Trait Compatibility

Pattern-Based Subscription

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:140
Supports the same pattern matching as DefaultEventBus:
bus.subscribe("sensor.*", Box::new(|event| {
    println!("Sensor event: {}", event.topic);
    None
}))?;
These subscriptions are processed synchronously when events are published, before broadcasting to async channels.

Topic Matching

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:78
// Wildcard - matches everything
AsyncEventBus::topic_matches("*", "anything");  // true

// Prefix matching
AsyncEventBus::topic_matches("sensor.*", "sensor.temp");  // true
AsyncEventBus::topic_matches("sensor*", "sensor.temp");   // true

// Exact match
AsyncEventBus::topic_matches("sensor.temp", "sensor.temp");  // true
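
The rules illustrated above can be sketched as a standalone function. This is a hypothetical re-implementation that reproduces only the four documented cases. How the real topic_matches treats other inputs (for example a `*` in the middle of a pattern) is not stated here, so this version simply assumes a trailing `*` means prefix matching.

```rust
/// Hypothetical re-implementation of the matching rules shown above.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    match pattern.strip_suffix('*') {
        // A trailing `*` turns the rest of the pattern into a prefix.
        // For the bare pattern "*" the prefix is empty, which matches everything.
        Some(prefix) => topic.starts_with(prefix),
        // No trailing wildcard: require an exact match.
        None => pattern == topic,
    }
}
```

Under this assumption `sensor.*` does not match the bare topic `sensor`, because the prefix still includes the dot.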

Event History

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:175
Events are stored in a ring buffer for late-joining subscribers and debugging:
let recent = bus.recent_events(10)?;  // Last 10 events
for event in recent {
    println!("{}: {}", event.timestamp, event.topic);
}
History is limited by the bus capacity. Oldest events are dropped when full.
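
A bounded history like this is commonly built on a VecDeque. The sketch below is a minimal std-only version with hypothetical names (`History`, `record`, `recent`). It stores plain topic strings instead of Event values, and it assumes recent(n) returns events oldest first, which this page does not specify.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded history of event topics.
struct History {
    capacity: usize,
    events: VecDeque<String>,
}

impl History {
    fn new(capacity: usize) -> Self {
        History { capacity, events: VecDeque::with_capacity(capacity) }
    }

    /// Append an event, evicting the oldest one when the buffer is full.
    fn record(&mut self, topic: &str) {
        if self.capacity == 0 {
            return; // nothing can be retained
        }
        if self.events.len() == self.capacity {
            self.events.pop_front();
        }
        self.events.push_back(topic.to_string());
    }

    /// Return up to `n` of the most recent events, oldest first.
    fn recent(&self, n: usize) -> Vec<String> {
        let skip = self.events.len().saturating_sub(n);
        self.events.iter().skip(skip).cloned().collect()
    }
}
```

Asking for more events than are retained returns everything still in the buffer rather than failing.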

Handler-Generated Events

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:96-108
Handlers can return response events:
bus.subscribe("sensor.*", Box::new(|event| {
    if let Some(value) = event.data.get("value")
        && value.parse::<f64>().unwrap_or(0.0) > 100.0
    {
        Some(Event::new("alert.threshold", "pipeline")
            .with_priority(EventPriority::Critical))
    } else {
        None
    }
}))?;
Derived events are added to history and broadcast to async subscribers.
Note: Derived events are NOT re-matched against sync subscriptions to avoid infinite recursion.
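
The no-recursion rule can be illustrated with a simplified std-only model of the synchronous stage. Everything here is hypothetical (`Event`, `SyncStage`, `threshold_stage`); prefix matching stands in for the real pattern matching, and the ordering of original and derived events in the output is an assumption. The handler deliberately emits a topic that matches its own subscription to show that derived events are not fed back through the handlers.

```rust
/// Hypothetical, trimmed-down event for this sketch.
#[derive(Clone, Debug, PartialEq)]
struct Event {
    topic: String,
    value: f64,
}

type Handler = Box<dyn Fn(&Event) -> Option<Event>>;

/// Hypothetical synchronous stage: (topic prefix, handler) pairs.
struct SyncStage {
    handlers: Vec<(String, Handler)>,
}

impl SyncStage {
    /// Run matching handlers once, then return what would be broadcast:
    /// the original event followed by any derived events.
    fn publish(&self, event: Event) -> Vec<Event> {
        let derived: Vec<Event> = self
            .handlers
            .iter()
            .filter(|(prefix, _)| event.topic.starts_with(prefix.as_str()))
            .filter_map(|(_, handler)| handler(&event))
            .collect();
        let mut out = vec![event];
        out.extend(derived); // derived events are NOT passed through the handlers again
        out
    }
}

/// A stage whose handler raises an alert when a sensor value exceeds 100.
fn threshold_stage() -> SyncStage {
    let handler: Handler = Box::new(|event: &Event| {
        if event.value > 100.0 {
            // This topic matches the "sensor." prefix, yet it is never re-handled.
            Some(Event { topic: "sensor.alert".to_string(), value: event.value })
        } else {
            None
        }
    });
    SyncStage { handlers: vec![("sensor.".to_string(), handler)] }
}
```

If derived events were re-matched, the alert above would trigger its own handler indefinitely. Running the handlers exactly once per published event bounds the work.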

Thread Safety

Location: crates/oneclaw-core/src/event_bus/async_bus.rs:32
struct AsyncEventBus {
    sender: broadcast::Sender<Event>,
    history: Mutex<VecDeque<Event>>,
    subscriptions: RwLock<Vec<AsyncSubscription>>,
    history_capacity: usize,
}
  • sender: Clone-able, thread-safe broadcast sender
  • history: Mutex-protected ring buffer
  • subscriptions: RwLock for concurrent reads
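
As a rough illustration of why these lock types fit, the std-only sketch below shares a Mutex-protected buffer and an RwLock-protected pattern list across threads. `SharedState` and `publish_from_threads` are hypothetical and omit the broadcast sender. The point is only that many threads can read the subscription list at once while writes to the history are serialized.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

/// Hypothetical subset of the bus state.
struct SharedState {
    history: Mutex<VecDeque<String>>,
    patterns: RwLock<Vec<String>>,
}

/// Publish `per_worker` topics from each of `workers` threads and
/// return how many ended up in the shared history.
fn publish_from_threads(workers: usize, per_worker: usize) -> usize {
    let state = Arc::new(SharedState {
        history: Mutex::new(VecDeque::new()),
        patterns: RwLock::new(vec!["sensor.".to_string()]),
    });

    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                for n in 0..per_worker {
                    let topic = format!("sensor.{id}.{n}");
                    // Any number of threads may hold the read lock at once.
                    let matched = state
                        .patterns
                        .read()
                        .unwrap()
                        .iter()
                        .any(|prefix| topic.starts_with(prefix.as_str()));
                    if matched {
                        // The mutex serializes writes to the history buffer.
                        state.history.lock().unwrap().push_back(topic);
                    }
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let recorded = state.history.lock().unwrap().len();
    recorded
}
```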

Use Cases

Real-Time Monitoring

let bus = AsyncEventBus::new(256);
let mut rx = bus.subscribe_channel();

tokio::spawn(async move {
    while let Ok(event) = rx.recv().await {
        if event.priority == EventPriority::Critical {
            send_alert(&event).await;
        }
    }
});

Live Telemetry Streaming

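// WebSocket handler. The WebSocket type and its send() signature depend on
// your web framework; adjust the error type to match.
async fn telemetry_stream(
    mut ws: WebSocket,
    bus_sender: broadcast::Sender<Event>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut rx = bus_sender.subscribe();

    // The loop ends when the channel closes or this receiver lags.
    while let Ok(event) = rx.recv().await {
        if event.topic.starts_with("telemetry.") {
            ws.send(serde_json::to_string(&event)?).await?;
        }
    }
    Ok(())
}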

Multi-Subscriber Event Distribution

let bus = AsyncEventBus::new(256);

// Logger subscriber
let mut log_rx = bus.subscribe_channel();
tokio::spawn(async move {
    while let Ok(event) = log_rx.recv().await {
        tracing::info!("Event: {}", event.topic);
    }
});

// Metrics subscriber
let mut metrics_rx = bus.subscribe_channel();
tokio::spawn(async move {
    while let Ok(event) = metrics_rx.recv().await {
        record_metric(&event);
    }
});

Performance Considerations

  • Broadcast overhead: Every receiver gets every event, so filter by topic inside the receiver
  • Channel capacity: Size it for the largest lag a subscriber is allowed to accumulate
  • Concurrent receivers: Scales well with multiple async subscribers
  • No drain cost: There is no batch-processing loop, so drain() adds no overhead

When to Use AsyncEventBus

  • Real-time event streaming to multiple consumers
  • WebSocket/SSE event feeds
  • Concurrent async event processing
  • Low-latency telemetry distribution
  • Consumer apps needing immediate event delivery

When to Use DefaultEventBus

  • Batch event processing
  • Event loop integration (tick-based)
  • Predictable processing in tests
  • Sequential handler execution required
