Overview
Unlike traditional SIEM systems that correlate data after storage, UTMStack correlates events during the ingestion pipeline.

Benefits:
- Immediate threat detection (sub-second response)
- Reduced storage requirements (only store relevant data)
- Lower computational overhead
- Real-time alerting without query delays
Architecture
Core Components
1. Event Classifier
Categorizes incoming events for efficient routing to appropriate correlation rules:

public class EventClassifier {
private final Map<String, EventCategory> categoryCache;
private final List<ClassificationRule> rules;
/**
 * Determines the category of an event, consulting the cache before the rules.
 *
 * <p>Rules are tried in list order and the first match wins; its category is
 * stored in the cache under the event's cache key. An event that matches no
 * rule yields {@code EventCategory.UNKNOWN}, which is not cached.
 *
 * @param event the event to classify
 * @return the cached or newly matched category, or {@code EventCategory.UNKNOWN}
 */
public EventCategory classify(LogEvent event) {
    String cacheKey = generateCacheKey(event);

    // One lookup instead of containsKey() followed by get(): it avoids hashing
    // the key twice and cannot return null if the entry is removed between
    // the two calls.
    EventCategory cached = categoryCache.get(cacheKey);
    if (cached != null) {
        return cached;
    }

    // Apply classification rules; first match wins.
    for (ClassificationRule rule : rules) {
        if (rule.matches(event)) {
            EventCategory category = rule.getCategory();
            categoryCache.put(cacheKey, category);
            return category;
        }
    }
    return EventCategory.UNKNOWN;
}
}
- Authentication events
- Network traffic
- File operations
- Process execution
- Privilege escalation
- Data access
- System changes
2. State Manager
Maintains session state and context for stateful correlation:

public class StateManager {
private final Cache<String, CorrelationState> stateCache;
private final int defaultTTL = 3600; // 1 hour
/**
 * Returns the state stored under {@code key}, creating and registering a new
 * one if none exists.
 *
 * <p>Expiration is scheduled only by the caller that actually inserted the
 * state, so an existing entry keeps its original expiration.
 *
 * @param key identifies the correlation state
 * @param ttl time-to-live passed to the expiration scheduler; values of zero
 *            or less fall back to {@code defaultTTL}
 * @return the existing or newly created state, never {@code null}
 */
public CorrelationState getOrCreateState(String key, int ttl) {
    CorrelationState existing = stateCache.getIfPresent(key);
    if (existing != null) {
        return existing;
    }

    // Insert atomically. With a plain getIfPresent()+put(), two threads that
    // miss at the same time would each install their own state and the
    // loser's updates would be silently discarded.
    CorrelationState created = new CorrelationState();
    CorrelationState winner = stateCache.asMap().putIfAbsent(key, created);
    if (winner != null) {
        return winner;
    }

    // Schedule cleanup once, for the instance that was actually stored.
    scheduleExpiration(key, ttl > 0 ? ttl : defaultTTL);
    return created;
}
/**
 * Applies {@code updater} to the state stored under {@code key}, creating the
 * state with the default TTL when it does not exist yet.
 *
 * <p>The updater runs while holding the monitor of the state instance.
 *
 * @param key     identifies the correlation state
 * @param updater mutation to apply to the state
 */
public void updateState(String key, Consumer<CorrelationState> updater) {
    final CorrelationState target = getOrCreateState(key, defaultTTL);
    synchronized (target) {
        updater.accept(target);
    }
}
}
/**
 * Mutable correlation context kept per state key: the events seen so far,
 * when they were seen, and free-form attributes.
 *
 * <p>The class does no locking of its own; {@code StateManager.updateState}
 * synchronizes on the instance while it applies an update.
 */
public class CorrelationState {
    // Initialised eagerly so a freshly constructed state can be used at once;
    // left null, the first addEvent() or exceeds() call throws
    // NullPointerException.
    private Map<String, Object> attributes = new java.util.HashMap<>();
    private List<LogEvent> relatedEvents = new java.util.ArrayList<>();
    private Instant firstSeen;
    private Instant lastSeen;
    private int eventCount;

    /**
     * Records an event against this state and refreshes the timestamps.
     *
     * @param event the event to append to the related-events list
     */
    public void addEvent(LogEvent event) {
        Instant now = Instant.now();
        if (firstSeen == null) {
            // The first recorded event marks the start of this state.
            firstSeen = now;
        }
        relatedEvents.add(event);
        lastSeen = now;
        eventCount++;
    }

    /**
     * Tests whether a numeric attribute has reached a threshold.
     *
     * <p>Despite the name, the comparison is inclusive ({@code >=}). A missing
     * or non-numeric attribute counts as {@code 0}.
     *
     * @param attribute name of the attribute to read
     * @param threshold value the attribute is compared against
     * @return {@code true} if the attribute value is at least {@code threshold}
     */
    public boolean exceeds(String attribute, int threshold) {
        Object raw = attributes.get(attribute);
        // Accept any Number: a blind (Integer) cast throws ClassCastException
        // when the value was stored as a Long or Double.
        int count = (raw instanceof Number) ? ((Number) raw).intValue() : 0;
        return count >= threshold;
    }
}
3. Rule Engine
Executes correlation rules against incoming events:

public interface CorrelationRule {
/** Returns the identifier of this rule (presumably the {@code id} of the rule definition; confirm). */
String getId();
/** Returns the rule name; {@code AlertGenerator} uses it as the alert name. */
String getName();
/** Returns the rule description; {@code AlertGenerator} renders it as a template for the alert description. */
String getDescription();
/** Returns the rule severity; {@code AlertGenerator} copies it onto the alert. */
RuleSeverity getSeverity();
/**
 * Returns the categories this rule is associated with.
 * NOTE(review): presumably these match the category names used as keys in
 * {@code RuleEngine.rulesByCategory}; confirm against the rule loader.
 */
List<String> getCategories();
/**
 * Decides whether this rule fires for the given event.
 *
 * @param event   the event being evaluated
 * @param context correlation context built by the caller for this event
 * @return {@code true} if an alert should be generated; {@code RuleEngine}
 *         calls {@link #generateAlert} only when this returns {@code true}
 */
boolean evaluate(LogEvent event, CorrelationContext context);
/**
 * Builds the alert for an event.
 *
 * @param event   the event that triggered the rule
 * @param context the same correlation context that was passed to {@link #evaluate}
 * @return the alert to report
 */
Alert generateAlert(LogEvent event, CorrelationContext context);
}
public class RuleEngine {
private final Map<String, List<CorrelationRule>> rulesByCategory;
private final StateManager stateManager;
private final ExecutorService executor;
public List<Alert> evaluateEvent(LogEvent event) {
List<Alert> alerts = new ArrayList<>();
EventCategory category = event.getCategory();
// Get applicable rules
List<CorrelationRule> rules = rulesByCategory.getOrDefault(
category.getName(),
Collections.emptyList()
);
// Evaluate rules in parallel
List<Future<Optional<Alert>>> futures = new ArrayList<>();
for (CorrelationRule rule : rules) {
futures.add(executor.submit(() -> evaluateRule(rule, event)));
}
// Collect results
for (Future<Optional<Alert>> future : futures) {
try {
future.get(100, TimeUnit.MILLISECONDS).ifPresent(alerts::add);
} catch (Exception e) {
log.error("Rule evaluation failed", e);
}
}
return alerts;
}
private Optional<Alert> evaluateRule(CorrelationRule rule, LogEvent event) {
CorrelationContext context = new CorrelationContext(
event,
stateManager,
assetDatabase,
threatIntelService
);
if (rule.evaluate(event, context)) {
return Optional.of(rule.generateAlert(event, context));
}
return Optional.empty();
}
}
Correlation Rule Types
1. Simple Threshold Rules
Detect when an event count exceeds a threshold:

rule:
  id: "failed-login-threshold"
  name: "Excessive Failed Login Attempts"
  description: "Multiple failed login attempts from single source"
  severity: high
  category: authentication
  conditions:
    event_type: "authentication_failure"
    groupby:
      - source_ip
      - username
    threshold: 5
    timewindow: 300s # 5 minutes
  actions:
    - create_alert:
        title: "Brute Force Attack Detected"
        description: "{{threshold}} failed login attempts for user {{username}} from {{source_ip}}"
        severity: high
        mitre_technique: "T1110" # Brute Force
/**
 * Fires once the number of matching events that share the same group-by
 * values reaches a threshold.
 */
public class ThresholdRule implements CorrelationRule {
    private final String eventType;
    private final List<String> groupByFields;
    private final int threshold;
    private final Duration timeWindow;

    /**
     * Counts the event against its group and reports whether the threshold
     * has been reached.
     *
     * @param event   the event being evaluated
     * @param context supplies the state manager that holds the per-group count
     * @return {@code true} if the event is of the configured type and the
     *         group's count is now at least {@code threshold}
     */
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        // Constant-first comparison: an event without a type simply does not match.
        if (!eventType.equals(event.getType())) {
            return false;
        }

        // Generate state key from groupby fields
        String stateKey = generateStateKey(event, groupByFields);

        // Create the state with the rule's time window as its TTL. Otherwise
        // updateState() creates it with the state manager's one-hour default
        // and the configured window is ignored. A missing or non-positive
        // window falls back to that default. The window starts at the first
        // event of the group.
        int ttlSeconds = (timeWindow == null)
            ? 0
            : (int) Math.min(timeWindow.getSeconds(), Integer.MAX_VALUE);
        context.getStateManager().getOrCreateState(stateKey, ttlSeconds);

        // Update the state and read the count inside the same update, so the
        // value checked belongs to this event and is read under the lock.
        int[] observedCount = new int[1];
        context.getStateManager().updateState(stateKey, state -> {
            state.addEvent(event);
            observedCount[0] = state.getEventCount();
        });

        // Check threshold
        return observedCount[0] >= threshold;
    }
}
2. Sequence Rules
Detect specific event sequences (A followed by B within timeframe):

rule:
  id: "privilege-escalation-sequence"
  name: "Privilege Escalation Attempt"
  description: "User gained elevated privileges after failed access"
  severity: critical
  sequence:
    - event: "access_denied"
      fields:
        resource_type: "admin_panel"
      alias: "denied"
    - event: "privilege_change"
      fields:
        new_privilege: "administrator"
      alias: "escalation"
      where: "escalation.username == denied.username"
      within: 600s
  actions:
    - create_alert:
        title: "Suspicious Privilege Escalation"
        severity: critical
        mitre_tactic: "TA0004" # Privilege Escalation
/**
 * Fires when an ordered sequence of steps has been completed for the same
 * sequence key.
 */
public class SequenceRule implements CorrelationRule {
    private final List<SequenceStep> steps;

    /**
     * Records the event against every step it matches and reports whether the
     * sequence is now complete.
     *
     * @param event   the event being evaluated
     * @param context supplies the state manager that tracks sequence progress
     * @return {@code true} if recording this event completed the sequence
     */
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        // Find which step this event matches
        for (int i = 0; i < steps.size(); i++) {
            SequenceStep step = steps.get(i);
            if (!step.matches(event)) {
                continue;
            }

            // A lambda may only capture effectively final locals; the loop
            // counter is not, so copy it.
            final int stepIndex = i;
            final boolean[] completed = new boolean[1];

            String stateKey = generateSequenceKey(event);
            // Record the step and check completion in one update, so the
            // check runs under the same lock as the write.
            context.getStateManager().updateState(stateKey, state -> {
                state.recordStep(stepIndex, event);
                completed[0] = state.hasCompletedSequence(steps);
            });

            if (completed[0]) {
                return true;
            }
        }
        return false;
    }
}
3. Anomaly Detection Rules
Detect deviations from learned baselines:

rule:
  id: "unusual-data-transfer"
  name: "Unusual Data Transfer Volume"
  description: "Data transfer exceeds normal baseline"
  severity: medium
  anomaly:
    metric: "bytes_transferred"
    groupby:
      - username
      - destination
    baseline_period: 7d
    threshold: 3.0 # Standard deviations
    min_samples: 100
  actions:
    - create_alert:
        title: "Anomalous Data Transfer"
        description: "User {{username}} transferred {{bytes_transferred}} bytes ({{std_dev}}x normal)"
/**
 * Flags events whose metric value lies too many standard deviations away from
 * the baseline stored for the event's baseline key.
 */
public class AnomalyDetectionRule implements CorrelationRule {
    private final BaselineManager baselineManager;
    private final String metric;
    private final double stdDevThreshold;

    /**
     * Scores the event against its baseline.
     *
     * <p>While the baseline is missing or has too few samples, the event is
     * only recorded as a sample. Afterwards, a value whose absolute z-score
     * reaches {@code stdDevThreshold} fires the rule, has the z-score attached
     * to the event as {@code std_dev}, and is not recorded. Any other value is
     * recorded as a sample.
     *
     * @return {@code true} if the event is anomalous
     */
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        final String key = generateBaselineKey(event);
        final Baseline learned = baselineManager.getBaseline(key);
        final boolean stillLearning = learned == null || !learned.hasSufficientSamples();
        final double observed = getMetricValue(event);

        if (!stillLearning) {
            final double deviation = learned.calculateZScore(observed);
            if (Math.abs(deviation) >= stdDevThreshold) {
                // Anomalous values are reported but not recorded as samples.
                event.addField("std_dev", String.valueOf(deviation));
                return true;
            }
        }

        // Learning phase or normal value: record it as a sample.
        baselineManager.recordSample(key, observed);
        return false;
    }
}
/**
 * Running mean and standard deviation of a metric, maintained incrementally
 * with Welford's online algorithm so no samples need to be stored.
 *
 * <p>Not thread-safe; callers must serialise access to an instance.
 */
public class Baseline {
    private double mean;
    private double stdDev;
    private int sampleCount;
    private final int minSamples;
    /** Running sum of squared deviations from the current mean (Welford's M2). */
    private double sumSquaredDeviations;

    /**
     * @param minSamples number of samples required before the baseline is
     *                   considered usable
     */
    public Baseline(int minSamples) {
        this.minSamples = minSamples;
    }

    /**
     * @return {@code true} once at least {@code minSamples} samples have been added
     */
    public boolean hasSufficientSamples() {
        return sampleCount >= minSamples;
    }

    /**
     * Expresses a value as a number of standard deviations from the mean.
     *
     * @param value the observed value
     * @return the z-score, or {@code 0} while the baseline has no spread
     *         (fewer than two samples, or all samples identical)
     */
    public double calculateZScore(double value) {
        if (stdDev == 0) return 0;
        return (value - mean) / stdDev;
    }

    /**
     * Folds one sample into the running mean and sample standard deviation.
     *
     * @param value the sample to add
     */
    public void addSample(double value) {
        // Update running statistics
        sampleCount++;
        double delta = value - mean;
        mean += delta / sampleCount;
        double delta2 = value - mean;
        // Welford update: deviation from the old mean times deviation from the new mean.
        sumSquaredDeviations += delta * delta2;
        // Sample standard deviation (n - 1); undefined for one sample, so keep 0.
        stdDev = sampleCount > 1
            ? Math.sqrt(sumSquaredDeviations / (sampleCount - 1))
            : 0;
    }
}
4. Threat Intelligence Rules
Match against known threat indicators:

rule:
  id: "known-malicious-ip"
  name: "Communication with Known Malicious IP"
  description: "Detected connection to known threat actor IP"
  severity: critical
  threat_intel:
    field: "destination_ip"
    indicator_type: "ip"
    min_confidence: 80
    sources:
      - "abuse_ch"
      - "emerging_threats"
      - "alienvault_otx"
  actions:
    - create_alert:
        title: "Malicious IP Communication"
        severity: critical
        mitre_tactic: "TA0011" # Command and Control
/**
 * Fires when the configured event field matches a threat indicator whose
 * confidence is at least the configured minimum.
 */
public class ThreatIntelRule implements CorrelationRule {
    private final ThreatIntelligenceService threatIntel;
    private final String fieldName;
    private final int minConfidence;

    /**
     * Looks up the event's field value in threat intelligence and, on a
     * sufficiently confident hit, enriches the event with the indicator details.
     *
     * @return {@code true} if a matching indicator was found; {@code false} if
     *         the field is absent, nothing matched, or confidence was too low
     */
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        final String candidate = event.getField(fieldName);
        if (candidate == null) {
            return false;
        }

        final ThreatIndicator hit = threatIntel.lookup(candidate);
        final boolean confidentMatch = hit != null && hit.getConfidence() >= minConfidence;
        if (!confidentMatch) {
            return false;
        }

        // Attach the threat intelligence details to the event.
        event.addField("threat_type", hit.getType());
        event.addField("threat_actor", hit.getActor());
        event.addField("threat_confidence", String.valueOf(hit.getConfidence()));
        event.addField("threat_sources", String.join(",", hit.getSources()));
        return true;
    }
}
MITRE ATT&CK Integration
All correlation rules map to the MITRE ATT&CK framework:

public class MitreMapper {
/**
 * Attaches MITRE ATT&CK information to an alert: the distinct tactics of the
 * techniques identified for the event, and one reference per technique.
 *
 * @param alert the alert to enrich
 * @param event the event the alert was raised for
 */
public void enrichAlert(Alert alert, LogEvent event) {
    final List<MitreTechnique> matched = identifyTechniques(event, alert);

    // Tactics: one entry per distinct tactic, in encounter order.
    alert.setMitreTactics(
        matched.stream()
            .map(MitreTechnique::getTactic)
            .distinct()
            .collect(Collectors.toList()));

    // Techniques: one reference per identified technique.
    final List<TechniqueReference> references = matched.stream()
        .map(this::toReference)
        .collect(Collectors.toList());
    alert.setMitreTechniques(references);
}

/** Builds the reference (id, name, URL) for one technique. */
private TechniqueReference toReference(MitreTechnique technique) {
    return new TechniqueReference(
        technique.getId(),
        technique.getName(),
        technique.getUrl());
}
}
- TA0001: Initial Access
- TA0002: Execution
- TA0003: Persistence
- TA0004: Privilege Escalation
- TA0005: Defense Evasion
- TA0006: Credential Access
- TA0007: Discovery
- TA0008: Lateral Movement
- TA0009: Collection
- TA0010: Exfiltration
- TA0011: Command and Control
- TA0040: Impact
Performance Optimization
1. Rule Indexing
/**
 * Looks up the correlation rules that may apply to an event using
 * pre-built indexes.
 */
public class RuleIndexer {
    private final Map<String, List<CorrelationRule>> categoryIndex;
    private final Map<String, List<CorrelationRule>> eventTypeIndex;
    private final Map<String, List<CorrelationRule>> severityIndex;

    /**
     * Collects the rules indexed under the event's category name or its event
     * type, without duplicates.
     *
     * @param event the event to find rules for
     * @return the applicable rules in no particular order; empty if none match
     */
    public List<CorrelationRule> getApplicableRules(LogEvent event) {
        Set<CorrelationRule> rules = new HashSet<>();

        // Index by category. The index is keyed by category name, so look up
        // the name: passing the category object itself to a String-keyed map
        // compiles (getOrDefault takes Object) but never matches.
        if (event.getCategory() != null) {
            rules.addAll(categoryIndex.getOrDefault(
                event.getCategory().getName(), Collections.emptyList()));
        }

        // Index by event type
        rules.addAll(eventTypeIndex.getOrDefault(
            event.getType(), Collections.emptyList()));

        return new ArrayList<>(rules);
    }
}
2. Parallel Processing
/**
 * Correlates a batch of events using a parallel stream.
 */
public class ParallelCorrelationEngine {
    private final ExecutorService executor;
    private final int parallelism;

    /**
     * Evaluates every event and concatenates the resulting alerts.
     *
     * @param events the batch to correlate
     * @return all alerts raised for the batch
     */
    public List<Alert> correlate(List<LogEvent> events) {
        return events.parallelStream()
            .map(event -> evaluateEvent(event))
            .flatMap(raised -> raised.stream())
            .collect(Collectors.toList());
    }
}
3. Caching
/**
 * Queries the threat intelligence API for an indicator. The method is
 * annotated as cacheable under the {@code threat-intel} cache name.
 *
 * @param indicator the indicator value to look up
 * @return whatever the API returns for that indicator
 */
@Cacheable("threat-intel")
public ThreatIndicator lookupThreat(String indicator) {
    final ThreatIndicator result = threatIntelApi.query(indicator);
    return result;
}
/**
 * Fetches the asset registered for an IP address. The method is annotated as
 * cacheable under the {@code asset-info} cache name.
 *
 * @param ip the IP address to look up
 * @return whatever the asset database returns for that address
 */
@Cacheable("asset-info")
public Asset getAssetInfo(String ip) {
    final Asset asset = assetDatabase.findByIp(ip);
    return asset;
}
Alert Generation
/**
 * Builds fully populated alerts from a fired rule and its triggering event.
 */
public class AlertGenerator {
    /**
     * Creates an alert with a random UUID, copies the rule and event details
     * onto it, attaches related events when correlation state is available,
     * and runs the MITRE, threat intelligence and asset enrichers in that order.
     *
     * @param rule    the rule that fired
     * @param event   the event that triggered it
     * @param context correlation context supplying the current state
     * @return the enriched alert
     */
    public Alert createAlert(CorrelationRule rule, LogEvent event, CorrelationContext context) {
        final Alert created = new Alert();

        // Identity and descriptive fields
        created.setId(UUID.randomUUID().toString());
        created.setName(rule.getName());
        created.setDescription(renderTemplate(rule.getDescription(), event));
        created.setSeverity(rule.getSeverity());
        created.setTimestamp(Instant.now());
        created.setSource(event.getSource());
        // NOTE(review): CorrelationRule as documented declares getCategories(),
        // not getCategory(); confirm which accessor is intended.
        created.setCategory(rule.getCategory());

        attachRelatedEvents(created, context);

        // Enrichment: MITRE ATT&CK mapping, threat intelligence, asset context
        mitreMapper.enrichAlert(created, event);
        threatIntelEnricher.enrich(created, event);
        assetEnricher.enrich(created, event);

        return created;
    }

    /** Copies the related events from the context's state, if there is one. */
    private void attachRelatedEvents(Alert target, CorrelationContext context) {
        final CorrelationState current = context.getState();
        if (current == null) {
            return;
        }
        target.setRelatedEvents(current.getRelatedEvents());
    }
}
Next Steps
Data Flow
See how events flow through correlation
Backend API
Learn how alerts are processed
Performance Tuning
Optimize correlation performance
High Availability
Configure correlation for HA