Overview
Somnium is most effective when integrated with your existing security infrastructure. This guide shows how to correlate Somnium test results with firewalls, SIEM platforms, and IDS/IPS systems to validate comprehensive security coverage.
Integration Architecture
            ┌─────────────┐
            │   Somnium   │  Generates malicious traffic
            │ Test System │
            └──────┬──────┘
                   │
                   ▼
┌─────────────────────────────────────┐
│       Security Control Layer        │
├─────────────┬──────────────┬────────┤
│  Firewall   │   IDS/IPS    │ Proxy  │
└──────┬──────┴──────┬───────┴───┬────┘
       │             │           │
       └─────────────┼───────────┘
                     ▼
               ┌────────────┐
               │    SIEM    │
               │  Platform  │
               └────────────┘
                     │
              ┌──────┴──────┐
              ▼             ▼
          ┌────────┐  ┌──────────┐
          │ Alerts │  │ Reports  │
          └────────┘  └──────────┘
Firewall Integration
- Palo Alto Networks
- Cisco ASA/Firepower
- Fortinet FortiGate
- pfSense/OPNsense
Palo Alto Networks
Configuration
Enable threat prevention logging
# CLI configuration
set deviceconfig setting logging log-suppression no
set shared log-settings profiles <profile-name> match-list <entry> send-to-panorama yes
Configure security policy logging
set rulebase security rules <rule-name> log-start yes
set rulebase security rules <rule-name> log-end yes
Correlation Queries
After running Somnium tests, search the Palo Alto logs:
Search for blocked connections
( action = deny or action = drop )
and ( src = <somnium_test_system_ip> )
and ( receive_time >= '<test_start_time>' )
Search for threat matches
( threat_category neq "any" )
and ( src = <somnium_test_system_ip> )
and ( receive_time >= '<test_start_time>' )
Search for allowed connections (security gaps)
( action = allow )
and ( src = <somnium_test_system_ip> )
and ( dest_ip in <malicious_ip_list> )
Example Alert Rule
Create a custom correlation object for Somnium testing:
<entry name="Somnium-Unexpected-Allow">
<description>Alert when Somnium test traffic is allowed</description>
<filter>( action eq allow ) and ( src eq <test_system_ip> ) and ( threat_category neq any )</filter>
<threshold>1</threshold>
<interval>1</interval>
</entry>
Cisco ASA/Firepower
Configuration
Enable connection logging
# ASA CLI
logging enable
logging timestamp
logging trap informational
logging host <siem_ip> <interface>
Enable intrusion events
# Enable IPS event logging
set policy-map type inspect ips ips-policy-map
class class-default
ips inline fail-close
Correlation Queries
ASA syslog search
# Search for denied connections from test system
grep "<somnium_ip>" /var/log/messages | grep "Deny"
# Count denied connections by destination
grep "<somnium_ip>" /var/log/messages | grep "Deny" | \
awk '{print $NF}' | sort | uniq -c | sort -rn
Firepower event search
(Source IP = <somnium_ip>)
AND (First Packet Time >= <test_start>)
AND (Action = Block OR Impact Flag = Red)
Fortinet FortiGate
Configuration
Enable UTM logging
config log memory filter
set severity information
set forward-traffic enable
set virus enable
set ips enable
set dns enable
end
Configure FortiGuard categories
config webfilter profile
edit "default"
config web
set blacklist enable
set urlfilter-table 1
end
next
end
Correlation Queries
CLI log search
# Search logs for denied traffic
execute log filter field srcip <somnium_ip>
execute log filter field action deny
execute log display
# Search for IPS detections
execute log filter category ips
execute log filter field srcip <somnium_ip>
execute log display
FortiAnalyzer query
SELECT
from_itime(itime) as time,
srcip, dstip, dstport,
action, attack, severity
FROM $log
WHERE srcip='<somnium_ip>'
AND logver>=52
AND itime>timestamp('<test_start>')
ORDER BY itime DESC
pfSense/OPNsense
Configuration
Enable firewall logging
- System > Advanced > Firewall & NAT
- Check “Log packets matched from the default block rules”
- Check “Log packets matched from the default pass rules”
Install Suricata/Snort
- System > Package Manager
- Install “suricata” or “snort”
- Services > Suricata > Global Settings
- Enable “Keep Suricata Settings After Deinstall”
Correlation Commands
Search firewall logs
# Search for blocked traffic
clog /var/log/filter.log | grep "<somnium_ip>" | grep "block"
# Count blocks by destination
clog /var/log/filter.log | grep "<somnium_ip>" | grep "block" | \
awk -F'DST=' '{print $2}' | awk '{print $1}' | sort | uniq -c
Search Suricata alerts
# View Suricata alerts during test
tail -f /var/log/suricata/suricata_<interface>/alerts.log | \
grep "<somnium_ip>"
# Parse EVE JSON logs
jq 'select(.src_ip=="<somnium_ip>")' \
/var/log/suricata/suricata_<interface>/eve.json
SIEM Integration
- Splunk
- Elastic Stack (ELK)
- QRadar
- Microsoft Sentinel
Splunk
Somnium Result Ingestion
Configure file monitoring
[monitor:///opt/somnium/*_Results.txt]
disabled = false
sourcetype = somnium:results
index = security
Create field extractions
[somnium:results]
SHOULD_LINEMERGE = false
TIME_PREFIX = Timestamp:
TIME_FORMAT = %H:%M:%S
EXTRACT-ip_test = Timestamp:(?<timestamp>\S+)\s+IP:(?<dest_ip>\S+)\s+:\s+Port:(?<dest_port>\d+)\s+test\s+(?<result>\w+)
EXTRACT-url_test = Timestamp:(?<timestamp>\S+)\s+URL:(?<dest_url>\S+)\s+test\s+(?<result>\w+)
EXTRACT-doh_test = Timestamp:(?<timestamp>\S+)\s+(?<provider>\w+)\s+response\s+for\s+(?<query_domain>\S+)
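Before deploying the extractions, it can help to check the patterns against a few lines outside Splunk. The following is a minimal Python sketch, not part of Splunk: it rewrites the three expressions with Python's `(?P<name>...)` group syntax and returns the captured fields. The helper name `extract_fields` and the sample line in the usage note are hypothetical; the sample is inferred from the patterns, not taken from real Somnium output.

```python
import re

# The three EXTRACT- patterns, rewritten with Python named groups.
IP_TEST = re.compile(
    r"Timestamp:(?P<timestamp>\S+)\s+IP:(?P<dest_ip>\S+)\s+:\s+"
    r"Port:(?P<dest_port>\d+)\s+test\s+(?P<result>\w+)"
)
URL_TEST = re.compile(
    r"Timestamp:(?P<timestamp>\S+)\s+URL:(?P<dest_url>\S+)\s+test\s+(?P<result>\w+)"
)
DOH_TEST = re.compile(
    r"Timestamp:(?P<timestamp>\S+)\s+(?P<provider>\w+)\s+response\s+for\s+"
    r"(?P<query_domain>\S+)"
)


def extract_fields(line: str) -> dict:
    """Return the named groups of the first pattern that matches, else {}."""
    for pattern in (IP_TEST, URL_TEST, DOH_TEST):
        match = pattern.search(line)
        if match:
            return match.groupdict()
    return {}
```

For example, `extract_fields("Timestamp:14:03:22 IP:203.0.113.9 : Port:443 test FAILED")` returns the `timestamp`, `dest_ip`, `dest_port`, and `result` fields as strings. A line that matches none of the patterns yields an empty dictionary, which is a quick way to spot result formats the extractions would miss.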
Correlation Searches
Find successful connections not blocked by firewall
index=security sourcetype=somnium:results result="SUCCESSFUL"
| eval somnium_time=strptime(timestamp, "%H:%M:%S")
| join type=left dest_ip
[ search index=firewall action=deny OR action=drop
| eval fw_time=_time ]
| where isnull(fw_time)
| table timestamp, dest_ip, dest_port, result
| eval status="SECURITY GAP: Connection succeeded but no firewall block"
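The search above is effectively an anti-join: keep the successful test connections for which no firewall deny or drop exists for the same destination. A minimal Python sketch of that logic, assuming both data sets have already been exported as lists of dictionaries (the function name and record layout are hypothetical):

```python
def find_unblocked(test_results, firewall_events):
    """Return successful test records whose dest_ip has no deny/drop event."""
    # Destinations the firewall blocked at least once.
    blocked_ips = {
        event.get("dest_ip")
        for event in firewall_events
        if event.get("action") in {"deny", "drop"}
    }
    return [
        record
        for record in test_results
        if record.get("result") == "SUCCESSFUL"
        and record.get("dest_ip") not in blocked_ips
    ]
```

Like the search, this matches on destination IP only. A block logged for the same address outside the test window would hide a gap, so it may be worth restricting the firewall events to the test period first.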
Correlate with IPS alerts
index=security sourcetype=somnium:results
| eval test_time=strptime(timestamp, "%H:%M:%S")
| join type=left dest_ip
[ search index=ips signature!=""
| eval ips_time=_time ]
| eval time_diff=abs(test_time - ips_time)
| where time_diff < 60
| table timestamp, dest_ip, result, signature, action
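The time-window condition in this search can be easier to reason about as plain code. The sketch below pairs each test record with IPS alerts for the same destination whose timestamps differ by less than 60 seconds. It assumes records are dictionaries with a `dest_ip` string and a `time` value that is already a full `datetime`; those field names and the function name are illustrative, not Somnium or Splunk output.

```python
from datetime import datetime, timedelta


def correlate_with_ips(tests, alerts, window_seconds=60):
    """Pair test records with IPS alerts for the same dest_ip inside the window."""
    window = timedelta(seconds=window_seconds)

    # Index alerts by destination so each test only scans relevant alerts.
    alerts_by_ip = {}
    for alert in alerts:
        alerts_by_ip.setdefault(alert["dest_ip"], []).append(alert)

    pairs = []
    for test in tests:
        for alert in alerts_by_ip.get(test["dest_ip"], []):
            if abs(test["time"] - alert["time"]) < window:
                pairs.append((test, alert))
    return pairs
```

A test with no pair in the result is one the IPS did not alert on within the window, which is the case worth investigating.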
Measure security control effectiveness
index=security sourcetype=somnium:results
| stats count by result, source
| eventstats sum(count) as total_tests by source
| eval block_rate=if(result="FAILED", count/total_tests*100, null())
| table source, result, count, block_rate
| sort -block_rate
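The block rate here is the share of tests per result file that ended in FAILED, meaning the connection attempt did not succeed. As a worked illustration of the arithmetic, the sketch below computes the same figure from `(source, result)` pairs; the function name and input shape are assumptions for the example.

```python
from collections import Counter, defaultdict


def block_rates(records):
    """Map each source to the percentage of its tests with result FAILED."""
    counts = defaultdict(Counter)
    for source, result in records:
        counts[source][result] += 1

    rates = {}
    for source, by_result in counts.items():
        total = sum(by_result.values())
        rates[source] = 100.0 * by_result["FAILED"] / total
    return rates
```

With three FAILED and one SUCCESSFUL record for a source, the rate is 3 / 4 * 100 = 75.0.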
Identify gaps in DNS security
sourcetype=somnium:results source="*DoH_Results.txt"
| rex field=_raw "response for (?<domain>\S+)"
| join type=left domain
[ search index=firewall dest_category="DNS over HTTPS" action=deny ]
| where isnull(action)
| stats count by domain, provider
| eval issue="DoH queries succeeded - DNS security bypassed"
Dashboard Example
<dashboard>
<label>Somnium Security Validation</label>
<row>
<panel>
<title>Block Rate by Test Type</title>
<chart>
<search>
<query>
index=security sourcetype=somnium:results
| eval test_type=case(
source LIKE "%IP_Results%", "Bad IPs",
source LIKE "%URL_Results%", "Phishing",
source LIKE "%TOR_Results%", "TOR Nodes",
source LIKE "%Malware_Results%", "Malware",
1=1, "Other")
| stats count by test_type, result
| chart sum(count) over test_type by result
</query>
</search>
<option name="charting.chart">column</option>
<option name="charting.chart.stackMode">stacked</option>
</chart>
</panel>
</row>
</dashboard>
Elastic Stack (ELK)
Filebeat Configuration
Configure Filebeat input
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /opt/somnium/*_Results.txt
    fields:
      service: somnium
      environment: security-testing
    fields_under_root: true
Create Logstash pipeline
filter {
if [service] == "somnium" {
grok {
match => {
"message" => [
"Timestamp:%{TIME:timestamp} IP:%{IP:dest_ip} : Port:%{NUMBER:dest_port} test %{WORD:result}",
"Timestamp:%{TIME:timestamp} URL:%{URI:dest_url} test %{WORD:result}",
"Timestamp:%{TIME:timestamp} %{WORD:provider} response for %{HOSTNAME:query_domain}"
]
}
}
date {
match => [ "timestamp", "HH:mm:ss" ]
target => "@timestamp"
}
mutate {
add_field => {
"security_test" => "true"
}
}
}
}
Create Elasticsearch index template
PUT _index_template/somnium
{
"index_patterns": ["somnium-*"],
"template": {
"mappings": {
"properties": {
"timestamp": { "type": "date", "format": "HH:mm:ss" },
"dest_ip": { "type": "ip" },
"dest_port": { "type": "integer" },
"dest_url": { "type": "keyword" },
"result": { "type": "keyword" },
"provider": { "type": "keyword" },
"query_domain": { "type": "keyword" }
}
}
}
}
Kibana Queries
Find connections not blocked
GET somnium-*/_search
{
"query": {
"bool": {
"must": [
{ "term": { "result": "SUCCESSFUL" }},
{ "range": { "@timestamp": { "gte": "now-1h" }}}
],
"must_not": [
{ "exists": { "field": "firewall.action" }}
]
}
},
"aggs": {
"by_dest": {
"terms": { "field": "dest_ip" }
}
}
}
Correlation with firewall logs
GET logs-*/_search
{
"query": {
"bool": {
"should": [
{ "term": { "service": "somnium" }},
{ "term": { "event.category": "network" }}
],
"filter": {
"range": { "@timestamp": { "gte": "now-15m" }}
}
}
},
"aggs": {
"timeline": {
"date_histogram": {
"field": "@timestamp",
"fixed_interval": "1m"
},
"aggs": {
"by_service": {
"terms": { "field": "service" }
}
}
}
}
}
QRadar
Configuration
Create log source
- Admin > Log Sources > Add Log Source
- Log Source Type: “Custom”
- Protocol: “SCP” or “SFTP”
- Configure retrieval of Somnium result files
Create custom properties
Property Name: Somnium Test Result
Property Type: String
Regex: test\s+(\w+)
Property Name: Somnium Destination IP
Property Type: IP
Regex: IP:(\d+\.\d+\.\d+\.\d+)
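The two property expressions can be tried against a payload before they are saved in QRadar. The sketch below is a rough Python check under stated assumptions: `extract_properties` is a hypothetical helper, and the extra `ipaddress` validation is an addition for the example, since the IP expression on its own also accepts out-of-range octets such as 999.

```python
import ipaddress
import re

# Same expressions as the custom property definitions.
RESULT_RE = re.compile(r"test\s+(\w+)")
DEST_IP_RE = re.compile(r"IP:(\d+\.\d+\.\d+\.\d+)")


def extract_properties(payload: str) -> dict:
    """Return the custom property values that can be read from one payload."""
    properties = {}

    result = RESULT_RE.search(payload)
    if result:
        properties["Somnium Test Result"] = result.group(1)

    dest_ip = DEST_IP_RE.search(payload)
    if dest_ip:
        try:
            # Reject captures that look like an address but are not valid IPv4.
            address = ipaddress.ip_address(dest_ip.group(1))
            properties["Somnium Destination IP"] = str(address)
        except ValueError:
            pass
    return properties
```

A payload with a valid address yields both properties; one with `IP:999.1.1.1` yields only the test result.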
AQL Queries
Find successful tests without corresponding blocks
SELECT
DATEFORMAT(starttime, 'yyyy-MM-dd HH:mm:ss') as time,
sourceip, destinationip, destinationport,
"Somnium Test Result"
FROM events
WHERE "Log Source" = 'Somnium'
AND "Somnium Test Result" = 'SUCCESSFUL'
AND destinationip NOT IN (
SELECT destinationip
FROM events
WHERE category = 'Firewall Deny'
AND starttime BETWEEN <test_start> AND <test_end>
)
LAST 24 HOURS
Correlate with offense data
SELECT
o.description,
o.magnitude,
e.destinationip,
e."Somnium Test Result"
FROM events e
JOIN offenses o ON e.destinationip = o.offense_source
WHERE e."Log Source" = 'Somnium'
AND o.status = 'OPEN'
LAST 7 DAYS
Microsoft Sentinel
Configuration
Create custom table
.create table SomniumResults (
TimeGenerated: datetime,
TestType: string,
DestinationIP: string,
DestinationPort: int,
DestinationURL: string,
Result: string,
Provider: string,
QueryDomain: string
)
Create data ingestion
import re
from azure.monitor.ingestion import LogsIngestionClient

def parse_somnium_results(file_path):
    """Yield one record per IP or URL test line in a Somnium results file."""
    pattern_ip = r'Timestamp:(\S+) IP:(\S+) : Port:(\d+) test (\w+)'
    pattern_url = r'Timestamp:(\S+) URL:(\S+) test (\w+)'
    with open(file_path, 'r') as f:
        for line in f:
            # The := operator requires Python 3.8 or later
            if match := re.match(pattern_ip, line):
                yield {
                    'TimeGenerated': match.group(1),
                    'DestinationIP': match.group(2),
                    'DestinationPort': int(match.group(3)),
                    'Result': match.group(4)
                }
            elif match := re.match(pattern_url, line):
                yield {
                    'TimeGenerated': match.group(1),
                    'DestinationURL': match.group(2),
                    'Result': match.group(3)
                }
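The parser passes the captured timestamp straight into `TimeGenerated`, while the table declares that column as `datetime`. If the result files carry only a time of day (the Splunk and Logstash sections parse the same field as hours, minutes, and seconds), the value needs a date before ingestion. The following is one possible sketch: `to_time_generated` is a hypothetical helper, and treating the time as UTC on a caller-supplied run date is an assumption to adjust to the test system's clock.

```python
from datetime import date, datetime, timezone


def to_time_generated(time_of_day: str, run_date: date) -> str:
    """Combine an HH:MM:SS string with a run date into an ISO 8601 UTC string."""
    parsed = datetime.strptime(time_of_day, "%H:%M:%S").time()
    # Assumption: the Somnium host logs in UTC.
    combined = datetime.combine(run_date, parsed, tzinfo=timezone.utc)
    return combined.isoformat().replace("+00:00", "Z")
```

Each record from `parse_somnium_results` could then be updated with `record['TimeGenerated'] = to_time_generated(record['TimeGenerated'], run_date)` before upload.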
KQL Queries
Find security gaps
let TestResults = SomniumResults
| where TimeGenerated >= ago(1h)
| where Result == "SUCCESSFUL";
TestResults
| join kind=leftanti (
CommonSecurityLog
| where DeviceAction in ("deny", "drop", "block")
) on $left.DestinationIP == $right.DestinationIP
| project TimeGenerated, DestinationIP, DestinationPort, Result
| extend SecurityGap = "Traffic allowed but not blocked"
Measure control effectiveness
SomniumResults
| where TimeGenerated >= ago(7d)
| summarize
Total = count(),
Blocked = countif(Result == "FAILED"),
Allowed = countif(Result in ("SUCCESSFUL", "SUCCESFULL"))
by TestType
| extend BlockRate = (Blocked * 100.0) / Total
| project TestType, Total, Blocked, Allowed, BlockRate
| order by BlockRate asc
Create alert rule
SomniumResults
| where TimeGenerated >= ago(5m)
| where Result in ("SUCCESSFUL", "SUCCESFULL")
| join kind=leftanti (
CommonSecurityLog
| where DeviceAction in ("deny", "drop")
| where TimeGenerated >= ago(5m)
) on $left.DestinationIP == $right.DestinationIP
| summarize
Count = count(),
Destinations = make_set(DestinationIP)
by bin(TimeGenerated, 5m)
| where Count > 0
IDS/IPS Integration
- Suricata
- Snort
- Zeek (Bro)
Suricata
Custom Rule for Somnium Testing
Create detection rules for Somnium patterns:
# /etc/suricata/rules/somnium.rules
# Detect connections to known bad IPs during testing
alert tcp $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium Test: Connection to Threat Intel IP"; \
threshold: type both, track by_src, count 5, seconds 60; \
sid:9000001; rev:1;)
# Detect DGA domain patterns
alert dns $HOME_NET any -> any 53 \
(msg:"Somnium Test: Possible DGA Domain Query"; \
dns_query; \
pcre:"/^[a-z]{8,15}\.(xyz|top|tk|gq|club)$/i"; \
sid:9000002; rev:1;)
# Detect DNS over HTTPS
alert tls $HOME_NET any -> $EXTERNAL_NET 443 \
(msg:"Somnium Test: DNS over HTTPS to Public Resolver"; \
tls_sni; content:"dns.google"; \
sid:9000003; rev:1;)
alert tls $HOME_NET any -> $EXTERNAL_NET 443 \
(msg:"Somnium Test: DNS over HTTPS to Cloudflare"; \
tls_sni; content:"cloudflare-dns.com"; \
sid:9000004; rev:1;)
# Detect known bad user agents
alert http $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium Test: Malicious User-Agent Detected"; \
http_user_agent; content:"sqlmap"; \
sid:9000005; rev:1;)
alert http $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium Test: Scanner User-Agent"; \
http_user_agent; content:"masscan"; \
sid:9000006; rev:1;)
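To see which query names the DGA rule's expression would flag, the pattern can be exercised on its own. This is a small Python sketch using the same regular expression as the rule; `looks_like_dga` is a hypothetical helper name, and the check says nothing about how Suricata evaluates the rest of the rule.

```python
import re

# Same expression as the pcre option in the DGA rule, case-insensitive.
DGA_PATTERN = re.compile(r"^[a-z]{8,15}\.(xyz|top|tk|gq|club)$", re.IGNORECASE)


def looks_like_dga(domain: str) -> bool:
    """Return True if the whole name matches the DGA pattern."""
    # Drop a trailing root dot, which some resolvers include.
    return DGA_PATTERN.fullmatch(domain.rstrip(".")) is not None
```

The pattern only matches a single label of 8 to 15 letters directly under one of the listed TLDs. Names with digits, hyphens, or subdomains do not match, so a pass here is a heuristic signal and not proof of a generated domain.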
Enable EVE JSON Logging
# /etc/suricata/suricata.yaml
outputs:
  - eve-log:
      enabled: yes
      filetype: regular
      filename: eve.json
      types:
        - alert
        - dns
        - tls
        - http
Validation Query
# Check if Somnium tests triggered alerts
jq 'select(.event_type=="alert" and .alert.signature_id >= 9000000)' \
/var/log/suricata/eve.json | \
jq -s 'group_by(.alert.signature) | map({signature: .[0].alert.signature, count: length})'
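Where `jq` is not available, the same tally can be produced with a few lines of Python. The sketch below reads EVE records one JSON object per line and counts alerts whose signature ID falls in the custom range used by the rules above; `count_somnium_alerts` is a hypothetical name.

```python
import json
from collections import Counter


def count_somnium_alerts(lines, min_sid=9000000):
    """Count EVE alert records per signature for SIDs at or above min_sid."""
    counts = Counter()
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Skip blank or partially written lines.
            continue
        if not isinstance(event, dict) or event.get("event_type") != "alert":
            continue
        alert = event.get("alert", {})
        if alert.get("signature_id", 0) >= min_sid:
            counts[alert.get("signature", "")] += 1
    return dict(counts)
```

Passing an open file handle works because iterating a file yields one line at a time, for example `count_somnium_alerts(open("/var/log/suricata/eve.json"))`.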
Snort
Custom Somnium Rules
# /etc/snort/rules/local.rules
# High frequency connection attempts (Somnium IP testing)
alert tcp $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium: Multiple connections to external hosts"; \
threshold: type both, track by_src, count 10, seconds 30; \
sid:1000001; rev:1;)
# TOR exit node connection
alert tcp $HOME_NET any -> $EXTERNAL_NET [80,443] \
(msg:"Somnium: Connection to TOR exit node"; \
sid:1000002; rev:1; \
reference:url,github.com/SecOps-Institute/Tor-IP-Addresses;)
# Remote desktop tool detection
alert tcp $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium: TeamViewer connection detected"; \
content:"teamviewer.com"; nocase; \
sid:1000003; rev:1;)
alert tcp $HOME_NET any -> $EXTERNAL_NET any \
(msg:"Somnium: AnyDesk connection detected"; \
content:"anydesk.com"; nocase; \
sid:1000004; rev:1;)
Alert Validation
# View alerts during test window
cat /var/log/snort/alert | grep -A 5 "Somnium"
# Count alerts by signature
grep "Somnium" /var/log/snort/alert | \
awk -F'[][]' '{print $2}' | sort | uniq -c | sort -rn
Zeek (Bro)
Custom Somnium Scripts
Create /opt/zeek/share/zeek/site/somnium.zeek:
@load base/frameworks/notice
module Somnium;
export {
redef enum Notice::Type += {
## DGA domain pattern detected
DGA_Pattern,
## Connection to known malicious IP
Known_Bad_IP,
## DNS over HTTPS detected
DNS_Over_HTTPS,
## Suspicious user agent
Bad_User_Agent,
};
}
# Detect DGA patterns in DNS queries
event dns_request(c: connection, msg: dns_msg, query: string, qtype: count, qclass: count) {
local high_risk_tlds = set(".xyz", ".top", ".tk", ".gq", ".club");
for (tld in high_risk_tlds) {
if (tld in query) {
# Check for high entropy / random characters
if (/^[a-z]{8,15}\.(xyz|top|tk|gq|club)$/i in query) {
NOTICE([$note=DGA_Pattern,
$msg=fmt("Possible DGA domain: %s", query),
$conn=c]);
}
}
}
}
# Detect DNS over HTTPS
event ssl_established(c: connection) {
if (c$ssl?$server_name) {
if (/dns\.google|cloudflare-dns\.com/ in c$ssl$server_name) {
NOTICE([$note=DNS_Over_HTTPS,
$msg=fmt("DoH connection to %s", c$ssl$server_name),
$conn=c]);
}
}
}
# Detect malicious user agents
event http_header(c: connection, is_orig: bool, name: string, value: string) {
if (is_orig && name == "USER-AGENT") {
local bad_agents = /sqlmap|masscan|nikto|nmap|openvas/;
if (bad_agents in value) {
NOTICE([$note=Bad_User_Agent,
$msg=fmt("Malicious user agent: %s", value),
$conn=c]);
}
}
}
Load the script
echo "@load somnium" >> /opt/zeek/share/zeek/site/local.zeek
zeekctl deploy
Query Zeek logs
# Search notice.log for Somnium detections
zeek-cut ts note msg < /opt/zeek/logs/current/notice.log | \
grep "Somnium"
# Parse DNS logs for DGA patterns
zeek-cut ts query < /opt/zeek/logs/current/dns.log | \
grep -E '\.(xyz|top|tk|gq|club)$'
Automated Validation Workflow
Schedule Somnium tests
# Run Somnium tests every Monday at 2 AM
0 2 * * 1 /usr/bin/python3 /opt/somnium/automated_test.py
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime

# (test number, description) for each test to run
tests = [
    (1, "Known Bad IPs"),
    (2, "Phishing URLs"),
    (3, "TOR Exit Nodes"),
    (8, "Bad User Agents"),
    (9, "DNS over HTTPS")
]

test_start = datetime.now()
print(f"Starting automated Somnium tests at {test_start}")

for test_num, test_name in tests:
    print(f"Running test {test_num}: {test_name}")
    proc = subprocess.Popen(
        ['python3', 'main.py'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    # Send the test number, then 0, to main.py on stdin
    proc.communicate(input=f"{test_num}\n0\n".encode())
    time.sleep(10)  # Wait between tests

test_end = datetime.now()
print(f"Tests completed at {test_end}")
print(f"Total duration: {test_end - test_start}")
Trigger SIEM correlation
# Example: Trigger Splunk saved search
curl -k -u admin:password https://splunk:8089/services/saved/searches/somnium_validation/dispatch \
-d dispatch.earliest_time="-30m" -d dispatch.latest_time="now"
Generate validation report
import glob
from collections import Counter
from datetime import datetime

def analyze_results():
    """Count allowed and blocked tests in each *_Results.txt file."""
    report = {}
    for result_file in glob.glob("*_Results.txt"):
        test_type = result_file.replace("_Results.txt", "")
        with open(result_file, 'r') as f:
            content = f.read()
        successful = content.count("SUCCESSFUL") + content.count("SUCCESFULL")
        failed = content.count("FAILED")
        total = successful + failed
        if total > 0:
            # A FAILED test means the connection was blocked
            block_rate = (failed / total) * 100
            report[test_type] = {
                'total': total,
                'blocked': failed,
                'allowed': successful,
                'block_rate': block_rate
            }
    return report

# Generate markdown report
report = analyze_results()
print("# Somnium Security Validation Report\n")
print(f"**Test Date:** {datetime.now()}\n")
print("| Test Type | Total | Blocked | Allowed | Block Rate |")
print("|-----------|-------|---------|---------|------------|")
for test, metrics in report.items():
    print(f"| {test} | {metrics['total']} | {metrics['blocked']} | "
          f"{metrics['allowed']} | {metrics['block_rate']:.1f}% |")
Alert on gaps
import smtplib
from email.mime.text import MIMEText

def send_alert(gaps):
    """Email the list of detected gaps through the local SMTP server."""
    if not gaps:
        return
    body = "Security gaps detected in Somnium testing:\n\n"
    for gap in gaps:
        body += f"- {gap}\n"
    msg = MIMEText(body)
    msg['Subject'] = 'ALERT: Somnium Security Gaps Detected'
    msg['From'] = '<sender_address>'
    msg['To'] = '<recipient_address>'
    smtp = smtplib.SMTP('localhost')
    smtp.send_message(msg)
    smtp.quit()

# Check for gaps
# `report` is the dictionary returned by analyze_results() in the previous step
gaps = []
for test, metrics in report.items():
    if metrics['allowed'] > 0:
        gaps.append(f"{test}: {metrics['allowed']} threats not blocked")
if gaps:
    send_alert(gaps)