Overview
Snuba supports two deployment modes for ClickHouse, each with different migration characteristics:
- Local (Single-Node) Mode - Fully supported, used in development and smaller deployments
- Distributed Mode - Experimental, for multi-node clustered ClickHouse deployments
The mode affects how migrations are executed and how tables are created across ClickHouse nodes.
Distributed mode is experimental and should be used only for development purposes; any production use of distributed migrations requires careful testing and validation.
Single-Node Mode (Local)
Overview
In single-node mode, all data resides on a single ClickHouse server. This is the default and fully supported configuration.
Configuration
Single-node mode is the default configuration:
CLUSTERS = [
    {
        "host": "localhost",
        "port": 9000,
        "storage_sets": {"events", "transactions", "discover"},
        "single_node": True,  # Enable single-node mode
    },
]
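A configuration entry like the one above can be sanity-checked with a small helper. This is a hypothetical sketch, not part of Snuba's API; the key names mirror the example entry:

```python
# Hypothetical helper (not part of Snuba) that checks a CLUSTERS entry
# carries the keys used in the single-node example above.
REQUIRED_KEYS = {"host", "port", "storage_sets", "single_node"}

def validate_cluster_entry(entry: dict) -> None:
    """Raise if a cluster entry is missing an expected key."""
    missing = REQUIRED_KEYS - entry.keys()
    if missing:
        raise ValueError(f"cluster entry missing keys: {sorted(missing)}")
```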
Migration Behavior
In single-node mode:
- Only local tables are created
- Distributed tables are automatically skipped
- Operations use simple MergeTree engine families
- No replication or sharding configuration needed
Example Migration
from typing import Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.operations import OperationTarget

class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        return [
            # Create local table
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_local",
                columns=columns,
                engine=table_engines.MergeTree(
                    storage_set=StorageSetKey.EVENTS,
                    order_by="(project_id, timestamp)",
                    partition_by="(toMonday(timestamp))",
                ),
                target=OperationTarget.LOCAL,
            ),
            # Distributed table creation is skipped automatically
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_dist",
                columns=columns,
                engine=table_engines.Distributed(
                    local_table_name="events_local",
                    sharding_key=None,
                ),
                target=OperationTarget.DISTRIBUTED,  # Skipped in single-node
            ),
        ]
Table Engines in Single-Node Mode
When single_node = True, table engines are simplified:
| Replicated Engine | Single-Node Equivalent |
|---|---|
| ReplicatedMergeTree | MergeTree |
| ReplicatedReplacingMergeTree | ReplacingMergeTree |
| ReplicatedAggregatingMergeTree | AggregatingMergeTree |
| ReplicatedSummingMergeTree | SummingMergeTree |
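The substitution in the table above can be sketched as a simple mapping. This is an illustrative sketch of the behavior, not Snuba's actual implementation:

```python
# Illustrative mapping of replicated engines to their single-node
# equivalents, as described in the table above (not Snuba's real code).
REPLICATED_TO_SINGLE_NODE = {
    "ReplicatedMergeTree": "MergeTree",
    "ReplicatedReplacingMergeTree": "ReplacingMergeTree",
    "ReplicatedAggregatingMergeTree": "AggregatingMergeTree",
    "ReplicatedSummingMergeTree": "SummingMergeTree",
}

def effective_engine(engine: str, single_node: bool) -> str:
    """Return the engine name used for the given deployment mode."""
    if single_node:
        return REPLICATED_TO_SINGLE_NODE.get(engine, engine)
    return engine
```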
Running Migrations
List migrations
snuba migrations list
View all migrations and their current status.
Run all pending migrations
snuba migrations migrate --force
Apply all pending migrations. The --force flag allows blocking migrations to run.
Verify completion
Confirm all migrations show “completed” status.
If you’re running snuba devserver, migrations run automatically on startup, so no manual intervention is needed.
Distributed Mode
Overview
Distributed mode supports multi-node ClickHouse clusters with replication and sharding. This configuration is more complex but enables horizontal scaling.
Distributed mode support is experimental. Use only for development and testing purposes.
Architecture
Distributed deployments involve:
- Local tables - Store actual data on each shard
- Distributed tables - Query proxies that aggregate results from local tables
- Replication - Data redundancy across replicas using ZooKeeper
- Sharding - Data partitioning across multiple nodes
Configuration
Enable distributed mode with cluster configuration:
CLUSTERS = [
    {
        "host": "localhost",
        "port": 9000,
        "storage_sets": {"events", "transactions"},
        "single_node": False,  # Enable distributed mode
        "cluster_name": "my_cluster",  # Storage cluster name
        "distributed_cluster_name": "my_cluster_dist",  # Query cluster name
        "database": "default",
    },
]
The cluster names must match entries in ClickHouse’s system.clusters table.
ClickHouse Cluster Configuration
Example ClickHouse cluster configuration (usually in config.xml):
<clickhouse>
    <remote_servers>
        <my_cluster>
            <shard>
                <replica>
                    <host>clickhouse-node-1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </my_cluster>
    </remote_servers>
</clickhouse>
Migration Behavior
In distributed mode:
- Both local and distributed tables are created
- Operations use replicated engines (ReplicatedMergeTree, etc.)
- ZooKeeper coordinates replication
- Migrations use ON CLUSTER syntax for cluster-wide DDL
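The shape of cluster-wide DDL can be sketched as follows. This is a hedged illustration of what an ON CLUSTER statement looks like, not Snuba's actual SQL generation; the table and cluster names match the examples on this page, and the exact SQL Snuba emits may differ:

```python
# Sketch of the ON CLUSTER DDL shape used for cluster-wide schema changes.
# Not Snuba's real codegen; names follow the examples on this page.
def on_cluster_ddl(table_name: str, cluster_name: str, body: str) -> str:
    """Wrap a table definition in cluster-wide DDL."""
    return f"CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER {cluster_name} {body}"

statement = on_cluster_ddl(
    "events_local",
    "my_cluster",
    "(project_id UInt64, timestamp DateTime) "
    "ENGINE = MergeTree() ORDER BY (project_id, timestamp)",
)
```

ClickHouse propagates such statements to every node in the named cluster, which is how one migration run updates all shards and replicas.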
Example Migration
class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        return [
            # Create replicated local tables on each shard
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_local",
                columns=columns,
                engine=table_engines.MergeTree(
                    storage_set=StorageSetKey.EVENTS,
                    order_by="(project_id, timestamp)",
                    partition_by="(toMonday(timestamp))",
                ),
                target=OperationTarget.LOCAL,
            ),
            # Create distributed table for querying
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_dist",
                columns=columns,
                engine=table_engines.Distributed(
                    local_table_name="events_local",
                    sharding_key="cityHash64(project_id)",
                ),
                target=OperationTarget.DISTRIBUTED,
            ),
        ]
Table Engines in Distributed Mode
When single_node = False, replicated engines are used:
# MergeTree becomes ReplicatedMergeTree with ZooKeeper path
engine=table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
)
# Generates:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/{shard}/default/events_local',
#     '{replica}'
# )
ZooKeeper Integration
Distributed mode requires ZooKeeper for coordination:
# Start ZooKeeper with devservices
devservices up zookeeper

# Verify ZooKeeper is accessible from ClickHouse
echo "SELECT * FROM system.zookeeper WHERE path = '/'" | clickhouse-client
Running Distributed Migrations
Set environment variable
export SNUBA_SETTINGS=distributed
Configure Snuba to use distributed settings.
Verify cluster configuration
echo "SELECT * FROM system.clusters" | clickhouse-client
Ensure your clusters are properly configured.
Run migrations
SNUBA_SETTINGS=distributed snuba migrations migrate --force
Execute migrations with distributed configuration.
Operation Targets
All SQL operations must specify a target indicating which nodes the operation applies to:
OperationTarget.LOCAL
Executes on local tables (data storage layer):
- Single-node: Executes on the single node
- Distributed: Executes on all shards
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    column=Column("new_column", String()),
    target=OperationTarget.LOCAL,
)
OperationTarget.DISTRIBUTED
Executes on distributed tables (query layer):
- Single-node: Automatically skipped
- Distributed: Executes on query nodes
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_dist",
    column=Column("new_column", String()),
    target=OperationTarget.DISTRIBUTED,
)
OperationTarget.UNSET
Default value - will cause an error if left unchanged:
# This will fail!
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    column=Column("new_column", String()),
    # target not specified - defaults to UNSET
)
Always explicitly set the target parameter. Leaving it as UNSET will cause the migration to fail.
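The check behind this failure can be sketched with a simplified enum. Snuba's real OperationTarget lives in snuba.migrations.operations and may differ in detail; this sketch only illustrates the rule that UNSET is rejected:

```python
from enum import Enum

# Simplified sketch of the operation target enum and the UNSET check.
# Not Snuba's actual implementation; it illustrates the rule above.
class OperationTarget(Enum):
    LOCAL = "local"
    DISTRIBUTED = "distributed"
    UNSET = "unset"

def validate_target(target: OperationTarget) -> None:
    """Reject operations whose target was never set explicitly."""
    if target is OperationTarget.UNSET:
        raise ValueError("target must be LOCAL or DISTRIBUTED, not UNSET")
```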
Switching Between Modes
When switching between modes, data is not automatically migrated:
Stop ClickHouse
devservices down clickhouse
Update configuration
Modify single_node setting in your cluster configuration.
Start ClickHouse
devservices up clickhouse
Different Docker volumes are used for each mode, so data from the previous mode won’t be accessible.
Run migrations
snuba migrations migrate --force
Each mode uses separate storage volumes. Switching modes effectively starts with a clean database state.
Best Practices
For Development
- Use single-node mode for local development
- Test migrations in both modes before deploying
- Use --dry-run to preview SQL without executing
For Production
- Thoroughly test distributed migrations in staging
- Monitor ZooKeeper health and connectivity
- Plan for downtime during blocking migrations
- Keep backups before running migrations
Migration Compatibility
Write migrations that work in both modes:
def forwards_ops(self) -> Sequence[operations.SqlOperation]:
    return [
        # Local operation - always runs
        operations.CreateTable(
            table_name="events_local",
            target=OperationTarget.LOCAL,
            # ...
        ),
        # Distributed operation - skipped in single-node
        operations.CreateTable(
            table_name="events_dist",
            target=OperationTarget.DISTRIBUTED,
            # ...
        ),
    ]
This approach ensures migrations work correctly regardless of deployment mode.