The Signal Bus is the internal event queue that connects Layer 1 (Retina) to Layer 2 (Limbic). It provides a thread-safe, fan-out architecture that supports both pull (blocking get) and push (callback subscription) consumption modes simultaneously.
Key Feature: Every event is delivered to ALL subscribers AND made available via get(). No events are dropped or split between modes.
Producer (Retina): calls put() from filesystem observer thread and time-tick thread
Consumer (Limbic): calls get() from main thread OR registers callbacks that run on dispatcher thread
Subscribers: callbacks invoked on the dispatcher thread, so heavy work should be offloaded
pulse/bus.py
    import queue
    import threading
    from typing import Callable, Optional


    class SignalBus:
        """
        Thread-safe event bus connecting Retina (producer) to Limbic (consumer).

        The bus starts its dispatcher thread immediately on construction and
        runs until stop() is called. All methods are safe to call from any thread.
        """

        def __init__(self, maxsize: int = 0) -> None:
            """
            Args:
                maxsize: Maximum number of events buffered in the raw queue
                    before put() blocks. 0 (default) means unbounded.
            """
            # Events arrive here from producers and wait for the dispatcher.
            self._raw_queue: queue.Queue = queue.Queue(maxsize=maxsize)
            # Copy of every dispatched event, consumed by get().
            self._pull_queue: queue.Queue = queue.Queue()
            self._subscribers: list[Callable[[SignalEvent], None]] = []
            # Guards _subscribers only.
            self._lock = threading.Lock()
            self._dispatcher = threading.Thread(
                target=self._dispatch_loop,
                name="signal-bus-dispatcher",
                daemon=True,
            )
            self._dispatcher.start()
        def put(self, event: SignalEvent) -> None:
            """
            Enqueue a SignalEvent. Called by Retina.

            Non-blocking unless the bus was constructed with a finite maxsize
            and the queue is full, in which case it blocks until space is
            available.
            """
            self._raw_queue.put(event)
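With maxsize=0 (the default), the raw queue is unbounded and put() never blocks. With a finite maxsize, put() blocks the calling producer thread once maxsize events are waiting for the dispatcher, and resumes when the dispatcher removes one. Note that maxsize bounds only the raw queue: the pull queue is always unbounded, so events accumulate there if nothing calls get().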
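The difference between the two settings can be observed directly on `queue.Queue`, which backs the bus. The following is a minimal sketch, not part of `pulse/bus.py`: the helper `try_put` is hypothetical, and it uses the `timeout` argument of `queue.Queue.put()` (which the `put()` method shown above does not expose) only so that the demonstration terminates instead of blocking forever.

```python
import queue


def try_put(q: queue.Queue, item: object, timeout: float = 0.05) -> bool:
    """Return True if item was enqueued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


bounded = queue.Queue(maxsize=2)     # at most two buffered events
unbounded = queue.Queue(maxsize=0)   # 0 means no limit

# Nothing consumes from `bounded`, so the third put finds it full.
results = [try_put(bounded, n) for n in range(3)]
print(results)

# An unbounded queue accepts any backlog without blocking.
for n in range(1000):
    unbounded.put(n)
print(unbounded.qsize())
```

Without a timeout, the third `put()` on the bounded queue would simply wait until a consumer removed an item, which is the back-pressure a finite `maxsize` applies to the producer threads.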
Consumers can block until the next event is available:
pulse/bus.py
        def get(self, timeout: float = 1.0) -> Optional[SignalEvent]:
            """
            Block until the next event is available and return it, or return
            None if timeout expires first. Pulling an event here does not take
            it away from subscribers, which receive every event independently.

            Args:
                timeout: Seconds to wait. Must be >= 0.
            """
            try:
                return self._pull_queue.get(timeout=timeout)
            except queue.Empty:
                return None
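A pull-mode consumer typically wraps get() in a polling loop and treats None as "nothing arrived in time". The sketch below illustrates that pattern on a plain `queue.Queue` so that it runs on its own; `get_or_none` mirrors the get() method above, while `drain_until_idle` and the string events are hypothetical stand-ins.

```python
import queue
from typing import Optional


def get_or_none(q: queue.Queue, timeout: float = 1.0) -> Optional[object]:
    """Same contract as get() above: None signals a timeout."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return None


def drain_until_idle(q: queue.Queue, timeout: float = 0.05) -> list:
    """Collect events until one poll times out."""
    seen = []
    while True:
        event = get_or_none(q, timeout=timeout)
        if event is None:   # timeout: no event arrived within `timeout`
            break
        seen.append(event)
    return seen


pull_queue: queue.Queue = queue.Queue()
for name in ("created", "modified", "tick"):
    pull_queue.put(name)

events = drain_until_idle(pull_queue)
print(events)
```

A long-running main loop would more likely keep polling after a timeout and check a shutdown flag on each pass, rather than exit on the first None as this sketch does.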
Consumers can register callbacks that are invoked on the dispatcher thread:
pulse/bus.py
        def subscribe(self, callback: Callable[[SignalEvent], None]) -> None:
            """
            Register a callback that will be called on the dispatcher thread
            for every subsequent event.

            Callbacks must not block for extended periods; offload heavy work
            to a separate thread if needed. Callbacks registered after some
            events have already been dispatched will only receive future
            events.
            """
            with self._lock:
                self._subscribers.append(callback)

        def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None:
            """Remove a previously registered callback. No-op if not registered."""
            with self._lock:
                try:
                    self._subscribers.remove(callback)
                except ValueError:
                    pass
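One way to follow the "offload heavy work" advice is to have the callback hand each event to a thread pool and return immediately. This is a sketch under assumptions: `heavy_work` and `on_event` are hypothetical names, events are plain strings rather than SignalEvent objects, and the loop at the bottom stands in for the dispatcher invoking the callback.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
pending: list[Future] = []


def heavy_work(event: str) -> str:
    """Stand-in for slow processing (hypothetical)."""
    time.sleep(0.05)
    return event.upper()


def on_event(event: str) -> None:
    """Callback shaped for subscribe(): it only schedules the work."""
    pending.append(executor.submit(heavy_work, event))


# Simulate the dispatcher calling the callback once per event.
for ev in ("created", "modified", "deleted"):
    on_event(ev)

# The slow part runs on pool threads; wait for it here, off the dispatcher.
processed = [f.result() for f in pending]
executor.shutdown()
print(processed)
```

Because `on_event` returns as soon as the task is submitted, the dispatcher can move on to the next subscriber and the next event while the pool does the slow part.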
The dispatcher thread reads from _raw_queue and fans out to both the pull queue and all subscribers:
pulse/bus.py
        def _dispatch_loop(self) -> None:
            while True:
                item = self._raw_queue.get()
                if item is _STOP:
                    break
                event: SignalEvent = item

                # Fan out to pull queue first so get() callers are unblocked.
                self._pull_queue.put(event)

                # Snapshot subscribers under the lock, then call outside the lock
                # so that subscribe()/unsubscribe() cannot deadlock against a
                # slow callback.
                with self._lock:
                    callbacks = list(self._subscribers)

                for cb in callbacks:
                    try:
                        cb(event)
                    except Exception:
                        # A misbehaving subscriber must not kill the dispatcher.
                        pass
If a subscriber callback raises an exception, the dispatcher catches and ignores it. This prevents one misbehaving subscriber from breaking event delivery to other subscribers or the pull queue.
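The isolation described above can be reproduced in a few lines. The sketch below is a variation rather than the code in `pulse/bus.py`: the hypothetical helper `fan_out` logs the traceback with the standard `logging` module instead of discarding it, which is an assumption about what a deployment might prefer, and it counts failures so the result can be inspected.

```python
import logging
from typing import Callable

logger = logging.getLogger("signal-bus-example")


def fan_out(event: object, callbacks: list[Callable[[object], None]]) -> int:
    """Call every callback with event; return how many of them raised."""
    failures = 0
    for cb in callbacks:
        try:
            cb(event)
        except Exception:
            # Same isolation as the dispatcher, but the traceback is logged
            # rather than dropped (an assumption, not the original behavior).
            logger.exception("subscriber %r failed", cb)
            failures += 1
    return failures


received: list[object] = []


def broken(event: object) -> None:
    raise RuntimeError("boom")


# The failing callback runs first; the second one still gets the event.
failed = fan_out("tick", [broken, received.append])
print(failed, received)
```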
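Why snapshot subscribers and invoke them outside the lock?
The dispatcher copies the subscriber list while holding the lock, then releases the lock before invoking any callback. This prevents:
Deadlock: the lock is a non-reentrant threading.Lock, so if the dispatcher held it while invoking a callback, a callback that called subscribe() or unsubscribe() would block forever trying to acquire it again
Blocking: a slow callback would otherwise stall subscribe() and unsubscribe() calls from other threads until it returned
Trade-off: A callback removed with unsubscribe() while the dispatcher is iterating over its snapshot may still receive that one in-flight event.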
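Both the safety property and the trade-off can be seen in a small single-threaded sketch. `Registry` is a hypothetical stand-in that keeps only the subscriber bookkeeping of the bus, and events are plain strings; the callback `first` removes `second` from inside a dispatch, which would deadlock if callbacks ran while the lock was held.

```python
import threading
from typing import Callable

Callback = Callable[[str], None]


class Registry:
    """Subscriber list with snapshot dispatch (illustrative stand-in)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()   # non-reentrant, as in the bus
        self._subscribers: list[Callback] = []

    def subscribe(self, cb: Callback) -> None:
        with self._lock:
            self._subscribers.append(cb)

    def unsubscribe(self, cb: Callback) -> None:
        with self._lock:
            if cb in self._subscribers:
                self._subscribers.remove(cb)

    def dispatch(self, event: str) -> None:
        with self._lock:
            snapshot = list(self._subscribers)
        for cb in snapshot:             # the lock is no longer held here
            cb(event)


registry = Registry()
log: list[str] = []


def second(event: str) -> None:
    log.append(f"second:{event}")


def first(event: str) -> None:
    log.append(f"first:{event}")
    registry.unsubscribe(second)        # re-enters the registry safely


registry.subscribe(first)
registry.subscribe(second)
registry.dispatch("e1")   # second is already in the snapshot, so it still runs
registry.dispatch("e2")   # second was removed, so only first runs
print(log)
```

The first dispatch shows the trade-off: `second` was unsubscribed before its turn came, yet it still received `e1` because it was in the snapshot.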
        def stop(self) -> None:
            """
            Signal the dispatcher thread to exit and wait for it to finish.

            After stop() returns, no further callbacks will be invoked and get()
            will drain any remaining events already in the pull queue.
            """
            self._raw_queue.put(_STOP)
            # join() raises RuntimeError when a thread tries to join itself, so
            # stop() must not be called from inside a subscriber callback.
            self._dispatcher.join()
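The _STOP sentinel is a unique module-level object. The dispatcher compares each item it dequeues against it by identity (is) and exits when they match, so no real event can be mistaken for the stop signal: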
pulse/bus.py
    # Sentinel placed on _raw_queue to signal the dispatcher thread to exit.
    _STOP = object()
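The sentinel pattern can be sketched on its own with a plain worker thread. The names `raw`, `worker`, and `processed` are hypothetical and integers stand in for events; the point is that the sentinel travels through the same FIFO queue as the events, so everything enqueued before it is processed and anything enqueued after it is not.

```python
import queue
import threading

_STOP = object()   # unique object; the `is` check cannot match a real event

raw: queue.Queue = queue.Queue()
processed: list[int] = []


def worker() -> None:
    while True:
        item = raw.get()
        if item is _STOP:
            break
        processed.append(item)


t = threading.Thread(target=worker, daemon=True)
t.start()

for n in (1, 2, 3):
    raw.put(n)
raw.put(_STOP)   # queued behind the three events
raw.put(4)       # enqueued after the sentinel, so the worker never sees it
t.join()
print(processed)
```

The same reasoning applies to the bus: an event passed to put() after stop() has been called sits in the raw queue and is never dispatched.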
    class SignalBus:
        def __init__(self, maxsize: int = 0) -> None:
            """
            Args:
                maxsize: Maximum number of events buffered in the raw queue
                    before put() blocks. 0 (default) means unbounded.
            """

        # Producer API
        def put(self, event: SignalEvent) -> None:
            """Enqueue a SignalEvent. Called by Retina."""

        # Consumer API: pull mode
        def get(self, timeout: float = 1.0) -> Optional[SignalEvent]:
            """Block until the next event is available, or return None on timeout."""

        # Consumer API: push mode
        def subscribe(self, callback: Callable[[SignalEvent], None]) -> None:
            """Register a callback invoked on the dispatcher thread for every event."""

        def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None:
            """Remove a previously registered callback."""

        # Lifecycle
        def stop(self) -> None:
            """Gracefully stop the dispatcher thread."""
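To show the API above in use, the sketch below drives both consumption modes against one bus. So that it runs on its own, it defines `MiniBus`, a hypothetical condensed stand-in that mirrors the methods listed above but omits maxsize, unsubscribe(), and the exception guard; events are plain strings rather than SignalEvent objects.

```python
import queue
import threading
from typing import Callable, Optional

_STOP = object()


class MiniBus:
    """Condensed stand-in for SignalBus (illustration only)."""

    def __init__(self) -> None:
        self._raw: queue.Queue = queue.Queue()
        self._pull: queue.Queue = queue.Queue()
        self._subs: list[Callable[[str], None]] = []
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def put(self, event: str) -> None:
        self._raw.put(event)

    def get(self, timeout: float = 1.0) -> Optional[str]:
        try:
            return self._pull.get(timeout=timeout)
        except queue.Empty:
            return None

    def subscribe(self, cb: Callable[[str], None]) -> None:
        with self._lock:
            self._subs.append(cb)

    def stop(self) -> None:
        self._raw.put(_STOP)
        self._thread.join()

    def _loop(self) -> None:
        while True:
            item = self._raw.get()
            if item is _STOP:
                break
            self._pull.put(item)             # pull mode gets a copy
            with self._lock:
                callbacks = list(self._subs)
            for cb in callbacks:             # push mode gets the same event
                cb(item)


bus = MiniBus()
pushed: list[str] = []
bus.subscribe(pushed.append)                 # push mode

for ev in ("created", "modified"):
    bus.put(ev)                              # producer side
bus.stop()   # the sentinel is queued last, so both events are dispatched first

pulled = [bus.get(timeout=0.1), bus.get(timeout=0.1)]   # pull mode
print(pushed, pulled)
```

Both lists end up with the same two events in the same order, which is the fan-out guarantee: neither mode takes events away from the other.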