
Overview

Distributed ClickHouse deployments introduce complexity in how migrations are executed across multiple nodes. This guide covers advanced strategies for managing migrations in replicated and sharded environments.
Distributed migration support is experimental. Use these strategies with caution and thorough testing in staging environments before production deployment.

Distributed Architecture

Cluster Topology

A typical distributed ClickHouse cluster is laid out as follows:
┌─────────────────────────────────────────────────┐
│                Query Nodes (Distributed)        │
│  ┌────────────────────────────────────────┐    │
│  │      events_dist (Distributed table)   │    │
│  └────────────────────────────────────────┘    │
└────────────┬──────────────────┬─────────────────┘
             │                  │
    ┌────────▼────────┐  ┌─────▼──────────┐
    │   Shard 1       │  │   Shard 2      │
    │  ┌───────────┐  │  │  ┌───────────┐ │
    │  │ Replica 1 │  │  │  │ Replica 1 │ │
    │  │ events_   │  │  │  │ events_   │ │
    │  │   local   │  │  │  │   local   │ │
    │  └───────────┘  │  │  └───────────┘ │
    │  ┌───────────┐  │  │  ┌───────────┐ │
    │  │ Replica 2 │  │  │  │ Replica 2 │ │
    │  │ events_   │  │  │  │ events_   │ │
    │  │   local   │  │  │  │   local   │ │
    │  └───────────┘  │  │  └───────────┘ │
    └─────────────────┘  └────────────────┘
             │                  │
        ┌────┴──────────────────┴────┐
        │      ZooKeeper Cluster     │
        │    (Coordination Layer)    │
        └────────────────────────────┘

Key Components

Local Tables
  • Store actual data on each shard
  • Replicated across replicas using ReplicatedMergeTree engines
  • Each shard contains a subset of the data
Distributed Tables
  • Virtual tables that aggregate results from local tables
  • Proxy queries to all shards
  • No data storage—just query routing
ZooKeeper
  • Coordinates replication between replicas
  • Manages distributed DDL execution
  • Ensures consistency across the cluster

ON CLUSTER Execution

Snuba uses ClickHouse’s ON CLUSTER syntax to execute DDL across all nodes:

How It Works

-- Without ON CLUSTER (single node only)
CREATE TABLE events_local (...) ENGINE = MergeTree() ...

-- With ON CLUSTER (all nodes in cluster)
CREATE TABLE events_local ON CLUSTER 'my_cluster' (...) 
    ENGINE = ReplicatedMergeTree() ...
When you use ON CLUSTER:

1. Submit to initiator node
   The migration runner connects to a single node and submits the DDL with ON CLUSTER.

2. ZooKeeper coordination
   ClickHouse stores the DDL command in ZooKeeper under the distributed DDL queue path (/clickhouse/task_queue/ddl/ by default).

3. Distributed execution
   Each node in the cluster picks up the task from ZooKeeper and executes it locally.

4. Wait for completion
   The initiator waits for all nodes to complete (or fail) before returning, up to the limit set by ClickHouse's distributed_ddl_task_timeout setting.
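
The flow above can be modeled with a toy in-memory queue. This is only an illustrative sketch: `Node` and `run_on_cluster` are hypothetical names, and a plain Python list stands in for the ZooKeeper task queue that ClickHouse actually uses.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Node:
    """A hypothetical cluster node that records the DDL it has applied."""
    name: str
    healthy: bool = True
    applied: List[str] = field(default_factory=list)

    def execute(self, ddl: str) -> str:
        if not self.healthy:
            return "failed"
        self.applied.append(ddl)
        return "finished"


def run_on_cluster(ddl: str, nodes: List[Node]) -> Dict[str, str]:
    """Enqueue the DDL once, let every node execute it, collect one status per node."""
    task_queue = [ddl]  # stand-in for the shared queue (ZooKeeper in reality)
    statuses: Dict[str, str] = {}
    for task in task_queue:
        for node in nodes:  # every node picks up the same task
            statuses[node.name] = node.execute(task)
    return statuses  # the initiator reports the per-node outcome


nodes = [Node("node-1"), Node("node-2"), Node("node-3", healthy=False)]
statuses = run_on_cluster("CREATE TABLE events_local ...", nodes)
print(statuses)
```

The point of the model is that the initiator never talks to the other nodes directly: it writes one task and then only observes per-node results.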

Operation Targets and ON CLUSTER

Snuba automatically adds ON CLUSTER based on the operation target:
class SqlOperation:
    def _get_on_cluster_clause(self) -> str:
        cluster = get_cluster(self._storage_set)
        if cluster.is_single_node():
            return ""
        
        # Use appropriate cluster name based on target
        if self.target == OperationTarget.DISTRIBUTED:
            cluster_name = cluster.get_clickhouse_distributed_cluster_name()
        else:
            cluster_name = cluster.get_clickhouse_cluster_name()
        
        if cluster_name:
            return f" ON CLUSTER '{cluster_name}'"
        return ""
Example:
operations.CreateTable(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    columns=columns,
    engine=table_engines.MergeTree(...),
    target=OperationTarget.LOCAL,
)

# Generates:
# CREATE TABLE events_local ON CLUSTER 'events_cluster' (...)
#     ENGINE = ReplicatedMergeTree(
#         '/clickhouse/tables/events/{shard}/default/events_local',
#         '{replica}'
#     )
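
To see the clause selection in isolation, here is a minimal standalone sketch. It does not import Snuba: `OperationTarget` is a reduced stand-in for the real enum, and `on_cluster_clause` is a hypothetical free function that takes the cluster properties as plain arguments.

```python
from enum import Enum
from typing import Optional


class OperationTarget(Enum):
    """Stand-in for Snuba's OperationTarget, reduced to two members."""
    LOCAL = "local"
    DISTRIBUTED = "distributed"


def on_cluster_clause(
    single_node: bool,
    target: OperationTarget,
    cluster_name: Optional[str],
    distributed_cluster_name: Optional[str],
) -> str:
    """Return the ON CLUSTER suffix for a DDL statement, or "" if none applies."""
    if single_node:
        return ""
    if target is OperationTarget.DISTRIBUTED:
        name = distributed_cluster_name
    else:
        name = cluster_name
    return f" ON CLUSTER '{name}'" if name else ""


local = on_cluster_clause(False, OperationTarget.LOCAL, "events_cluster", "events_cluster_dist")
dist = on_cluster_clause(False, OperationTarget.DISTRIBUTED, "events_cluster", "events_cluster_dist")
single = on_cluster_clause(True, OperationTarget.LOCAL, "events_cluster", None)
print(repr(local), repr(dist), repr(single))
```

Single-node clusters and clusters without a configured name both yield an empty clause, so the same migration can run unchanged in development and production.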

Replication Strategies

ZooKeeper Paths

Each replicated table needs a unique ZooKeeper path for coordination:
def _get_zookeeper_path(self, cluster: ClickhouseCluster, table_name: str) -> str:
    database_name = cluster.get_database()
    
    if self._unsharded is True:
        # Data replicated on every shard
        path = f"/clickhouse/tables/{self._storage_set_value}/all/{database_name}/{table_name}"
    else:
        # Data sharded, each shard has different data
        path = f"/clickhouse/tables/{self._storage_set_value}/{{shard}}/{database_name}/{table_name}"
    
    return f"'{path}'"

Sharded Replication

Data is partitioned across shards, with each shard having replicas:
table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
    unsharded=False,  # Default: data is sharded
)

# Becomes:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/{shard}/default/events_local',
#     '{replica}'
# )

# Each shard has its own ZooKeeper path:
# - Shard 1: /clickhouse/tables/events/1/default/events_local
# - Shard 2: /clickhouse/tables/events/2/default/events_local

Unsharded Replication

Full data copy on every shard (useful for small reference tables):
table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
    unsharded=True,  # All shards have all data
)

# Becomes:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/all/default/events_local',
#     '{replica}'
# )

# All shards share the same ZooKeeper path:
# /clickhouse/tables/events/all/default/events_local
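
The difference between the sharded and unsharded layouts can be checked with a small standalone sketch. `zookeeper_path` and `expand` are hypothetical helpers written for this example; `expand` only mimics substitution of the `{shard}` macro, which ClickHouse performs itself on each node.

```python
def zookeeper_path(storage_set: str, database: str, table: str, unsharded: bool) -> str:
    """Build the path template; '{shard}' is left for ClickHouse macros to fill in."""
    shard_part = "all" if unsharded else "{shard}"
    return f"/clickhouse/tables/{storage_set}/{shard_part}/{database}/{table}"


def expand(path_template: str, shard: int) -> str:
    """Mimic macro substitution on one node (only the {shard} macro is handled)."""
    return path_template.replace("{shard}", str(shard))


sharded = zookeeper_path("events", "default", "events_local", unsharded=False)
unsharded = zookeeper_path("events", "default", "events_local", unsharded=True)

# Expand both templates as two different shards would.
sharded_paths = {expand(sharded, s) for s in (1, 2)}
unsharded_paths = {expand(unsharded, s) for s in (1, 2)}
print(sorted(sharded_paths))
print(sorted(unsharded_paths))
```

Two distinct paths mean two independent replication groups; a single shared path means every replica in the cluster replicates the same data.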

Cluster Configuration

Storage Set Mapping

Each storage set must map to a cluster:
settings.py
CLUSTERS = [
    {
        "host": "clickhouse-node-1",
        "port": 9000,
        "storage_sets": {
            "events",
            "events_ro",
            "transactions",
            "discover",
        },
        "single_node": False,
        "cluster_name": "events_cluster",  # For local tables
        "distributed_cluster_name": "events_cluster_dist",  # For distributed tables
        "database": "default",
    },
    {
        "host": "clickhouse-node-2",
        "port": 9000,
        "storage_sets": {
            "metrics",
            "generic_metrics_sets",
            "generic_metrics_distributions",
        },
        "single_node": False,
        "cluster_name": "metrics_cluster",
        "distributed_cluster_name": "metrics_cluster_dist",
        "database": "default",
    },
]
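
One way to sanity-check such a mapping before deploying it is to build a lookup from storage set to cluster. The sketch below is standalone and uses a trimmed copy of the configuration; `build_storage_set_index` is a hypothetical helper, and rejecting a storage set that appears in two clusters is an assumption of this example rather than documented Snuba behavior.

```python
from typing import Any, Dict, List


def build_storage_set_index(clusters: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Map each storage set name to its cluster entry, rejecting duplicates."""
    index: Dict[str, Dict[str, Any]] = {}
    for cluster in clusters:
        for storage_set in cluster["storage_sets"]:
            if storage_set in index:
                raise ValueError(
                    f"storage set {storage_set!r} is mapped to more than one cluster"
                )
            index[storage_set] = cluster
    return index


# Trimmed example configuration with only the keys this check needs.
CLUSTERS = [
    {"host": "clickhouse-node-1", "storage_sets": {"events", "transactions"},
     "cluster_name": "events_cluster"},
    {"host": "clickhouse-node-2", "storage_sets": {"metrics"},
     "cluster_name": "metrics_cluster"},
]

index = build_storage_set_index(CLUSTERS)
print(index["metrics"]["cluster_name"])
```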

Cluster Definitions in ClickHouse

Clusters are defined in ClickHouse configuration:
config.xml
<clickhouse>
    <remote_servers>
        <events_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-node-1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-node-3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </events_cluster>
        
        <events_cluster_dist>
            <!-- Query nodes for distributed table access -->
            <shard>
                <replica>
                    <host>clickhouse-query-1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </events_cluster_dist>
    </remote_servers>
</clickhouse>
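
When staging and production are supposed to share a topology, it can help to extract the shard and replica layout from the configuration and compare it. The following sketch uses only the Python standard library; `cluster_topology` is a hypothetical helper and the embedded XML is a shortened copy of the example above.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

CONFIG = """
<clickhouse>
    <remote_servers>
        <events_cluster>
            <shard>
                <replica><host>clickhouse-node-1</host><port>9000</port></replica>
                <replica><host>clickhouse-node-2</host><port>9000</port></replica>
            </shard>
            <shard>
                <replica><host>clickhouse-node-3</host><port>9000</port></replica>
                <replica><host>clickhouse-node-4</host><port>9000</port></replica>
            </shard>
        </events_cluster>
    </remote_servers>
</clickhouse>
"""


def cluster_topology(xml_text: str) -> Dict[str, List[List[str]]]:
    """Return {cluster_name: [[replica hosts of shard 1], [replica hosts of shard 2], ...]}."""
    root = ET.fromstring(xml_text)
    topology: Dict[str, List[List[str]]] = {}
    for cluster in root.find("remote_servers"):
        shards = []
        for shard in cluster.findall("shard"):
            shards.append([replica.findtext("host") for replica in shard.findall("replica")])
        topology[cluster.tag] = shards
    return topology


topology = cluster_topology(CONFIG)
print(topology)
```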

Advanced Migration Patterns

Conditional Migrations

Execute different operations based on cluster configuration:
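from typing import Optional, Sequence

from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.operations import OperationTarget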

def create_table_with_settings(
    clickhouse: Optional[ClickhousePool]
) -> operations.SqlOperation:
    cluster = get_cluster(StorageSetKey.EVENTS)
    
    # Adjust settings based on cluster size
    if cluster.is_single_node():
        settings = {"index_granularity": 8192}
    else:
        # Smaller granularity for distributed to speed up queries
        settings = {"index_granularity": 4096}
    
    return operations.CreateTable(
        storage_set=StorageSetKey.EVENTS,
        table_name="events_local",
        columns=columns,
        engine=table_engines.MergeTree(
            storage_set=StorageSetKey.EVENTS,
            order_by="(project_id, timestamp)",
            settings=settings,
        ),
        target=OperationTarget.LOCAL,
    )

class Migration(migration.CodeMigration):
    blocking = False
    
    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [
            operations.RunSqlAsCode(create_table_with_settings),
        ]

Version-Aware Migrations

Check ClickHouse version before executing operations:
from typing import Optional, Sequence

from snuba.migrations import migration_utilities

class Migration(migration.CodeMigration):
    blocking = False
    
    def _create_table(
        self, clickhouse: Optional[ClickhousePool]
    ) -> operations.SqlOperation:
        settings = {"index_granularity": 8192}
        
        clickhouse_version = migration_utilities.get_clickhouse_version_for_storage_set(
            StorageSetKey.EVENTS, clickhouse
        )
        
        # Feature only available in newer versions
        if migration_utilities.supports_setting(
            clickhouse_version, "allow_nullable_key"
        ):
            settings["allow_nullable_key"] = 1
        
        return operations.CreateTable(
            storage_set=StorageSetKey.EVENTS,
            table_name="events_local",
            columns=columns,
            engine=table_engines.MergeTree(
                storage_set=StorageSetKey.EVENTS,
                order_by="(project_id, timestamp)",
                settings=settings,
            ),
            target=OperationTarget.LOCAL,
        )
    
    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [operations.RunSqlAsCode(self._create_table)]
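
The version gate itself is simple tuple comparison. The sketch below is standalone and does not use Snuba's helpers: `parse_version`, `supports`, and the `MINIMUMS` table are hypothetical, and the minimum version shown is placeholder data, not real ClickHouse release information.

```python
from typing import Dict, Tuple

Version = Tuple[int, ...]


def parse_version(text: str) -> Version:
    """Turn '21.8.13.1' into (21, 8, 13, 1), stopping at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def supports(version: Version, setting: str, minimums: Dict[str, Version]) -> bool:
    """True if `setting` is listed and `version` is at least its minimum."""
    minimum = minimums.get(setting)
    return minimum is not None and version >= minimum


# Placeholder data for illustration only; not real ClickHouse release information.
MINIMUMS = {"example_setting": (21, 8)}

settings = {"index_granularity": 8192}
if supports(parse_version("21.8.13.1"), "example_setting", MINIMUMS):
    settings["example_setting"] = 1
print(settings)
```

Comparing tuples rather than strings avoids the classic mistake where "9.1" sorts after "21.8".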

Multi-Step Data Migrations

Complex data migrations with intermediate steps:
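import logging
from typing import Sequence

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations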

def create_temp_table(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE)
    
    logger.info("Creating temporary table")
    connection.execute(
        """
        CREATE TABLE events_temp_local ON CLUSTER 'events_cluster'
        AS events_local
        ENGINE = MergeTree()
        ORDER BY (project_id, timestamp)
        """
    )

def migrate_data(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    
    logger.info("Migrating data to temporary table")
    
    # Use per-node execution for data operations
    for node in cluster.get_local_nodes():
        connection = cluster.get_node_connection(
            ClickhouseClientSettings.MIGRATE, node
        )
        logger.info(f"Migrating data on node: {node}")
        connection.execute(
            """
            INSERT INTO events_temp_local
            SELECT * FROM events_local
            WHERE timestamp >= '2024-01-01'
            """
        )

def swap_tables(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE)
    
    logger.info("Swapping tables")
    connection.execute(
        "RENAME TABLE events_local TO events_old_local, "
        "events_temp_local TO events_local "
        "ON CLUSTER 'events_cluster'"
    )

class Migration(migration.CodeMigration):
    blocking = True
    
    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [
            operations.RunPython(
                func=create_temp_table,
                description="Create temporary table",
            ),
            operations.RunPython(
                func=migrate_data,
                description="Migrate data to new structure",
            ),
            operations.RunPython(
                func=swap_tables,
                description="Swap temporary and production tables",
            ),
        ]

Synchronization and Timing

Mutation Synchronization

Snuba uses synchronous settings to ensure operations complete across all replicas:
class SqlOperation:
    def execute(self) -> None:
        connection.execute(
            sql,
            settings={
                "alter_sync": 2,  # Wait for all replicas
                "mutations_sync": 2,  # Wait for mutations on all replicas
            }
        )
        # No polling needed - ClickHouse blocks until complete
Synchronization levels:
  • 0 - Asynchronous (don’t wait)
  • 1 - Wait for current replica only
  • 2 - Wait for all replicas in the cluster

Retry on Sync Errors

Some operations automatically retry on metadata synchronization errors:
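import time

class RetryOnSyncError:
    def execute(self) -> None:
        # 31 attempts with a 1-second pause between them: waits at most ~30 seconds
        for i in range(30, -1, -1):
            try:
                super().execute()
                break
            except Exception as e:
                # Error 517: Metadata not up to date with ZooKeeper.
                # getattr guards against exceptions that carry no `code` attribute.
                if i and getattr(e, "code", None) == 517:
                    time.sleep(1)
                else:
                    raise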

class AddColumn(RetryOnSyncError, SqlOperation):
    # Inherits retry behavior
    pass
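
The retry pattern can be exercised without a ClickHouse server. The sketch below is a standalone approximation: `FakeClickhouseError` and `retry_on_code` are hypothetical names, and the sleep function is injectable so the example runs instantly.

```python
import time
from typing import Callable


class FakeClickhouseError(Exception):
    """Hypothetical error type carrying a numeric ClickHouse error code."""

    def __init__(self, code: int) -> None:
        super().__init__(f"ClickHouse error {code}")
        self.code = code


def retry_on_code(
    action: Callable[[], None],
    code: int = 517,
    attempts: int = 31,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run `action`, retrying while it fails with `code`. Returns the attempts used."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            action()
            return attempt
        except FakeClickhouseError as e:
            # Give up on unrelated errors and on the final attempt.
            if e.code != code or attempt == attempts:
                raise
            sleep(1)
    raise AssertionError("loop always returns or raises")


calls = {"n": 0}


def flaky_alter() -> None:
    """Fails twice with code 517, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeClickhouseError(517)


used = retry_on_code(flaky_alter, sleep=lambda _: None)
print(used)
```

Only the sync error is retried; any other error code is re-raised immediately so genuine failures are not masked by the wait loop.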

Handling Migration Failures

Stuck Migrations

If a migration gets stuck in IN_PROGRESS:
1. Check cluster health

# Check if all nodes are accessible
echo "SELECT * FROM system.clusters WHERE cluster = 'events_cluster'" | 
    clickhouse-client

# Check for inactive replicas
echo "SELECT * FROM system.replicas WHERE is_readonly = 1" | 
    clickhouse-client

2. Check ZooKeeper connectivity

# Verify ZooKeeper is accessible
echo "SELECT * FROM system.zookeeper WHERE path = '/'" | 
    clickhouse-client

3. Reverse the in-progress migration

snuba migrations reverse-in-progress --group <group>

4. Retry after fixing issues

snuba migrations run --group <group> --migration-id <id> --force

Partial Failures

If a migration fails on some nodes but succeeds on others:
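import logging
from typing import Sequence

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations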

def verify_table_exists(logger: logging.Logger) -> None:
    """Verify table exists on all nodes before proceeding."""
    cluster = get_cluster(StorageSetKey.EVENTS)
    
    for node in cluster.get_local_nodes():
        connection = cluster.get_node_connection(
            ClickhouseClientSettings.MIGRATE, node
        )
        
        result = connection.execute(
            "SELECT count() FROM system.tables "
            "WHERE name = 'events_local'"
        ).results
        
        if result[0][0] == 0:
            raise Exception(f"Table not found on node {node}")
        
        logger.info(f"Table verified on node {node}")

class Migration(migration.CodeMigration):
    blocking = False
    
    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [
            operations.RunSqlAsCode(
                operations.CreateTable(...)
            ),
            operations.RunPython(
                func=verify_table_exists,
                description="Verify table created on all nodes",
            ),
        ]
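
The verification step above stops at the first node that lacks the table. When diagnosing a partial failure it is often more useful to see every affected node at once. The following standalone sketch shows that variant; `find_nodes_missing_table` is a hypothetical helper, and the per-node query is replaced by a lookup into fake data.

```python
from typing import Callable, Dict, Iterable, List


def find_nodes_missing_table(
    nodes: Iterable[str],
    count_tables: Callable[[str], int],
) -> List[str]:
    """Check all nodes and return every one where the table count is zero."""
    return [node for node in nodes if count_tables(node) == 0]


# Fake per-node results standing in for the system.tables query.
fake_counts: Dict[str, int] = {"node-1": 1, "node-2": 0, "node-3": 1, "node-4": 0}

missing = find_nodes_missing_table(fake_counts, fake_counts.__getitem__)
print(missing)
```

In a real migration the `count_tables` callable would wrap the per-node connection and query shown above, and a non-empty result would be raised as a single error listing all nodes.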

Monitoring and Observability

Check Migration Status

-- View migration history
SELECT 
    group,
    migration_id,
    status,
    timestamp
FROM migrations_dist
FINAL
ORDER BY group, migration_id;

-- Check for in-progress migrations
SELECT 
    group,
    migration_id,
    status,
    timestamp,
    now() - timestamp AS duration
FROM migrations_dist
FINAL
WHERE status = 'in_progress';
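
The rows returned by the in-progress query can be turned into an alert with a small filter. This is a standalone sketch: `MigrationRow` and `stuck_migrations` are hypothetical names, the migration IDs are made up, and the 30-minute threshold is an arbitrary choice for the example.

```python
from datetime import datetime, timedelta
from typing import List, NamedTuple


class MigrationRow(NamedTuple):
    group: str
    migration_id: str
    status: str
    timestamp: datetime


def stuck_migrations(
    rows: List[MigrationRow], now: datetime, max_age: timedelta
) -> List[MigrationRow]:
    """Rows that are still 'in_progress' and older than `max_age`."""
    return [r for r in rows if r.status == "in_progress" and now - r.timestamp > max_age]


now = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    MigrationRow("events", "0001_example", "completed", now - timedelta(hours=5)),
    MigrationRow("events", "0002_example", "in_progress", now - timedelta(hours=2)),
    MigrationRow("metrics", "0001_example", "in_progress", now - timedelta(minutes=5)),
]
stuck = stuck_migrations(rows, now, max_age=timedelta(minutes=30))
print([(r.group, r.migration_id) for r in stuck])
```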

Check Replication Health

-- Check replica status
SELECT
    database,
    table,
    replica_name,
    is_leader,
    is_readonly,
    absolute_delay,
    queue_size
FROM system.replicas
WHERE database = 'default'
ORDER BY table, replica_name;

-- Check for replication lag
SELECT
    database,
    table,
    replica_name,
    absolute_delay
FROM system.replicas
WHERE absolute_delay > 10  -- More than 10 seconds behind
ORDER BY absolute_delay DESC;

Check DDL Queue

-- View pending distributed DDL operations
SELECT
    entry,
    host,
    status,
    exception_code
FROM system.distributed_ddl_queue
WHERE status != 'Finished'
ORDER BY entry;
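
Since the queue holds one row per entry and host, grouping the unfinished rows by entry shows which hosts are holding up each DDL statement. The sketch below is standalone; `unfinished_hosts` is a hypothetical helper and the rows are sample data shaped like (entry, host, status).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def unfinished_hosts(rows: Iterable[Tuple[str, str, str]]) -> Dict[str, List[str]]:
    """Group (entry, host, status) rows into {entry: [hosts not yet Finished]}."""
    pending: Dict[str, List[str]] = defaultdict(list)
    for entry, host, status in rows:
        if status != "Finished":
            pending[entry].append(host)
    return dict(pending)


# Sample data for illustration only.
rows = [
    ("query-0000000001", "clickhouse-node-1", "Finished"),
    ("query-0000000001", "clickhouse-node-2", "Active"),
    ("query-0000000002", "clickhouse-node-1", "Finished"),
]
pending = unfinished_hosts(rows)
print(pending)
```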

Best Practices for Distributed Migrations

1. Test in staging first

Always run migrations in a staging environment that mirrors production topology:
  • Same number of shards and replicas
  • Similar data volume
  • Same ClickHouse version

2. Monitor ZooKeeper health

Ensure ZooKeeper is healthy before running migrations:
echo "ruok" | nc localhost 2181  # Should return "imok"

3. Check for inactive replicas

Verify all replicas are active before migrations:
snuba migrations check-inactive-replicas

4. Use blocking flag appropriately

Mark migrations as blocking if they:
  • Modify large amounts of data
  • Require significant time to complete
  • Need to coordinate with application deployments

5. Plan for rollback

Ensure backward operations are tested and functional:
# Test rollback with dry-run
snuba migrations reverse \
    --group <group> \
    --migration-id <id> \
    --dry-run

Troubleshooting Guide

Migration Timeout

Symptom: Migration hangs indefinitely
Possible causes:
  • ZooKeeper connectivity issues
  • One or more nodes unreachable
  • Replication lag causing synchronization delays
Solutions:
# Check node connectivity
for host in clickhouse-node-{1..4}; do
    echo "Checking $host"
    clickhouse-client --host $host --query "SELECT 1"
done

# Check ZooKeeper
echo "SELECT * FROM system.zookeeper WHERE path = '/clickhouse'" | 
    clickhouse-client

# Check replication queues
echo "SELECT database, table, replica_name, queue_size 
      FROM system.replicas WHERE queue_size > 0" | 
    clickhouse-client

Metadata Inconsistency

Symptom: Error 517 (metadata not synchronized)
Solution: Operations with RetryOnSyncError retry automatically, but you can also force a metadata sync:
SYSTEM SYNC REPLICA events_local;

Uneven Sharding

Symptom: Data unevenly distributed across shards
Solution: Verify the sharding key in the distributed table:
table_engines.Distributed(
    local_table_name="events_local",
    sharding_key="cityHash64(project_id)",  # Hashes project_id so projects spread across shards
)
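
A hashed sharding key spreads distinct key values across shards, but it cannot balance a workload in which a single key dominates. The standalone sketch below illustrates both cases. It uses SHA-256 as a stand-in for cityHash64 (an assumption made so the example needs only the standard library), and `shard_for` and `rows_per_shard` are hypothetical helpers.

```python
import hashlib
from collections import Counter
from typing import Iterable


def shard_for(project_id: int, num_shards: int) -> int:
    """Pick a shard from a stable hash (SHA-256 here, as a stand-in for cityHash64)."""
    digest = hashlib.sha256(str(project_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def rows_per_shard(project_ids: Iterable[int], num_shards: int) -> Counter:
    """Count how many rows land on each shard."""
    return Counter(shard_for(pid, num_shards) for pid in project_ids)


# Even workload: 1000 projects with one row each.
even = rows_per_shard(range(1000), num_shards=2)

# Skewed workload: one project produces 900 of the 1000 rows.
skewed = rows_per_shard([42] * 900 + list(range(100)), num_shards=2)

print(dict(even), dict(skewed))
```

If the skewed pattern matches production data, a sharding key that includes a higher-cardinality column may be worth evaluating, at the cost of queries for one project touching more shards.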

Next Steps

Migration Overview

Review migration system fundamentals

Creating Migrations

Learn to write custom migrations
