Snuba supports multiple ClickHouse deployment topologies, from simple single-node setups to complex sharded and replicated clusters. This guide covers configuration, setup, and management of ClickHouse clusters.

ClickHouse Architecture

ClickHouse provides distributed query processing through:
  • Sharding: Horizontal data partitioning across nodes
  • Replication: Data redundancy using ZooKeeper
  • Distributed tables: Query routing across shards
  • Remote servers: Cross-cluster querying

Deployment Topologies

Single Node Setup

Simplest topology for development and testing:
# docker-compose.yml
services:
  clickhouse:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse.local
    extra_hosts:
      - "clickhouse.local:127.0.0.1"
    ports:
      - "9000:9000"   # Native protocol
      - "8123:8123"   # HTTP protocol
      - "9009:9009"   # Interserver HTTP
    volumes:
      - ./config/clickhouse/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml
      - ./config/clickhouse/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
Configuration files:

macros.xml defines the node's identity in the cluster:
<yandex>
    <macros>
        <replica>1</replica>
        <shard>1</shard>
    </macros>
</yandex>
  • replica: Replica ID within a shard (starts at 1)
  • shard: Shard ID (starts at 1)
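ReplicatedMergeTree substitutes these macros into its ZooKeeper paths (see the table-engine section below). A minimal sketch of the substitution, with hypothetical values, for illustration only:

```python
def expand_macros(template: str, macros: dict) -> str:
    # ClickHouse substitutes {name} placeholders with values from macros.xml;
    # this mimics that behavior for illustration only.
    for name, value in macros.items():
        template = template.replace("{" + name + "}", str(value))
    return template

# Hypothetical node: shard 1, replica 2
path = expand_macros("/clickhouse/tables/{shard}/events", {"shard": 1, "replica": 2})
```

Two replicas of the same shard expand to the same table path (so they replicate each other), while different shards get distinct paths.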
zookeeper.xml provides the ZooKeeper connection configuration:
<yandex>
    <zookeeper>
        <node>
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>
ZooKeeper is required for replicated tables, even in single-node setups that use ReplicatedMergeTree.
remote_servers.xml defines the cluster topology:
<yandex>
    <remote_servers>
        <cluster_one_sh>
            <shard>
                <replica>
                    <host>clickhouse</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_one_sh>
    </remote_servers>
</yandex>

Multi-Node Cluster for Testing

Test distributed queries with multiple nodes:
# docker-compose.yml
services:
  clickhouse-query:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-query.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-query/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
      - ./config/clickhouse-query/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml
    ulimits:
      nofile: { soft: 262144, hard: 262144 }

  clickhouse-01:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-01.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-01/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse-01/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml
      - ./config/clickhouse-01/zookeeper.xml:/etc/clickhouse-server/config.d/zookeeper.xml

  clickhouse-02:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-02.local
    profiles: ["multi_node"]
    volumes:
      - ./config/clickhouse-02/macros.xml:/etc/clickhouse-server/config.d/macros.xml
      - ./config/clickhouse-02/remote_servers.xml:/etc/clickhouse-server/config.d/remote_servers.xml

  clickhouse-03:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-03.local
    profiles: ["multi_node"]

  clickhouse-04:
    image: altinity/clickhouse-server:25.3.6.10034.altinitystable
    hostname: clickhouse-04.local
    profiles: ["multi_node"]

Production Cluster Topology

Three-tier architecture for production:
1. Query Nodes

Stateless query routers:
<!-- Query node remote_servers.xml -->
<yandex>
    <remote_servers>
        <query_cluster>
            <shard>
                <replica>
                    <host>clickhouse-query</host>
                    <port>9000</port>
                </replica>
            </shard>
        </query_cluster>
        
        <!-- Reference to storage cluster for distributed queries -->
        <storage_cluster>
            <shard>
                <replica>
                    <host>clickhouse-02</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse-03</host>
                    <port>9000</port>
                </replica>
            </shard>
        </storage_cluster>
    </remote_servers>
</yandex>
Query nodes handle:
  • User query execution
  • Query parsing and optimization
  • Result aggregation from storage nodes
  • No data storage
2. Migration Nodes

Dedicated nodes for schema migrations:
<!-- Migration node remote_servers.xml -->
<yandex>
    <remote_servers>
        <migrations_cluster>
            <shard>
                <replica>
                    <host>clickhouse-01</host>
                    <port>9000</port>
                </replica>
            </shard>
        </migrations_cluster>
    </remote_servers>
</yandex>
Migration nodes handle:
  • DDL operations (CREATE, ALTER, DROP)
  • Schema changes
  • System table operations
  • Isolated from query load
3. Storage Nodes

Data storage and processing:
<!-- Storage node remote_servers.xml -->
<yandex>
    <remote_servers>
        <storage_cluster>
            <!-- Shard 1 -->
            <shard>
                <replica>
                    <host>clickhouse-02</host>
                    <port>9000</port>
                </replica>
            </shard>
            
            <!-- Shard 2 -->
            <shard>
                <replica>
                    <host>clickhouse-03</host>
                    <port>9000</port>
                </replica>
            </shard>
        </storage_cluster>
    </remote_servers>
</yandex>
Storage nodes handle:
  • Data ingestion from consumers
  • Local query execution
  • Data merging and compaction
  • Actual data storage

Sharded Cluster with Replication

Production-ready topology with HA:
<yandex>
    <remote_servers>
        <prod_cluster>
            <!-- Shard 1 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard1-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard1-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
            
            <!-- Shard 2 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard2-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard2-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
            
            <!-- Shard 3 with 2 replicas -->
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-shard3-replica1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-shard3-replica2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </prod_cluster>
    </remote_servers>
</yandex>
Macros for replica nodes:
<!-- clickhouse-shard1-replica1 -->
<yandex>
    <macros>
        <replica>1</replica>
        <shard>1</shard>
    </macros>
</yandex>

<!-- clickhouse-shard1-replica2 -->
<yandex>
    <macros>
        <replica>2</replica>
        <shard>1</shard>
    </macros>
</yandex>
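Hand-writing one stanza per shard and replica gets error-prone as the cluster grows. A throwaway generator along these lines can emit the cluster body; the clickhouse-shardN-replicaM hostname pattern is an assumption matching the examples above:

```python
def remote_servers_xml(cluster: str, shards: int, replicas: int) -> str:
    # Emit a cluster definition matching the layout shown above.
    lines = [f"<{cluster}>"]
    for shard in range(1, shards + 1):
        lines.append("    <shard>")
        lines.append("        <internal_replication>true</internal_replication>")
        for replica in range(1, replicas + 1):
            # Hypothetical hostname scheme: clickhouse-shard{N}-replica{M}
            host = f"clickhouse-shard{shard}-replica{replica}"
            lines.append(f"        <replica><host>{host}</host><port>9000</port></replica>")
        lines.append("    </shard>")
    lines.append(f"</{cluster}>")
    return "\n".join(lines)
```

Calling `remote_servers_xml("prod_cluster", 3, 2)` reproduces the 3-shard, 2-replica topology above.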

Snuba Cluster Configuration

Basic Cluster Setup

# settings.py
CLUSTERS = [
    {
        "host": "clickhouse",
        "port": 9000,
        "http_port": 8123,
        "user": "default",
        "password": "",
        "database": "default",
        "max_connections": 10,
        "storage_sets": {
            "discover",
            "events",
            "transactions",
            "metrics",
            "profiles",
        },
        "single_node": True,
        "cluster_name": "cluster_one_sh",
    }
]
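Snuba resolves each storage set to the cluster that declares it. A simplified sketch of that lookup (not Snuba's actual internals):

```python
def cluster_for_storage_set(clusters, storage_set):
    # Return the first cluster whose storage_sets contains the given set.
    for cluster in clusters:
        if storage_set in cluster["storage_sets"]:
            return cluster
    raise LookupError(f"no cluster serves storage set {storage_set!r}")

# Minimal example mirroring the settings above
clusters = [{"host": "clickhouse", "storage_sets": {"events", "metrics"}}]
cluster_for_storage_set(clusters, "events")["host"]  # "clickhouse"
```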

Multi-Cluster Configuration

Separate clusters for different storage sets:
CLUSTERS = [
    # Events cluster
    {
        "host": "clickhouse-events",
        "port": 9000,
        "storage_sets": {"events", "errors", "transactions"},
        "single_node": False,
        "cluster_name": "events_cluster",
    },
    # Metrics cluster
    {
        "host": "clickhouse-metrics",
        "port": 9000,
        "storage_sets": {"metrics", "generic_metrics_sets"},
        "single_node": False,
        "cluster_name": "metrics_cluster",
    },
    # Profiles cluster
    {
        "host": "clickhouse-profiles",
        "port": 9000,
        "storage_sets": {"profiles", "functions"},
        "single_node": False,
        "cluster_name": "profiles_cluster",
    },
]

Storage Slicing Configuration

Horizontally partition storage across clusters:
# Define sliced storage sets
SLICED_STORAGE_SETS = {
    "events": 4,  # 4 slices
}

# Configure sliced clusters
SLICED_CLUSTERS = [
    {
        "host": "clickhouse-slice-0",
        "port": 9000,
        "storage_sets": {("events", 0)},  # Slice 0
        "cluster_name": "events_slice_0",
    },
    {
        "host": "clickhouse-slice-1",
        "port": 9000,
        "storage_sets": {("events", 1)},  # Slice 1
        "cluster_name": "events_slice_1",
    },
    {
        "host": "clickhouse-slice-2",
        "port": 9000,
        "storage_sets": {("events", 2)},  # Slice 2
        "cluster_name": "events_slice_2",
    },
    {
        "host": "clickhouse-slice-3",
        "port": 9000,
        "storage_sets": {("events", 3)},  # Slice 3
        "cluster_name": "events_slice_3",
    },
]

# Map logical partitions to slices
LOGICAL_PARTITION_MAPPING = {
    "events": {
        0: 0,  # Partition 0 -> Slice 0
        1: 1,  # Partition 1 -> Slice 1
        2: 2,  # Partition 2 -> Slice 2
        3: 3,  # Partition 3 -> Slice 3
    }
}
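The mapping above resolves a logical partition to a physical slice. A sketch of the lookup; the partition function shown (modulo over org_id) is an assumption for illustration, not necessarily Snuba's real partitioning logic:

```python
SLICED_STORAGE_SETS = {"events": 4}
LOGICAL_PARTITION_MAPPING = {"events": {0: 0, 1: 1, 2: 2, 3: 3}}

def slice_for(storage_set: str, org_id: int) -> int:
    # Hypothetical partition function: modulo over the logical partition count,
    # then the configured logical-partition -> slice mapping.
    partitions = SLICED_STORAGE_SETS[storage_set]
    logical_partition = org_id % partitions
    return LOGICAL_PARTITION_MAPPING[storage_set][logical_partition]
```

Keeping the logical-to-physical mapping explicit means slices can later be rebalanced (two logical partitions pointed at one slice, say) without changing the partition function.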

Table Engine Types

MergeTree (Non-Replicated)

Simple single-node tables:
CREATE TABLE events_local (
    event_id UUID,
    project_id UInt64,
    timestamp DateTime,
    message String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, timestamp, event_id);
Use cases:
  • Development and testing
  • Single-node deployments
  • Non-critical data

ReplicatedMergeTree

Replicated tables with ZooKeeper:
CREATE TABLE events_local ON CLUSTER '{cluster}' (
    event_id UUID,
    project_id UInt64,
    timestamp DateTime,
    message String
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',
    '{replica}'
)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (project_id, timestamp, event_id);
Replication path macros:
  • {cluster}: Cluster name from remote_servers.xml
  • {shard}: Shard number from macros.xml
  • {replica}: Replica number from macros.xml
ZooKeeper path must be unique per table and shard. Use consistent naming like /clickhouse/tables/{shard}/{table_name}.

Distributed Tables

Query routing across shards:
CREATE TABLE events_dist ON CLUSTER '{cluster}' AS events_local
ENGINE = Distributed(
    '{cluster}',
    'default',
    'events_local',
    rand()  -- Sharding key (random distribution)
);
Sharding key options:
  • rand(): Random distribution
  • sipHash64(project_id): Hash-based sharding
  • project_id % 4: Modulo sharding
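The trade-off between key choices can be seen in miniature. sipHash64 is not in the Python standard library, so a stand-in 64-bit hash is used here purely to illustrate how a hash key spreads rows versus a plain modulo:

```python
import hashlib

def hash_shard(project_id: int, n_shards: int) -> int:
    # Stand-in for sipHash64(project_id): stable 64-bit hash, then modulo.
    digest = hashlib.blake2b(str(project_id).encode(), digest_size=8).digest()
    return int.from_bytes(digest, "little") % n_shards

def modulo_shard(project_id: int, n_shards: int) -> int:
    # project_id % N: simple and deterministic, but ranges of adjacent
    # project IDs (e.g. one big customer's projects) can clump on one shard.
    return project_id % n_shards
```

Hash keys keep a project's rows co-located (useful for single-project queries) while spreading unrelated projects evenly; rand() spreads evenly but scatters each project across every shard.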

ZooKeeper Configuration

ZooKeeper Ensemble

Production ZooKeeper setup:
services:
  zookeeper-1:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181
    volumes:
      - zk1-data:/data
      - zk1-log:/datalog

  zookeeper-2:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181

  zookeeper-3:
    image: zookeeper:3.8
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: |
        server.1=zookeeper-1:2888:3888;2181
        server.2=zookeeper-2:2888:3888;2181
        server.3=zookeeper-3:2888:3888;2181

ClickHouse ZooKeeper Config

<yandex>
    <zookeeper>
        <node>
            <host>zookeeper-1</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper-2</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper-3</host>
            <port>2181</port>
        </node>
        
        <!-- Connection settings -->
        <session_timeout_ms>30000</session_timeout_ms>
        <operation_timeout_ms>10000</operation_timeout_ms>
        <root>/clickhouse</root>
    </zookeeper>
</yandex>
Use at least 3 ZooKeeper nodes for quorum, and keep the ensemble size odd (3 or 5): an even-sized ensemble tolerates no more failures than the next smaller odd one, so the extra node adds cost and coordination overhead without improving availability.

Monitoring Cluster Health

System Tables

Query cluster status:
-- Check cluster configuration
SELECT * FROM system.clusters;

-- View replication status
SELECT
    database,
    table,
    is_leader,
    is_readonly,
    absolute_delay,
    queue_size
FROM system.replicas;

-- Check ZooKeeper connection
SELECT * FROM system.zookeeper WHERE path = '/';

-- View shard and replica info
SELECT
    shard_num,
    replica_num,
    host_name,
    host_address,
    port
FROM system.clusters
WHERE cluster = 'prod_cluster';

Replication Lag

SELECT
    database,
    table,
    absolute_delay AS lag_seconds,
    queue_size AS pending_ops
FROM system.replicas
WHERE absolute_delay > 60
ORDER BY absolute_delay DESC;
Alert on replication lag:
  • Warning: lag > 60 seconds
  • Critical: lag > 300 seconds
  • Emergency: replica marked as readonly
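The thresholds above can be encoded directly in an alerting check. A minimal classifier, assuming the inputs come from the system.replicas query above:

```python
def lag_severity(lag_seconds: float, is_readonly: bool = False) -> str:
    # Thresholds mirror the alerting guidance above.
    if is_readonly:
        return "emergency"
    if lag_seconds > 300:
        return "critical"
    if lag_seconds > 60:
        return "warning"
    return "ok"
```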

Data Distribution

Check data balance across shards. system.parts is local to each node and has no shard_num column, so fan the query out with clusterAllReplicas:
SELECT
    hostName() AS host,
    count() AS parts_count,
    sum(rows) AS total_rows,
    formatReadableSize(sum(bytes_on_disk)) AS disk_size
FROM clusterAllReplicas('prod_cluster', system.parts)
WHERE table = 'events_local' AND active
GROUP BY host
ORDER BY host;

Cluster Operations

Adding a New Node

1. Update remote_servers.xml

Add new node to cluster configuration:
<shard>
    <internal_replication>true</internal_replication>
    <replica>
        <host>clickhouse-shard4-replica1</host>
        <port>9000</port>
    </replica>
</shard>
2. Configure Node

Set up macros.xml on new node:
<yandex>
    <macros>
        <replica>1</replica>
        <shard>4</shard>
    </macros>
</yandex>
3. Create Tables

Create the table structure on the new node. CREATE TABLE ... AS cannot reference a table on another server, so dump the schema from an existing node and replay it:
-- On an existing node, dump the schema:
SHOW CREATE TABLE events_local;

-- Run the emitted CREATE TABLE statement on the new node;
-- the {shard} and {replica} macros resolve from its macros.xml.
4. Reload Configuration

Reload ClickHouse config on all nodes:
SYSTEM RELOAD CONFIG;
5. Verify Cluster

Check new node appears in cluster:
SELECT * FROM system.clusters WHERE cluster = 'prod_cluster';

Removing a Node

Removing nodes from replicated setups requires careful data migration to avoid data loss.
  1. Stop writes to the node
  2. Wait for replication to complete
  3. Update remote_servers.xml to remove node
  4. Reload configuration on all nodes
  5. Decommission the node
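Step 2 (waiting for replication to drain) can be automated by polling system.replicas. In this sketch, `execute` is a placeholder for whichever ClickHouse client call you use, assumed to return a single numeric value:

```python
import time

def wait_for_replication_drain(execute, table: str, timeout_s: int = 600) -> bool:
    # Poll until the replica has an empty queue and zero delay, or time out.
    # `execute` is a placeholder for your ClickHouse client's query call.
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        pending = execute(
            f"SELECT queue_size + absolute_delay FROM system.replicas "
            f"WHERE table = '{table}'"
        )
        if pending == 0:
            return True
        time.sleep(5)
    return False
```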

Resharding Data

Migrate to new sharding scheme:
-- Create new distributed table with new sharding
CREATE TABLE events_dist_new AS events_local
ENGINE = Distributed(
    'prod_cluster_new',
    'default',
    'events_local',
    sipHash64(project_id)  -- sharding key; the Distributed engine maps it onto prod_cluster_new's 6 shards
);

-- Migrate data (on each shard)
INSERT INTO events_dist_new
SELECT * FROM events_local;

-- Verify data migration
SELECT count() FROM events_dist_new;

-- Swap tables
RENAME TABLE events_dist TO events_dist_old,
             events_dist_new TO events_dist;

Best Practices

  1. Use ReplicatedMergeTree for all production tables
  2. Maintain 2-3 replicas per shard for high availability
  3. Use consistent hashing for sharding keys to enable resharding
  4. Monitor replication lag continuously
  5. Keep ZooKeeper healthy - it’s critical for replication
  6. Use separate query nodes to isolate user queries from writes
  7. Plan for 3x current capacity when sizing clusters
  8. Test failover procedures regularly
  9. Backup ZooKeeper data separately from ClickHouse
  10. Document cluster topology and update as changes occur

Troubleshooting

Replica Marked as Readonly

-- Check why replica is readonly
SELECT * FROM system.replicas WHERE is_readonly = 1;

-- Common causes:
-- 1. Disk full
-- 2. ZooKeeper connection lost
-- 3. Replication errors

-- Re-register a replica whose ZooKeeper metadata was lost
-- (run only after fixing the underlying cause, e.g. freeing disk space):
SYSTEM RESTORE REPLICA events_local;

Replication Queue Stuck

-- View replication queue
SELECT
    database,
    table,
    num_tries,
    last_exception
FROM system.replication_queue
WHERE num_tries > 10;

-- Remove a dead replica's ZooKeeper metadata (run from a surviving replica;
-- a server cannot drop its own active replica):
SYSTEM DROP REPLICA 'replica1' FROM TABLE events_local;

-- On a replica whose own ZooKeeper metadata is lost, re-register it:
SYSTEM RESTORE REPLICA events_local;

ZooKeeper Connection Issues

# Test ZooKeeper connectivity
echo ruok | nc zookeeper 2181
# Should return "imok"

# Check ClickHouse ZK connection
tail -f /var/log/clickhouse-server/clickhouse-server.log | grep -i zookeeper
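The same "ruok" health probe can run from Python, which is handy inside monitoring jobs where nc may not be installed (note that ruok must be in ZooKeeper's 4lw.commands.whitelist):

```python
import socket

def zk_ruok(host: str, port: int = 2181, timeout: float = 3.0) -> bool:
    # Send ZooKeeper's "ruok" four-letter word; a healthy server replies "imok".
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(b"ruok")
        return conn.recv(16).startswith(b"imok")
```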
