Skip to main content

Overview

SignalBus is the internal event queue between Layer 1 (Retina) and Layer 2 (Limbic). It supports both pull mode (blocking get()) and push mode (subscribe() callbacks), with every event fanned out to all registered subscribers AND made available via get(). The bus runs a dedicated dispatcher thread that ensures no event is dropped or split between modes.

Import

from pulse.bus import SignalBus

Class Definition

Defined in pulse/bus.py:42.

Architecture

                     ┌──────────────┐
Retina               │  _raw_queue  │
put(event) ─────────►│  queue.Queue │
                     └──────┬───────┘
                            │  dispatcher thread
                ┌───────────┼────────────┐
                ▼           ▼            ▼
         _pull_queue   callback[0]  callback[1] …
         (get() reads)

Constructor

def __init__(self, maxsize: int = 0) -> None
maxsize
int
default:"0"
Maximum number of events buffered in the raw queue before put() blocks.
  • 0 (default): Unbounded queue
  • > 0: Bounded queue; put() blocks when full
The dispatcher thread starts immediately on construction.

Producer API

put

def put(self, event: SignalEvent) -> None
Enqueue a SignalEvent. Called by Retina. Non-blocking unless the bus was constructed with a finite maxsize and the queue is full, in which case it blocks until space is available.
event
SignalEvent
required
The event to enqueue. See SignalEvent.
Example:
from pulse.bus import SignalBus
from pulse.retina import SignalEvent

bus = SignalBus()

event = SignalEvent(
    source="filesystem",
    location="/home/user/test.py",
    delta_type="created",
    magnitude=1.0,
    timestamp=1678123456.0,
    features={},
)

bus.put(event)

Consumer API — Pull Mode

get

def get(self, timeout: float = 1.0) -> Optional[SignalEvent]
Block until the next event is available and return it, or return None if timeout expires first. Does not consume events that have already been delivered to subscribers.
timeout
float
default:"1.0"
Seconds to wait. Must be >= 0.
Returns:
  • SignalEvent | None: The next event, or None if timeout expires
Example:
from pulse.bus import SignalBus
import time

bus = SignalBus()

# Start producer thread (e.g., Retina) in background...

while True:
    event = bus.get(timeout=5.0)
    if event is not None:
        print(f"Received: {event.delta_type} at {event.location}")
    else:
        print("No events in the last 5 seconds")

Consumer API — Push Mode

subscribe

def subscribe(self, callback: Callable[[SignalEvent], None]) -> None
Register a callback that will be called on the dispatcher thread for every subsequent event. Callbacks must not block for extended periods; offload heavy work to a separate thread if needed. Callbacks registered after some events have already been dispatched will only receive future events.
callback
Callable[[SignalEvent], None]
required
Function that receives each SignalEvent. Exceptions are silently caught to prevent dispatcher thread crashes.
Example:
from pulse.bus import SignalBus
from pulse.retina import SignalEvent

def on_event(event: SignalEvent) -> None:
    print(f"Callback: {event.source} | {event.delta_type}")

bus = SignalBus()
bus.subscribe(on_event)

# All future events will trigger on_event() on the dispatcher thread

unsubscribe

def unsubscribe(self, callback: Callable[[SignalEvent], None]) -> None
Remove a previously registered callback. No-op if not registered.
callback
Callable[[SignalEvent], None]
required
The same function object that was passed to subscribe().
Example:
def on_event(event):
    print(event.location)

bus.subscribe(on_event)
# ... later ...
bus.unsubscribe(on_event)

Lifecycle

stop

def stop(self) -> None
Signal the dispatcher thread to exit and wait for it to finish. After stop() returns:
  • No further callbacks will be invoked
  • get() will drain any remaining events already in the pull queue
  • New put() calls will still enqueue (but will not be dispatched)
Example:
bus = SignalBus()
bus.subscribe(my_callback)

# ... run for a while ...

bus.stop()  # Clean shutdown

Thread Safety

All methods are safe to call from any thread:
  • put(): Called by Retina’s filesystem observer and tick threads
  • get(): Can be called from any consumer thread
  • subscribe() / unsubscribe(): Protected by internal lock
  • Callbacks: Executed on the dedicated dispatcher thread

Exception Handling

If a subscriber callback raises an exception, it is silently caught to prevent the dispatcher thread from crashing. This ensures that a misbehaving subscriber cannot disrupt the entire event bus. Example:
def buggy_callback(event):
    raise ValueError("Oops!")

bus.subscribe(buggy_callback)
# Dispatcher continues running; exception is caught and ignored

Complete Example

import queue
import threading
import time
from pulse.bus import SignalBus
from pulse.retina import Retina, SignalEvent

# 1. Create SignalBus
bus = SignalBus(maxsize=100)

# 2. Subscribe a callback
def on_event(event: SignalEvent) -> None:
    print(f"[Subscriber] {event.source} | {event.delta_type} | {event.location}")

bus.subscribe(on_event)

# 3. Create Retina and wire it to the bus
signal_queue = queue.Queue()
retina = Retina(watch_dirs=["/home/user/workspace"], signal_queue=signal_queue)

# 4. Forward Retina output to the bus
def forwarder():
    while True:
        try:
            event = signal_queue.get(timeout=1.0)
            bus.put(event)
        except queue.Empty:
            pass

threading.Thread(target=forwarder, daemon=True).start()

# 5. Start Retina
retina.start()

# 6. Pull events in main thread
try:
    for _ in range(10):
        event = bus.get(timeout=5.0)
        if event:
            print(f"[Main] {event.location}")
finally:
    retina.stop()
    bus.stop()

See Also

  • SignalEvent — Event structure passed through the bus
  • Retina — Producer that calls put()
  • LimbicLayer — Consumer that uses subscribe() or get()
  • PulseRegistry — Owns and coordinates SignalBus lifecycle

Build docs developers (and LLMs) love