Extending the Agent
Sentinel AI’s agent workflow is built on LangGraph, making it easy to add custom nodes and modify the decision graph to handle specialized scenarios.
Understanding the Agent Architecture
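The agent workflow is defined in src/agent/graph.py. It uses a LangGraph StateGraph whose nodes are connected by plain and conditional edges. The excerpt below shows how the core nodes are registered and how the entry point is set: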
from langgraph.graph import StateGraph, END
from .state import AgentState
from .nodes import monitor_node, diagnose_node, plan_node, approve_node, execute_node, verify_node
# Build the graph over the shared AgentState schema.
workflow = StateGraph(AgentState)

# Core nodes, registered in pipeline order under the names edges refer to.
core_nodes = (
    ("monitor", monitor_node),
    ("diagnose", diagnose_node),
    ("plan", plan_node),
    ("approval", approve_node),
    ("execute", execute_node),
    ("verify", verify_node),
)
for node_name, node_fn in core_nodes:
    workflow.add_node(node_name, node_fn)

# Every run starts at the monitor node.
workflow.set_entry_point("monitor")
Creating Custom Nodes
Nodes are Python functions that receive the current AgentState dictionary and return a dictionary containing the state fields they want to update.
Node Function Signature
from src.agent.state import AgentState
def custom_node(state: AgentState) -> AgentState:
    """Template for a custom workflow node.

    Args:
        state: Current agent state dictionary.

    Returns:
        Dictionary holding the state fields this node sets.
    """
    # Your custom logic here
    updates = {
        "current_step": "custom_node",
        "custom_field": "value",
    }
    return updates
Example: Custom Notification Node
src/agent/nodes/notify.py
import requests
from src.agent.state import AgentState
from src.core.event_bus import log
def notify_node(state: AgentState) -> AgentState:
    """Send a Slack alert when a failing service has been identified.

    Reads ``affected_service`` and ``current_error`` from the state. When both
    are set, an alert is posted to the webhook; otherwise nothing is sent.

    Args:
        state: Current agent state dictionary.

    Returns:
        State update with ``current_step`` set to ``"notify"`` and
        ``notification_sent`` set to ``True`` only when the webhook request
        completed without an HTTP error.
    """
    service = state.get("affected_service")
    error = state.get("current_error")
    sent = False

    if service and error:
        # Send to Slack, PagerDuty, etc.
        webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
        payload = {
            "text": f"🚨 Sentinel AI Alert: {service} is down",
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Service:* {service}\n*Error:* {error}",
                    },
                }
            ],
        }
        try:
            # Bound the wait so an unreachable webhook cannot stall the graph.
            response = requests.post(webhook_url, json=payload, timeout=10)
            # A 4xx/5xx reply means the alert was not delivered.
            response.raise_for_status()
        except Exception as e:
            # Notification is best-effort: log the failure and let the
            # workflow continue instead of crashing the run.
            log("error", f"Notification failed: {e}")
        else:
            sent = True
            log("notify", f"Alert sent for {service}")

    return {"current_step": "notify", "notification_sent": sent}
Example: Custom Backup Node
src/agent/nodes/backup.py
from src.agent.state import AgentState
from src.tools.ssh import SSHClient
from src.core.config import config
from src.core.event_bus import log
def backup_node(state: AgentState) -> AgentState:
    """Create a database backup before repair commands are executed.

    Only ``postgresql`` is backed up: ``pg_dump`` is run over SSH and its
    output is redirected to a file under ``/backups``. Any other service is
    skipped. SSH errors are logged and reported as a failed backup rather
    than raised.

    Args:
        state: Current agent state dictionary.

    Returns:
        State update with ``current_step`` set to ``"backup"`` and
        ``backup_created`` telling whether the dump succeeded. On success the
        update also carries ``backup_path``.
    """
    service = state.get("affected_service")
    not_created = {"current_step": "backup", "backup_created": False}

    if service != "postgresql":
        if service:
            # The router may send other databases here; make the skip visible.
            log("backup", f"No backup procedure for {service}; skipping")
        return not_created

    backup_path = f"/backups/backup_{service}.sql"
    backup_cmd = f"pg_dump -U postgres sentinel_db > {backup_path}"

    ssh = None
    try:
        ssh = SSHClient(
            hostname=config.SSH_HOST,
            port=config.SSH_PORT,
            username=config.SSH_USER,
            password=config.SSH_PASS,
        )
        code, _out, err = ssh.execute_command(backup_cmd)
    except Exception as e:
        # A connection or execution error must not take the whole graph down.
        log("error", f"Backup failed: {e}")
        return not_created
    finally:
        # Close the connection only if it was actually created.
        if ssh is not None:
            ssh.close()

    if code != 0:
        log("error", f"Backup failed: {err}")
        return not_created

    log("backup", f"Database backup created for {service}")
    return {
        "current_step": "backup",
        "backup_created": True,
        "backup_path": backup_path,
    }
Adding Nodes to the Workflow
Modify src/agent/graph.py to include your custom nodes:
from .nodes import monitor_node, diagnose_node, plan_node, approve_node, execute_node, verify_node
from .nodes.notify import notify_node # Your custom node
from .nodes.backup import backup_node # Your custom node
workflow = StateGraph(AgentState)

# Registration table: existing nodes plus the two custom ones, in the same
# order as before.
node_table = (
    # Existing nodes
    ("monitor", monitor_node),
    ("diagnose", diagnose_node),
    ("plan", plan_node),
    # Your custom nodes
    ("notify", notify_node),
    ("backup", backup_node),
    # Remaining existing nodes
    ("approval", approve_node),
    ("execute", execute_node),
    ("verify", verify_node),
)
for label, handler in node_table:
    workflow.add_node(label, handler)
Conditional Edges
Add conditional routing based on state:
def should_create_backup(state: AgentState):
    """Pick the next node: "backup" for database services, else "execute"."""
    # Only databases get a backup before execution.
    database_services = ["postgresql", "mongodb", "redis"]
    needs_backup = state.get("affected_service") in database_services
    return "backup" if needs_backup else "execute"
# After "approval", let should_create_backup choose the next node.
# NOTE(review): should_create_backup does not read approval_status, so confirm
# that unapproved plans are stopped elsewhere before wiring it in here.
backup_routes = {"backup": "backup", "execute": "execute"}
workflow.add_conditional_edges("approval", should_create_backup, backup_routes)

# A finished backup always continues to execution.
workflow.add_edge("backup", "execute")
Extended Workflow Example
Complete workflow with custom nodes:
workflow = StateGraph(AgentState)

# Node registry: original nodes first, then the custom ones.
all_nodes = (
    # Original nodes
    ("monitor", monitor_node),
    ("diagnose", diagnose_node),
    ("plan", plan_node),
    ("approval", approve_node),
    ("execute", execute_node),
    ("verify", verify_node),
    ("report", report_node),
    ("escalation", escalation_node),
    # Custom nodes
    ("notify", notify_node),
    ("backup", backup_node),
)
for node_name, node_fn in all_nodes:
    workflow.add_node(node_name, node_fn)

workflow.set_entry_point("monitor")

# Routing
# Monitoring either ends the run or continues into the custom notify node.
workflow.add_conditional_edges(
    "monitor",
    should_monitor_end,
    {"end": END, "continue": "notify"},
)
workflow.add_edge("notify", "diagnose")
workflow.add_edge("diagnose", "plan")
workflow.add_edge("plan", "approval")
# NOTE(review): should_create_backup as shown returns only "backup" or
# "execute"; the "escalate" and "end" routes are never taken unless the
# router is extended to return them.
workflow.add_conditional_edges(
    "approval",
    should_create_backup,
    {
        "backup": "backup",
        "execute": "execute",
        "escalate": "escalation",
        "end": END,
    },
)
workflow.add_edge("backup", "execute")
workflow.add_edge("execute", "verify")
# A verify result of "end" goes through the report node before finishing.
workflow.add_conditional_edges(
    "verify",
    should_verify_end,
    {"end": "report", "retry": "diagnose", "escalate": "escalation"},
)
workflow.add_edge("report", END)
workflow.add_edge("escalation", END)

app = workflow.compile()
State Management
Extend the AgentState schema in src/agent/state.py:
from typing import TypedDict, List, Optional
class AgentState(TypedDict, total=False):
    """State dictionary passed between workflow nodes.

    ``total=False`` makes every key optional, so a node may return only the
    fields it sets.
    """

    # --- Existing fields ---
    messages: List
    current_step: str  # each node writes its own name here
    current_error: Optional[str]  # read by notify_node
    affected_service: Optional[str]  # read by the notify/backup nodes and the backup router
    diagnosis_log: List[str]
    candidate_plan: Optional[str]
    approval_status: str
    retry_count: int
    memory_consulted: bool
    security_flags: List[str]
    escalation_reason: Optional[str]

    # --- Your custom fields ---
    notification_sent: bool  # written by notify_node
    backup_created: bool  # written by backup_node
    backup_path: Optional[str]
    custom_metric: float
Testing Custom Nodes
tests/test_custom_nodes.py
from src.agent.nodes.notify import notify_node
from src.agent.nodes.backup import backup_node
def test_notify_node():
    """notify_node reports success when the webhook request goes through."""
    from unittest.mock import patch

    state = {
        "affected_service": "nginx",
        "current_error": "Connection refused",
    }

    # Stub the HTTP call so the test never contacts the real Slack endpoint.
    with patch("requests.post") as mock_post:
        result = notify_node(state)

    mock_post.assert_called_once()
    assert result["notification_sent"] is True
    assert result["current_step"] == "notify"
def test_backup_node():
    """backup_node reports a created backup when pg_dump exits with code 0."""
    from unittest.mock import patch

    state = {
        "affected_service": "postgresql",
        "current_error": "Service down",
    }

    # Replace the SSH client used by the backup module so no real connection
    # is opened; the fake command result is (exit code, stdout, stderr).
    with patch("src.agent.nodes.backup.SSHClient") as mock_ssh_cls:
        mock_ssh = mock_ssh_cls.return_value
        mock_ssh.execute_command.return_value = (0, "", "")
        result = backup_node(state)

    assert result["current_step"] == "backup"
    assert result["backup_created"] is True
    # The connection must be closed even on the success path.
    mock_ssh.close.assert_called_once()
Best Practices
Immutable State: Return new state dictionaries; don’t modify the input state.
Error Handling: Always use try/except blocks and log errors.
Logging: Use event_bus.log() for consistent logging.
SSH Cleanup: Always close SSH connections in finally blocks.
Note: Custom nodes have access to the entire agent state, including memory, diagnosis, and execution history.
Warning: Modifying core nodes (monitor, diagnose, execute) can affect the agent’s reliability. Test thoroughly before deploying.
Agent Workflow: Understand the core workflow.
Agent Nodes: Reference for all nodes.
Agent Graph SDK: StateGraph API reference.
Memory System: Learn about episodic memory.