Kafka-based data ingestion with consumers and message processors
Snuba ingests data through a Kafka-based pipeline that processes messages in batches and writes them to ClickHouse. This architecture enables high throughput while maintaining data quality and consistency guarantees.
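At a high level the flow is: consume a batch of Kafka messages, run each through a message processor, and flush the accumulated rows to ClickHouse in a single bulk insert. The sketch below illustrates only that shape; `kafka_consumer`, `clickhouse_writer`, and `metadata_for` are hypothetical stand-ins, not Snuba's actual consumer API.

```python
import json

from snuba.processor import InsertBatch

BATCH_SIZE = 1000


def run_ingestion_loop(kafka_consumer, processor, clickhouse_writer, metadata_for):
    """Illustrative sketch of the consume -> process -> batch -> insert cycle."""
    while True:
        raw_messages = kafka_consumer.consume(num_messages=BATCH_SIZE, timeout=1.0)
        rows = []
        for msg in raw_messages:
            decoded = json.loads(msg.value())
            result = processor.process_message(decoded, metadata_for(msg))
            # Only insert batches contribute rows; replacements take a separate path
            if isinstance(result, InsertBatch):
                rows.extend(result.rows)
        if rows:
            # A single bulk write per batch is what gives the pipeline its throughput
            clickhouse_writer.write(rows)
        # Commit offsets only after the rows have been written
        kafka_consumer.commit()
```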
Message processors transform Kafka messages into ClickHouse rows:
```python
# From snuba/processor.py
class MessageProcessor(ABC):
    """Processes a message from Kafka into ClickHouse rows"""

    @abstractmethod
    def process_message(
        self,
        message: Mapping[str, Any],
        metadata: KafkaMessageMetadata,
    ) -> Union[InsertBatch, ReplacementBatch, None]:
        """
        Convert a message from Kafka into rows to insert into ClickHouse

        Returns:
            InsertBatch: Rows to insert
            ReplacementBatch: Mutation operations
            None: Skip this message
        """
        raise NotImplementedError
```
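As a concrete illustration, a minimal processor for a hypothetical dataset might look like the sketch below. The dataset, message fields, and column names are invented for the example, and it assumes the Snuba types from the excerpt above (`MessageProcessor`, `InsertBatch`, `ReplacementBatch`, `KafkaMessageMetadata`) are importable; real Snuba processors (errors, transactions, etc.) map many more fields.

```python
from datetime import datetime
from typing import Any, Mapping, Union

# MessageProcessor, InsertBatch, ReplacementBatch, and KafkaMessageMetadata
# are the Snuba types shown above; everything else here is invented.


class ExampleEventProcessor(MessageProcessor):
    """Hypothetical processor mapping a simple event payload to one ClickHouse row."""

    def process_message(
        self,
        message: Mapping[str, Any],
        metadata: KafkaMessageMetadata,
    ) -> Union[InsertBatch, ReplacementBatch, None]:
        # Returning None skips malformed or irrelevant messages
        if "event_id" not in message or "project_id" not in message:
            return None

        row = {
            "event_id": message["event_id"],
            "project_id": message["project_id"],
            "timestamp": datetime.utcfromtimestamp(message["timestamp"]),
            # Kafka coordinates are useful for deduplication and debugging
            "offset": metadata.offset,
            "partition": metadata.partition,
        }
        return InsertBatch([row], None)  # rows, origin_timestamp=None
```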
Processed messages are batched before writing to ClickHouse:
```python
# From snuba/consumers/consumer.py
class InsertBatchWriter:
    """Accumulates messages and writes the batch to ClickHouse"""

    def __init__(self, writer: BatchWriter[JSONRow], metrics: MetricsBackend):
        self.__writer = writer
        self.__metrics = metrics
        self.__messages: List[Message[BytesInsertBatch]] = []

    def submit(self, message: Message[BytesInsertBatch]) -> None:
        """Add message to batch"""
        self.__messages.append(message)

    def close(self) -> None:
        """Write entire batch to ClickHouse"""
        if not self.__messages:
            return

        # Flatten all rows from all messages
        all_rows = itertools.chain.from_iterable(
            message.payload.rows for message in self.__messages
        )

        # Single write to ClickHouse
        self.__writer.write(all_rows)

        # Record metrics
        self.__metrics.increment(
            "batch_write_msgs",
            sum(len(msg.payload.rows) for msg in self.__messages),
        )
```
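Usage follows a simple submit-then-close lifecycle: messages are accumulated and then flushed to ClickHouse as one write. A rough sketch, where `writer`, `metrics`, and `processed_messages` stand in for the BatchWriter, MetricsBackend, and message stream that Snuba wires up elsewhere:

```python
# Hedged sketch of the writer lifecycle; `writer`, `metrics`, and
# `processed_messages` are stand-ins provided by the surrounding consumer.
batch_writer = InsertBatchWriter(writer, metrics)

for message in processed_messages:  # Message[BytesInsertBatch] values
    batch_writer.submit(message)

# close() performs a single bulk insert for everything submitted above
batch_writer.close()
```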
Messages must be keyed by project_id so that all events for the same project land on the same Kafka partition and are consumed in order. This ordering guarantee is critical for replacements and subscriptions.
Example partitioning logic:
```python
# In upstream producer
partition_key = str(message["project_id"])
producer.produce(topic, value=message, key=partition_key)
```
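Why keying works: Kafka's partitioner hashes the message key to choose a partition, so every message carrying the same project_id lands on the same partition and is consumed in order. A minimal sketch of that mapping (using a generic CRC32 hash rather than Kafka's actual murmur2 implementation):

```python
import zlib

NUM_PARTITIONS = 16


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in for Kafka's key hashing; the real default partitioner uses murmur2
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Two events for the same project always map to the same partition ...
assert partition_for("42") == partition_for("42")
# ... so they are appended to the same partition log and read back in order.
```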
When a message cannot be processed, the consumer logs the error and marks the message as invalid instead of crashing:

```python
try:
    result = processor.process_message(decoded, metadata)
except Exception as err:
    # Log error with Sentry
    logger.warning(err, exc_info=True)
    # Mark message as invalid
    raise InvalidMessage(partition, offset) from err
```
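Snuba's consumers are built on the Arroyo streaming library, and InvalidMessage carries the failing message's Kafka coordinates so a dead-letter queue policy can handle it rather than halting the consumer. A hedged sketch of that error path, where `process_with_error_handling` and `bad_payload` are hypothetical stand-ins for the try/except above:

```python
from arroyo.dlq import InvalidMessage

try:
    process_with_error_handling(bad_payload, partition, offset)
except InvalidMessage as exc:
    # The partition and offset identify exactly which message was rejected,
    # so it can be routed to a DLQ or skipped without losing its neighbors.
    print(exc.partition, exc.offset)
```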
Replacement batches are not written to ClickHouse directly. Instead, they are produced to a dedicated replacements topic:

```python
class ReplacementBatchWriter:
    """Produces replacement operations to Kafka"""

    def close(self) -> None:
        for message in self.__messages:
            batch = message.payload
            for value in batch.values:
                # Produce to replacements topic
                self.__producer.produce(
                    self.__topic.name,
                    key=batch.key,
                    value=json.dumps(value),
                )
```
Replacement Operations:

- Merge error groups
- Unmerge error groups
- Update group IDs
- Mark events as deleted
Replacement Consumer:

- Separate consumer reads the replacement topic
- Executes ClickHouse ALTER TABLE mutations (see the sketch below)
- Updates rows in place
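To make the mutation step concrete, a merge-style replacement ultimately becomes an `ALTER TABLE ... UPDATE` statement against the errors table. The sketch below is illustrative only: the table name, columns, and `clickhouse_client.execute` call are simplified stand-ins for what the replacement consumer actually issues.

```python
def build_merge_mutation(
    project_id: int, previous_group_ids: list[int], new_group_id: int
) -> str:
    # Rewrites group_id for all rows belonging to the merged-away groups
    ids = ", ".join(str(g) for g in previous_group_ids)
    return (
        "ALTER TABLE errors_local "
        f"UPDATE group_id = {new_group_id} "
        f"WHERE project_id = {project_id} AND group_id IN ({ids})"
    )


# e.g. merging groups 456 and 789 into group 123 for project 42
sql = build_merge_mutation(42, [456, 789], 123)
clickhouse_client.execute(sql)  # hypothetical client; mutations run asynchronously
```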
Replacements are expensive operations: each ClickHouse mutation rewrites entire data parts rather than updating individual rows.