
Overview

The reduce transform collapses multiple log events into a single event based on grouping and merge strategies. It’s ideal for combining related log lines into transactions, assembling multi-line logs, or aggregating events over time windows. This transform is essential for:
  • Combining multi-line stack traces
  • Aggregating related log events into transactions
  • Grouping events by request ID, session ID, or correlation ID
  • Reducing log volume by combining repetitive events
  • Building event sequences before downstream processing
Key Features:
  • Flexible field-based grouping
  • Multiple merge strategies per field
  • Time-based expiration
  • Conditional flush on start/end markers
  • Maximum event limits per group
  • Task transform for stateful aggregation

Configuration

expire_after_ms
integer
default:"30000"
The maximum period of time to wait after the last event is received, in milliseconds, before a combined event should be considered complete. After this period elapses, the reduced event is flushed downstream.
expire_after_ms = 30000  # 30 seconds
flush_period_ms
integer
default:"1000"
The interval to check for and flush any expired events, in milliseconds. This determines how frequently the transform checks for groups that have exceeded expire_after_ms.
flush_period_ms = 1000  # Check every second
end_every_period_ms
integer
Optional. If supplied, every time this interval elapses for a given grouping, the reduced value for that grouping is flushed. Checked every flush_period_ms. Useful for periodic aggregation.
end_every_period_ms = 60000  # Flush every minute
max_events
integer
The maximum number of events to group together. When this limit is reached, the reduced event is immediately flushed.
max_events = 100
group_by
array
default:"[]"
An ordered list of fields by which to group events. Each group with matching values for the specified keys is reduced independently. When not specified, all events are reduced in a single group.
group_by = ["request_id", "user_id"]
merge_strategies
object
default:"{}"
A map of field names to custom merge strategies. For each field specified, the given strategy is used for combining events rather than the default behavior. Default behaviors:
  • Strings: First value is kept
  • Timestamps: First value is kept, [field]_end is added with last value
  • Numbers: Values are summed
Available strategies:
  • discard - Discard field value (remove from output)
  • retain - Keep only the first value
  • sum - Sum numeric values
  • max - Keep maximum numeric value
  • min - Keep minimum numeric value
  • array - Collect all values into an array
  • concat - Concatenate string values with space separator
  • concat_newline - Concatenate strings with newline separator
  • concat_raw - Concatenate strings with no separator
  • shortest_array - Keep the shortest array
  • longest_array - Keep the longest array
  • flat_unique - Flatten arrays/objects and keep unique values
[transforms.reduce.merge_strategies]
message = "concat_newline"
duration = "sum"
tags = "flat_unique"
starts_when
condition
A condition used to distinguish the first event of a transaction. If this condition resolves to true for an event, the previous transaction is flushed (without this event) and a new transaction is started.
starts_when = 'match(string!(.message), r"^START")'
ends_when
condition
A condition used to distinguish the final event of a transaction. If this condition resolves to true for an event, the current transaction is immediately flushed with this event.
ends_when = 'match(string!(.message), r"^END")'

Inputs

inputs
array
required
List of upstream component IDs. The reduce transform only accepts log events. Metrics and traces are ignored.
inputs = ["logs", "parsed_logs"]

Outputs

The reduce transform has a single output that emits reduced log events when:
  1. expire_after_ms time has passed since last event in group
  2. max_events limit is reached for a group
  3. ends_when condition matches
  4. starts_when condition matches (flushes previous group)
  5. end_every_period_ms interval elapses
  6. Vector shutdown (flushes all buffered groups)
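Several of these triggers can be combined in one transform; a minimal sketch (component and field names are illustrative):

```toml
[transforms.reduce_example]
type = "reduce"
inputs = ["my_logs"]            # hypothetical upstream component
group_by = ["request_id"]
expire_after_ms = 30000         # trigger 1: idle timeout
max_events = 100                # trigger 2: per-group size limit
ends_when = '.done == true'     # trigger 3: explicit end marker
```

Whichever trigger fires first flushes the group; the others simply act as backstops.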

Examples

Combine Multi-line Stack Traces

[transforms.combine_stacktraces]
type = "reduce"
inputs = ["application_logs"]
group_by = ["stream"]
expire_after_ms = 2000

# Detect start of new log entry
starts_when = 'match(string!(.message), r"^\d{4}-\d{2}-\d{2}")'

# Concatenate messages with newlines
[transforms.combine_stacktraces.merge_strategies]
message = "concat_newline"
Input:
2024-01-15 10:00:00 ERROR Exception occurred
  at com.example.Service.process(Service.java:42)
  at com.example.Controller.handle(Controller.java:123)
2024-01-15 10:00:01 INFO Request completed
Output:
2024-01-15 10:00:00 ERROR Exception occurred\n  at com.example.Service.process(Service.java:42)\n  at com.example.Controller.handle(Controller.java:123)
2024-01-15 10:00:01 INFO Request completed

Aggregate by Request ID

[transforms.aggregate_requests]
type = "reduce"
inputs = ["api_logs"]
group_by = ["request_id"]
expire_after_ms = 5000
max_events = 50

# End transaction when response is logged
ends_when = '.event == "response_sent"'

[transforms.aggregate_requests.merge_strategies]
duration = "sum"  # Total duration
status_code = "retain"  # Keep first status
errors = "array"  # Collect all errors
message = "concat"  # Combine messages

Session Aggregation

[transforms.aggregate_sessions]
type = "reduce"
inputs = ["user_events"]
group_by = ["session_id", "user_id"]
expire_after_ms = 300000  # 5 minutes

# Start new session on login
starts_when = '.event_type == "login"'

# End session on logout
ends_when = '.event_type == "logout"'

[transforms.aggregate_sessions.merge_strategies]
page_views = "sum"
pages = "array"  # All pages visited
events = "concat_newline"  # All event descriptions
duration = "sum"  # Total session time

Periodic Time-based Aggregation

[transforms.minute_aggregates]
type = "reduce"
inputs = ["metrics_as_logs"]
group_by = ["metric_name", "host"]
end_every_period_ms = 60000  # Flush every minute
flush_period_ms = 1000

[transforms.minute_aggregates.merge_strategies]
value = "sum"  # Sum all values in window
count = "sum"  # Count events
timestamp = "retain"  # Keep first timestamp

Combine Duplicate Events

[transforms.dedupe_and_count]
type = "reduce"
inputs = ["noisy_logs"]
group_by = ["message", "level", "source"]
expire_after_ms = 10000
max_events = 1000

[transforms.dedupe_and_count.merge_strategies]
occurrences = "sum"  # Count occurrences (if field exists)
timestamp = "retain"  # First occurrence
timestamp_last = "max"  # Last occurrence

Transaction Processing

[transforms.build_transactions]
type = "reduce"
inputs = ["transaction_logs"]
group_by = ["transaction_id"]
max_events = 100

# Start transaction
starts_when = '.action == "BEGIN"'

# End transaction
ends_when = '.action == "COMMIT" || .action == "ROLLBACK"'

[transforms.build_transactions.merge_strategies]
operations = "array"  # All operations in transaction
total_amount = "sum"  # Sum of all amounts
error_messages = "array"  # Collect any errors
status = "retain"  # Final status

Kubernetes Pod Logs

[transforms.combine_k8s_logs]
type = "reduce"
inputs = ["kubernetes_logs"]
group_by = ["kubernetes.pod_name", "kubernetes.container_name", "stream"]
expire_after_ms = 1000

# Detect start of structured logs
starts_when = 'match(string!(.message), r"^\{")'

[transforms.combine_k8s_logs.merge_strategies]
message = "concat_newline"

Database Query Logs

[transforms.query_aggregation]
type = "reduce"
inputs = ["db_logs"]
group_by = ["query_id", "connection_id"]
expire_after_ms = 30000

ends_when = '.stage == "complete"'

[transforms.query_aggregation.merge_strategies]
execution_time = "sum"  # Total execution time
rows_affected = "sum"  # Total rows
stages = "array"  # Query execution stages
plan = "retain"  # Keep query plan from first event

Merge Strategies Explained

discard

Removes the field from the output event entirely.
[merge_strategies]
temporary_field = "discard"

retain

Keeps only the first value, discards subsequent values.
[merge_strategies]
user_id = "retain"  # Keep first user_id

sum

Sums numeric values. Non-numeric values are ignored.
[merge_strategies]
bytes_transferred = "sum"
request_count = "sum"

max / min

Keeps the maximum or minimum numeric value.
[merge_strategies]
max_response_time = "max"
min_memory_usage = "min"

array

Collects all values into an array.
[merge_strategies]
error_codes = "array"
# Input:  [error_codes="404"], [error_codes="500"]
# Output: error_codes=["404", "500"]

concat / concat_newline / concat_raw

Concatenates string values with different separators:
  • concat: Space separator
  • concat_newline: Newline separator
  • concat_raw: No separator
[merge_strategies]
message = "concat_newline"
tags = "concat"  # "tag1 tag2 tag3"
raw_data = "concat_raw"

shortest_array / longest_array

Keeps the shortest or longest array value.
[merge_strategies]
error_paths = "shortest_array"  # Keep most specific
all_paths = "longest_array"  # Keep most complete

flat_unique

Flattens arrays and objects, collecting unique scalar values.
[merge_strategies]
tags = "flat_unique"
# Input:  [{tags=["a","b"]}, {tags=["b","c"]}, {tags="d"}]
# Output: tags=["a","b","c","d"]

Default Merge Behavior

When no merge strategy is specified:
  • String: Keep first value (retain)
  • Number: Sum all values
  • Timestamp: Keep first, add <field>_end with last
  • Array: No default (must specify strategy)
  • Object: No default (must specify strategy)
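Because arrays and objects have no default behavior, any array- or object-valued field you want in the output needs an explicit strategy; a sketch with hypothetical field names:

```toml
[transforms.reduce.merge_strategies]
tags = "flat_unique"    # array field: strategy must be explicit
items = "array"         # collect every value
metadata = "discard"    # object field: drop it if not needed downstream
```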

Grouping Behavior

Events are grouped by exact matches on all group_by fields:
group_by = ["request_id", "user_id"]

# These events are in different groups:
{request_id: "123", user_id: "alice"}
{request_id: "123", user_id: "bob"}    # Different user_id
{request_id: "456", user_id: "alice"}  # Different request_id
When group_by is empty, all events are reduced into a single group.

Flush Triggers

Reduced events are flushed when any of these conditions are met:

1. Time Expiration

expire_after_ms = 5000
# Flushes 5 seconds after the last event in the group

2. Event Count Limit

max_events = 100
# Flushes immediately when 100 events are in the group

3. End Condition

ends_when = '.status == "complete"'
# Flushes immediately when condition matches

4. Start Condition

starts_when = '.event_type == "start"'
# Flushes previous group, starts new group with this event

5. Periodic Flush

end_every_period_ms = 60000
# Flushes every 60 seconds regardless of other conditions

6. Shutdown

All buffered groups are flushed when Vector shuts down.

Performance Considerations

Memory Usage

Memory usage scales with:
  • Number of active groups (cardinality of group_by fields)
  • Number of events per group
  • Size of events
  • Merge strategies used (arrays use more memory)

Cardinality Management

High cardinality in group_by fields increases memory usage:
# Low cardinality - good
group_by = ["service_name", "environment"]

# High cardinality - risky
group_by = ["request_id", "timestamp", "unique_id"]
Mitigate with:
  • Shorter expire_after_ms
  • Lower max_events
  • More specific starts_when/ends_when conditions
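A sketch applying these mitigations together (component and field names are illustrative):

```toml
[transforms.bounded_reduce]
type = "reduce"
inputs = ["app_logs"]                 # hypothetical upstream component
group_by = ["service_name"]           # low-cardinality key
expire_after_ms = 5000                # short idle timeout
max_events = 50                       # hard per-group cap
ends_when = '.stage == "complete"'    # flush as early as possible
```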

Flush Period Tuning

flush_period_ms controls how often expiration checks run:
  • Lower values: More responsive, higher CPU usage
  • Higher values: Less responsive, lower CPU usage
Recommended: 1000ms (1 second) for most use cases

Common Patterns

Multi-line Exceptions (Java/Python)

type = "reduce"
group_by = ["stream", "source"]
expire_after_ms = 1000
starts_when = '!match(string!(.message), r"^\s+")'

[merge_strategies]
message = "concat_newline"

HTTP Request Lifecycle

type = "reduce"
group_by = ["request_id"]
starts_when = '.stage == "start"'
ends_when = '.stage == "end"'

[merge_strategies]
events = "array"
duration = "sum"
status = "retain"

Batch Processing

type = "reduce"
group_by = ["batch_id"]
max_events = 1000
expire_after_ms = 60000

[merge_strategies]
items = "array"
total_size = "sum"
errors = "sum"

Troubleshooting

Events Not Being Combined

  1. Check group_by fields exist and have matching values
  2. Verify flush conditions are not too aggressive
  3. Check if starts_when/ends_when are triggering prematurely
  4. Increase expire_after_ms or max_events

Memory Issues

  1. Reduce expire_after_ms to flush more frequently
  2. Lower max_events limit
  3. Reduce group_by cardinality
  4. Add more specific starts_when/ends_when conditions
  5. Use discard strategy for unnecessary fields
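As a sketch, the discard strategy can drop heavy fields before they accumulate in buffered groups (field names are hypothetical):

```toml
[transforms.dedupe_and_count.merge_strategies]
stack_trace = "discard"    # large payload not needed downstream
raw_body = "discard"
occurrences = "sum"        # keep only the cheap aggregate
```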

Missing Data

  1. Check if events are expiring too quickly
  2. Verify starts_when isn’t flushing prematurely
  3. Ensure group_by fields are consistent across related events

Metrics

The reduce transform doesn’t emit specific internal metrics, but standard component metrics are available:
  • component_received_events_total - Events received
  • component_sent_events_total - Reduced events emitted
Reduction ratio: 1 - (sent / received). For example, 10,000 events received and 500 reduced events emitted gives a reduction ratio of 0.95.
