Overview

Agent nodes are Python functions that implement each step of the recovery workflow. Each node receives the current AgentState and returns a dictionary of state updates.

Node Function Signature

All nodes follow this pattern:
from typing import Dict, Any
from src.agent.state import AgentState

def example_node(state: AgentState) -> Dict[str, Any]:
    # Access state
    current_error = state.get("current_error")
    
    # Perform logic
    # ...
    
    # Return state updates
    return {
        "current_step": "example",
        "some_field": "updated_value"
    }

monitor_node

Checks the health of all configured services via SSH.

Import

from src.agent.nodes import monitor_node

Function Signature

def monitor_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Current workflow state (typically empty on first run)

Returns

  - current_step (str): Set to "monitor"
  - current_error (str | None): Error message if a service is down, otherwise None
  - affected_service (str | None): Name of the failed service (e.g., "nginx", "postgres") or None

Behavior

  1. Connects to the target server via SSH using credentials from config
  2. Iterates through all services in config.SERVICES
  3. Executes each service’s check_command
  4. Verifies output contains the running_indicator string
  5. Returns first detected failure or None if all services are healthy
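The health-check loop above can be sketched as follows. This is a minimal, hypothetical version: the SSH client is injected and the service table is inlined, whereas the real node reads both from config.

```python
from typing import Any, Dict

# Hypothetical inline service table; the real node reads config.SERVICES.
SERVICES = {
    "nginx": {
        "check_command": "service nginx status",
        "running_indicator": "is running",
    },
}

def monitor_services(ssh, services: Dict[str, dict]) -> Dict[str, Any]:
    """Return the first detected failure, or a healthy result."""
    for name, cfg in services.items():
        code, out, err = ssh.execute_command(cfg["check_command"])
        if cfg["running_indicator"] not in out:
            # First failure wins; remaining services are not checked
            return {
                "current_step": "monitor",
                "current_error": err or out.strip() or f"{name} check failed",
                "affected_service": name,
            }
    return {"current_step": "monitor", "current_error": None,
            "affected_service": None}
```

Injecting the SSH client this way also makes the loop easy to exercise with a fake client, as shown in the testing section later on.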

Example Usage

from src.agent.nodes import monitor_node

initial_state = {
    "current_step": "",
    "current_error": None
}

result = monitor_node(initial_state)

if result["current_error"]:
    print(f"Service {result['affected_service']} is down!")
    print(f"Error: {result['current_error']}")
else:
    print("All services healthy")

Service Configuration

Services are defined in config.SERVICES:
SERVICES = {
    "nginx": {
        "type": "web_server",
        "check_command": "service nginx status",
        "running_indicator": "is running"
    },
    "postgres": {
        "type": "database",
        "check_command": "pg_isready",
        "running_indicator": "accepting connections"
    }
}
The monitor node emits status_update events via the event bus for real-time UI updates.

diagnose_node

Analyzes the detected error using LLM reasoning, memory lookups, and RAG retrieval.

Import

from src.agent.nodes import diagnose_node

Function Signature

def diagnose_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Must contain current_error and affected_service

Returns

  - current_step (str): Set to "diagnose"
  - diagnosis_log (List[str]): Appends the new diagnosis to the existing log
  - memory_consulted (bool): True if memory or RAG was used in the diagnosis

Behavior

  1. Memory Lookup: Queries episodic memory for similar past errors
  2. RAG Query: Searches vector knowledge base for relevant documentation
  3. LLM Analysis: Generates diagnosis using GPT-4 with context
  4. Logs Results: Appends diagnosis to diagnosis_log
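The four steps above can be sketched with the memory, RAG, and LLM dependencies injected as plain callables. This is an illustrative outline, not the real implementation, which wires in the actual memory store, vector index, and chat model:

```python
from typing import Any, Callable, Dict

def diagnose(state: Dict[str, Any],
             memory_lookup: Callable[[str], str],
             rag_query: Callable[[str], str],
             llm: Callable[[str], str]) -> Dict[str, Any]:
    error = state["current_error"]
    memory_context = memory_lookup(error)   # 1. similar past episodes
    rag_context = rag_query(error)          # 2. relevant documentation
    prompt = (f"Error: {error}\n"
              f"Memory: {memory_context}\n"
              f"Docs: {rag_context}")
    diagnosis = llm(prompt)                 # 3. LLM analysis with context
    return {                                # 4. append to the log
        "current_step": "diagnose",
        "diagnosis_log": state.get("diagnosis_log", []) + [diagnosis],
        "memory_consulted": bool(memory_context or rag_context),
    }
```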

Example Usage

from src.agent.nodes import diagnose_node

state_with_error = {
    "current_error": "nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)",
    "affected_service": "nginx",
    "diagnosis_log": [],
    "retry_count": 0
}

result = diagnose_node(state_with_error)

print(result["diagnosis_log"][-1])
# Output: "Port 80 is occupied by another process. Likely cause: stale nginx process or conflicting service."

Context Sources

The diagnosis integrates three sources:

  - Memory: Past solutions and failed commands
  - RAG: Official documentation embeddings
  - LLM: GPT-4 reasoning and synthesis

LLM Prompt Structure

messages = [
    SystemMessage(content=(
        "You are Sentinel AI, an autonomous DevOps agent.\n"
        "Analyze the error and provide a BRIEF diagnosis (3 lines maximum).\n"
        f"Affected service: {service}\n"
        f"History: {prior_logs[-3:]}\n"
        f"{memory_context}\n"
        f"Documentation: {rag_context[:1000]}\n"
    )),
    HumanMessage(content=f"Error: {error}")
]
The node instructs the LLM NOT to suggest commands that previously failed (from memory context).

plan_node

Generates 1-3 remediation commands based on the diagnosis.

Import

from src.agent.nodes import plan_node

Function Signature

def plan_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Must contain diagnosis_log with at least one diagnosis

Returns

  - current_step (str): Set to "plan"
  - candidate_plan (str): Newline-separated list of shell commands
  - approval_status (str): Set to "PENDING" for security validation

Behavior

  1. Extracts the latest diagnosis from diagnosis_log
  2. Queries memory for previously failed commands
  3. Prompts LLM to generate 1-3 remediation commands
  4. Auto-adds sudo prefix where needed via ensure_sudo()
  5. Returns commands as a newline-separated string
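A hypothetical sketch of the post-processing step, turning raw LLM output into a candidate plan while enforcing the no-chaining rule and the documented restart fallback (build_plan is an illustrative name, not the real helper):

```python
def build_plan(llm_output: str, service: str) -> dict:
    """Turn raw LLM text into a newline-separated candidate plan."""
    commands = [line.strip().strip("`") for line in llm_output.splitlines()
                if line.strip()]
    # Drop anything that violates the no-chaining rule
    commands = [c for c in commands
                if not any(tok in c for tok in ("&&", "||", ";"))]
    if not commands:
        # Documented fallback when the LLM output is unusable
        commands = [f"sudo service {service} restart"]
    return {
        "current_step": "plan",
        "candidate_plan": "\n".join(commands),
        "approval_status": "PENDING",
    }
```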

Example Usage

from src.agent.nodes import plan_node

state_with_diagnosis = {
    "current_error": "nginx port conflict",
    "affected_service": "nginx",
    "diagnosis_log": ["Port 80 occupied by stale process"],
    "retry_count": 0
}

result = plan_node(state_with_diagnosis)

print(result["candidate_plan"])
# Output:
# sudo ss -tulpn | grep :80
# sudo pkill -9 nginx
# sudo service nginx start

Command Generation Rules

The LLM is instructed to follow strict rules:
  1. Generate 1-3 commands, one per line
  2. NO command chaining with &&, ||, or ;
  3. ALL admin commands must use sudo
  4. Return ONLY commands, no explanations or backticks
  5. NEVER repeat commands that previously failed
  6. Base the solution on the provided diagnosis
  7. Use -y or --yes flags for non-interactive execution

Auto-Sudo Logic

def ensure_sudo(command: str) -> str:
    if command.startswith("sudo "):
        return command
    
    prefixes = ["service ", "kill ", "pkill ", "rm ", "chmod ",
                "chown ", "apt ", "dpkg ", "nginx", "systemctl ",
                "fuser ", "docker "]
    
    for prefix in prefixes:
        if command.strip().startswith(prefix):
            return f"sudo {command.strip()}"
    
    return command
If the LLM fails to generate valid commands, the node falls back to sudo service {service} restart.

approve_node

Validates the security of generated commands before execution.

Import

from src.agent.nodes import approve_node

Function Signature

def approve_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Must contain candidate_plan with commands to validate

Returns

  - current_step (str): Set to "approval"
  - approval_status (str): One of: "APPROVED", "WAITING_APPROVAL", "REJECTED"
  - security_flags (List[str]): List of security warnings or blocks
  - escalation_reason (str | None): Reason for rejection if status is "REJECTED"

Behavior

  1. Splits candidate_plan into individual commands
  2. Validates each command using validate_command() from core.security
  3. Checks if commands are critical using is_critical()
  4. Returns approval status based on findings
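The decision logic above can be outlined as follows. The validate_command and is_critical functions here are simplified stand-ins for the real src.core.security helpers, included only so the sketch runs on its own:

```python
from typing import List, Tuple

def validate_command(cmd: str) -> Tuple[bool, str]:
    # Illustrative subset of the real blacklist
    for pattern in ("rm -rf /", "dd if=", "mkfs"):
        if pattern in cmd:
            return False, f"Destructive pattern detected: {pattern}"
    return True, ""

def is_critical(cmd: str) -> bool:
    # Illustrative criticality check
    return "systemctl stop" in cmd or "reboot" in cmd

def approve(candidate_plan: str) -> dict:
    flags: List[str] = []
    status = "APPROVED"
    for cmd in candidate_plan.splitlines():
        ok, reason = validate_command(cmd)
        if not ok:
            # Any blocked command rejects the whole plan
            return {"current_step": "approval",
                    "approval_status": "REJECTED",
                    "security_flags": [reason],
                    "escalation_reason": reason}
        if is_critical(cmd):
            # Critical commands downgrade to manual approval
            status = "WAITING_APPROVAL"
            flags.append(f"Critical command: {cmd}")
    return {"current_step": "approval", "approval_status": status,
            "security_flags": flags, "escalation_reason": None}
```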

Example Usage

from src.agent.nodes import approve_node

state_with_plan = {
    "candidate_plan": "sudo service nginx restart\nsudo apt-get update",
    "affected_service": "nginx"
}

result = approve_node(state_with_plan)

if result["approval_status"] == "APPROVED":
    print("Commands approved for automatic execution")
elif result["approval_status"] == "WAITING_APPROVAL":
    print("Critical commands detected, manual approval required")
    print(result["security_flags"])
else:  # REJECTED
    print(f"Commands blocked: {result['escalation_reason']}")

Security Validation

Commands are checked against a blacklist and pattern rules:
from src.core.security import validate_command, is_critical

is_valid, reason = validate_command("sudo rm -rf /var/log/*")
if not is_valid:
    print(f"Blocked: {reason}")
    # Blocked: Destructive rm -rf pattern detected

if is_critical("sudo systemctl stop postgres"):
    print("Requires manual approval")
Commands containing destructive patterns like rm -rf /, dd if=, or mkfs are automatically REJECTED.

execute_node

Executes approved commands via SSH and records results in memory.

Import

from src.agent.nodes import execute_node

Function Signature

def execute_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Must contain candidate_plan and approval_status == "APPROVED"

Returns

  - current_step (str): Set to "execute"
  - diagnosis_log (List[str]): Appends execution results to the log

Behavior

  1. Verifies approval_status == "APPROVED"
  2. Connects to target server via SSH
  3. Executes each command sequentially
  4. Captures exit codes, stdout, and stderr
  5. Saves episode to memory with success/failure status
  6. Closes SSH connection

Example Usage

from src.agent.nodes import execute_node

approved_state = {
    "candidate_plan": "sudo service nginx stop\nsudo service nginx start",
    "approval_status": "APPROVED",
    "current_error": "nginx port conflict",
    "diagnosis_log": ["Port occupied, restart required"]
}

result = execute_node(approved_state)

print(result["diagnosis_log"][-1])
# Output: "[sudo service nginx stop] code:0 output:OK | [sudo service nginx start] code:0 output:Starting nginx"

Command Execution

Each command is executed with proper sudo handling:
for command in commands:
    needs_sudo = command.startswith("sudo")
    clean = command.replace("sudo ", "", 1) if needs_sudo else command
    
    code, out, err = ssh.execute_command(clean, use_sudo=needs_sudo)
    
    if code != 0:
        overall_success = False
        log("error", f"Command failed with exit code {code}")

Memory Persistence

After execution, the episode is saved:
memory.save_episode(
    error=state["current_error"],
    diagnosis=state["diagnosis_log"][-1],
    command=candidate_plan,
    result=" | ".join(all_results),
    success=overall_success
)
Even failed executions are saved to memory to prevent repeating the same mistakes.
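As a hypothetical illustration of why failures are worth keeping, saved episodes can be formatted back into the "do not repeat" context used by the diagnose and plan prompts (failed_commands_context is an illustrative name, not the real memory API):

```python
def failed_commands_context(episodes: list) -> str:
    """Format past failures into a 'do not repeat' prompt fragment."""
    failed = [e["command"] for e in episodes if not e["success"]]
    if not failed:
        return ""
    return ("Previously FAILED commands (do not repeat):\n"
            + "\n".join(f"- {c}" for c in failed))
```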

verify_node

Confirms whether the service has recovered after command execution.

Import

from src.agent.nodes import verify_node

Function Signature

def verify_node(state: AgentState) -> Dict[str, Any]
Parameters

  - state (AgentState, required): Must contain affected_service matching a key in config.SERVICES

Returns

  - current_step (str): Set to "verify"
  - current_error (str | None): None if service recovered, otherwise original error
  - retry_count (int): Incremented by 1 if service is still down

Behavior

  1. Looks up service configuration from config.SERVICES
  2. Executes the service’s check_command via SSH
  3. Verifies output contains running_indicator
  4. Returns current_error: None if recovered
  5. Increments retry_count if still down
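The check can be sketched in a few lines; as with the monitor sketch, the SSH client and service table are injected here for illustration, while the real node reads config.SERVICES and the shared client:

```python
def verify_service(state: dict, ssh, services: dict) -> dict:
    """Re-run the service's health check and update the state."""
    cfg = services[state["affected_service"]]
    code, out, _ = ssh.execute_command(cfg["check_command"])
    recovered = cfg["running_indicator"] in out
    return {
        "current_step": "verify",
        # Clear the error on recovery; otherwise keep it and count the retry
        "current_error": None if recovered else state["current_error"],
        "retry_count": state["retry_count"] + (0 if recovered else 1),
    }
```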

Example Usage

from src.agent.nodes import verify_node

state_after_execution = {
    "affected_service": "nginx",
    "current_error": "nginx port conflict",
    "retry_count": 0
}

result = verify_node(state_after_execution)

if result["current_error"] is None:
    print("Service recovered successfully!")
else:
    print(f"Still down, retry {result['retry_count']}/{config.MAX_RETRIES}")

Retry Logic

The workflow uses retry_count to limit recovery attempts:
def route_after_verify(state: AgentState) -> str:
    if state.get("retry_count", 0) >= config.MAX_RETRIES:
        # Escalate to human
        return "escalate"
    # Retry diagnosis
    return "retry"
Set config.MAX_RETRIES to control how many recovery cycles run before escalation.

Helper Nodes

report_node

Logs successful recovery details.
def report_node(state: AgentState):
    service = state.get("affected_service", "unknown")
    error = state.get("current_error")
    command = state.get("candidate_plan", "N/A")
    attempts = state.get("retry_count", 0) + 1

    if not error:
        log("report", f"SUCCESS: Service '{service}' recovered in {attempts} attempt(s).")
        log("report", f"Applied solution: {command}")
    
    return {"current_step": "report"}

escalation_node

Triggers human intervention for critical failures.
def escalation_node(state: AgentState):
    service = state.get("affected_service", "unknown")
    reason = state.get("escalation_reason") or "Retry limit reached."
    
    log("escalation", f"CRITICAL FAILURE in '{service}': {reason}")
    log("escalation", "Human intervention required.")
    
    return {
        "current_step": "escalation",
        "escalation_reason": reason
    }

Node Composition

You can create custom nodes that compose existing ones:
def intelligent_retry_node(state: AgentState) -> Dict[str, Any]:
    """Combines diagnosis and planning with backoff logic."""
    retry_count = state.get("retry_count", 0)
    
    # Progressive backoff
    if retry_count > 0:
        import time
        time.sleep(2 ** retry_count)  # 2s, 4s, 8s...
    
    # Re-diagnose
    state = {**state, **diagnose_node(state)}
    
    # Generate new plan
    state = {**state, **plan_node(state)}
    
    return state

Testing Nodes

Nodes are pure functions and easy to test:
import pytest
from src.agent.nodes import monitor_node

def test_monitor_detects_failure(monkeypatch):
    # Mock SSH client
    class MockSSH:
        def execute_command(self, cmd):
            return (1, "nginx is not running", "")
        def close(self):
            pass
    
    monkeypatch.setattr("src.agent.nodes.monitor.SSHClient", MockSSH)
    
    result = monitor_node({})
    
    assert result["current_error"] is not None
    assert result["affected_service"] == "nginx"

Best Practices

  1. Stateless Logic: Nodes should not maintain internal state between calls
  2. Idempotency: Design nodes to be safely re-executable
  3. Error Handling: Wrap SSH/API calls in try-except blocks
  4. Logging: Use log() from event_bus for observability
  5. Type Safety: Always use .get() for state access with defaults
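One way to apply several of these practices at once is a small decorator that wraps any node with error handling and logging. This is a sketch, not part of the codebase; log() here is a stand-in for the event-bus helper:

```python
import functools

def log(level: str, message: str) -> None:
    # Placeholder for the event-bus log() helper
    print(f"[{level}] {message}")

def safe_node(fn):
    """Wrap a node so SSH/API exceptions surface as state, not crashes."""
    @functools.wraps(fn)
    def wrapper(state: dict) -> dict:
        try:
            return fn(state)
        except Exception as exc:
            log("error", f"{fn.__name__} failed: {exc}")
            # Convert the crash into state the graph can route on
            return {
                "current_error": str(exc),
                "escalation_reason": f"{fn.__name__} raised {type(exc).__name__}",
            }
    return wrapper
```

Because the wrapper returns an ordinary state update, a downstream conditional edge can route the failure to escalation instead of letting the exception kill the workflow.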

Related Pages

  - Agent Graph: Orchestrate nodes into workflows
  - SSH Client: Execute remote commands in nodes
  - Knowledge Base: Query documentation in diagnose_node
  - Memory System: Store and retrieve past episodes