Overview
This page provides practical VRL examples demonstrating how to solve common observability data transformation challenges. Each example includes input data, VRL code, and expected output.

Testing VRL Programs
You can test these examples using:
- VRL REPL: Run `vector vrl` for an interactive environment
- VRL Playground: Visit playground.vrl.dev
- Command line:
vector vrl --input input.json --program program.vrl --print-object
Parsing Examples
Parse JSON Logs
Use case: Extract structured data from JSON-encoded log messages

Input:
{
"message": "{\"level\":\"error\",\"user_id\":12345,\"action\":\"login\",\"ip\":\"192.168.1.1\"}"
}
# Replace the whole event with the decoded JSON payload of .message
# (parse_json! aborts the event if the payload is not valid JSON)
. = parse_json!(.message)
# Coerce user_id to an integer in case it arrived as a string
.user_id = to_int!(.user_id)
# Stamp the event with the time it passed through this transform
.processed_at = now()
{
"level": "error",
"user_id": 12345,
"action": "login",
"ip": "192.168.1.1",
"processed_at": "2024-03-05T12:00:00Z"
}
Parse Syslog Messages
Input:{
"message": "<34>1 2024-03-05T10:00:00.000Z myhost myapp 1234 ID47 - User login successful"
}
# Parse RFC 5424 syslog format and merge the parsed fields into the event
. |= parse_syslog!(.message)
# Copy the severity keyword (e.g. "crit") under a second name
.severity_name = .severity
# NOTE(review): the merge above already replaced .message with the parsed
# message body, so this stores "User login successful" (see the expected
# output), not the raw syslog line — capture .message *before* the merge
# if the original line must be preserved
.original_message = .message
{
"appname": "myapp",
"facility": "auth",
"hostname": "myhost",
"message": "User login successful",
"msgid": "ID47",
"procid": 1234,
"severity": "crit",
"timestamp": "2024-03-05T10:00:00Z",
"version": 1,
"severity_name": "crit",
"original_message": "User login successful"
}
Parse Apache/Nginx Access Logs
Input:{
"message": "192.168.1.1 - alice [05/Mar/2024:10:00:00 +0000] \"GET /api/users HTTP/1.1\" 200 1234"
}
# Parse Apache/Nginx common log format; |= merges the parsed fields
# (host, user, timestamp, method, path, status, size, ...) into the event
. |= parse_apache_log!(.message, format: "common")
# Convert the access-log timestamp string into a VRL timestamp
.timestamp = parse_timestamp!(.timestamp, format: "%d/%b/%Y:%H:%M:%S %z")
# Derived boolean flags (.status is an integer after parsing)
.is_success = .status >= 200 && .status < 300
.is_error = .status >= 400
# Bucket the status code into a coarse category; ranges are checked from
# highest to lowest, and the if/else chain evaluates to a single value
.status_category = if .status >= 500 {
"server_error"
} else if .status >= 400 {
"client_error"
} else if .status >= 300 {
"redirect"
} else if .status >= 200 {
"success"
} else {
"informational"
}
Parse Key-Value Logs (Logfmt)
Input:{
"message": "level=info msg=\"Request completed\" method=GET path=/api/users duration=45ms status=200"
}
# Parse logfmt-style "key=value" pairs into the event root
. = parse_key_value!(.message)
# Convert types (parse_key_value returns every value as a string)
.status = to_int!(.status)
# Strip the "ms" unit suffix before converting to a float
# NOTE(review): the original .duration string field is left on the event
.duration_ms = to_float!(replace!(.duration, "ms", ""))
# Standardize field names; del() returns the removed value, so each of
# these moves (not copies) the field
.log_level = del(.level)
.message = del(.msg)
.http_method = del(.method)
.http_path = del(.path)
Parse Custom Regex Patterns
Input:{
"message": "[2024-03-05 10:30:15] ERROR: Failed to connect to database (attempt 3/5)"
}
# Parse with regex and named captures; |= merges the captures into the
# event. The trailing "(attempt N/M)" group is optional, so the attempt
# and max_attempts captures may be absent.
. |= parse_regex!(
.message,
r'^\[(?P<timestamp>[^\]]+)\] (?P<level>\w+): (?P<message>.*?)(?:\(attempt (?P<attempt>\d+)/(?P<max_attempts>\d+)\))?$'
)
# Parse timestamp
.timestamp = parse_timestamp!(.timestamp, format: "%Y-%m-%d %H:%M:%S")
# Convert numeric fields; ?? null keeps the program running when the
# optional captures did not match
.attempt = to_int(.attempt) ?? null
.max_attempts = to_int(.max_attempts) ?? null
# Normalize level to lowercase ("ERROR" -> "error")
.level = downcase(.level)
Data Enrichment Examples
Add Timestamp and Metadata
VRL:
# Tag the event with a unique identifier
.event_id = uuid_v4()
# Record when this transform saw the event
.ingested_at = now()
# Attach static pipeline provenance under .metadata (field-by-field so
# any pre-existing .metadata keys are preserved)
.metadata.pipeline = "production"
.metadata.version = "1.0.0"
.metadata.processor = "vector"
GeoIP Enrichment
Input:{
"client_ip": "8.8.8.8",
"request": "/api/users"
}
# Enrich with GeoIP data (requires a "geoip" enrichment table to be
# configured); the (value, err) form captures lookup failure instead of
# aborting the event
.geo, err = get_enrichment_table_record("geoip", {"ip": .client_ip})
if err == null {
# Lookup succeeded: lift selected columns to the top level
.country = .geo.country_name
.city = .geo.city_name
.latitude = .geo.latitude
.longitude = .geo.longitude
.continent = .geo.continent_code
# Drop the intermediate lookup record
del(.geo)
} else {
# Lookup failed (e.g. IP not in the table): flag it and keep the event
.geo_lookup_failed = true
}
User Lookup Enrichment
VRL:
# Look up user details from a "users" enrichment table, keyed by the
# stringified user_id
if exists(.user_id) {
.user, err = get_enrichment_table_record(
"users",
{"user_id": to_string(.user_id)}
)
if err == null {
# Promote selected columns, then drop the raw lookup record
.user_email = .user.email
.user_name = .user.name
.user_department = .user.department
del(.user)
}
# NOTE(review): on a failed lookup the event passes through unchanged
}
Data Sanitization Examples
Redact Sensitive Information
VRL:
# Mask credit-card numbers (four groups of four digits, optionally
# separated by spaces or dashes)
.message = replace!(
    .message,
    r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
    "[REDACTED-CC]"
)
# Mask email addresses.
# (Fixed: the TLD class was "[A-Z|a-z]", which also matched a literal
# "|" inside the character class; "[A-Za-z]" is the intended set.)
.message = replace!(
    .message,
    r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    "[REDACTED-EMAIL]"
)
# Mask US Social Security Numbers (NNN-NN-NNNN)
.message = replace!(
    .message,
    r'\b\d{3}-\d{2}-\d{4}\b',
    "XXX-XX-XXXX"
)
# Optionally mask IPv4 addresses when the event requests it
if .redact_ips == true {
    .message = replace!(
        .message,
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        "[REDACTED-IP]"
    )
}
Remove Sensitive Fields
VRL:
# Strip credential material before the event leaves the pipeline
del(.password)
del(.secret)
del(.api_key)
del(.auth_token)
# Strip personally identifiable information
del(.credit_card)
del(.drivers_license)
del(.ssn)
# Strip fields that are only meaningful inside this service
del(.debug_info)
del(.internal_id)
Hash Sensitive Data
VRL:
# sha2() already returns the digest as a lowercase hex string, so the
# previous encode_base16() wrapper was hex-encoding an already-hex
# string (doubling its length); it has been removed.
# Hash email for privacy-preserving analytics
if exists(.email) {
    .email_hash = sha2(.email, variant: "SHA-256")
    del(.email)
}
# Hash user ID (coerced to string first, since sha2 hashes strings)
if exists(.user_id) {
    .user_id_hash = sha2(to_string(.user_id), variant: "SHA-256")
    del(.user_id)
}
# Hash IP address
if exists(.ip) {
    .ip_hash = sha2(.ip, variant: "SHA-256")
    del(.ip)
}
Data Transformation Examples
Normalize Field Names
Input:{
"UserName": "alice",
"user-email": "[email protected]",
"USER_ID": "12345"
}
# Rename mixed-convention fields to snake_case, removing the originals
# (del() returns the value it removed, so each line is a move)
.user_name = del(.UserName)
.user_email = del(."user-email")
# The rename and the integer coercion fold into a single expression
.user_id = to_int!(del(.USER_ID))
Flatten Nested Objects
Input:{
"user": {
"profile": {
"name": "Alice",
"age": 30
},
"settings": {
"theme": "dark"
}
}
}
# Flatten the nested structure into top-level keys joined with "_"
# (a missing path on the right-hand side would yield null)
.user_profile_name = .user.profile.name
.user_profile_age = .user.profile.age
.user_settings_theme = .user.settings.theme
# Remove the original nested structure once everything is copied out
del(.user)
Restructure Events
Input:{
"timestamp": "2024-03-05T10:00:00Z",
"level": "error",
"message": "Connection failed",
"source_ip": "192.168.1.1"
}
# Restructure flat fields into nested objects. del() returns the removed
# value, so each field is moved (not copied) into the new object.
.event = {
"timestamp": del(.timestamp),
"severity": del(.level),
"content": del(.message)
}
.source = {
"ip": del(.source_ip)
}
# Add event type
.event.type = "connection_error"
Merge Multiple Fields
VRL:
# Combine first and last name.
# (Fixed: "+" on untyped event fields is a fallible expression in VRL —
# the program would not compile without coercing the operands to strings
# first; string! aborts the event if a field is not a string.)
.full_name = string!(.first_name) + " " + string!(.last_name)
# Create a single comma-separated address string.
# (Fixed: the bare to_string(.zip_code) was an unhandled fallible call;
# the elements are coerced so join! receives an array of strings.)
.address = join!([
    string!(.street),
    string!(.city),
    string!(.state),
    to_string!(.zip_code)
], ", ")
# Build a summary with a template string that interpolates the two
# local variables
user = .username
action = .action
.summary = "User {{ user }} performed action: {{ action }}"
Array and Object Manipulation
Filter Array Elements
Input:{
"scores": [45, 67, 89, 23, 91, 78]
}
# Keep only scores of at least 60 (closure receives index and value)
.passing_scores = filter(array!(.scores)) -> |_i, score| {
    score >= 60
}
# Count of passing scores
.passing_count = length(.passing_scores)
# Average of the passing scores.
# (Fixed: VRL has no reduce() function — accumulate with for_each and a
# mutable local instead — and division is fallible, so the result needs
# an explicit fallback even though the zero case is guarded above.)
if .passing_count > 0 {
    sum = 0.0
    for_each(array!(.passing_scores)) -> |_i, score| {
        sum = sum + to_float!(score)
    }
    .passing_average = (sum / .passing_count) ?? 0.0
}
Transform Array Values
Input:{
"tags": ["Production", "Web-Server", "US-EAST"]
}
# Normalize tags: lowercase, with dashes replaced by underscores
.normalized_tags = map_values(array!(.tags)) -> |tag| {
downcase(replace(string!(tag), "-", "_"))
}
# Remove duplicate tags (unique() keeps the first occurrence)
.unique_tags = unique(.normalized_tags)
# Sort tags alphabetically
# NOTE(review): confirm a sort() function exists in the deployed VRL
# version — it is not part of the long-standing core function set
.sorted_tags = sort(.unique_tags)
Transform Object Keys
Input:{
"metrics": {
"RequestCount": 1000,
"ErrorRate": 0.05,
"AvgLatency": 125
}
}
# Rewrite each metrics key from CamelCase to snake_case in a single
# pass: insert "_" before every capital letter, lowercase the whole
# key, then drop the underscore that a leading capital produced.
.metrics = map_keys(object!(.metrics)) -> |key| {
    trim_start(downcase(replace(key, r'([A-Z])', "_$1")), "_")
}
Iterate Over Arrays
VRL:
# Emit one log line per entry in the .errors array
if exists(.errors) {
for_each(array!(.errors)) -> |index, error| {
# log() writes to Vector's own log stream, not to the event
log(
"Error " + to_string(index) + ": " + to_string(error),
level: "error"
)
}
.error_count = length(array!(.errors))
}
Conditional Logic Examples
Categorize by Field Values
VRL:
# Categorize HTTP status codes; ranges are checked from highest to
# lowest and the if/else chain evaluates to a single value
.status_category = if .status_code >= 500 {
"server_error"
} else if .status_code >= 400 {
"client_error"
} else if .status_code >= 300 {
"redirect"
} else if .status_code >= 200 {
"success"
} else {
"informational"
}
# Map the category computed above to a severity level
.severity = if .status_category == "server_error" {
"critical"
} else if .status_category == "client_error" {
"warning"
} else {
"info"
}
Complex Conditional Processing
VRL:
# Branch on authentication state and subscription tier to assign
# per-user rate limits and feature flags
if exists(.user_id) && .authenticated == true {
# Authenticated user flow
.user_type = "authenticated"
if .subscription_tier == "premium" {
.rate_limit = 1000
.features = ["api", "advanced_analytics", "priority_support"]
} else {
# Any non-premium tier (including a missing field) gets the defaults
.rate_limit = 100
.features = ["api", "basic_analytics"]
}
} else {
# Anonymous user flow
.user_type = "anonymous"
.rate_limit = 10
.features = ["api"]
}
Early Return Pattern
VRL:
# Skip processing for test events; "return ." emits the event as-is
# and stops executing the rest of the program
if .environment == "test" {
.processed = false
return .
}
# Skip already processed events
if .already_processed == true {
return .
}
# Continue with normal processing
.processed = true
.processed_at = now()
# ... more processing ...
Drop Events Conditionally
VRL:
# abort discards the event entirely (with the remap transform's default
# drop_on_abort behavior)
# Drop debug logs in production
if .level == "debug" && .environment == "production" {
abort
}
# Drop health check requests
if .path == "/health" || .path == "/healthz" {
abort
}
# Drop high-volume noisy logs
if contains(.message, "connection pool") && .level == "info" {
abort
}
Error Handling Examples
Graceful Parsing with Fallback
VRL:
# Try parsing JSON; the (value, err) form captures the error instead of
# aborting the event
.parsed, err = parse_json(.message)
if err == null {
# Parsing succeeded - merge the parsed fields into the event root
# NOTE(review): |= expects an object; a JSON payload that is a scalar
# or array would still fail here — confirm inputs are JSON objects
. |= .parsed
del(.parsed)
.parse_status = "success"
} else {
# Parsing failed - keep the raw message alongside the error text
.parse_status = "failed"
.parse_error = err
.raw_message = .message
}
Multiple Parsing Strategies
VRL:
# Try several parsers in order; each attempt overwrites (parsed, err),
# and a later parser only runs if the previous one failed
parsed = null
# Try JSON first
parsed, err = parse_json(.message)
if err != null {
# Try key-value format
parsed, err = parse_key_value(.message)
}
if err != null {
# Try syslog format
parsed, err = parse_syslog(.message)
}
if err == null {
# One of the parsers succeeded: merge its output into the event
. |= parsed
.parser_used = "auto_detected"
} else {
# Every parser failed; record why downstream consumers see raw data
.parser_used = "none"
.parse_error = "Unable to parse message"
}
Safe Type Coercion
VRL:
# Safely convert to integer; ?? supplies a fallback instead of aborting
.status_code = to_int(.status) ?? 0
# Safe timestamp parsing with a generated fallback; the source field is
# tracked so consumers can tell parsed from synthesized timestamps
.timestamp, err = parse_timestamp(.time, format: "%Y-%m-%d")
if err != null {
.timestamp = now()
.timestamp_source = "generated"
} else {
.timestamp_source = "parsed"
}
# Safe numeric conversion
# NOTE(review): .duration_ms is only set when .duration is a string; an
# already-numeric .duration leaves .duration_ms unset — confirm intended
if is_string(.duration) {
.duration_ms, err = to_float(.duration)
if err != null {
.duration_ms = 0.0
}
}
Performance Optimization Examples
Conditional Expensive Operations
VRL:
# Only parse large messages when needed (skip payloads of ~1 MB or more)
if .requires_parsing == true && strlen(.message) < 1000000 {
    .parsed = parse_json(.message) ?? {}
}
# Cache frequently accessed values: read the path once, reuse the local
user_id = .user.id
# sha2() already returns a lowercase hex string, so no extra encoding
# pass is needed (the previous encode_base16() wrapper double-encoded
# the digest)
.user_hash = sha2(to_string(user_id), variant: "SHA-256")
.user_category = if user_id > 10000 { "new" } else { "legacy" }
Early Filtering
VRL:
# Drop unwanted events before doing any expensive work
if .level == "debug" || .source == "health_check" {
abort
}
# Emit events that need no transformation unchanged
if !exists(.requires_transform) {
return .
}
# Expensive processing only happens for relevant events
.parsed = parse_json!(.message)
# ... more processing ...
Complete Real-World Examples
Web Access Log Processing
Input:{
"message": "192.168.1.100 - - [05/Mar/2024:10:30:45 +0000] \"GET /api/v1/users?page=1 HTTP/1.1\" 200 4523 \"https://example.com\" \"Mozilla/5.0\""
}
# Parse Apache combined log format (common-format fields plus referrer
# and agent)
. |= parse_apache_log!(.message, format: "combined")
# Parse timestamp
.timestamp = parse_timestamp!(.timestamp, format: "%d/%b/%Y:%H:%M:%S %z")
# Extract query parameters from path
path_parts = split(.path, "?", limit: 2)
.path_clean = path_parts[0]
if length(path_parts) > 1 {
    .query_string = path_parts[1]
    .query_params = parse_key_value(.query_string, key_value_delimiter: "=", field_delimiter: "&") ?? {}
}
# Categorize request
.is_api_request = starts_with(.path_clean, "/api/")
.http_method = upcase(.method)
# Status categorization (.status is an integer after parsing)
.is_success = .status >= 200 && .status < 300
.is_error = .status >= 400
# Bucket by response size in bytes (the parsed .size field — the old
# comment incorrectly called this a response-time bucket)
.size_bucket = if .size > 10000 {
    "large"
} else if .size > 1000 {
    "medium"
} else {
    "small"
}
# Parse user agent
if exists(.agent) {
    .is_mobile = contains(downcase(.agent), "mobile")
    .is_bot = contains(downcase(.agent), "bot") || contains(downcase(.agent), "crawler")
}
# GeoIP enrichment.
# (Fixed: parse_apache_log stores the client address in .host — this
# event has no .client_ip field, so the original lookup never ran.)
if exists(.host) {
    .geo, _ = get_enrichment_table_record("geoip", {"ip": .host})
    if exists(.geo) {
        .country = .geo.country_code
        .city = .geo.city_name
        del(.geo)
    }
}
# Remove raw message
del(.message)
Application Error Log Processing
Input:{
"message": "[2024-03-05T10:30:45.123Z] ERROR [UserService] Failed to authenticate user - invalid credentials (user_id: 12345, ip: 192.168.1.1, attempt: 3)"
}
# Split "[ts] LEVEL [Service] text (k: v, ...)" into named captures;
# the trailing "(context)" group is optional, so .context may be absent
. |= parse_regex!(
.message,
r'^\[(?P<timestamp>[^\]]+)\] (?P<level>\w+) \[(?P<service>[^\]]+)\] (?P<message>.*?)(?:\((?P<context>.*)\))?$'
)
# Parse timestamp ("%+" accepts ISO 8601 / RFC 3339 strings)
.timestamp = parse_timestamp!(.timestamp, format: "%+")
# Normalize level ("ERROR" -> "error")
.level = downcase(.level)
# Parse context key-value pairs if present
if exists(.context) {
.context_parsed = parse_key_value(.context, key_value_delimiter: ":", field_delimiter: ",") ?? {}
# Extract specific context fields, coercing types with null fallbacks
if exists(.context_parsed.user_id) {
.user_id = to_int(.context_parsed.user_id) ?? null
}
if exists(.context_parsed.ip) {
.client_ip = trim(.context_parsed.ip)
}
if exists(.context_parsed.attempt) {
.attempt_number = to_int(.context_parsed.attempt) ?? null
}
# Drop the raw and intermediate context once fields are extracted
del(.context)
del(.context_parsed)
}
# Categorize error by keywords in the regex-captured message text
# (first matching branch wins)
.error_category = if contains(.message, "authenticate") {
"authentication"
} else if contains(.message, "authorization") {
"authorization"
} else if contains(.message, "database") {
"database"
} else if contains(.message, "network") {
"network"
} else {
"general"
}
# Set severity based on error category
.severity = if .error_category == "database" || .error_category == "network" {
"critical"
} else {
"error"
}
# Alert on critical errors or repeated authentication failures
# NOTE(review): .attempt_number may be null when no context was parsed;
# confirm that null >= 3 behaves as intended in the deployed VRL version
.should_alert = .severity == "critical" || (.error_category == "authentication" && .attempt_number >= 3)
# Add processing metadata
.processed_at = now()
.pipeline_version = "1.0.0"
Metrics Data Transformation
Input:{
"metric_name": "http_requests_total",
"value": "1543",
"timestamp": "1709636400",
"tags": "method=GET,status=200,path=/api/users"
}
# Parse "k=v,k=v" tags into a structured object
.tags_parsed = parse_key_value!(.tags, key_value_delimiter: "=", field_delimiter: ",")
# Convert value and timestamp to correct types
.value = to_float!(.value)
.timestamp = from_unix_timestamp!(to_int!(.timestamp))
# Extract tags to top level (tag values arrive as strings)
.method = .tags_parsed.method
.status = to_int!(.tags_parsed.status)
.path = .tags_parsed.path
# First digit of the status code: "2", "4", "5".
# (Fixed: VRL has no "[0:0]" slice syntax — and even read as a
# half-open range it would select nothing; slice!(value, 0, 1) takes
# the first character.)
.status_class = slice!(to_string(.status), 0, 1)
.endpoint = replace(.path, r'/\d+', "/:id") # Collapse numeric IDs in the path
# Categorize request type
.request_type = if starts_with(.path, "/api/") {
    "api"
} else if starts_with(.path, "/static/") {
    "static"
} else {
    "page"
}
# Calculate rate (value per second over a 1 minute interval)
.rate_per_second = .value / 60.0
# Clean up the raw and intermediate tag fields
del(.tags)
del(.tags_parsed)
# Rename for clarity; del() returns the removed value, so these move
# the fields (and must run after every use of .value above)
.metric = del(.metric_name)
.count = del(.value)
Learn More
- VRL Overview - Introduction to VRL
- VRL Functions - Built-in function reference
- VRL Expressions - Language syntax and operators
- VRL Playground - Test VRL online