Overview
The SSHClient class provides a secure interface for executing shell commands on remote servers. It wraps the Paramiko library and includes built-in sudo handling, connection management, and interruption support.
Import
from src.tools.ssh import SSHClient
SSHClient
Constructor
class SSHClient :
def __init__ (
self ,
hostname : str ,
username : str ,
password : Optional[ str ] = None ,
key_filename : Optional[ str ] = None ,
port : int = 22
)
Initializes an SSH client instance without establishing a connection.
IP address or hostname of the remote server
SSH username for authentication
Password for authentication (required if key_filename not provided)
Path to SSH private key file for key-based authentication
Attributes
Stored password (if provided)
Stored key file path (if provided)
client
paramiko.SSHClient | None
Paramiko SSH client instance (initialized after connect())
Example
from src.tools.ssh import SSHClient
# Password authentication
ssh = SSHClient(
hostname = "192.168.1.100" ,
username = "sentinel" ,
password = "secure_password" ,
port = 22
)
# Key-based authentication
ssh_key = SSHClient(
hostname = "production.server.com" ,
username = "deploy" ,
key_filename = "/home/user/.ssh/id_rsa"
)
Methods
connect
def connect ( self ) -> None
Establishes an SSH connection to the remote server.
Behavior:
Creates a new paramiko.SSHClient instance
Sets AutoAddPolicy to automatically accept unknown host keys
Connects using provided credentials (password or key file)
Sets a 10-second connection timeout
Prints connection status to stdout
Raises:
paramiko.AuthenticationException: Invalid credentials
paramiko.SSHException: SSH protocol errors
socket.timeout: Connection timeout
Example:
from src.tools.ssh import SSHClient
ssh = SSHClient(
hostname = "192.168.1.100" ,
username = "sentinel" ,
password = "my_password"
)
try :
ssh.connect()
# [SSH] Conexion establecida con [email protected] :22
except Exception as e:
print ( f "Connection failed: { e } " )
The AutoAddPolicy automatically accepts host keys without verification. For production, use RejectPolicy and manage known_hosts explicitly.
execute_command
def execute_command (
self ,
command : str ,
use_sudo : bool = False
) -> Tuple[ int , str , str ]
Executes a shell command on the remote server.
Whether to execute with sudo privileges
Command exit code (0 = success)
Standard output (trimmed and decoded)
Behavior:
Auto-connects if not already connected
Checks for interruption signals via check_stop()
If use_sudo=True:
Prefixes command with sudo -S
Requests a PTY (pseudo-terminal)
Automatically sends the password via stdin
Strips the [sudo] password prompt from output
Waits for command completion while checking for interruptions
Returns exit code and output streams
Example (Basic):
ssh = SSHClient( hostname = "192.168.1.100" , username = "sentinel" , password = "pass" )
ssh.connect()
code, out, err = ssh.execute_command( "uptime" )
print ( f "Exit Code: { code } " )
print ( f "Output: { out } " )
# Exit Code: 0
# Output: 14:23:45 up 3 days, 2:15, 1 user, load average: 0.52, 0.58, 0.59
Example (With Sudo):
ssh = SSHClient( hostname = "192.168.1.100" , username = "sentinel" , password = "pass" )
ssh.connect()
code, out, err = ssh.execute_command( "service nginx status" , use_sudo = True )
if "is running" in out:
print ( "Nginx is active" )
else :
print ( f "Nginx error: { err } " )
Example (Error Handling):
code, out, err = ssh.execute_command( "cat /nonexistent_file" )
if code != 0 :
print ( f "Command failed with code { code } " )
print ( f "Error: { err } " )
# Command failed with code 1
# Error: cat: /nonexistent_file: No such file or directory
Sudo Behavior
When use_sudo=True:
# Internal flow:
command = "service nginx restart"
# 1. Command is prefixed
full_command = f "sudo -S { command } " # "sudo -S service nginx restart"
# 2. PTY is requested for password input
stdin, stdout, stderr = client.exec_command(full_command, get_pty = True )
# 3. Password is sent automatically
time.sleep( 0.3 ) # Wait for prompt
stdin.write( f " { self .password } \n " )
stdin.flush()
# 4. Output is cleaned (removes [sudo] password prompt)
out = stdout.read().decode().strip()
if out.startswith( "[sudo]" ):
out = " \n " .join(out.split( " \n " )[ 1 :]).strip()
The 300ms sleep ensures the sudo password prompt is ready before sending credentials. This prevents “sudo: no password was provided” errors.
Interruption Handling
The method respects interruption signals:
while not stdout.channel.exit_status_ready():
try :
check_stop() # Raises exception if stop requested
except Exception :
print ( "[SSH] Interrupcion solicitada. Cerrando conexion." )
if self .client:
self .client.close()
raise
time.sleep( 0.5 )
This allows graceful cancellation of long-running commands:
import signal
import sys
def signal_handler ( sig , frame ):
print ( " \n Interrupt received, stopping..." )
sys.exit( 0 )
signal.signal(signal. SIGINT , signal_handler)
# Long-running command
code, out, err = ssh.execute_command( "sleep 300" )
# Press Ctrl+C to interrupt
# [SSH] Interrupcion solicitada. Cerrando conexion.
close
Closes the SSH connection and releases resources.
Example:
ssh = SSHClient( hostname = "192.168.1.100" , username = "sentinel" , password = "pass" )
ssh.connect()
# Execute commands
ssh.execute_command( "uptime" )
ssh.execute_command( "df -h" )
# Clean up
ssh.close()
Always call close() when done to prevent connection leaks. Use context managers for automatic cleanup (see Advanced Usage).
Usage Patterns
Single Command
from src.tools.ssh import SSHClient
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "pass" )
ssh.connect()
code, out, err = ssh.execute_command( "hostname" )
print ( f "Server: { out } " )
ssh.close()
Multiple Commands
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "pass" )
ssh.connect()
commands = [
"sudo service nginx stop" ,
"sudo pkill -9 nginx" ,
"sudo service nginx start"
]
for cmd in commands:
code, out, err = ssh.execute_command(cmd, use_sudo = True )
if code != 0 :
print ( f "Failed: { cmd } " )
print ( f "Error: { err } " )
break
print ( f "Success: { cmd } " )
ssh.close()
Service Health Check
def check_service_status ( ssh : SSHClient, service : str ) -> bool :
code, out, err = ssh.execute_command( f "service { service } status" , use_sudo = True )
return "is running" in out
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "pass" )
ssh.connect()
services = [ "nginx" , "postgres" , "docker" ]
for service in services:
status = "UP" if check_service_status(ssh, service) else "DOWN"
print ( f " { service } : { status } " )
ssh.close()
Integration with Agent Nodes
The SSH client is used throughout agent nodes:
monitor_node
from src.tools.ssh import SSHClient
from src.core.config import config
def get_ssh_client ():
return SSHClient(
hostname = config. SSH_HOST ,
port = config. SSH_PORT ,
username = config. SSH_USER ,
password = config. SSH_PASS
)
def monitor_node ( state ):
ssh = get_ssh_client()
for service_name, service_cfg in config. SERVICES .items():
code, out, err = ssh.execute_command(service_cfg[ "check_command" ])
is_running = service_cfg[ "running_indicator" ] in out
if not is_running:
ssh.close()
return {
"current_error" : f "Service { service_name } is down" ,
"affected_service" : service_name
}
ssh.close()
return { "current_error" : None }
execute_node
def execute_node ( state ):
plan = state.get( "candidate_plan" , "" )
commands = plan.split( " \n " )
ssh = get_ssh_client()
results = []
for command in commands:
needs_sudo = command.startswith( "sudo" )
clean = command.replace( "sudo " , "" , 1 ) if needs_sudo else command
code, out, err = ssh.execute_command(clean, use_sudo = needs_sudo)
results.append( f "[ { command } ] code: { code } out: { out[: 100 ] } " )
if code != 0 :
break
ssh.close()
return { "diagnosis_log" : results}
Advanced Usage
Context Manager
Create a context manager for automatic cleanup:
from contextlib import contextmanager
@contextmanager
def ssh_connection ( hostname , username , password ):
ssh = SSHClient( hostname = hostname, username = username, password = password)
ssh.connect()
try :
yield ssh
finally :
ssh.close()
# Usage
with ssh_connection( "192.168.1.100" , "admin" , "pass" ) as ssh:
code, out, err = ssh.execute_command( "uptime" )
print (out)
# Connection automatically closed
Key-Based Authentication
ssh = SSHClient(
hostname = "production.server.com" ,
username = "deploy" ,
key_filename = "/home/user/.ssh/id_rsa"
)
ssh.connect()
code, out, err = ssh.execute_command( "docker ps" )
ssh.close()
Connection Retry Logic
import time
def connect_with_retry ( ssh : SSHClient, max_retries = 3 ):
for attempt in range (max_retries):
try :
ssh.connect()
return True
except Exception as e:
print ( f "Attempt { attempt + 1 } failed: { e } " )
if attempt < max_retries - 1 :
time.sleep( 2 ** attempt) # Exponential backoff
return False
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "pass" )
if connect_with_retry(ssh):
code, out, err = ssh.execute_command( "hostname" )
ssh.close()
else :
print ( "Failed to connect after retries" )
Parallel Execution
import concurrent.futures
def check_server ( hostname ):
ssh = SSHClient( hostname = hostname, username = "admin" , password = "pass" )
ssh.connect()
code, out, err = ssh.execute_command( "uptime" )
ssh.close()
return hostname, out
servers = [ "192.168.1.100" , "192.168.1.101" , "192.168.1.102" ]
with concurrent.futures.ThreadPoolExecutor( max_workers = 3 ) as executor:
futures = [executor.submit(check_server, srv) for srv in servers]
for future in concurrent.futures.as_completed(futures):
hostname, uptime = future.result()
print ( f " { hostname } : { uptime } " )
Security Considerations
Never hardcode credentials in source code. Use environment variables or secure credential stores.
Using Environment Variables
import os
ssh = SSHClient(
hostname = os.getenv( "SSH_HOST" ),
username = os.getenv( "SSH_USER" ),
password = os.getenv( "SSH_PASS" )
)
Validating Commands
Before executing, validate against a whitelist:
from src.core.security import validate_command
def safe_execute ( ssh : SSHClient, command : str , use_sudo : bool = False ):
is_valid, reason = validate_command(command)
if not is_valid:
raise ValueError ( f "Command blocked: { reason } " )
return ssh.execute_command(command, use_sudo = use_sudo)
# Usage
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "pass" )
ssh.connect()
try :
code, out, err = safe_execute(ssh, "rm -rf /important/data" , use_sudo = True )
except ValueError as e:
print (e) # Command blocked: Destructive rm -rf pattern
ssh.close()
Error Handling
from paramiko import AuthenticationException, SSHException
import socket
ssh = SSHClient( hostname = "192.168.1.100" , username = "admin" , password = "wrong" )
try :
ssh.connect()
code, out, err = ssh.execute_command( "uptime" )
except AuthenticationException:
print ( "Invalid credentials" )
except SSHException as e:
print ( f "SSH protocol error: { e } " )
except socket.timeout:
print ( "Connection timeout" )
except Exception as e:
print ( f "Unexpected error: { e } " )
finally :
ssh.close()
Configuration
SSH settings from config.py:
Hostname or IP of the remote server
SSH password (for sudo commands)
Agent Nodes Use SSH client in monitor, execute, and verify nodes
Security Command validation and security policies
Configuration Configure SSH connection parameters
Execution Learn about command execution