The flow_parser module parses PCAP files using Scapy and aggregates packets into 5-tuple flows (src IP, dst IP, src port, dst port, protocol). It computes per-flow timing metrics and inter-flow beacon intervals for C2 detection.

Data Structures

FlowRecord

Dataclass representing a single network flow with timing and size metrics.
from dataclasses import dataclass, field

@dataclass
class FlowRecord:
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: str
    start_time: float           # Unix timestamp of first packet
    end_time: float             # Unix timestamp of last packet
    duration_s: float           # Flow duration in seconds
    packet_count: int
    byte_count: int
    inter_arrival_times: list[float] = field(default_factory=list)
    payload_sizes: list[int] = field(default_factory=list)
    beacon_iats: list[float] = field(default_factory=list)  # gap to next flow to same dst
Fields:
  • src_ip (str): Source IP address
  • dst_ip (str): Destination IP address
  • src_port (int): Source port (0 for non-TCP/UDP)
  • dst_port (int): Destination port (0 for non-TCP/UDP)
  • protocol (str): Protocol name (TCP, UDP, ICMP), or the numeric protocol ID as a string for anything else
  • start_time (float): Unix timestamp of first packet in flow
  • end_time (float): Unix timestamp of last packet in flow
  • duration_s (float): Flow duration in seconds (rounded to 6 decimals)
  • packet_count (int): Total number of packets in flow
  • byte_count (int): Total bytes transferred in flow
  • inter_arrival_times (list[float]): Time deltas between consecutive packets within the flow
  • payload_sizes (list[int]): Per-packet payload sizes in bytes
  • beacon_iats (list[float]): Gap in seconds between this flow's start and the start of the next flow to the same (dst_ip, dst_port); empty if there is no later flow (used for beacon detection)
Example:
flow = FlowRecord(
    src_ip='192.168.1.10',
    dst_ip='10.0.0.5',
    src_port=54321,
    dst_port=443,
    protocol='TCP',
    start_time=1710163052.123456,
    end_time=1710163055.789012,
    duration_s=3.665556,
    packet_count=42,
    byte_count=8192,
    inter_arrival_times=[0.05, 0.12, 0.08, ...],
    payload_sizes=[1460, 1460, 876, ...],
    beacon_iats=[60.5]  # Next connection to same dst started 60.5s later
)
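
The relationship between packet timestamps, duration_s, and inter_arrival_times can be illustrated with a minimal sketch. The helper name flow_timing is hypothetical and not part of the module; the sorting and 6-decimal rounding follow the behavior described on this page, but the module's actual code may differ.

```python
def flow_timing(timestamps: list[float]) -> tuple[float, list[float]]:
    """Return (duration_s, inter_arrival_times) for one flow's packet timestamps.

    Hypothetical helper for illustration only.
    """
    ts = sorted(timestamps)  # tolerate out-of-order packets
    if not ts:
        return 0.0, []
    # Duration is last packet minus first packet, rounded to microseconds.
    duration_s = round(ts[-1] - ts[0], 6)
    # One delta per consecutive pair, so N packets give N - 1 values.
    iats = [round(later - earlier, 6) for earlier, later in zip(ts, ts[1:])]
    return duration_s, iats


# Three packets, the last two captured out of order.
duration, iats = flow_timing([10.0, 10.5, 10.25])
print(duration, iats)
```

A flow with N packets therefore carries N - 1 inter-arrival times, and a single-packet flow has a duration of 0 and an empty list.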

Functions

parse_pcap()

Parse a PCAP file and return one FlowRecord per 5-tuple flow, with beacon IATs computed.
def parse_pcap(pcap_file: str) -> list[FlowRecord]
Parameters:
  • pcap_file (str): Path to PCAP file to parse
Returns:
  • list[FlowRecord]: List of parsed flows with all metrics populated
Raises:
  • FileNotFoundError: If PCAP file does not exist
Behavior:
  • Streams packets one at a time to avoid loading the entire PCAP into RAM
  • Groups packets by 5-tuple (src IP, dst IP, src port, dst port, protocol)
  • Computes per-flow metrics: duration, packet/byte counts, IATs, payload sizes
  • Calls compute_beacon_iats() to populate beacon timing for C2 detection
  • Logs parsing progress and statistics
Example:
from telemetry.flow_parser import parse_pcap

flows = parse_pcap('pcaps/beacon_capture.pcap')
for flow in flows:
    print(f"{flow.src_ip}:{flow.src_port} -> {flow.dst_ip}:{flow.dst_port}")
    print(f"  Packets: {flow.packet_count}, Bytes: {flow.byte_count}")
    print(f"  Duration: {flow.duration_s}s")
    if flow.beacon_iats:
        print(f"  Next beacon in: {flow.beacon_iats[0]}s")
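
The 5-tuple grouping step described under Behavior can be sketched without Scapy. This is an illustration under stated assumptions, not the module's code: the packet tuples below are a hypothetical pre-extracted form, and group_by_five_tuple is an invented name. Each direction of a conversation gets its own key in this sketch.

```python
from collections import defaultdict

# Hypothetical pre-extracted packet fields:
# (timestamp, src_ip, dst_ip, src_port, dst_port, protocol, length_bytes)
packets = [
    (1.00, "192.168.1.10", "10.0.0.5", 54321, 443, "TCP", 60),
    (1.05, "192.168.1.10", "10.0.0.5", 54321, 443, "TCP", 1500),
    (1.20, "192.168.1.10", "8.8.8.8", 0, 0, "ICMP", 84),  # port 0 for non-TCP/UDP
]


def group_by_five_tuple(packets):
    """Aggregate packets into per-flow buckets keyed by the 5-tuple."""
    flows = defaultdict(lambda: {"timestamps": [], "packet_count": 0, "byte_count": 0})
    for ts, src, dst, sport, dport, proto, length in packets:
        key = (src, dst, sport, dport, proto)
        bucket = flows[key]  # dict lookup, created on first use
        bucket["timestamps"].append(ts)
        bucket["packet_count"] += 1
        bucket["byte_count"] += length
    return dict(flows)


grouped = group_by_five_tuple(packets)
for key, bucket in grouped.items():
    print(key, bucket["packet_count"], bucket["byte_count"])
```

Because each packet is folded into its bucket as it arrives, only the per-flow aggregates need to stay in memory, which is what makes the streaming approach practical for large captures.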

compute_beacon_iats()

Compute the gaps between the start times of consecutive flows to the same destination.
def compute_beacon_iats(flows: list[FlowRecord]) -> None
Parameters:
  • flows (list[FlowRecord]): List of flows to analyze (modified in-place)
Returns:
  • None (modifies beacon_iats field of flows in-place)
Behavior:
  • Groups flows by (dst_ip, dst_port) tuple
  • Only tracks client-initiated flows (src_port > 1024, dst_port ≤ 1024)
  • Sorts flows within each group by start_time
  • For each flow, sets beacon_iats to a one-element list holding the gap in seconds between its start_time and the start_time of the next flow in the same group
  • Leaves beacon_iats empty for the last flow in each group
Algorithm:
# For flows F1, F2, F3 to same destination:
# F1.beacon_iats = [F2.start_time - F1.start_time]
# F2.beacon_iats = [F3.start_time - F2.start_time]
# F3.beacon_iats = []  # No subsequent flow
Example:
from telemetry.flow_parser import parse_pcap, compute_beacon_iats

flows = parse_pcap('beacon.pcap')
# beacon_iats already populated by parse_pcap()

# Manual re-computation if needed:
compute_beacon_iats(flows)
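
The steps listed under Behavior can be sketched as follows. This is a minimal reimplementation for illustration, not the module's source: MiniFlow is a hypothetical stand-in carrying only the fields the sketch needs, and both the reset of beacon_iats on every call and the 6-decimal rounding are assumptions.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class MiniFlow:  # hypothetical stand-in for FlowRecord
    dst_ip: str
    src_port: int
    dst_port: int
    start_time: float
    beacon_iats: list[float] = field(default_factory=list)


def sketch_beacon_iats(flows: list[MiniFlow]) -> None:
    """Populate beacon_iats in place, following the documented steps."""
    groups = defaultdict(list)
    for flow in flows:
        flow.beacon_iats = []  # assumption: recomputation starts from a clean slate
        # Keep only client-initiated flows: ephemeral source port, low destination port.
        if flow.src_port > 1024 and flow.dst_port <= 1024:
            groups[(flow.dst_ip, flow.dst_port)].append(flow)
    for group in groups.values():
        group.sort(key=lambda f: f.start_time)
        # Pair each flow with its successor; the last flow has none and stays empty.
        for current, following in zip(group, group[1:]):
            current.beacon_iats = [round(following.start_time - current.start_time, 6)]


flows = [
    MiniFlow("10.0.0.5", 50003, 443, 1120.0),
    MiniFlow("10.0.0.5", 50001, 443, 1000.0),
    MiniFlow("10.0.0.5", 50002, 443, 1060.25),
    MiniFlow("10.0.0.9", 50004, 8080, 1010.0),  # dst_port > 1024, so not tracked
]
sketch_beacon_iats(flows)
for flow in sorted(flows, key=lambda f: f.start_time):
    print(flow.dst_ip, flow.dst_port, flow.start_time, flow.beacon_iats)
```

Note that the input order does not matter, since each group is sorted by start_time before the gaps are taken.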

save_flows()

Write flows to a JSON Lines file, one flow per line.
def save_flows(flows: list[FlowRecord], output_file: str) -> None
Parameters:
  • flows (list[FlowRecord]): Flows to serialize
  • output_file (str): Output path for .flows file
Returns:
  • None
Behavior:
  • Creates parent directories if needed
  • Writes each flow as a JSON object on a single line (JSON Lines format)
  • Logs save statistics
Example:
from telemetry.flow_parser import parse_pcap, save_flows

flows = parse_pcap('capture.pcap')
save_flows(flows, 'output/capture.flows')
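
For readers unfamiliar with JSON Lines, the sketch below shows one plausible way to write and read back such a file using only the standard library. The names MiniFlow, write_jsonl, and read_jsonl are hypothetical, and the use of dataclasses.asdict is an assumption about how a dataclass might be serialized, not a statement about what save_flows() does internally.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class MiniFlow:  # hypothetical stand-in for FlowRecord
    src_ip: str
    dst_ip: str
    packet_count: int


def write_jsonl(flows, output_file) -> None:
    """Write one JSON object per line, creating parent directories first."""
    path = Path(output_file)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        for flow in flows:
            fh.write(json.dumps(asdict(flow)) + "\n")


def read_jsonl(input_file) -> list[dict]:
    """Read the file back as a list of dicts, skipping blank lines."""
    with open(input_file, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "output" / "capture.flows"
    write_jsonl([MiniFlow("192.168.1.10", "10.0.0.5", 42)], out)
    records = read_jsonl(out)

print(records)
```

Since every line is an independent JSON document, a consumer can process a .flows file line by line without parsing the whole file at once.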

Protocol Mapping

The module maps IP protocol numbers to names:
_PROTO_MAP = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}
Unknown protocols are represented by their numeric ID as a string.
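
A lookup with the fallback described above might look like the sketch below. The helper name proto_name is hypothetical; only _PROTO_MAP comes from the module.

```python
_PROTO_MAP = {1: "ICMP", 6: "TCP", 17: "UDP"}


def proto_name(proto: int) -> str:
    """Map an IP protocol number to its name, or to the number as a string."""
    return _PROTO_MAP.get(proto, str(proto))


print(proto_name(6))   # known protocol
print(proto_name(47))  # unknown protocol falls back to its numeric ID
```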

Command-Line Usage

# Parse a PCAP and save its flows as JSON Lines
python -m telemetry.flow_parser --input capture.pcap --output capture.flows
Arguments:
  • --input: Input PCAP file path (required)
  • --output: Output flows file path (required)
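
An entry point accepting these two required arguments could be wired up with argparse roughly as follows. This is a sketch only: build_arg_parser is a hypothetical name, and the help strings are illustrative rather than copied from the module.

```python
import argparse


def build_arg_parser() -> argparse.ArgumentParser:
    """Build a parser with the two required options listed above."""
    parser = argparse.ArgumentParser(
        prog="telemetry.flow_parser",
        description="Parse a PCAP file into flows and save them as JSON Lines.",
    )
    parser.add_argument("--input", required=True, help="Input PCAP file path")
    parser.add_argument("--output", required=True, help="Output flows file path")
    return parser


# Parse an explicit argument list so the sketch runs without a real command line.
args = build_arg_parser().parse_args(
    ["--input", "capture.pcap", "--output", "capture.flows"]
)
print(args.input, args.output)
```

With required=True, argparse exits with a usage error if either option is missing.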

Beacon IAT Detection

The beacon IAT feature is critical for detecting periodic C2 callbacks:
  1. Client-initiated flows: Only flows with high ephemeral source ports (> 1024) connecting to low destination ports (≤ 1024) are tracked
  2. Grouping: Flows are grouped by (dst_ip, dst_port) to identify repeated connections to the same C2 server
  3. Inter-flow timing: The time gap between consecutive connection start times captures the beacon interval
  4. Jitter analysis: Each flow holds at most one beacon_iats value, so variance is measured across the flows in a group; it reveals jitter applied to evade detection
Example beacon pattern:
# Three beacon callbacks to 10.0.0.5:443 with ~60s interval
Flow 1: start_time=1000.0, beacon_iats=[60.2]  # Next callback in 60.2s
Flow 2: start_time=1060.2, beacon_iats=[59.8]  # Next callback in 59.8s
Flow 3: start_time=1120.0, beacon_iats=[]      # No subsequent flow

# Mean beacon interval: 60.0s, Jitter: ±0.2s
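
The jitter analysis step can be approximated by summarizing the intervals collected from one destination group. The sketch below is one possible approach, not part of the module: beacon_stats is a hypothetical helper, and using the coefficient of variation as a regularity score is an assumption about how a detector might consume these values.

```python
from statistics import mean, pstdev


def beacon_stats(intervals: list[float]):
    """Summarize beacon intervals; returns None when there is too little data."""
    if len(intervals) < 2:
        return None
    avg = mean(intervals)
    spread = pstdev(intervals)  # population standard deviation of the gaps
    # Coefficient of variation: near 0 suggests a very regular callback schedule.
    cv = spread / avg if avg else float("inf")
    return {"mean_s": avg, "stdev_s": spread, "cv": cv}


# Intervals taken from Flow 1 and Flow 2 in the example above.
stats = beacon_stats([60.2, 59.8])
print(stats)
```

A low coefficient of variation over many intervals points to a periodic beacon, while ordinary user-driven traffic usually produces far more irregular gaps.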

Requirements

  • Scapy (scapy.all)
  • Depends on: common.logger

Performance Notes

  • Uses PcapReader streaming to process large PCAP files without loading them fully into RAM
  • Flow aggregation uses dict-based grouping for average O(1) lookups per packet
  • Timestamps are sorted within each flow to handle out-of-order packets
  • Beacon IAT computation is O(n log n) due to per-group sorting

Notes

  • All timestamps are rounded to 6 decimal places (microsecond precision)
  • An empty PCAP returns an empty flow list and logs a warning
  • Non-IP packets are silently skipped
  • ICMP and other non-TCP/UDP protocols use port 0
