Overview
Snuba provides two consumer implementations for processing events from Kafka and writing to ClickHouse:
- Python Consumer - Original implementation with full Python-based message processing
- Rust Consumer - High-performance implementation with Rust-based message processing
Python Consumer
The Python-based consumer provides flexible event processing with full Python message transformation.
Required Options
The storage to target for event processing. Must be a writable storage.
Common values:
- errors - Error events
- transactions - Transaction events
- outcomes_raw - Outcome/billing events
- search_issues - Issue occurrence events
- replays - Session replay events
Kafka Configuration
Topic to consume raw events from. If not specified, uses the default topic configured for the storage.
Consumer group ID for consuming the raw events topic. Used for offset management and load balancing.
Kafka bootstrap server(s) to use for consuming. Can be specified multiple times for multiple brokers.
Topic where committed offsets are written, triggering post-processing tasks.
Topic to produce replacement messages for data updates/corrections.
Kafka bootstrap server(s) for the commit log topic. Defaults to main bootstrap servers.
Kafka bootstrap server(s) for the replacements topic. Defaults to main bootstrap servers.
Batching Configuration
Maximum number of messages to batch in memory.
Batching applies to three stages:
- Batching messages for processing (transforming into ClickHouse rows)
- Batching for the INSERT statement
- Batching of offset commits
Maximum duration (in milliseconds) to buffer messages in memory before flushing.
Maximum number of messages to batch for ClickHouse inserts. Defaults to --max-batch-size if not specified.
Maximum duration for batching ClickHouse inserts. Defaults to --max-batch-time-ms if not specified.
Consumer Behavior
Kafka consumer auto offset reset behavior.
- error - Raise an error if no offset is found
- earliest - Start from the earliest offset
- latest - Start from the latest offset
Forces the Kafka consumer auto offset reset behavior, even when offsets already exist.
Maximum kilobytes per topic+partition in the local consumer queue. Controls memory usage.
Minimum messages per topic+partition that librdkafka tries to maintain in the local queue.
Maximum time between polls before the consumer is considered dead and triggers a rebalance.
Processing Configuration
Number of worker processes for parallel message processing. Increases throughput but also resource usage.
Size of input blocks for processing pipeline.
Size of output blocks for processing pipeline.
Enforce schema validation on the raw events topic. Rejects messages that don’t match the expected schema.
Advanced Options
The slice ID for sliced storages. Used when a storage is partitioned across multiple consumers.
Join timeout in seconds for graceful shutdown.
Directory path for writing profiling data. Must be an existing directory.
Path to a file that Arroyo will touch at intervals to indicate health. Useful for orchestration systems like Kubernetes.
Kafka group instance ID. Passing a value enables static membership for more stable consumer group behavior.
Time delay before a consumer starts rebalancing, used for coordinated rebalancing across pods during deployments.
Logging level to use (critical, error, warning, info, debug).
Examples
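For instance, a basic invocation might look like the following. The flag values are illustrative; check `snuba consumer --help` for the exact option names and defaults in your version.

```shell
# Python consumer for error events: batches flush at 1000 messages
# or after 2 seconds, whichever comes first (values are illustrative).
snuba consumer \
  --storage errors \
  --consumer-group snuba-errors-consumers \
  --bootstrap-server kafka:9092 \
  --auto-offset-reset latest \
  --max-batch-size 1000 \
  --max-batch-time-ms 2000 \
  --processes 4 \
  --log-level info
```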
Rust Consumer
The Rust consumer is a high-performance alternative with better throughput and resource efficiency. It’s the recommended choice for production workloads.
Key Differences from Python Consumer
- Performance - Significantly higher throughput (2-5x)
- Resource Usage - Lower CPU and memory footprint
- Concurrency - Better concurrent processing with configurable parallelism
- Schema Enforcement - Built-in schema validation
- Multiple Storages - Can target multiple storages in a single consumer
Required Options
Storage(s) to target. Can be specified multiple times to process events for multiple storages.
Consumer group ID for offset management.
Core Configuration
Kafka consumer auto offset reset (error, earliest, latest).
Force the auto offset reset behavior.
Logging level for Rust consumer.
Available levels:
- error - Only errors
- warn - Warnings and errors
- info - Informational messages
- debug - Debug output
- trace - Verbose trace logging
Performance Options
Number of concurrent message processing tasks. Increases parallelism for message transformation.
Number of concurrent ClickHouse batch operations. Only increase this with --async-inserts enabled.
Enable asynchronous inserts for ClickHouse. Improves throughput but requires proper ClickHouse configuration.
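As a sketch, the two settings are enabled together along these lines. The concurrency flag name is an assumption; verify it against `snuba rust-consumer --help`.

```shell
# Async inserts let ClickHouse buffer rows server-side; only then is it
# safe to raise concurrent batch writes (flag names are illustrative).
snuba rust-consumer \
  --storage errors \
  --consumer-group snuba-errors-consumers \
  --async-inserts \
  --clickhouse-concurrency 2
```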
Use the Rust message processor (if available) instead of Python. Generally faster.
Maximum messages queued in the Python processor before backpressure. Defaults to number of processes.
Kafka Options
Kafka bootstrap servers (can be specified multiple times).
Topic to consume raw events from. Overrides the default for the storage.
Topic for committed offsets.
Topic for replacement messages.
Bootstrap servers for commit log topic.
Bootstrap servers for replacements topic.
Maximum kilobytes per partition in local queue.
Minimum messages to maintain in local queue.
Maximum time between polls before rebalance.
Batching Configuration
Maximum messages to batch before writing to Kafka.
Maximum time to buffer messages before writing.
Optional timeout for batch writer client connecting and sending to ClickHouse.
Reliability Options
Enforce schema validation on raw events topic.
Per-partition limit to the dead letter queue buffer length.
Health check implementation to use.
- arroyo - Default Arroyo health check
- snuba - Snuba-specific health check
File to touch periodically to indicate health.
Advanced Options
Kafka group instance ID for static membership.
Delay for quantized rebalancing during deployments.
Milliseconds to wait for current batch to flush during rebalance.
Unix timestamp after which to stop processing messages. Useful for bounded processing.
Use RowBinary format for ClickHouse inserts instead of JSONEachRow. Currently only supported for EAPItemsProcessor.
Examples
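A hypothetical invocation targeting two storages in a single process (a Rust-consumer feature). Flag names follow the options described above but should be verified with `snuba rust-consumer --help`.

```shell
# Rust consumer processing errors and transactions in one process,
# with the Rust message processor enabled (values are illustrative).
snuba rust-consumer \
  --storage errors \
  --storage transactions \
  --consumer-group snuba-consumers \
  --bootstrap-server kafka:9092 \
  --auto-offset-reset latest \
  --use-rust-processor
```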
Consumer Architecture
Both consumers follow a similar processing pipeline:
Consume from Kafka
Messages are read from Kafka topics in batches based on configured batch size and time limits.
Transform Messages
Raw messages are processed and transformed into ClickHouse row format. This step can use either Python or Rust processors.
Write to ClickHouse
Transformed rows are batched and inserted into ClickHouse storage using bulk INSERT statements.
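The three stages above can be sketched schematically. All names here are illustrative, not Snuba's actual code (the real implementation is built on the Arroyo streaming library); the point is only the batch-transform-flush shape of the loop.

```python
# Schematic sketch of the consume -> transform -> bulk-insert pipeline.
# Names and row shapes are hypothetical, for illustration only.

def transform(raw_message: dict) -> dict:
    """Stage 2: turn a raw Kafka payload into a ClickHouse row."""
    return {"event_id": raw_message["id"], "project_id": raw_message["project"]}

def run_pipeline(messages, insert_batch, max_batch_size=2):
    """Batch incoming messages and flush each full batch to a writer."""
    batch = []
    for msg in messages:                  # stage 1: consume
        batch.append(transform(msg))      # stage 2: transform
        if len(batch) >= max_batch_size:  # stage 3: bulk INSERT
            insert_batch(batch)
            batch = []
    if batch:                             # flush the final partial batch
        insert_batch(batch)

# Usage: collect the "inserted" batches instead of writing to ClickHouse.
written = []
raw = [{"id": i, "project": 1} for i in range(5)]
run_pipeline(raw, written.append)
```

With five messages and a batch size of two, the writer receives three batches of sizes 2, 2, and 1; the last partial batch is flushed at the end, mirroring how the real consumers flush on the batch-time limit.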
Performance Tuning
Batch Size Optimization
Larger batches improve throughput but increase latency and memory usage.
Concurrency Settings
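A hypothetical starting point for an 8-core host is one processing task per core; the exact flag name should be confirmed against `snuba rust-consumer --help`.

```shell
# Illustrative concurrency tuning for an 8-core host.
snuba rust-consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --concurrency 8
```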
For Rust consumers, tune concurrency based on CPU cores and workload.
Memory Management
Control memory usage with queue size limits.
Monitoring
Health Checks
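A sketch of wiring the health-check file into an orchestrator: the consumer touches the file at intervals, and a liveness probe checks its mtime. Path and flag name are illustrative.

```shell
# Consumer touches /tmp/health.txt periodically; a Kubernetes liveness
# probe can then alert if the file goes stale (names are illustrative).
snuba rust-consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --health-check-file /tmp/health.txt
```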
Enable health check files for monitoring.
Metrics
Consumers emit metrics including:
- Messages processed per second
- Batch sizes and timing
- ClickHouse insert latency
- Consumer lag
- Error rates
Logging
Use the appropriate log level for the environment: info for production, debug for troubleshooting.
Choosing Between Python and Rust Consumers
When to use Python Consumer
- Debugging message processing logic
- Custom Python-based message processors
- Simpler deployment requirements
- Lower throughput workloads (under 10k msgs/sec)
When to use Rust Consumer
- Production deployments
- High throughput requirements (over 10k msgs/sec)
- Need for better resource efficiency
- Built-in schema validation
- Most other use cases - the Rust consumer is the recommended default
Common Issues
Consumer Lag Growing
Increase concurrency and batch sizes, and enable async inserts if available.
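For example, a remedy along these lines (flag names are assumptions drawn from the options above; verify with `--help`):

```shell
# Illustrative fix for growing lag: larger batches, more parallelism,
# and async inserts.
snuba rust-consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --max-batch-size 5000 \
  --concurrency 8 \
  --async-inserts
```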
High Memory Usage
Reduce queue sizes.
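For example, capping the per-partition prefetch queue bounds memory at the cost of some throughput (flag names assumed; verify with `--help`):

```shell
# Illustrative: bound memory by limiting the local consumer queue.
snuba consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --queued-max-messages-kbytes 10000 \
  --queued-min-messages 1000
```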
Rebalancing Too Frequently
Increase the poll interval, or use static membership.
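A sketch combining both remedies, with assumed flag names matching the options described earlier:

```shell
# Illustrative: allow longer gaps between polls, and pin a stable
# group instance ID so pod restarts don't trigger a full rebalance.
snuba consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --max-poll-interval-ms 600000 \
  --group-instance-id consumer-pod-0
```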
Messages Being Skipped
Check the offset reset behavior.
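For example, if the consumer jumps to the latest offset when no committed offset exists, messages retained in Kafka are skipped; starting from the earliest offset avoids that (values taken from the option list above):

```shell
# Illustrative: resume from the earliest retained offset instead of
# jumping past unprocessed messages.
snuba consumer \
  --storage errors \
  --consumer-group snuba-consumers \
  --auto-offset-reset earliest
```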
Both consumers check ClickHouse connections on startup and will exit if unable to connect. Ensure ClickHouse is available before starting consumers.