Overview
Sentinel AI implements multi-layered security controls to prevent destructive actions while maintaining autonomous operation. The security system includes:
Forbidden command patterns (auto-reject)
Critical command detection (manual approval required)
SSH security (password-protected sudo, PTY allocation)
Episodic memory (prevents repeating failed commands)
Security Architecture
Security validation occurs in the Approval Node (src/agent/nodes/approve.py):
def approve_node(state: AgentState) -> Dict[str, Any]:
    plan = state.get("candidate_plan", "")
    commands = [cmd.strip() for cmd in plan.split("\n") if cmd.strip()]
    security_flags = []
    has_critical = False
    for cmd in commands:
        # 1. Check forbidden patterns (REJECT)
        is_valid, reason = validate_command(cmd)
        if not is_valid:
            return {
                "approval_status": "REJECTED",
                "security_flags": [f"BLOQUEADO: {reason}"],  # "BLOQUEADO" = blocked
                "escalation_reason": reason
            }
        # 2. Check critical patterns (REQUIRE APPROVAL)
        if is_critical(cmd):
            has_critical = True
            security_flags.append(f"CRITICO: {cmd}")  # "CRITICO" = critical
    if has_critical:
        return {
            "approval_status": "WAITING_APPROVAL",
            "security_flags": security_flags
        }
    else:
        return {
            "approval_status": "APPROVED",
            "security_flags": security_flags
        }
Command Validation
Forbidden Patterns
Commands matching these regex patterns are automatically rejected and trigger escalation.
Implementation (src/core/security.py:5-21):
FORBIDDEN_PATTERNS = [
    r"rm\s+-rf\s+/\s*$",            # rm -rf /
    r"rm\s+-rf\s+/\*",              # rm -rf /*
    r"mkfs\.",                      # Format filesystem
    r"dd\s+if=",                    # Disk write operations
    r":\(\)\{.*\|.*&\}\;",          # Fork bomb
    r"shutdown\s",                  # System shutdown
    r"reboot\s*$",                  # System reboot
    r"halt\s*$",                    # System halt
    r"init\s+0",                    # Init shutdown
    r"fdisk\s",                     # Partition editor
    r"parted\s",                    # Partition editor
    r"wipefs\s",                    # Wipe filesystem signatures
    r">\s*/dev/sd",                 # Write to raw disk device
    r"chmod\s+-R\s+777\s+/\s*$",    # Chmod root directory
    r"chown\s+-R.*:\s*/\s*$",       # Chown root directory
]
Validation function (src/core/security.py:50-54):
def validate_command(command: str) -> Tuple[bool, str]:
    for pattern in FORBIDDEN_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            # Message: "Command blocked by security policy: forbidden pattern"
            return False, f"Comando bloqueado por politica de seguridad: patron prohibido"
    return True, ""
Critical Protection: Forbidden commands immediately return approval_status: "REJECTED", bypassing the execute node and triggering escalation.
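As a quick illustration of how the forbidden patterns behave, the sketch below reuses a small subset of the list above with the same matching logic as validate_command. The subset name, the sample commands, and the English reason string are assumptions made for this example and are not taken from the project.

```python
import re
from typing import Tuple

# Subset of FORBIDDEN_PATTERNS, copied from the list above (illustrative only).
FORBIDDEN_SUBSET = [
    r"rm\s+-rf\s+/\s*$",   # rm -rf /
    r"mkfs\.",             # Format filesystem
    r"dd\s+if=",           # Disk write operations
    r"reboot\s*$",         # System reboot
]


def validate_command(command: str) -> Tuple[bool, str]:
    """Return (False, reason) when the command matches a forbidden pattern."""
    for pattern in FORBIDDEN_SUBSET:
        if re.search(pattern, command, re.IGNORECASE):
            return False, "blocked by security policy: forbidden pattern"
    return True, ""


# Hypothetical sample commands.
samples = [
    "rm -rf /",
    "rm -rf /tmp/cache",
    "mkfs.ext4 /dev/sdb1",
    "RM -RF /",
    "systemctl restart nginx",
]

results = {cmd: validate_command(cmd)[0] for cmd in samples}
for cmd, ok in results.items():
    print(f"{cmd!r}: {'allowed' if ok else 'rejected'}")
```

Because the first pattern is anchored with `$`, `rm -rf /` is rejected while `rm -rf /tmp/cache` passes this layer, and `re.IGNORECASE` makes the upper-case variant match as well.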
Critical Patterns
Commands matching these patterns require manual human approval before execution.
Implementation (src/core/security.py:23-47):
CRITICAL_PATTERNS = [
    r"\brm\b",                          # File deletion
    r"\bdelete\b",                      # Database/file deletion
    r"\bdrop\b",                        # Database drop
    r"\btruncate\b",                    # Database truncate
    r"\bkill\s+-9\b",                   # Force kill process
    r"\bsystemctl\s+stop\b",            # Stop systemd service
    r"\bsystemctl\s+restart\b",         # Restart systemd service
    r"\bservice\s+\S+\s+stop\b",        # Stop SysV service
    r"\bservice\s+\S+\s+restart\b",     # Restart SysV service
    r"\biptables\b",                    # Firewall rules
    r"\bufw\b",                         # Firewall management
    r"\bpasswd\b",                      # Change user password
    r"\busermod\b",                     # Modify user
    r"\buserdel\b",                     # Delete user
    r"\bchmod\b",                       # Change file permissions
    r"\bchown\b",                       # Change file ownership
    r"\bmv\s+/",                        # Move system files
    r"\bcp\s+/dev/",                    # Copy device files
    r"\bapt\b",                         # Package manager
    r"\bapt-get\b",                     # Package manager
    r"\byum\b",                         # Package manager
    r"\bdnf\b",                         # Package manager
    r"\binstall\b",                     # Package installation
]
Detection function (src/core/security.py:57-61):
def is_critical(command: str) -> bool:
    for pattern in CRITICAL_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            return True
    return False
Manual Review: Critical commands set approval_status: "WAITING_APPROVAL", pausing the workflow until a human operator approves or rejects.
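The two pattern lists combine into a three-way decision per command. The following is a minimal sketch of that decision, assuming a hypothetical helper named `classify` and small subsets of both lists; the real approval node aggregates the result over every command in the plan rather than classifying one command at a time.

```python
import re

# Illustrative subsets of the two lists above.
FORBIDDEN_SUBSET = [r"rm\s+-rf\s+/\s*$", r"shutdown\s"]
CRITICAL_SUBSET = [r"\brm\b", r"\bsystemctl\s+restart\b", r"\bapt\b", r"\binstall\b"]


def classify(command: str) -> str:
    """Hypothetical helper: map one command to an approval status."""
    # Forbidden patterns are checked first, so they always win.
    if any(re.search(p, command, re.IGNORECASE) for p in FORBIDDEN_SUBSET):
        return "REJECTED"
    if any(re.search(p, command, re.IGNORECASE) for p in CRITICAL_SUBSET):
        return "WAITING_APPROVAL"
    return "APPROVED"


# Hypothetical sample commands.
samples = [
    "shutdown -h now",
    "rm /var/log/app/old.log",
    "systemctl status nginx",
    "cat /etc/apt/sources.list",
]
statuses = {cmd: classify(cmd) for cmd in samples}
for cmd, status in statuses.items():
    print(f"{status:17} {cmd}")
```

Note that `\b` matches at any word boundary, not only at the start of the command, so the read-only `cat /etc/apt/sources.list` is also routed to manual approval because `apt` appears as a path component.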
Approval Workflow
The workflow handles three approval states:
APPROVED (Auto-execute)
Commands pass all security checks and execute automatically.
if has_critical:
    ...  # Manual approval required
else:
    return {
        "approval_status": "APPROVED",
        "security_flags": []
    }
Conditional edge (src/agent/graph.py:14-22):
def should_approve_continue(state: AgentState):
    status = state.get("approval_status")
    if status == "REJECTED":
        return "escalate"
    elif status == "WAITING_APPROVAL":
        return "end"  # Pause workflow
    return "execute"  # Auto-execute
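To make the routing concrete, the sketch below runs the same conditional logic against plain dictionaries. Using `Dict[str, Any]` in place of `AgentState` is an assumption made so the example runs on its own.

```python
from typing import Any, Dict


def should_approve_continue(state: Dict[str, Any]) -> str:
    """Same branching as the conditional edge above, with a plain dict as state."""
    status = state.get("approval_status")
    if status == "REJECTED":
        return "escalate"
    elif status == "WAITING_APPROVAL":
        return "end"      # Pause workflow
    return "execute"      # Auto-execute


routes = {
    status: should_approve_continue({"approval_status": status})
    for status in ("APPROVED", "WAITING_APPROVAL", "REJECTED")
}
# A state with no approval_status at all.
routes["<missing>"] = should_approve_continue({})

for status, route in routes.items():
    print(f"{status:17} -> {route}")
```

As written, any status other than the two that are checked explicitly, including a missing one, falls through to "execute".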
WAITING_APPROVAL (Manual review)
Commands contain critical patterns and require human approval.
Workflow returns "end" from the approval node
Application exposes /api/approve/{incident_id} endpoint
Human reviews command in UI and calls /approve or /reject
Workflow resumes with resume_app (starts at execute node)
Resume workflow (src/agent/graph.py:82-104):
resume_workflow = StateGraph(AgentState)
resume_workflow.set_entry_point("execute")  # Skip monitor/diagnose/plan/approval
resume_workflow.add_node("execute", execute_node)
resume_workflow.add_node("verify", verify_node)
resume_workflow.add_node("report", report_node)
resume_workflow.add_node("escalation", escalation_node)
resume_app = resume_workflow.compile()
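The operator's decision has to be written back into the paused state before the workflow continues. One possible shape for that step is sketched below; `apply_operator_decision`, `next_step`, and the English reason string are hypothetical names invented for this example, and the project's actual endpoint handler may work differently.

```python
from typing import Any, Dict


def apply_operator_decision(state: Dict[str, Any], approved: bool) -> Dict[str, Any]:
    """Hypothetical helper: record a human decision on a paused incident."""
    if state.get("approval_status") != "WAITING_APPROVAL":
        raise ValueError("incident is not waiting for approval")
    updated = dict(state)  # Shallow copy so the caller's state is left untouched
    if approved:
        updated["approval_status"] = "APPROVED"
    else:
        updated["approval_status"] = "REJECTED"
        updated["escalation_reason"] = "Rejected by human operator"
    return updated


def next_step(state: Dict[str, Any]) -> str:
    """Hypothetical helper: approved plans resume at execute, rejected ones escalate."""
    return "execute" if state["approval_status"] == "APPROVED" else "escalation"


paused = {
    "candidate_plan": "sudo systemctl restart nginx",
    "approval_status": "WAITING_APPROVAL",
}
approved_state = apply_operator_decision(paused, approved=True)
rejected_state = apply_operator_decision(paused, approved=False)
print(next_step(approved_state), next_step(rejected_state))
```

Refusing to act on a state that is not in WAITING_APPROVAL guards against a stale or repeated approval call executing a plan twice.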
REJECTED (Escalation)
Commands match forbidden patterns or are rejected by a human operator.
if not is_valid:
    return {
        "approval_status": "REJECTED",
        "escalation_reason": reason
    }
Escalation node (src/agent/graph.py:49-55):
def escalation_node(state: AgentState):
    service = state.get("affected_service", "desconocido")  # "desconocido" = unknown
    # Default reason: "Retry limit reached."
    reason = state.get("escalation_reason") or "Limite de reintentos alcanzado."
    # Logs "CRITICAL FAILURE in '<service>': <reason>" and "Human intervention required."
    log("escalation", f"FALLA CRITICA en '{service}': {reason}")
    log("escalation", "Se requiere intervencion humana.")
    return {"current_step": "escalation", "escalation_reason": reason}
SSH Security
Sentinel AI uses Paramiko for secure SSH connections with privilege escalation.
Connection Security
Implementation (src/tools/ssh.py:16-31):
def connect(self):
    self.client = paramiko.SSHClient()
    self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    self.client.connect(
        hostname=self.hostname,
        port=self.port,
        username=self.username,
        password=self.password,
        key_filename=self.key_filename,
        timeout=10
    )
Security features:
Authentication: Password or SSH key
Host key policy: AutoAddPolicy() (automatically accepts host keys from previously unseen hosts)
Connection timeout: 10 seconds
Non-root user: Connects as sentinel user (from config.SSH_USER)
The system uses a dedicated sentinel user with limited sudo permissions. The user is not root, so administrative commands require explicit sudo.
Sudo Command Execution
Implementation (src/tools/ssh.py:33-74):
def execute_command(self, command: str, use_sudo: bool = False) -> Tuple[int, str, str]:
    if not self.client:
        self.connect()
    if use_sudo:
        command = f"sudo -S {command}"
        stdin, stdout, stderr = self.client.exec_command(command, get_pty=True)
        time.sleep(0.3)
        stdin.write(f"{self.password}\n")  # Send password to sudo prompt
        stdin.flush()
    else:
        stdin, stdout, stderr = self.client.exec_command(command)
    # Wait for command to finish
    while not stdout.channel.exit_status_ready():
        check_stop()  # Allow graceful interruption
        time.sleep(0.5)
    exit_code = stdout.channel.recv_exit_status()
    out = stdout.read().decode().strip()
    err = stderr.read().decode().strip()
    # Strip sudo password prompt from output
    if use_sudo and out.startswith("[sudo]"):
        lines = out.split("\n")
        out = "\n".join(lines[1:]).strip()
    return exit_code, out, err
Sudo security:
PTY allocation: get_pty=True enables interactive sudo
Password injection: Password sent to stdin after 300ms delay
Output sanitization: Removes [sudo] password for sentinel: prompt
Exit code checking: Detects command failures
Password in memory: The SSH password is stored in plaintext in config.SSH_PASS. Use SSH keys or secret management systems in production.
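The output sanitization step can be examined in isolation. The sketch below lifts the prompt-stripping logic from execute_command into a standalone function; the function name `strip_sudo_prompt` and the sample outputs are assumptions made for illustration.

```python
def strip_sudo_prompt(out: str) -> str:
    """Drop the first line when the output begins with a sudo password prompt."""
    if out.startswith("[sudo]"):
        lines = out.split("\n")
        return "\n".join(lines[1:]).strip()
    return out


# Hypothetical outputs; a PTY typically produces \r\n line endings.
with_prompt = "[sudo] password for sentinel: \r\nnginx restarted"
without_prompt = "active"
prompt_only = "[sudo] password for sentinel: "

print(repr(strip_sudo_prompt(with_prompt)))
print(repr(strip_sudo_prompt(without_prompt)))
print(repr(strip_sudo_prompt(prompt_only)))
```

The logic removes the whole first line, so any command output that ends up on the same line as the prompt would be discarded along with it.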
Automatic Sudo Injection
The plan node automatically adds sudo to administrative commands.
Implementation (src/agent/nodes/plan.py:13-21):
def ensure_sudo(command: str) -> str:
    if command.startswith("sudo "):
        return command
    prefixes = ["service ", "kill ", "pkill ", "rm ", "chmod ", "chown ",
                "apt ", "dpkg ", "nginx", "systemctl ", "fuser ", "docker "]
    for prefix in prefixes:
        if command.strip().startswith(prefix):
            return f"sudo {command.strip()}"
    return command
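A few sample inputs show which commands receive the sudo prefix. The function below is the same logic as above, repeated so the example is self-contained; the sample commands are made up for illustration.

```python
def ensure_sudo(command: str) -> str:
    """Prefix known administrative commands with sudo (same logic as above)."""
    if command.startswith("sudo "):
        return command
    prefixes = ["service ", "kill ", "pkill ", "rm ", "chmod ", "chown ",
                "apt ", "dpkg ", "nginx", "systemctl ", "fuser ", "docker "]
    for prefix in prefixes:
        if command.strip().startswith(prefix):
            return f"sudo {command.strip()}"
    return command


# Hypothetical sample commands.
samples = [
    "systemctl restart nginx",
    "sudo rm /tmp/x",
    "nginx -t",
    "ls -la /var/log",
    "cd /tmp && rm -f stale.pid",
]
rewritten = {cmd: ensure_sudo(cmd) for cmd in samples}
for before, after in rewritten.items():
    print(f"{before!r} -> {after!r}")
```

Only the start of the string is inspected, so a chained command such as `cd /tmp && rm -f stale.pid` is returned unchanged even though it contains `rm`.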
Memory-Based Prevention
The system prevents repeating failed commands using episodic memory.
Failed Command Retrieval
Implementation (src/core/memory.py:57-71):
def get_failed_commands(self, error: str) -> List[str]:
    error_lower = error.lower()
    error_keywords = set(error_lower.split())
    failed = set()
    for ep in self.episodes:
        if not ep["success"]:
            ep_keywords = set(ep.get("error", "").lower().split())
            if len(error_keywords & ep_keywords) >= 1:  # At least 1 keyword overlap
                cmd = ep["command"].strip()
                failed.add(cmd)
    return list(failed)
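The keyword-overlap lookup can be tried out with a small in-memory stand-in. In the sketch below, the `EpisodicMemory` class name, the `record` method, and the sample episodes are assumptions made for this example; only `get_failed_commands` follows the implementation shown above, and the real class in src/core/memory.py may store and load episodes differently.

```python
from typing import Any, Dict, List


class EpisodicMemory:
    """Minimal in-memory stand-in for illustration."""

    def __init__(self) -> None:
        self.episodes: List[Dict[str, Any]] = []

    def record(self, error: str, command: str, success: bool) -> None:
        """Hypothetical helper to add an episode."""
        self.episodes.append({"error": error, "command": command, "success": success})

    def get_failed_commands(self, error: str) -> List[str]:
        """Return failed commands whose recorded error shares a token with `error`."""
        error_keywords = set(error.lower().split())
        failed = set()
        for ep in self.episodes:
            if not ep["success"]:
                ep_keywords = set(ep.get("error", "").lower().split())
                if len(error_keywords & ep_keywords) >= 1:  # At least 1 token overlap
                    failed.add(ep["command"].strip())
        return list(failed)


memory = EpisodicMemory()
memory.record("nginx: bind() to 0.0.0.0:80 failed", "systemctl restart nginx", False)
memory.record("nginx: bind() to 0.0.0.0:80 failed", "fuser -k 80/tcp", True)
memory.record("postgres connection refused", "service postgresql restart", False)

related = sorted(memory.get_failed_commands("nginx failed to start"))
unrelated = memory.get_failed_commands("kernel panic")
loose = sorted(memory.get_failed_commands("disk is full, failed to write"))
print(related, unrelated, loose)
```

Tokens are split on whitespace only, so `nginx:` and `nginx` do not match each other, while common words such as "failed" or "to" are enough to link otherwise unrelated errors, as the third query shows.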
LLM Prompt Injection
Failed commands are injected into the plan node’s prompt.
Implementation (src/agent/nodes/plan.py:31-36):
failed_commands = memory.get_failed_commands(error)
failed_str = ""
if failed_commands:
    # Header: "COMMANDS THAT ALREADY FAILED (repeating them is FORBIDDEN)"
    failed_str = "\nCOMANDOS QUE YA FALLARON (PROHIBIDO repetirlos):\n"
    for cmd in failed_commands:
        failed_str += f"- {cmd}\n"
# ... included in SystemMessage
SystemMessage(content=f"""...
{failed_str}
...""")
The LLM is explicitly instructed to never repeat commands from the failed list, preventing infinite retry loops on known-bad solutions.
Security Layers Summary
Layer 1: Forbidden Patterns. Regex-based rejection of destructive commands (rm -rf /, mkfs, fork bombs). Action: Auto-reject → Escalate
Layer 2: Critical Patterns. Manual approval required for risky operations (restart, package install, chmod). Action: Pause → Wait for human approval
Layer 3: Episodic Memory. Prevent repeating failures by tracking command history and outcomes. Action: LLM prompt injection → Alternative solution
Layer 4: SSH Security. Non-root user with sudo, password-protected, PTY-based. Action: Least privilege + command auditing
Configuration
Security settings in src/core/config.py:
SSH_HOST = os.getenv("SSH_HOST", "localhost")
SSH_PORT = int(os.getenv("SSH_PORT", 2222))
SSH_USER = os.getenv("SSH_USER", "sentinel")
SSH_PASS = os.getenv("SSH_PASS")
MAX_RETRIES = 5  # Maximum retry attempts before escalation
Best Practices
Use SSH keys instead of passwords
For production deployments, configure key_filename instead of password in the SSH client:
SSHClient(
    hostname=config.SSH_HOST,
    username=config.SSH_USER,
    key_filename="/path/to/private_key"
)
Restrict sudo permissions
Limit the sentinel user’s sudo access to specific commands:
# /etc/sudoers.d/sentinel
sentinel ALL=(ALL) NOPASSWD: /usr/bin/service, /usr/bin/systemctl, /usr/bin/nginx
Enable SSH session logging and integrate with SIEM systems to track all executed commands.
Customize forbidden patterns
Add organization-specific patterns to FORBIDDEN_PATTERNS based on your infrastructure:
r"docker\s+rm\s+-f\s+prod-",  # Prevent deleting production containers
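Before deploying a custom pattern, it helps to check it against the command variants you expect to see. The sketch below does this for the Docker pattern above; the `is_forbidden` helper, the `CUSTOM_FORBIDDEN` list name, and the container names are hypothetical.

```python
import re
from typing import List

# Custom pattern taken from the example above.
CUSTOM_FORBIDDEN = [
    r"docker\s+rm\s+-f\s+prod-",  # Prevent deleting production containers
]


def is_forbidden(command: str, patterns: List[str]) -> bool:
    """Hypothetical helper: True when any pattern matches the command."""
    return any(re.search(p, command, re.IGNORECASE) for p in patterns)


# Hypothetical container names and command variants.
variants = [
    "docker rm -f prod-api",
    "docker rm -f staging-api",
    "docker rm --force prod-api",
    "docker rm prod-api",
]
matches = {cmd: is_forbidden(cmd, CUSTOM_FORBIDDEN) for cmd in variants}
for cmd, blocked in matches.items():
    print(f"{'blocked' if blocked else 'allowed':8} {cmd}")
```

The pattern only matches the exact `-f` spelling, so the `--force` and flagless variants are not blocked by it; covering them would need additional or broader patterns.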