Overview
Sentinel AI uses LangGraph to orchestrate a multi-step autonomous workflow for detecting, diagnosing, and recovering from service failures. The workflow is implemented as a state machine with conditional edges that determine the agent’s next action based on execution results.

Workflow Architecture
The agent workflow is defined in src/agent/graph.py and consists of 8 interconnected nodes:
Agent State
All nodes share a typed state dictionary that tracks the workflow’s progress:

Workflow Nodes
def monitor_node(state: AgentState) -> Dict[str, Any]:
    """Check every configured service over SSH and report the first failure.

    Returns a partial state update. When a service is down, ``current_error``
    and ``affected_service`` describe the first failing service; when the
    whole fleet is healthy they are explicitly cleared so a stale error from
    a previous cycle cannot mislead ``should_monitor_end``.
    """
    log("monitor", "Verificando estado de los servicios...")
    ssh = get_ssh_client()
    services_snapshot = {}  # collected per service; not part of the returned update
    any_failure = None
    failed_service = None
    try:
        for service_name, service_cfg in config.SERVICES.items():
            code, out, err = ssh.execute_command(service_cfg["check_command"])
            is_running = service_cfg["running_indicator"] in out
            status = "running" if is_running else "stopped"
            services_snapshot[service_name] = {
                "status": status,
                "details": out.strip() if not is_running else "Service is active",
                "type": service_cfg["type"]
            }
            # Remember only the FIRST failing service of this cycle.
            if not is_running and not any_failure:
                any_failure = f"Servicio '{service_name}' no esta activo."
                failed_service = service_name
    finally:
        # BUG FIX: close the SSH connection even if a check raises.
        ssh.close()
    if any_failure:
        return {
            "current_step": "monitor",
            "current_error": any_failure,
            "affected_service": failed_service
        }
    # BUG FIX: the original fell off the end and implicitly returned None when
    # every service was healthy; a LangGraph node must return a state update,
    # and current_error must be cleared for the "end" route to trigger.
    return {
        "current_step": "monitor",
        "current_error": None,
        "affected_service": None
    }
def should_monitor_end(state: AgentState):
    """Route out of monitoring: a detected failure continues to diagnosis,
    a healthy fleet ends the run."""
    return "continue" if state.get("current_error") else "end"
def diagnose_node(state: AgentState) -> Dict[str, Any]:
    """Produce a brief LLM diagnosis of the current failure, enriched with
    episodic memory (previously failed / successful commands) and, when a
    knowledge base is configured, RAG context."""
    error = state.get("current_error", "")
    service = state.get("affected_service", "desconocido")

    # Assemble the episodic-memory context block.
    context_parts = []
    previously_failed = memory.get_failed_commands(error)
    if previously_failed:
        context_parts.append("COMANDOS QUE YA FALLARON (NO repetir):\n")
        context_parts.extend(f"- {cmd}\n" for cmd in previously_failed)
    past_incident = memory.find_similar(error)
    if past_incident and past_incident["success"]:
        context_parts.append(f"\nSolucion exitosa previa: {past_incident['command']}\n")
    memory_context = "".join(context_parts)

    # Query the RAG knowledge base only when one is available.
    rag_context = kb.query(f"How to fix: {error}") if kb else ""

    # LLM diagnosis, grounded in memory and documentation context.
    system_prompt = (
        "Eres Sentinel AI, un agente DevOps autonomo.\n"
        "Analiza el error y proporciona un diagnostico BREVE.\n"
        f"Servicio afectado: {service}\n"
        f"{memory_context}"
        f"Documentacion tecnica:\n{rag_context[:1000]}\n"
    )
    response = llm.invoke([
        SystemMessage(content=system_prompt),
        HumanMessage(content=f"Error: {error}")
    ])
    return {
        "current_step": "diagnose",
        "diagnosis_log": state.get("diagnosis_log", []) + [response.content.strip()],
        "memory_consulted": True
    }
def plan_node(state: AgentState) -> Dict[str, Any]:
    """Ask the LLM for a 1-3 command remediation plan for the current error.

    Commands that episodic memory recorded as already failed are listed in
    the prompt so the model does not repeat them; every generated command is
    normalized through ``ensure_sudo`` before being stored in the plan.
    """
    diagnosis_log = state.get("diagnosis_log", [])
    # BUG FIX: the original indexed [-1] unconditionally and raised
    # IndexError when no diagnosis had been produced yet.
    diagnosis = diagnosis_log[-1] if diagnosis_log else "desconocido"
    error = state.get("current_error", "desconocido")
    service = state.get("affected_service", "desconocido")
    # Retrieve failed commands from memory
    failed_commands = memory.get_failed_commands(error)
    failed_str = ""
    if failed_commands:
        failed_str = "\nCOMANDOS QUE YA FALLARON (PROHIBIDO repetirlos):\n"
        for cmd in failed_commands:
            failed_str += f"- {cmd}\n"
    messages = [
        SystemMessage(content=(
            "Eres un motor de automatizacion DevOps.\n"
            "Genera un plan de remediacion con 1 a 3 comandos de shell.\n"
            f"- Servicio afectado: {service}\n"
            "- Te conectas via SSH como usuario 'sentinel' (NO root)\n"
            "- TODOS los comandos de administracion necesitan 'sudo'\n"
            "REGLAS:\n"
            "1. Genera entre 1 y 3 comandos, uno por linea.\n"
            "2. TODOS los comandos de admin llevan 'sudo'.\n"
            "3. NUNCA repitas un comando que ya fallo.\n"
            f"{failed_str}"
        )),
        HumanMessage(content=f"Error: {error}\nDiagnostico: {diagnosis}")
    ]
    response = llm.invoke(messages)
    # Drop markdown backticks the model may wrap around commands.
    raw = response.content.strip().replace("`", "")
    # BUG FIX: strip each line before normalizing so ensure_sudo sees the
    # bare command rather than one with leading/trailing whitespace.
    commands = [ensure_sudo(cmd.strip()) for cmd in raw.split("\n") if cmd.strip()]
    return {
        "current_step": "plan",
        "candidate_plan": "\n".join(commands),
        "approval_status": "PENDING"
    }
def approve_node(state: AgentState) -> Dict[str, Any]:
    """Security gate over the candidate plan.

    Rejects the whole plan as soon as one command violates a forbidden
    pattern; otherwise collects "critical" commands, which demand human
    approval, and auto-approves when there are none.
    """
    planned = [c.strip() for c in state.get("candidate_plan", "").split("\n") if c.strip()]
    critical_flags = []
    for command in planned:
        # Hard stop on forbidden patterns: the plan is rejected outright.
        allowed, reason = validate_command(command)
        if not allowed:
            return {
                "current_step": "approval",
                "approval_status": "REJECTED",
                "security_flags": [f"BLOQUEADO: {reason}"],
                "escalation_reason": reason
            }
        # Critical commands are allowed but require manual sign-off.
        if is_critical(command):
            critical_flags.append(f"CRITICO: {command}")
    status = "WAITING_APPROVAL" if critical_flags else "APPROVED"
    return {
        "current_step": "approval",
        "approval_status": status,
        "security_flags": critical_flags
    }
def should_approve_continue(state: AgentState):
    """Route after the approval gate: security violations escalate, critical
    plans pause for human review, everything else executes."""
    routes = {
        "REJECTED": "escalate",      # security violation
        "WAITING_APPROVAL": "end",   # pause for human review
    }
    return routes.get(state.get("approval_status"), "execute")  # auto-approved
def execute_node(state: AgentState) -> Dict[str, Any]:
    """Run every command of the approved plan over SSH and record the episode.

    Each command's exit code and truncated stdout/stderr are appended to the
    diagnosis log; the whole attempt is saved to episodic memory with an
    overall success flag (all commands must exit 0).
    """
    plan = state.get("candidate_plan", "")
    commands = [cmd.strip() for cmd in plan.split("\n") if cmd.strip()]
    ssh = get_ssh_client()
    all_results = []
    overall_success = True
    try:
        for command in commands:
            # BUG FIX: match the "sudo " prefix including the space so that
            # commands like "sudoedit ..." are not treated as sudo-prefixed
            # (the original used startswith("sudo")).
            needs_sudo = command.startswith("sudo ")
            clean = command[len("sudo "):] if needs_sudo else command
            code, out, err = ssh.execute_command(clean, use_sudo=needs_sudo)
            result_str = f"[{command}] codigo:{code}"
            if out:
                result_str += f" salida:{out[:200]}"  # truncate noisy output
            if err:
                result_str += f" error:{err[:200]}"
            all_results.append(result_str)
            if code != 0:
                overall_success = False
    finally:
        # BUG FIX: close the SSH connection even if a command raises.
        ssh.close()
    # Save to episodic memory so future plans can avoid failed commands.
    memory.save_episode(
        error=state.get("current_error", ""),
        # BUG FIX: the original default [""] only covered a missing key;
        # an explicitly empty diagnosis_log made [-1] raise IndexError.
        diagnosis=(state.get("diagnosis_log") or [""])[-1],
        command=plan,
        result=" | ".join(all_results),
        success=overall_success
    )
    return {
        "current_step": "execute",
        "diagnosis_log": state.get("diagnosis_log", []) + [" | ".join(all_results)]
    }
def verify_node(state: AgentState) -> Dict[str, Any]:
    """Re-check the affected service after remediation.

    Clears ``current_error`` when the service's running indicator is found;
    otherwise keeps the error and increments ``retry_count`` so the retry /
    escalation edge can act on it.
    """
    service = state.get("affected_service", "")
    service_cfg = config.SERVICES.get(service)
    # BUG FIX: the original defaulted to {} and then crashed with KeyError on
    # service_cfg["check_command"]; an unknown service now counts as a
    # failed verification instead of an unhandled exception.
    recovered = False
    if service_cfg:
        ssh = get_ssh_client()
        try:
            code, out, err = ssh.execute_command(service_cfg["check_command"])
        finally:
            # BUG FIX: close the SSH connection even if the check raises.
            ssh.close()
        recovered = service_cfg["running_indicator"] in out
    if recovered:
        log("verify", f"Servicio '{service}' RECUPERADO.")
        return {"current_step": "verify", "current_error": None}
    return {
        "current_step": "verify",
        "current_error": state.get("current_error"),
        "retry_count": state.get("retry_count", 0) + 1
    }
def should_verify_end(state: AgentState):
    """Route after verification: recovery ends the run with a report,
    an exhausted retry budget escalates, anything else retries diagnosis."""
    if not state.get("current_error"):
        return "end"  # Success → report
    if state.get("retry_count", 0) >= config.MAX_RETRIES:  # Default: 5
        return "escalate"  # Max retries exceeded
    return "retry"  # Try again → diagnose
def report_node(state: AgentState):
    """Log the success report for a recovered service, including the number
    of attempts and the remediation plan that was applied."""
    attempts = state.get("retry_count", 0) + 1
    recovered_service = state.get("affected_service", "desconocido")
    applied_plan = state.get("candidate_plan", "N/A")
    log("report", f"EXITO: Servicio '{recovered_service}' recuperado en {attempts} intento(s).")
    log("report", f"Solucion aplicada: {applied_plan}")
    return {"current_step": "report"}
def escalation_node(state: AgentState):
    """Emit the escalation notice for human intervention and surface the
    escalation reason back into the state."""
    affected = state.get("affected_service", "desconocido")
    why = state.get("escalation_reason") or "Limite de reintentos alcanzado."
    for line in (f"FALLA CRITICA en '{affected}': {why}",
                 "Se requiere intervencion humana."):
        log("escalation", line)
    return {"current_step": "escalation", "escalation_reason": why}
Workflow Compilation
The workflow is compiled into two executable graphs:

Execution Flow Diagram
Normal Recovery
Monitor → Diagnose → Plan → Approval → Execute → Verify → Report
Retry Loop
Verify (failed) → Diagnose → Plan → Approval → Execute → Verify
Manual Review
Plan → Approval (WAITING) → [Paused] → Resume → Execute
Escalation
Approval (REJECTED) → Escalation OR Verify (max retries) → Escalation
Key Features
Conditional Branching: The workflow uses intelligent routing based on execution results, security validation, and retry limits.
State Persistence: All intermediate results are stored in the AgentState dictionary, enabling pause/resume functionality.

Configuration
Workflow behavior is controlled via src/core/config.py:36:
Related Documentation
- RAG System - How the diagnose node queries technical documentation
- Security - Command validation in the approval node
- Knowledge Base - RAG data sources and indexing