
Overview

The engine/sigma_engine.py module translates Sigma YAML detection rules into Polars expressions and evaluates them against forensic DataFrames at analysis time.

Key Features

  • YAML → Polars: Converts Sigma rules into vectorized Polars expressions (zero Python loops)
  • Field Modifiers: contains, endswith, startswith, all, any, not, re (regex)
  • Boolean Logic: and, or, all of them, 1 of pattern_*
  • EventID Matching: Automatic integer-string normalization for Windows event logs
  • Temporal Correlation: Supports timeframe, correlation, and custom aggregation blocks
  • Evidence Extraction: Returns sample rows + matched columns for forensic review
  • MITRE Integration: Auto-extracts MITRE ATT&CK tags from rule metadata
All rule evaluation is lazy and vectorized. Rules are loaded once and cached in-process.

Scope (v1.2)

Supported Features ✅

Not Yet Implemented ⏳


Rule Loading

load_sigma_rules()

Walks the rules/sigma/ directory and loads all YAML files. Signature:
def load_sigma_rules(
    rules_dir: Optional[str] = None,
    force_reload: bool = False
) -> list
Parameters:
rules_dir (Optional[str]): Path to rules directory. Defaults to engine/../rules/sigma/
force_reload (bool, default False): Bypass in-memory cache and reload from disk
Returns:
rules (list[dict]): List of parsed Sigma rules. Each dict has keys: title, detection, level, tags, _path, etc.
Example:
from engine.sigma_engine import load_sigma_rules

rules = load_sigma_rules()
print(f"Loaded {len(rules)} rules")
# Output: Loaded 86 rules

# Access first rule
rule = rules[0]
print(rule["title"])
# Output: "Suspicious PowerShell Execution"
Rule Directory Structure:
rules/sigma/
├── windows/
│   ├── process_creation/
│   │   ├── powershell_suspicious_encoded.yml
│   │   └── ...
│   ├── builtin/
│   └── ...
├── linux/
├── macos/
└── network/

Rule Evaluation

match_sigma_rules()

Evaluates all loaded rules against a DataFrame and returns hits. Signature:
def match_sigma_rules(
    df: pl.DataFrame,
    rules: Optional[list] = None
) -> list[dict]
Parameters:
df (pl.DataFrame): Forensic dataset to evaluate (must be collected, not lazy)
rules (Optional[list]): Pre-loaded rules. If None, calls load_sigma_rules() automatically
Returns:
hits (list[dict]): List of detection hits, sorted by severity and match count. Each hit contains:
  • title: Rule name
  • level: "critical", "high", "medium", "low", or "unknown"
  • mitre_technique: e.g., "T1059.001"
  • mitre_tactic: e.g., "Execution"
  • tags: List of tags (normalized to "attack.tXXXX" format)
  • matched_rows: Integer count
  • matched_columns: List of field names that triggered the rule
  • sample_evidence: First 150 matching rows (with _id + detection fields + forensic context)
  • all_row_ids: First 500 row IDs (for grid selection)
  • rule_path: Relative path to YAML file
Example:
from engine.sigma_engine import match_sigma_rules
import polars as pl

df = pl.DataFrame({
    "EventID": ["4688", "4688"],
    "CommandLine": ["powershell.exe -enc <base64>", "cmd.exe /c dir"],
    "ProcessName": ["powershell.exe", "cmd.exe"],
    "User": ["admin", "user1"],
    "Computer": ["DC01", "WKS01"],
    "EventTime": ["2024-01-01 10:00:00", "2024-01-01 10:05:00"]
})

hits = match_sigma_rules(df)
print(f"Detected {len(hits)} threats")

for hit in hits:
    print(f"{hit['title']} - {hit['matched_rows']} matches")
    print(f"  MITRE: {hit['mitre_technique']} ({hit['mitre_tactic']})")
    print(f"  Fields: {hit['matched_columns']}")
Output:
Detected 1 threats
Suspicious PowerShell Encoded Command - 1 matches
  MITRE: T1059.001 (Execution)
  Fields: ['CommandLine', 'ProcessName']

Field Modifiers

Supported Modifiers

Sigma field syntax: FieldName|modifier|modifier2
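To make the `FieldName|modifier|modifier2` syntax concrete, here is a small sketch of how a detection key could be split into its parts. It is not the module's `_build_field_condition()`. `split_field_key` and `ParsedField` are hypothetical names, and the modifier set is copied from the Key Features list.

```python
from typing import NamedTuple

# Modifier names taken from the Key Features list in this document.
KNOWN_MODIFIERS = {"contains", "endswith", "startswith", "all", "any", "not", "re"}


class ParsedField(NamedTuple):
    name: str
    modifiers: tuple


def split_field_key(field_raw: str) -> ParsedField:
    """Split 'Field|mod1|mod2' into the field name and its modifiers."""
    name, *mods = field_raw.split("|")
    mods = tuple(m.strip().lower() for m in mods if m.strip())
    unknown = [m for m in mods if m not in KNOWN_MODIFIERS]
    if unknown:
        raise ValueError(f"unsupported modifier(s): {unknown}")
    return ParsedField(name.strip(), mods)


print(split_field_key("CommandLine|contains|any"))
# ParsedField(name='CommandLine', modifiers=('contains', 'any'))
```

Rejecting unknown modifiers up front is a design choice in this sketch. An engine could equally ignore them and let the rule be skipped later.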

Example Rule

detection:
  selection_powershell:
    EventID: 4688
    ProcessName|endswith: '\powershell.exe'
    CommandLine|contains|any:
      - '-enc'
      - '-encodedcommand'
      - 'bypass'
      - 'hidden'
  condition: selection_powershell
Translation:
pl.col("EventID").cast(pl.Utf8).str.replace(r"\.0$", "").is_in(["4688"]) &
pl.col("ProcessName").cast(pl.Utf8).str.ends_with("\\powershell.exe") &
(
  pl.col("CommandLine").cast(pl.Utf8).str.contains("-enc", literal=True) |
  pl.col("CommandLine").cast(pl.Utf8).str.contains("-encodedcommand", literal=True) |
  pl.col("CommandLine").cast(pl.Utf8).str.contains("bypass", literal=True) |
  pl.col("CommandLine").cast(pl.Utf8).str.contains("hidden", literal=True)
)
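The first line of the translation casts EventID to text and strips a trailing `.0`, which is what lets integer, float and string event IDs all compare equal to `"4688"`. A plain-Python equivalent of that normalization step, shown only to illustrate the idea (the helper name `normalize_event_id` is hypothetical):

```python
import re


def normalize_event_id(value) -> str:
    """Render an EventID as text and drop a trailing '.0' left by float parsing."""
    return re.sub(r"\.0$", "", str(value))


# The same logical ID can arrive as int, float or str depending on the parser.
for raw in (4688, 4688.0, "4688", "4688.0"):
    print(repr(raw), "->", normalize_event_id(raw))
```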

Boolean Conditions

Condition Syntax

Pattern                    Meaning                               Polars Expression
a and b                    Both conditions must match            expr_a & expr_b
a or b                     Either condition matches              expr_a | expr_b
not a                      Condition must NOT match              ~expr_a
all of them                AND of all named selections           reduce(operator.and_, [expr1, expr2, ...])
1 of them / any of them    OR of all named selections            reduce(operator.or_, [expr1, expr2, ...])
1 of pattern_*             OR of selections matching wildcard    reduce(operator.or_, [matched_exprs])
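The quantifier rows in the table reduce a list of selections with `&` or `|`. The sketch below shows that reduction on plain booleans so it runs without Polars. The same code works for any objects that support `&` and `|`, which includes Polars expressions. `expand_quantifier` is a hypothetical helper, not the module's `_parse_condition_string()`.

```python
import fnmatch
import operator
from functools import reduce


def expand_quantifier(quantifier: str, pattern: str, named: dict):
    """Combine named selections for 'all of X', '1 of X' or 'any of X'."""
    if pattern == "them":
        selected = list(named.values())
    else:
        # Wildcard match on selection names, e.g. 'selection_*'.
        selected = [v for k, v in named.items() if fnmatch.fnmatchcase(k, pattern)]
    if not selected:
        return None  # nothing to combine
    op = operator.and_ if quantifier == "all" else operator.or_
    return reduce(op, selected)


named = {"selection_a": True, "selection_b": False, "filter_x": True}
print(expand_quantifier("1", "selection_*", named))  # True
print(expand_quantifier("all", "them", named))       # False
```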

Example: Multiple Selections

detection:
  selection_image:
    Image|endswith:
      - '\net.exe'
      - '\net1.exe'
  selection_user:
    CommandLine|contains: '/add'
  condition: selection_image and selection_user
Translation:
image_expr = (
  pl.col("Image").cast(pl.Utf8).str.ends_with("\\net.exe") |
  pl.col("Image").cast(pl.Utf8).str.ends_with("\\net1.exe")
)
user_expr = pl.col("CommandLine").cast(pl.Utf8).str.contains("/add", literal=True)
final_expr = image_expr & user_expr

Temporal Correlation

Standard Sigma Format

Uses timeframe + correlation block:
detection:
  selection:
    EventID: 4625  # Logon failure
  timeframe: 5m
  correlation:
    type: event_count
    group-by:
      - Computer
      - TargetUserName
    timespan: 5m
    condition:
      gte: 10  # 10+ failures in 5 minutes
  condition: selection
How It Works:
  1. Filter rows matching selection
  2. Parse time column (auto-detected from ["Time", "Timestamp", "EventTime", ...])
  3. Group by Computer + TargetUserName
  4. Apply group_by_dynamic() with every="5m"
  5. Count events per group per window
  6. Filter groups with count >= 10
  7. Return total matched events
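The steps above can be sketched without Polars to show the arithmetic. This is a plain-Python stand-in for the `group_by_dynamic()` step, assuming fixed (non-overlapping) windows and timestamps in `YYYY-MM-DD HH:MM:SS` form. `count_hot_window_events` is a hypothetical name.

```python
from collections import Counter
from datetime import datetime, timedelta


def count_hot_window_events(events, group_keys, window: timedelta, threshold: int) -> int:
    """Sum the events in every (group, window) bucket holding >= threshold events."""
    epoch = datetime(1970, 1, 1)
    buckets = Counter()
    for ev in events:
        ts = datetime.strptime(ev["EventTime"], "%Y-%m-%d %H:%M:%S")
        # Integer index of the fixed-size window this timestamp falls into.
        window_index = (ts - epoch) // window
        group = tuple(ev[k] for k in group_keys)
        buckets[(group, window_index)] += 1
    return sum(n for n in buckets.values() if n >= threshold)


# 12 failures for 'admin' within one minute, 3 scattered failures for 'guest'.
failures = [
    {"Computer": "DC01", "TargetUserName": "admin",
     "EventTime": f"2024-01-01 10:00:{i * 4:02d}"}
    for i in range(12)
]
failures += [
    {"Computer": "DC01", "TargetUserName": "guest",
     "EventTime": f"2024-01-01 10:0{i}:00"}
    for i in range(3)
]
hot = count_hot_window_events(
    failures, ["Computer", "TargetUserName"], timedelta(minutes=5), threshold=10
)
print(hot)  # 12
```

Only the `admin` bucket reaches the threshold, so the three `guest` events are not counted.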

Custom Chronos Format

Uses aggregation block (backward compatibility):
detection:
  selection:
    EventID: 22  # DNS query
    QueryName|endswith:
      - '.onion'
      - '.i2p'
  condition: selection
aggregation:
  group_by:
    - Computer
  time_window: 1h
  threshold: 50  # 50+ Tor/I2P queries in 1 hour
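Both formats carry the same three pieces of information: grouping fields, a time window and a threshold. The sketch below shows one way to read either block into a common shape. It assumes durations of the form `<number><s|m|h|d>` and only the `gte` operator shown above. `parse_duration` and `correlation_settings` are hypothetical helpers.

```python
import re
from datetime import timedelta
from typing import Optional

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Turn '5m' or '1h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if not match:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def correlation_settings(detection: dict, rule: dict) -> Optional[dict]:
    """Read group-by fields, window and threshold from either block format."""
    corr = detection.get("correlation")
    if corr:
        span = corr.get("timespan") or detection.get("timeframe")
        threshold = (corr.get("condition") or {}).get("gte")
        group_by = corr.get("group-by", [])
    else:
        agg = rule.get("aggregation") or {}
        span = agg.get("time_window")
        threshold = agg.get("threshold")
        group_by = agg.get("group_by", [])
    if span is None or threshold is None:
        return None  # no temporal correlation configured
    return {
        "group_by": list(group_by),
        "window": parse_duration(str(span)),
        "threshold": int(threshold),
    }


custom_rule = {"aggregation": {"group_by": ["Computer"], "time_window": "1h", "threshold": 50}}
print(correlation_settings({}, custom_rule))
```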

Implementation

Function:
def _evaluate_temporal_correlation(
    df_matched: pl.DataFrame,
    detection: dict,
    rule: dict,
) -> Optional[int]
Parameters:
df_matched (pl.DataFrame): Rows that matched the base detection condition
detection (dict): The detection block from the YAML rule
rule (dict): Full rule dict (for aggregation fallback)
Returns:
adjusted_count (Optional[int]):
  • None: No temporal correlation needed
  • 0: No groups exceeded threshold
  • int: Total events in hot windows
Example:
import polars as pl
from engine.sigma_engine import _evaluate_temporal_correlation

# 15 failed logons, 4 seconds apart, so all of them fall inside one 5-minute window
df_matched = pl.DataFrame({
    "EventID": ["4625"] * 15,
    "Computer": ["DC01"] * 15,
    "TargetUserName": ["admin"] * 15,
    "EventTime": [f"2024-01-01 10:00:{i * 4:02d}" for i in range(15)]
})

detection = {
    "timeframe": "5m",
    "correlation": {
        "group-by": ["Computer", "TargetUserName"],
        "timespan": "5m",
        "condition": {"gte": 10}
    }
}

count = _evaluate_temporal_correlation(df_matched, detection, {})
# Returns: 15 (one group with 15 events in 5-min window)

Evidence Extraction

Forensic Context Columns

The engine automatically enriches evidence samples with forensic context fields if present:
FORENSIC_CONTEXT_COLUMNS = [
    "UserName", "User", "AccountName", "SubjectUserName", "TargetUserName",
    "ProcessName", "Image", "NewProcessName", "ParentImage",
    "SourceIP", "IpAddress", "ClientIP", "EndpointIp",
    "CommandLine", "ParentCommandLine",
    "Status", "Result", "LogonType",
    "DestinationHostname", "DestinationIp", "DestPort",
    "ServiceName", "TaskName", "ObjectName",
]
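The phrase "if present" matters here: only context columns that actually exist in the dataset can be added to a sample. A minimal sketch of that selection, assuming the sample layout described earlier (`_id`, then detection fields, then context fields). `evidence_columns` is a hypothetical helper and the context list in the demo is a short excerpt of the one above.

```python
def evidence_columns(df_columns, matched_columns, context_columns, id_column="_id"):
    """Return the ordered, de-duplicated columns to keep in an evidence sample."""
    present = set(df_columns)
    seen = set()
    result = []
    for col in [id_column, *matched_columns, *context_columns]:
        # Skip columns the dataset lacks and columns already selected.
        if col in present and col not in seen:
            seen.add(col)
            result.append(col)
    return result


context_excerpt = ["UserName", "User", "ProcessName", "CommandLine", "SourceIP"]
cols = evidence_columns(
    df_columns=["_id", "EventID", "CommandLine", "ProcessName", "User", "Computer"],
    matched_columns=["CommandLine", "ProcessName"],
    context_columns=context_excerpt,
)
print(cols)  # ['_id', 'CommandLine', 'ProcessName', 'User']
```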

Evidence Structure

Each hit includes:

Row IDs for Grid Selection

all_row_ids (list[int]): First 500 _id values from matched rows. Used by the frontend to:
  • Auto-select rows in the data grid
  • Enable "View all in Grid" button
  • Support forensic drill-down workflows

MITRE ATT&CK Integration

Tag Normalization

The engine normalizes MITRE tags to a consistent format:
# Input tags in rule YAML:
tags:
  - mitre.t1059.001
  - attack.execution

# Normalized to:
tags:
  - attack.t1059.001
  - attack.execution
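The normalization above amounts to rewriting a `mitre.` prefix as `attack.`. A minimal sketch of that rewrite, plus one possible way to pull a technique ID out of the normalized tags. Lowercasing the tags and the `tXXXX[.XXX]` pattern are assumptions. `normalize_tags` and `first_technique` are hypothetical names.

```python
import re
from typing import Optional

# Matches technique tags such as 'attack.t1059' or 'attack.t1059.001'.
_TECHNIQUE = re.compile(r"^attack\.(t\d{4}(?:\.\d{3})?)$")


def normalize_tags(tags) -> list:
    """Lowercase tags and rewrite a 'mitre.' prefix as 'attack.'."""
    normalized = []
    for tag in tags:
        tag = str(tag).strip().lower()
        if tag.startswith("mitre."):
            tag = "attack." + tag[len("mitre."):]
        normalized.append(tag)
    return normalized


def first_technique(tags) -> Optional[str]:
    """Return the first technique ID found in the tags, e.g. 'T1059.001'."""
    for tag in normalize_tags(tags):
        match = _TECHNIQUE.match(tag)
        if match:
            return match.group(1).upper()
    return None


print(normalize_tags(["mitre.t1059.001", "attack.execution"]))
# ['attack.t1059.001', 'attack.execution']
print(first_technique(["mitre.t1059.001", "attack.execution"]))  # T1059.001
```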

Extraction Logic

Example Rule with MITRE

title: Mimikatz Credential Dumping
level: critical
tags:
  - attack.t1003.001
  - attack.credential_access
custom:
  mitre_technique: T1003.001
  mitre_tactic: Credential Access
detection:
  selection:
    CommandLine|contains: 'sekurlsa::logonpasswords'
  condition: selection
Hit Output:
{
  "title": "Mimikatz Credential Dumping",
  "level": "critical",
  "mitre_technique": "T1003.001",
  "mitre_tactic": "Credential Access",
  "tags": ["attack.t1003.001", "attack.credential_access"],
  "matched_rows": 3,
  ...
}

Performance Optimization

Lazy Evaluation

Rule compilation only builds Polars expression objects, so no data is read at that stage. Because match_sigma_rules() takes a collected pl.DataFrame, the work happens when the expression is applied with filter():
# Rule compilation (fast: builds expressions, no data processing)
named_exprs = {
    "selection": _build_named_condition(rule["detection"]["selection"], columns)
}
final_expr = _parse_condition_string("selection", named_exprs)

# Evaluation: filter() on an eager DataFrame runs the expression here
df_matched = df.filter(final_expr)

# height is a property of the already-filtered DataFrame
match_count = df_matched.height

Vectorized Operations

All string operations use Polars' native vectorized expressions:
# ❌ SLOW (Python loop over every row):
matches = ["-enc" in str(cmd) for cmd in df["CommandLine"]]

# ✅ FAST (vectorized):
pl.col("CommandLine").cast(pl.Utf8).str.contains("-enc", literal=True)

Caching

Rules are loaded once and cached in-process:
_RULES_CACHE: Optional[list] = None

def load_sigma_rules(...):
    global _RULES_CACHE
    if _RULES_CACHE is not None and not force_reload:
        return _RULES_CACHE
    # ... load from disk
    _RULES_CACHE = rules
    return rules
Memory Usage: ~1-2 MB for 86 rules.

Error Handling

Graceful Degradation

If a rule fails to compile or evaluate, it's silently skipped (logged at DEBUG level):
try:
    df_matched = df.filter(final_expr)
    match_count = df_matched.height
except Exception as exc:
    logger.debug(f"Sigma rule '{rule.get('title', '?')}' eval error: {exc}")
    continue  # Skip this rule, continue with others

Common Issues


Example: Custom Detection Rule

Rule YAML

title: Suspicious Remote Thread Injection
level: high
tags:
  - attack.t1055.002
  - attack.defense_evasion
custom:
  mitre_technique: T1055.002
  mitre_tactic: Defense Evasion
detection:
  selection:
    EventID: 8  # Sysmon CreateRemoteThread
    TargetImage|endswith:
      - '\lsass.exe'
      - '\winlogon.exe'
      - '\csrss.exe'
  filter_system:
    SourceImage|startswith:
      - 'C:\Windows\System32\'
      - 'C:\Windows\SysWOW64\'
  condition: selection and not filter_system

Evaluation Code

from engine.sigma_engine import load_sigma_rules, match_sigma_rules
import polars as pl

# Load all rules
rules = load_sigma_rules()

# Create test data
df = pl.DataFrame({
    "EventID": ["8", "8", "8"],
    "SourceImage": [
        "C:\\Users\\admin\\malware.exe",
        "C:\\Windows\\System32\\svchost.exe",
        "C:\\Temp\\inject.exe"
    ],
    "TargetImage": [
        "C:\\Windows\\System32\\lsass.exe",
        "C:\\Windows\\System32\\lsass.exe",
        "C:\\Windows\\System32\\winlogon.exe"
    ],
    "EventTime": [
        "2024-01-01 10:00:00",
        "2024-01-01 10:05:00",
        "2024-01-01 10:10:00"
    ],
    "_id": [1, 2, 3]
})

# Evaluate
hits = match_sigma_rules(df, rules)

# Results
for hit in hits:
    if "Remote Thread" in hit["title"]:
        print(f"🚨 {hit['title']}")
        print(f"   Matched: {hit['matched_rows']} events")
        print(f"   MITRE: {hit['mitre_technique']}")
        print(f"   Evidence:")
        for ev in hit["sample_evidence"]:
            print(f"     {ev['EventTime']} | {ev['SourceImage']} → {ev['TargetImage']}")
Output:
🚨 Suspicious Remote Thread Injection
   Matched: 2 events
   MITRE: T1055.002
   Evidence:
     2024-01-01 10:00:00 | C:\Users\admin\malware.exe → C:\Windows\System32\lsass.exe
     2024-01-01 10:10:00 | C:\Temp\inject.exe → C:\Windows\System32\winlogon.exe

API Reference Summary

Function                          Input                        Output             Purpose
load_sigma_rules()                rules_dir, force_reload      list[dict]         Load YAML rules from disk
match_sigma_rules()               df, rules                    list[dict]         Evaluate all rules against data
_field_expr()                     col_name, columns            Optional[pl.Expr]  Case-insensitive column lookup
_apply_modifier()                 expr, modifier, values       pl.Expr            Apply |contains, |endswith, etc.
_build_field_condition()          field_raw, values, columns   Optional[pl.Expr]  Parse Field|modifier|not
_build_named_condition()          named_block, columns         Optional[pl.Expr]  AND all fields in selection block
_parse_condition_string()         condition_str, named_exprs   Optional[pl.Expr]  Parse a and b or c
_evaluate_temporal_correlation()  df_matched, detection, rule  Optional[int]      Apply timeframe + aggregation
_find_time_column()               columns                      Optional[str]      Locate timestamp column
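Two of the helpers in the table, case-insensitive column lookup and timestamp column detection, are simple enough to illustrate in plain Python. This is a sketch of the idea rather than the module's code. `find_column` and `find_time_column` are hypothetical stand-ins that return column names instead of Polars expressions, and the candidate list holds only the three names this document spells out.

```python
from typing import Optional, Sequence

# Only the candidates named in this document; the full list is not shown there.
TIME_CANDIDATES = ("Time", "Timestamp", "EventTime")


def find_column(name: str, columns: Sequence[str]) -> Optional[str]:
    """Return the actual column whose name matches `name`, ignoring case."""
    lowered = {}
    for col in columns:
        lowered.setdefault(col.lower(), col)  # first spelling wins
    return lowered.get(name.lower())


def find_time_column(columns: Sequence[str],
                     candidates: Sequence[str] = TIME_CANDIDATES) -> Optional[str]:
    """Return the first candidate timestamp column present in `columns`."""
    for candidate in candidates:
        actual = find_column(candidate, columns)
        if actual is not None:
            return actual
    return None


print(find_column("commandline", ["EventID", "CommandLine"]))  # CommandLine
print(find_time_column(["EventID", "eventtime", "User"]))      # eventtime
```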

Best Practices

Rule Writing Guidelines:
  1. Always specify level (critical, high, medium, low)
  2. Use custom.mitre_technique for explicit MITRE mapping
  3. Test rules with _evaluate_temporal_correlation() for brute-force detection
  4. Use |contains|all for multi-token matching (e.g., "powershell" AND "-enc")
  5. Add filter_ selections to reduce false positives
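Guideline 4 depends on the difference between matching any token and matching all of them. The plain-Python sketch below shows that difference using case-sensitive substring checks, mirroring the `literal=True` translations shown earlier. `contains_match` is a hypothetical helper, not part of the engine.

```python
def contains_match(text, needles, require_all: bool = False) -> bool:
    """Substring match: any needle by default, every needle with require_all."""
    haystack = str(text)
    check = all if require_all else any
    return check(needle in haystack for needle in needles)


tokens = ["powershell", "-enc"]
encoded = "powershell.exe -enc AAAA"
benign = "powershell.exe -NoProfile"

print(contains_match(encoded, tokens, require_all=True))  # True
print(contains_match(benign, tokens, require_all=True))   # False
print(contains_match(benign, tokens))                     # True
```

With `all`, the benign command no longer matches, which is why the guideline recommends it for multi-token patterns.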

Performance Tips

