
Overview

Snuba supports two deployment modes for ClickHouse, each with different migration characteristics:
  1. Local (Single-Node) Mode - Fully supported, used in development and smaller deployments
  2. Distributed Mode - Experimental, for multi-node clustered ClickHouse deployments
The mode affects how migrations are executed and how tables are created across ClickHouse nodes.
Distributed mode is experimental and intended only for development and testing; running distributed migrations in production requires careful testing and validation beforehand.

Single-Node Mode (Local)

Overview

In single-node mode, all data resides on a single ClickHouse server. This is the default and fully supported configuration.

Configuration

Single-node mode is the default configuration:
settings.py
CLUSTERS = [
    {
        "host": "localhost",
        "port": 9000,
        "storage_sets": {"events", "transactions", "discover"},
        "single_node": True,  # Enable single-node mode
    },
]

Migration Behavior

In single-node mode:
  • Only local tables are created
  • Distributed tables are automatically skipped
  • Operations use simple MergeTree engine families
  • No replication or sharding configuration needed

Example Migration

from typing import Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.operations import OperationTarget

# `columns` (the table's column definitions) is assumed to be defined elsewhere.

class Migration(migration.ClickhouseNodeMigration):
    blocking = False
    
    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        return [
            # Create local table
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_local",
                columns=columns,
                engine=table_engines.MergeTree(
                    storage_set=StorageSetKey.EVENTS,
                    order_by="(project_id, timestamp)",
                    partition_by="(toMonday(timestamp))",
                ),
                target=OperationTarget.LOCAL,
            ),
            # Distributed table creation is skipped automatically
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_dist",
                columns=columns,
                engine=table_engines.Distributed(
                    local_table_name="events_local",
                    sharding_key=None,
                ),
                target=OperationTarget.DISTRIBUTED,  # Skipped in single-node
            ),
        ]

Table Engines in Single-Node Mode

When single_node = True, table engines are simplified:
Replicated Engine                 Single-Node Equivalent
ReplicatedMergeTree               MergeTree
ReplicatedReplacingMergeTree      ReplacingMergeTree
ReplicatedAggregatingMergeTree    AggregatingMergeTree
ReplicatedSummingMergeTree        SummingMergeTree
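The mapping above can be sketched as a simple lookup. This is an illustrative sketch only; the dict and function names are hypothetical and not Snuba's actual internals:

```python
# Hypothetical mapping of replicated engines to their single-node
# equivalents, as applied when single_node=True.
REPLICATED_TO_SINGLE_NODE = {
    "ReplicatedMergeTree": "MergeTree",
    "ReplicatedReplacingMergeTree": "ReplacingMergeTree",
    "ReplicatedAggregatingMergeTree": "AggregatingMergeTree",
    "ReplicatedSummingMergeTree": "SummingMergeTree",
}


def resolve_engine(engine: str, single_node: bool) -> str:
    """Return the engine name to emit for the given deployment mode."""
    if single_node:
        # Unknown engines pass through unchanged.
        return REPLICATED_TO_SINGLE_NODE.get(engine, engine)
    return engine
```

With this sketch, `resolve_engine("ReplicatedMergeTree", single_node=True)` yields `"MergeTree"`, while distributed mode leaves the replicated engine intact.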

Running Migrations

1. List migrations:

   snuba migrations list

   View all migrations and their current status.

2. Run all pending migrations:

   snuba migrations migrate --force

   Apply all pending migrations. The --force flag allows blocking migrations to run.

3. Verify completion:

   snuba migrations list

   Confirm all migrations show "completed" status.
If you’re running snuba devserver, migrations run automatically on startup; no manual intervention is needed.

Distributed Mode

Overview

Distributed mode supports multi-node ClickHouse clusters with replication and sharding. This configuration is more complex but enables horizontal scaling.
Distributed mode support is experimental. Use only for development and testing purposes.

Architecture

Distributed deployments involve:
  • Local tables - Store actual data on each shard
  • Distributed tables - Query proxies that aggregate results from local tables
  • Replication - Data redundancy across replicas using ZooKeeper
  • Sharding - Data partitioning across multiple nodes
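The sharding idea can be sketched in a few lines. This is illustrative only: ClickHouse evaluates the Distributed table's sharding key (e.g. cityHash64(project_id)) and takes the result modulo the total shard weight; here we assume the hash is already computed and every shard has weight 1.

```python
def shard_for_row(sharding_hash: int, num_shards: int) -> int:
    """Pick the shard index a row is routed to.

    Illustrative sketch of Distributed-table routing: the precomputed
    sharding-key hash is reduced modulo the shard count (all shards
    assumed to have equal weight).
    """
    return sharding_hash % num_shards
```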

Configuration

Enable distributed mode with cluster configuration:
settings.py
CLUSTERS = [
    {
        "host": "localhost",
        "port": 9000,
        "storage_sets": {"events", "transactions"},
        "single_node": False,  # Enable distributed mode
        "cluster_name": "my_cluster",  # Storage cluster name
        "distributed_cluster_name": "my_cluster_dist",  # Query cluster name
        "database": "default",
    },
]
The cluster names must match entries in ClickHouse’s system.clusters table.

ClickHouse Cluster Configuration

Example ClickHouse cluster configuration (usually in config.xml):
<clickhouse>
    <remote_servers>
        <my_cluster>
            <shard>
                <replica>
                    <host>clickhouse-node-1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-node-2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </my_cluster>
    </remote_servers>
</clickhouse>

Migration Behavior

In distributed mode:
  • Both local and distributed tables are created
  • Operations use replicated engines (ReplicatedMergeTree, etc.)
  • ZooKeeper coordinates replication
  • Migrations use ON CLUSTER syntax for cluster-wide DDL

Example Migration

class Migration(migration.ClickhouseNodeMigration):
    blocking = False
    
    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        return [
            # Create replicated local tables on each shard
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_local",
                columns=columns,
                engine=table_engines.MergeTree(
                    storage_set=StorageSetKey.EVENTS,
                    order_by="(project_id, timestamp)",
                    partition_by="(toMonday(timestamp))",
                ),
                target=OperationTarget.LOCAL,
            ),
            # Create distributed table for querying
            operations.CreateTable(
                storage_set=StorageSetKey.EVENTS,
                table_name="events_dist",
                columns=columns,
                engine=table_engines.Distributed(
                    local_table_name="events_local",
                    sharding_key="cityHash64(project_id)",
                ),
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

Table Engines in Distributed Mode

When single_node = False, replicated engines are used:
# MergeTree becomes ReplicatedMergeTree with ZooKeeper path
engine=table_engines.MergeTree(
    storage_set=StorageSetKey.EVENTS,
    order_by="(project_id, timestamp)",
)

# Generates:
# ReplicatedMergeTree(
#     '/clickhouse/tables/events/{shard}/default/events_local',
#     '{replica}'
# )
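The ZooKeeper path in the generated engine above follows a recognizable template. The helper below reconstructs it for illustration; the exact template is an assumption based on that example, with {shard} and {replica} left as ClickHouse macros:

```python
def zookeeper_path(storage_set: str, database: str, table: str) -> str:
    """Build the ZooKeeper path for a replicated table.

    Illustrative: mirrors the path shown in the generated
    ReplicatedMergeTree engine above; {shard} is kept literal so
    ClickHouse can substitute its own macro.
    """
    return f"/clickhouse/tables/{storage_set}/{{shard}}/{database}/{table}"
```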

ZooKeeper Integration

Distributed mode requires ZooKeeper for coordination:
# Start ZooKeeper with devservices
devservices up zookeeper

# Verify ZooKeeper is accessible from ClickHouse
echo "SELECT * FROM system.zookeeper WHERE path = '/'" | clickhouse-client

Running Distributed Migrations

1. Set the environment variable:

   export SNUBA_SETTINGS=distributed

   Configure Snuba to use distributed settings.

2. Verify cluster configuration:

   echo "SELECT * FROM system.clusters" | clickhouse-client

   Ensure your clusters are properly configured.

3. Run migrations:

   SNUBA_SETTINGS=distributed snuba migrations migrate --force

   Execute migrations with the distributed configuration.

Operation Targets

All SQL operations must specify a target indicating which nodes the operation applies to:

OperationTarget.LOCAL

Executes on local tables (data storage layer):
  • Single-node: Executes on the single node
  • Distributed: Executes on all shards
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    column=Column("new_column", String()),
    target=OperationTarget.LOCAL,
)

OperationTarget.DISTRIBUTED

Executes on distributed tables (query layer):
  • Single-node: Automatically skipped
  • Distributed: Executes on query nodes
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_dist",
    column=Column("new_column", String()),
    target=OperationTarget.DISTRIBUTED,
)

OperationTarget.UNSET

The default value; any operation still set to UNSET fails validation when the migration runs:
# This will fail!
operations.AddColumn(
    storage_set=StorageSetKey.EVENTS,
    table_name="events_local",
    column=Column("new_column", String()),
    # target not specified - defaults to UNSET
)
Always explicitly set the target parameter. Leaving it as UNSET will cause the migration to fail.
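The failure mode can be sketched with a minimal validation check. The enum and function here are simplified stand-ins for Snuba's actual classes, shown only to illustrate why UNSET errors out:

```python
from enum import Enum


class OperationTarget(Enum):
    """Simplified stand-in for Snuba's OperationTarget enum."""
    UNSET = "unset"
    LOCAL = "local"
    DISTRIBUTED = "distributed"


def validate_target(target: OperationTarget) -> None:
    """Reject operations whose target was never set."""
    if target is OperationTarget.UNSET:
        raise ValueError("operation target must be LOCAL or DISTRIBUTED")
```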

Switching Between Modes

When switching between modes, data is not automatically migrated:
1. Stop ClickHouse:

   devservices down clickhouse

2. Update configuration:

   Modify the single_node setting in your cluster configuration.

3. Start ClickHouse:

   devservices up clickhouse

   Different Docker volumes are used for each mode, so data from the previous mode won’t be accessible.

4. Run migrations:

   snuba migrations migrate --force
Each mode uses separate storage volumes. Switching modes effectively starts with a clean database state.

Best Practices

For Development

  • Use single-node mode for local development
  • Test migrations in both modes before deploying
  • Use --dry-run to preview SQL without executing

For Production

  • Thoroughly test distributed migrations in staging
  • Monitor ZooKeeper health and connectivity
  • Plan for downtime during blocking migrations
  • Keep backups before running migrations

Migration Compatibility

Write migrations that work in both modes:
def forwards_ops(self) -> Sequence[operations.SqlOperation]:
    return [
        # Local operation - always runs
        operations.CreateTable(
            table_name="events_local",
            target=OperationTarget.LOCAL,
            # ...
        ),
        # Distributed operation - skipped in single-node
        operations.CreateTable(
            table_name="events_dist",
            target=OperationTarget.DISTRIBUTED,
            # ...
        ),
    ]
This approach ensures migrations work correctly regardless of deployment mode.
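The mode-dependent skipping that makes this pattern work can be sketched as a filter. This is an illustrative sketch with a simplified stand-in enum, not Snuba's actual execution logic:

```python
from enum import Enum


class OperationTarget(Enum):
    """Simplified stand-in for Snuba's OperationTarget enum."""
    LOCAL = "local"
    DISTRIBUTED = "distributed"


def targets_for_mode(targets: list, single_node: bool) -> list:
    """Drop DISTRIBUTED-target operations when running in single-node mode."""
    if single_node:
        return [t for t in targets if t is not OperationTarget.DISTRIBUTED]
    return targets
```

In single-node mode only the LOCAL operations survive the filter, so a migration written for both modes degrades gracefully on a single node.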
