
Overview

Sentinel AI uses LangGraph to orchestrate a multi-step autonomous workflow for detecting, diagnosing, and recovering from service failures. The workflow is implemented as a state machine with conditional edges that determine the agent’s next action based on execution results.

Workflow Architecture

The agent workflow is defined in src/agent/graph.py and consists of 8 interconnected nodes:
from langgraph.graph import StateGraph, END
from .state import AgentState

workflow = StateGraph(AgentState)

workflow.add_node("monitor", monitor_node)
workflow.add_node("diagnose", diagnose_node)
workflow.add_node("plan", plan_node)
workflow.add_node("approval", approve_node)
workflow.add_node("execute", execute_node)
workflow.add_node("verify", verify_node)
workflow.add_node("report", report_node)
workflow.add_node("escalation", escalation_node)

workflow.set_entry_point("monitor")

Agent State

All nodes share a typed state dictionary that tracks the workflow’s progress:
class AgentState(TypedDict):
    messages: List[str]
    current_step: str
    current_error: Optional[str]
    affected_service: Optional[str]
    diagnosis_log: List[str]
    candidate_plan: Optional[str]
    approval_status: str
    retry_count: int
    memory_consulted: bool
    security_flags: List[str]
    escalation_reason: Optional[str]
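Every field must be initialized before the first node runs. A hypothetical `initial_state` helper could look like this (the defaults shown are assumptions, not taken from the source):

```python
from typing import List, Optional, TypedDict

class AgentState(TypedDict):
    messages: List[str]
    current_step: str
    current_error: Optional[str]
    affected_service: Optional[str]
    diagnosis_log: List[str]
    candidate_plan: Optional[str]
    approval_status: str
    retry_count: int
    memory_consulted: bool
    security_flags: List[str]
    escalation_reason: Optional[str]

def initial_state() -> AgentState:
    """Build a clean state for a new monitoring cycle (assumed defaults)."""
    return AgentState(
        messages=[],
        current_step="",
        current_error=None,
        affected_service=None,
        diagnosis_log=[],
        candidate_plan=None,
        approval_status="PENDING",  # assumed starting value
        retry_count=0,
        memory_consulted=False,
        security_flags=[],
        escalation_reason=None,
    )
```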

Workflow Nodes

Monitor Node

Connects via SSH to check service health status.

Implementation (src/agent/nodes/monitor.py:18-60):
def monitor_node(state: AgentState) -> Dict[str, Any]:
    log("monitor", "Verificando estado de los servicios...")
    
    ssh = get_ssh_client()
    services_snapshot = {}
    any_failure = None
    failed_service = None
    
    for service_name, service_cfg in config.SERVICES.items():
        code, out, err = ssh.execute_command(service_cfg["check_command"])
        is_running = service_cfg["running_indicator"] in out
        
        status = "running" if is_running else "stopped"
        services_snapshot[service_name] = {
            "status": status,
            "details": out.strip() if not is_running else "Service is active",
            "type": service_cfg["type"]
        }
        
        if not is_running and not any_failure:
            any_failure = f"Servicio '{service_name}' no esta activo."
            failed_service = service_name
    
    ssh.close()
    
    if any_failure:
        return {
            "current_step": "monitor",
            "current_error": any_failure,
            "affected_service": failed_service
        }

    # All services healthy: clear any stale error so the
    # conditional edge routes to "end"
    return {
        "current_step": "monitor",
        "current_error": None,
        "affected_service": None
    }
Conditional Edge:
def should_monitor_end(state: AgentState):
    if not state.get("current_error"):
        return "end"  # All services healthy
    return "continue"  # Failure detected → diagnose
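`get_ssh_client` and `execute_command` are assumed helpers from the SSH layer. For local testing, a stub that honors the same `(exit_code, stdout, stderr)` contract can stand in (it runs commands on the local host via `subprocess`; an illustration only, not the real client):

```python
import subprocess

class LocalStubSSH:
    """Local stand-in for the assumed SSH client: same
    (exit_code, stdout, stderr) contract, but runs commands on this host."""

    def execute_command(self, command: str, use_sudo: bool = False):
        if use_sudo:
            command = f"sudo {command}"  # illustrative; avoid sudo in tests
        proc = subprocess.run(command, shell=True,
                              capture_output=True, text=True)
        return proc.returncode, proc.stdout, proc.stderr

    def close(self) -> None:
        pass  # nothing to tear down for the local stub
```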
Diagnose Node

Analyzes the failure using LLM + RAG + Memory.

Implementation (src/agent/nodes/diagnose.py:14-70):
def diagnose_node(state: AgentState) -> Dict[str, Any]:
    error = state.get("current_error", "")
    service = state.get("affected_service", "desconocido")
    
    # Query episodic memory for failed commands
    failed_commands = memory.get_failed_commands(error)
    memory_context = ""
    if failed_commands:
        memory_context += "COMANDOS QUE YA FALLARON (NO repetir):\n"
        for cmd in failed_commands:
            memory_context += f"- {cmd}\n"
    
    # Find similar past incidents
    similar = memory.find_similar(error)
    if similar and similar["success"]:
        memory_context += f"\nSolucion exitosa previa: {similar['command']}\n"
    
    # Query RAG knowledge base
    rag_context = ""
    if kb:
        rag_context = kb.query(f"How to fix: {error}")
    
    # LLM diagnosis
    messages = [
        SystemMessage(content=(
            "Eres Sentinel AI, un agente DevOps autonomo.\n"
            "Analiza el error y proporciona un diagnostico BREVE.\n"
            f"Servicio afectado: {service}\n"
            f"{memory_context}"
            f"Documentacion tecnica:\n{rag_context[:1000]}\n"
        )),
        HumanMessage(content=f"Error: {error}")
    ]
    
    response = llm.invoke(messages)
    diagnosis = response.content.strip()
    
    return {
        "current_step": "diagnose",
        "diagnosis_log": state.get("diagnosis_log", []) + [diagnosis],
        "memory_consulted": True
    }
Plan Node

Generates one to three remediation commands using the LLM.

Implementation (src/agent/nodes/plan.py:24-85):
def plan_node(state: AgentState) -> Dict[str, Any]:
    diagnosis = state.get("diagnosis_log", [])[-1]
    error = state.get("current_error", "desconocido")
    service = state.get("affected_service", "desconocido")
    
    # Retrieve failed commands from memory
    failed_commands = memory.get_failed_commands(error)
    failed_str = ""
    if failed_commands:
        failed_str = "\nCOMANDOS QUE YA FALLARON (PROHIBIDO repetirlos):\n"
        for cmd in failed_commands:
            failed_str += f"- {cmd}\n"
    
    messages = [
        SystemMessage(content=(
            "Eres un motor de automatizacion DevOps.\n"
            "Genera un plan de remediacion con 1 a 3 comandos de shell.\n"
            f"- Servicio afectado: {service}\n"
            "- Te conectas via SSH como usuario 'sentinel' (NO root)\n"
            "- TODOS los comandos de administracion necesitan 'sudo'\n"
            "REGLAS:\n"
            "1. Genera entre 1 y 3 comandos, uno por linea.\n"
            "2. TODOS los comandos de admin llevan 'sudo'.\n"
            "3. NUNCA repitas un comando que ya fallo.\n"
            f"{failed_str}"
        )),
        HumanMessage(content=f"Error: {error}\nDiagnostico: {diagnosis}")
    ]
    
    response = llm.invoke(messages)
    raw = response.content.strip().replace("`", "")
    commands = [ensure_sudo(cmd) for cmd in raw.split("\n") if cmd.strip()]
    
    return {
        "current_step": "plan",
        "candidate_plan": "\n".join(commands),
        "approval_status": "PENDING"
    }
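`ensure_sudo` is referenced but not shown. A plausible sketch of such a helper (the real rules may differ):

```python
def ensure_sudo(cmd: str) -> str:
    """Prepend 'sudo' to a command that lacks it (sketch of the assumed helper)."""
    cmd = cmd.strip()
    if cmd and cmd.split()[0] != "sudo":
        return f"sudo {cmd}"
    return cmd
```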
Approval Node

Validates commands against security policies.

Implementation (src/agent/nodes/approve.py:7-46):
def approve_node(state: AgentState) -> Dict[str, Any]:
    plan = state.get("candidate_plan", "")
    commands = [cmd.strip() for cmd in plan.split("\n") if cmd.strip()]
    security_flags = []
    has_critical = False
    
    for cmd in commands:
        # Check against forbidden patterns
        is_valid, reason = validate_command(cmd)
        if not is_valid:
            return {
                "current_step": "approval",
                "approval_status": "REJECTED",
                "security_flags": [f"BLOQUEADO: {reason}"],
                "escalation_reason": reason
            }
        
        # Check if command is critical (requires manual approval)
        if is_critical(cmd):
            has_critical = True
            security_flags.append(f"CRITICO: {cmd}")
    
    if has_critical:
        return {
            "current_step": "approval",
            "approval_status": "WAITING_APPROVAL",
            "security_flags": security_flags
        }
    else:
        return {
            "current_step": "approval",
            "approval_status": "APPROVED",
            "security_flags": security_flags
        }
Conditional Edge:
def should_approve_continue(state: AgentState):
    status = state.get("approval_status")
    if status == "REJECTED":
        return "escalate"  # Security violation
    elif status == "WAITING_APPROVAL":
        return "end"  # Pause for human review
    return "execute"  # Auto-approved
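`validate_command` and `is_critical` live in the security module (see the Security page). A minimal pattern-based sketch, with the pattern lists as placeholders rather than the real policy:

```python
import re

# Illustrative placeholder patterns -- the real lists live in the security module.
FORBIDDEN_PATTERNS = [r"rm\s+-rf\s+/", r"mkfs", r">\s*/dev/sd"]
CRITICAL_PATTERNS = [r"\breboot\b", r"\bshutdown\b", r"systemctl\s+stop\b"]

def validate_command(cmd: str):
    """Return (is_valid, reason); invalid commands match a forbidden pattern."""
    for pattern in FORBIDDEN_PATTERNS:
        if re.search(pattern, cmd):
            return False, f"matches forbidden pattern '{pattern}'"
    return True, ""

def is_critical(cmd: str) -> bool:
    """Critical commands pass validation but still require manual approval."""
    return any(re.search(p, cmd) for p in CRITICAL_PATTERNS)
```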
Execute Node

Runs approved commands via SSH and saves results to memory.

Implementation (src/agent/nodes/execute.py:19-92):
def execute_node(state: AgentState) -> Dict[str, Any]:
    plan = state.get("candidate_plan", "")
    commands = [cmd.strip() for cmd in plan.split("\n") if cmd.strip()]
    
    ssh = get_ssh_client()
    all_results = []
    overall_success = True
    
    for i, command in enumerate(commands):
        needs_sudo = command.strip().startswith("sudo")
        clean = command.replace("sudo ", "", 1) if needs_sudo else command
        
        code, out, err = ssh.execute_command(clean, use_sudo=needs_sudo)
        
        result_str = f"[{command}] codigo:{code}"
        if out:
            result_str += f" salida:{out[:200]}"
        if err:
            result_str += f" error:{err[:200]}"
        
        all_results.append(result_str)
        
        if code != 0:
            overall_success = False
    
    ssh.close()
    
    # Save to episodic memory
    memory.save_episode(
        error=state.get("current_error", ""),
        diagnosis=state.get("diagnosis_log", [""])[-1],
        command=plan,
        result=" | ".join(all_results),
        success=overall_success
    )
    
    return {
        "current_step": "execute",
        "diagnosis_log": state.get("diagnosis_log", []) + [" | ".join(all_results)]
    }
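`memory` is an assumed episodic-memory component whose interface the diagnose, plan, and execute nodes rely on. A toy in-memory version of that interface (the real one presumably persists episodes and matches errors semantically rather than by exact string):

```python
from typing import List, Optional

class EpisodicMemory:
    """Toy in-memory stand-in for the assumed memory interface.
    Matching is exact-string; a real system would use embeddings."""

    def __init__(self):
        self.episodes: List[dict] = []

    def save_episode(self, error: str, diagnosis: str, command: str,
                     result: str, success: bool) -> None:
        self.episodes.append({"error": error, "diagnosis": diagnosis,
                              "command": command, "result": result,
                              "success": success})

    def get_failed_commands(self, error: str) -> List[str]:
        # Commands that already failed for this error, to avoid repeats.
        return [e["command"] for e in self.episodes
                if not e["success"] and e["error"] == error]

    def find_similar(self, error: str) -> Optional[dict]:
        # Most recent episode for the same error, if any.
        for e in reversed(self.episodes):
            if e["error"] == error:
                return e
        return None
```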
Verify Node

Re-checks service health to confirm recovery.

Implementation (src/agent/nodes/verify.py:17-52):
def verify_node(state: AgentState) -> Dict[str, Any]:
    service = state.get("affected_service", "")
    service_cfg = config.SERVICES.get(service, {})
    
    ssh = get_ssh_client()
    code, out, err = ssh.execute_command(service_cfg["check_command"])
    ssh.close()
    
    if service_cfg["running_indicator"] in out:
        log("verify", f"Servicio '{service}' RECUPERADO.")
        return {"current_step": "verify", "current_error": None}
    else:
        retry = state.get("retry_count", 0) + 1
        return {
            "current_step": "verify",
            "current_error": state.get("current_error"),
            "retry_count": retry
        }
Conditional Edge:
def should_verify_end(state: AgentState):
    if not state.get("current_error"):
        return "end"  # Success → report
    retry_count = state.get("retry_count", 0)
    if retry_count >= config.MAX_RETRIES:  # Default: 5
        return "escalate"  # Max retries exceeded
    return "retry"  # Try again → diagnose
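Exercising the cutoff with a stub config shows all three branches (MAX_RETRIES = 5, matching the documented default; the function is re-declared so the snippet runs standalone):

```python
class _StubConfig:
    MAX_RETRIES = 5  # matches the documented default

config = _StubConfig()

def should_verify_end(state: dict) -> str:
    if not state.get("current_error"):
        return "end"
    if state.get("retry_count", 0) >= config.MAX_RETRIES:
        return "escalate"
    return "retry"

assert should_verify_end({"current_error": None}) == "end"
assert should_verify_end({"current_error": "x", "retry_count": 2}) == "retry"
assert should_verify_end({"current_error": "x", "retry_count": 5}) == "escalate"
```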
Report Node

Logs successful recovery with resolution details.

Implementation (src/agent/graph.py:37-46):
def report_node(state: AgentState):
    service = state.get("affected_service", "desconocido")
    command = state.get("candidate_plan", "N/A")
    attempts = state.get("retry_count", 0) + 1
    
    log("report", f"EXITO: Servicio '{service}' recuperado en {attempts} intento(s).")
    log("report", f"Solucion aplicada: {command}")
    return {"current_step": "report"}
Escalation Node

Raises an alert for critical failures that require human intervention.

Implementation (src/agent/graph.py:49-55):
def escalation_node(state: AgentState):
    service = state.get("affected_service", "desconocido")
    reason = state.get("escalation_reason") or "Limite de reintentos alcanzado."
    
    log("escalation", f"FALLA CRITICA en '{service}': {reason}")
    log("escalation", "Se requiere intervencion humana.")
    return {"current_step": "escalation", "escalation_reason": reason}

Workflow Compilation

The workflow is compiled into two executable graphs:
# Main workflow (starts at monitor)
app = workflow.compile()

# Resume workflow (starts at execute, for manual approval continuation)
resume_workflow = StateGraph(AgentState)
resume_workflow.add_node("execute", execute_node)
# (the verify/report/escalation nodes and their edges are registered again here)
resume_workflow.set_entry_point("execute")
resume_app = resume_workflow.compile()

Execution Flow Diagram

Normal Recovery

Monitor → Diagnose → Plan → Approval → Execute → Verify → Report

Retry Loop

Verify (failed) → Diagnose → Plan → Approval → Execute → Verify

Manual Review

Plan → Approval (WAITING) → [Paused] → Resume → Execute

Escalation

Approval (REJECTED) → Escalation OR Verify (max retries) → Escalation

Key Features

Conditional Branching: The workflow uses intelligent routing based on execution results, security validation, and retry limits.
State Persistence: All intermediate results are stored in the AgentState dictionary, enabling pause/resume functionality.
Memory Integration: The diagnose and plan nodes consult episodic memory to avoid repeating failed commands.
Retry Limits: The workflow enforces a maximum of 5 retry attempts (configurable via config.MAX_RETRIES) before escalating to human operators.

Configuration

Workflow behavior is controlled via src/core/config.py:36:
MAX_RETRIES = 5  # Maximum retry attempts before escalation
MONITOR_INTERVAL = 30  # Seconds between monitoring cycles

Related Pages

  • RAG System - How the diagnose node queries technical documentation
  • Security - Command validation in the approval node
  • Knowledge Base - RAG data sources and indexing
