
Overview

Chronos-DFIR features a 100% offline detection engine with 86 Sigma rules and 7 YARA rule sets mapped across 12 MITRE ATT&CK enterprise tactics. All detection happens in real time during file ingestion, with zero external dependencies.

Sigma Detection Engine

The Sigma engine (engine/sigma_engine.py) translates YAML detection rules into Polars expressions for vectorized evaluation:
# From engine/sigma_engine.py:418-434
def match_sigma_rules(df: pl.DataFrame, rules: Optional[list] = None) -> list[dict]:
    """
    Evaluate all loaded Sigma rules against a DataFrame.
    Returns list of hits with title, level, MITRE technique, matched rows.
    """
    if rules is None:
        rules = load_sigma_rules()  # Load from rules/sigma/**/*.yml
    
    columns = df.columns
    hits = []
    
    for rule in rules:
        detection = rule.get("detection", {})
        condition_str = detection.get("condition", "")
        
        # Named Polars expressions are built from each detection block
        # (construction elided in this excerpt), then combined per the condition
        final_expr = _parse_condition_string(condition_str, named_exprs)
        
        # Evaluate and count matches
        df_matched = df.filter(final_expr)
        match_count = df_matched.height

Supported Rule Features

Field Modifiers

  • contains / contains|all / contains|any
  • startswith / endswith
  • re (regex matching)
  • not (negation)

Logic Operators

  • and / or boolean conditions
  • all of them / 1 of them
  • Wildcard patterns (1 of selection_*)
  • Named detection blocks

Example Sigma Rule

# rules/sigma/credential_access/credential_dumping_lsass.yml
title: Credential Dumping via LSASS Access
status: stable
level: critical
custom:
  mitre_technique: T1003.001
  mitre_tactic: Credential Access
tags:
  - attack.credential_access
  - attack.t1003.001
detection:
  selection:
    EventID: 4656
    ObjectName|contains: 'lsass.exe'
    AccessMask:
      - '0x1410'
      - '0x1010'
      - '0x1438'
  condition: selection
Sigma Engine Translation:
# Polars expression generated by sigma_engine.py
final_expr = (
    (pl.col("EventID").cast(pl.Utf8) == "4656") &
    (pl.col("ObjectName").cast(pl.Utf8).str.contains("lsass.exe", literal=True)) &
    (pl.col("AccessMask").cast(pl.Utf8).is_in(["0x1410", "0x1010", "0x1438"]))
)
Zero Performance Penalty: Sigma rules compile to native Polars expressions using SIMD vectorization. Evaluation of 86 rules against 100K events takes < 2 seconds.

Temporal Correlation

Chronos supports timeframe-based correlation for detecting brute force and beaconing:
# Example: Brute force detection
title: SSH Brute Force Attack
level: high
custom:
  mitre_technique: T1110.001
aggregation:
  group_by: [SourceIP, TargetUser]
  time_window: "5m"
  threshold: 10
detection:
  selection:
    EventID: 4625  # Failed logon
    LogonType: 3   # Network
  condition: selection
Engine Implementation:
# From engine/sigma_engine.py:290-394
def _evaluate_temporal_correlation(
    df_matched: pl.DataFrame,
    detection: dict,
    rule: dict,
) -> Optional[int]:
    timeframe = detection.get("timeframe")
    aggregation = rule.get("aggregation")
    
    if aggregation:
        group_by_fields = aggregation.get("group_by", [])
        time_window = aggregation.get("time_window", "5m")
        threshold = aggregation.get("threshold", 5)
        
        # df_ts: matched rows with a parsed "_sigma_ts" datetime column;
        # resolved_groups: group_by_fields resolved to actual column names
        # (both prepared earlier in the function, elided in this excerpt)
        windowed = (
            df_ts.sort("_sigma_ts")
            .group_by_dynamic("_sigma_ts", every=time_window,
                              group_by=resolved_groups)
            .agg(pl.len().alias("_event_count"))
        )
        
        # Count groups that exceed threshold
        hot_groups = windowed.filter(pl.col("_event_count") >= threshold)
        return hot_groups["_event_count"].sum()
The temporal pipeline runs in three steps:

  1. Parse Timeframe: convert "5m", "1h", "60s" into Polars duration strings.
  2. Group Events: use group_by_dynamic() with the time window and group-by fields (SourceIP, User, etc.).
  3. Apply Threshold: keep groups where the event count >= threshold (e.g., 10 failed logins in 5 minutes).

YARA Integration

YARA rules scan ingested file content for IOCs:
# From app.py:706-766 - YARA scan during forensic analysis
import yara

def _load_yara_rules():
    """Load all .yar files from rules/yara/ directory."""
    yara_dir = os.path.join(BASE_DIR, "rules", "yara")
    rule_files = {}
    
    for root, dirs, files in os.walk(yara_dir):
        for file in files:
            if file.endswith('.yar'):
                namespace = os.path.splitext(file)[0]
                rule_files[namespace] = os.path.join(root, file)
    
    return yara.compile(filepaths=rule_files)

# Scan during forensic report generation
yara_rules = _load_yara_rules()
with open(csv_path, "r", errors="replace") as yf:
    yara_text = yf.read(5 * 1024 * 1024)  # First 5MB

yara_matches = yara_rules.match(data=yara_text)
yara_hits = [{
    "rule": m.rule,
    "namespace": m.namespace,
    "tags": list(m.tags),
    "strings_matched": len(m.strings),
    "meta": {k: str(v) for k, v in (m.meta or {}).items()}
} for m in yara_matches]

YARA Rule Sets

7 rule sets, including:
  • lockbit.yar - LockBit 2.x/3.x: .lockbit extension, ransom note, PE imports
  • qilin_agenda.yar - QILIN/Agenda: ChaCha20/RSA-2048, ESXi targeting, rclone exfil
  • Generic ransomware patterns: file encryption loops, crypto APIs
rule LockBit_3_Ransomware {
    meta:
        description = "Detects LockBit 3.0 ransomware characteristics"
        mitre = "T1486"
    strings:
        $ext1 = ".lockbit" ascii
        $note = "Restore-My-Files.txt" ascii
        $crypt = { 8B 45 ?? 83 C0 01 89 45 ?? }  // Encryption loop
    condition:
        2 of them
}
YARA Scope: YARA scans the first 5MB of the ingested CSV file content. For memory dumps or large binaries, pre-process with dedicated YARA tools before ingestion.

MITRE ATT&CK Mapping

All detections are automatically mapped to MITRE techniques:
# From engine/sigma_engine.py:541-567
raw_tags = rule.get("tags", []) or []
tags = []
for t in raw_tags:
    t_lower = str(t).lower().strip()
    # Normalize "mitre.tXXXX" → "attack.tXXXX"
    if t_lower.startswith("mitre.t"):
        t_lower = "attack." + t_lower[6:]
    tags.append(t_lower)

hits.append({
    "title": rule.get("title", "Unknown Rule"),
    "level": rule.get("level", "unknown"),
    "mitre_technique": custom.get("mitre_technique", 
        next((t for t in tags if t.startswith("attack.t")), "")
    ),
    "mitre_tactic": custom.get("mitre_tactic",
        next((t for t in tags if t.startswith("attack.") 
              and not t.startswith("attack.t")), "")
    ),
})
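
The tag-normalization loop can be exercised standalone (the wrapper function name below is ours, not the engine's):

```python
def normalize_tags(raw_tags: list) -> list:
    """Normalize "mitre.tXXXX"-style tags to the "attack.tXXXX" convention."""
    tags = []
    for t in raw_tags or []:
        t_lower = str(t).lower().strip()
        if t_lower.startswith("mitre.t"):
            t_lower = "attack." + t_lower[6:]  # drop the 6-char "mitre." prefix
        tags.append(t_lower)
    return tags

tags = normalize_tags(["MITRE.T1003.001", "attack.credential_access"])
# Fallback extraction mirrors the hits.append() logic above:
technique = next((t for t in tags if t.startswith("attack.t")), "")
tactic = next((t for t in tags
               if t.startswith("attack.") and not t.startswith("attack.t")), "")
```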

Coverage

Sigma rules cover MITRE tactics:
  • TA0001 Initial Access
  • TA0002 Execution
  • TA0003 Persistence
  • TA0004 Privilege Escalation
  • TA0005 Defense Evasion
  • TA0006 Credential Access
  • TA0007 Discovery
  • TA0008 Lateral Movement
  • TA0009 Collection
  • TA0010 Exfiltration
  • TA0011 Command & Control
  • TA0040 Impact
Plus OWASP Top 10 web attack patterns

Risk Scoring

Detections feed into the Smart Risk Engine:
# From engine/forensic.py - calculate_smart_risk_m4()
def calculate_smart_risk_m4(df: pl.DataFrame, sigma_hits: Optional[list] = None) -> dict:
    """Unified risk calculation from Sigma hits, EventIDs, and anomalies."""
    score = 0
    justifications = []
    
    # Sigma detections (weighted by level)
    if sigma_hits:
        critical = sum(1 for h in sigma_hits if h['level'] == 'critical')
        high = sum(1 for h in sigma_hits if h['level'] == 'high')
        medium = sum(1 for h in sigma_hits if h['level'] == 'medium')
        
        score += critical * 50  # Critical = 50 points each
        score += high * 30      # High = 30 points
        score += medium * 10    # Medium = 10 points
        
        if critical > 0:
            justifications.append(f"{critical} Critical Sigma detections")
    
    # High-risk EventIDs (4625=Failed logon, 4720=Account created)
    high_risk_events = [4625, 4720, 4728, 4732, 4756]
    event_col = next((c for c in df.columns if 'eventid' in c.lower()), None)
    if event_col:
        risk_count = df.filter(
            pl.col(event_col).cast(pl.Int32, strict=False).is_in(high_risk_events)
        ).height
        score += risk_count * 2
    
    # Determine risk level
    if score >= 100:
        level = "Critical"
    elif score >= 50:
        level = "High"
    elif score >= 20:
        level = "Medium"
    else:
        level = "Low"
    
    return {
        "level": level,
        "score": score,
        "justification": "; ".join(justifications)
    }
Risk Level Consistency: The same risk calculation is used in the dashboard, forensic modal, HTML report, and PDF export to ensure consistency (fixed in v177).
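
The weighting is easy to verify by hand. A condensed sketch of just the Sigma portion of the score, using the weights and thresholds from calculate_smart_risk_m4 above:

```python
# Level weights from calculate_smart_risk_m4: critical=50, high=30, medium=10
WEIGHTS = {"critical": 50, "high": 30, "medium": 10}

def sigma_score(sigma_hits: list) -> int:
    """Sum the weighted contribution of each Sigma hit."""
    return sum(WEIGHTS.get(h.get("level"), 0) for h in sigma_hits)

# 1 critical + 2 high + 1 medium = 50 + 60 + 10 = 120 -> "Critical" (>= 100)
hits = [{"level": "critical"}, {"level": "high"},
        {"level": "high"}, {"level": "medium"}]
score = sigma_score(hits)
level = ("Critical" if score >= 100 else "High" if score >= 50
         else "Medium" if score >= 20 else "Low")
```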

Detection Evidence

Sigma hits include sample evidence with forensic context:
# From engine/sigma_engine.py:402-537
FORENSIC_CONTEXT_COLUMNS = [
    "UserName", "User", "AccountName", "SubjectUserName",
    "ProcessName", "Image", "NewProcessName", "ParentImage",
    "SourceIP", "IpAddress", "SourceAddress", "ClientIP",
    "CommandLine", "ParentCommandLine",
    "Status", "Result", "LogonType",
    "DestinationHostname", "DestinationIp", "DestPort",
    "ServiceName", "TaskName", "ObjectName",
]

# Build sample evidence: first 150 rows with detection + context columns
evidence_cols = ["_id"]
if time_col:
    evidence_cols.append(time_col)
evidence_cols.extend(matched_columns)  # Columns referenced in detection

# Add forensic context columns (max 12 total)
for fc in FORENSIC_CONTEXT_COLUMNS:
    if len(evidence_cols) >= 12:
        break
    for c in df_matched.columns:
        if c.lower() == fc.lower() and c not in evidence_cols:
            evidence_cols.append(c)
            break

sample_df = df_matched.head(150).select(evidence_cols)
sample_evidence = sample_df.to_dicts()
Evidence structure returned:
{
  "title": "Credential Dumping via LSASS Access",
  "level": "critical",
  "mitre_technique": "T1003.001",
  "mitre_tactic": "Credential Access",
  "matched_rows": 12,
  "matched_columns": ["EventID", "ObjectName", "AccessMask"],
  "sample_evidence": [
    {
      "_id": 4523,
      "Time": "2026-03-08 14:23:17",
      "EventID": "4656",
      "ObjectName": "\\Device\\HarddiskVolume2\\Windows\\System32\\lsass.exe",
      "AccessMask": "0x1410",
      "SubjectUserName": "SYSTEM",
      "ProcessName": "mimikatz.exe"
    }
  ],
  "all_row_ids": [4523, 4524, 4530, ...],  // First 500 IDs for "View in Grid"
}
Interactive Drill-Down: Click a Sigma detection in the forensic modal to see the evidence table. Click “View all in Grid” to filter the main timeline to matching events.

Performance

| Dataset Size | Sigma Evaluation (86 rules) | YARA Scan (5MB) | Total Detection Time |
|--------------|-----------------------------|-----------------|----------------------|
| 10K events   | < 500ms                     | < 100ms         | < 1 second           |
| 100K events  | 1-2 seconds                 | < 100ms         | < 3 seconds          |
| 1M events    | 5-10 seconds                | < 100ms         | < 15 seconds         |
Parallel Execution: Sigma and YARA run in parallel using asyncio.gather() with 9 concurrent forensic analysis tasks during report generation.
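
A minimal sketch of the gather pattern, with hypothetical stand-ins for the real scan functions (the actual scans are synchronous, so each is pushed to a worker thread to overlap them):

```python
import asyncio

# Hypothetical stand-ins for the real synchronous scan functions.
def run_sigma_scan() -> dict:
    return {"engine": "sigma", "hits": 3}

def run_yara_scan() -> dict:
    return {"engine": "yara", "hits": 1}

async def run_detections():
    # asyncio.gather() overlaps the two thread-backed tasks;
    # results come back in the order the awaitables were passed.
    sigma_res, yara_res = await asyncio.gather(
        asyncio.to_thread(run_sigma_scan),
        asyncio.to_thread(run_yara_scan),
    )
    return sigma_res, yara_res

results = asyncio.run(run_detections())
```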

Next Steps

Exports

Export detection results in CSV, XLSX, JSON, PDF, HTML

API Reference

Programmatic access to detection engine
