Snuba supports multiple ClickHouse deployment topologies, from simple single-node setups to complex sharded and replicated clusters. This guide covers configuration, setup, and management of ClickHouse clusters.
ClickHouse Architecture
ClickHouse provides distributed query processing through:
Sharding: Horizontal data partitioning across nodes
Replication: Data redundancy coordinated through ZooKeeper
Distributed tables: Query routing across shards
Remote servers: Cross-cluster querying
Deployment Topologies
Single Node Setup
Simplest topology for development and testing:
# docker-compose.yml
services:
  clickhouse:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse.local
    extra_hosts:
      - "clickhouse.local:127.0.0.1"
    ports:
      - "9000:9000"  # Native protocol
      - "8123:8123"  # HTTP protocol
      - "9009:9009"  # Interserver HTTP
    volumes:
      - ./config/clickhouse/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml
      - ./config/clickhouse/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
Configuration files:
macros.xml defines the node's identity in the cluster:
<yandex>
    <macros>
        <replica>1</replica>
        <shard>1</shard>
    </macros>
</yandex>
replica: Replica ID within a shard (starts at 1)
shard: Shard ID (starts at 1)
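Since every node needs its own macros.xml, the per-node files can be generated rather than hand-written. A minimal sketch (the `render_macros` helper is hypothetical, not part of Snuba):

```python
# Hypothetical helper: render the macros.xml body for one node.
# Shard and replica IDs are 1-based, matching the convention above.
def render_macros(shard: int, replica: int) -> str:
    if shard < 1 or replica < 1:
        raise ValueError("shard and replica IDs start at 1")
    return (
        "<yandex>\n"
        "    <macros>\n"
        f"        <replica>{replica}</replica>\n"
        f"        <shard>{shard}</shard>\n"
        "    </macros>\n"
        "</yandex>\n"
    )

# One file per node: shard 1 replica 1, shard 1 replica 2, and so on.
print(render_macros(1, 1))
```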
zookeeper.xml holds the ZooKeeper connection configuration:
<yandex>
    <zookeeper>
        <node>
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>
ZooKeeper is required for replicated tables, even in single-node setups that use ReplicatedMergeTree.
remote_servers.xml holds the cluster topology definition:
<yandex>
    <remote_servers>
        <cluster_one_sh>
            <shard>
                <replica>
                    <host>clickhouse</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_one_sh>
    </remote_servers>
</yandex>
Multi-Node Cluster for Testing
Test distributed queries with multiple nodes:
# docker-compose.yml
services:
  clickhouse-query:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-query.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-query/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
      - ./config/clickhouse-query/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml
    ulimits:
      nofile: { soft: 262144, hard: 262144 }
  clickhouse-01:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-01.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-01/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse-01/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
      - ./config/clickhouse-01/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml
  clickhouse-02:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-02.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-02/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse-02/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
  clickhouse-03:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-03.local
    profiles: ["multi_node"]
  clickhouse-04:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-04.local
    profiles: ["multi_node"]
Production Cluster Topology
Three-tier architecture for production:
Query Nodes
Stateless query routers:
<!-- Query node remote_servers.xml -->
<yandex>
    <remote_servers>
        <query_cluster>
            <shard>
                <replica>
                    <host>clickhouse-query</host>
                    <port>9000</port>
                </replica>
            </shard>
        </query_cluster>
        <!-- Reference to storage cluster for distributed queries -->
        <storage_cluster>
            <shard>
                <replica>
                    <host>clickhouse-02</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse-03</host>
                    <port>9000</port>
                </replica>
            </shard>
        </storage_cluster>
    </remote_servers>
</yandex>
Query nodes handle:
User query execution
Query parsing and optimization
Result aggregation from storage nodes
No data storage
Migration Nodes
Dedicated nodes for schema migrations:
<!-- Migration node remote_servers.xml -->
<yandex>
    <remote_servers>
        <migrations_cluster>
            <shard>
                <replica>
                    <host>clickhouse-01</host>
                    <port>9000</port>
                </replica>
            </shard>
        </migrations_cluster>
    </remote_servers>
</yandex>
Migration nodes handle:
DDL operations (CREATE, ALTER, DROP)
Schema changes
System table operations
Isolated from query load
Storage Nodes
Data storage and processing:
<!-- Storage node remote_servers.xml -->
<yandex>
    <remote_servers>
        <storage_cluster>
            <!-- Shard 1 -->
            <shard>
                <replica>
                    <host>clickhouse-02</host>
                    <port>9000</port>
                </replica>
            </shard>
            <!-- Shard 2 -->
            <shard>
                <replica>
                    <host>clickhouse-03</host>
                    <port>9000</port>
                </replica>
            </shard>
        </storage_cluster>
    </remote_servers>
</yandex>
Storage nodes handle:
Data ingestion from consumers
Local query execution
Data merging and compaction
Actual data storage
Sharded Cluster with Replication
Production-ready topology with HA:
<yandex>
    <remote_servers>
        <prod_cluster>
            <!-- Shard 1 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard1-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard1-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <!-- Shard 2 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard2-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard2-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <!-- Shard 3 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard3-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard3-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </prod_cluster>
    </remote_servers>
</yandex>
Macros for replica nodes:
<!-- clickhouse-shard1-replica1 -->
<yandex>
    <macros>
        <replica>1</replica>
        <shard>1</shard>
    </macros>
</yandex>

<!-- clickhouse-shard1-replica2 -->
<yandex>
    <macros>
        <replica>2</replica>
        <shard>1</shard>
    </macros>
</yandex>
Snuba Cluster Configuration
Basic Cluster Setup
# settings.py
CLUSTERS = [
    {
        "host": "clickhouse",
        "port": 9000,
        "http_port": 8123,
        "user": "default",
        "password": "",
        "database": "default",
        "max_connections": 10,
        "storage_sets": {
            "discover",
            "events",
            "transactions",
            "metrics",
            "profiles",
        },
        "single_node": True,
        "cluster_name": "cluster_one_sh",
    }
]
Multi-Cluster Configuration
Separate clusters for different storage sets:
CLUSTERS = [
    # Events cluster
    {
        "host": "clickhouse-events",
        "port": 9000,
        "storage_sets": {"events", "errors", "transactions"},
        "single_node": False,
        "cluster_name": "events_cluster",
    },
    # Metrics cluster
    {
        "host": "clickhouse-metrics",
        "port": 9000,
        "storage_sets": {"metrics", "generic_metrics_sets"},
        "single_node": False,
        "cluster_name": "metrics_cluster",
    },
    # Profiles cluster
    {
        "host": "clickhouse-profiles",
        "port": 9000,
        "storage_sets": {"profiles", "functions"},
        "single_node": False,
        "cluster_name": "profiles_cluster",
    },
]
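Snuba routes each storage set to the cluster whose storage_sets contains it. The lookup can be sketched as follows (illustrative only, not Snuba's actual resolver; the `cluster_for_storage_set` helper is hypothetical):

```python
# Trimmed-down cluster entries matching the configuration above.
CLUSTERS = [
    {"host": "clickhouse-events", "storage_sets": {"events", "errors", "transactions"}},
    {"host": "clickhouse-metrics", "storage_sets": {"metrics", "generic_metrics_sets"}},
    {"host": "clickhouse-profiles", "storage_sets": {"profiles", "functions"}},
]

def cluster_for_storage_set(storage_set: str) -> dict:
    # Each storage set must belong to exactly one cluster, otherwise
    # routing is ambiguous.
    matches = [c for c in CLUSTERS if storage_set in c["storage_sets"]]
    if len(matches) != 1:
        raise ValueError(f"storage set {storage_set!r} must map to exactly one cluster")
    return matches[0]

print(cluster_for_storage_set("metrics")["host"])  # clickhouse-metrics
```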
Storage Slicing Configuration
Horizontally partition storage across clusters:
# Define sliced storage sets
SLICED_STORAGE_SETS = {
    "events": 4,  # 4 slices
}

# Configure sliced clusters
SLICED_CLUSTERS = [
    {
        "host": "clickhouse-slice-0",
        "port": 9000,
        "storage_sets": {("events", 0)},  # Slice 0
        "cluster_name": "events_slice_0",
    },
    {
        "host": "clickhouse-slice-1",
        "port": 9000,
        "storage_sets": {("events", 1)},  # Slice 1
        "cluster_name": "events_slice_1",
    },
    {
        "host": "clickhouse-slice-2",
        "port": 9000,
        "storage_sets": {("events", 2)},  # Slice 2
        "cluster_name": "events_slice_2",
    },
    {
        "host": "clickhouse-slice-3",
        "port": 9000,
        "storage_sets": {("events", 3)},  # Slice 3
        "cluster_name": "events_slice_3",
    },
]

# Map logical partitions to slices
LOGICAL_PARTITION_MAPPING = {
    "events": {
        0: 0,  # Partition 0 -> Slice 0
        1: 1,  # Partition 1 -> Slice 1
        2: 2,  # Partition 2 -> Slice 2
        3: 3,  # Partition 3 -> Slice 3
    }
}
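Putting these settings together, routing a logical partition is a two-step lookup: partition to slice, then (storage set, slice) to cluster. A hypothetical sketch using the values configured above (not Snuba's actual code):

```python
# Illustrative routing sketch; the names mirror the settings above.
LOGICAL_PARTITION_MAPPING = {"events": {0: 0, 1: 1, 2: 2, 3: 3}}
SLICE_CLUSTERS = {
    ("events", s): {"host": f"clickhouse-slice-{s}", "cluster_name": f"events_slice_{s}"}
    for s in range(4)
}

def cluster_for_partition(storage_set: str, partition: int) -> dict:
    # Step 1: logical partition -> slice
    slice_id = LOGICAL_PARTITION_MAPPING[storage_set][partition]
    # Step 2: (storage set, slice) -> cluster entry
    return SLICE_CLUSTERS[(storage_set, slice_id)]

print(cluster_for_partition("events", 2)["cluster_name"])  # events_slice_2
```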
Table Engine Types
MergeTree (Non-Replicated)
Simple single-node tables:
CREATE TABLE events_local (
    event_id UUID,
    project_id UInt64,
    timestamp DateTime,
    message String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, timestamp, event_id);
Use cases:
Development and testing
Single-node deployments
Non-critical data
ReplicatedMergeTree
Replicated tables with ZooKeeper:
CREATE TABLE events_local ON CLUSTER '{cluster}' (
    event_id UUID,
    project_id UInt64,
    timestamp DateTime,
    message String
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',
    '{replica}'
)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, timestamp, event_id);
Replication path macros:
{cluster}: Cluster name from remote_servers.xml
{shard}: Shard number from macros.xml
{replica}: Replica number from macros.xml
ZooKeeper path must be unique per table and shard. Use consistent naming like /clickhouse/tables/{shard}/{table_name}.
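The macro substitution can be illustrated with a short sketch: ClickHouse replaces each `{name}` in the path with the value from that node's macros.xml, so both replicas of a shard resolve to the same replication path and are distinguished only by the `{replica}` engine argument (the `expand_zk_path` helper is hypothetical; ClickHouse does this internally):

```python
# Hypothetical illustration of macro expansion, as ClickHouse applies it
# to the ReplicatedMergeTree path using values from macros.xml.
def expand_zk_path(template: str, macros: dict) -> str:
    for name, value in macros.items():
        template = template.replace("{" + name + "}", str(value))
    return template

# Both replicas of shard 1 share one replication path...
print(expand_zk_path("/clickhouse/tables/{shard}/events", {"shard": 1}))
# -> /clickhouse/tables/1/events

# ...and differ only in the second engine argument, '{replica}'.
print(expand_zk_path("{replica}", {"replica": 2}))  # -> 2
```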
Distributed Tables
Query routing across shards:
CREATE TABLE events_dist ON CLUSTER '{cluster}' AS events_local
ENGINE = Distributed(
    '{cluster}',
    'default',
    'events_local',
    rand()  -- Sharding key (random distribution)
);
Sharding key options:
rand(): Random distribution
sipHash64(project_id): Hash-based sharding
project_id % 4: Modulo sharding
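The trade-off between these options is locality: hashing `project_id` keeps every row for a project on one shard, so per-project queries can be served without fanning out, while `rand()` spreads rows evenly but forces every query to touch all shards. A small simulation of hash-based sharding (using MD5 as a stand-in, since sipHash64 is not in the Python standard library):

```python
import hashlib

# Stand-in for sipHash64; ClickHouse uses SipHash, this is illustration only.
def hash64(value: int) -> int:
    digest = hashlib.md5(str(value).encode()).digest()
    return int.from_bytes(digest[:8], "little")

N_SHARDS = 3

def shard_for(project_id: int) -> int:
    # The Distributed engine takes the sharding expression modulo the
    # total shard weight in the same way.
    return hash64(project_id) % N_SHARDS

# Hashing is deterministic: every row of a project lands on one shard.
assert shard_for(42) == shard_for(42)
print({p: shard_for(p) for p in range(5)})
```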
ZooKeeper Configuration
ZooKeeper Ensemble
Production ZooKeeper setup:
services:
  zookeeper-1:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181
    volumes:
      - zk1-data:/data
      - zk1-log:/datalog
  zookeeper-2:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181
ClickHouse ZooKeeper Config
<yandex>
    <zookeeper>
        <node>
            <host>zookeeper-1</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper-2</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper-3</host>
            <port>2181</port>
        </node>
        <!-- Connection settings -->
        <session_timeout_ms>30000</session_timeout_ms>
        <operation_timeout_ms>10000</operation_timeout_ms>
        <root>/clickhouse</root>
    </zookeeper>
</yandex>
Use at least 3 ZooKeeper nodes for quorum, and keep the count odd: an even-sized ensemble (4, 6) tolerates no more failures than the next smaller odd one, so the extra node adds risk without adding availability.
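The odd-number rule follows from quorum arithmetic: an ensemble of n nodes needs floor(n/2) + 1 members alive, so it tolerates floor((n - 1) / 2) failures; adding a fourth node to a three-node ensemble therefore buys no extra fault tolerance:

```python
# Quorum math behind the odd-node rule.
def tolerated_failures(n_nodes: int) -> int:
    # Quorum is floor(n/2) + 1, so up to floor((n - 1) / 2) nodes may fail.
    return (n_nodes - 1) // 2

for n in (2, 3, 4, 5):
    print(n, "nodes ->", tolerated_failures(n), "failures tolerated")
# 3 and 4 nodes both tolerate exactly 1 failure.
```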
Monitoring Cluster Health
System Tables
Query cluster status:
-- Check cluster configuration
SELECT * FROM system.clusters;

-- View replication status
SELECT
    database,
    table,
    is_leader,
    is_readonly,
    absolute_delay,
    queue_size
FROM system.replicas;

-- Check ZooKeeper connection
SELECT * FROM system.zookeeper WHERE path = '/';

-- View shard and replica info
SELECT
    shard_num,
    replica_num,
    host_name,
    host_address,
    port
FROM system.clusters
WHERE cluster = 'prod_cluster';
Replication Lag
SELECT
    database,
    table,
    absolute_delay AS lag_seconds,
    queue_size AS pending_ops
FROM system.replicas
WHERE absolute_delay > 60
ORDER BY absolute_delay DESC;
Alert on replication lag:
Warning: lag > 60 seconds
Critical: lag > 300 seconds
Emergency: replica marked as readonly
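These thresholds translate directly into an alert rule; a minimal sketch, assuming lag is read from system.replicas.absolute_delay and the readonly state from is_readonly:

```python
# Sketch of the alerting thresholds above; the severity names and the
# 60 s / 300 s cutoffs come from the text.
def lag_severity(lag_seconds: float, is_readonly: bool) -> str:
    if is_readonly:
        return "emergency"  # replica can no longer accept writes
    if lag_seconds > 300:
        return "critical"
    if lag_seconds > 60:
        return "warning"
    return "ok"

print(lag_severity(90, False))   # warning
print(lag_severity(400, False))  # critical
print(lag_severity(10, True))    # emergency
```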
Data Distribution
Check data balance across shards:
-- system.parts is node-local and has no shard_num column, so fan the
-- query out with clusterAllReplicas and group by host instead
SELECT
    hostName() AS host,
    count() AS parts_count,
    sum(rows) AS total_rows,
    formatReadableSize(sum(bytes_on_disk)) AS disk_size
FROM clusterAllReplicas('prod_cluster', system.parts)
WHERE table = 'events_local' AND active
GROUP BY host
ORDER BY host;
Cluster Operations
Adding a New Node
Update remote_servers.xml
Add the new node to the cluster configuration:
<shard>
    <internal_replication>true</internal_replication>
    <replica>
        <host>clickhouse-shard4-replica1</host>
        <port>9000</port>
    </replica>
</shard>
Configure Node
Set up macros.xml on the new node:
<yandex>
    <macros>
        <replica>1</replica>
        <shard>4</shard>
    </macros>
</yandex>
Create Tables
Create the table structures on the new node by replaying the DDL from an existing node:
-- On an existing node, capture the schema
SHOW CREATE TABLE events_local;
-- Run the returned CREATE TABLE statement on the new node
Reload Configuration
Reload the ClickHouse config on all nodes:
SYSTEM RELOAD CONFIG;
Verify Cluster
Check that the new node appears in the cluster:
SELECT * FROM system.clusters WHERE cluster = 'prod_cluster';
Removing a Node
Removing nodes from replicated setups requires careful data migration to avoid data loss.
Stop writes to the node
Wait for replication to complete
Update remote_servers.xml to remove node
Reload configuration on all nodes
Decommission the node
Resharding Data
Migrate to new sharding scheme:
-- Create new distributed table with new sharding
CREATE TABLE events_dist_new AS events_local
ENGINE = Distributed(
    'prod_cluster_new',
    'default',
    'events_local',
    sipHash64(project_id) % 6  -- 6 shards instead of 3
);

-- Migrate data (on each shard)
INSERT INTO events_dist_new
SELECT * FROM events_local;

-- Verify data migration
SELECT count() FROM events_dist_new;

-- Swap tables
RENAME TABLE events_dist TO events_dist_old,
             events_dist_new TO events_dist;
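Before the final RENAME, it is worth checking that the per-shard counts on the new cluster add up to the total in the old distributed table; a hypothetical helper for that check:

```python
# Hypothetical verification step for the migration above: the per-shard
# counts gathered from the new cluster must sum to the old table's total.
def migration_verified(old_total: int, new_shard_counts: list) -> bool:
    return sum(new_shard_counts) == old_total

# Example: 600 rows rebalanced across 6 new shards.
assert migration_verified(600, [100, 100, 100, 100, 100, 100])
assert not migration_verified(600, [100, 100, 100, 100, 100, 99])
```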
Best Practices
Use ReplicatedMergeTree for all production tables
Maintain 2-3 replicas per shard for high availability
Use consistent hashing for sharding keys to enable resharding
Monitor replication lag continuously
Keep ZooKeeper healthy - it’s critical for replication
Use separate query nodes to isolate user queries from writes
Plan for 3x current capacity when sizing clusters
Test failover procedures regularly
Backup ZooKeeper data separately from ClickHouse
Document cluster topology and update as changes occur
Troubleshooting
Replica Marked as Readonly
-- Check why replica is readonly
SELECT * FROM system.replicas WHERE is_readonly = 1;

-- Common causes:
-- 1. Disk full
-- 2. ZooKeeper connection lost
-- 3. Lost or corrupted replica metadata in ZooKeeper

-- After fixing the root cause, restore the replica's metadata
SYSTEM RESTORE REPLICA events_local;
Replication Queue Stuck
-- View replication queue
SELECT
    database,
    table,
    num_tries,
    last_exception
FROM system.replication_queue
WHERE num_tries > 10;

-- Rebuild the stuck replica: run DROP REPLICA from another replica
-- (it only removes a dead replica's ZooKeeper metadata; the local
-- replica cannot drop itself), then RESTORE REPLICA on the affected node
SYSTEM DROP REPLICA 'replica1' FROM TABLE events_local;
SYSTEM RESTORE REPLICA events_local;
ZooKeeper Connection Issues
# Test ZooKeeper connectivity ("ruok" must be allowed via
# 4lw.commands.whitelist in ZooKeeper 3.5+)
echo ruok | nc zookeeper 2181
# Should return "imok"

# Check ClickHouse ZK connection
tail -f /var/log/clickhouse-server/clickhouse-server.log | grep -i zookeeper