UTMStack includes 30+ pre-built parsers, but you may need custom parsers for proprietary applications or unique log formats. This guide shows how to create and deploy custom parsers.
When to create custom parsers
Create a custom parser when:
- Your application uses a proprietary log format
- Pre-built parsers don’t extract all needed fields
- You need custom field mapping or enrichment
- Logs require special multiline handling
- You want to normalize vendor-specific formats
Before creating a custom parser, check whether a pre-built parser already exists in the ~/workspace/source/filters/ directory.
Parser architecture
UTMStack uses Logstash for log parsing. Parsers are written in Logstash configuration syntax:
Parser structure
A Logstash parser consists of three sections:
input {
  # Define input source (usually handled by UTMStack)
}

filter {
  # Parse, transform, and enrich logs
  grok { ... }
  mutate { ... }
  date { ... }
}

output {
  # Send to OpenSearch (handled by UTMStack)
}
Creating a custom parser
Example: Parse custom application logs
Sample log format:
2024-03-03 12:34:56 INFO [UserService] User john.doe logged in from 192.168.1.100
2024-03-03 12:35:12 ERROR [PaymentService] Payment failed for order 12345: Insufficient funds
2024-03-03 12:36:45 WARN [EmailService] Email delivery delayed for [email protected]
Step 1: Create parser configuration
Create file /etc/utm/parsers/custom-app.conf:
filter {
  # Only process logs with custom-app tag
  if [log_type] == "custom-app" {
    # Parse log with Grok pattern
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log_level} \[%{DATA:service}\] %{GREEDYDATA:log_message}"
      }
    }

    # Parse timestamp
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
      target => "@timestamp"
      timezone => "UTC"
    }

    # Extract IP addresses from message
    grok {
      match => {
        "log_message" => ".*from %{IP:source_ip}"
      }
      tag_on_failure => []
    }

    # Extract order IDs
    grok {
      match => {
        "log_message" => ".*order %{NUMBER:order_id}"
      }
      tag_on_failure => []
    }

    # Normalize log level
    mutate {
      lowercase => [ "log_level" ]
    }

    # Map log level to severity
    if [log_level] == "error" {
      mutate {
        add_field => { "severity" => "high" }
      }
    } else if [log_level] == "warn" {
      mutate {
        add_field => { "severity" => "medium" }
      }
    } else {
      mutate {
        add_field => { "severity" => "low" }
      }
    }

    # Add custom tags
    mutate {
      add_tag => [ "custom-app", "parsed" ]
    }

    # Remove temporary fields
    mutate {
      remove_field => [ "timestamp" ]
    }
  }
}
Step 2: Test the parser
Test parser with sample logs:
# Create test input file
cat > /tmp/test-log.txt << EOF
2024-03-03 12:34:56 INFO [UserService] User john.doe logged in from 192.168.1.100
2024-03-03 12:35:12 ERROR [PaymentService] Payment failed for order 12345: Insufficient funds
EOF
# Test parser
sudo /usr/share/logstash/bin/logstash -f /etc/utm/parsers/custom-app.conf --path.data /tmp/logstash-test -t
# Test with input (note: events read from stdin will not have the
# log_type field set, so temporarily remove the conditional or add
# the field while testing)
cat /tmp/test-log.txt | sudo /usr/share/logstash/bin/logstash -f /etc/utm/parsers/custom-app.conf --path.data /tmp/logstash-test
Step 3: Deploy parser
Deploy parser to UTMStack:
# Copy parser to UTMStack parsers directory
sudo cp /etc/utm/parsers/custom-app.conf /etc/utmstack/parsers/
# Set permissions
sudo chown utm:utm /etc/utmstack/parsers/custom-app.conf
sudo chmod 644 /etc/utmstack/parsers/custom-app.conf
# Restart Logstash to load new parser
sudo systemctl restart utmstack-logstash
Configure your application or Filebeat to tag logs:
filebeat.inputs:
  - type: log
    paths:
      - /var/log/custom-app/*.log
    fields:
      log_type: custom-app
    fields_under_root: true

output.logstash:
  hosts: ["utm-server.company.com:5044"]
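On the Logstash side, the matching beats listener (normally provisioned by UTMStack itself, so this fragment is only illustrative) would look like:

```
input {
  beats {
    port => 5044
  }
}
```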
Grok patterns
Grok is the primary tool for parsing unstructured logs. Common patterns:
Built-in patterns
%{NUMBER:field} # Match numbers: 123, 45.67
%{INT:field} # Match integers: 123, -456
%{IP:field} # Match IP addresses: 192.168.1.1
%{IPV6:field} # Match IPv6 addresses
%{HOSTNAME:field} # Match hostnames
%{USERNAME:field} # Match usernames
%{EMAILADDRESS:field} # Match email addresses
%{URI:field} # Match URIs
%{PATH:field} # Match file paths
%{TIMESTAMP_ISO8601:field} # Match ISO timestamps
%{LOGLEVEL:field} # Match log levels: INFO, ERROR, etc.
%{DATA:field} # Match any characters (non-greedy)
%{GREEDYDATA:field} # Match any characters (greedy)
Custom patterns
Define custom patterns in parser:
filter {
  grok {
    pattern_definitions => {
      "ORDER_ID" => "ORD-\d{8}"
      "SESSION_ID" => "[A-Za-z0-9]{32}"
      "TRANSACTION_ID" => "TXN-[A-F0-9]{16}"
    }
    match => {
      "message" => "Order %{ORDER_ID:order_id} processed"
    }
  }
}
Complex pattern example
Parse Apache access logs:
filter {
  grok {
    match => {
      "message" => "%{IPORHOST:source_ip} - %{DATA:username} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:status_code} %{NUMBER:bytes_sent} \"%{DATA:referer}\" \"%{DATA:user_agent}\""
    }
  }
}
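The captured user_agent string can optionally be decomposed into browser and OS fields with the useragent filter; the source field name follows the Apache example above, and the target name is illustrative:

```
filter {
  useragent {
    source => "user_agent"
    target => "user_agent_info"
  }
}
```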
Advanced parsing techniques
Multiline logs
Parse multiline stack traces. Note that the standalone multiline filter plugin has been removed from modern Logstash releases, so multiline events should be assembled at the input stage (with the multiline codec) or in Filebeat, then parsed normally:
input {
  file {
    path => "/var/log/java-app/*.log"
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}

filter {
  if [log_type] == "java-app" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:log_message}"
      }
    }
  }
}
JSON logs
Parse JSON-formatted logs:
filter {
  if [log_type] == "json-app" {
    json {
      source => "message"
      target => "parsed"
    }
    # Promote fields to root level
    mutate {
      rename => {
        "[parsed][level]" => "log_level"
        "[parsed][message]" => "log_message"
      }
    }
    # @timestamp must be a real timestamp object, so set it with the
    # date filter rather than renaming a string field onto it
    date {
      match => [ "[parsed][timestamp]", "ISO8601" ]
      target => "@timestamp"
    }
  }
}
CSV logs
Parse CSV-formatted logs:
filter {
  if [log_type] == "csv-app" {
    csv {
      separator => ","
      columns => ["timestamp", "user", "action", "result", "ip_address"]
      skip_header => true
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}
XML logs
Parse XML-formatted logs:
filter {
  if [log_type] == "xml-app" {
    xml {
      source => "message"
      target => "parsed"
      store_xml => false
      # xpath takes a hash of XPath expression => destination field
      xpath => {
        "/event/timestamp/text()" => "timestamp"
        "/event/user/text()" => "user"
        "/event/action/text()" => "action"
      }
    }
  }
}
Field enrichment
Add context to parsed logs:
GeoIP enrichment
filter {
  if [source_ip] {
    geoip {
      source => "source_ip"
      target => "source_geo"
      fields => ["city_name", "country_name", "location"]
    }
  }
}
DNS lookup
filter {
  if [source_ip] {
    dns {
      reverse => [ "source_ip" ]
      action => "replace"
      nameserver => "8.8.8.8"
      # Cache results to limit the cost of external lookups
      hit_cache_size => 4096
      failed_cache_size => 512
    }
  }
}
Translate fields
Map codes to descriptions:
filter {
  translate {
    field => "event_code"
    destination => "event_description"
    dictionary => {
      "100" => "Login successful"
      "101" => "Login failed"
      "200" => "Password changed"
      "300" => "Account locked"
    }
  }
}
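If a code is missing from the dictionary, the translate filter's fallback option supplies a default value instead of leaving the destination unset (the fallback text here is illustrative):

```
filter {
  translate {
    field => "event_code"
    destination => "event_description"
    dictionary => {
      "100" => "Login successful"
    }
    fallback => "Unknown event code"
  }
}
```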
Testing and debugging
Test Grok patterns
The quickest way to test a pattern is a throwaway Logstash pipeline reading from stdin:
# Test a pattern against a sample line
echo "2024-03-03 12:34:56 INFO Test" | sudo /usr/share/logstash/bin/logstash -e '
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" }
  }
}'
Or use online tool: https://grokdebugger.com
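Since grok patterns compile down to regular expressions, another quick sanity check is to prototype the raw regex with standard shell tools before wrapping it in grok syntax (the pattern below roughly approximates %{IP} for IPv4):

```shell
# Extract the IPv4 address from a sample log line with plain grep
echo "2024-03-03 12:34:56 INFO [UserService] User john.doe logged in from 192.168.1.100" \
  | grep -oE '[0-9]{1,3}(\.[0-9]{1,3}){3}'
```

Once the regex behaves as expected, it maps to %{IP:source_ip} in the grok pattern.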
Enable debug output
filter {
  # Add at end of filter
  ruby {
    code => 'logger.info("Parsed event", :event => event.to_hash)'
  }
}
Check Logstash logs
# View Logstash logs
sudo tail -f /var/log/utm/logstash.log
# Check for pipeline errors (note: grok failures are tagged on the
# events themselves as "_grokparsefailure", so search the indexed
# data for that tag rather than the Logstash log)
sudo grep -i "error" /var/log/utm/logstash.log
Best practices
Parser development tips:
- Start with simple patterns and iterate
- Test with diverse log samples
- Use specific patterns before generic ones
- Add descriptive field names
- Remove temporary fields to save storage
- Use conditional logic to handle variations
- Document custom patterns
- Monitor parsing success rate
- Use multiline codec at input, not filter
- Avoid expensive operations (DNS, external lookups)
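To act on the monitoring tip above, give each parser its own failure tag so success rates can be measured per source (the tag name here is illustrative):

```
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{GREEDYDATA:log_message}" }
    tag_on_failure => [ "_grokparsefailure_custom_app" ]
  }
}
```

Counting events that carry the tag versus the total for that source gives the parsing failure rate.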
Example parsers
Firewall logs parser
filter {
  if [log_type] == "custom-firewall" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{HOSTNAME:firewall} %{WORD:action} %{WORD:protocol} %{IP:source_ip}:%{NUMBER:source_port} -> %{IP:destination_ip}:%{NUMBER:destination_port}"
      }
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ" ]
    }
    mutate {
      lowercase => [ "action", "protocol" ]
      convert => {
        "source_port" => "integer"
        "destination_port" => "integer"
      }
    }
    if [destination_port] == 22 { mutate { add_tag => [ "ssh" ] } }
    if [destination_port] == 3389 { mutate { add_tag => [ "rdp" ] } }
  }
}
Authentication logs parser
filter {
  if [log_type] == "custom-auth" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} AUTH %{WORD:auth_result} user=%{USERNAME:user} from=%{IP:source_ip} method=%{WORD:auth_method}"
      }
    }
    if [auth_result] == "FAILED" {
      mutate {
        add_field => {
          "event_type" => "authentication_failure"
          "severity" => "medium"
        }
      }
    } else if [auth_result] == "SUCCESS" {
      mutate {
        add_field => {
          "event_type" => "authentication_success"
          "severity" => "low"
        }
      }
    }
  }
}
Next steps
- Monitor parsers: track parsing success rates
- Alert rules: create alerts on custom fields
- Data sources: configure data collection
- Threat detection: use parsed data for detection