Python rules provide flexibility and access to the full Python standard library, making them ideal for complex detection logic and data analysis.
Rule Structure
Every Python rule must follow this structure:
import dr_semu_utils
def check ( report_directory ):
verdict = b "CLEAN"
# Your detection logic here
return verdict
Python rules must:
Return verdict as bytes (use b"CLEAN" not "CLEAN")
Accept report_directory as bytes
Handle file paths as bytes
Module Imports
Python rules run in a restricted environment. You must whitelist imports in py_imports.config:
import json
import os
import dr_semu_utils
# don't forget to add module names into py_imports.config file
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:1-6
dr_semu_utils API
The dr_semu_utils module provides helper functions for accessing analysis data.
get_starter_details()
Retrieves information about the initial process from starter.json.
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
Parameters:
report_directory - Path to the report directory (bytes)
Returns: Tuple of (image_path, pid, sha_256)
image_path - Full path to the analyzed executable
pid - Process ID of the starter process
sha_256 - SHA-256 hash of the sample
Returns (None, None, None) if starter.json doesn’t exist
Source: /run_detections/dr_rules_files/dr_semu_utils.py:4-12
Implementation:
def get_starter_details ( report_directory ):
starter_json = report_directory + b " \\ starter.json"
if not os.path.exists(starter_json):
return None , None , None
with open (starter_json) as json_file:
data = json.load(json_file)
return data[ "image_path" ], data[ "starter_pid" ], data[ "sha_256" ]
return None , None , None
get_json_from_file()
Loads and parses a JSON file from the report directory.
data = dr_semu_utils.get_json_from_file(file_path)
Parameters:
file_path - Full path to the JSON file (bytes)
Returns: Parsed JSON data (dict or list), or (None, None, None) if file doesn’t exist
Source: /run_detections/dr_rules_files/dr_semu_utils.py:14-20
Implementation:
def get_json_from_file ( file_path ):
if not os.path.exists(file_path):
return None , None , None
with open (file_path) as json_file:
data = json.load(json_file)
return data
Loading Analysis Data
Typical pattern for loading static and dynamic analysis:
import dr_semu_utils
def check ( report_directory ):
# Get starter details
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
# Load static analysis
static_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + sha_256.encode() + b ".json"
)
# Load dynamic analysis
dynamic_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + str (pid).encode() + b ".json"
)
# Your detection logic
verdict = b "CLEAN"
return verdict
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:10-12
Note the use of .encode() to convert strings to bytes when building file paths.
Analyzing Dynamic Behavior
Dynamic behavior is a list of API call dictionaries:
for win_func in dynamic_info:
if "NtCreateKey" in win_func:
# Access API call details
success = win_func[ "NtCreateKey" ][ "success" ]
key_path = win_func[ "NtCreateKey" ][ "before" ][ "key_path" ]
Checking Specific API Calls
Each API call is a dictionary with the API name as key:
for win_func in dynamic_info:
if "NtCreateUserProcess" in win_func:
image_path = win_func[ "NtCreateUserProcess" ][ "before" ][ "image_path" ]
if win_func[ "NtCreateUserProcess" ][ "success" ]:
proc_id = win_func[ "NtCreateUserProcess" ][ "after" ][ "proc_id" ]
Accessing Before/After States
API calls have before (input) and after (output) dictionaries:
if "NtCreateUserProcess" in win_func:
# Input parameters
before = win_func[ "NtCreateUserProcess" ][ "before" ]
image_path = before[ "image_path" ]
# Output values (if successful)
if win_func[ "NtCreateUserProcess" ][ "success" ]:
after = win_func[ "NtCreateUserProcess" ][ "after" ]
proc_id = after[ "proc_id" ]
String Matching
Python provides powerful string operations:
image_path = win_func[ "NtCreateUserProcess" ][ "before" ][ "image_path" ]
# Case-insensitive comparison
if image_path.lower().endswith( "drsemu_eicar.exe" ):
return b "Win32.EICAR.Dr"
# Substring search
if "malicious" in image_path.lower():
return b "Malware.Detected"
# Regex matching
import re
if re.search( r "cmd \. exe . * \/ c" , image_path, re. IGNORECASE ):
return b "Suspicious.Command"
Source: /run_detections/dr_rules_files/dr_semu_eicar.py:20-21
Complete Example
Here’s the EICAR detection rule from the source:
import json
import os
import dr_semu_utils
# don't forget to add module names into py_imports.config file
def check ( report_directory ):
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
static_info = dr_semu_utils.get_json_from_file(report_directory + b " \\ " + sha_256.encode() + b ".json" )
dynamic_info = dr_semu_utils.get_json_from_file(report_directory + b " \\ " + str (pid).encode() + b ".json" )
# code here
verdict = b "CLEAN"
for win_func in dynamic_info:
if "NtCreateUserProcess" in win_func:
image_path = win_func[ "NtCreateUserProcess" ][ "before" ][ "image_path" ]
if image_path.lower().endswith( "drsemu_eicar.exe" ):
return b "Win32.EICAR.Dr"
return verdict
if __name__ == "__main__" :
pass
Source: /run_detections/dr_rules_files/dr_semu_eicar.py
This rule detects samples that spawn the EICAR test file executable.
Analyzing Static Data
Access PE file metadata from static analysis:
static_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + sha_256.encode() + b ".json"
)
if static_info:
is_x86 = static_info[ "generic" ][ "is_x86" ]
is_dll = static_info[ "generic" ][ "is_dll" ]
# Check imports
if "imports" in static_info:
for imp in static_info[ "imports" ]:
# Analyze imported functions
pass
Analyzing Child Processes
Track process creation and analyze child processes:
for win_func in dynamic_info:
if "NtCreateUserProcess" in win_func:
if win_func[ "NtCreateUserProcess" ][ "success" ]:
# Get child process ID
child_pid = win_func[ "NtCreateUserProcess" ][ "after" ][ "proc_id" ]
# Load child process behavior
child_json = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + str (child_pid).encode() + b ".json"
)
# Analyze child process
if child_json:
for child_func in child_json:
# Process child API calls
pass
Common API Calls
Frequently monitored Windows API calls:
Process Operations
if "NtCreateUserProcess" in win_func:
# Process creation
pass
if "NtTerminateProcess" in win_func:
# Process termination
pass
Registry Operations
if "NtCreateKey" in win_func:
# Registry key creation
key_path = win_func[ "NtCreateKey" ][ "before" ][ "key_path" ]
if "NtSetValueKey" in win_func:
# Registry value modification
value_name = win_func[ "NtSetValueKey" ][ "before" ][ "value_name" ]
if "NtDeleteKey" in win_func:
# Registry key deletion
pass
File Operations
if "NtCreateFile" in win_func:
# File creation/opening
file_path = win_func[ "NtCreateFile" ][ "before" ][ "file_path" ]
if "NtWriteFile" in win_func:
# File writing
pass
if "NtDeleteFile" in win_func:
# File deletion
pass
Network Operations
if "InternetOpenUrlA" in win_func:
# HTTP/HTTPS connections
url = win_func[ "InternetOpenUrlA" ][ "before" ][ "url" ]
if "InternetConnectA" in win_func:
# Network connections
server = win_func[ "InternetConnectA" ][ "before" ][ "server" ]
Error Handling
Always handle missing data gracefully:
def check ( report_directory ):
verdict = b "CLEAN"
# Check if starter details exist
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
if image_path is None :
return verdict
# Check if dynamic info exists
dynamic_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + str (pid).encode() + b ".json"
)
if dynamic_info is None :
return verdict
# Process data
for win_func in dynamic_info:
# Your logic
pass
return verdict
Return Values
Rules must return bytes:
def check ( report_directory ):
verdict = b "CLEAN" # Default: no detection
if suspicious_behavior:
return b "Win32.Malware.DR" # Detection verdict
return verdict
Always return bytes , not strings:
Correct: return b"Win32.Malware.DR"
Wrong: return "Win32.Malware.DR"
Check existence before processing
Validate data exists to avoid errors: if dynamic_info is None :
return b "CLEAN"
for win_func in dynamic_info:
if "NtCreateKey" in win_func:
# Safe to access
pass
Return immediately when detection occurs: for win_func in dynamic_info:
if malicious_behavior:
return b "Malware.Detected" # Stop processing
Cache repeated operations
Avoid redundant computations: # Cache expensive operations
lower_path = image_path.lower()
if "cmd.exe" in lower_path or "powershell.exe" in lower_path:
# Use cached value
pass
Advanced Patterns
Multi-Behavior Detection
Detect multiple suspicious behaviors:
def check ( report_directory ):
verdict = b "CLEAN"
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
dynamic_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + str (pid).encode() + b ".json"
)
if not dynamic_info:
return verdict
# Track suspicious behaviors
spawned_cmd = False
modified_registry = False
deleted_files = False
for win_func in dynamic_info:
if "NtCreateUserProcess" in win_func:
img = win_func[ "NtCreateUserProcess" ][ "before" ][ "image_path" ]
if "cmd.exe" in img.lower():
spawned_cmd = True
if "NtSetValueKey" in win_func:
modified_registry = True
if "NtDeleteFile" in win_func:
deleted_files = True
# Combine indicators
if spawned_cmd and modified_registry and deleted_files:
return b "Win32.Suspicious.MultiIndicator"
return verdict
Regex Pattern Matching
import re
def check ( report_directory ):
verdict = b "CLEAN"
image_path, pid, sha_256 = dr_semu_utils.get_starter_details(report_directory)
dynamic_info = dr_semu_utils.get_json_from_file(
report_directory + b " \\ " + str (pid).encode() + b ".json"
)
if not dynamic_info:
return verdict
# Suspicious command patterns
cmd_pattern = re.compile( r "cmd \. exe . * /c \s + ( del | rmdir | reg \s + delete ) " , re. IGNORECASE )
for win_func in dynamic_info:
if "NtCreateUserProcess" in win_func:
cmd_line = win_func[ "NtCreateUserProcess" ][ "before" ].get( "command_line" , "" )
if cmd_pattern.search(cmd_line):
return b "Win32.Suspicious.Command"
return verdict
Next Steps
Rule Examples See more real-world rule examples
Best Practices Learn rule development best practices