Skip to main content

Overview

The Signal Bus is the internal event queue that connects Layer 1 (Retina) to Layer 2 (Limbic). It provides a thread-safe, fan-out architecture that supports both pull (blocking get) and push (callback subscription) consumption modes simultaneously.
Key Feature: Every event is delivered to ALL subscribers AND made available via get(). No events are dropped or split between modes.

Architecture

                         ┌──────────────┐
    Retina               │  _raw_queue  │
    put(event) ─────────►│  queue.Queue │
                         └──────┬───────┘
                                │  dispatcher thread
                    ┌───────────┼────────────┐
                    ▼           ▼            ▼
             _pull_queue   callback[0]  callback[1] …
             (get() reads)
The bus maintains two queues:
  1. _raw_queue: Receives events from Retina via put()
  2. _pull_queue: Dispatched events available via get()
A dedicated dispatcher thread reads from _raw_queue and fans out to both the _pull_queue and all registered subscriber callbacks.

Thread Safety

All methods are safe to call from any thread:
  • Producer (Retina): calls put() from filesystem observer thread and time-tick thread
  • Consumer (Limbic): calls get() from main thread OR registers callbacks that run on dispatcher thread
  • Subscribers: callbacks invoked on the dispatcher thread, so heavy work should be offloaded
pulse/bus.py
class SignalBus:
    """
    Thread-safe event bus connecting Retina (producer) to Limbic (consumer).

    The bus starts its dispatcher thread immediately on construction and runs
    until stop() is called. All methods are safe to call from any thread.
    """

    def __init__(self, maxsize: int = 0) -> None:
        """
        Args:
            maxsize: Maximum number of events buffered in the raw queue before
                put() blocks. 0 (default) means unbounded.
        """
        self._raw_queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._pull_queue: queue.Queue = queue.Queue()
        self._subscribers: list[Callable[[SignalEvent], None]] = []
        self._lock = threading.Lock()
        self._dispatcher = threading.Thread(
            target=self._dispatch_loop,
            name="signal-bus-dispatcher",
            daemon=True,
        )
        self._dispatcher.start()

Producer API

The Retina calls put() to enqueue events:
pulse/bus.py
def put(self, event: SignalEvent) -> None:
    """
    Enqueue a SignalEvent. Called by Retina. Non-blocking unless the bus
    was constructed with a finite maxsize and the queue is full, in which
    case it blocks until space is available.
    """
    self._raw_queue.put(event)
With maxsize=0 (default), the queue is unbounded and put() never blocks. For bounded queues, put() will block if the buffer is full.

Consumer API: Pull Mode

Consumers can block until the next event is available:
pulse/bus.py
def get(self, timeout: float = 1.0) -> Optional[SignalEvent]:
    """
    Block until the next event is available and return it, or return None
    if timeout expires first. Does not consume events that have already
    been delivered to subscribers.

    Args:
        timeout: Seconds to wait. Must be >= 0.
    """
    try:
        return self._pull_queue.get(timeout=timeout)
    except queue.Empty:
        return None

Example: Polling Loop

bus = SignalBus()
while True:
    event = bus.get(timeout=1.0)
    if event is not None:
        process_event(event)

Consumer API: Push Mode

Consumers can register callbacks that are invoked on the dispatcher thread:
pulse/bus.py
def subscribe(self, callback: Callable[[SignalEvent], None]) -> None:
    """
    Register a callback that will be called on the dispatcher thread for
    every subsequent event. Callbacks must not block for extended periods;
    offload heavy work to a separate thread if needed.

    Callbacks registered after some events have already been dispatched
    will only receive future events.
    """
    with self._lock:
        self._subscribers.append(callback)

def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None:
    """Remove a previously registered callback. No-op if not registered."""
    with self._lock:
        try:
            self._subscribers.remove(callback)
        except ValueError:
            pass

Example: Callback Subscription

bus = SignalBus()

def on_event(event: SignalEvent):
    if event.source == "filesystem":
        print(f"File changed: {event.location}")

bus.subscribe(on_event)
Callbacks run on the dispatcher thread. Heavy work should be offloaded to a separate thread or queue to avoid blocking event delivery.

Dispatcher Implementation

The dispatcher thread reads from _raw_queue and fans out to both the pull queue and all subscribers:
pulse/bus.py
def _dispatch_loop(self) -> None:
    while True:
        item = self._raw_queue.get()
        if item is _STOP:
            break
        event: SignalEvent = item
        # Fan out to pull queue first so get() callers are unblocked.
        self._pull_queue.put(event)
        # Snapshot subscribers under the lock, then call outside the lock
        # so that subscribe()/unsubscribe() cannot deadlock against a
        # slow callback.
        with self._lock:
            callbacks = list(self._subscribers)
        for cb in callbacks:
            try:
                cb(event)
            except Exception:
                # A misbehaving subscriber must not kill the dispatcher.
                pass

Error Handling

If a subscriber callback raises an exception, the dispatcher catches and ignores it. This prevents one misbehaving subscriber from breaking event delivery to other subscribers or the pull queue.
The dispatcher takes a snapshot of the subscriber list inside the lock, then invokes callbacks outside the lock. This prevents:
  1. Deadlock: If a callback tries to subscribe() or unsubscribe() while holding the lock
  2. Blocking: Slow callbacks blocking new subscriptions/unsubscriptions
Trade-off: A callback that is unsubscribe()d while the dispatcher is iterating may still receive one final event.

Lifecycle

Starting

The dispatcher thread starts automatically in __init__():
pulse/bus.py
self._dispatcher = threading.Thread(
    target=self._dispatch_loop,
    name="signal-bus-dispatcher",
    daemon=True,
)
self._dispatcher.start()
The dispatcher is a daemon thread, so it will not prevent the program from exiting if the main thread finishes.

Stopping

To gracefully shut down the bus:
pulse/bus.py
def stop(self) -> None:
    """
    Signal the dispatcher thread to exit and wait for it to finish.
    After stop() returns, no further callbacks will be invoked and get()
    will drain any remaining events already in the pull queue.
    """
    self._raw_queue.put(_STOP)
    self._dispatcher.join()
The _STOP sentinel is a special object that signals the dispatcher to exit:
pulse/bus.py
# Sentinel placed on _raw_queue to signal the dispatcher thread to exit.
_STOP = object()

Public API

pulse/bus.py
class SignalBus:
    def __init__(self, maxsize: int = 0) -> None:
        """
        Args:
            maxsize: Maximum number of events buffered in the raw queue before
                put() blocks. 0 (default) means unbounded.
        """

    # Producer API
    def put(self, event: SignalEvent) -> None:
        """Enqueue a SignalEvent. Called by Retina."""

    # Consumer API — pull mode
    def get(self, timeout: float = 1.0) -> Optional[SignalEvent]:
        """Block until the next event is available, or return None on timeout."""

    # Consumer API — push mode
    def subscribe(self, callback: Callable[[SignalEvent], None]) -> None:
        """Register a callback invoked on the dispatcher thread for every event."""

    def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None:
        """Remove a previously registered callback."""

    # Lifecycle
    def stop(self) -> None:
        """Gracefully stop the dispatcher thread."""

Design Principles

Fan-Out

Every event delivered to ALL subscribers and the pull queue.

Thread-Safe

All methods safe to call from any thread with proper locking.

Fault-Tolerant

Misbehaving callbacks cannot kill the dispatcher or block other subscribers.

Dual-Mode

Supports both pull (get) and push (subscribe) consumption simultaneously.

Usage in Pulse

The Signal Bus connects Layer 1 (Retina) to Layer 2 (Limbic):
from pulse.bus import SignalBus
from pulse.retina import Retina

# Create the bus
bus = SignalBus()

# Layer 1: Retina produces events
retina = Retina(
    watch_dirs=["~/Downloads", "~/Documents"],
    signal_queue=bus,  # Retina calls bus.put()
)
retina.start()

# Layer 2: Limbic consumes events via subscription
def on_event(event):
    score = limbic_layer.score("homework-agent", [event])
    if score > 0.65:
        escalate_to_prefrontal(event, score)

bus.subscribe(on_event)
The Pulse architecture uses the push mode (subscription) internally for efficiency. The pull mode is available for testing and debugging.

Next Steps

Build docs developers (and LLMs) love