
Overview

The engine/forensic.py module is the heart of Chronos-DFIR’s forensic analysis capabilities. It provides:
  • Timeline Analysis — Adaptive time-bucketing for event clustering
  • Context Sanitization — EventID validation and forensic data cleaning
  • Hunting & Pattern Detection — TTP-based suspicious activity detection
  • MITRE ATT&CK Enrichment — Automatic tactic/technique mapping
  • WAF Threat Profiling — Attacker IP behavioral analysis
  • Identity & Process Analysis — User, host, and process correlation
All functions use Polars for vectorized operations. Zero pandas dependency.

Configuration

Time Hierarchy

The engine uses a priority-based approach to identify time columns:
TIME_HIERARCHY = [
    "EventTime", "ProcessLaunchTime", "FirstSeen",
    "LogReceivedTime", "LastSeen", "timestamp",
    "time", "date", "datetime", "utc"
]

EventID Hierarchy

For Windows event log analysis:
EVENT_ID_HIERARCHY = [
    "WinEventId", "EventId", "EventID",
    "Id", "ID", "EventName"
]
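Column resolution is a first-match scan down this list; a minimal sketch (the helper name `resolve_event_id_column` is illustrative, not part of the module's API, and whether the real lookup is case-insensitive is not shown here):

```python
from typing import List, Optional

EVENT_ID_HIERARCHY = [
    "WinEventId", "EventId", "EventID",
    "Id", "ID", "EventName"
]

def resolve_event_id_column(columns: List[str]) -> Optional[str]:
    # Return the highest-priority EventID column present, or None.
    for candidate in EVENT_ID_HIERARCHY:
        if candidate in columns:
            return candidate
    return None
```

Note that hierarchy order wins over column order: a frame exposing both Id and EventID resolves to EventID.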

Core Functions

get_primary_time_column()

Standardized logic to select the best timestamp column from a dataset. Signature:
def get_primary_time_column(columns: List[str]) -> Optional[str]
Parameters:
  • columns (List[str]): List of column names from the DataFrame
Returns:
  • time_column (Optional[str]): Name of the best time column, or None if no suitable column found
Algorithm:
  1. Exact case-insensitive match against TIME_HIERARCHY
  2. Fallback to columns containing keywords: time, timestamp, date, datetime, seen, created
  3. Last resort: columns with timezone substring
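The three steps above can be sketched as a plain function (`pick_time_column` and `_KEYWORDS` are illustrative names; the module's real implementation is `get_primary_time_column`, and the step-3 substring is assumed to be the literal "timezone"):

```python
from typing import List, Optional

TIME_HIERARCHY = [
    "EventTime", "ProcessLaunchTime", "FirstSeen",
    "LogReceivedTime", "LastSeen", "timestamp",
    "time", "date", "datetime", "utc"
]
_KEYWORDS = ("time", "timestamp", "date", "datetime", "seen", "created")

def pick_time_column(columns: List[str]) -> Optional[str]:
    lower = {c.lower(): c for c in columns}
    # 1. Exact case-insensitive match, in hierarchy priority order
    for candidate in TIME_HIERARCHY:
        if candidate.lower() in lower:
            return lower[candidate.lower()]
    # 2. Keyword substring fallback
    for c in columns:
        if any(k in c.lower() for k in _KEYWORDS):
            return c
    # 3. Last resort: columns mentioning a timezone
    for c in columns:
        if "timezone" in c.lower():
            return c
    return None
```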
Example:
from engine.forensic import get_primary_time_column
import polars as pl

df = pl.DataFrame({
    "EventTime": ["2024-01-01 12:00:00"],
    "Computer": ["DESKTOP-01"],
    "EventID": ["4624"]
})

time_col = get_primary_time_column(df.columns)
# Returns: "EventTime"

sanitize_context_data()

Applies forensic-grade sanitization to telemetry data. Ensures EventIDs are valid integers (1-65535) and removes artifacts. Signature:
def sanitize_context_data(lf: pl.LazyFrame) -> pl.LazyFrame
Parameters:
lf
pl.LazyFrame
Input forensic telemetry data
Returns:
sanitized_lf
pl.LazyFrame
LazyFrame with additional Validated_EventID column (Int64)
Sanitization Steps:
  1. Locate EventID column using EVENT_ID_HIERARCHY
  2. Cast to string, strip trailing .0
  3. Cast to Int64 (non-strict)
  4. Range validation: 1 ≤ EventID ≤ 65535
  5. Invalid values set to None
Example:
from engine.forensic import sanitize_context_data
import polars as pl

lf = pl.LazyFrame({
    "EventID": ["4624.0", "99999", "1102", "invalid"],
    "Computer": ["PC1", "PC2", "PC3", "PC4"]
})

sanitized = sanitize_context_data(lf)
df = sanitized.collect()

# df["Validated_EventID"] → [4624, None, 1102, None]

normalize_time_columns_in_df()

Converts all timestamp-like columns to standardized YYYY-MM-DD HH:MM:SS string format. Signature:
def normalize_time_columns_in_df(lf: pl.LazyFrame) -> pl.LazyFrame
Parameters:
lf
pl.LazyFrame
Input DataFrame with mixed timestamp formats
Returns:
normalized_lf
pl.LazyFrame
LazyFrame with all time columns normalized to %Y-%m-%d %H:%M:%S
Supported Formats:
  • Native Polars: pl.Datetime, pl.Date
  • Epoch seconds/milliseconds: Int64, Float64
  • ISO 8601: 2024-01-01T12:00:00.123Z
  • Custom formats: YYYY/MM/DD HH:MM:SS, DD/MM/YYYY HH:MM:SS
Example:
from engine.forensic import normalize_time_columns_in_df
import polars as pl

lf = pl.LazyFrame({
    "EventTime": ["2024-01-01T12:30:00Z", "1704112200"],  # ISO + Unix
    "Computer": ["PC1", "PC2"]
})

normalized = normalize_time_columns_in_df(lf)
df = normalized.collect()

# df["EventTime"] → ["2024-01-01 12:30:00", "2024-01-01 12:30:00"]

Sub-Analysis Functions

sub_analyze_timeline()

Generates timeline statistics with adaptive bucketing for histogram visualization. Signature:
def sub_analyze_timeline(df: pl.DataFrame) -> dict
Parameters:
df
pl.DataFrame
Input forensic dataset (collected, not lazy)
Returns:
stats
dict
Dictionary with keys:
  • type: "timeline"
  • peaks: List of {"hour": str, "count": int} (top 3 time buckets)
  • time_range: "YYYY-MM-DD HH:MM:SS to YYYY-MM-DD HH:MM:SS"
Adaptive Bucketing:
| Duration    | Bucket Size |
| ----------- | ----------- |
| < 3 hours   | 5 minutes   |
| 3-6 hours   | 15 minutes  |
| 6-12 hours  | 30 minutes  |
| 12-48 hours | 1 hour      |
| 2-7 days    | 6 hours     |
| > 7 days    | 1 day       |
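The table reduces to a small threshold ladder; a sketch returning Polars-style interval strings (`bucket_size` is a hypothetical helper, not the module's API):

```python
from datetime import timedelta

def bucket_size(duration: timedelta) -> str:
    # Map a timeline span to a bucket interval per the table above.
    hours = duration.total_seconds() / 3600
    if hours < 3:
        return "5m"
    if hours < 6:
        return "15m"
    if hours < 12:
        return "30m"
    if hours < 48:
        return "1h"
    if hours <= 168:  # up to 7 days
        return "6h"
    return "1d"
```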
Example:
from engine.forensic import sub_analyze_timeline
import polars as pl

df = pl.DataFrame({
    "EventTime": ["2024-01-01 10:00:00", "2024-01-01 10:05:00", "2024-01-01 14:00:00"],
    "EventID": ["4624", "4624", "1102"]
})

timeline_stats = sub_analyze_timeline(df)
# {
#   "type": "timeline",
#   "peaks": [
#       {"hour": "2024-01-01 10:00", "count": 2},
#       {"hour": "2024-01-01 14:00", "count": 1}
#   ],
#   "time_range": "2024-01-01 10:00:00 to 2024-01-01 14:00:00"
# }

sub_analyze_context()

Extracts forensic context: top EventIDs, tactics, IPs, users, hosts, processes, and commands. Signature:
def sub_analyze_context(df: pl.DataFrame) -> dict
Parameters:
Parameters:
  • df (pl.DataFrame): Input forensic dataset
Returns:
  • context (dict): Dictionary with keys:
  • type: "context"
  • event_ids: Top 10 EventIDs with labels (from SYSMON_EVENT_LABELS)
  • tactics: Top 10 forensic categories
  • threat_actors: Top 10 attacker IPs (WAF only)
  • ips, users, hosts, processes, commands, paths, violations: Top 8 of each
  • metadata: System info and action recommendations
Supported Columns:
Example:
from engine.forensic import sub_analyze_context
import polars as pl

df = pl.DataFrame({
    "EventID": ["4624", "4624", "4625"],
    "User": ["admin", "admin", "guest"],
    "Computer": ["DC01", "DC01", "WKS02"]
})

context = sub_analyze_context(df)
# {
#   "type": "context",
#   "event_ids": [
#       {"id": "4624", "label": "Logon Success", "count": 2},
#       {"id": "4625", "label": "Logon Failure", "count": 1}
#   ],
#   "users": [{"id": "admin", "count": 2}, {"id": "guest", "count": 1}],
#   "hosts": [{"id": "DC01", "count": 2}, {"id": "WKS02", "count": 1}],
#   ...
# }

sub_analyze_hunting()

Detects suspicious patterns using TTP-based regex rules (LOLBins, persistence, credential access, etc.). Signature:
def sub_analyze_hunting(df: pl.DataFrame) -> dict
Parameters:
Parameters:
  • df (pl.DataFrame): Input forensic dataset
Returns:
  • hunting_stats (dict): Dictionary with keys:
  • type: "hunting"
  • patterns: Suspicious commands with timestamps, users, and tactic labels
  • network: Top network destinations
  • logons: Logon event summary
Detected TTPs:
Example:
from engine.forensic import sub_analyze_hunting
import polars as pl

df = pl.DataFrame({
    "ProcessCmd": ["powershell.exe -enc <base64>", "whoami", "cmd.exe /c dir"],
    "EventTime": ["2024-01-01 10:00:00", "2024-01-01 10:05:00", "2024-01-01 10:10:00"],
    "ProcessUser": ["admin", "user1", "admin"]
})

hunting = sub_analyze_hunting(df)
# {
#   "type": "hunting",
#   "patterns": [
#       {
#           "timestamp": "2024-01-01 10:00:00",
#           "user": "admin",
#           "command": "[Execution/LOLBins] powershell.exe -enc <base64>",
#           "source_row_id": 1
#       },
#       {
#           "timestamp": "2024-01-01 10:05:00",
#           "user": "user1",
#           "command": "[Discovery] whoami",
#           "source_row_id": 2
#       }
#   ],
#   "network": [],
#   "logons": []
# }

sub_analyze_identity_and_procs()

Summarizes identity activity and process execution patterns. Signature:
def sub_analyze_identity_and_procs(df: pl.DataFrame) -> dict
Parameters:
df
pl.DataFrame
Input forensic dataset
Returns:
identity_stats
dict
Dictionary with keys:
  • type: "identity"
  • users: Top 5 users by event count
  • hosts: Top 5 hosts by event count
  • processes: Top 8 most common processes
  • rare_processes: Top 5 least common processes (anomaly detection)
  • rare_paths: Top 5 least common file paths
Filtering Logic: Excludes null, blank, pure numeric values (task IDs), and sentinel values:
_bad_vals = {"none", "null", "", "-", "n/a", "nan", "unknown", "undefined", "0", "system idle process"}
Example:
from engine.forensic import sub_analyze_identity_and_procs
import polars as pl

df = pl.DataFrame({
    "User": ["admin", "admin", "admin", "guest", "service"],
    "Computer": ["DC01", "DC01", "WKS01", "WKS01", "WKS02"],
    "ProcessName": ["powershell.exe", "cmd.exe", "cmd.exe", "explorer.exe", "svchost.exe"]
})

identity = sub_analyze_identity_and_procs(df)
# {
#   "type": "identity",
#   "users": [
#       {"name": "admin", "count": 3},
#       {"name": "guest", "count": 1},
#       {"name": "service", "count": 1}
#   ],
#   "hosts": [
#       {"name": "DC01", "count": 2},
#       {"name": "WKS01", "count": 2},
#       {"name": "WKS02", "count": 1}
#   ],
#   "processes": [
#       {"name": "cmd.exe", "count": 2},
#       {"name": "powershell.exe", "count": 1},
#       ...
#   ],
#   "rare_processes": [
#       {"name": "svchost.exe", "count": 1},
#       ...
#   ]
# }

MITRE ATT&CK Enrichment

enrich_with_mitre_attck()

Adds MITRE ATT&CK tactic/technique columns using fully vectorized Polars expressions. Signature:
def enrich_with_mitre_attck(df: pl.DataFrame, source_type: str = "auto") -> pl.DataFrame
Parameters:
df
pl.DataFrame
Input forensic dataset
source_type
str
default:"auto"
Data source type: "auto", "waf", or "evtx"
Returns:
enriched_df
pl.DataFrame
Original DataFrame with 3 new columns:
  • MITRE_Tactic: e.g., “Initial Access”, “Credential Access”
  • MITRE_ID: e.g., “T1190”, “T1110”
  • MITRE_Technique: Combined label, e.g., “T1190 — Initial Access”
WAF Mappings:
EVTX Mappings:
Example:
from engine.forensic import enrich_with_mitre_attck
import polars as pl

df = pl.DataFrame({
    "EventID": [4688, 4688, 7045],
    "CommandLine": ["powershell.exe -enc <base64>", "cmd.exe /c dir", ""],
    "ProcessName": ["powershell.exe", "cmd.exe", ""]
})

enriched = enrich_with_mitre_attck(df, source_type="evtx")
# New columns:
# MITRE_Tactic: ["Execution", "Unmapped", "Persistence"]
# MITRE_ID: ["T1059.001", "None", "T1543.003"]
# MITRE_Technique: ["T1059.001 — Execution", "Unmapped", "T1543.003 — Persistence"]

WAF Threat Profiling

generate_waf_threat_profiles()

Builds behavioral profiles for attacking IPs from WAF logs. Signature:
def generate_waf_threat_profiles(df: pl.DataFrame) -> list
Parameters:
df
pl.DataFrame
WAF log dataset with columns like ClientIP, RequestPath, ViolationCategory
Returns:
profiles
list[dict]
Top 10 attacker profiles, each containing:
  • ip: Attacker IP address
  • total: Total requests
  • first_seen: ISO timestamp
  • last_seen: ISO timestamp
  • dwell: Time span (e.g., “3m 45s”)
  • top_uris: Dict of {URI: count}
  • top_rules: Dict of {rule_name: count}
  • payload_samples: List of decoded attack payloads (up to 3)
  • mitre_id: Assigned MITRE technique ID
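The dwell string is a plain duration formatter; a sketch (`format_dwell` is a hypothetical name, not the module's API):

```python
def format_dwell(seconds: float) -> str:
    # Render a dwell time such as "3m 45s" or "1h 2m 5s".
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}h {minutes}m {secs}s"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
```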
MITRE ID Assignment:
mitre_id = "T1190" if any(k in rule_str for k in ["sqli","xss","lfi","rce","inject"]) \
        else "T1110" if any(k in rule_str for k in ["login","brute","auth","cred"]) \
        else "T1595" if any(k in rule_str for k in ["scan","probe","recon"]) \
        else "T1498" if any(k in rule_str for k in ["dos","flood","ddos"]) \
        else "T1602" if any(k in rule_str for k in ["ssrf","metadata"]) \
        else "—"
Example:
from engine.forensic import generate_waf_threat_profiles
import polars as pl

df = pl.DataFrame({
    "ClientIP": ["1.2.3.4", "1.2.3.4", "5.6.7.8"],
    "RequestPath": ["/admin", "/login", "/api"],
    "ViolationCategory": ["SQLi", "SQLi", "XSS"],
    "Timestamp": ["2024-01-01 10:00:00", "2024-01-01 10:02:00", "2024-01-01 10:05:00"]
})

profiles = generate_waf_threat_profiles(df)
# [
#   {
#     "ip": "1.2.3.4",
#     "total": 2,
#     "first_seen": "2024-01-01 10:00:00",
#     "last_seen": "2024-01-01 10:02:00",
#     "dwell": "2m 0s",
#     "top_uris": {"/admin": 1, "/login": 1},
#     "top_rules": {"SQLi": 2},
#     "payload_samples": [],
#     "mitre_id": "T1190"
#   },
#   ...
# ]

EventID Labels

Sysmon Events

The engine includes a comprehensive mapping of Sysmon EventIDs to human-readable labels.

Windows Security Events

Full mapping available in SYSMON_EVENT_LABELS dict (lines 220-278 in forensic.py).

Best Practices

Forensic Integrity Rules:
  1. Never mutate original evidence — all transformations are non-destructive
  2. Use sanitize_context_data() early in analysis pipelines
  3. Always normalize timestamps with normalize_time_columns_in_df() before correlation
  4. Prefer LazyFrame for large datasets (streaming evaluation)

Performance Tips

Example Pipeline

import polars as pl
from engine.forensic import (
    sanitize_context_data,
    normalize_time_columns_in_df,
    sub_analyze_timeline,
    sub_analyze_context,
    sub_analyze_hunting,
    enrich_with_mitre_attck
)

# 1. Load data (streaming)
lf = pl.scan_csv("security.evtx.csv", ignore_errors=True, infer_schema_length=0)

# 2. Sanitize and normalize
lf = sanitize_context_data(lf)
lf = normalize_time_columns_in_df(lf)

# 3. Add MITRE context
df = lf.collect()
df = enrich_with_mitre_attck(df, source_type="evtx")

# 4. Analyze
timeline = sub_analyze_timeline(df)
context = sub_analyze_context(df)
hunting = sub_analyze_hunting(df)

print(timeline["peaks"])
print(context["event_ids"])
print(hunting["patterns"])

API Reference Summary

| Function | Input | Output | Purpose |
| --- | --- | --- | --- |
| get_primary_time_column() | List[str] | Optional[str] | Find best timestamp column |
| sanitize_context_data() | pl.LazyFrame | pl.LazyFrame | Validate EventIDs (1-65535) |
| normalize_time_columns_in_df() | pl.LazyFrame | pl.LazyFrame | Standardize timestamps |
| sub_analyze_timeline() | pl.DataFrame | dict | Adaptive time bucketing + peaks |
| sub_analyze_context() | pl.DataFrame | dict | Top N entities (IPs, users, hosts, etc.) |
| sub_analyze_hunting() | pl.DataFrame | dict | TTP-based pattern detection |
| sub_analyze_identity_and_procs() | pl.DataFrame | dict | User/host/process summary + anomalies |
| enrich_with_mitre_attck() | pl.DataFrame | pl.DataFrame | Add MITRE_Tactic, MITRE_ID columns |
| generate_waf_threat_profiles() | pl.DataFrame | list[dict] | WAF attacker behavioral profiles |
