Skip to main content

Extending the Agent

Sentinel AI’s agent workflow is built on LangGraph, making it easy to add custom nodes and modify the decision graph to handle specialized scenarios.

Understanding the Agent Architecture

The agent workflow is defined in src/agent/graph.py. It uses a StateGraph with conditional edges:
src/agent/graph.py
from langgraph.graph import StateGraph, END
from .state import AgentState
from .nodes import monitor_node, diagnose_node, plan_node, approve_node, execute_node, verify_node

# Build the graph over the shared AgentState schema.
workflow = StateGraph(AgentState)

# Register every stage under its graph name, in pipeline order.
for node_name, node_fn in (
    ("monitor", monitor_node),
    ("diagnose", diagnose_node),
    ("plan", plan_node),
    ("approval", approve_node),
    ("execute", execute_node),
    ("verify", verify_node),
):
    workflow.add_node(node_name, node_fn)

# Every run starts at the monitor stage.
workflow.set_entry_point("monitor")

Creating Custom Nodes

Nodes are Python functions that receive the current AgentState dictionary and return a dictionary containing the state fields they want to update.

Node Function Signature

from src.agent.state import AgentState

def custom_node(state: AgentState) -> AgentState:
    """
    Template for a custom workflow node.

    Args:
        state: The agent state dictionary handed to this node

    Returns:
        A dictionary naming this step and carrying one custom field
    """
    # Your custom logic here
    update = dict(current_step="custom_node", custom_field="value")
    return update

Example: Custom Notification Node

src/agent/nodes/notify.py
import requests
from src.agent.state import AgentState
from src.core.event_bus import log

def notify_node(state: AgentState) -> AgentState:
    """
    Send a webhook notification when a failing service is known.

    Args:
        state: Current agent state; reads "affected_service" and
            "current_error".

    Returns:
        State update with "current_step" set to "notify" and
        "notification_sent" set to True only when the webhook request
        succeeded. It is False when there was nothing to report or the
        request failed.
    """
    service = state.get("affected_service")
    error = state.get("current_error")
    sent = False

    if service and error:
        # Send to Slack, PagerDuty, etc.
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

        payload = {
            "text": f"🚨 Sentinel AI Alert: {service} is down",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Service:* {service}\n*Error:* {error}"
                    }
                }
            ]
        }

        try:
            # Bound the wait so an unresponsive webhook cannot stall the graph.
            response = requests.post(webhook_url, json=payload, timeout=10)
            # Treat 4xx/5xx replies as failures instead of logging success.
            response.raise_for_status()
        except Exception as e:
            # Best-effort: a failed alert is logged, never raised.
            log("error", f"Notification failed: {e}")
        else:
            sent = True
            log("notify", f"Alert sent for {service}")

    return {"current_step": "notify", "notification_sent": sent}

Example: Custom Backup Node

src/agent/nodes/backup.py
from src.agent.state import AgentState
from src.tools.ssh import SSHClient
from src.core.config import config
from src.core.event_bus import log

def backup_node(state: AgentState) -> AgentState:
    """
    Create a database backup before repair commands are executed.

    Only the "postgresql" service is handled; any other service is
    reported as not backed up.

    Args:
        state: Current agent state; reads "affected_service".

    Returns:
        State update with "current_step" set to "backup" and
        "backup_created" telling whether the dump command exited with
        code 0. On success it also carries "backup_path", the path the
        dump was written to by the command run over SSH.
    """
    service = state.get("affected_service")

    # Guard clause: nothing to back up for non-PostgreSQL services.
    if service != "postgresql":
        return {"current_step": "backup", "backup_created": False}

    backup_path = f"/backups/backup_{service}.sql"
    ssh = SSHClient(
        hostname=config.SSH_HOST,
        port=config.SSH_PORT,
        username=config.SSH_USER,
        password=config.SSH_PASS
    )

    try:
        backup_cmd = f"pg_dump -U postgres sentinel_db > {backup_path}"
        code, _out, err = ssh.execute_command(backup_cmd)
    except Exception as e:
        # Report a failed SSH call like a failed dump instead of letting
        # the exception escape the node unlogged.
        log("error", f"Backup failed: {e}")
        return {"current_step": "backup", "backup_created": False}
    finally:
        ssh.close()

    if code != 0:
        log("error", f"Backup failed: {err}")
        return {"current_step": "backup", "backup_created": False}

    log("backup", f"Database backup created for {service}")
    return {
        "current_step": "backup",
        "backup_created": True,
        "backup_path": backup_path,
    }

Adding Nodes to the Workflow

Modify src/agent/graph.py to include your custom nodes:
src/agent/graph.py
from .nodes import monitor_node, diagnose_node, plan_node, approve_node, execute_node, verify_node
from .nodes.notify import notify_node  # Your custom node
from .nodes.backup import backup_node  # Your custom node

workflow = StateGraph(AgentState)

# Register the nodes in order; the two custom nodes sit between the
# existing "plan" and "approval" registrations.
for node_name, node_fn in (
    # Existing nodes
    ("monitor", monitor_node),
    ("diagnose", diagnose_node),
    ("plan", plan_node),
    # Your custom nodes
    ("notify", notify_node),
    ("backup", backup_node),
    # Remaining existing nodes
    ("approval", approve_node),
    ("execute", execute_node),
    ("verify", verify_node),
):
    workflow.add_node(node_name, node_fn)

Conditional Edges

Add conditional routing based on state:
src/agent/graph.py
def should_create_backup(state: AgentState):
    """Route to "backup" for database services, otherwise to "execute"."""
    database_services = ("postgresql", "mongodb", "redis")
    # Only back up databases
    needs_backup = state.get("affected_service") in database_services
    return "backup" if needs_backup else "execute"

# Map each value should_create_backup can return to the node to run next.
backup_routes = {"backup": "backup", "execute": "execute"}

# Leave "approval" through the router instead of a fixed edge.
workflow.add_conditional_edges("approval", should_create_backup, backup_routes)

# Once the backup step has run, continue to execution.
workflow.add_edge("backup", "execute")

Extended Workflow Example

Complete workflow with custom nodes:
src/agent/graph.py
# Full graph: the original pipeline plus the custom "notify" and "backup" nodes.
# NOTE(review): report_node, escalation_node, should_monitor_end and
# should_verify_end are not defined or imported in this excerpt; presumably
# they come from the project's own modules — confirm before copying.
workflow = StateGraph(AgentState)

# Original nodes
workflow.add_node("monitor", monitor_node)
workflow.add_node("diagnose", diagnose_node)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approve_node)
workflow.add_node("execute", execute_node)
workflow.add_node("verify", verify_node)
workflow.add_node("report", report_node)
workflow.add_node("escalation", escalation_node)

# Custom nodes
workflow.add_node("notify", notify_node)
workflow.add_node("backup", backup_node)

workflow.set_entry_point("monitor")

# Routing
# "monitor" either ends the run or continues into the custom "notify" node,
# which then hands off to "diagnose".
workflow.add_conditional_edges("monitor", should_monitor_end, {"end": END, "continue": "notify"})
workflow.add_edge("notify", "diagnose")
workflow.add_edge("diagnose", "plan")
workflow.add_edge("plan", "approval")
# NOTE(review): should_create_backup as shown in this guide only returns
# "backup" or "execute", so the "escalate" and "end" routes below are never
# taken unless the router is extended — confirm how approval outcomes are
# meant to be handled.
workflow.add_conditional_edges("approval", should_create_backup, {"backup": "backup", "execute": "execute", "escalate": "escalation", "end": END})
workflow.add_edge("backup", "execute")
workflow.add_edge("execute", "verify")
# After verification: finish via "report", loop back to "diagnose", or escalate.
workflow.add_conditional_edges("verify", should_verify_end, {"end": "report", "retry": "diagnose", "escalate": "escalation"})
workflow.add_edge("report", END)
workflow.add_edge("escalation", END)

# Compile the wired graph into the runnable application object.
app = workflow.compile()

State Management

Extend the AgentState schema in src/agent/state.py:
src/agent/state.py
from typing import TypedDict, List, Optional

class AgentState(TypedDict, total=False):
    """State dictionary passed to, and updated by, the workflow nodes.

    Declared with ``total=False`` so that every key is optional; the
    example nodes in this guide return only the keys they update.
    """

    # Existing fields
    messages: List
    current_step: str  # the example nodes set this to their own node name
    current_error: Optional[str]  # read by notify_node
    affected_service: Optional[str]  # read by notify_node, backup_node and should_create_backup
    diagnosis_log: List[str]
    candidate_plan: Optional[str]
    approval_status: str
    retry_count: int
    memory_consulted: bool
    security_flags: List[str]
    escalation_reason: Optional[str]
    
    # Your custom fields
    notification_sent: bool  # returned by notify_node
    backup_created: bool  # returned by backup_node
    backup_path: Optional[str]
    custom_metric: float

Testing Custom Nodes

tests/test_custom_nodes.py
from src.agent.nodes.notify import notify_node
from src.agent.nodes.backup import backup_node

def test_notify_node():
    """notify_node posts one alert for a known service and error and reports it as sent."""
    from unittest import mock

    state = {
        "affected_service": "nginx",
        "current_error": "Connection refused"
    }

    # Stub the HTTP call so the test never contacts the real webhook.
    with mock.patch("src.agent.nodes.notify.requests.post") as mock_post:
        result = notify_node(state)

    mock_post.assert_called_once()
    _, kwargs = mock_post.call_args
    assert "nginx" in kwargs["json"]["text"]
    assert result["notification_sent"] is True
    assert result["current_step"] == "notify"

def test_backup_node():
    """backup_node reports success and closes the SSH client when the dump exits with 0."""
    from unittest import mock

    state = {
        "affected_service": "postgresql",
        "current_error": "Service down"
    }

    # Stub the SSH client so the test never opens a real connection.
    with mock.patch("src.agent.nodes.backup.SSHClient") as mock_ssh_cls:
        mock_ssh = mock_ssh_cls.return_value
        mock_ssh.execute_command.return_value = (0, "", "")
        result = backup_node(state)

    mock_ssh.execute_command.assert_called_once()
    mock_ssh.close.assert_called_once()
    assert result["backup_created"] is True
    assert result["current_step"] == "backup"

Best Practices

Immutable State

Return new state dictionaries; don’t modify the input state.

Error Handling

Always use try/except blocks and log errors

Logging

Use event_bus.log() for consistent logging

SSH Cleanup

Always close SSH connections in finally blocks
Custom nodes have access to the entire agent state, including memory, diagnosis, and execution history.
Modifying core nodes (monitor, diagnose, execute) can affect the agent’s reliability. Test thoroughly before deploying.

Agent Workflow

Understand the core workflow

Agent Nodes

Reference for all nodes

Agent Graph SDK

StateGraph API reference

Memory System

Learn about episodic memory

Build docs developers (and LLMs) love