Skip to main content

Overview

Pulse is designed to be embedded in an agent kernel, the orchestration layer that manages the agent’s lifecycle, memory, and execution. This guide shows how to wire Pulse into your system.

Architecture Context

Pulse is independent of the kernel. It doesn’t know about:
  • How agents are implemented
  • How memory is stored
  • How tools are executed
It only knows:
  • How to detect signals
  • How to compute relevance scores
  • How to form scoped questions
The kernel bridges Pulse and the agent by:
  1. Registering modules with Pulse
  2. Handling escalation callbacks
  3. Providing training feedback
┌─────────────────────────────────────────┐
│  KERNEL                                 │
│  ┌─────────────────────────────────┐   │
│  │ Agent Executor                   │   │
│  └─────────────────────────────────┘   │
│             ▲                           │
│             │ scoped question           │
│  ┌──────────┴──────────────────────┐   │
│  │ Pulse Integration Layer          │   │
│  │ • on_escalation callback         │   │
│  │ • Module registration            │   │
│  │ • Training feedback              │   │
│  └──────────┬──────────────────────┘   │
│             │                           │
└─────────────┼───────────────────────────┘

      ┌───────▼────────┐
      │  PulseRegistry │
      │  (Pulse core)  │
      └────────────────┘
1
Step 1: Initialize PulseRegistry
2
Create the registry in your kernel’s initialization:
3
from pathlib import Path
from pulse import PulseRegistry

class AgentKernel:
    def __init__(self):
        # Initialize Pulse
        self.pulse = PulseRegistry(
            watch_dirs=[],  # Populated from module fingerprints
            threshold=0.65,  # Global default (can be overridden per-module)
            model_save_path=Path.home() / ".macroa" / "pulse" / "models"
        )
        
        # Track activation IDs for feedback
        self._active_questions: dict[str, str] = {}  # question -> activation_id
        
        # Don't start yet - register modules first
4
Configuration:
5
  • watch_dirs: Start empty; directories are added when modules register
  • threshold: Default relevance threshold (0.0–1.0)
  • model_save_path: Where to persist model weights (set to None to disable persistence)
  • 6
    Step 2: Register Modules
    7
    Register modules during kernel setup or when they’re dynamically loaded:
    8
    class AgentKernel:
        def register_pulse_module(self, module_id: str, fingerprint: dict):
            """
            Register a module with Pulse.
            
            The fingerprint can come from:
            - A static configuration file
            - The module's own registration method
            - Dynamic generation based on user preferences
            """
            try:
                self.pulse.register_module(module_id, fingerprint)
                print(f"✓ Registered {module_id} with Pulse")
            except ValueError as e:
                print(f"✗ Failed to register {module_id}: {e}")
    
    9
    Example registration:
    10
    # Static registration during kernel init
    kernel = AgentKernel()
    
    homework_fingerprint = {
        "module_id": "homework-agent",
        "cluster": "academic",
        "version": "1.0",
        "question_template": "A new file appeared at {location}. Is this file related to a course assignment or homework?",
        "default_threshold": 0.65,
        "signal_priors": {
            "filesystem": {
                "watch_directories": ["~/Downloads", "~/Documents"],
                "relevant_extensions": [".pdf", ".docx", ".pptx"],
                "irrelevant_extensions": [".exe", ".zip"]
            },
            "time": {
                "active_hours": [8, 23],
                "active_days": [0, 1, 2, 3, 4],
                "typical_interval_hours": 24
            }
        }
    }
    
    kernel.register_pulse_module("homework-agent", homework_fingerprint)
    
    11
    Step 3: Implement the Escalation Handler
    12
    This is where Pulse hands control back to the kernel:
    13
    from pulse import EscalationDecision
    
    class AgentKernel:
        def _on_pulse_escalation(self, decision: EscalationDecision):
            """
            Called by Pulse when a module's relevance score exceeds threshold.
            
            Responsibility:
            1. Wake the agent with the scoped question
            2. Track the activation for later feedback
            3. Let the agent handle the question
            """
            print(f"\n[PULSE] {decision.module_id} triggered")
            print(f"        Question: {decision.question}")
            print(f"        Confidence: {decision.confidence:.2f}")
            
            # Track this activation (needed for feedback later)
            # Note: The current EscalationDecision doesn't include activation_id,
            # so we'll need to derive it or enhance the API
            activation_id = self._track_activation(decision)
            
            # Wake the agent with the scoped question
            result = self.agent.handle_question(
                question=decision.question,
                context={
                    "module_id": decision.module_id,
                    "confidence": decision.confidence,
                    "events": decision.triggering_events
                }
            )
            
            # Provide feedback based on result
            self._provide_feedback(activation_id, result)
        
        def _track_activation(self, decision: EscalationDecision) -> str:
            """
            Store the activation for later feedback.
            
            In the current architecture, we need to manually create an activation
            record. Future versions may include the activation_id in the
            EscalationDecision.
            """
            # The registry already recorded the activation, but we don't have the ID
            # This is a design limitation - see pulse/registry.py:152
            # For now, generate a synthetic ID based on the question
            activation_id = hash(decision.question)
            self._active_questions[decision.question] = str(activation_id)
            return str(activation_id)
        
        def _provide_feedback(self, activation_id: str, result):
            """
            Provide training feedback based on agent's response.
            """
            # Calculate label based on what the agent did
            if result.action_taken:
                # Agent took action (wrote memory, executed tool, etc.)
                label = 1.0
            elif result.dismissed:
                # Agent explicitly dismissed as irrelevant
                label = 0.0
            else:
                # Agent acknowledged but took no action (maybe informational)
                label = 0.5
            
            # Submit feedback
            self.pulse.record_feedback(activation_id, label)
    
    14
    Design Note: The current EscalationDecision doesn’t include the activation_id that was created in pulse/registry.py:152. This means the kernel must track the relationship between questions and activation IDs manually.A future enhancement would be to include activation_id in the EscalationDecision dataclass.
    15
    Step 4: Wire the Callback and Start Pulse
    16
    Connect the escalation handler and start Pulse:
    17
    class AgentKernel:
        def start(self):
            """Start the kernel and Pulse subsystem."""
            # Register the escalation callback
            self.pulse.on_escalation(self._on_pulse_escalation)
            
            # Start Pulse (begins watching filesystem, time ticks, etc.)
            self.pulse.start()
            
            print("✓ Pulse is running")
            print(f"  Watching: {self.pulse._watch_dirs}")
            
            # Start other kernel components...
        
        def stop(self):
            """Gracefully shut down the kernel."""
            print("Shutting down Pulse...")
            
            # Stop Pulse (saves model weights)
            self.pulse.stop()
            
            print("✓ Pulse stopped and models saved")
    
    18
    Step 5: Handle Agent Responses
    19
    When the agent processes a Pulse question, extract actionable feedback:
    20
    class Agent:
        def handle_question(self, question: str, context: dict):
            """
            Process a scoped question from Pulse.
            
            Returns:
                Result object with:
                - action_taken: bool
                - dismissed: bool
                - confidence_score: float
            """
            # Use LLM or rule-based logic to answer the question
            response = self.reason(question, context)
            
            # Decide if action is needed
            if self._is_relevant(response):
                # Take action (e.g., store in memory, notify user)
                self.memory.write(f"/pulse/{context['module_id']}/last_seen", {
                    "question": question,
                    "timestamp": time.time(),
                    "response": response
                })
                
                return Result(action_taken=True, dismissed=False, confidence_score=0.9)
            else:
                return Result(action_taken=False, dismissed=True, confidence_score=0.3)
    

    Complete Integration Example

    Here’s a minimal kernel with Pulse integration:
    from pathlib import Path
    from pulse import PulseRegistry, EscalationDecision
    
    class MinimalKernel:
        def __init__(self):
            # Initialize Pulse
            self.pulse = PulseRegistry(
                watch_dirs=[],
                threshold=0.65,
                model_save_path=Path.home() / ".macroa" / "pulse" / "models"
            )
            
            # State
            self._activations = {}  # activation_id -> decision
        
        def register_modules(self):
            """Register all Pulse modules."""
            homework = {
                "module_id": "homework-agent",
                "cluster": "academic",
                "version": "1.0",
                "question_template": "New file at {location}. Related to homework?",
                "default_threshold": 0.65,
                "signal_priors": {
                    "filesystem": {
                        "watch_directories": ["~/Downloads"],
                        "relevant_extensions": [".pdf"],
                        "irrelevant_extensions": [".zip"]
                    }
                }
            }
            self.pulse.register_module("homework-agent", homework)
        
        def start(self):
            """Start the kernel."""
            self.register_modules()
            self.pulse.on_escalation(self._on_escalation)
            self.pulse.start()
            print("✓ Kernel running")
        
        def stop(self):
            """Stop the kernel."""
            self.pulse.stop()
            print("✓ Kernel stopped")
        
        def _on_escalation(self, decision: EscalationDecision):
            """Handle Pulse escalation."""
            print(f"\n[ESCALATION] {decision.question}")
            
            # Simulate agent processing
            action_taken = self._agent_process(decision)
            
            # Provide feedback
            # Note: We don't have the activation_id in the decision,
            # so in practice you'd need to enhance the API or track it separately
            label = 1.0 if action_taken else 0.0
            # self.pulse.record_feedback(activation_id, label)  # Needs activation_id
        
        def _agent_process(self, decision: EscalationDecision) -> bool:
            """Simulate agent deciding if action is needed."""
            # In a real system, this would call the LLM or rule engine
            print(f"  → Agent processing: {decision.module_id}")
            
            # Simulate decision
            if decision.confidence > 0.7:
                print("  → Action taken!")
                return True
            else:
                print("  → Dismissed as irrelevant")
                return False
    
    # Usage
    if __name__ == "__main__":
        kernel = MinimalKernel()
        kernel.start()
        
        try:
            # Keep running (Pulse watches in background)
            import time
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            kernel.stop()
    

    Advanced: Dynamic Module Registration

    Modules can be registered at runtime based on user activity:
    class AgentKernel:
        def learn_new_pattern(self, user_feedback: dict):
            """
            Dynamically create a new Pulse module based on user feedback.
            
            Example: User says "Track when I get emails from my professor"
            """
            # Extract signal patterns from feedback
            fingerprint = {
                "module_id": f"email-{user_feedback['sender']}",
                "cluster": "communication",
                "version": "1.0",
                "question_template": f"Email from {user_feedback['sender']} at {{location}}. Should I notify?",
                "default_threshold": 0.7,
                "signal_priors": {
                    "memory": {
                        "watch_namespaces": ["/mem/inbox/"],
                        "high_relevance_keys": ["from", "subject"]
                    }
                }
            }
            
            # Register with Pulse (can be called while Pulse is running)
            self.pulse.register_module(fingerprint["module_id"], fingerprint)
            
            print(f"✓ Now tracking emails from {user_feedback['sender']}")
    

    Troubleshooting

    Check:
    1. Are modules registered before pulse.start()?
    2. Are watch directories correct? Print them:
      print(f"Watching: {pulse._watch_dirs}")
      
    3. Is the escalation handler registered?
      pulse.on_escalation(your_handler)
      
    4. Make a test file in a watched directory and check Layer 1 output
    Debug: See the Monitoring Signals guide for detailed debugging steps.
    The current architecture has a gap: EscalationDecision doesn’t include the activation_id created in pulse/registry.py:152.Workaround:
    1. Hash the question to create a synthetic ID:
      activation_id = str(hash(decision.question))
      
    2. Track the mapping yourself in the kernel
    Better solution: Enhance EscalationDecision to include activation_id:
    @dataclass
    class EscalationDecision:
        activation_id: str  # Add this field
        module_id: str
        question: str
        confidence: float
        triggering_events: list[SignalEvent]
        should_escalate: bool
    
    Check that model_save_path is set:
    pulse = PulseRegistry(
        watch_dirs=[],
        model_save_path=Path.home() / ".macroa" / "pulse" / "models"
    )
    
    And that you call pulse.stop() before exit (this triggers model saving).
    Not recommended. Pulse uses filesystem watchers (via watchdog), which can conflict if multiple instances watch the same directories.If you need multiple kernels, use a single shared PulseRegistry and route escalations based on module_id.

    Best Practices

    Register modules early

    Register all modules before calling pulse.start(). While dynamic registration works, it’s cleaner to register upfront.

    Always provide feedback

    The training loop depends on feedback. Even simple heuristics (action taken → 1.0, dismissed → 0.0) improve model accuracy over time.

    Set model_save_path

    Always persist models. Without persistence, the model resets to synthetic priors on every restart, losing learned patterns.

    Monitor escalation frequency

    If escalations are too frequent (noise), increase default_threshold. If too rare (missing events), lower it or provide more training data.

    Next Steps

    Registering Modules

    Learn how to create signal fingerprints

    Training Models

    Understand online learning and feedback

    Monitoring Signals

    Debug signal flow and troubleshoot issues

    Architecture

    Deep dive into Pulse’s three-layer design

    Build docs developers (and LLMs) love