Overview
SignalBus is the internal event queue between Layer 1 (Retina) and Layer 2 (Limbic). It supports both pull mode (blocking get()) and push mode (subscribe() callbacks), with every event fanned out to all registered subscribers AND made available via get().
The bus runs a dedicated dispatcher thread that ensures no event is dropped or split between modes.
Import
from pulse.bus import SignalBus
Class Definition
Defined in pulse/bus.py:42.
Architecture
┌──────────────┐
Retina │ _raw_queue │
put(event) ─────────►│ queue.Queue │
└──────┬───────┘
│ dispatcher thread
┌───────────┼────────────┐
▼ ▼ ▼
_pull_queue callback[0] callback[1] …
(get() reads)
Constructor
def __init__(self, maxsize: int = 0) -> None
Maximum number of events buffered in the raw queue before put() blocks.
0 (default): Unbounded queue
> 0: Bounded queue; put() blocks when full
The dispatcher thread starts immediately on construction.
Producer API
put
def put(self, event: SignalEvent) -> None
Enqueue a SignalEvent. Called by Retina.
Non-blocking unless the bus was constructed with a finite maxsize and the queue is full, in which case it blocks until space is available.
Example:
from pulse.bus import SignalBus
from pulse.retina import SignalEvent
bus = SignalBus()
event = SignalEvent(
source="filesystem",
location="/home/user/test.py",
delta_type="created",
magnitude=1.0,
timestamp=1678123456.0,
features={},
)
bus.put(event)
Consumer API — Pull Mode
get
def get(self, timeout: float = 1.0) -> Optional[SignalEvent]
Block until the next event is available and return it, or return None if timeout expires first.
Does not consume events that have already been delivered to subscribers.
Seconds to wait. Must be >= 0.
Returns:
SignalEvent | None: The next event, or None if timeout expires
Example:
from pulse.bus import SignalBus
import time
bus = SignalBus()
# Start producer thread (e.g., Retina) in background...
while True:
event = bus.get(timeout=5.0)
if event is not None:
print(f"Received: {event.delta_type} at {event.location}")
else:
print("No events in the last 5 seconds")
Consumer API — Push Mode
subscribe
def subscribe(self, callback: Callable[[SignalEvent], None]) -> None
Register a callback that will be called on the dispatcher thread for every subsequent event.
Callbacks must not block for extended periods; offload heavy work to a separate thread if needed.
Callbacks registered after some events have already been dispatched will only receive future events.
callback
Callable[[SignalEvent], None]
required
Function that receives each SignalEvent. Exceptions are silently caught to prevent dispatcher thread crashes.
Example:
from pulse.bus import SignalBus
from pulse.retina import SignalEvent
def on_event(event: SignalEvent) -> None:
print(f"Callback: {event.source} | {event.delta_type}")
bus = SignalBus()
bus.subscribe(on_event)
# All future events will trigger on_event() on the dispatcher thread
unsubscribe
def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None
Remove a previously registered callback. No-op if not registered.
callback
Callable[[SignalEvent], None]
required
The same function object that was passed to subscribe().
Example:
def on_event(event):
print(event.location)
bus.subscribe(on_event)
# ... later ...
bus.unsubscribe(on_event)
Lifecycle
stop
Signal the dispatcher thread to exit and wait for it to finish.
After stop() returns:
- No further callbacks will be invoked
get() will drain any remaining events already in the pull queue
- New
put() calls will still enqueue (but will not be dispatched)
Example:
bus = SignalBus()
bus.subscribe(my_callback)
# ... run for a while ...
bus.stop() # Clean shutdown
Thread Safety
All methods are safe to call from any thread:
put(): Called by Retina’s filesystem observer and tick threads
get(): Can be called from any consumer thread
subscribe() / unsubscribe(): Protected by internal lock
- Callbacks: Executed on the dedicated dispatcher thread
Exception Handling
If a subscriber callback raises an exception, it is silently caught to prevent the dispatcher thread from crashing. This ensures that a misbehaving subscriber cannot disrupt the entire event bus.
Example:
def buggy_callback(event):
raise ValueError("Oops!")
bus.subscribe(buggy_callback)
# Dispatcher continues running; exception is caught and ignored
Complete Example
import queue
import threading
import time
from pulse.bus import SignalBus
from pulse.retina import Retina, SignalEvent
# 1. Create SignalBus
bus = SignalBus(maxsize=100)
# 2. Subscribe a callback
def on_event(event: SignalEvent) -> None:
print(f"[Subscriber] {event.source} | {event.delta_type} | {event.location}")
bus.subscribe(on_event)
# 3. Create Retina and wire it to the bus
signal_queue = queue.Queue()
retina = Retina(watch_dirs=["/home/user/workspace"], signal_queue=signal_queue)
# 4. Forward Retina output to the bus
def forwarder():
while True:
try:
event = signal_queue.get(timeout=1.0)
bus.put(event)
except queue.Empty:
pass
threading.Thread(target=forwarder, daemon=True).start()
# 5. Start Retina
retina.start()
# 6. Pull events in main thread
try:
for _ in range(10):
event = bus.get(timeout=5.0)
if event:
print(f"[Main] {event.location}")
finally:
retina.stop()
bus.stop()
See Also