
Overview

Snuba provides two consumer implementations for processing events from Kafka and writing to ClickHouse:
  • Python Consumer - Original implementation with full Python-based message processing
  • Rust Consumer - High-performance implementation with Rust-based message processing
Both consumers follow a similar architecture but with different performance characteristics. The Rust consumer is recommended for production workloads due to better throughput and resource efficiency.

Python Consumer

The Python-based consumer provides flexible event processing with full Python message transformation.
snuba consumer [OPTIONS]

Required Options

--storage
choice
required
The storage to target for event processing. Must be a writable storage. Common values:
  • errors - Error events
  • transactions - Transaction events
  • outcomes_raw - Outcome/billing events
  • search_issues - Issue occurrence events
  • replays - Session replay events

Kafka Configuration

--raw-events-topic
string
Topic to consume raw events from. If not specified, uses the default topic configured for the storage.
--consumer-group
string
default:"snuba-consumers"
Consumer group ID for consuming the raw events topic. Used for offset management and load balancing.
--bootstrap-server
string
Kafka bootstrap server(s) to use for consuming. Can be specified multiple times for multiple brokers.
--commit-log-topic
string
Topic where committed offsets are written, triggering post-processing tasks.
--replacements-topic
string
Topic to produce replacement messages for data updates/corrections.
--commit-log-bootstrap-server
string
Kafka bootstrap server(s) for the commit log topic. Defaults to main bootstrap servers.
--replacement-bootstrap-server
string
Kafka bootstrap server(s) for the replacements topic. Defaults to main bootstrap servers.

Batching Configuration

--max-batch-size
integer
default:"50000"
Maximum number of messages to batch in memory. Batching applies to three stages:
  1. Batching messages for processing (transforming into ClickHouse rows)
  2. Batching for the INSERT statement
  3. Batching of offset commits
Commits are additionally debounced to happen at most once per second.
--max-batch-time-ms
integer
default:"2000"
Maximum duration (in milliseconds) to buffer messages in memory before flushing.
--max-insert-batch-size
integer
Maximum number of messages to batch for ClickHouse inserts. Defaults to --max-batch-size if not specified.
--max-insert-batch-time-ms
integer
Maximum duration for batching ClickHouse inserts. Defaults to --max-batch-time-ms if not specified.
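The once-per-second commit debounce mentioned under --max-batch-size can be sketched as follows. This is an illustrative model, not Snuba's actual implementation:

```python
import time

# Illustrative sketch (not Snuba's code) of debouncing offset commits so they
# happen at most once per interval; skipped commits are safe because a later
# commit carries newer offsets.
class DebouncedCommitter:
    def __init__(self, commit, min_interval=1.0):
        self._commit = commit            # callable performing the real commit
        self._min_interval = min_interval
        self._last_commit = float("-inf")

    def offer(self, offsets, now=None):
        """Commit the given offsets only if the interval has elapsed."""
        now = time.monotonic() if now is None else now
        if now - self._last_commit >= self._min_interval:
            self._commit(offsets)
            self._last_commit = now
            return True
        return False  # debounced; a later offer will carry newer offsets
```

Debouncing is harmless for at-least-once delivery: if a commit is skipped, the next one simply advances the offset further.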

Consumer Behavior

--auto-offset-reset
choice
default:"earliest"
Kafka consumer auto offset reset behavior.
  • error - Raise an error if no offset is found
  • earliest - Start from the earliest offset
  • latest - Start from the latest offset
--no-strict-offset-reset
flag
Forces the Kafka consumer auto offset reset behavior, even when offsets already exist.
--queued-max-messages-kbytes
integer
default:"50000"
Maximum kilobytes per topic+partition in the local consumer queue. Controls memory usage.
--queued-min-messages
integer
default:"10000"
Minimum messages per topic+partition that librdkafka tries to maintain in the local queue.
--max-poll-interval-ms
integer
default:"30000"
Maximum time between polls before the consumer is considered dead and triggers a rebalance.

Processing Configuration

--processes
integer
Number of worker processes for parallel message processing. Increases throughput but also resource usage.
--input-block-size
integer
Size of input blocks for processing pipeline.
--output-block-size
integer
Size of output blocks for processing pipeline.
--enforce-schema
flag
Enforce schema validation on the raw events topic. Rejects messages that don’t match the expected schema.
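Conceptually, --enforce-schema gates messages before they reach processing, as in the sketch below. The required fields here are illustrative, not the actual event schema:

```python
# Conceptual sketch of schema enforcement: reject messages missing required
# fields before processing. Field names are illustrative placeholders.
REQUIRED_FIELDS = {"event_id", "project_id", "timestamp"}

def validate(message: dict) -> dict:
    missing = REQUIRED_FIELDS - message.keys()
    if missing:
        raise ValueError(f"schema violation, missing fields: {sorted(missing)}")
    return message
```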

Advanced Options

--slice-id
integer
The slice ID for sliced storages. Used when a storage is partitioned across multiple consumers.
--join-timeout
integer
default:"10"
Join timeout in seconds for graceful shutdown.
--profile-path
path
Directory path for writing profiling data. Must be an existing directory.
--health-check-file
string
Path to a file that Arroyo will touch at intervals to indicate health. Useful for orchestration systems like Kubernetes.
--group-instance-id
string
Kafka group instance ID. Passing a value enables static membership for more stable consumer group behavior.
--quantized-rebalance-consumer-group-delay-secs
integer
Time delay before a consumer starts rebalancing, used for coordinated rebalancing across pods during deployments.
--log-level
string
Logging level to use (critical, error, warning, info, debug).
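One plausible reading of the quantized rebalance delay is that restarting consumers align their group join to a shared time boundary, so pods restarted seconds apart during a rolling deploy trigger a single coordinated rebalance. The sketch below illustrates that idea; it is an assumption about the mechanism, not Snuba's exact implementation:

```python
# Illustrative: each restarting consumer waits until the next boundary that is
# a multiple of the configured delay, so nearby restarts converge on the same
# join time. This models the idea, not Snuba's exact behavior.
def next_quantized_start(now_secs: int, delay_secs: int) -> int:
    return ((now_secs // delay_secs) + 1) * delay_secs
```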

Examples

# Start consuming errors with default settings
snuba consumer \
  --storage errors \
  --bootstrap-server kafka:9092

Rust Consumer

The Rust consumer is a high-performance alternative with better throughput and resource efficiency. It’s the recommended choice for production workloads.
snuba rust-consumer [OPTIONS]

Key Differences from Python Consumer

  • Performance - Significantly higher throughput (2-5x)
  • Resource Usage - Lower CPU and memory footprint
  • Concurrency - Better concurrent processing with configurable parallelism
  • Schema Enforcement - Built-in schema validation
  • Multiple Storages - Can target multiple storages in a single consumer

Required Options

--storage
choice
required
Storage(s) to target. Can be specified multiple times to process events for multiple storages.
--consumer-group
string
required
Consumer group ID for offset management.

Core Configuration

--auto-offset-reset
choice
default:"earliest"
Kafka consumer auto offset reset (error, earliest, latest).
--no-strict-offset-reset
flag
Force the auto offset reset behavior.
--log-level
choice
default:"info"
Logging level for the Rust consumer. Available levels:
  • error - Only errors
  • warn - Warnings and errors
  • info - Informational messages
  • debug - Debug output
  • trace - Verbose trace logging

Performance Options

--concurrency
integer
default:"1"
Number of concurrent message processing tasks. Increases parallelism for message transformation.
--clickhouse-concurrency
integer
default:"2"
Number of concurrent ClickHouse batch operations. Only increase this with --async-inserts enabled.
--async-inserts
flag
Enable asynchronous inserts for ClickHouse. Improves throughput but requires proper ClickHouse configuration.
--use-rust-processor
flag
default:"true"
Use the Rust message processor (if available) instead of Python. Generally faster.
--python-max-queue-depth
integer
Maximum messages queued in the Python processor before backpressure. Defaults to number of processes.
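Queue-depth backpressure can be illustrated with a bounded queue: when it fills, the consumer pauses instead of buffering unboundedly. The helper below is a sketch, with the queue depth standing in for --python-max-queue-depth:

```python
from queue import Full, Queue

# Sketch of queue-depth backpressure (illustrative, not Snuba's internals):
# hand messages to the processor through a bounded queue and report "pause"
# when it is full.
def offer(q: Queue, msg) -> bool:
    try:
        q.put_nowait(msg)
        return True            # accepted for processing
    except Full:
        return False           # queue full: caller should pause consuming
```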

Kafka Options

--bootstrap-server
string
Kafka bootstrap servers (can be specified multiple times).
--raw-events-topic
string
Topic to consume raw events from. Overrides the default for the storage.
--commit-log-topic
string
Topic for committed offsets.
--replacements-topic
string
Topic for replacement messages.
--commit-log-bootstrap-server
string
Bootstrap servers for commit log topic.
--replacement-bootstrap-server
string
Bootstrap servers for replacements topic.
--queued-max-messages-kbytes
integer
default:"50000"
Maximum kilobytes per partition in local queue.
--queued-min-messages
integer
default:"10000"
Minimum messages to maintain in local queue.
--max-poll-interval-ms
integer
default:"30000"
Maximum time between polls before rebalance.

Batching Configuration

--max-batch-size
integer
default:"50000"
Maximum number of messages to batch in memory before writing to ClickHouse.
--max-batch-time-ms
integer
default:"2000"
Maximum time to buffer messages before writing.
--batch-write-timeout-ms
integer
Optional timeout for batch writer client connecting and sending to ClickHouse.

Reliability Options

--enforce-schema
flag
Enforce schema validation on raw events topic.
--max-dlq-buffer-length
integer
default:"25000"
Per-partition limit to the dead letter queue buffer length.
--health-check
choice
default:"arroyo"
Health check implementation to use.
  • arroyo - Default Arroyo health check
  • snuba - Snuba-specific health check
--health-check-file
string
File to touch periodically to indicate health.

Advanced Options

--group-instance-id
string
Kafka group instance ID for static membership.
--quantized-rebalance-consumer-group-delay-secs
integer
Delay for quantized rebalancing during deployments.
--join-timeout-ms
integer
default:"1000"
Milliseconds to wait for current batch to flush during rebalance.
--stop-at-timestamp
integer
Unix timestamp after which to stop processing messages. Useful for bounded processing.
--use-row-binary
flag
Use RowBinary format for ClickHouse inserts instead of JSONEachRow. Currently only supported for EAPItemsProcessor.

Examples

# Start consuming with Rust processor
snuba rust-consumer \
  --storage errors \
  --consumer-group errors_group \
  --bootstrap-server kafka:9092 \
  --use-rust-processor

Consumer Architecture

Both consumers follow a similar processing pipeline:
  1. Consume from Kafka: Messages are read from Kafka topics in batches based on configured batch size and time limits.
  2. Transform Messages: Raw messages are processed and transformed into ClickHouse row format. This step can use either Python or Rust processors.
  3. Write to ClickHouse: Transformed rows are batched and inserted into ClickHouse storage using bulk INSERT statements.
  4. Commit Offsets: After successful writes, consumer offsets are committed back to Kafka.
  5. Post-Processing: Committed offsets can trigger downstream processing tasks via the commit log topic.
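The pipeline above can be sketched as a single loop. All names are illustrative, not Snuba's actual internals:

```python
import time

# Minimal sketch of the consume/transform/insert/commit loop. A batch flushes
# when either the size limit fills or the time limit expires.
def run_pipeline(messages, transform, insert, commit,
                 max_batch_size=50000, max_batch_time_ms=2000):
    batch = []
    deadline = time.monotonic() + max_batch_time_ms / 1000.0

    def flush():
        nonlocal batch, deadline
        if batch:
            insert(batch)   # 3. bulk INSERT into ClickHouse
            commit()        # 4. commit offsets; 5. commit log triggers post-processing
        batch = []
        deadline = time.monotonic() + max_batch_time_ms / 1000.0

    for msg in messages:                  # 1. consume from Kafka
        batch.append(transform(msg))      # 2. transform into a ClickHouse row
        if len(batch) >= max_batch_size or time.monotonic() >= deadline:
            flush()
    flush()  # flush the partial batch on shutdown or rebalance
```

Committing only after a successful insert is what gives at-least-once delivery: on a crash, uncommitted messages are re-consumed rather than lost.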

Performance Tuning

Batch Size Optimization

Larger batches improve throughput but increase latency and memory usage:
# Low latency (faster end-to-end, lower throughput)
--max-batch-size 10000 --max-batch-time-ms 1000

# Balanced
--max-batch-size 50000 --max-batch-time-ms 2000

# High throughput (higher latency, maximum throughput)
--max-batch-size 100000 --max-batch-time-ms 5000
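The trade-off follows from when a batch flushes: at whichever of the size or time limit is hit first. A quick way to estimate worst-case flush latency:

```python
# A batch flushes when the size limit fills or the time limit expires,
# whichever comes first; the slower of the two never matters.
def worst_case_flush_ms(msgs_per_sec: float, max_batch_size: int,
                        max_batch_time_ms: int) -> float:
    fill_ms = max_batch_size / msgs_per_sec * 1000.0  # time to fill the batch
    return min(fill_ms, max_batch_time_ms)
```

At 5k msgs/sec a 50000-message batch would take 10 seconds to fill, so the 2000 ms time limit dominates; at 100k msgs/sec the size limit flushes batches every 500 ms.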

Concurrency Settings

For Rust consumers, tune concurrency based on CPU cores and workload:
# 8-core machine
--concurrency 8 --clickhouse-concurrency 2

# 16-core machine with async inserts
--concurrency 16 --clickhouse-concurrency 4 --async-inserts

Memory Management

Control memory usage with queue size limits:
# Lower memory footprint
--queued-max-messages-kbytes 25000 --queued-min-messages 5000

# Higher throughput (more memory)
--queued-max-messages-kbytes 100000 --queued-min-messages 20000
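As a rough sizing aid, the per-partition cap described above bounds total queue memory by the number of assigned partitions:

```python
# Rough upper bound on librdkafka queue memory, assuming (as described above)
# that --queued-max-messages-kbytes is a per topic+partition cap.
def queue_memory_upper_bound_mb(partitions: int,
                                queued_max_messages_kbytes: int = 50000) -> float:
    return partitions * queued_max_messages_kbytes / 1024.0
```

For example, 16 assigned partitions at the default 50000 kB cap bound the queue at roughly 781 MB; halving the cap halves the bound.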

Monitoring

Health Checks

Enable health check files for monitoring:
snuba rust-consumer \
  --storage errors \
  --consumer-group errors_group \
  --health-check-file /tmp/snuba-health

# Check health
test -f /tmp/snuba-health && echo "Healthy"
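Since the file is touched at intervals, a stricter probe can check freshness rather than mere existence. The sketch below does so in Python; the 60-second threshold is an assumption and should be tuned to the consumer's touch interval:

```python
import os
import time

# Freshness-based health check: the file must have been touched recently,
# not merely exist. The 60 s threshold is an assumed default.
def is_healthy(path: str, max_age_secs: float = 60.0) -> bool:
    try:
        return time.time() - os.stat(path).st_mtime <= max_age_secs
    except FileNotFoundError:
        return False
```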

Metrics

Consumers emit metrics including:
  • Messages processed per second
  • Batch sizes and timing
  • ClickHouse insert latency
  • Consumer lag
  • Error rates

Logging

Use appropriate log levels:
# Production - minimal logging
--log-level info

# Debugging issues
--log-level debug

# Deep troubleshooting (Rust only)
--log-level trace

Choosing Between Python and Rust Consumers

Use the Python consumer for:
  • Debugging message processing logic
  • Custom Python-based message processors
  • Simpler deployment requirements
  • Lower throughput workloads (under 10k msgs/sec)
Use the Rust consumer for:
  • Production deployments
  • High throughput requirements (over 10k msgs/sec)
  • Better resource efficiency
  • Built-in schema validation
Most use cases should prefer the Rust consumer.

Common Issues

High consumer lag or low throughput
Increase concurrency and batch sizes:
--concurrency 16 --max-batch-size 100000
Enable async inserts if available:
--async-inserts --clickhouse-concurrency 4

High memory usage
Reduce queue and batch sizes:
--queued-max-messages-kbytes 25000 \
--queued-min-messages 5000 \
--max-batch-size 25000

Frequent rebalancing
Increase the poll interval:
--max-poll-interval-ms 60000
Use static membership:
--group-instance-id $(hostname)

Consumer starting from an unexpected offset
Check the offset reset behavior:
--auto-offset-reset earliest --no-strict-offset-reset
Both consumers check ClickHouse connections on startup and will exit if unable to connect. Ensure ClickHouse is available before starting consumers.
