Skip to main content
Python rules provide flexibility and access to the full Python standard library, making them ideal for complex detection logic and data analysis.

Rule Structure

Every Python rule must follow this structure:
import dr_semu_utils

def check(report_directory):
    verdict = b"CLEAN"
    
    # Your detection logic here
    
    return verdict
Python rules must:
  • Return verdict as bytes (use b"CLEAN" not "CLEAN")
  • Accept report_directory as bytes
  • Handle file paths as bytes

Module Imports

Python rules run in a restricted environment. You must whitelist imports in py_imports.config:
import json
import os
import dr_semu_utils

# don't forget to add module names into py_imports.config file
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:1-6

dr_semu_utils API

The dr_semu_utils module provides helper functions for accessing analysis data.

get_starter_details()

Retrieves information about the initial process from starter.json.
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
Parameters:
  • report_directory - Path to the report directory (bytes)
Returns: Tuple of (image_path, pid, sha_256)
  • image_path - Full path to the analyzed executable
  • pid - Process ID of the starter process
  • sha_256 - SHA-256 hash of the sample
  • Returns (None, None, None) if starter.json doesn’t exist
Source: /run_detections/dr_rules_files/dr_semu_utils.py:4-12 Implementation:
def get_starter_details(report_directory):
    starter_json = report_directory + b"\\starter.json"
    if not os.path.exists(starter_json):
        return None, None, None
    with open(starter_json) as json_file:
        data = json.load(json_file)
        return data["image_path"], data["starter_pid"], data["sha_256"]
    return None, None, None

get_json_from_file()

Loads and parses a JSON file from the report directory.
data = dr_semu_utils.get_json_from_file(file_path)
Parameters:
  • file_path - Full path to the JSON file (bytes)
Returns: Parsed JSON data (dict or list), or (None, None, None) if file doesn’t exist Source: /run_detections/dr_rules_files/dr_semu_utils.py:14-20 Implementation:
def get_json_from_file(file_path):
    if not os.path.exists(file_path):
        return None, None, None
    with open(file_path) as json_file:
        data = json.load(json_file)
        return data

Loading Analysis Data

Typical pattern for loading static and dynamic analysis:
import dr_semu_utils

def check(report_directory):
    # Get starter details
    image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
    
    # Load static analysis
    static_info = dr_semu_utils.get_json_from_file(
        report_directory + b"\\" + sha_256.encode() + b".json"
    )
    
    # Load dynamic analysis
    dynamic_info = dr_semu_utils.get_json_from_file(
        report_directory + b"\\" + str(pid).encode() + b".json"
    )
    
    # Your detection logic
    verdict = b"CLEAN"
    return verdict
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:10-12
Note the use of .encode() to convert strings to bytes when building file paths.

Analyzing Dynamic Behavior

Dynamic behavior is a list of API call dictionaries:
for win_func in dynamic_info:
    if "NtCreateKey" in win_func:
        # Access API call details
        success = win_func["NtCreateKey"]["success"]
        key_path = win_func["NtCreateKey"]["before"]["key_path"]

Checking Specific API Calls

Each API call is a dictionary with the API name as key:
for win_func in dynamic_info:
    if "NtCreateUserProcess" in win_func:
        image_path = win_func["NtCreateUserProcess"]["before"]["image_path"]
        
        if win_func["NtCreateUserProcess"]["success"]:
            proc_id = win_func["NtCreateUserProcess"]["after"]["proc_id"]

Accessing Before/After States

API calls have before (input) and after (output) dictionaries:
if "NtCreateUserProcess" in win_func:
    # Input parameters
    before = win_func["NtCreateUserProcess"]["before"]
    image_path = before["image_path"]
    
    # Output values (if successful)
    if win_func["NtCreateUserProcess"]["success"]:
        after = win_func["NtCreateUserProcess"]["after"]
        proc_id = after["proc_id"]

String Matching

Python provides powerful string operations:
image_path = win_func["NtCreateUserProcess"]["before"]["image_path"]

# Case-insensitive comparison
if image_path.lower().endswith("drsemu_eicar.exe"):
    return b"Win32.EICAR.Dr"

# Substring search
if "malicious" in image_path.lower():
    return b"Malware.Detected"

# Regex matching
import re
if re.search(r"cmd\.exe.*\/c", image_path, re.IGNORECASE):
    return b"Suspicious.Command"
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:20-21

Complete Example

Here’s the EICAR detection rule from the source:
import json
import os

import dr_semu_utils

# don't forget to add module names into py_imports.config file

def check(report_directory):
    
    image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
    static_info = dr_semu_utils.get_json_from_file(report_directory + b"\\" + sha_256.encode() + b".json")
    dynamic_info = dr_semu_utils.get_json_from_file(report_directory + b"\\" + str(pid).encode() + b".json")

    # code here
    verdict = b"CLEAN"

    for win_func in dynamic_info:
        if "NtCreateUserProcess" in win_func:
            image_path = win_func["NtCreateUserProcess"]["before"]["image_path"]
            if image_path.lower().endswith("drsemu_eicar.exe"):
                return b"Win32.EICAR.Dr"

    return verdict


if __name__ == "__main__":
    pass
Source: /run_detections/dr_rules_files/dr_semu_eicar.py This rule detects samples that spawn the EICAR test file executable.

Analyzing Static Data

Access PE file metadata from static analysis:
static_info = dr_semu_utils.get_json_from_file(
    report_directory + b"\\" + sha_256.encode() + b".json"
)

if static_info:
    is_x86 = static_info["generic"]["is_x86"]
    is_dll = static_info["generic"]["is_dll"]
    
    # Check imports
    if "imports" in static_info:
        for imp in static_info["imports"]:
            # Analyze imported functions
            pass

Analyzing Child Processes

Track process creation and analyze child processes:
for win_func in dynamic_info:
    if "NtCreateUserProcess" in win_func:
        if win_func["NtCreateUserProcess"]["success"]:
            # Get child process ID
            child_pid = win_func["NtCreateUserProcess"]["after"]["proc_id"]
            
            # Load child process behavior
            child_json = dr_semu_utils.get_json_from_file(
                report_directory + b"\\" + str(child_pid).encode() + b".json"
            )
            
            # Analyze child process
            if child_json:
                for child_func in child_json:
                    # Process child API calls
                    pass

Common API Calls

Frequently monitored Windows API calls:

Process Operations

if "NtCreateUserProcess" in win_func:
    # Process creation
    pass

if "NtTerminateProcess" in win_func:
    # Process termination
    pass

Registry Operations

if "NtCreateKey" in win_func:
    # Registry key creation
    key_path = win_func["NtCreateKey"]["before"]["key_path"]

if "NtSetValueKey" in win_func:
    # Registry value modification
    value_name = win_func["NtSetValueKey"]["before"]["value_name"]

if "NtDeleteKey" in win_func:
    # Registry key deletion
    pass

File Operations

if "NtCreateFile" in win_func:
    # File creation/opening
    file_path = win_func["NtCreateFile"]["before"]["file_path"]

if "NtWriteFile" in win_func:
    # File writing
    pass

if "NtDeleteFile" in win_func:
    # File deletion
    pass

Network Operations

if "InternetOpenUrlA" in win_func:
    # HTTP/HTTPS connections
    url = win_func["InternetOpenUrlA"]["before"]["url"]

if "InternetConnectA" in win_func:
    # Network connections
    server = win_func["InternetConnectA"]["before"]["server"]

Error Handling

Always handle missing data gracefully:
def check(report_directory):
    verdict = b"CLEAN"
    
    # Check if starter details exist
    image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
    if image_path is None:
        return verdict
    
    # Check if dynamic info exists
    dynamic_info = dr_semu_utils.get_json_from_file(
        report_directory + b"\\" + str(pid).encode() + b".json"
    )
    if dynamic_info is None:
        return verdict
    
    # Process data
    for win_func in dynamic_info:
        # Your logic
        pass
    
    return verdict

Return Values

Rules must return bytes:
def check(report_directory):
    verdict = b"CLEAN"  # Default: no detection
    
    if suspicious_behavior:
        return b"Win32.Malware.DR"  # Detection verdict
    
    return verdict
Always return bytes, not strings:
  • Correct: return b"Win32.Malware.DR"
  • Wrong: return "Win32.Malware.DR"

Performance Tips

Validate data exists to avoid errors:
if dynamic_info is None:
    return b"CLEAN"

for win_func in dynamic_info:
    if "NtCreateKey" in win_func:
        # Safe to access
        pass
Return immediately when detection occurs:
for win_func in dynamic_info:
    if malicious_behavior:
        return b"Malware.Detected"  # Stop processing
Avoid redundant computations:
# Cache expensive operations
lower_path = image_path.lower()

if "cmd.exe" in lower_path or "powershell.exe" in lower_path:
    # Use cached value
    pass

Advanced Patterns

Multi-Behavior Detection

Detect multiple suspicious behaviors:
def check(report_directory):
    verdict = b"CLEAN"
    
    image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
    dynamic_info = dr_semu_utils.get_json_from_file(
        report_directory + b"\\" + str(pid).encode() + b".json"
    )
    
    if not dynamic_info:
        return verdict
    
    # Track suspicious behaviors
    spawned_cmd = False
    modified_registry = False
    deleted_files = False
    
    for win_func in dynamic_info:
        if "NtCreateUserProcess" in win_func:
            img = win_func["NtCreateUserProcess"]["before"]["image_path"]
            if "cmd.exe" in img.lower():
                spawned_cmd = True
        
        if "NtSetValueKey" in win_func:
            modified_registry = True
        
        if "NtDeleteFile" in win_func:
            deleted_files = True
    
    # Combine indicators
    if spawned_cmd and modified_registry and deleted_files:
        return b"Win32.Suspicious.MultiIndicator"
    
    return verdict

Regex Pattern Matching

import re

def check(report_directory):
    verdict = b"CLEAN"
    
    image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
    dynamic_info = dr_semu_utils.get_json_from_file(
        report_directory + b"\\" + str(pid).encode() + b".json"
    )
    
    if not dynamic_info:
        return verdict
    
    # Suspicious command patterns
    cmd_pattern = re.compile(r"cmd\.exe.*/c\s+(del|rmdir|reg\s+delete)", re.IGNORECASE)
    
    for win_func in dynamic_info:
        if "NtCreateUserProcess" in win_func:
            cmd_line = win_func["NtCreateUserProcess"]["before"].get("command_line", "")
            if cmd_pattern.search(cmd_line):
                return b"Win32.Suspicious.Command"
    
    return verdict

Next Steps

Rule Examples

See more real-world rule examples

Best Practices

Learn rule development best practices

Build docs developers (and LLMs) love