JSON Formatter for ELK
Configure go_logs to output logs in a format compatible with the ELK stack:

package main

import (
	"github.com/drossan/go_logs"
)

func main() {
	// Create logger with JSON formatter for ELK
	logger, _ := go_logs.New(
		go_logs.WithLevel(go_logs.InfoLevel),
		go_logs.WithFormatter(go_logs.NewJSONFormatter()),
		go_logs.WithRotatingFile("/var/log/app.log", 100, 5),
	)
	defer logger.Sync()

	// Add service metadata that will appear in all logs
	serviceLogger := logger.With(
		go_logs.String("service", "payment-api"),
		go_logs.String("version", "1.2.3"),
		go_logs.String("environment", "production"),
		go_logs.String("datacenter", "us-east-1"),
	)

	serviceLogger.Info("Payment processed",
		go_logs.String("transaction_id", "tx-12345"),
		go_logs.Float64("amount", 99.99),
		go_logs.String("currency", "USD"),
	)
}
{"timestamp":"2026-03-03T10:30:00Z","level":"INFO","message":"Payment processed","fields":{"service":"payment-api","version":"1.2.3","environment":"production","datacenter":"us-east-1","transaction_id":"tx-12345","amount":99.99,"currency":"USD"}}
Filebeat Configuration
Configure Filebeat to ship logs to Elasticsearch:

# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/app.log
    json.keys_under_root: true
    json.add_error_key: true
    fields:
      log_type: application
    fields_under_root: true

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "app-logs-%{+yyyy.MM.dd}"

setup.template.name: "app-logs"
setup.template.pattern: "app-logs-*"
setup.ilm.enabled: false

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
Logstash Pipeline
Process logs through Logstash before Elasticsearch:

# logstash.conf
input {
  file {
    path => "/var/log/app.log"
    # Read raw lines; the json filter below does the parsing
    type => "application"
  }
}

filter {
  # Parse JSON logs from go_logs
  json {
    source => "message"
    target => "parsed"
  }

  # Extract timestamp
  date {
    match => [ "[parsed][timestamp]", "ISO8601" ]
    target => "@timestamp"
  }

  # Add GeoIP for IP fields
  if [parsed][fields][client_ip] {
    geoip {
      source => "[parsed][fields][client_ip]"
      target => "geoip"
    }
  }

  # Tag error and fatal events
  if [parsed][level] == "ERROR" or [parsed][level] == "FATAL" {
    mutate {
      add_tag => [ "error" ]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }

  # Also output to stdout for debugging
  stdout {
    codec => rubydebug
  }
}
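The geoip branch above only fires when the event carries a client_ip field. A minimal sketch of emitting one with the go_logs field helpers used earlier; the field name is this pipeline's convention, not something go_logs requires:

package main

import (
	"net"
	"net/http"

	"github.com/drossan/go_logs"
)

// logClientIP records the caller's address under "client_ip" so the
// Logstash geoip filter above can enrich the event with location data.
func logClientIP(logger go_logs.Logger, r *http.Request) {
	// RemoteAddr is usually "host:port"; strip the port for GeoIP lookup.
	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		host = r.RemoteAddr // no port component present
	}

	logger.Info("Request received",
		go_logs.String("client_ip", host),
		go_logs.String("path", r.URL.Path),
	)
}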
Enhanced Logger for ELK
Add ELK-specific fields to logs:

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/drossan/go_logs"
)

type ELKLogger struct {
	logger go_logs.Logger
}

func NewELKLogger(serviceName, version string) (*ELKLogger, error) {
	logger, err := go_logs.New(
		go_logs.WithLevel(go_logs.InfoLevel),
		go_logs.WithFormatter(go_logs.NewJSONFormatter()),
		go_logs.WithRotatingFile("/var/log/app.log", 100, 5),
		go_logs.WithCaller(true), // Include file and line number
	)
	if err != nil {
		return nil, err
	}

	// Add standard fields for ELK (ECS-style dotted names)
	hostname, _ := os.Hostname()
	enrichedLogger := logger.With(
		go_logs.String("service.name", serviceName),
		go_logs.String("service.version", version),
		go_logs.String("host.name", hostname),
		go_logs.String("host.architecture", runtime.GOARCH),
		go_logs.String("host.os", runtime.GOOS),
	)

	return &ELKLogger{logger: enrichedLogger}, nil
}

func (l *ELKLogger) LogHTTPRequest(ctx context.Context, method, path string, statusCode int, duration float64) {
	l.logger.LogCtx(ctx, go_logs.InfoLevel, "HTTP request",
		go_logs.String("http.request.method", method),
		go_logs.String("http.request.path", path),
		go_logs.Int("http.response.status_code", statusCode),
		go_logs.Float64("http.response.duration_ms", duration),
	)
}

func (l *ELKLogger) LogError(ctx context.Context, err error, component string) {
	l.logger.LogCtx(ctx, go_logs.ErrorLevel, "Error occurred",
		go_logs.Err(err),
		go_logs.String("error.component", component),
		go_logs.String("error.type", getErrorType(err)),
	)
}

func getErrorType(err error) string {
	if err == nil {
		return "unknown"
	}
	return fmt.Sprintf("%T", err)
}
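To apply these helpers uniformly across handlers, you can wrap them in a small middleware. A minimal sketch building on the ELKLogger defined above; the statusRecorder type is illustrative glue, not part of go_logs:

package main

import (
	"net/http"
	"time"
)

// statusRecorder captures the status code written by the wrapped handler.
type statusRecorder struct {
	http.ResponseWriter
	status int
}

func (r *statusRecorder) WriteHeader(code int) {
	r.status = code
	r.ResponseWriter.WriteHeader(code)
}

// WithRequestLogging logs every request through the ELKLogger above.
func WithRequestLogging(logger *ELKLogger, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		rec := &statusRecorder{ResponseWriter: w, status: http.StatusOK}

		next.ServeHTTP(rec, r)

		duration := float64(time.Since(start).Milliseconds())
		logger.LogHTTPRequest(r.Context(), r.Method, r.URL.Path, rec.status, duration)
	})
}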
Elasticsearch Index Template
Create an index template for optimal searching. Note that the composable _index_template API expects settings and mappings nested under a template object:

{
  "index_patterns": ["app-logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.refresh_interval": "5s"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "timestamp": { "type": "date" },
        "level": { "type": "keyword" },
        "message": {
          "type": "text",
          "fields": {
            "keyword": { "type": "keyword", "ignore_above": 256 }
          }
        },
        "fields": {
          "properties": {
            "service": { "type": "keyword" },
            "version": { "type": "keyword" },
            "environment": { "type": "keyword" },
            "trace_id": { "type": "keyword" },
            "span_id": { "type": "keyword" },
            "user_id": { "type": "keyword" },
            "transaction_id": { "type": "keyword" },
            "error": { "type": "text" }
          }
        },
        "caller": { "type": "keyword" },
        "caller_func": { "type": "keyword" }
      }
    }
  }
}
curl -X PUT "http://elasticsearch:9200/_index_template/app-logs" \
-H 'Content-Type: application/json' \
-d @template.json
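If you prefer to register the template from code, for example at service startup, here is a minimal sketch using only the standard library; the template.json file name and Elasticsearch URL are taken from the curl example above:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

func main() {
	// Read the index template shown above from disk.
	body, err := os.ReadFile("template.json")
	if err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPut,
		"http://elasticsearch:9200/_index_template/app-logs",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("index template status:", resp.Status)
}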
Kibana Dashboard Setup
Create Kibana visualizations for your logs:

1. Create Index Pattern
# In Kibana UI:
# Management > Stack Management > Index Patterns > Create
# Index pattern: app-logs-*
# Time field: @timestamp
2. Useful Kibana Queries
Filter by service:

fields.service: "payment-api"

Find error and fatal events:

level: "ERROR" OR level: "FATAL"

Follow a single trace:

fields.trace_id: "trace-abc-123"

Find slow operations (over one second):

fields.duration_ms > 1000

Combine full-text search with field filters:

message: "payment failed" AND fields.service: "payment-api"
Complete ELK Integration Example
package main

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"net/http"
	"time"

	"github.com/drossan/go_logs"
)

func main() {
	// Create ELK-optimized logger (NewELKLogger is defined above)
	elkLogger, _ := NewELKLogger("payment-api", "1.2.3")

	// Start HTTP server with logging
	http.HandleFunc("/api/payment", func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// Propagate an incoming trace ID, or create one
		traceID := r.Header.Get("X-Trace-ID")
		if traceID == "" {
			traceID = generateTraceID()
		}
		ctx := go_logs.WithTraceID(r.Context(), traceID)

		// Process payment
		statusCode, err := processPayment(ctx, elkLogger)
		if err != nil {
			elkLogger.LogError(ctx, err, "payment-processor")
		}

		// Log the request
		duration := time.Since(start).Milliseconds()
		elkLogger.LogHTTPRequest(ctx, r.Method, r.URL.Path, statusCode, float64(duration))

		w.WriteHeader(statusCode)
	})

	http.ListenAndServe(":8080", nil)
}

func processPayment(ctx context.Context, logger *ELKLogger) (int, error) {
	// Payment processing logic goes here
	return http.StatusOK, nil
}

// generateTraceID returns a random 128-bit hex ID; any unique string works.
func generateTraceID() string {
	b := make([]byte, 16)
	rand.Read(b) // crypto/rand.Read on a small buffer does not fail in practice
	return hex.EncodeToString(b)
}
Docker Compose for ELK Stack
Deploy the complete ELK stack alongside your application:

# docker-compose.yml
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - /var/log:/var/log:ro
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  app:
    build: .
    volumes:
      - /var/log:/var/log
    ports:
      - "8080:8080"
    depends_on:
      - elasticsearch
      - logstash

volumes:
  elasticsearch-data:
Start the stack:

docker-compose up -d
# Access Kibana at http://localhost:5601
# Access Elasticsearch at http://localhost:9200
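Elasticsearch can take a while to accept connections after the containers start. A minimal sketch of a readiness gate your app could run before it begins logging; the URL and timeout values are assumptions matching the compose file above:

package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitForElasticsearch polls the cluster root endpoint until it responds
// with 200 OK or the deadline passes.
func waitForElasticsearch(url string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(2 * time.Second)
	}
	return fmt.Errorf("elasticsearch not ready after %s", timeout)
}

func main() {
	if err := waitForElasticsearch("http://elasticsearch:9200", 60*time.Second); err != nil {
		panic(err)
	}
	fmt.Println("elasticsearch is up")
}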
Log Retention and ILM
Manage log retention with Elasticsearch Index Lifecycle Management:

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
curl -X PUT "http://elasticsearch:9200/_ilm/policy/app-logs-policy" \
-H 'Content-Type: application/json' \
-d @ilm-policy.json
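On its own the policy is inert: an index must reference it before ILM acts. When indices are created through a template, add index.lifecycle.name: "app-logs-policy" to the template's settings so new app-logs-* indices pick it up. Note that the rollover action additionally requires writing through an alias (set via index.lifecycle.rollover_alias) or a data stream, rather than directly to dated index names as in the Filebeat example above.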
Alerting in Kibana
Create alerts for critical errors:

{
  "name": "High Error Rate Alert",
  "tags": ["errors", "production"],
  "rule_type_id": ".es-query",
  "params": {
    "index": ["app-logs-*"],
    "timeField": "@timestamp",
    "esQuery": {
      "query": {
        "bool": {
          "filter": [
            {
              "terms": {
                "level": ["ERROR", "FATAL"]
              }
            },
            {
              "term": {
                "fields.environment": "production"
              }
            }
          ]
        }
      }
    },
    "threshold": [100],
    "thresholdComparator": ">",
    "timeWindowSize": 5,
    "timeWindowUnit": "m"
  },
  "actions": [
    {
      "group": "default",
      "id": "slack-connector",
      "params": {
        "message": "High error rate detected: {{context.hits}} errors in 5 minutes"
      }
    }
  ]
}
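Rules can also be created programmatically through Kibana's alerting API (POST /api/alerting/rule, which requires a kbn-xsrf header). A minimal sketch, assuming the rule definition above is saved as rule.json and Kibana is reachable without authentication; note the live API also expects fields such as consumer and schedule that the illustrative JSON above omits:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"os"
)

func main() {
	// Read the rule definition shown above from disk.
	body, err := os.ReadFile("rule.json")
	if err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:5601/api/alerting/rule",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("kbn-xsrf", "true") // Kibana rejects API writes without this header

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("create rule status:", resp.Status)
}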
Next Steps
OpenTelemetry: modern observability with OpenTelemetry
Production Setup: complete production deployment guide