Overview
The engine/sigma_engine.py module translates Sigma YAML detection rules into Polars LazyFrame expressions and evaluates them against forensic DataFrames at analysis time.
Key Features
YAML → Polars : Converts Sigma rules into vectorized Polars expressions (zero Python loops)
Field Modifiers : contains, endswith, startswith, all, any, not, re (regex)
Boolean Logic : and, or, all of them, 1 of pattern_*
EventID Matching : Automatic integer-string normalization for Windows event logs
Temporal Correlation : Supports timeframe, correlation, and custom aggregation blocks
Evidence Extraction : Returns sample rows + matched columns for forensic review
MITRE Integration : Auto-extracts MITRE ATT&CK tags from rule metadata
All rule evaluation is lazy and vectorized. Rules are loaded once and cached in-process.
Scope (v1.2)
Supported Features
Show Detection Constructs
Field modifiers : |contains, |endswith, |startswith, |any, |all, |not, |re
EventID lists : Automatic is_in() with string normalization
Boolean conditions : and, or, not
Selection patterns : all of them, 1 of them, any of pattern_*
Temporal correlation : timeframe + correlation block with group-by, timespan, threshold
Custom aggregation : aggregation block with group_by, time_window, threshold
Metadata extraction : title, level, tags, custom.mitre_technique, custom.mitre_tactic
Not Yet Implemented
near queries (proximity searches)
base64offset modifier
cidr IP range matching
Complex condition logic (parentheses, precedence overrides)
Rule Loading
load_sigma_rules()
Walks the rules/sigma/ directory and loads all YAML files.
Signature:
def load_sigma_rules(
    rules_dir: Optional[str] = None,
    force_reload: bool = False,
) -> list
Parameters:
rules_dir: Path to the rules directory. Defaults to engine/../rules/sigma/
force_reload: Bypass the in-memory cache and reload from disk
Returns:
List of parsed Sigma rules. Each dict has keys: title, detection, level, tags, _path, etc.
Example:
from engine.sigma_engine import load_sigma_rules

rules = load_sigma_rules()
print(f"Loaded {len(rules)} rules")
# Output: Loaded 86 rules

# Access the first rule
rule = rules[0]
print(rule["title"])
# Output: "Suspicious PowerShell Execution"
Rule Directory Structure:
rules/sigma/
├── windows/
│   ├── process_creation/
│   │   ├── powershell_suspicious_encoded.yml
│   │   └── ...
│   └── builtin/
│       └── ...
├── linux/
├── macos/
└── network/
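A minimal loader along these lines can be sketched as follows. This is an illustrative sketch, not the module's actual code; it assumes PyYAML is available and that each .yml file holds one rule document:

```python
from pathlib import Path

import yaml  # PyYAML, assumed dependency


def load_rules_sketch(rules_dir: str) -> list:
    """Recursively walk rules_dir and parse every .yml rule file."""
    rules = []
    for path in sorted(Path(rules_dir).rglob("*.yml")):
        try:
            rule = yaml.safe_load(path.read_text(encoding="utf-8"))
        except yaml.YAMLError:
            continue  # skip malformed YAML rather than failing the whole load
        if isinstance(rule, dict) and "detection" in rule:
            # record the relative path so hits can reference their rule file
            rule["_path"] = str(path.relative_to(rules_dir))
            rules.append(rule)
    return rules
```

Sorting the paths makes rule ordering deterministic across platforms, which keeps hit output stable between runs.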
Rule Evaluation
match_sigma_rules()
Evaluates all loaded rules against a DataFrame and returns hits.
Signature:
def match_sigma_rules(
    df: pl.DataFrame,
    rules: Optional[list] = None,
) -> list[dict]
Parameters:
df: Forensic dataset to evaluate (must be a collected DataFrame, not a LazyFrame)
rules: Pre-loaded rules. If None, load_sigma_rules() is called automatically
Returns:
List of detection hits, sorted by severity and match count. Each hit contains:
title: Rule name
level: "critical", "high", "medium", "low", or "unknown"
mitre_technique: e.g., "T1059.001"
mitre_tactic: e.g., "Execution"
tags: List of tags (normalized to "attack.tXXXX" format)
matched_rows: Integer count
matched_columns: List of field names that triggered the rule
sample_evidence: First 150 matching rows (with _id + detection fields + forensic context)
all_row_ids: First 500 row IDs (for grid selection)
rule_path: Relative path to YAML file
Example:
from engine.sigma_engine import match_sigma_rules
import polars as pl

df = pl.DataFrame({
    "EventID": ["4688", "4688"],
    "CommandLine": ["powershell.exe -enc <base64>", "cmd.exe /c dir"],
    "ProcessName": ["powershell.exe", "cmd.exe"],
    "User": ["admin", "user1"],
    "Computer": ["DC01", "WKS01"],
    "EventTime": ["2024-01-01 10:00:00", "2024-01-01 10:05:00"],
})

hits = match_sigma_rules(df)
print(f"Detected {len(hits)} threats")
for hit in hits:
    print(f"  {hit['title']} → {hit['matched_rows']} matches")
    print(f"    MITRE: {hit['mitre_technique']} ({hit['mitre_tactic']})")
    print(f"    Fields: {hit['matched_columns']}")
Output:
Detected 1 threats
  Suspicious PowerShell Encoded Command → 1 matches
    MITRE: T1059.001 (Execution)
    Fields: ['CommandLine', 'ProcessName']
Field Modifiers
Supported Modifiers
Sigma field syntax: FieldName|modifier|modifier2
| Modifier | Description | Polars Expression |
|---|---|---|
| `contains` | Substring match | `str.contains(value, literal=True)` |
| `contains\|any` | Match any value in list (OR) | `OR(str.contains(v1), str.contains(v2), ...)` |
| `contains\|all` | Match all values in list (AND) | `AND(str.contains(v1), str.contains(v2), ...)` |
| `endswith` | Suffix match | `str.ends_with(value)` |
| `endswith\|any` | Any suffix (OR) | `OR(str.ends_with(v1), ...)` |
| `endswith\|all` | All suffixes (AND) | `AND(str.ends_with(v1), ...)` |
| `startswith` | Prefix match | `str.starts_with(value)` |
| `startswith\|any` | Any prefix (OR) | `OR(str.starts_with(v1), ...)` |
| `startswith\|all` | All prefixes (AND) | `AND(str.starts_with(v1), ...)` |
| `re` | Regex match | `str.contains(pattern, literal=False)` |
| `not` | Negate condition | `~(expression)` |
Example Rule
detection:
  selection_powershell:
    EventID: 4688
    ProcessName|endswith: '\powershell.exe'
    CommandLine|contains|any:
      - '-enc'
      - '-encodedcommand'
      - 'bypass'
      - 'hidden'
  condition: selection_powershell
Translation:
pl.col("EventID").cast(pl.Utf8).str.replace(r"\.0$", "").is_in(["4688"]) &
pl.col("ProcessName").cast(pl.Utf8).str.ends_with("\\powershell.exe") &
(
    pl.col("CommandLine").cast(pl.Utf8).str.contains("-enc", literal=True) |
    pl.col("CommandLine").cast(pl.Utf8).str.contains("-encodedcommand", literal=True) |
    pl.col("CommandLine").cast(pl.Utf8).str.contains("bypass", literal=True) |
    pl.col("CommandLine").cast(pl.Utf8).str.contains("hidden", literal=True)
)
Boolean Conditions
Condition Syntax
| Pattern | Meaning | Polars Expression |
|---|---|---|
| `a and b` | Both conditions must match | `expr_a & expr_b` |
| `a or b` | Either condition matches | `expr_a \| expr_b` |
| `not a` | Condition must NOT match | `~expr_a` |
| `all of them` | AND of all named selections | `reduce(operator.and_, [expr1, expr2, ...])` |
| `1 of them` / `any of them` | OR of all named selections | `reduce(operator.or_, [expr1, expr2, ...])` |
| `1 of pattern_*` | OR of selections matching wildcard | `reduce(operator.or_, [matched_exprs])` |
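The `of` quantifiers amount to a wildcard match over selection names followed by a reduce. A library-free sketch (names are illustrative; plain booleans stand in for the pl.Expr values the real engine uses, since both support & and |):

```python
import operator
from fnmatch import fnmatch
from functools import reduce


def resolve_of_pattern(condition: str, named: dict):
    """Resolve 'all of them', '1 of them', and '1 of pattern_*' (sketch).

    `named` maps selection names to expressions; any objects
    supporting & and | work, so booleans do for demonstration.
    """
    quantifier, _, pattern = condition.partition(" of ")
    if pattern == "them":
        selected = list(named.values())
    else:
        # wildcard selection, e.g. '1 of pattern_*'
        selected = [e for name, e in named.items() if fnmatch(name, pattern)]
    # 'all' ANDs the selections; '1' / 'any' ORs them
    op = operator.and_ if quantifier == "all" else operator.or_
    return reduce(op, selected)
```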
Example: Multiple Selections
detection:
  selection_image:
    Image|endswith:
      - '\net.exe'
      - '\net1.exe'
  selection_user:
    CommandLine|contains: '/add'
  condition: selection_image and selection_user
Translation:
image_expr = (
    pl.col("Image").cast(pl.Utf8).str.ends_with("\\net.exe") |
    pl.col("Image").cast(pl.Utf8).str.ends_with("\\net1.exe")
)
user_expr = pl.col("CommandLine").cast(pl.Utf8).str.contains("/add", literal=True)
final_expr = image_expr & user_expr
Temporal Correlation
Uses timeframe + correlation block:
detection:
  selection:
    EventID: 4625          # Logon failure
  timeframe: 5m
  correlation:
    type: event_count
    group-by:
      - Computer
      - TargetUserName
    timespan: 5m
    condition:
      gte: 10              # 10+ failures in 5 minutes
  condition: selection
How It Works:
Filter rows matching selection
Parse time column (auto-detected from ["Time", "Timestamp", "EventTime", ...])
Group by Computer + TargetUserName
Apply group_by_dynamic() with every="5m"
Count events per group per window
Filter groups with count >= 10
Return total matched events
The legacy aggregation block is also supported for backward compatibility:
detection:
  selection:
    EventID: 22            # DNS query
    QueryName|endswith:
      - '.onion'
      - '.i2p'
  condition: selection
aggregation:
  group_by:
    - Computer
  time_window: 1h
  threshold: 50            # 50+ Tor/I2P queries in 1 hour
Implementation
Function:
def _evaluate_temporal_correlation(
    df_matched: pl.DataFrame,
    detection: dict,
    rule: dict,
) -> Optional[int]
Parameters:
df_matched: Rows that matched the base detection condition
detection: The detection block from the YAML rule
rule: Full rule dict (for the aggregation fallback)
Returns:
None: No temporal correlation needed
0: No groups exceeded threshold
int: Total events in hot windows
Example:
df_matched = pl.DataFrame({
    "EventID": ["4625"] * 15,
    "Computer": ["DC01"] * 15,
    "TargetUserName": ["admin"] * 15,
    "EventTime": [f"2024-01-01 10:00:{i:02d}" for i in range(15)],
})
detection = {
    "timeframe": "5m",
    "correlation": {
        "group-by": ["Computer", "TargetUserName"],
        "timespan": "5m",
        "condition": {"gte": 10},
    },
}
count = _evaluate_temporal_correlation(df_matched, detection, {})
# Returns: 15 (one group with all 15 events inside a single 5-min window)
Forensic Context Columns
The engine automatically enriches evidence samples with forensic context fields if present:
FORENSIC_CONTEXT_COLUMNS = [
    "UserName", "User", "AccountName", "SubjectUserName", "TargetUserName",
    "ProcessName", "Image", "NewProcessName", "ParentImage",
    "SourceIP", "IpAddress", "ClientIP", "EndpointIp",
    "CommandLine", "ParentCommandLine",
    "Status", "Result", "LogonType",
    "DestinationHostname", "DestinationIp", "DestPort",
    "ServiceName", "TaskName", "ObjectName",
]
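The enrichment boils down to an ordered, de-duplicated column pick capped at a maximum width. A sketch of that selection logic (the function name and the 12-column cap placement are assumptions; the context list is a trimmed copy of the one above):

```python
from typing import Optional

# trimmed copy of the FORENSIC_CONTEXT_COLUMNS list above
FORENSIC_CONTEXT_COLUMNS = ["UserName", "User", "ProcessName", "CommandLine", "SourceIP"]


def pick_evidence_columns(df_columns: list, time_col: Optional[str],
                          detection_fields: list, max_cols: int = 12) -> list:
    """Build the ordered evidence column list: _id, time column,
    matched detection fields, then forensic context, capped at max_cols."""
    ordered = ["_id"]
    if time_col:
        ordered.append(time_col)
    ordered += detection_fields + FORENSIC_CONTEXT_COLUMNS
    seen, out = set(), []
    for col in ordered:
        if col in df_columns and col not in seen:
            seen.add(col)
            out.append(col)
        if len(out) == max_cols:
            break
    return out
```

Columns absent from the dataset are silently dropped, which is why evidence rows never raise on missing context fields.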
Evidence Structure
Each hit includes:
Show sample_evidence Structure
Type: list[dict]
Max Rows: 150
Columns Included (in order):
_id (row ID for grid linking)
Time column (if found: Time, Timestamp, EventTime, etc.)
Matched detection fields (fields referenced in the rule's detection block)
Forensic context columns (from FORENSIC_CONTEXT_COLUMNS if present, up to 12 total columns)
Example:
[
  {
    "_id": 42,
    "EventTime": "2024-01-01 10:00:00",
    "CommandLine": "powershell.exe -enc <base64>",
    "ProcessName": "powershell.exe",
    "UserName": "admin",
    "Computer": "DC01",
    "EventID": "4688"
  },
  ...
]
Row IDs for Grid Selection
First 500 _id values from matched rows. Used by the frontend to:
Auto-select rows in the data grid
Enable the "View all in Grid" button
Support forensic drill-down workflows
MITRE ATT&CK Integration
Tag Normalization
The engine normalizes MITRE tags to a consistent format:
# Input tags in rule YAML:
tags:
- mitre.t1059.001
- attack.execution
# Normalized to:
tags:
- attack.t1059.001
- attack.execution
Show MITRE Field Priority
Technique Extraction:
custom.mitre_technique (explicit override in YAML)
First tag starting with "attack.t" (e.g., "attack.t1059.001")
Empty string (unmapped)
Tactic Extraction:
custom.mitre_tactic (explicit override)
First tag starting with "attack." but NOT "attack.t" (e.g., "attack.execution")
Empty string
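Both priority lists reduce to a tag scan with a custom-block override. A sketch of that resolution (function name and exact case handling are assumptions, not the module's code):

```python
def extract_mitre(rule: dict):
    """Resolve (technique, tactic) per the priority order above (sketch)."""
    custom = rule.get("custom", {}) or {}
    # normalize 'mitre.*' tags to the 'attack.*' prefix
    tags = [str(t).lower().replace("mitre.", "attack.", 1)
            for t in rule.get("tags", [])]

    technique = custom.get("mitre_technique", "")
    if not technique:
        # first tag like 'attack.t1059.001' -> 'T1059.001'
        technique = next((t.removeprefix("attack.").upper()
                          for t in tags if t.startswith("attack.t")), "")

    tactic = custom.get("mitre_tactic", "")
    if not tactic:
        # first tag like 'attack.execution' -> 'execution'
        tactic = next((t.removeprefix("attack.")
                       for t in tags
                       if t.startswith("attack.") and not t.startswith("attack.t")), "")
    return technique, tactic
```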
Example Rule with MITRE
title: Mimikatz Credential Dumping
level: critical
tags:
  - attack.t1003.001
  - attack.credential_access
custom:
  mitre_technique: T1003.001
  mitre_tactic: Credential Access
detection:
  selection:
    CommandLine|contains: 'sekurlsa::logonpasswords'
  condition: selection
Hit Output:
{
  "title": "Mimikatz Credential Dumping",
  "level": "critical",
  "mitre_technique": "T1003.001",
  "mitre_tactic": "Credential Access",
  "tags": ["attack.t1003.001", "attack.credential_access"],
  "matched_rows": 3,
  ...
}
Lazy Evaluation
All expressions are lazy until collect() is called on the filtered DataFrame:
# Rule compilation (fast; no data processing)
named_exprs = {
    "selection": _build_named_condition(rule["detection"]["selection"], columns),
}
final_expr = _parse_condition_string("selection", named_exprs)

# Evaluation (lazy)
lf_matched = df.lazy().filter(final_expr)   # no execution yet

# Only when needed:
match_count = lf_matched.collect().height   # triggers execution
Vectorized Operations
All string operations use Polars' native vectorized expressions:
# SLOW (Python loop):
matches = ["-enc" in str(cmd) for cmd in df["CommandLine"]]

# FAST (vectorized):
pl.col("CommandLine").cast(pl.Utf8).str.contains("-enc", literal=True)
Caching
Rules are loaded once and cached in-process:
_RULES_CACHE: Optional[list] = None

def load_sigma_rules(...):
    global _RULES_CACHE
    if _RULES_CACHE is not None and not force_reload:
        return _RULES_CACHE
    # ... load from disk
    _RULES_CACHE = rules
    return rules
Memory Usage: ~1-2 MB for 86 rules.
Error Handling
Graceful Degradation
If a rule fails to compile or evaluate, it's silently skipped (logged at DEBUG level):
try:
    df_matched = df.filter(final_expr)
    match_count = df_matched.height
except Exception as exc:
    logger.debug(f"Sigma rule '{rule.get('title', '?')}' eval error: {exc}")
    continue  # skip this rule, continue with others
Common Issues
| Error | Cause | Solution |
|---|---|---|
| `ColumnNotFoundError` | Rule references a field not in the dataset | Rule is skipped automatically |
| `ComputeError` | Invalid regex pattern in `\|re` | Fix the regex in the YAML rule |
| `SchemaFieldNotFoundError` | Temporal correlation on data without a time column | Add a time column or remove `timeframe` |
| Empty `sample_evidence` | All matched rows have null in detection fields | Check data quality |
Example: Custom Detection Rule
Rule YAML
title: Suspicious Remote Thread Injection
level: high
tags:
  - attack.t1055.002
  - attack.defense_evasion
custom:
  mitre_technique: T1055.002
  mitre_tactic: Defense Evasion
detection:
  selection:
    EventID: 8             # Sysmon CreateRemoteThread
    TargetImage|endswith:
      - '\lsass.exe'
      - '\winlogon.exe'
      - '\csrss.exe'
  filter_system:
    SourceImage|startswith:
      - 'C:\Windows\System32\'
      - 'C:\Windows\SysWOW64\'
  condition: selection and not filter_system
Evaluation Code
from engine.sigma_engine import load_sigma_rules, match_sigma_rules
import polars as pl

# Load all rules
rules = load_sigma_rules()

# Create test data
df = pl.DataFrame({
    "EventID": ["8", "8", "8"],
    "SourceImage": [
        "C:\\Users\\admin\\malware.exe",
        "C:\\Windows\\System32\\svchost.exe",
        "C:\\Temp\\inject.exe",
    ],
    "TargetImage": [
        "C:\\Windows\\System32\\lsass.exe",
        "C:\\Windows\\System32\\lsass.exe",
        "C:\\Windows\\System32\\winlogon.exe",
    ],
    "EventTime": [
        "2024-01-01 10:00:00",
        "2024-01-01 10:05:00",
        "2024-01-01 10:10:00",
    ],
    "_id": [1, 2, 3],
})

# Evaluate
hits = match_sigma_rules(df, rules)

# Results
for hit in hits:
    if "Remote Thread" in hit["title"]:
        print(f"🚨 {hit['title']}")
        print(f"   Matched: {hit['matched_rows']} events")
        print(f"   MITRE: {hit['mitre_technique']}")
        print(f"   Evidence:")
        for ev in hit["sample_evidence"]:
            print(f"   {ev['EventTime']} | {ev['SourceImage']} → {ev['TargetImage']}")
Output:
🚨 Suspicious Remote Thread Injection
   Matched: 2 events
   MITRE: T1055.002
   Evidence:
   2024-01-01 10:00:00 | C:\Users\admin\malware.exe → C:\Windows\System32\lsass.exe
   2024-01-01 10:10:00 | C:\Temp\inject.exe → C:\Windows\System32\winlogon.exe
API Reference Summary
| Function | Input | Output | Purpose |
|---|---|---|---|
| `load_sigma_rules()` | `rules_dir`, `force_reload` | `list[dict]` | Load YAML rules from disk |
| `match_sigma_rules()` | `df`, `rules` | `list[dict]` | Evaluate all rules against data |
| `_field_expr()` | `col_name`, `columns` | `Optional[pl.Expr]` | Case-insensitive column lookup |
| `_apply_modifier()` | `expr`, `modifier`, `values` | `pl.Expr` | Apply `\|contains`, `\|endswith`, etc. |
| `_build_field_condition()` | `field_raw`, `values`, `columns` | `Optional[pl.Expr]` | Parse `Field\|modifier\|not` |
| `_build_named_condition()` | `named_block`, `columns` | `Optional[pl.Expr]` | AND all fields in a selection block |
| `_parse_condition_string()` | `condition_str`, `named_exprs` | `Optional[pl.Expr]` | Parse `a and b or c` |
| `_evaluate_temporal_correlation()` | `df_matched`, `detection`, `rule` | `Optional[int]` | Apply `timeframe` + aggregation |
| `_find_time_column()` | `columns` | `Optional[str]` | Locate timestamp column |
Best Practices
Rule Writing Guidelines:
Always specify level (critical, high, medium, low)
Use custom.mitre_technique for explicit MITRE mapping
Test rules with _evaluate_temporal_correlation() for brute-force detection
Use |contains|all for multi-token matching (e.g., "powershell" AND "-enc")
Add filter_ selections to reduce false positives
Show Optimization Guidelines
Use EventID filtering first (fastest column for Windows logs)
Prefer |endswith over |contains for process paths (more selective)
Combine multiple conditions into one selection block (fewer filters)
Avoid broad regex patterns (.*); use literal strings when possible
Use group-by in temporal correlation to limit memory usage