Overview
PulseRegistry is the top-level coordinator owned by the kernel. It orchestrates all three Pulse layers (Retina, Limbic, Prefrontal), manages module registration, handles escalation events, and coordinates the training feedback loop.
Import
from pulse.registry import PulseRegistry
Class Definition
Defined in pulse/registry.py:30.
Constructor
def __init__(
    self,
    watch_dirs: list[str],
    threshold: float = 0.65,
    model_save_path: Optional[Path] = None,
) -> None
watch_dirs
list[str]
required
Initial list of directories to monitor. Module-specific directories are added dynamically via register_module().
threshold
float
default:"0.65"
Default escalation threshold for the Prefrontal layer. Must be in [0.0, 1.0].
model_save_path
Path | None
default:"None"
Path where Limbic model weights will be persisted on stop(). If None, models are not saved.
Module Registration
register_module
def register_module(self, module_id: str, fingerprint_raw: dict) -> None
Parse fingerprint, register with LimbicLayer, and extend Retina watch list.
Safe to call before or after start(). When called after start(), new directories are scheduled on the running observer immediately.
module_id
str
required
Unique identifier for the module.
Example:
from pathlib import Path

from pulse.registry import PulseRegistry

registry = PulseRegistry(
    watch_dirs=["/home/user/workspace"],
    threshold=0.7,
    model_save_path=Path("/var/pulse/models.pt"),
)

fingerprint = {
    "module_id": "homework_watcher",
    "cluster": "homework",
    "version": "1.0.0",
    "question_template": "Should I check {location}?",
    "default_threshold": 0.7,
    "signal_priors": {
        "filesystem": {
            "watch_directories": ["~/Documents/Homework"],
            "relevant_extensions": [".pdf"],
            "irrelevant_extensions": [],
        },
    },
}

registry.register_module("homework_watcher", fingerprint)
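When several modules share the same fingerprint shape, a small builder can cut down on repeated dictionary literals. The sketch below is illustrative only: make_fingerprint is a hypothetical helper, not part of Pulse, and the keys it emits are simply those shown in the example above. Whether Pulse accepts or requires other keys is not covered here.

```python
from typing import Any


def make_fingerprint(
    module_id: str,
    cluster: str,
    question_template: str,
    watch_directories: list[str],
    relevant_extensions: list[str],
    default_threshold: float = 0.7,
) -> dict[str, Any]:
    """Hypothetical helper: build a dict shaped like the example fingerprint."""
    return {
        "module_id": module_id,
        "cluster": cluster,
        "version": "1.0.0",
        "question_template": question_template,
        "default_threshold": default_threshold,
        "signal_priors": {
            "filesystem": {
                # Copy the lists so later caller-side mutation has no effect.
                "watch_directories": list(watch_directories),
                "relevant_extensions": list(relevant_extensions),
                "irrelevant_extensions": [],
            },
        },
    }


fingerprint = make_fingerprint(
    module_id="homework_watcher",
    cluster="homework",
    question_template="Should I check {location}?",
    watch_directories=["~/Documents/Homework"],
    relevant_extensions=[".pdf"],
)
```

The resulting dictionary can then be passed as the second argument to register_module().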
Lifecycle
start
Start Retina and SignalBus; subscribe the internal signal handler. Non-blocking.
Typically called after all modules are registered. Modules can also be registered dynamically afterward via register_module().
stop
Gracefully stop Retina and SignalBus; persist model weights if model_save_path is set.
Blocks until all background threads have terminated.
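Because stop() is what persists model weights, it is worth making sure it runs even when the surrounding code raises. One possible way to do that is a small context manager, sketched below. The running helper and the FakeRegistry stand-in are hypothetical and exist only so the snippet runs on its own. The only assumption about the real registry is that it exposes start() and stop() as documented above.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class Lifecycle(Protocol):
    """Anything with start() and stop(), such as a PulseRegistry."""

    def start(self) -> None: ...
    def stop(self) -> None: ...


@contextmanager
def running(registry: Lifecycle) -> Iterator[Lifecycle]:
    """Hypothetical helper: start on entry, always stop on exit."""
    registry.start()
    try:
        yield registry
    finally:
        registry.stop()


class FakeRegistry:
    """Stand-in used only to demonstrate call order."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")


fake = FakeRegistry()
with running(fake):
    fake.calls.append("work")
```

With a real registry the usage would be the same: wrap the main loop in a with running(registry): block.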
Escalation Handling
on_escalation
def on_escalation(self, handler: Callable[[EscalationDecision], None]) -> None
Register (or replace) the escalation handler. This callback is invoked whenever a module’s score exceeds the threshold and Prefrontal determines an escalation is warranted.
handler
Callable[[EscalationDecision], None]
required
Example:
from pulse.prefrontal import EscalationDecision

def handle_escalation(decision: EscalationDecision) -> None:
    if decision.should_escalate:
        print(f"[{decision.module_id}] {decision.question}")
        print(f"Confidence: {decision.confidence:.2f}")

registry.on_escalation(handle_escalation)
registry.start()
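If the handler might be invoked from a background thread (an assumption; this page does not say which thread calls it), one common pattern is to keep the handler short and hand decisions off to a queue that the application drains on its own thread. The sketch below uses a FakeDecision dataclass as a stand-in for EscalationDecision, carrying only the four attributes used in the example above.

```python
import queue
from dataclasses import dataclass


@dataclass
class FakeDecision:
    """Stand-in for EscalationDecision with the attributes used above."""

    module_id: str
    question: str
    confidence: float
    should_escalate: bool


# Thread-safe hand-off point between the handler and the consumer.
pending: "queue.Queue[FakeDecision]" = queue.Queue()


def enqueue_escalation(decision: FakeDecision) -> None:
    """Handler body: do no slow work here, just enqueue."""
    if decision.should_escalate:
        pending.put(decision)


# Simulate two callbacks; only the first should be queued.
enqueue_escalation(
    FakeDecision("homework_watcher", "Should I check ~/Documents/Homework?", 0.82, True)
)
enqueue_escalation(
    FakeDecision("homework_watcher", "Should I check ~/Documents/Homework?", 0.40, False)
)
```

A function like enqueue_escalation could be passed to on_escalation() in place of a handler that prints directly.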
Training Feedback Loop
record_activation
def record_activation(self, module_id: str, window: list[SignalEvent]) -> str
Delegate to TrainingBuffer. Records an activation and returns a unique activation ID.
module_id
str
required
The module that triggered the activation.
window
list[SignalEvent]
required
The window of events that caused the activation.
Returns:
str: Unique activation ID (UUID hex string)
record_feedback
def record_feedback(self, activation_id: str, label: float) -> None
Attach explicit feedback to a stored activation record.
activation_id
str
required
The activation ID returned by record_activation().
label
float
required
Feedback label in range [0.0, 1.0], where 1.0 means the activation was highly relevant.
Raises:
ValueError: If label is not in [0.0, 1.0]
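The contract of record_activation() and record_feedback() can be illustrated with an in-memory stand-in. ToyTrainingBuffer and ActivationRecord below are hypothetical and are not the real TrainingBuffer. The use of uuid.uuid4().hex is an assumption based on the "UUID hex string" description, and the window is left as a plain list rather than list[SignalEvent].

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ActivationRecord:
    module_id: str
    window: list
    created_at: float = field(default_factory=time.time)
    label: Optional[float] = None  # None until explicit feedback arrives


class ToyTrainingBuffer:
    """Hypothetical in-memory stand-in, for illustration only."""

    def __init__(self) -> None:
        self.records: dict[str, ActivationRecord] = {}

    def record_activation(self, module_id: str, window: list) -> str:
        # Assumption: IDs are random UUIDs rendered as 32 hex characters.
        activation_id = uuid.uuid4().hex
        self.records[activation_id] = ActivationRecord(module_id, list(window))
        return activation_id

    def record_feedback(self, activation_id: str, label: float) -> None:
        if not 0.0 <= label <= 1.0:
            raise ValueError(f"label must be in [0.0, 1.0], got {label}")
        self.records[activation_id].label = label


buffer = ToyTrainingBuffer()
activation_id = buffer.record_activation("homework_watcher", window=[])
buffer.record_feedback(activation_id, 1.0)  # the user found this relevant
```

Callers that compute labels from user input may want to validate or clamp the value before calling record_feedback(), since out-of-range labels raise ValueError.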
drain_training
def drain_training(self) -> None
Train Limbic models on all ready activation records and remove them from the buffer.
A record is ready if:
- It has an explicit label (via record_feedback()), OR
- It is older than 5 minutes (fallback to implicit label)
Called automatically 2 seconds after each escalation. Can also be called manually.
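The readiness rule above can be written as a small predicate. This is a sketch of the stated rule only, not the library's code. The is_ready function, its parameters, and the constant name are hypothetical, and timestamps are assumed to be seconds as returned by time.time().

```python
from typing import Optional

# "Older than 5 minutes", expressed in seconds.
IMPLICIT_LABEL_AGE_SECONDS = 5 * 60


def is_ready(label: Optional[float], created_at: float, now: float) -> bool:
    """Return True if a record has explicit feedback or has aged out."""
    has_explicit_label = label is not None
    is_old_enough = (now - created_at) > IMPLICIT_LABEL_AGE_SECONDS
    return has_explicit_label or is_old_enough


# A labeled record is ready immediately; an unlabeled one must age first.
labeled_now = is_ready(label=0.5, created_at=0.0, now=1.0)
unlabeled_fresh = is_ready(label=None, created_at=0.0, now=10.0)
unlabeled_old = is_ready(label=None, created_at=0.0, now=301.0)
```

Note that a label of 0.0 still counts as explicit feedback, which is why the check uses "is not None" rather than truthiness.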
Internal Architecture
The registry owns and coordinates:
- Retina (Layer 1): Filesystem observer and time ticker
- SignalBus: Event distribution with push/pull modes
- LimbicLayer (Layer 2): Per-module LSTM scoring models
- PrefrontalLayer (Layer 3): Escalation gating and question formation
- TrainingBuffer: Activation recording and online training
All per-module scoring runs in a thread pool to avoid blocking the event dispatcher.
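The thread-pool idea can be sketched with the standard library. The code below is not the registry's implementation: score_all and the lambda scorers are hypothetical, standing in for the per-module Limbic models. For simplicity the sketch waits for all results before returning. A real dispatcher would more likely attach completion callbacks so that it never blocks.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def score_all(
    scorers: dict[str, Callable[[list], float]],
    window: list,
    pool: ThreadPoolExecutor,
) -> dict[str, float]:
    """Submit one scoring task per module, then collect the results."""
    futures = {
        module_id: pool.submit(scorer, window)
        for module_id, scorer in scorers.items()
    }
    return {module_id: future.result() for module_id, future in futures.items()}


# Hypothetical scorers: each maps an event window to a score in [0.0, 1.0].
scorers = {
    "homework_watcher": lambda window: min(1.0, len(window) / 4),
    "idle_watcher": lambda window: 0.0,
}

with ThreadPoolExecutor(max_workers=2) as pool:
    scores = score_all(scorers, ["created", "modified"], pool)
```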
Complete Example
import time
from pathlib import Path

from pulse.registry import PulseRegistry
from pulse.prefrontal import EscalationDecision

# 1. Create registry
registry = PulseRegistry(
    watch_dirs=["/home/user/workspace"],
    threshold=0.7,
    model_save_path=Path("/var/pulse/models.pt"),
)

# 2. Register modules
registry.register_module("homework_watcher", {
    "module_id": "homework_watcher",
    "cluster": "homework",
    "version": "1.0.0",
    "question_template": "New homework file detected at {location}. Should I review it?",
    "default_threshold": 0.7,
    "signal_priors": {
        "filesystem": {
            "watch_directories": ["~/Documents/Homework"],
            "relevant_extensions": [".pdf", ".docx"],
            "irrelevant_extensions": [".tmp"],
        },
    },
})

# 3. Register escalation handler
def handle_escalation(decision: EscalationDecision) -> None:
    if decision.should_escalate:
        print(f"Module {decision.module_id} suggests:")
        print(f"  Question: {decision.question}")
        print(f"  Confidence: {decision.confidence:.2%}")

registry.on_escalation(handle_escalation)

# 4. Start the system
registry.start()

try:
    # System runs in background, processing events automatically
    time.sleep(3600)  # Run for 1 hour
finally:
    # 5. Graceful shutdown
    registry.stop()