UTMStack includes 30+ pre-built parsers, but you may need custom parsers for proprietary applications or unique log formats. This guide shows how to create and deploy custom parsers.

When to create custom parsers

Create a custom parser when:
  • Your application uses a proprietary log format
  • Pre-built parsers don’t extract all needed fields
  • You need custom field mapping or enrichment
  • Logs require special multiline handling
  • You want to normalize vendor-specific formats
Before creating a custom parser, check whether a pre-built parser already exists in the ~/workspace/source/filters/ directory.

Parser architecture

UTMStack uses Logstash for log parsing. Parsers are written in Logstash configuration syntax:

Parser structure

A Logstash parser consists of three sections:
input {
  # Define input source (usually handled by UTMStack)
}

filter {
  # Parse, transform, and enrich logs
  grok { ... }
  mutate { ... }
  date { ... }
}

output {
  # Send to OpenSearch (handled by UTMStack)
}

Creating a custom parser

Example: Parse custom application logs

Sample log format:
2024-03-03 12:34:56 INFO [UserService] User john.doe logged in from 192.168.1.100
2024-03-03 12:35:12 ERROR [PaymentService] Payment failed for order 12345: Insufficient funds
2024-03-03 12:36:45 WARN [EmailService] Email delivery delayed for [email protected]

Step 1: Create parser configuration

Create file /etc/utm/parsers/custom-app.conf:
custom-app.conf
filter {
  # Only process logs with custom-app tag
  if [log_type] == "custom-app" {
    
    # Parse log with Grok pattern
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log_level} \[%{DATA:service}\] %{GREEDYDATA:log_message}"
      }
    }
    
    # Parse timestamp
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
      target => "@timestamp"
      timezone => "UTC"
    }
    
    # Extract IP addresses from message
    grok {
      match => {
        "log_message" => ".*from %{IP:source_ip}"
      }
      tag_on_failure => []
    }
    
    # Extract order IDs
    grok {
      match => {
        "log_message" => ".*order %{NUMBER:order_id}"
      }
      tag_on_failure => []
    }
    
    # Normalize log level
    mutate {
      lowercase => [ "log_level" ]
    }
    
    # Map log level to severity
    if [log_level] == "error" {
      mutate {
        add_field => { "severity" => "high" }
      }
    } else if [log_level] == "warn" {
      mutate {
        add_field => { "severity" => "medium" }
      }
    } else {
      mutate {
        add_field => { "severity" => "low" }
      }
    }
    
    # Add custom tags
    mutate {
      add_tag => [ "custom-app", "parsed" ]
    }
    
    # Remove temporary fields
    mutate {
      remove_field => [ "timestamp" ]
    }
  }
}
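A Grok pattern is ultimately a regular expression. As an offline sanity check (an illustrative Python sketch, not how UTMStack actually runs parsers; the regexes are hand-written approximations of the Grok patterns above), the same extraction and severity mapping can be mimicked like this:

```python
import re

# Approximate regex equivalent of the main Grok pattern in custom-app.conf
LINE = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) "
    r"(?P<log_level>[A-Z]+) \[(?P<service>[^\]]+)\] (?P<log_message>.*)"
)

SEVERITY = {"error": "high", "warn": "medium"}  # everything else maps to "low"

def parse(line: str) -> dict:
    m = LINE.match(line)
    if m is None:
        # Grok tags unmatched events instead of dropping them
        return {"tags": ["_grokparsefailure"]}
    event = m.groupdict()
    event["log_level"] = event["log_level"].lower()
    event["severity"] = SEVERITY.get(event["log_level"], "low")
    # Mirror the optional secondary grok blocks (IP and order ID extraction)
    ip = re.search(r"from (\d{1,3}(?:\.\d{1,3}){3})\b", event["log_message"])
    if ip:
        event["source_ip"] = ip.group(1)
    order = re.search(r"order (\d+)", event["log_message"])
    if order:
        event["order_id"] = order.group(1)
    return event
```

Running the sample ERROR line through this sketch yields severity "high" and order_id "12345", which is what the deployed parser should also produce.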

Step 2: Test the parser

Test parser with sample logs:
# Create test input file
cat > /tmp/test-log.txt << EOF
2024-03-03 12:34:56 INFO [UserService] User john.doe logged in from 192.168.1.100
2024-03-03 12:35:12 ERROR [PaymentService] Payment failed for order 12345: Insufficient funds
EOF

# Test parser
sudo /usr/share/logstash/bin/logstash -f /etc/utm/parsers/custom-app.conf --path.data /tmp/logstash-test -t

# Test with input
cat /tmp/test-log.txt | sudo /usr/share/logstash/bin/logstash -f /etc/utm/parsers/custom-app.conf --path.data /tmp/logstash-test

Step 3: Deploy parser

Deploy parser to UTMStack:
# Copy parser to UTMStack parsers directory
sudo cp /etc/utm/parsers/custom-app.conf /etc/utmstack/parsers/

# Set permissions
sudo chown utm:utm /etc/utmstack/parsers/custom-app.conf
sudo chmod 644 /etc/utmstack/parsers/custom-app.conf

# Restart Logstash to load new parser
sudo systemctl restart utmstack-logstash

Step 4: Configure data source

Configure your application or Filebeat to tag logs:
filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/custom-app/*.log
    fields:
      log_type: custom-app
    fields_under_root: true

output.logstash:
  hosts: ["utm-server.company.com:5044"]

Grok patterns

Grok is the primary tool for parsing unstructured logs. Common patterns:

Built-in patterns

%{NUMBER:field}           # Match numbers: 123, 45.67
%{INT:field}              # Match integers: 123, -456
%{IP:field}               # Match IP addresses: 192.168.1.1
%{IPV6:field}             # Match IPv6 addresses
%{HOSTNAME:field}         # Match hostnames
%{USERNAME:field}         # Match usernames
%{EMAILADDRESS:field}     # Match email addresses
%{URI:field}              # Match URIs
%{PATH:field}             # Match file paths
%{TIMESTAMP_ISO8601:field} # Match ISO timestamps
%{LOGLEVEL:field}         # Match log levels: INFO, ERROR, etc.
%{DATA:field}             # Match any characters (non-greedy)
%{GREEDYDATA:field}       # Match any characters (greedy)
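The DATA/GREEDYDATA distinction trips up many first-time parser authors. Roughly, %{DATA:x} behaves like the lazy regex .*? and %{GREEDYDATA:x} like the greedy .*, which a quick Python experiment makes concrete:

```python
import re

line = "a: 1, b: 2, c: 3"

# %{DATA:x} is roughly the lazy .*? -- it stops at the FIRST ", "
lazy = re.match(r"a: (?P<x>.*?), ", line)
# %{GREEDYDATA:x} is roughly the greedy .* -- it runs to the LAST ", "
greedy = re.match(r"a: (?P<x>.*), ", line)
```

Here the lazy match captures "1" while the greedy one captures "1, b: 2", so prefer DATA when a delimiter may repeat later in the line.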

Custom patterns

Define custom patterns in parser:
filter {
  grok {
    pattern_definitions => {
      "ORDER_ID" => "ORD-\d{8}"
      "SESSION_ID" => "[A-Za-z0-9]{32}"
      "TRANSACTION_ID" => "TXN-[A-F0-9]{16}"
    }
    match => {
      "message" => "Order %{ORDER_ID:order_id} processed"
    }
  }
}

Complex pattern example

Parse Apache access logs:
filter {
  grok {
    match => {
      "message" => "%{IPORHOST:source_ip} - %{DATA:username} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:status_code} %{NUMBER:bytes_sent} \"%{DATA:referer}\" \"%{DATA:user_agent}\""
    }
  }
}
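To see what this pattern extracts, here is a rough Python equivalent run against a made-up combined-log line (an approximation for illustration, not the exact regexes the IPORHOST/HTTPDATE patterns compile to):

```python
import re

# Hand-written approximation of the Apache access log Grok pattern above
APACHE = re.compile(
    r'(?P<source_ip>\S+) - (?P<username>\S+) \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<http_method>\w+) (?P<url>\S+) HTTP/(?P<http_version>[\d.]+)" '
    r'(?P<status_code>\d+) (?P<bytes_sent>\d+) "(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
)

sample = ('203.0.113.7 - frank [10/Oct/2024:13:55:36 +0000] '
          '"GET /index.html HTTP/1.1" 200 2326 "-" "curl/8.4.0"')
event = APACHE.match(sample).groupdict()
```

Note that Grok captures everything as strings; add a mutate/convert step if you need status_code or bytes_sent as integers.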

Advanced parsing techniques

Multiline logs

The multiline filter plugin has been removed from current Logstash releases, so assemble multiline events such as Java stack traces at the input stage instead (see the best practices below). With Filebeat, the multiline settings append continuation lines, here any line starting with whitespace, to the previous event:
filebeat.yml
multiline.pattern: '^\s'
multiline.negate: false
multiline.match: after

Then parse the assembled message:
filter {
  if [log_type] == "java-app" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:log_message}"
      }
    }
  }
}

JSON logs

Parse JSON-formatted logs:
filter {
  if [log_type] == "json-app" {
    json {
      source => "message"
      target => "parsed"
    }
    
    # Promote fields to the root level. Parse the timestamp string with the
    # date filter rather than renaming it directly onto @timestamp
    mutate {
      rename => {
        "[parsed][timestamp]" => "timestamp"
        "[parsed][level]" => "log_level"
        "[parsed][message]" => "log_message"
      }
    }
    
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
  }
}
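The parse-then-promote flow is easy to verify outside Logstash. This Python sketch mimics the json filter plus the rename step on a hypothetical log line:

```python
import json

raw = '{"timestamp": "2024-03-03T12:34:56Z", "level": "ERROR", "message": "Payment failed"}'

# Parse the JSON payload, then promote the nested fields to the event root,
# mirroring the json + mutate/rename combination above
parsed = json.loads(raw)
event = {
    "@timestamp": parsed.pop("timestamp"),
    "log_level": parsed.pop("level"),
    "log_message": parsed.pop("message"),
}
event.update(parsed)  # keep any remaining keys the application emitted
```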

CSV logs

Parse CSV-formatted logs:
filter {
  if [log_type] == "csv-app" {
    csv {
      separator => ","
      columns => ["timestamp", "user", "action", "result", "ip_address"]
      skip_header => true
    }
    
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}
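The csv filter's behavior, split on the separator, zip the values against the declared column names, and skip the header row, can be sketched in a few lines of Python:

```python
import csv
import io

# Same column list as the csv filter configuration above
columns = ["timestamp", "user", "action", "result", "ip_address"]
raw = ("timestamp,user,action,result,ip_address\n"
       "2024-03-03 12:34:56,john.doe,login,success,192.168.1.100\n")

reader = csv.reader(io.StringIO(raw))
next(reader)  # equivalent of skip_header => true
events = [dict(zip(columns, row)) for row in reader]
```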

XML logs

Parse XML-formatted logs:
filter {
  if [log_type] == "xml-app" {
    xml {
      source => "message"
      target => "parsed"
      store_xml => false
      xpath => {
        "/event/timestamp/text()" => "timestamp"
        "/event/user/text()" => "user"
        "/event/action/text()" => "action"
      }
    }
  }
}

Field enrichment

Add context to parsed logs:

GeoIP enrichment

filter {
  if [source_ip] {
    geoip {
      source => "source_ip"
      target => "source_geo"
      fields => ["city_name", "country_name", "location"]
    }
  }
}

DNS lookup

filter {
  if [source_ip] {
    dns {
      reverse => [ "source_ip" ]
      action => "replace"
      nameserver => "8.8.8.8"
    }
  }
}

Translate fields

Map codes to descriptions:
filter {
  translate {
    field => "event_code"
    destination => "event_description"
    dictionary => {
      "100" => "Login successful"
      "101" => "Login failed"
      "200" => "Password changed"
      "300" => "Account locked"
    }
  }
}

Testing and debugging

Test Grok patterns

Iterate on patterns with a Grok debugger before deploying:
  • Online tool: https://grokdebugger.com
  • Kibana's Dev Tools include a built-in Grok Debugger
  • Or feed sample lines to Logstash over stdin, as in Step 2 above

Enable debug output

output {
  # Temporarily echo fully parsed events while testing (remove in production)
  stdout { codec => rubydebug }
}

Check Logstash logs

# View Logstash logs for pipeline and configuration errors
sudo tail -f /var/log/utm/logstash.log

Grok failures tag the event itself rather than writing to this log file. To find lines your patterns did not match, search your indexed events for the tag "_grokparsefailure".

Best practices

Parser development tips:
  • Start with simple patterns and iterate
  • Test with diverse log samples
  • Use specific patterns before generic ones
  • Add descriptive field names
  • Remove temporary fields to save storage
  • Use conditional logic to handle variations
  • Document custom patterns
  • Monitor parsing success rate
  • Use multiline codec at input, not filter
  • Avoid expensive operations (DNS, external lookups)
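Monitoring the parsing success rate mentioned above amounts to counting events carrying the _grokparsefailure tag. A small sketch (assumes you have already fetched a sample of indexed events, each with a "tags" list):

```python
def parse_success_rate(events: list[dict]) -> float:
    """Fraction of events that parsed cleanly (no _grokparsefailure tag)."""
    if not events:
        return 0.0
    failures = sum(1 for e in events
                   if "_grokparsefailure" in e.get("tags", []))
    return 1 - failures / len(events)

sample = [
    {"tags": ["custom-app", "parsed"]},
    {"tags": ["custom-app", "parsed"]},
    {"tags": ["_grokparsefailure"]},
    {"tags": []},
]
```

Alerting when this ratio drops catches format changes in the upstream application before fields silently go missing from your dashboards.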

Example parsers

Firewall logs parser

filter {
  if [log_type] == "custom-firewall" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{HOSTNAME:firewall} %{WORD:action} %{WORD:protocol} %{IP:source_ip}:%{NUMBER:source_port} -> %{IP:destination_ip}:%{NUMBER:destination_port}"
      }
    }
    
    date {
      match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ" ]
    }
    
    mutate {
      lowercase => [ "action", "protocol" ]
      convert => {
        "source_port" => "integer"
        "destination_port" => "integer"
      }
    }
    
    if [destination_port] == 22 { mutate { add_tag => "ssh" } }
    if [destination_port] == 3389 { mutate { add_tag => "rdp" } }
  }
}

Authentication logs parser

filter {
  if [log_type] == "custom-auth" {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} AUTH %{WORD:auth_result} user=%{USERNAME:user} from=%{IP:source_ip} method=%{WORD:auth_method}"
      }
    }
    
    if [auth_result] == "FAILED" {
      mutate {
        add_field => { "event_type" => "authentication_failure" }
        add_field => { "severity" => "medium" }
      }
    } else if [auth_result] == "SUCCESS" {
      mutate {
        add_field => { "event_type" => "authentication_success" }
        add_field => { "severity" => "low" }
      }
    }
  }
}

Next steps
  • Monitor parsers: track parsing success rates
  • Alert rules: create alerts on custom fields
  • Data sources: configure data collection
  • Threat detection: use parsed data for detection
