Skip to main content
The UTMStack correlation engine is a custom-built, high-performance system that analyzes security events in real-time before ingestion into the data store. This approach is a key differentiator that reduces overhead and dramatically improves threat detection speed.

Overview

Unlike traditional SIEM systems that correlate data after storage, UTMStack correlates events during the ingestion pipeline: Benefits:
  • Immediate threat detection (sub-second response)
  • Reduced storage requirements (only store relevant data)
  • Lower computational overhead
  • Real-time alerting without query delays

Architecture

Core Components

1. Event Classifier

Categorizes incoming events for efficient routing to appropriate correlation rules:
public class EventClassifier {
    private final Map<String, EventCategory> categoryCache;
    private final List<ClassificationRule> rules;
    
    public EventCategory classify(LogEvent event) {
        // Check cache first
        String cacheKey = generateCacheKey(event);
        if (categoryCache.containsKey(cacheKey)) {
            return categoryCache.get(cacheKey);
        }
        
        // Apply classification rules
        for (ClassificationRule rule : rules) {
            if (rule.matches(event)) {
                EventCategory category = rule.getCategory();
                categoryCache.put(cacheKey, category);
                return category;
            }
        }
        
        return EventCategory.UNKNOWN;
    }
}
Event Categories:
  • Authentication events
  • Network traffic
  • File operations
  • Process execution
  • Privilege escalation
  • Data access
  • System changes

2. State Manager

Maintains session state and context for stateful correlation:
public class StateManager {
    private final Cache<String, CorrelationState> stateCache;
    private final int defaultTTL = 3600; // 1 hour
    
    public CorrelationState getOrCreateState(String key, int ttl) {
        CorrelationState state = stateCache.getIfPresent(key);
        
        if (state == null) {
            state = new CorrelationState();
            stateCache.put(key, state);
            
            // Schedule cleanup
            scheduleExpiration(key, ttl > 0 ? ttl : defaultTTL);
        }
        
        return state;
    }
    
    public void updateState(String key, Consumer<CorrelationState> updater) {
        CorrelationState state = getOrCreateState(key, defaultTTL);
        synchronized (state) {
            updater.accept(state);
        }
    }
}

public class CorrelationState {
    private Map<String, Object> attributes;
    private List<LogEvent> relatedEvents;
    private Instant firstSeen;
    private Instant lastSeen;
    private int eventCount;
    
    public void addEvent(LogEvent event) {
        relatedEvents.add(event);
        lastSeen = Instant.now();
        eventCount++;
    }
    
    public boolean exceeds(String attribute, int threshold) {
        Integer count = (Integer) attributes.getOrDefault(attribute, 0);
        return count >= threshold;
    }
}

3. Rule Engine

Executes correlation rules against incoming events:
public interface CorrelationRule {
    String getId();
    String getName();
    String getDescription();
    RuleSeverity getSeverity();
    List<String> getCategories();
    boolean evaluate(LogEvent event, CorrelationContext context);
    Alert generateAlert(LogEvent event, CorrelationContext context);
}

public class RuleEngine {
    private final Map<String, List<CorrelationRule>> rulesByCategory;
    private final StateManager stateManager;
    private final ExecutorService executor;
    
    public List<Alert> evaluateEvent(LogEvent event) {
        List<Alert> alerts = new ArrayList<>();
        EventCategory category = event.getCategory();
        
        // Get applicable rules
        List<CorrelationRule> rules = rulesByCategory.getOrDefault(
            category.getName(), 
            Collections.emptyList()
        );
        
        // Evaluate rules in parallel
        List<Future<Optional<Alert>>> futures = new ArrayList<>();
        
        for (CorrelationRule rule : rules) {
            futures.add(executor.submit(() -> evaluateRule(rule, event)));
        }
        
        // Collect results
        for (Future<Optional<Alert>> future : futures) {
            try {
                future.get(100, TimeUnit.MILLISECONDS).ifPresent(alerts::add);
            } catch (Exception e) {
                log.error("Rule evaluation failed", e);
            }
        }
        
        return alerts;
    }
    
    private Optional<Alert> evaluateRule(CorrelationRule rule, LogEvent event) {
        CorrelationContext context = new CorrelationContext(
            event,
            stateManager,
            assetDatabase,
            threatIntelService
        );
        
        if (rule.evaluate(event, context)) {
            return Optional.of(rule.generateAlert(event, context));
        }
        
        return Optional.empty();
    }
}

Correlation Rule Types

1. Simple Threshold Rules

Detect when an event count exceeds a threshold:
rule:
  id: "failed-login-threshold"
  name: "Excessive Failed Login Attempts"
  description: "Multiple failed login attempts from single source"
  severity: high
  category: authentication
  
  conditions:
    event_type: "authentication_failure"
    groupby:
      - source_ip
      - username
    threshold: 5
    timewindow: 300s  # 5 minutes
  
  actions:
    - create_alert:
        title: "Brute Force Attack Detected"
        description: "{{threshold}} failed login attempts for user {{username}} from {{source_ip}}"
        severity: high
        mitre_technique: "T1110"  # Brute Force
Implementation:
public class ThresholdRule implements CorrelationRule {
    private final String eventType;
    private final List<String> groupByFields;
    private final int threshold;
    private final Duration timeWindow;
    
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        if (!event.getType().equals(eventType)) {
            return false;
        }
        
        // Generate state key from groupby fields
        String stateKey = generateStateKey(event, groupByFields);
        
        // Update state
        context.getStateManager().updateState(stateKey, state -> {
            state.addEvent(event);
        });
        
        // Check threshold
        CorrelationState state = context.getStateManager().getState(stateKey);
        return state.getEventCount() >= threshold;
    }
}

2. Sequence Rules

Detect specific event sequences (A followed by B within timeframe):
rule:
  id: "privilege-escalation-sequence"
  name: "Privilege Escalation Attempt"
  description: "User gained elevated privileges after failed access"
  severity: critical
  
  sequence:
    - event: "access_denied"
      fields:
        resource_type: "admin_panel"
      alias: "denied"
    
    - event: "privilege_change"
      fields:
        new_privilege: "administrator"
      alias: "escalation"
      where: "escalation.username == denied.username"
      within: 600s
  
  actions:
    - create_alert:
        title: "Suspicious Privilege Escalation"
        severity: critical
        mitre_tactic: "TA0004"  # Privilege Escalation
Implementation:
public class SequenceRule implements CorrelationRule {
    private final List<SequenceStep> steps;
    
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        // Find which step this event matches
        for (int i = 0; i < steps.size(); i++) {
            SequenceStep step = steps.get(i);
            
            if (step.matches(event)) {
                String stateKey = generateSequenceKey(event);
                
                context.getStateManager().updateState(stateKey, state -> {
                    state.recordStep(i, event);
                });
                
                // Check if sequence is complete
                CorrelationState state = context.getStateManager().getState(stateKey);
                if (state.hasCompletedSequence(steps)) {
                    return true;
                }
            }
        }
        
        return false;
    }
}

3. Anomaly Detection Rules

Detect deviations from learned baselines:
rule:
  id: "unusual-data-transfer"
  name: "Unusual Data Transfer Volume"
  description: "Data transfer exceeds normal baseline"
  severity: medium
  
  anomaly:
    metric: "bytes_transferred"
    groupby:
      - username
      - destination
    baseline_period: 7d
    threshold: 3.0  # Standard deviations
    min_samples: 100
  
  actions:
    - create_alert:
        title: "Anomalous Data Transfer"
        description: "User {{username}} transferred {{bytes_transferred}} bytes ({{std_dev}}x normal)"
Implementation:
public class AnomalyDetectionRule implements CorrelationRule {
    private final BaselineManager baselineManager;
    private final String metric;
    private final double stdDevThreshold;
    
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        String baselineKey = generateBaselineKey(event);
        Baseline baseline = baselineManager.getBaseline(baselineKey);
        
        if (baseline == null || !baseline.hasSufficientSamples()) {
            // Learn from this event
            baselineManager.recordSample(baselineKey, getMetricValue(event));
            return false;
        }
        
        double value = getMetricValue(event);
        double zScore = baseline.calculateZScore(value);
        
        if (Math.abs(zScore) >= stdDevThreshold) {
            event.addField("std_dev", String.valueOf(zScore));
            return true;
        }
        
        // Update baseline
        baselineManager.recordSample(baselineKey, value);
        return false;
    }
}

public class Baseline {
    private double mean;
    private double stdDev;
    private int sampleCount;
    private final int minSamples;
    
    public double calculateZScore(double value) {
        if (stdDev == 0) return 0;
        return (value - mean) / stdDev;
    }
    
    public void addSample(double value) {
        // Update running statistics
        sampleCount++;
        double delta = value - mean;
        mean += delta / sampleCount;
        double delta2 = value - mean;
        // ... update variance
    }
}

4. Threat Intelligence Rules

Match against known threat indicators:
rule:
  id: "known-malicious-ip"
  name: "Communication with Known Malicious IP"
  description: "Detected connection to known threat actor IP"
  severity: critical
  
  threat_intel:
    field: "destination_ip"
    indicator_type: "ip"
    min_confidence: 80
    sources:
      - "abuse_ch"
      - "emerging_threats"
      - "alienvault_otx"
  
  actions:
    - create_alert:
        title: "Malicious IP Communication"
        severity: critical
        mitre_tactic: "TA0011"  # Command and Control
Implementation:
public class ThreatIntelRule implements CorrelationRule {
    private final ThreatIntelligenceService threatIntel;
    private final String fieldName;
    private final int minConfidence;
    
    @Override
    public boolean evaluate(LogEvent event, CorrelationContext context) {
        String value = event.getField(fieldName);
        if (value == null) return false;
        
        ThreatIndicator indicator = threatIntel.lookup(value);
        
        if (indicator != null && indicator.getConfidence() >= minConfidence) {
            // Enrich event with threat intel
            event.addField("threat_type", indicator.getType());
            event.addField("threat_actor", indicator.getActor());
            event.addField("threat_confidence", String.valueOf(indicator.getConfidence()));
            event.addField("threat_sources", String.join(",", indicator.getSources()));
            return true;
        }
        
        return false;
    }
}

MITRE ATT&CK Integration

All correlation rules map to MITRE ATT&CK framework:
public class MitreMapper {
    public void enrichAlert(Alert alert, LogEvent event) {
        List<MitreTechnique> techniques = identifyTechniques(event, alert);
        
        alert.setMitreTactics(techniques.stream()
            .map(MitreTechnique::getTactic)
            .distinct()
            .collect(Collectors.toList()));
        
        alert.setMitreTechniques(techniques.stream()
            .map(t -> new TechniqueReference(
                t.getId(),
                t.getName(),
                t.getUrl()
            ))
            .collect(Collectors.toList()));
    }
}
MITRE Tactics:
  • TA0001: Initial Access
  • TA0002: Execution
  • TA0003: Persistence
  • TA0004: Privilege Escalation
  • TA0005: Defense Evasion
  • TA0006: Credential Access
  • TA0007: Discovery
  • TA0008: Lateral Movement
  • TA0009: Collection
  • TA0010: Exfiltration
  • TA0011: Command and Control
  • TA0040: Impact

Performance Optimization

1. Rule Indexing

public class RuleIndexer {
    private final Map<String, List<CorrelationRule>> categoryIndex;
    private final Map<String, List<CorrelationRule>> eventTypeIndex;
    private final Map<String, List<CorrelationRule>> severityIndex;
    
    public List<CorrelationRule> getApplicableRules(LogEvent event) {
        Set<CorrelationRule> rules = new HashSet<>();
        
        // Index by category
        rules.addAll(categoryIndex.getOrDefault(
            event.getCategory(), Collections.emptyList()));
        
        // Index by event type
        rules.addAll(eventTypeIndex.getOrDefault(
            event.getType(), Collections.emptyList()));
        
        return new ArrayList<>(rules);
    }
}

2. Parallel Processing

public class ParallelCorrelationEngine {
    private final ExecutorService executor;
    private final int parallelism;
    
    public List<Alert> correlate(List<LogEvent> events) {
        return events.parallelStream()
            .flatMap(event -> evaluateEvent(event).stream())
            .collect(Collectors.toList());
    }
}

3. Caching

@Cacheable("threat-intel")
public ThreatIndicator lookupThreat(String indicator) {
    return threatIntelApi.query(indicator);
}

@Cacheable("asset-info")
public Asset getAssetInfo(String ip) {
    return assetDatabase.findByIp(ip);
}

Alert Generation

public class AlertGenerator {
    public Alert createAlert(CorrelationRule rule, LogEvent event, CorrelationContext context) {
        Alert alert = new Alert();
        alert.setId(UUID.randomUUID().toString());
        alert.setName(rule.getName());
        alert.setDescription(renderTemplate(rule.getDescription(), event));
        alert.setSeverity(rule.getSeverity());
        alert.setTimestamp(Instant.now());
        alert.setSource(event.getSource());
        alert.setCategory(rule.getCategory());
        
        // Add related events
        CorrelationState state = context.getState();
        if (state != null) {
            alert.setRelatedEvents(state.getRelatedEvents());
        }
        
        // MITRE ATT&CK mapping
        mitreMapper.enrichAlert(alert, event);
        
        // Threat intelligence
        threatIntelEnricher.enrich(alert, event);
        
        // Asset context
        assetEnricher.enrich(alert, event);
        
        return alert;
    }
}

Next Steps

Data Flow

See how events flow through correlation

Backend API

Learn how alerts are processed

Performance Tuning

Optimize correlation performance

High Availability

Configure correlation for HA

Build docs developers (and LLMs) love