Skip to main content

JSON Formatter for ELK

Configure go_logs to output logs in a format compatible with the ELK stack:
package main

import (
    "os"
    
    "github.com/drossan/go_logs"
)

// main builds a JSON-formatted, file-rotating logger, attaches service-wide
// metadata to it, and emits one sample "Payment processed" entry.
func main() {
    // Create logger with JSON formatter for ELK
    logger, err := go_logs.New(
        go_logs.WithLevel(go_logs.InfoLevel),
        go_logs.WithFormatter(go_logs.NewJSONFormatter()),
        go_logs.WithRotatingFile("/var/log/app.log", 100, 5),
    )
    if err != nil {
        // The logger could not be constructed, so report on stderr and stop
        // rather than continuing with an unusable logger.
        os.Stderr.WriteString("initializing logger: " + err.Error() + "\n")
        os.Exit(1)
    }
    defer logger.Sync()

    // Add service metadata that will appear in all logs
    serviceLogger := logger.With(
        go_logs.String("service", "payment-api"),
        go_logs.String("version", "1.2.3"),
        go_logs.String("environment", "production"),
        go_logs.String("datacenter", "us-east-1"),
    )

    serviceLogger.Info("Payment processed",
        go_logs.String("transaction_id", "tx-12345"),
        go_logs.Float64("amount", 99.99),
        go_logs.String("currency", "USD"),
    )
}
JSON Output:
{"timestamp":"2026-03-03T10:30:00Z","level":"INFO","message":"Payment processed","fields":{"service":"payment-api","version":"1.2.3","environment":"production","datacenter":"us-east-1","transaction_id":"tx-12345","amount":99.99,"currency":"USD"}}

Filebeat Configuration

Configure Filebeat to ship logs to Elasticsearch:
# filebeat.yml
# Input: the JSON log file written by the application.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/app.log
    # Place the decoded JSON keys at the top level of the event.
    json.keys_under_root: true
    # Add an error key to the event when a line cannot be decoded as JSON.
    json.add_error_key: true
    # Custom field attached to every event read by this input.
    fields:
      log_type: application
    # Store the custom field at the event root instead of under "fields".
    fields_under_root: true

# Output: send events straight to Elasticsearch, into a date-suffixed index.
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "app-logs-%{+yyyy.MM.dd}"
  
# Template name/pattern matching the custom index name above; ILM is turned off.
setup.template.name: "app-logs"
setup.template.pattern: "app-logs-*"
setup.ilm.enabled: false

# Enrich events with host, cloud, and Docker metadata.
processors:
  - add_host_metadata:
      # Skip host enrichment for events tagged as forwarded.
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

Logstash Pipeline

Process logs through Logstash before Elasticsearch:
# logstash.conf
input {
  file {
    path => "/var/log/app.log"
    # Each line is one JSON object. The json codec decodes it at input time,
    # so the go_logs keys (timestamp, level, message, fields) become
    # top-level event fields.
    codec => json
    type => "application"
  }
}

filter {
  # No json filter is needed: the input codec has already decoded the line,
  # and "message" now holds the plain log message rather than raw JSON.

  # Use the go_logs timestamp as the event time
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }

  # Add GeoIP for IP fields
  if [fields][client_ip] {
    geoip {
      source => "[fields][client_ip]"
      target => "geoip"
    }
  }

  # Tag error-level events so they are easy to filter on
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => [ "error" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }

  # Also output to stdout for debugging
  stdout {
    codec => rubydebug
  }
}

Enhanced Logger for ELK

Add ELK-specific fields to logs:
package main

import (
    "context"
    "fmt"
    "os"
    "runtime"

    "github.com/drossan/go_logs"
)

// ELKLogger wraps a go_logs.Logger and provides helpers that log HTTP
// requests and errors under ELK-oriented field names.
type ELKLogger struct {
    // logger is the underlying logger that every helper method writes through.
    logger go_logs.Logger
}

// NewELKLogger builds an ELKLogger that writes JSON entries with caller
// information to a rotating file at /var/log/app.log.
//
// Every entry is enriched with the given service name and version, plus the
// host name, CPU architecture, and operating system of the running process.
// It returns an error, wrapping the underlying cause, when the base logger
// cannot be created.
func NewELKLogger(serviceName, version string) (*ELKLogger, error) {
    logger, err := go_logs.New(
        go_logs.WithLevel(go_logs.InfoLevel),
        go_logs.WithFormatter(go_logs.NewJSONFormatter()),
        go_logs.WithRotatingFile("/var/log/app.log", 100, 5),
        go_logs.WithCaller(true), // Include file and line number
    )
    if err != nil {
        return nil, fmt.Errorf("creating ELK logger for %q: %w", serviceName, err)
    }

    // The host name is best-effort metadata: a lookup failure must not stop
    // the service from logging, but an explicit placeholder is easier to spot
    // in Kibana than an empty string.
    hostname, err := os.Hostname()
    if err != nil {
        hostname = "unknown"
    }

    // Add standard fields for ELK
    enrichedLogger := logger.With(
        go_logs.String("service.name", serviceName),
        go_logs.String("service.version", version),
        go_logs.String("host.name", hostname),
        go_logs.String("host.architecture", runtime.GOARCH),
        go_logs.String("host.os", runtime.GOOS),
    )

    return &ELKLogger{logger: enrichedLogger}, nil
}

// LogHTTPRequest writes an info-level "HTTP request" entry through the
// wrapped logger. The method, path, status code, and duration are attached
// under the http.request.* and http.response.* field names; duration is
// stored as http.response.duration_ms.
func (l *ELKLogger) LogHTTPRequest(ctx context.Context, method, path string, statusCode int, duration float64) {
    const msg = "HTTP request"

    methodField := go_logs.String("http.request.method", method)
    pathField := go_logs.String("http.request.path", path)
    statusField := go_logs.Int("http.response.status_code", statusCode)
    durationField := go_logs.Float64("http.response.duration_ms", duration)

    l.logger.LogCtx(ctx, go_logs.InfoLevel, msg, methodField, pathField, statusField, durationField)
}

// LogError writes an error-level "Error occurred" entry through the wrapped
// logger. The entry carries the error itself, the component that reported it
// (error.component), and the error's Go type name (error.type).
func (l *ELKLogger) LogError(ctx context.Context, err error, component string) {
    const msg = "Error occurred"

    errField := go_logs.Err(err)
    componentField := go_logs.String("error.component", component)
    typeField := go_logs.String("error.type", getErrorType(err))

    l.logger.LogCtx(ctx, go_logs.ErrorLevel, msg, errField, componentField, typeField)
}

// getErrorType returns the dynamic Go type of err as rendered by the %T verb,
// or "unknown" when err is nil.
func getErrorType(err error) string {
    typeName := "unknown"
    if err != nil {
        typeName = fmt.Sprintf("%T", err)
    }
    return typeName
}

Elasticsearch Index Template

Create an index template for optimal searching:
{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.refresh_interval": "5s"
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "timestamp": {
          "type": "date"
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "fields": {
          "properties": {
            "service": {
              "type": "keyword"
            },
            "version": {
              "type": "keyword"
            },
            "environment": {
              "type": "keyword"
            },
            "trace_id": {
              "type": "keyword"
            },
            "span_id": {
              "type": "keyword"
            },
            "user_id": {
              "type": "keyword"
            },
            "transaction_id": {
              "type": "keyword"
            },
            "error": {
              "type": "text"
            }
          }
        },
        "caller": {
          "type": "keyword"
        },
        "caller_func": {
          "type": "keyword"
        }
      }
    }
  }
}
Apply template:
curl -X PUT "http://elasticsearch:9200/_index_template/app-logs" \
  -H 'Content-Type: application/json' \
  -d @template.json

Kibana Dashboard Setup

Create Kibana visualizations for your logs:

1. Create Index Pattern

# In Kibana UI:
# Management > Stack Management > Data Views (named "Index Patterns" before Kibana 8.0) > Create
# Index pattern: app-logs-*
# Time field: @timestamp

2. Useful Kibana Queries

Filter by service:
fields.service: "payment-api"
Find errors:
level: "ERROR" OR level: "FATAL"
Trace requests:
fields.trace_id: "trace-abc-123"
Slow requests:
fields.http.response.duration_ms > 1000
Failed payments:
message: "payment failed" AND fields.service: "payment-api"

Complete ELK Integration Example

package main

import (
    "context"
    "net/http"
    "time"
    
    "github.com/drossan/go_logs"
)

// main wires an HTTP payment endpoint to the ELK-oriented logger and serves
// it on :8080. Each request is logged with its trace ID, status code, and
// duration; processing errors are logged separately.
//
// generateTraceID is not defined in this snippet and must be provided
// elsewhere in the package.
func main() {
    // Create ELK-optimized logger. Startup cannot continue without it: on
    // failure the returned logger is nil and the first request would
    // dereference it, so abort immediately instead.
    elkLogger, err := NewELKLogger("payment-api", "1.2.3")
    if err != nil {
        panic(err)
    }

    // Start HTTP server with logging
    http.HandleFunc("/api/payment", func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()

        // Create trace context, reusing the caller's trace ID when present
        traceID := r.Header.Get("X-Trace-ID")
        if traceID == "" {
            traceID = generateTraceID()
        }
        ctx := go_logs.WithTraceID(r.Context(), traceID)

        // Process payment
        statusCode, err := processPayment(ctx, elkLogger)

        if err != nil {
            elkLogger.LogError(ctx, err, "payment-processor")
        }

        // Log request
        duration := time.Since(start).Milliseconds()
        elkLogger.LogHTTPRequest(ctx, r.Method, r.URL.Path, statusCode, float64(duration))

        w.WriteHeader(statusCode)
    })

    // ListenAndServe only returns on failure (for example, when the port is
    // already in use); record that instead of exiting silently.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        elkLogger.LogError(context.Background(), err, "http-server")
    }
}

// processPayment is a placeholder for the real payment logic. It currently
// ignores both arguments and always reports HTTP 200 with a nil error.
func processPayment(ctx context.Context, logger *ELKLogger) (int, error) {
    // Payment processing logic goes here.
    const status = http.StatusOK
    return status, nil
}

Docker Compose for ELK Stack

Deploy complete ELK stack with your application:
# docker-compose.yml
version: '3.8'

services:
  # Single-node Elasticsearch with a 512 MB JVM heap and xpack security
  # turned off. NOTE(review): presumably intended for local use only; confirm
  # before reusing these settings elsewhere.
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      # Named volume so index data survives container restarts.
      - elasticsearch-data:/usr/share/elasticsearch/data
  
  # Logstash, loading the pipeline file from this directory and reading the
  # host's /var/log through a read-only mount.
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - /var/log:/var/log:ro
    depends_on:
      - elasticsearch
  
  # Kibana UI, pointed at the elasticsearch service above.
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
  
  # The application, built from the current directory. It mounts the same host
  # /var/log directory that the logstash service reads.
  app:
    build: .
    volumes:
      - /var/log:/var/log
    ports:
      - "8080:8080"
    depends_on:
      - elasticsearch
      - logstash

volumes:
  elasticsearch-data:
Start stack:
docker-compose up -d

# Access Kibana at http://localhost:5601
# Access Elasticsearch at http://localhost:9200

Log Retention and ILM

Manage log retention with Elasticsearch Index Lifecycle Management:
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
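Apply ILM policy (indices use it only once the index template sets `index.lifecycle.name` to `app-logs-policy`; the rollover action additionally requires a rollover alias or a data stream):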
curl -X PUT "http://elasticsearch:9200/_ilm/policy/app-logs-policy" \
  -H 'Content-Type: application/json' \
  -d @ilm-policy.json

Alerting in Kibana

Create alerts for critical errors:
{
  "name": "High Error Rate Alert",
  "tags": ["errors", "production"],
  "rule_type_id": ".es-query",
  "params": {
    "index": ["app-logs-*"],
    "timeField": "@timestamp",
    "esQuery": {
      "query": {
        "bool": {
          "filter": [
            {
              "terms": {
                "level": ["ERROR", "FATAL"]
              }
            },
            {
              "term": {
                "fields.environment": "production"
              }
            }
          ]
        }
      }
    },
    "threshold": [100],
    "thresholdComparator": ">",
    "timeWindowSize": 5,
    "timeWindowUnit": "m"
  },
  "actions": [
    {
      "group": "default",
      "id": "slack-connector",
      "params": {
        "message": "High error rate detected: {{context.hits}} errors in 5 minutes"
      }
    }
  ]
}

Next Steps

OpenTelemetry

Modern observability with OpenTelemetry

Production Setup

Complete production deployment guide

Build docs developers (and LLMs) love