The flow_parser module parses PCAP files using Scapy and aggregates packets into 5-tuple flows (src IP, dst IP, src port, dst port, protocol). It computes per-flow timing metrics and inter-flow beacon intervals for C2 detection.
Data Structures
FlowRecord
Dataclass representing a single network flow with timing and size metrics.
- src_ip (str): Source IP address
- dst_ip (str): Destination IP address
- src_port (int): Source port (0 for non-TCP/UDP)
- dst_port (int): Destination port (0 for non-TCP/UDP)
- protocol (str): Protocol name (TCP, UDP, ICMP, or numeric ID)
- start_time (float): Unix timestamp of first packet in flow
- end_time (float): Unix timestamp of last packet in flow
- duration_s (float): Flow duration in seconds (rounded to 6 decimals)
- packet_count (int): Total number of packets in flow
- byte_count (int): Total bytes transferred in flow
- inter_arrival_times (list[float]): Time deltas between consecutive packets within the flow
- payload_sizes (list[int]): Per-packet payload sizes in bytes
- beacon_iats (list[float]): Inter-flow arrival time to same destination (for beacon detection)
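The field list above maps naturally onto a Python dataclass. The following is a minimal sketch rather than the module's actual definition: the field names and types come from the list, while the default values and the example addresses are assumptions chosen for illustration.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class FlowRecord:
    """Illustrative sketch of a 5-tuple flow record (defaults are assumed)."""
    src_ip: str
    dst_ip: str
    src_port: int   # 0 for non-TCP/UDP
    dst_port: int   # 0 for non-TCP/UDP
    protocol: str   # "TCP", "UDP", "ICMP", or a numeric ID as a string
    start_time: float = 0.0
    end_time: float = 0.0
    duration_s: float = 0.0
    packet_count: int = 0
    byte_count: int = 0
    # List fields need default_factory so instances do not share one list.
    inter_arrival_times: list[float] = field(default_factory=list)
    payload_sizes: list[int] = field(default_factory=list)
    beacon_iats: list[float] = field(default_factory=list)


# Example values are made up (documentation address ranges).
flow = FlowRecord("10.0.0.5", "203.0.113.7", 49152, 443, "TCP",
                  start_time=100.0, end_time=100.5, duration_s=0.5,
                  packet_count=2, byte_count=120,
                  inter_arrival_times=[0.5], payload_sizes=[60, 60])
record = asdict(flow)  # plain dict, convenient for JSON serialization
```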
Functions
parse_pcap()
Parse a PCAP file and return one FlowRecord per 5-tuple flow, with beacon IATs computed.
Parameters:
- pcap_file (str): Path to the PCAP file to parse
Returns:
- list[FlowRecord]: List of parsed flows with all metrics populated
Raises:
- FileNotFoundError: If the PCAP file does not exist
Behavior:
- Streams packets one at a time to avoid loading entire PCAP into RAM
- Groups packets by 5-tuple (src IP, dst IP, src port, dst port, protocol)
- Computes per-flow metrics: duration, packet/byte counts, IATs, payload sizes
- Calls compute_beacon_iats() to populate beacon timing for C2 detection
- Logs parsing progress and statistics
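The grouping and per-flow metric steps can be sketched without Scapy by working on pre-extracted packet tuples. This is an illustration under stated assumptions, not the module's code: the `aggregate` helper, the tuple layout, and the sample packets are hypothetical, and reading packets from a PCAP file is left out.

```python
from collections import defaultdict

# Hypothetical pre-extracted packets:
# (timestamp, src_ip, dst_ip, src_port, dst_port, protocol, size_in_bytes)
packets = [
    (10.0, "10.0.0.5", "203.0.113.7", 49152, 443, "TCP", 60),
    (10.75, "10.0.0.5", "203.0.113.7", 49152, 443, "TCP", 1500),
    (10.25, "10.0.0.5", "203.0.113.7", 49152, 443, "TCP", 40),  # out of order
    (11.0, "10.0.0.5", "198.51.100.9", 0, 0, "ICMP", 84),
]


def aggregate(packets):
    """Group packets by 5-tuple and compute per-flow metrics."""
    groups = defaultdict(list)  # dict keyed by 5-tuple
    for ts, *five_tuple, size in packets:
        groups[tuple(five_tuple)].append((ts, size))

    flows = {}
    for key, pkts in groups.items():
        pkts.sort(key=lambda p: p[0])  # tolerate out-of-order packets
        times = [ts for ts, _ in pkts]
        flows[key] = {
            "start_time": times[0],
            "end_time": times[-1],
            "duration_s": round(times[-1] - times[0], 6),
            "packet_count": len(pkts),
            "byte_count": sum(size for _, size in pkts),
            "inter_arrival_times": [
                round(b - a, 6) for a, b in zip(times, times[1:])
            ],
        }
    return flows


flows = aggregate(packets)
tcp_key = ("10.0.0.5", "203.0.113.7", 49152, 443, "TCP")
```

Sorting timestamps before taking deltas keeps every inter-arrival time non-negative even when the capture contains reordered packets.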
compute_beacon_iats()
Compute inter-flow start time deltas for flows to the same destination.
Parameters:
- flows (list[FlowRecord]): List of flows to analyze (modified in-place)
Returns:
- None (modifies the beacon_iats field of flows in-place)
Behavior:
- Groups flows by (dst_ip, dst_port) tuple
- Only tracks client-initiated flows (src_port > 1024, dst_port ≤ 1024)
- Sorts flows within each group by start_time
- For each flow, assigns the time delta to the next flow's start_time as beacon_iats[0]
- Last flow in each group has an empty beacon_iats list
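The steps listed for compute_beacon_iats() can be sketched as follows. This is an approximation of the described behavior, not the module's source: `compute_beacon_iats_sketch` is a hypothetical name, and `SimpleNamespace` objects stand in for FlowRecord instances.

```python
from collections import defaultdict
from types import SimpleNamespace


def compute_beacon_iats_sketch(flows):
    """Fill beacon_iats in place for client-initiated flows."""
    groups = defaultdict(list)
    for flow in flows:
        # Keep only client-initiated flows: ephemeral source, low destination.
        if flow.src_port > 1024 and flow.dst_port <= 1024:
            groups[(flow.dst_ip, flow.dst_port)].append(flow)

    for group in groups.values():
        group.sort(key=lambda f: f.start_time)
        # Pair each flow with its successor; the last flow gets no delta.
        for current, nxt in zip(group, group[1:]):
            current.beacon_iats = [round(nxt.start_time - current.start_time, 6)]


def make_flow(src_port, dst_port, start_time):
    # Stand-in for FlowRecord with only the fields this sketch needs.
    return SimpleNamespace(src_port=src_port, dst_ip="203.0.113.7",
                           dst_port=dst_port, start_time=start_time,
                           beacon_iats=[])


flows = [
    make_flow(49153, 443, 60.0),
    make_flow(49152, 443, 0.0),
    make_flow(49154, 443, 120.5),
    make_flow(443, 50000, 5.0),   # not client-initiated, so it is ignored
]
compute_beacon_iats_sketch(flows)
```

After the call, the flow starting at 0.0 holds `[60.0]`, the flow starting at 60.0 holds `[60.5]`, and the last flow in the group and the ignored flow keep empty lists.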
save_flows()
Write flows to a JSON Lines file.
Parameters:
- flows (list[FlowRecord]): Flows to serialize
- output_file (str): Output path for the .flows file
Returns:
- None
Behavior:
- Creates parent directories if needed
- Writes each flow as a JSON object on a single line (JSON Lines format)
- Logs save statistics
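A minimal sketch of the JSON Lines output described above, assuming flows are dataclass instances that `dataclasses.asdict` can convert. `save_flows_sketch` and `MiniFlow` are hypothetical names used only for this example, and logging is omitted.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass
class MiniFlow:
    """Hypothetical, trimmed-down stand-in for FlowRecord."""
    dst_ip: str
    dst_port: int
    start_time: float


def save_flows_sketch(flows, output_file):
    path = Path(output_file)
    # Create parent directories if they do not exist yet.
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        for flow in flows:
            # One JSON object per line (JSON Lines).
            fh.write(json.dumps(asdict(flow)) + "\n")


out = Path(tempfile.mkdtemp()) / "nested" / "capture.flows"
save_flows_sketch(
    [MiniFlow("203.0.113.7", 443, 0.0), MiniFlow("203.0.113.7", 443, 60.0)],
    out,
)
lines = out.read_text(encoding="utf-8").splitlines()
```

Because each line is an independent JSON object, a consumer can read the file back one flow at a time with `json.loads` instead of parsing the whole file at once.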
Protocol Mapping
The module maps IP protocol numbers to names (TCP, UDP, ICMP); any other protocol is reported by its numeric ID.
Command-Line Usage
- --input: Input PCAP file path (required)
- --output: Output flows file path (required)
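The two required options could be declared with the standard library's argparse, as in the sketch below. Whether the module actually uses argparse is an assumption, and `build_arg_parser`, the description text, and the example file names are hypothetical.

```python
import argparse


def build_arg_parser():
    parser = argparse.ArgumentParser(
        description="Parse a PCAP file into flow records")
    parser.add_argument("--input", required=True, help="Input PCAP file path")
    parser.add_argument("--output", required=True, help="Output flows file path")
    return parser


# Parse an explicit argument list instead of sys.argv for demonstration.
args = build_arg_parser().parse_args(
    ["--input", "capture.pcap", "--output", "out/capture.flows"])
```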
Beacon IAT Detection
The beacon IAT feature is critical for detecting periodic C2 callbacks:
- Client-initiated flows: Only flows with high ephemeral source ports (> 1024) connecting to low destination ports (≤ 1024) are tracked
- Grouping: Flows are grouped by (dst_ip, dst_port) to identify repeated connections to the same C2 server
- Inter-flow timing: The time gap between consecutive connection start times captures the beacon interval
- Jitter analysis: Variance in beacon_iats reveals jitter applied to evade detection
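One way to quantify the jitter mentioned above is the coefficient of variation (standard deviation divided by mean) of the intervals collected from the beacon_iats of flows to one (dst_ip, dst_port) destination. The sketch below is an assumption about how a downstream detector might use these values, not part of flow_parser; `beacon_jitter` and the sample intervals are hypothetical.

```python
from statistics import mean, pstdev


def beacon_jitter(intervals):
    """Return (mean interval, coefficient of variation), or None if undefined."""
    if len(intervals) < 2:
        return None  # too few intervals to measure spread
    avg = mean(intervals)
    if avg == 0:
        return None  # avoid division by zero
    return avg, pstdev(intervals) / avg


regular = [60.0, 60.0, 60.0, 60.0]    # strict 60-second beacon
jittered = [54.0, 66.0, 57.0, 63.0]   # same mean with added jitter

regular_stats = beacon_jitter(regular)
jittered_stats = beacon_jitter(jittered)
```

A coefficient near zero indicates a nearly fixed callback interval; a small but nonzero value, as in the second list, is consistent with a beacon that randomizes its sleep time around a fixed mean.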
Requirements
- Scapy (scapy.all)
- Depends on: common.logger
Performance Notes
- Uses PcapReader streaming to handle large PCAP files without loading them into RAM
- Flow aggregation uses dict-based grouping for O(1) lookups
- Timestamps are sorted within each flow to handle out-of-order packets
- Beacon IAT computation is O(n log n) due to per-group sorting
Notes
- All timestamps are rounded to 6 decimal places (microsecond precision)
- An empty PCAP returns an empty flow list and logs a warning
- Non-IP packets are silently skipped
- ICMP and other non-TCP/UDP protocols use port 0
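The protocol naming and port-0 convention can be illustrated with a small lookup. The numbers 1, 6, and 17 are the IANA-assigned IP protocol numbers for ICMP, TCP, and UDP; the module's actual table may contain more entries, and `PROTOCOL_NAMES`, `protocol_name`, and `flow_ports` are hypothetical names for this sketch.

```python
# IANA-assigned IP protocol numbers for the three named protocols.
PROTOCOL_NAMES = {1: "ICMP", 6: "TCP", 17: "UDP"}


def protocol_name(proto_number):
    """Return the protocol name, or the numeric ID as a string if unknown."""
    return PROTOCOL_NAMES.get(proto_number, str(proto_number))


def flow_ports(proto_number, sport=None, dport=None):
    """Return (src_port, dst_port); non-TCP/UDP protocols use port 0."""
    if proto_number in (6, 17):
        return sport, dport
    return 0, 0


tcp_name = protocol_name(6)
gre_name = protocol_name(47)   # not in the table, falls back to the number
icmp_ports = flow_ports(1)
```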