Skip to main content

Overview

Sentinel AI implements multi-layered security controls to prevent destructive actions while maintaining autonomous operation. The security system includes:
  • Forbidden command patterns (auto-reject)
  • Critical command detection (manual approval required)
  • SSH security (password-protected sudo, PTY allocation)
  • Episodic memory (prevents repeating failed commands)

Security Architecture

Security validation occurs in the Approval Node (src/agent/nodes/approve.py):
def approve_node(state: AgentState) -> Dict[str, Any]:
    plan = state.get("candidate_plan", "")
    commands = [cmd.strip() for cmd in plan.split("\n") if cmd.strip()]
    security_flags = []
    has_critical = False
    
    for cmd in commands:
        # 1. Check forbidden patterns (REJECT)
        is_valid, reason = validate_command(cmd)
        if not is_valid:
            return {
                "approval_status": "REJECTED",
                "security_flags": [f"BLOQUEADO: {reason}"],
                "escalation_reason": reason
            }
        
        # 2. Check critical patterns (REQUIRE APPROVAL)
        if is_critical(cmd):
            has_critical = True
            security_flags.append(f"CRITICO: {cmd}")
    
    if has_critical:
        return {
            "approval_status": "WAITING_APPROVAL",
            "security_flags": security_flags
        }
    else:
        return {
            "approval_status": "APPROVED",
            "security_flags": security_flags
        }

Command Validation

Forbidden Patterns

Commands matching these regex patterns are automatically rejected and trigger escalation. Implementation (src/core/security.py:5-21):
FORBIDDEN_PATTERNS = [
    r"rm\s+-rf\s+/\s*$",              # rm -rf /
    r"rm\s+-rf\s+/\*",                # rm -rf /*
    r"mkfs\.",                         # Format filesystem
    r"dd\s+if=",                       # Disk write operations
    r":\(\)\{.*\|.*&\}\;",            # Fork bomb
    r"shutdown\s",                     # System shutdown
    r"reboot\s*$",                     # System reboot
    r"halt\s*$",                       # System halt
    r"init\s+0",                       # Init shutdown
    r"fdisk\s",                        # Partition editor
    r"parted\s",                       # Partition editor
    r"wipefs\s",                       # Wipe filesystem signatures
    r">\s*/dev/sd",                    # Write to raw disk device
    r"chmod\s+-R\s+777\s+/\s*$",      # Chmod root directory
    r"chown\s+-R.*:\s*/\s*$",         # Chown root directory
]
Validation function (src/core/security.py:50-54):
def validate_command(command: str) -> Tuple[bool, str]:
    for pattern in FORBIDDEN_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            return False, f"Comando bloqueado por politica de seguridad: patron prohibido"
    return True, ""
Critical Protection: Forbidden commands immediately return approval_status: "REJECTED", bypassing the execute node and triggering escalation.

Critical Patterns

Commands matching these patterns require manual human approval before execution. Implementation (src/core/security.py:23-47):
CRITICAL_PATTERNS = [
    r"\brm\b",                         # File deletion
    r"\bdelete\b",                     # Database/file deletion
    r"\bdrop\b",                       # Database drop
    r"\btruncate\b",                   # Database truncate
    r"\bkill\s+-9\b",                  # Force kill process
    r"\bsystemctl\s+stop\b",          # Stop systemd service
    r"\bsystemctl\s+restart\b",       # Restart systemd service
    r"\bservice\s+\S+\s+stop\b",     # Stop SysV service
    r"\bservice\s+\S+\s+restart\b",  # Restart SysV service
    r"\biptables\b",                   # Firewall rules
    r"\bufw\b",                        # Firewall management
    r"\bpasswd\b",                     # Change user password
    r"\busermod\b",                    # Modify user
    r"\buserdel\b",                    # Delete user
    r"\bchmod\b",                      # Change file permissions
    r"\bchown\b",                      # Change file ownership
    r"\bmv\s+/",                       # Move system files
    r"\bcp\s+/dev/",                   # Copy device files
    r"\bapt\b",                        # Package manager
    r"\bapt-get\b",                    # Package manager
    r"\byum\b",                        # Package manager
    r"\bdnf\b",                        # Package manager
    r"\binstall\b",                    # Package installation
]
Detection function (src/core/security.py:57-61):
def is_critical(command: str) -> bool:
    for pattern in CRITICAL_PATTERNS:
        if re.search(pattern, command, re.IGNORECASE):
            return True
    return False
Manual Review: Critical commands set approval_status: "WAITING_APPROVAL", pausing the workflow until a human operator approves or rejects.

Approval Workflow

The workflow handles three approval states:
1
APPROVED (Auto-execute)
2
Commands pass all security checks and execute automatically.
3
if has_critical:
    # Manual approval required
else:
    return {
        "approval_status": "APPROVED",
        "security_flags": []
    }
4
Conditional edge (src/agent/graph.py:14-22):
5
def should_approve_continue(state: AgentState):
    status = state.get("approval_status")
    if status == "REJECTED":
        return "escalate"
    elif status == "WAITING_APPROVAL":
        return "end"  # Pause workflow
    return "execute"  # Auto-execute
6
WAITING_APPROVAL (Manual review)
7
Commands contain critical patterns and require human approval.
8
Pausing mechanism:
9
  • Workflow returns "end" from the approval node
  • Application exposes /api/approve/{incident_id} endpoint
  • Human reviews command in UI and calls /approve or /reject
  • Workflow resumes with resume_app (starts at execute node)
  • 10
    Resume workflow (src/agent/graph.py:82-104):
    11
    resume_workflow = StateGraph(AgentState)
    resume_workflow.set_entry_point("execute")  # Skip monitor/diagnose/plan/approval
    
    resume_workflow.add_node("execute", execute_node)
    resume_workflow.add_node("verify", verify_node)
    resume_workflow.add_node("report", report_node)
    resume_workflow.add_node("escalation", escalation_node)
    
    resume_app = resume_workflow.compile()
    
    12
    REJECTED (Escalate)
    13
    Commands match forbidden patterns or are rejected by human operator.
    14
    if not is_valid:
        return {
            "approval_status": "REJECTED",
            "escalation_reason": reason
        }
    
    15
    Escalation node (src/agent/graph.py:49-55):
    16
    def escalation_node(state: AgentState):
        service = state.get("affected_service", "desconocido")
        reason = state.get("escalation_reason") or "Limite de reintentos alcanzado."
        
        log("escalation", f"FALLA CRITICA en '{service}': {reason}")
        log("escalation", "Se requiere intervencion humana.")
        return {"current_step": "escalation", "escalation_reason": reason}
    

    SSH Security

    Sentinel AI uses Paramiko for secure SSH connections with privilege escalation.

    Connection Security

    Implementation (src/tools/ssh.py:16-31):
    def connect(self):
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname=self.hostname,
            port=self.port,
            username=self.username,
            password=self.password,
            key_filename=self.key_filename,
            timeout=10
        )
    
    Security features:
    • Authentication: Password or SSH key
    • Host key policy: AutoAddPolicy() (accepts new hosts)
    • Connection timeout: 10 seconds
    • Non-root user: Connects as sentinel user (from config.SSH_USER)
    The system uses a dedicated sentinel user with limited sudo permissions. The user is not root, requiring explicit sudo for administrative commands.

    Sudo Command Execution

    Implementation (src/tools/ssh.py:33-74):
    def execute_command(self, command: str, use_sudo: bool = False) -> Tuple[int, str, str]:
        if not self.client:
            self.connect()
        
        if use_sudo:
            command = f"sudo -S {command}"
            stdin, stdout, stderr = self.client.exec_command(command, get_pty=True)
            time.sleep(0.3)
            stdin.write(f"{self.password}\n")  # Send password to sudo prompt
            stdin.flush()
        else:
            stdin, stdout, stderr = self.client.exec_command(command)
        
        # Wait for command to finish
        while not stdout.channel.exit_status_ready():
            check_stop()  # Allow graceful interruption
            time.sleep(0.5)
        
        exit_code = stdout.channel.recv_exit_status()
        out = stdout.read().decode().strip()
        err = stderr.read().decode().strip()
        
        # Strip sudo password prompt from output
        if use_sudo and out.startswith("[sudo]"):
            lines = out.split("\n")
            out = "\n".join(lines[1:]).strip()
        
        return exit_code, out, err
    
    Sudo security:
    • PTY allocation: get_pty=True enables interactive sudo
    • Password injection: Password sent to stdin after 300ms delay
    • Output sanitization: Removes [sudo] password for sentinel: prompt
    • Exit code checking: Detects command failures
    Password in memory: The SSH password is stored in plaintext in config.SSH_PASS. Use SSH keys or secret management systems in production.

    Automatic Sudo Injection

    The plan node automatically adds sudo to administrative commands. Implementation (src/agent/nodes/plan.py:13-21):
    def ensure_sudo(command: str) -> str:
        if command.startswith("sudo "):
            return command
        prefixes = ["service ", "kill ", "pkill ", "rm ", "chmod ", "chown ",
                    "apt ", "dpkg ", "nginx", "systemctl ", "fuser ", "docker "]
        for prefix in prefixes:
            if command.strip().startswith(prefix):
                return f"sudo {command.strip()}"
        return command
    

    Memory-Based Prevention

    The system prevents repeating failed commands using episodic memory.

    Failed Command Retrieval

    Implementation (src/core/memory.py:57-71):
    def get_failed_commands(self, error: str) -> List[str]:
        error_lower = error.lower()
        error_keywords = set(error_lower.split())
        failed = set()
        
        for ep in self.episodes:
            if not ep["success"]:
                ep_keywords = set(ep.get("error", "").lower().split())
                if len(error_keywords & ep_keywords) >= 1:  # At least 1 keyword overlap
                    cmd = ep["command"].strip()
                    failed.add(cmd)
        
        return list(failed)
    

    LLM Prompt Injection

    Failed commands are injected into the plan node’s prompt. Implementation (src/agent/nodes/plan.py:31-36):
    failed_commands = memory.get_failed_commands(error)
    failed_str = ""
    if failed_commands:
        failed_str = "\nCOMANDOS QUE YA FALLARON (PROHIBIDO repetirlos):\n"
        for cmd in failed_commands:
            failed_str += f"- {cmd}\n"
    
    # ... included in SystemMessage
    SystemMessage(content=f"...
    {failed_str}
    ...")
    
    The LLM is explicitly instructed to never repeat commands from the failed list, preventing infinite retry loops on known-bad solutions.

    Security Layers Summary

    Layer 1: Forbidden Patterns

    Regex-based rejection of destructive commands (rm -rf /, mkfs, fork bombs)Action: Auto-reject → Escalate

    Layer 2: Critical Patterns

    Manual approval required for risky operations (restart, package install, chmod)Action: Pause → Wait for human approval

    Layer 3: Episodic Memory

    Prevent repeating failures by tracking command history and outcomesAction: LLM prompt injection → Alternative solution

    Layer 4: SSH Security

    Non-root user with sudo, password-protected, PTY-basedAction: Least privilege + command auditing

    Configuration

    Security settings in src/core/config.py:
    SSH_HOST = os.getenv("SSH_HOST", "localhost")
    SSH_PORT = int(os.getenv("SSH_PORT", 2222))
    SSH_USER = os.getenv("SSH_USER", "sentinel")
    SSH_PASS = os.getenv("SSH_PASS")
    
    MAX_RETRIES = 5  # Maximum retry attempts before escalation
    

    Best Practices

    For production deployments, configure key_filename instead of password in the SSH client:
    SSHClient(
        hostname=config.SSH_HOST,
        username=config.SSH_USER,
        key_filename="/path/to/private_key"
    )
    
    Limit the sentinel user’s sudo access to specific commands:
    # /etc/sudoers.d/sentinel
    sentinel ALL=(ALL) NOPASSWD: /usr/bin/service, /usr/bin/systemctl, /usr/bin/nginx
    
    Enable SSH session logging and integrate with SIEM systems to track all executed commands.
    Add organization-specific patterns to FORBIDDEN_PATTERNS based on your infrastructure:
    r"docker\s+rm\s+-f\s+prod-",  # Prevent deleting production containers
    

    Build docs developers (and LLMs) love