Skip to main content

Overview

The Database class provides an async context manager wrapping an aiosqlite connection. It manages persistent storage for all C2 server data including sessions, tasks, results, and nonce replay protection. Source: server/storage.py

Database Schema

sessions Table

CREATE TABLE IF NOT EXISTS sessions (
    session_id  TEXT PRIMARY KEY,
    hostname    TEXT NOT NULL,
    username    TEXT NOT NULL,
    os          TEXT NOT NULL,
    agent_ver   TEXT NOT NULL,
    first_seen  REAL NOT NULL,
    last_seen   REAL NOT NULL,
    jitter_pct  INTEGER NOT NULL,
    active      INTEGER NOT NULL DEFAULT 1
)
Stores agent session metadata.

tasks Table

CREATE TABLE IF NOT EXISTS tasks (
    task_id    TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    command    TEXT NOT NULL,
    args       TEXT NOT NULL,
    timeout_s  INTEGER NOT NULL,
    queued_at  REAL NOT NULL,
    status     TEXT NOT NULL
)
Stores all tasks queued for agents. args stored as JSON string.

results Table

CREATE TABLE IF NOT EXISTS results (
    result_id   TEXT PRIMARY KEY,
    task_id     TEXT NOT NULL,
    stdout      TEXT NOT NULL,
    stderr      TEXT NOT NULL,
    exit_code   INTEGER NOT NULL,
    duration_ms INTEGER NOT NULL,
    received_at REAL NOT NULL
)
Stores task execution results.

nonces Table

CREATE TABLE IF NOT EXISTS nonces (
    nonce       TEXT PRIMARY KEY,
    received_at REAL NOT NULL
)
Stores nonces for replay attack detection. Entries older than 24 hours are pruned.

Database Class

Constructor

def __init__(self, db_path: str = DB_PATH)
Initializes database connection (connection established on __aenter__).
db_path
str
default:"logs/c2_server.db"
Path to SQLite database file. Use :memory: for in-memory testing.
Example:
from server.storage import Database

# Production database
db = Database()  # Uses logs/c2_server.db

# Test database
test_db = Database(':memory:')

# Custom path
custom_db = Database('/var/lib/c2/data.db')

Context Manager

async def __aenter__(self)
async def __aexit__(self, exc_type, exc_val, exc_tb)
Async context manager for automatic connection management. Example:
async with Database() as db:
    # Connection is open
    await db.insert_session(...)
    # Connection automatically closed on exit
Behavior:
  • __aenter__: Opens aiosqlite connection, sets row_factory, creates tables
  • __aexit__: Closes connection gracefully

Session Methods

insert_session()

async def insert_session(self, session_id: str, hostname: str, username: str,
                          os: str, agent_ver: str, jitter_pct: int) -> None
Insert a new session row with active=1 and first_seen/last_seen set to now.
session_id
str
required
UUID for the session
hostname
str
required
Agent’s hostname
username
str
required
Current username on agent system
os
str
required
Operating system version
agent_ver
str
required
Agent version string
jitter_pct
int
required
Beacon jitter percentage (0-100)
Example:
async with Database() as db:
    await db.insert_session(
        session_id='550e8400-e29b-41d4-a716-446655440000',
        hostname='VICTIM-PC',
        username='jdoe',
        os='Windows 10 22H2',
        agent_ver='1.0.0',
        jitter_pct=20
    )

get_session()

async def get_session(self, session_id: str) -> aiosqlite.Row | None
Return the session row for session_id, or None if not found.
session_id
str
required
UUID of the session
return
aiosqlite.Row | None
Row object with columns accessible by name, or None
Example:
row = await db.get_session(session_id)
if row:
    print(f"Hostname: {row['hostname']}")
    print(f"Active: {row['active']}")
    print(f"Last seen: {row['last_seen']}")

update_last_seen()

async def update_last_seen(self, session_id: str) -> None
Update last_seen timestamp for an active session.
session_id
str
required
UUID of the session
Example:
# Called on every beacon/heartbeat
await db.update_last_seen(session_id)

deactivate_session()

async def deactivate_session(self, session_id: str) -> None
Mark a session as inactive (active = 0).
session_id
str
required
UUID of the session to deactivate
Example:
# Operator kills session
await db.deactivate_session(session_id)
# Session remains in DB but active=0

list_sessions()

async def list_sessions(self) -> list
Return all session rows ordered by last_seen descending.
return
list[aiosqlite.Row]
List of session rows, most recent first
Example:
sessions = await db.list_sessions()
for row in sessions:
    status = "Active" if row['active'] else "Inactive"
    print(f"{row['hostname']} - {status}")

Task Methods

insert_task()

async def insert_task(self, task_id: str, session_id: str, command: str,
                      args: str, timeout_s: int) -> None
Insert a new task row with status PENDING.
task_id
str
required
UUID for the task
session_id
str
required
UUID of the session this task belongs to
command
str
required
Command type: shell, download, upload, etc.
args
str
required
JSON string of command arguments (e.g., '["whoami"]')
timeout_s
int
required
Maximum execution time in seconds
Example:
import json

await db.insert_task(
    task_id='abc-123',
    session_id='def-456',
    command='shell',
    args=json.dumps(['whoami']),
    timeout_s=30
)

update_task_status()

async def update_task_status(self, task_id: str, status: str) -> None
Update the status field of a task row.
task_id
str
required
UUID of the task
status
str
required
New status: PENDING, DISPATCHED, COMPLETE, ERROR
Example:
# Task sent to agent
await db.update_task_status(task_id, 'DISPATCHED')

# Task completed
await db.update_task_status(task_id, 'COMPLETE')

# Task failed
await db.update_task_status(task_id, 'ERROR')

get_pending_task()

async def get_pending_task(self, session_id: str) -> aiosqlite.Row | None
Return the oldest PENDING task for a session, or None.
session_id
str
required
UUID of the session
return
aiosqlite.Row | None
Oldest PENDING task row, or None if no pending tasks
Example:
task = await db.get_pending_task(session_id)
if task:
    print(f"Next task: {task['command']} {task['args']}")

get_tasks_for_session()

async def get_tasks_for_session(self, session_id: str) -> list
Return all task rows for a session ordered by queued_at.
session_id
str
required
UUID of the session
return
list[aiosqlite.Row]
List of task rows sorted by queued_at ascending
Example:
tasks = await db.get_tasks_for_session(session_id)
for task in tasks:
    print(f"{task['command']} - {task['status']}")

Result Methods

insert_result()

async def insert_result(self, result_id: str, task_id: str, stdout: str,
                         stderr: str, exit_code: int, duration_ms: int) -> None
Insert a task result row.
result_id
str
required
UUID for the result
task_id
str
required
UUID of the task this result belongs to
stdout
str
required
Command standard output
stderr
str
required
Command standard error
exit_code
int
required
Process exit code
duration_ms
int
required
Execution duration in milliseconds
Example:
import uuid

await db.insert_result(
    result_id=str(uuid.uuid4()),
    task_id='abc-123',
    stdout='VICTIM-PC\\jdoe',
    stderr='',
    exit_code=0,
    duration_ms=42
)

get_results_for_session()

async def get_results_for_session(self, session_id: str) -> list
Return all results for tasks belonging to a session via JOIN.
session_id
str
required
UUID of the session
return
list[aiosqlite.Row]
List of result rows sorted by received_at ascending
Example:
results = await db.get_results_for_session(session_id)
for result in results:
    print(f"Task {result['task_id']}:")
    print(f"  Exit code: {result['exit_code']}")
    print(f"  Output: {result['stdout']}")
    if result['stderr']:
        print(f"  Errors: {result['stderr']}")

Nonce Methods

check_and_store_nonce()

async def check_and_store_nonce(self, nonce: str) -> bool
Return True and store nonce if unseen in last 24h, False if replay detected.
nonce
str
required
Nonce string from beacon message
return
bool
True if nonce is new (beacon accepted), False if replay detected (reject)
Example:
if await db.check_and_store_nonce(nonce):
    # Process beacon
    pass
else:
    # Replay attack detected
    return JSONResponse(status_code=409, content={'error': 'replay detected'})
Behavior:
  1. Queries for nonce received within last 24 hours
  2. If found, logs warning and returns False (replay)
  3. If not found, inserts nonce with current timestamp
  4. Calls prune_old_nonces() to keep table lean
  5. Returns True (accept beacon)

prune_old_nonces()

async def prune_old_nonces(self) -> None
Delete nonce rows older than 24 hours. Example:
# Called automatically by check_and_store_nonce()
await db.prune_old_nonces()
Behavior:
  • Deletes rows where received_at < time.time() - 86400
  • Keeps nonce table size bounded
  • Prevents unbounded growth over long deployments

Constants

DB_PATH
str
default:"logs/c2_server.db"
Default database file path relative to project root
NONCE_EXPIRY_SECONDS
int
default:"86400"
Nonce retention period (24 hours)

Usage Patterns

Complete Session Workflow

async with Database() as db:
    # Agent checks in
    await db.insert_session(
        session_id='abc-123',
        hostname='VICTIM-PC',
        username='jdoe',
        os='Windows 10',
        agent_ver='1.0.0',
        jitter_pct=20
    )
    
    # Operator queues task
    await db.insert_task(
        task_id='def-456',
        session_id='abc-123',
        command='shell',
        args='["whoami"]',
        timeout_s=30
    )
    
    # Agent beacons
    await db.update_last_seen('abc-123')
    
    # Server dispatches task
    task = await db.get_pending_task('abc-123')
    await db.update_task_status('def-456', 'DISPATCHED')
    
    # Agent submits result
    await db.update_task_status('def-456', 'COMPLETE')
    await db.insert_result(
        result_id=str(uuid.uuid4()),
        task_id='def-456',
        stdout='VICTIM-PC\\jdoe',
        stderr='',
        exit_code=0,
        duration_ms=42
    )
    
    # Operator views results
    results = await db.get_results_for_session('abc-123')
    print(results[0]['stdout'])

Replay Protection

async with Database() as db:
    # First beacon
    nonce1 = 'abc123def456'
    assert await db.check_and_store_nonce(nonce1) is True
    
    # Replay attempt
    assert await db.check_and_store_nonce(nonce1) is False
    
    # New beacon
    nonce2 = 'xyz789uvw012'
    assert await db.check_and_store_nonce(nonce2) is True

Testing

Self-Test Suite

Run built-in tests:
python -m server.storage
Test Coverage:
  • insert_session / get_session
  • update_last_seen
  • list_sessions
  • insert_task / get_pending_task
  • update_task_status
  • insert_result / get_results_for_session
  • check_and_store_nonce replay detection
  • prune_old_nonces
  • deactivate_session
Output:
Running storage self-test...
  [OK] insert_session / get_session
  [OK] update_last_seen
  [OK] list_sessions
  [OK] insert_task / get_pending_task
  [OK] update_task_status
  [OK] insert_result / get_results_for_session
  [OK] check_and_store_nonce — replay correctly rejected
  [OK] prune_old_nonces
  [OK] deactivate_session

All storage self-tests passed.

In-Memory Testing

Use :memory: for fast unit tests:
async def test_example():
    async with Database(':memory:') as db:
        # Database exists only in RAM
        # No file I/O, very fast
        # Automatically destroyed on exit
        pass

Performance Considerations

aiosqlite.Row enables column access by name (e.g., row['hostname']) with minimal overhead.
Every write operation calls await self._conn.commit() for immediate durability. For bulk operations, consider batching commits.
No indexes defined. Consider adding indexes on session_id and status for large deployments.
Called on every beacon to prevent table bloat. For high traffic, consider background pruning task.

Logging

Structured logging with contextual fields:
logger.info('session inserted', extra={
    'session_id': session_id, 
    'hostname': hostname
})
Events:
  • Session inserted
  • Session deactivated
  • Task queued
  • Result stored
  • Nonce replay detected

Server Main

Main server integration

SessionManager

In-memory session management

CommandQueue

Task queue management

Configuration

Database path configuration

Build docs developers (and LLMs) love