Overview
Distributed ClickHouse deployments introduce complexity in how migrations are executed across multiple nodes. This guide covers advanced strategies for managing migrations in replicated and sharded environments.
Distributed migration support is experimental. Use these strategies with caution and thorough testing in staging environments before production deployment.
Distributed Architecture
Cluster Topology
A typical distributed ClickHouse cluster has:
┌─────────────────────────────────────────────────┐
│ Query Nodes (Distributed) │
│ ┌────────────────────────────────────────┐ │
│ │ events_dist (Distributed table) │ │
│ └────────────────────────────────────────┘ │
└────────────┬──────────────────┬─────────────────┘
│ │
┌────────▼────────┐ ┌─────▼──────────┐
│ Shard 1 │ │ Shard 2 │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ Replica 1 │ │ │ │ Replica 1 │ │
│ │ events_ │ │ │ │ events_ │ │
│ │ local │ │ │ │ local │ │
│ └───────────┘ │ │ └───────────┘ │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ Replica 2 │ │ │ │ Replica 2 │ │
│ │ events_ │ │ │ │ events_ │ │
│ │ local │ │ │ │ local │ │
│ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └────────────────┘
│ │
┌────┴──────────────────┴────┐
│ ZooKeeper Cluster │
│ (Coordination Layer) │
└────────────────────────────┘
Key Components
Local Tables
Store actual data on each shard
Replicated across replicas using ReplicatedMergeTree engines
Each shard contains a subset of the data
Distributed Tables
Virtual tables that aggregate results from local tables
Proxy queries to all shards
No data storage—just query routing
ZooKeeper
Coordinates replication between replicas
Manages distributed DDL execution
Ensures consistency across the cluster
ON CLUSTER Execution
Snuba uses ClickHouse’s ON CLUSTER syntax to execute DDL across all nodes:
How It Works
-- Without ON CLUSTER (single node only)
CREATE TABLE events_local (...) ENGINE = MergeTree() ...
-- With ON CLUSTER (all nodes in cluster)
CREATE TABLE events_local ON CLUSTER 'my_cluster' (...)
ENGINE = ReplicatedMergeTree() ...
When you use ON CLUSTER:
1. Submit to initiator node: the migration runner connects to a single node and submits the DDL with ON CLUSTER.
2. ZooKeeper coordination: ClickHouse stores the DDL command in ZooKeeper’s /clickhouse/task_queue/ddl/ path.
3. Distributed execution: each node in the cluster picks up the task from ZooKeeper and executes it locally.
4. Wait for completion: the initiator waits for all nodes to complete (or fail) before returning.
Operation Targets and ON CLUSTER
Snuba automatically adds ON CLUSTER based on the operation target:
class SqlOperation:
    def _get_on_cluster_clause(self) -> str:
        cluster = get_cluster(self._storage_set)
        if cluster.is_single_node():
            return ""

        # Use the appropriate cluster name based on the operation target
        if self.target == OperationTarget.DISTRIBUTED:
            cluster_name = cluster.get_clickhouse_distributed_cluster_name()
        else:
            cluster_name = cluster.get_clickhouse_cluster_name()

        if cluster_name:
            return f" ON CLUSTER '{cluster_name}'"
        return ""
Example:
operations.CreateTable(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    columns=columns,
    engine=table_engines.MergeTree(...),
    target=OperationTarget.LOCAL,
)

# Generates:
# CREATE TABLE events_local ON CLUSTER 'events_cluster' (...)
# ENGINE = ReplicatedMergeTree(
#     '/clickhouse/tables/events/{shard}/default/events_local',
#     '{replica}'
# )
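The clause selection logic can be exercised in isolation. The sketch below is a simplified standalone version of the behavior described above; the enum values and function signature are illustrative assumptions, not Snuba's actual API:

```python
from enum import Enum


class OperationTarget(Enum):
    LOCAL = "local"
    DISTRIBUTED = "distributed"


def on_cluster_clause(
    target: OperationTarget,
    single_node: bool,
    local_cluster: str,
    distributed_cluster: str,
) -> str:
    """Build the ON CLUSTER clause for a DDL statement (illustrative only)."""
    if single_node:
        return ""  # Single-node deployments never need ON CLUSTER
    name = (
        distributed_cluster
        if target is OperationTarget.DISTRIBUTED
        else local_cluster
    )
    return f" ON CLUSTER '{name}'" if name else ""


print(on_cluster_clause(OperationTarget.LOCAL, False, "events_cluster", "events_cluster_dist"))
# → " ON CLUSTER 'events_cluster'"
```

The key takeaway is that local and distributed tables can live on different node sets, so each operation target resolves to its own cluster name.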
Replication Strategies
ZooKeeper Paths
Each replicated table needs a unique ZooKeeper path for coordination:
def _get_zookeeper_path(self, cluster: ClickhouseCluster, table_name: str) -> str:
    database_name = cluster.get_database()
    if self._unsharded is True:
        # Data replicated on every shard
        path = f"/clickhouse/tables/{self._storage_set_value}/all/{database_name}/{table_name}"
    else:
        # Data sharded: each shard holds different data
        path = f"/clickhouse/tables/{self._storage_set_value}/{{shard}}/{database_name}/{table_name}"
    return f"'{path}'"
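The path scheme can be tried without a running cluster. This standalone sketch is an assumed simplification of the logic above, showing the sharded and unsharded variants side by side:

```python
def zookeeper_path(
    storage_set: str, database: str, table: str, unsharded: bool
) -> str:
    """Illustrative reimplementation of the ZooKeeper path scheme."""
    # Unsharded tables share one path (every shard replicates the same
    # data); sharded tables keep the {shard} macro, which each ClickHouse
    # node expands to its own shard number at table creation time.
    shard_part = "all" if unsharded else "{shard}"
    return f"/clickhouse/tables/{storage_set}/{shard_part}/{database}/{table}"


print(zookeeper_path("events", "default", "events_local", unsharded=False))
# → /clickhouse/tables/events/{shard}/default/events_local
print(zookeeper_path("events", "default", "events_local", unsharded=True))
# → /clickhouse/tables/events/all/default/events_local
```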
Sharded Replication
Data is partitioned across shards, with each shard having replicas:
table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
    unsharded=False,  # Default: data is sharded
)

# Becomes:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/{shard}/default/events_local',
#     '{replica}'
# )

# Each shard has its own ZooKeeper path:
# - Shard 1: /clickhouse/tables/events/1/default/events_local
# - Shard 2: /clickhouse/tables/events/2/default/events_local
Unsharded Replication
Full data copy on every shard (useful for small reference tables):
table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
    unsharded=True,  # All shards have all data
)

# Becomes:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/all/default/events_local',
#     '{replica}'
# )

# All shards share the same ZooKeeper path:
# /clickhouse/tables/events/all/default/events_local
Cluster Configuration
Storage Set Mapping
Each storage set must map to a cluster:
CLUSTERS = [
    {
        "host": "clickhouse-node-1",
        "port": 9000,
        "storage_sets": {
            "events",
            "events_ro",
            "transactions",
            "discover",
        },
        "single_node": False,
        "cluster_name": "events_cluster",  # For local tables
        "distributed_cluster_name": "events_cluster_dist",  # For distributed tables
        "database": "default",
    },
    {
        "host": "clickhouse-node-2",
        "port": 9000,
        "storage_sets": {
            "metrics",
            "generic_metrics_sets",
            "generic_metrics_distributions",
        },
        "single_node": False,
        "cluster_name": "metrics_cluster",
        "distributed_cluster_name": "metrics_cluster_dist",
        "database": "default",
    },
]
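Resolving a storage set to its cluster is then a straightforward lookup over this list. A minimal sketch; the `find_cluster` helper and the trimmed-down entries are illustrative assumptions, not Snuba's real implementation:

```python
# Trimmed-down cluster entries for illustration
CLUSTERS = [
    {
        "host": "clickhouse-node-1",
        "storage_sets": {"events", "events_ro", "transactions", "discover"},
        "cluster_name": "events_cluster",
    },
    {
        "host": "clickhouse-node-2",
        "storage_sets": {"metrics", "generic_metrics_sets"},
        "cluster_name": "metrics_cluster",
    },
]


def find_cluster(storage_set: str) -> dict:
    """Return the first cluster whose storage_sets contains storage_set."""
    for cluster in CLUSTERS:
        if storage_set in cluster["storage_sets"]:
            return cluster
    raise KeyError(f"no cluster configured for storage set {storage_set!r}")
```

Because every migration operation is keyed by storage set, this mapping is what decides which physical cluster a given DDL statement reaches.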
Cluster Definitions in ClickHouse
Clusters are defined in ClickHouse configuration:
<clickhouse>
    <remote_servers>
        <events_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-node-1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>clickhouse-node-3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </events_cluster>
        <events_cluster_dist>
            <!-- Query nodes for distributed table access -->
            <shard>
                <replica>
                    <host>clickhouse-query-1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </events_cluster_dist>
    </remote_servers>
</clickhouse>
Advanced Migration Patterns
Conditional Migrations
Execute different operations based on cluster configuration:
from typing import Optional, Sequence

from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import get_cluster


def create_table_with_settings(
    clickhouse: Optional[ClickhousePool],
) -> operations.SqlOperation:
    cluster = get_cluster(StorageSetKey.EVENTS)

    # Adjust settings based on cluster size
    if cluster.is_single_node():
        settings = {"index_granularity": 8192}
    else:
        # Smaller granularity on distributed clusters to speed up queries
        settings = {"index_granularity": 4096}

    return operations.CreateTable(
        storage_set=StorageSetKey.EVENTS,
        table_name="events_local",
        columns=columns,
        engine=table_engines.MergeTree(
            storage_set=StorageSetKey.EVENTS,
            order_by="(project_id, timestamp)",
            settings=settings,
        ),
        target=OperationTarget.LOCAL,
    )


class Migration(migration.CodeMigration):
    blocking = False

    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [
            operations.RunSqlAsCode(create_table_with_settings),
        ]
Version-Aware Migrations
Check ClickHouse version before executing operations:
from typing import Optional, Sequence

from snuba.migrations import migration_utilities


class Migration(migration.CodeMigration):
    blocking = False

    def _create_table(
        self, clickhouse: Optional[ClickhousePool]
    ) -> operations.SqlOperation:
        settings = {"index_granularity": 8192}
        clickhouse_version = migration_utilities.get_clickhouse_version_for_storage_set(
            StorageSetKey.EVENTS, clickhouse
        )

        # Feature only available in newer versions
        if migration_utilities.supports_setting(
            clickhouse_version, "allow_nullable_key"
        ):
            settings["allow_nullable_key"] = 1

        return operations.CreateTable(
            storage_set=StorageSetKey.EVENTS,
            table_name="events_local",
            columns=columns,
            engine=table_engines.MergeTree(
                storage_set=StorageSetKey.EVENTS,
                order_by="(project_id, timestamp)",
                settings=settings,
            ),
            target=OperationTarget.LOCAL,
        )

    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [operations.RunSqlAsCode(self._create_table)]
Multi-Step Data Migrations
Complex data migrations with intermediate steps:
import logging

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster


def create_temp_table(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE)
    logger.info("Creating temporary table")
    connection.execute(
        """
        CREATE TABLE events_temp_local ON CLUSTER 'events_cluster'
        AS events_local
        ENGINE = MergeTree()
        ORDER BY (project_id, timestamp)
        """
    )


def migrate_data(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    logger.info("Migrating data to temporary table")
    # Use per-node execution for data operations
    for node in cluster.get_local_nodes():
        connection = cluster.get_node_connection(
            ClickhouseClientSettings.MIGRATE, node
        )
        logger.info(f"Migrating data on node: {node}")
        connection.execute(
            """
            INSERT INTO events_temp_local
            SELECT * FROM events_local
            WHERE timestamp >= '2024-01-01'
            """
        )


def swap_tables(logger: logging.Logger) -> None:
    cluster = get_cluster(StorageSetKey.EVENTS)
    connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE)
    logger.info("Swapping tables")
    connection.execute(
        "RENAME TABLE events_local TO events_old_local, "
        "events_temp_local TO events_local "
        "ON CLUSTER 'events_cluster'"
    )


class Migration(migration.CodeMigration):
    blocking = True

    def forwards_global(self) -> Sequence[operations.GenericOperation]:
        return [
            operations.RunPython(
                func=create_temp_table,
                description="Create temporary table",
            ),
            operations.RunPython(
                func=migrate_data,
                description="Migrate data to new structure",
            ),
            operations.RunPython(
                func=swap_tables,
                description="Swap temporary and production tables",
            ),
        ]
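The steps run strictly in order, and a failure halts the sequence with the earlier steps already applied; that is why the migration is marked blocking and why each step should be safe to re-run. A toy sketch of that execution model (the `run_steps` helper is illustrative, not Snuba's actual runner):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("migration")


def run_steps(steps):
    """Run (name, func) migration steps in order; stop at the first failure.

    Earlier steps stay applied when a later one fails, so each step must
    be individually reversible, or at least idempotent, for recovery.
    """
    completed = []
    for name, func in steps:
        logger.info("Running step: %s", name)
        func(logger)  # Any exception propagates and halts the sequence
        completed.append(name)
    return completed
```

Usage would mirror the migration above: `run_steps([("create temp table", create_temp_table), ("migrate data", migrate_data), ("swap tables", swap_tables)])`.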
Synchronization and Timing
Mutation Synchronization
Snuba uses synchronous settings to ensure operations complete across all replicas:
class SqlOperation:
    def execute(self) -> None:
        connection.execute(
            sql,
            settings={
                "alter_sync": 2,  # Wait for all replicas
                "mutations_sync": 2,  # Wait for mutations on all replicas
            },
        )
        # No polling needed: ClickHouse blocks until complete
Synchronization levels:
0 - Asynchronous (don’t wait)
1 - Wait for current replica only
2 - Wait for all replicas in the cluster
Retry on Sync Errors
Some operations automatically retry on metadata synchronization errors:
class RetryOnSyncError:
    def execute(self) -> None:
        for i in range(30, -1, -1):  # Wait at most ~30 seconds
            try:
                super().execute()
                break
            except Exception as e:
                # Error 517: metadata not up to date with ZooKeeper
                if i and e.code == 517:
                    time.sleep(1)
                else:
                    raise


class AddColumn(RetryOnSyncError, SqlOperation):
    # Inherits retry behavior
    pass
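The same pattern can be written as a standalone helper. In this sketch, the `ClickhouseError` class, attempt budget, and delay are assumptions for illustration; Snuba's real error type lives in its driver layer:

```python
import time


class ClickhouseError(Exception):
    """Stand-in for a driver error carrying a ClickHouse error code."""

    def __init__(self, code: int) -> None:
        super().__init__(f"ClickHouse error {code}")
        self.code = code


SYNC_ERROR = 517  # Metadata not up to date with ZooKeeper


def execute_with_retry(execute, attempts: int = 30, delay: float = 1.0):
    """Retry only on error 517; re-raise anything else immediately."""
    for remaining in range(attempts, -1, -1):
        try:
            return execute()
        except ClickhouseError as e:
            if remaining and e.code == SYNC_ERROR:
                time.sleep(delay)  # Give ZooKeeper time to converge
            else:
                raise
```

The important design point is selectivity: only the transient synchronization error is retried, so genuine failures (bad DDL, missing table) surface on the first attempt.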
Handling Migration Failures
Stuck Migrations
If a migration gets stuck in IN_PROGRESS:
1. Check cluster health

# Check if all nodes are accessible
echo "SELECT * FROM system.clusters WHERE cluster = 'events_cluster'" |
clickhouse-client

# Check for inactive replicas
echo "SELECT * FROM system.replicas WHERE is_readonly = 1" |
clickhouse-client

2. Check ZooKeeper connectivity

# Verify ZooKeeper is accessible
echo "SELECT * FROM system.zookeeper WHERE path = '/'" |
clickhouse-client

3. Reverse the in-progress migration

snuba migrations reverse-in-progress --group <group>

4. Retry after fixing issues

snuba migrations run --group <group> --migration-id <id> --force
Partial Failures
If a migration fails on some nodes but succeeds on others:
import logging
from snuba.clusters.cluster import get_cluster
def verify_table_exists ( logger : logging.Logger) -> None :
"""Verify table exists on all nodes before proceeding."""
cluster = get_cluster(StorageSetKey. EVENTS )
for node in cluster.get_local_nodes():
connection = cluster.get_node_connection(
ClickhouseClientSettings. MIGRATE , node
)
result = connection.execute(
"SELECT count() FROM system.tables "
"WHERE name = 'events_local'"
).results
if result[ 0 ][ 0 ] == 0 :
raise Exception ( f "Table not found on node { node } " )
logger.info( f "Table verified on node { node } " )
class Migration ( migration . CodeMigration ):
blocking = False
def forwards_global ( self ) -> Sequence[operations.GenericOperation]:
return [
operations.RunSqlAsCode(
operations.CreateTable( ... )
),
operations.RunPython(
func = verify_table_exists,
description = "Verify table created on all nodes" ,
),
]
Monitoring and Observability
Check Migration Status
-- View migration history
SELECT
    group,
    migration_id,
    status,
    timestamp
FROM migrations_dist
FINAL
ORDER BY group, migration_id;

-- Check for in-progress migrations
SELECT
    group,
    migration_id,
    status,
    timestamp,
    now() - timestamp AS duration
FROM migrations_dist
FINAL
WHERE status = 'in_progress';
Check Replication Health
-- Check replica status
SELECT
database ,
table ,
replica_name,
is_leader,
is_readonly,
absolute_delay,
queue_size
FROM system . replicas
WHERE database = 'default'
ORDER BY table , replica_name;
-- Check for replication lag
SELECT
database ,
table ,
replica_name,
absolute_delay
FROM system . replicas
WHERE absolute_delay > 10 -- More than 10 seconds behind
ORDER BY absolute_delay DESC ;
Check DDL Queue
-- View pending distributed DDL operations
SELECT
entry ,
host,
status ,
exception
FROM system . distributed_ddl_queue
WHERE status != 'Finished'
ORDER BY entry ;
Best Practices for Distributed Migrations
Test in staging first
Always run migrations in a staging environment that mirrors production topology:
Same number of shards and replicas
Similar data volume
Same ClickHouse version
Monitor ZooKeeper health
Ensure ZooKeeper is healthy before running migrations:

echo "ruok" | nc localhost 2181  # Should return "imok"
Check for inactive replicas
Verify all replicas are active before migrations:

snuba migrations check-inactive-replicas
Use blocking flag appropriately
Mark migrations as blocking if they:
Modify large amounts of data
Require significant time to complete
Need to coordinate with application deployments
Plan for rollback
Ensure backward operations are tested and functional:

# Test rollback with dry-run
snuba migrations reverse \
    --group <group> \
    --migration-id <id> \
    --dry-run
Troubleshooting Guide
Migration Timeout
Symptom: Migration hangs indefinitely
Possible causes:
ZooKeeper connectivity issues
One or more nodes unreachable
Replication lag causing synchronization delays
Solutions:
# Check node connectivity
for host in clickhouse-node-{1..4}; do
echo "Checking $host "
clickhouse-client --host $host --query "SELECT 1"
done
# Check ZooKeeper
echo "SELECT * FROM system.zookeeper WHERE path = '/clickhouse'" |
clickhouse-client
# Check replication queues
echo "SELECT database, table, replica_name, queue_size
FROM system.replicas WHERE queue_size > 0" |
clickhouse-client
Symptom: Error 517 (metadata not synchronized)
Solution: Operations with RetryOnSyncError automatically retry, but you can force a metadata sync:
SYSTEM SYNC REPLICA events_local;
Uneven Sharding
Symptom: Data unevenly distributed across shards
Solution: Verify sharding key in distributed table:
table_engines.Distributed(
    local_table_name="events_local",
    sharding_key="cityHash64(project_id)",  # Ensures even distribution
)
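A candidate sharding key can be sanity-checked offline before deployment. This sketch substitutes a generic hash for cityHash64 (an assumption; the exact hash function differs, but the uniformity argument is the same):

```python
import hashlib
from collections import Counter


def shard_for(project_id: int, num_shards: int) -> int:
    """Map a project to a shard the way a hash-based sharding key would."""
    # Stand-in for cityHash64: any well-mixed hash illustrates the point
    digest = hashlib.md5(str(project_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Simulate 10,000 projects across 4 shards and inspect the spread
counts = Counter(shard_for(pid, 4) for pid in range(10_000))
print(dict(sorted(counts.items())))  # Expect roughly 2,500 per shard
```

A skewed result here (e.g. when sharding by a low-cardinality column) would show up as one shard receiving most rows, which is exactly the symptom described above.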
Next Steps
Migration Overview Review migration system fundamentals
Creating Migrations Learn to write custom migrations