Overview
The executor.py module provides safe command execution with blocklist filtering, timeout enforcement, and structured result capture. All commands run with shell=False to prevent injection attacks.
Module Location
Classes
TaskResult
Dataclass representing the result of a command execution.
Attributes
from dataclasses import dataclass

@dataclass
class TaskResult:
    task_id: str      # Unique task identifier
    stdout: str       # Standard output (capped at 64KB)
    stderr: str       # Standard error (capped at 64KB)
    exit_code: int    # Process exit code
    duration_ms: int  # Execution duration in milliseconds
Example
result = TaskResult(
task_id = 'task-123',
stdout = 'hello world\n',
stderr = '',
exit_code = 0,
duration_ms = 42,
)
Functions
execute(task_id: str, command: str, args: list, timeout_s: int) -> TaskResult
Executes a command safely and returns a TaskResult. Never uses shell=True.
Parameters
task_id (str): Unique identifier for this task
command (str): Command to execute (executable name only)
args (list): Command arguments as a list
timeout_s (int): Maximum execution time in seconds
Returns
TaskResult: Structured result containing stdout, stderr, exit code, and duration
Exit Codes
| Code | Meaning |
|---|---|
| 0 | Success |
| 1 | General executor error |
| 124 | Timeout exceeded |
| 126 | Blocked command |
| 127 | Command not found |
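The table above can be expressed as a small lookup. This is a minimal sketch, not part of executor.py: the `ExitCode` enum and `describe_exit_code` helper are hypothetical names introduced only for illustration.

```python
from enum import IntEnum


class ExitCode(IntEnum):
    """Hypothetical names for the exit codes listed in the table."""
    SUCCESS = 0
    EXECUTOR_ERROR = 1
    TIMEOUT = 124
    BLOCKED = 126
    NOT_FOUND = 127


def describe_exit_code(code: int) -> str:
    """Return a readable label, with a fallback for codes set by the command itself."""
    try:
        return ExitCode(code).name
    except ValueError:
        # Any other value was returned by the executed process, not the executor.
        return f"PROCESS_EXIT_{code}"


print(describe_exit_code(126))  # BLOCKED
print(describe_exit_code(2))    # PROCESS_EXIT_2
```

Because exit_code is taken from the process return code on normal completion, a command can itself exit with 1, 124, 126, or 127. Checking stderr for the executor's own messages (for example 'TIMEOUT' or 'COMMAND NOT FOUND') is one way to tell the two cases apart.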
Execution Flow
Step 1: Input Normalization
args = args or []
command = (command or '').strip()
if not command:
    return TaskResult(
        task_id = task_id,
        stdout = '',
        stderr = 'BLOCKED: empty command',
        exit_code = 126,
        duration_ms = _elapsed(),
    )
Step 2: Blocklist Check
if _is_blocked(command):
    return TaskResult(
        task_id = task_id,
        stdout = '',
        stderr = 'BLOCKED: prohibited command',
        exit_code = 126,
        duration_ms = _elapsed(),
    )
Step 3: Build Command List
cmd_list = [command] + [str(a) for a in args]
Step 4: Execute with subprocess
result = subprocess.run(
cmd_list,
capture_output = True,
text = True,
timeout = timeout_s,
shell = False, # prevents injection
)
Step 5: Return Result
return TaskResult(
task_id = task_id,
stdout = (result.stdout or '')[:MAX_OUTPUT],
stderr = (result.stderr or '')[:MAX_OUTPUT],
exit_code = result.returncode,
duration_ms = _elapsed(),
)
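The five steps can be assembled into one runnable function. The following is a simplified sketch under stated assumptions, not the module's actual source: `BLOCKED_COMMANDS` is a local stand-in for config.BLOCKED_COMMANDS, the blocklist check is reduced to an exact match, `_elapsed` and `_result` are illustrative helpers, logging is omitted, and partial output on timeout is dropped.

```python
import subprocess
import sys
import time
from dataclasses import dataclass

MAX_OUTPUT = 65536
BLOCKED_COMMANDS = ["nmap", "nc"]  # assumption: stand-in for config.BLOCKED_COMMANDS


@dataclass
class TaskResult:
    task_id: str
    stdout: str
    stderr: str
    exit_code: int
    duration_ms: int


def execute(task_id: str, command: str, args: list, timeout_s: int) -> TaskResult:
    start = time.monotonic()

    def _elapsed() -> int:
        # assumption: the real _elapsed() helper is not shown in this document
        return int((time.monotonic() - start) * 1000)

    def _result(stdout: str, stderr: str, exit_code: int) -> TaskResult:
        # Illustrative helper that applies the output cap in one place.
        return TaskResult(task_id, stdout[:MAX_OUTPUT], stderr[:MAX_OUTPUT],
                          exit_code, _elapsed())

    # Step 1: input normalization
    args = args or []
    command = (command or "").strip()
    if not command:
        return _result("", "BLOCKED: empty command", 126)

    # Step 2: blocklist check (simplified here to an exact, case-insensitive match)
    if command.lower() in BLOCKED_COMMANDS:
        return _result("", "BLOCKED: prohibited command", 126)

    # Step 3: build the argument vector
    cmd_list = [command] + [str(a) for a in args]

    # Steps 4 and 5: run without a shell and map each outcome to an exit code
    try:
        proc = subprocess.run(cmd_list, capture_output=True, text=True,
                              timeout=timeout_s, shell=False)
    except subprocess.TimeoutExpired:
        return _result("", "TIMEOUT", 124)
    except FileNotFoundError:
        return _result("", "COMMAND NOT FOUND", 127)
    except Exception as exc:
        return _result("", f"EXECUTOR ERROR: {exc}", 1)
    return _result(proc.stdout or "", proc.stderr or "", proc.returncode)


ok = execute("demo-1", sys.executable, ["-c", "print('hello')"], 30)
print(ok.exit_code, ok.stdout.strip())
print(execute("demo-2", "nmap", ["-sV"], 30).stderr)
```

The demo uses `sys.executable` as the command so that it runs on any platform with Python installed.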
Usage Examples
Successful Command:
result = execute('task-1', 'echo', ['hello', 'world'], 30)
assert result.exit_code == 0
assert 'hello world' in result.stdout
assert result.duration_ms > 0
Blocked Command:
result = execute('task-2', 'nmap', ['-sV', '10.0.0.1'], 30)
assert result.exit_code == 126
assert result.stderr == 'BLOCKED: prohibited command'
assert result.stdout == ''
Command Not Found:
result = execute('task-3', 'nonexistent_command', [], 30)
assert result.exit_code == 127
assert result.stderr == 'COMMAND NOT FOUND'
Timeout:
result = execute('task-4', 'sleep', ['100'], 2)
assert result.exit_code == 124
assert result.stderr == 'TIMEOUT'
With Arguments:
result = execute('task-5', 'ls', ['-la', '/tmp'], 10)
assert result.exit_code == 0
assert len(result.stdout) > 0
Helper Functions
_is_blocked(command: str) -> bool
Internal function - checks if command matches any entry in BLOCKED_COMMANDS.
Parameters:
command (str): Command to check
Returns:
bool: True if command is blocked
Logic:
command_lower = command.lower().strip()
return any(
command_lower == blocked.lower() or
command_lower.startswith(blocked.lower() + " ")
for blocked in config.BLOCKED_COMMANDS
)
Examples:
assert _is_blocked('nmap') # True
assert _is_blocked('NMAP') # True (case-insensitive)
assert _is_blocked('nmap -sV') # True (with args)
assert _is_blocked('ls') # False (not in blocklist)
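To make the matching rule concrete, here is a self-contained sketch that applies the same logic to a local sample list. `BLOCKED_COMMANDS` below is an assumed sample, not the real contents of config.BLOCKED_COMMANDS, and `is_blocked` is a standalone copy written for illustration.

```python
BLOCKED_COMMANDS = ["nmap", "masscan", "nc"]  # assumption: sample of config.BLOCKED_COMMANDS


def is_blocked(command: str) -> bool:
    """Same matching rule as the Logic listing, applied to the local sample list."""
    command_lower = command.lower().strip()
    return any(
        command_lower == blocked.lower()
        or command_lower.startswith(blocked.lower() + " ")
        for blocked in BLOCKED_COMMANDS
    )


print(is_blocked("nmap"))           # True: exact match
print(is_blocked("Nmap -sV"))       # True: blocked name followed by a space
print(is_blocked("ncat"))           # False: "nc" is not matched as a bare prefix
print(is_blocked("/usr/bin/nmap"))  # False: a path-qualified name does not match this rule
```

The last case shows that the rule compares the command string as given. Whether the real module resolves paths before this check is not stated in this document.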
Constants
MAX_OUTPUT = 65536 # 64 KB cap to prevent oversized responses
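Output capping limits the size of the returned result. Because subprocess.run with capture_output=True buffers the full output before the slice is applied, the cap bounds what is stored and reported rather than peak memory during execution. With text=True the slice counts characters, not bytes: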
stdout = (result.stdout or '')[:MAX_OUTPUT]
stderr = (result.stderr or '')[:MAX_OUTPUT]
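A caller may also want to know whether truncation actually happened. The sketch below uses a hypothetical `cap_output` helper (not part of the module) that returns the capped text together with a flag, and shows that the cap counts characters rather than bytes.

```python
MAX_OUTPUT = 65536


def cap_output(text, limit=MAX_OUTPUT):
    """Hypothetical helper: return (capped_text, was_truncated)."""
    text = text or ""
    return text[:limit], len(text) > limit


capped, truncated = cap_output("x" * 70000)
print(len(capped), truncated)  # 65536 True

# The slice counts characters, so the UTF-8 encoded size can exceed 64 KB.
capped, _ = cap_output("\u00e9" * 70000)
print(len(capped), len(capped.encode("utf-8")))  # 65536 131072
```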
Exception Handling
TimeoutExpired
Command exceeded timeout_s:
except subprocess.TimeoutExpired as e:
    logger.warning('command timed out', extra={
        'task_id': task_id,
        'timeout_s': timeout_s,
    })
    return TaskResult(
        task_id = task_id,
        stdout = e.stdout or '',
        stderr = e.stderr or 'TIMEOUT',
        exit_code = 124,
        duration_ms = _elapsed(),
    )
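One detail worth checking in a handler like this: the output attached to subprocess.TimeoutExpired can be bytes even when text=True was requested, and it can be None when nothing was captured. The sketch below shows one way to normalize it. `decode_partial` and `run_with_timeout` are hypothetical helpers, and the UTF-8 decoding is an assumption.

```python
import subprocess
import sys


def decode_partial(data) -> str:
    """Hypothetical helper: normalize TimeoutExpired output to str."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode("utf-8", errors="replace")
    return data


def run_with_timeout(cmd_list, timeout_s):
    """Return (stdout, stderr, exit_code), using 124 for a timeout."""
    try:
        proc = subprocess.run(cmd_list, capture_output=True, text=True,
                              timeout=timeout_s, shell=False)
    except subprocess.TimeoutExpired as exc:
        # Partial output may be bytes or None here, so normalize before returning.
        return decode_partial(exc.stdout), decode_partial(exc.stderr) or "TIMEOUT", 124
    return proc.stdout, proc.stderr, proc.returncode


child = "import time; print('started', flush=True); time.sleep(30)"
stdout, stderr, code = run_with_timeout([sys.executable, "-c", child], 1)
print(code, type(stdout).__name__, type(stderr).__name__)
```

Normalizing to str keeps the stdout and stderr fields of TaskResult consistent with their declared types regardless of how the process ended.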
FileNotFoundError
Command executable not found in PATH:
except FileNotFoundError:
    logger.warning('command not found', extra={
        'task_id': task_id,
        'command': command,
    })
    return TaskResult(
        task_id = task_id,
        stdout = '',
        stderr = 'COMMAND NOT FOUND',
        exit_code = 127,
        duration_ms = _elapsed(),
    )
General Exceptions
Unexpected errors during execution:
except Exception as e:
    logger.error('unexpected executor error', extra={
        'task_id': task_id,
        'reason': str(e),
    })
    return TaskResult(
        task_id = task_id,
        stdout = '',
        stderr = f'EXECUTOR ERROR: {e}',
        exit_code = 1,
        duration_ms = _elapsed(),
    )
Logging
The executor emits structured logs for all operations:
Execution start:
logger.info('executing command', extra={
'task_id': task_id,
'command': command,
'cmd_args': args,
})
Execution complete:
logger.info('command complete', extra={
'task_id': task_id,
'exit_code': result.returncode,
'duration_ms': task_result.duration_ms,
})
Blocked command:
logger.warning('blocked command rejected', extra={
'task_id': task_id,
'command': command,
})
Timeout:
logger.warning('command timed out', extra={
'task_id': task_id,
'timeout_s': timeout_s,
})
Not found:
logger.warning('command not found', extra={
'task_id': task_id,
'command': command,
})
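The `extra` dictionary in these calls becomes attributes on the resulting LogRecord, which a handler or formatter can then read. The sketch below demonstrates this with the standard logging module. The `ListHandler` class and the logger name are illustrative assumptions, not part of the executor. It also shows that a key named `args` would be rejected, which may be why the start log uses `cmd_args`.

```python
import logging


class ListHandler(logging.Handler):
    """Collects records in memory so the extra fields can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("executor_demo")  # assumption: illustrative logger name
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

logger.info("executing command",
            extra={"task_id": "task-1", "command": "echo", "cmd_args": ["hi"]})
record = handler.records[0]
print(record.getMessage(), record.task_id, record.cmd_args)

# 'args' is already a LogRecord attribute, so logging rejects it as an extra key.
try:
    logger.info("executing command", extra={"args": ["hi"]})
except KeyError as exc:
    print("rejected:", exc)
```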
Security Features
The executor enforces strict security controls to prevent code injection and abuse.
1. Shell Injection Prevention
Always uses shell=False:
result = subprocess.run(
cmd_list,
shell = False, # prevents injection
...
)
Safe:
execute('t1', 'echo', ['arg1; rm -rf /'], 10)
# Executes: ['echo', 'arg1; rm -rf /']
# Output: 'arg1; rm -rf /' (string literal, not executed)
Unsafe (NOT used):
# This would be dangerous:
subprocess.run('echo arg1; rm -rf /', shell=True) # DON'T DO THIS
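The safe case can be verified directly. This standalone sketch does not call execute(); it runs a small Python child process through subprocess.run with shell=False and confirms that an argument containing shell metacharacters arrives as a literal string. Using `sys.executable` as the child is an illustrative choice for portability.

```python
import subprocess
import sys

# A child that prints its first argument verbatim.
child = "import sys; print(sys.argv[1])"
payload = "arg1; echo INJECTED"

proc = subprocess.run(
    [sys.executable, "-c", child, payload],
    capture_output=True,
    text=True,
    timeout=10,
    shell=False,  # the argument list is passed to the process without shell parsing
)
print(repr(proc.stdout.strip()))  # 'arg1; echo INJECTED'
```

The semicolon is never interpreted, because no shell is involved in launching the child.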
2. Command Blocklist
Blocked commands return exit code 126:
config.BLOCKED_COMMANDS = [
'nmap', 'masscan', 'netcat', 'nc',
'wireshark', 'tcpdump', 'aircrack-ng',
# ... more dangerous tools
]
3. Timeout Enforcement
Prevents runaway processes:
result = subprocess.run(
cmd_list,
timeout = timeout_s, # hard timeout
...
)
4. Output Capping
Limits the size of the output kept in the result:
stdout = (result.stdout or '')[:MAX_OUTPUT] # 64 KB max
stderr = (result.stderr or '')[:MAX_OUTPUT]
5. Empty Command Rejection
Rejects empty commands:
if not command:
    return TaskResult(..., stderr='BLOCKED: empty command', exit_code=126)
Self-Test Suite
The module includes comprehensive self-tests:
Test Coverage:
- Blocked command returns exit code 126
- All BLOCKED_COMMANDS entries are rejected
- Successful command returns exit code 0
- Command not found returns exit code 127
- Timeout returns exit code 124
- Duration measurement is positive
- shell=False prevents injection
- Arguments passed correctly
Expected Output:
Running executor self-test...
[OK] blocked command returns exit_code 126
[OK] all BLOCKED_COMMANDS entries are rejected
[OK] successful command returns exit_code 0 with correct stdout
[OK] missing command returns exit_code 127
[OK] timed-out command returns exit_code 124
[OK] duration_ms is positive
[OK] shell=False prevents command injection
[OK] args passed correctly to command
All executor self-tests passed.
Best Practices
1. Always Specify Timeout
# Good
result = execute('t1', 'ls', ['-la'], timeout_s=30)
# Bad - missing timeout could hang indefinitely
result = execute('t1', 'ls', ['-la'], timeout_s=None) # Don't do this
2. Check Exit Codes
result = execute('t1', 'whoami', [], 10)
if result.exit_code == 0:
    print(f"User: {result.stdout.strip()}")
elif result.exit_code == 126:
    print("Command blocked by security policy")
elif result.exit_code == 127:
    print("Command not found")
3. Handle Capped Output
result = execute('t1', 'cat', ['/huge/file.txt'], 30)
if len(result.stdout) >= MAX_OUTPUT:
    print("Warning: output may have been truncated to 64 KB")
4. Use Separate Command and Args
# Good - command and args separate
execute('t1', 'ls', ['-la', '/tmp'], 10)
# Bad - embedding args in command string
execute('t1', 'ls -la /tmp', [], 10) # Treated as one executable name; returns exit code 127
Related Modules
- beacon - Task dispatch and result reporting
- config - BLOCKED_COMMANDS configuration
- logger - Logging utilities