Motivation
Distributing SQL processing provides several critical benefits:
Remote-Side Filtering
Process filter expressions on nodes that hold the data, reducing network traffic and gateway load.
Remote-Side Updates
Execute UPDATE/DELETE operations directly on data nodes, avoiding round-trips through the gateway.
Parallel Computation
Distribute JOINs, aggregations, and sorting across nodes to scale with data volume.
Reduced Latency
Minimize data movement by processing close to storage, improving query response times.
Architecture Overview
DistSQL separates query execution into two phases:
Logical Plan
An abstract, non-distributed data flow representation:
- Independent of cluster topology
- Composed of processors (formerly “aggregators”)
- Defines what computation happens, not where
Physical Plan
A concrete mapping to cluster nodes:
- Maps logical processors to specific CockroachDB nodes
- Replicates and specializes processors based on topology
- Defines communication channels between nodes
- Scheduled and executed on the cluster
Logical Planning
Logical plans are made up of processors that consume input streams of rows and produce output streams, each with a defined schema.
Processor Characteristics
Input/Output Streams
- Typed row streams with defined schemas
- May consume multiple streams (e.g., for joins)
- Produce zero or one output stream
Grouping
- Defines which rows must be processed together
- Enables distribution: different groups → different nodes
- Group key: subset of columns that define groups
- No grouping = maximum parallelization potential
Ordering
- Some processors require ordered input
- Others guarantee ordered output
- Characterization function: ord(input_order) → output_order
- Sorting processors inserted when order mismatches occur
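The ordering characterization can be illustrated with a small Python sketch. It is not taken from the CockroachDB source: `passthrough_ord` and `needs_sort` are hypothetical names, and the sketch assumes a processor that passes rows through while keeping only some columns, so the output keeps the longest prefix of the input ordering whose columns survive.

```python
def passthrough_ord(input_order, kept_columns):
    """Hypothetical ord() for a processor that only drops columns.

    The output stays ordered on the longest prefix of input_order
    whose columns are all still present in the output.
    """
    output_order = []
    for col in input_order:
        if col not in kept_columns:
            break
        output_order.append(col)
    return output_order


def needs_sort(provided_order, required_order):
    """A sorting processor is needed unless required_order is a prefix
    of the ordering the producer already provides."""
    return provided_order[:len(required_order)] != required_order
```

For example, an input ordered on `["a", "b", "c"]` that keeps only columns `a` and `c` is still ordered on `["a"]`, so a consumer requiring order on `["a"]` needs no sorting processor, while one requiring `["a", "c"]` does.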
Example: Aggregation Query
From docs/RFCS/20160421_distributed_sql.md:
Processor Types
Data Sources
TABLE READER
Special processor with no input stream:
- Configured with table/index spans to read
- Outputs specified columns only
- Can apply filter expressions
- Provides ordering guarantees based on index
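As a rough illustration of the behavior listed above, the following Python sketch models a table reader over an in-memory sorted index. The function name `table_reader`, the `(key, row)` index layout, and the half-open `[start, end)` span convention are assumptions made for this example, not CockroachDB APIs.

```python
import bisect


def table_reader(index, spans, columns, row_filter=None):
    """Yield projected rows from a sorted index.

    index:      list of (key, row_dict) pairs sorted by key (assumed layout)
    spans:      list of half-open (start, end) key ranges to read
    columns:    names of the columns to output
    row_filter: optional predicate applied before a row is emitted
    """
    keys = [key for key, _ in index]
    for start, end in spans:
        lo = bisect.bisect_left(keys, start)
        hi = bisect.bisect_left(keys, end)
        # Rows come out in index order, which is the ordering guarantee.
        for _, row in index[lo:hi]:
            if row_filter is None or row_filter(row):
                yield {col: row[col] for col in columns}
```

Because the filter and the column projection run inside the reader, rows and columns that the query does not need never leave the node that holds them.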
Computation Processors
AGGREGATOR
Performs SQL aggregation functions:
- Groups rows by group key
- Computes: SUM, COUNT, COUNT DISTINCT, AVG, MIN, MAX
- Outputs one row per group
- Optional filter on aggregated results
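A minimal Python sketch of this grouping behavior follows. It supports only SUM and COUNT, and the names `aggregate` and `having` are illustrative assumptions rather than anything from the DistSQL code.

```python
from collections import defaultdict


def aggregate(rows, group_cols, sum_col, having=None):
    """Group rows by group_cols and compute SUM(sum_col) and COUNT(*).

    Emits one row per group, in first-seen group order; `having` is an
    optional predicate applied to each aggregated result row.
    """
    groups = defaultdict(lambda: {"sum": 0, "count": 0})
    for row in rows:
        key = tuple(row[col] for col in group_cols)
        groups[key]["sum"] += row[sum_col]
        groups[key]["count"] += 1

    results = []
    for key, acc in groups.items():
        result = dict(zip(group_cols, key), sum=acc["sum"], count=acc["count"])
        if having is None or having(result):
            results.append(result)
    return results
```

Since every row of a group must reach the same accumulator, the group key is what decides how an aggregator can be split across nodes.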
EVALUATOR
Programmable row transformation:
- Processes one row at a time
- No grouping (fully parallelizable)
- Evaluates SQL expressions
- Can filter or transform data
SORT
Sorts input stream:
- No grouping (distributable to producers)
- Provides intra-stream ordering
- Global ordering via input synchronizer
- Configurable sort columns and direction
Join Processors
JOIN
Stream-based join:
- Two input streams
- Equality constraints on columns
- Grouped on equality columns
- Supports INNER, LEFT, RIGHT, FULL joins
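One way to picture a join grouped on its equality columns is a hash join, sketched below in Python. This is an assumption for illustration only: the source does not say which join algorithm the processor uses, `hash_join` is a hypothetical name, and only INNER and LEFT joins are covered.

```python
from collections import defaultdict


def hash_join(left, right, left_col, right_col, join_type="inner"):
    """Join two row streams on left[left_col] == right[right_col].

    Supports "inner" and "left". For unmatched left rows in a left join,
    the right-side columns are simply absent (standing in for NULLs).
    """
    # Build phase: index the right stream by its equality column.
    table = defaultdict(list)
    for r in right:
        table[r[right_col]].append(r)

    # Probe phase: look up each left row in the index.
    out = []
    for l in left:
        matches = table.get(l[left_col], [])
        for r in matches:
            out.append({**l, **r})
        if not matches and join_type == "left":
            out.append(dict(l))
    return out
```

Rows with different values in the equality columns never meet, which is why the join can be distributed by routing both inputs on those columns.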
JOIN READER
Index lookup join:
- Point lookups for keys from input stream
- Performs remote KV reads
- Can set up remote flows
- Efficient for index-based joins
Control Processors
Physical Planning
Logical processors are transformed into physical processors distributed across nodes based on data locality and query requirements.
Processor Structure
Each physical processor has three components:
Input Synchronizer Types
Single-input: Pass-through for one stream
Unsynchronized: Arbitrarily interleaves multiple streams
Ordered: Merges streams while preserving ordering guarantees
From the design document:
The input synchronizer is careful to interleave the streams so that the merged stream has the same ordering guarantee as individual input streams.
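The ordered synchronizer amounts to a k-way merge. A minimal Python sketch, assuming each input stream is already sorted on the same column (`ordered_synchronizer` and `order_col` are names chosen for this example):

```python
import heapq


def ordered_synchronizer(streams, order_col):
    """Merge already-sorted row streams into one stream sorted on order_col.

    heapq.merge is lazy: it pulls from each input only as needed, so the
    merged stream keeps the ordering guarantee without buffering everything.
    """
    return heapq.merge(*streams, key=lambda row: row[order_col])
```

An unsynchronized synchronizer, by contrast, could forward rows in whatever order they arrive, which is cheaper but loses the ordering guarantee.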
Output Router Types
Single-output: Pass-through to one stream
Mirror: Every row sent to all output streams
Hashing: Rows distributed by hash function on specified columns
- Ensures rows with same hash go to same node
- Enables distributed GROUP BY and JOIN
- Used for INSERT operations
- Used for JOIN READER processors
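The hashing router can be sketched in a few lines of Python. The CRC32 hash, the `"|"`-joined key encoding, and the name `hash_router` are assumptions made for this example; the source does not specify which hash function DistSQL uses.

```python
import zlib


def hash_router(rows, hash_cols, num_outputs):
    """Distribute rows across num_outputs streams by hashing hash_cols.

    Rows that agree on every hash column always land in the same output,
    which is what lets a downstream GROUP BY or JOIN run per output.
    """
    outputs = [[] for _ in range(num_outputs)]
    for row in rows:
        key = "|".join(str(row[col]) for col in hash_cols).encode()
        outputs[zlib.crc32(key) % num_outputs].append(row)
    return outputs
```

A deterministic hash is used here rather than Python's built-in `hash`, because every producer node must agree on where a given key is sent.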
Distribution Strategy
From docs/RFCS/20160421_distributed_sql.md:
Start with data layout
TABLE READER processors instantiated on range leaseholders for relevant spans
Place grouped processors
Single-group processors (FINAL, LIMIT) run on gateway or single chosen node
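The first step, placing table readers according to data layout, can be sketched as splitting a requested span along range boundaries. In this Python sketch the `(start, end, leaseholder)` tuples and the function name `partition_spans` are hypothetical; real range metadata is richer than this.

```python
def partition_spans(span, ranges):
    """Split one key span into per-node spans.

    span:   half-open (start, end) key range the query needs
    ranges: sorted, non-overlapping (start, end, leaseholder_node) tuples
    Returns {node: [(start, end), ...]}, one entry per node that would
    host a table reader.
    """
    start, end = span
    plan = {}
    for r_start, r_end, node in ranges:
        # Intersect the requested span with this range.
        s, e = max(start, r_start), min(end, r_end)
        if s < e:
            plan.setdefault(node, []).append((s, e))
    return plan
```

For instance, with ranges `[("a", "g", 1), ("g", "p", 2), ("p", "z", 1)]`, the span `("c", "t")` yields two sub-spans for node 1 and one for node 2.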
Execution Infrastructure
Flows
A flow is a subgraph of the physical plan executed on a single node.
ScheduleFlows RPC
The gateway initiates distributed execution. Each ScheduleFlows call:
- Sets up input/output mailboxes
- Creates local processors
- Starts processor execution as goroutines
- Returns immediately (async execution)
Mailboxes
Mailboxes are named queues that allow producers and consumers to start at different times, buffering data until gRPC streams are established.
A gRPC stream is established by the consumer using the StreamMailbox RPC, taking a mailbox ID. From that moment on, gRPC flow control synchronizes producer and consumer.
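The buffering role of mailboxes can be illustrated with a small in-process Python sketch. It deliberately leaves out gRPC and flow control: `MailboxRegistry` is a hypothetical class, and a plain `queue.Queue` stands in for the buffered stream.

```python
import queue
import threading


class MailboxRegistry:
    """Named queues that either side can open first (illustrative only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._boxes = {}

    def get(self, mailbox_id):
        """Return the mailbox for mailbox_id, creating it on first use."""
        with self._lock:
            if mailbox_id not in self._boxes:
                self._boxes[mailbox_id] = queue.Queue()
            return self._boxes[mailbox_id]
```

A producer can call `registry.get("m1").put(row)` before any consumer exists; when the consumer later calls `registry.get("m1").get()`, it receives the buffered row. Looking the mailbox up by ID is what decouples the two start times.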
Local Scheduling
Processors scheduled concurrently within a node:
- Each processor runs as a goroutine
- Buffered channels connect processors
- Channel buffer size controls backpressure
- Natural Go concurrency model
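The same structure can be approximated outside Go. The Python sketch below uses threads in place of goroutines and bounded `queue.Queue` objects in place of buffered channels; `run_pipeline`, `run_processor`, and the `DONE` sentinel are names invented for this example.

```python
import queue
import threading

DONE = object()  # sentinel marking end of stream


def run_processor(fn, inbox, outbox):
    """Start a thread that applies fn to each row; None results are dropped."""
    def loop():
        while True:
            row = inbox.get()
            if row is DONE:
                outbox.put(DONE)
                return
            result = fn(row)
            if result is not None:
                outbox.put(result)  # blocks when outbox is full: backpressure
    thread = threading.Thread(target=loop)
    thread.start()
    return thread


def run_pipeline(rows, fns, buffer_size=2):
    """Chain one processor per fn, connected by bounded queues."""
    chans = [queue.Queue(maxsize=buffer_size) for _ in range(len(fns) + 1)]
    threads = [run_processor(fn, chans[i], chans[i + 1])
               for i, fn in enumerate(fns)]

    def feed():
        for row in rows:
            chans[0].put(row)
        chans[0].put(DONE)

    feeder = threading.Thread(target=feed)
    feeder.start()

    out = []
    while True:
        row = chans[-1].get()
        if row is DONE:
            break
        out.append(row)
    feeder.join()
    for thread in threads:
        thread.join()
    return out
```

With a small `buffer_size`, a slow downstream processor quickly fills its input queue, and upstream `put` calls block until it catches up, which is the backpressure effect that channel buffer size controls.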
Example: Daily Promotion Query
Complex query demonstrating DistSQL capabilities:
- Multiple TABLE READERs distributed across range leaseholders
- Distributed AGGREGATORs with hash routing on CustomerID
- JOIN processors colocated with data
- Final aggregation on gateway node
Performance Characteristics
Benefits
Reduced Network Traffic
- Filtering happens before data crosses network
- Only relevant rows transferred between nodes
- Aggregations produce smaller result sets
- Multiple nodes process query simultaneously
- Scales with cluster size
- CPU and I/O distributed across machines
- Gateway only handles coordination and final results
- Computation distributed to data nodes
- Better resource utilization
Trade-offs
Implementation Notes
Key source files:
- Logical Planning: pkg/sql/opt/ (optimizer)
- Physical Planning: pkg/sql/physicalplan/
- Processor Implementations: pkg/sql/rowexec/
- Flow Execution: pkg/sql/flowinfra/
- DistSQL Server: pkg/sql/distsql_running.go
Further Reading
SQL Layer
SQL parsing and planning
Transaction Layer
Distributed transactions