## Overview
Agent nodes are Python functions that implement each step of the recovery workflow. Each node receives the current AgentState and returns a dictionary of state updates.
## Node Function Signature

All nodes follow this pattern:
```python
from typing import Dict, Any

from src.agent.state import AgentState

def example_node(state: AgentState) -> Dict[str, Any]:
    # Access state
    current_error = state.get("current_error")

    # Perform logic
    # ...

    # Return state updates
    return {
        "current_step": "example",
        "some_field": "updated_value"
    }
```
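A node returns only the keys it changes; the orchestrator merges that partial update into the full state. A minimal sketch of that merge, using a plain dict in place of `AgentState` for illustration:

```python
from typing import Dict, Any

def example_node(state: Dict[str, Any]) -> Dict[str, Any]:
    # Return only the keys this node updates
    return {"current_step": "example", "some_field": "updated_value"}

state = {"current_step": "", "current_error": None}
# Merge the partial update into the full state; later keys win
state = {**state, **example_node(state)}
```

Keys absent from the update (here `current_error`) are left untouched, which is why nodes can stay small and focused.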
## monitor_node
Checks the health of all configured services via SSH.
### Import

```python
from src.agent.nodes import monitor_node
```

### Function Signature

```python
def monitor_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: current workflow state (typically empty on first run)

**Returns**

- `current_error`: error message if a service is down, otherwise `None`
- `affected_service`: name of the failed service (e.g., `"nginx"`, `"postgres"`) or `None`
### Behavior

1. Connects to the target server via SSH using credentials from config
2. Iterates through all services in `config.SERVICES`
3. Executes each service's `check_command`
4. Verifies that the output contains the `running_indicator` string
5. Returns the first detected failure, or `None` if all services are healthy
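The steps above reduce to a short loop. The sketch below is illustrative only: `ssh` stands in for the project's real SSH client, and the function name `first_failure` is an assumption, not the actual implementation.

```python
def first_failure(ssh, services):
    # Check each configured service; stop at the first one that is down.
    for name, cfg in services.items():
        code, out, err = ssh.execute_command(cfg["check_command"])
        if cfg["running_indicator"] not in out:
            # Hypothetical error message format for illustration
            return {"current_error": f"{name} check failed: {out or err}".strip(),
                    "affected_service": name}
    return {"current_error": None, "affected_service": None}
```

Because the loop returns on the first failure, only one incident is handled per workflow run; remaining services are re-checked on the next cycle.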
### Example Usage

```python
from src.agent.nodes import monitor_node

initial_state = {
    "current_step": "",
    "current_error": None
}

result = monitor_node(initial_state)

if result["current_error"]:
    print(f"Service {result['affected_service']} is down!")
    print(f"Error: {result['current_error']}")
else:
    print("All services healthy")
```
### Service Configuration

Services are defined in `config.SERVICES`:

```python
SERVICES = {
    "nginx": {
        "type": "web_server",
        "check_command": "service nginx status",
        "running_indicator": "is running"
    },
    "postgres": {
        "type": "database",
        "check_command": "pg_isready",
        "running_indicator": "accepting connections"
    }
}
```
The monitor node emits `status_update` events via the event bus for real-time UI updates.
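The event bus API itself is not shown in this section, so the following is a purely illustrative publish/subscribe sketch of the pattern a UI could use; the names `subscribe` and `emit` are assumptions and the real bus in the project's `event_bus` module may differ.

```python
# Illustrative event-bus sketch only; not the project's actual API.
_handlers = {}

def subscribe(event: str, handler):
    # Register a callback for a named event
    _handlers.setdefault(event, []).append(handler)

def emit(event: str, payload: dict):
    # Deliver the payload to every registered callback
    for handler in _handlers.get(event, []):
        handler(payload)

received = []
subscribe("status_update", received.append)
emit("status_update", {"service": "nginx", "status": "down"})
```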
## diagnose_node
Analyzes the detected error using LLM reasoning, memory lookups, and RAG retrieval.
### Import

```python
from src.agent.nodes import diagnose_node
```

### Function Signature

```python
def diagnose_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: must contain `current_error` and `affected_service`

**Returns**

- `diagnosis_log`: appends the new diagnosis to the existing log
- a boolean flag: `True` if memory or RAG context was used in the diagnosis
### Behavior

1. **Memory Lookup**: queries episodic memory for similar past errors
2. **RAG Query**: searches the vector knowledge base for relevant documentation
3. **LLM Analysis**: generates a diagnosis using GPT-4 with the gathered context
4. **Logs Results**: appends the diagnosis to `diagnosis_log`
### Example Usage

```python
from src.agent.nodes import diagnose_node

state_with_error = {
    "current_error": "nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)",
    "affected_service": "nginx",
    "diagnosis_log": [],
    "retry_count": 0
}

result = diagnose_node(state_with_error)
print(result["diagnosis_log"][-1])
# Output: "Port 80 is occupied by another process. Likely cause: stale nginx process or conflicting service."
```
### Context Sources

The diagnosis integrates three sources:

| Source | Contribution |
| --- | --- |
| Memory | Past solutions and failed commands |
| RAG | Official documentation embeddings |
| LLM | GPT-4 reasoning and synthesis |
### LLM Prompt Structure

```python
messages = [
    SystemMessage(content=(
        "You are Sentinel AI, an autonomous DevOps agent.\n"
        "Analyze the error and provide a BRIEF diagnosis (3 lines maximum).\n"
        f"Affected service: {service}\n"
        f"History: {prior_logs[-3:]}\n"
        f"{memory_context}\n"
        f"Documentation: {rag_context[:1000]}\n"
    )),
    HumanMessage(content=f"Error: {error}")
]
```
The node instructs the LLM NOT to suggest commands that previously failed (from memory context).
## plan_node
Generates 1-3 remediation commands based on the diagnosis.
### Import

```python
from src.agent.nodes import plan_node
```

### Function Signature

```python
def plan_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: must contain `diagnosis_log` with at least one diagnosis

**Returns**

- `candidate_plan`: newline-separated list of shell commands
- `approval_status`: set to `"PENDING"` for security validation
### Behavior

1. Extracts the latest diagnosis from `diagnosis_log`
2. Queries memory for previously failed commands
3. Prompts the LLM to generate 1-3 remediation commands
4. Auto-adds a `sudo` prefix where needed via `ensure_sudo()`
5. Returns the commands as a newline-separated string
### Example Usage

```python
from src.agent.nodes import plan_node

state_with_diagnosis = {
    "current_error": "nginx port conflict",
    "affected_service": "nginx",
    "diagnosis_log": ["Port 80 occupied by stale process"],
    "retry_count": 0
}

result = plan_node(state_with_diagnosis)
print(result["candidate_plan"])
# Output:
# sudo ss -tulpn | grep :80
# sudo pkill -9 nginx
# sudo service nginx start
```
### Command Generation Rules

The LLM is instructed to follow strict rules:

- Generate 1-3 commands, one per line
- NO command chaining with `&&`, `||`, or `;`
- ALL admin commands must use `sudo`
- Return ONLY commands, with no explanations or backticks
- NEVER repeat commands that previously failed
- Base the solution on the provided diagnosis
- Use `-y` or `--yes` flags for non-interactive execution
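Since LLM output can drift, the structural rules above can also be enforced mechanically before the plan moves on to approval. The checker below is a hedged sketch (the helper name `plan_violations` is hypothetical, not part of the project's API); it flags chaining and backticks but deliberately allows single pipes, which the example plans use.

```python
import re

def plan_violations(plan: str) -> list:
    """Return a list of rule violations for a newline-separated plan."""
    violations = []
    commands = [c.strip() for c in plan.splitlines() if c.strip()]
    if not 1 <= len(commands) <= 3:
        violations.append("plan must contain 1-3 commands")
    for cmd in commands:
        # Forbid chaining operators (single pipes remain allowed)
        if re.search(r"&&|\|\||;", cmd):
            violations.append(f"chaining forbidden: {cmd}")
        if "`" in cmd:
            violations.append(f"backticks forbidden: {cmd}")
    return violations
```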
### Auto-Sudo Logic

```python
def ensure_sudo(command: str) -> str:
    if command.startswith("sudo "):
        return command
    prefixes = ["service ", "kill ", "pkill ", "rm ", "chmod ",
                "chown ", "apt ", "dpkg ", "nginx", "systemctl ",
                "fuser ", "docker "]
    for prefix in prefixes:
        if command.strip().startswith(prefix):
            return f"sudo {command.strip()}"
    return command
```

If the LLM fails to generate valid commands, the node falls back to `sudo service {service} restart`.
## approve_node
Validates the security of generated commands before execution.
### Import

```python
from src.agent.nodes import approve_node
```

### Function Signature

```python
def approve_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: must contain `candidate_plan` with commands to validate

**Returns**

- `approval_status`: one of `"APPROVED"`, `"WAITING_APPROVAL"`, `"REJECTED"`
- `security_flags`: list of security warnings or blocks
- `escalation_reason`: reason for rejection if status is `"REJECTED"`
### Behavior

1. Splits `candidate_plan` into individual commands
2. Validates each command using `validate_command()` from `core.security`
3. Checks whether commands are critical using `is_critical()`
4. Returns an approval status based on the findings
### Example Usage

```python
from src.agent.nodes import approve_node

state_with_plan = {
    "candidate_plan": "sudo service nginx restart\nsudo apt-get update",
    "affected_service": "nginx"
}

result = approve_node(state_with_plan)

if result["approval_status"] == "APPROVED":
    print("Commands approved for automatic execution")
elif result["approval_status"] == "WAITING_APPROVAL":
    print("Critical commands detected, manual approval required")
    print(result["security_flags"])
else:  # REJECTED
    print(f"Commands blocked: {result['escalation_reason']}")
```
### Security Validation

Commands are checked against a blacklist and pattern rules:

```python
from src.core.security import validate_command, is_critical

is_valid, reason = validate_command("sudo rm -rf /var/log/*")
if not is_valid:
    print(f"Blocked: {reason}")
    # Blocked: Destructive rm -rf pattern detected

if is_critical("sudo systemctl stop postgres"):
    print("Requires manual approval")
```

Commands containing destructive patterns like `rm -rf /`, `dd if=`, or `mkfs` are automatically **REJECTED**.
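To make the pattern check concrete, here is a hedged sketch of how such destructive patterns might be detected with regular expressions. The pattern list is an illustrative subset and `check_destructive` is a hypothetical helper; the real blacklist in `src.core.security` may be broader and structured differently.

```python
import re

# Illustrative subset of destructive patterns; the real blacklist may differ.
DESTRUCTIVE_PATTERNS = [
    r"rm\s+-rf\s+/",  # recursive delete starting at the filesystem root
    r"\bdd\s+if=",    # raw disk reads/writes with dd
    r"\bmkfs",        # filesystem creation (wipes the target device)
]

def check_destructive(command: str):
    """Return (is_valid, reason) for a single shell command."""
    for pattern in DESTRUCTIVE_PATTERNS:
        if re.search(pattern, command):
            return False, f"Destructive pattern detected: {pattern}"
    return True, ""
```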
## execute_node
Executes approved commands via SSH and records results in memory.
### Import

```python
from src.agent.nodes import execute_node
```

### Function Signature

```python
def execute_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: must contain `candidate_plan` and `approval_status == "APPROVED"`

**Returns**

- `diagnosis_log`: appends execution results to the log
### Behavior

1. Verifies `approval_status == "APPROVED"`
2. Connects to the target server via SSH
3. Executes each command sequentially
4. Captures exit codes, stdout, and stderr
5. Saves the episode to memory with its success/failure status
6. Closes the SSH connection
### Example Usage

```python
from src.agent.nodes import execute_node

approved_state = {
    "candidate_plan": "sudo service nginx stop\nsudo service nginx start",
    "approval_status": "APPROVED",
    "current_error": "nginx port conflict",
    "diagnosis_log": ["Port occupied, restart required"]
}

result = execute_node(approved_state)
print(result["diagnosis_log"][-1])
# Output: "[sudo service nginx stop] code:0 output:OK | [sudo service nginx start] code:0 output:Starting nginx"
```
### Command Execution

Each command is executed with proper sudo handling:

```python
for command in commands:
    needs_sudo = command.startswith("sudo")
    clean = command.replace("sudo ", "", 1) if needs_sudo else command
    code, out, err = ssh.execute_command(clean, use_sudo=needs_sudo)
    if code != 0:
        overall_success = False
        log("error", f"Command failed with exit code {code}")
```
### Memory Persistence

After execution, the episode is saved:

```python
memory.save_episode(
    error=state["current_error"],
    diagnosis=state["diagnosis_log"][-1],
    command=candidate_plan,
    result=" | ".join(all_results),
    success=overall_success
)
```
Even failed executions are saved to memory to prevent repeating the same mistakes.
## verify_node
Confirms whether the service has recovered after command execution.
### Import

```python
from src.agent.nodes import verify_node
```

### Function Signature

```python
def verify_node(state: AgentState) -> Dict[str, Any]
```

**Parameters**

- `state`: must contain `affected_service` matching a key in `config.SERVICES`

**Returns**

- `current_error`: `None` if the service recovered, otherwise the original error
- `retry_count`: incremented by 1 if the service is still down
### Behavior

1. Looks up the service configuration from `config.SERVICES`
2. Executes the service's `check_command` via SSH
3. Verifies that the output contains `running_indicator`
4. Returns `current_error: None` if the service recovered
5. Increments `retry_count` if it is still down
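Put together, the verification step reduces to a single indicator check. The sketch below is illustrative: `ssh` stands in for the real SSH client and the function name `verify` is an assumption, not the project's implementation.

```python
def verify(ssh, state, services):
    # Re-run the same health check the monitor uses
    cfg = services[state["affected_service"]]
    code, out, err = ssh.execute_command(cfg["check_command"])
    if cfg["running_indicator"] in out:
        # Recovered: clear the error, leave retry_count alone
        return {"current_error": None}
    # Still down: keep the original error and count the failed attempt
    return {"current_error": state["current_error"],
            "retry_count": state.get("retry_count", 0) + 1}
```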
### Example Usage

```python
from src.agent.nodes import verify_node

state_after_execution = {
    "affected_service": "nginx",
    "current_error": "nginx port conflict",
    "retry_count": 0
}

result = verify_node(state_after_execution)

if result["current_error"] is None:
    print("Service recovered successfully!")
else:
    print(f"Still down, retry {result['retry_count']}/{config.MAX_RETRIES}")
```
### Retry Logic

The workflow uses `retry_count` to limit recovery attempts:

```python
if retry_count >= config.MAX_RETRIES:
    # Escalate to human
    return "escalate"
else:
    # Retry diagnosis
    return "retry"
```

Set `config.MAX_RETRIES` to control how many recovery cycles run before escalation.
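In full, the post-verification routing combines the recovery check with the retry cap. A sketch of how that decision might look as a single routing helper (the function name and the `"report"`/`"escalate"`/`"retry"` labels are assumptions about the graph's edge names):

```python
MAX_RETRIES = 3  # stand-in for config.MAX_RETRIES

def route_after_verify(state: dict) -> str:
    if state.get("current_error") is None:
        return "report"      # recovered: log the success
    if state.get("retry_count", 0) >= MAX_RETRIES:
        return "escalate"    # give up: request human intervention
    return "retry"           # run another diagnose/plan/execute cycle
```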
## Helper Nodes

### report_node
Logs successful recovery details.
```python
def report_node(state: AgentState):
    service = state.get("affected_service", "unknown")
    error = state.get("current_error")
    command = state.get("candidate_plan", "N/A")
    attempts = state.get("retry_count", 0) + 1
    if not error:
        log("report", f"SUCCESS: Service '{service}' recovered in {attempts} attempt(s).")
        log("report", f"Applied solution: {command}")
    return {"current_step": "report"}
```
### escalation_node
Triggers human intervention for critical failures.
```python
def escalation_node(state: AgentState):
    service = state.get("affected_service", "unknown")
    reason = state.get("escalation_reason") or "Retry limit reached."
    log("escalation", f"CRITICAL FAILURE in '{service}': {reason}")
    log("escalation", "Human intervention required.")
    return {
        "current_step": "escalation",
        "escalation_reason": reason
    }
```
## Node Composition
You can create custom nodes that compose existing ones:
```python
import time

def intelligent_retry_node(state: AgentState) -> Dict[str, Any]:
    """Combines diagnosis and planning with backoff logic."""
    retry_count = state.get("retry_count", 0)

    # Progressive backoff: 2s, 4s, 8s...
    if retry_count > 0:
        time.sleep(2 ** retry_count)

    # Re-diagnose
    state = {**state, **diagnose_node(state)}

    # Generate a new plan
    state = {**state, **plan_node(state)}

    return state
```
## Testing Nodes
Nodes are pure functions and easy to test:
```python
import pytest
from src.agent.nodes import monitor_node

def test_monitor_detects_failure(monkeypatch):
    # Mock SSH client
    class MockSSH:
        def execute_command(self, cmd):
            return (1, "nginx is not running", "")

        def close(self):
            pass

    monkeypatch.setattr("src.agent.nodes.monitor.SSHClient", MockSSH)

    result = monitor_node({})
    assert result["current_error"] is not None
    assert result["affected_service"] == "nginx"
```
## Best Practices

- **Stateless Logic**: nodes should not maintain internal state between calls
- **Idempotency**: design nodes to be safely re-executable
- **Error Handling**: wrap SSH/API calls in try-except blocks
- **Logging**: use `log()` from the event bus for observability
- **Type Safety**: always access state with `.get()` and sensible defaults
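These practices combine into a reusable node template. The sketch below is illustrative: `do_remote_check` is a hypothetical stand-in for a real SSH or API call, and the returned keys beyond `current_error` and `current_step` are assumptions.

```python
def do_remote_check(service: str) -> str:
    # Hypothetical stand-in for an SSH/API call; may raise on failure
    if service == "unknown":
        raise RuntimeError("no service configured")
    return "ok"

def robust_node(state: dict) -> dict:
    # Type Safety: read state with .get() and a default
    service = state.get("affected_service", "unknown")
    try:
        # Error Handling: wrap the external call in try-except
        result = do_remote_check(service)
    except Exception as exc:
        # Fail soft: surface the error through state instead of crashing the graph
        return {"current_error": str(exc), "current_step": "robust"}
    # Stateless and idempotent: return only the updates, keep nothing internally
    return {"current_error": None, "current_step": "robust", "check": result}
```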
## See Also

- **Agent Graph**: orchestrate nodes into workflows
- **SSH Client**: execute remote commands in nodes
- **Knowledge Base**: query documentation in `diagnose_node`
- **Memory System**: store and retrieve past episodes