The Selector API provides load balancing and node selection for client-side service discovery.
Interfaces
Selector
Node selection and load balancing interface.
// Selector is the node selection and load balancing interface.
type Selector interface {
// Rebalancer is embedded, so every Selector also provides Apply(nodes []Node).
Rebalancer
// Select selects a node based on the load balancing algorithm.
// If err is nil, selected and done must not be empty.
Select(ctx context.Context, opts ...SelectOption) (selected Node, done DoneFunc, err error)
}
Select
func(ctx context.Context, opts ...SelectOption) (Node, DoneFunc, error)
Selects a node based on the load balancing algorithm.
If err is nil, both selected and done are guaranteed to be non-nil.
Rebalancer
Interface for rebalancing nodes.
// Rebalancer is the interface for rebalancing nodes.
type Rebalancer interface {
// Apply applies all nodes when any changes happen (e.g., nodes added or removed).
Apply(nodes []Node)
}
Applies the complete list of nodes; call it whenever the node set changes (e.g., nodes are added or removed).
Builder
Builds a selector instance.
// Builder builds a selector instance.
type Builder interface {
// Build constructs and returns a Selector instance.
Build() Selector
}
Constructs and returns a Selector instance
Node
Represents a service node.
// Node represents a service node.
type Node interface {
// Scheme returns the service node scheme (e.g., "http", "grpc").
Scheme() string
// Address returns the unique address under the same service.
Address() string
// ServiceName returns the name of the service the node belongs to.
ServiceName() string
// InitialWeight returns the initial scheduling weight, or nil if it is not set.
InitialWeight() *int64
// Version returns the service node version.
Version() string
// Metadata returns node metadata (version, namespace, region, protocol, etc.).
Metadata() map[string]string
}
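Scheme: returns the service node scheme (e.g., "http", "grpc").
Address: returns the node's address, which is unique within the same service.
ServiceName: returns the name of the service the node belongs to.
InitialWeight: returns the initial scheduling weight, or nil if it is not set.
Version: returns the service node version.
Metadata: returns node metadata (version, namespace, region, protocol, etc.).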
Balancer
Balancer interface for load balancing algorithms.
// Balancer is the interface implemented by load balancing algorithms.
type Balancer interface {
// Pick picks a node from the list based on the balancing algorithm.
// It returns the selected node, a done callback, and an error.
Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error)
}
Pick
func(ctx context.Context, nodes []WeightedNode) (WeightedNode, DoneFunc, error)
Picks a node from the list based on the balancing algorithm
BalancerBuilder
Builds a balancer instance.
// BalancerBuilder builds a balancer instance.
type BalancerBuilder interface {
// Build returns a Balancer.
Build() Balancer
}
WeightedNode
Node with runtime weight calculation.
// WeightedNode is a Node with runtime weight calculation.
type WeightedNode interface {
// Node is embedded, so a WeightedNode exposes every Node method as well.
Node
// Raw returns the original node.
Raw() Node
// Weight returns the runtime calculated weight.
Weight() float64
// Pick marks the node as picked and returns a done function.
Pick() DoneFunc
// PickElapsed returns the time elapsed since the latest pick.
PickElapsed() time.Duration
}
Returns the original node
Returns the runtime calculated weight
Marks the node as picked and returns a done function
Returns time elapsed since the latest pick
Types
DoneInfo
Callback info when RPC invocation completes.
// DoneInfo is the callback info passed when an RPC invocation completes.
type DoneInfo struct {
// Err is the error of the completed invocation; nil when it succeeded.
Err error
// ReplyMD is the reply metadata.
ReplyMD ReplyMD
// BytesSent indicates if any bytes have been sent to the server.
BytesSent bool
// BytesReceived indicates if any bytes have been received from the server.
BytesReceived bool
}
Indicates if any bytes have been sent to the server
Indicates if any bytes have been received from the server
DoneFunc
Callback function when RPC invocation completes.
// DoneFunc is the callback function invoked when an RPC invocation completes.
// It receives the request context and a DoneInfo describing the outcome.
type DoneFunc func(ctx context.Context, di DoneInfo)
ReplyMD
Reply metadata interface.
// ReplyMD is the reply metadata interface.
type ReplyMD interface {
// Get looks up key and returns a string.
// NOTE(review): presumably the metadata value stored under key — confirm against the implementation.
Get(key string) string
}
Errors
// ErrNoAvailable is returned when no available nodes are found.
// It is a ServiceUnavailable error with reason "no_available_node" and an empty message.
var ErrNoAvailable = errors.ServiceUnavailable("no_available_node", "")
Returned when no available nodes are found.
Built-in Selectors
Kratos provides several built-in load balancing implementations:
Random
Random selection algorithm.
import "github.com/go-kratos/kratos/v2/selector/random"
selector := random.NewBuilder().Build()
Weighted Round Robin (WRR)
Weighted round-robin algorithm.
import "github.com/go-kratos/kratos/v2/selector/wrr"
selector := wrr.NewBuilder().Build()
P2C (Power of Two Choices)
Power of two choices algorithm for better load distribution.
import "github.com/go-kratos/kratos/v2/selector/p2c"
selector := p2c.NewBuilder().Build()
Usage Examples
Basic Node Selection
package main
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/random"
)
func main() {
// Create selector
sel := random.NewBuilder().Build()
// Apply nodes
nodes := []selector.Node{
// Node implementations
}
sel.Apply(nodes)
// Select a node
ctx := context.Background()
node, done, err := sel.Select(ctx)
if err != nil {
panic(err)
}
defer func() {
// Call done when request completes
done(ctx, selector.DoneInfo{
Err: nil,
})
}()
fmt.Printf("Selected: %s\n", node.Address())
}
Integration with gRPC Client
import (
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/selector/p2c"
)
// newClient dials an insecure gRPC connection to the "discovery:///user-service"
// endpoint, passing a discovery option and a selector built by p2c, and panics
// if dialing fails.
// NOTE(review): `discovery` is not defined in this snippet; the surrounding code must provide it.
// NOTE(review): confirm that grpc.WithSelector is available in the Kratos version in use — TODO verify.
func newClient() {
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///user-service"),
grpc.WithDiscovery(discovery),
grpc.WithSelector(
p2c.NewBuilder().Build(),
),
)
if err != nil {
panic(err)
}
// The connection is closed as soon as newClient returns, so it can only be used inside this function.
defer conn.Close()
}
Custom Node Implementation
// customNode is a minimal example implementation of the Node interface,
// backed by plain struct fields.
type customNode struct {
	addr     string            // address returned by Address
	weight   int64             // initial scheduling weight; zero means "not set"
	version  string            // version returned by Version
	metadata map[string]string // metadata returned by Metadata
}

// Scheme returns the node scheme, which is always "grpc" for this example.
func (n *customNode) Scheme() string {
	return "grpc"
}

// Address returns the node's address.
func (n *customNode) Address() string {
	return n.addr
}

// ServiceName returns the service name, which is always "user-service" for
// this example.
func (n *customNode) ServiceName() string {
	return "user-service"
}

// InitialWeight returns the initial scheduling weight, or nil when no weight
// has been set (the field still holds its zero value), as the Node contract
// describes.
//
// The returned pointer refers to a copy of the weight, so callers cannot
// change the node's stored weight by writing through it.
func (n *customNode) InitialWeight() *int64 {
	if n.weight == 0 {
		return nil
	}
	w := n.weight
	return &w
}

// Version returns the node's version.
func (n *customNode) Version() string {
	return n.version
}

// Metadata returns the node's metadata map.
func (n *customNode) Metadata() map[string]string {
	return n.metadata
}
// Usage: build a customNode with an explicit address, weight, version and
// region/zone metadata.
node := &customNode{
addr: "127.0.0.1:9000",
weight: 100,
version: "v1.0.0",
// Metadata is a free-form string map; "region" and "zone" are example keys.
metadata: map[string]string{
"region": "us-west",
"zone": "zone-a",
},
}
Custom Balancer
import "github.com/go-kratos/kratos/v2/selector"
// customBalancer is a stateless example implementation of selector.Balancer.
type customBalancer struct{}

// Pick selects a node from nodes and returns it together with a done callback.
//
// It returns selector.ErrNoAvailable when nodes is empty. Otherwise it picks
// the first node (replace this with custom selection logic), marks it as
// picked through WeightedNode.Pick, and returns a done callback that forwards
// the completion info to the node before running any custom handling.
func (b *customBalancer) Pick(
	ctx context.Context,
	nodes []selector.WeightedNode,
) (selector.WeightedNode, selector.DoneFunc, error) {
	if len(nodes) == 0 {
		return nil, nil, selector.ErrNoAvailable
	}

	// Custom selection logic
	selected := nodes[0]

	// Mark the node as picked; the done function it returns must be called
	// when the request completes so the node sees the outcome.
	nodeDone := selected.Pick()

	done := func(ctx context.Context, di selector.DoneInfo) {
		if nodeDone != nil {
			nodeDone(ctx, di)
		}
		// Handle completion
	}
	return selected, done, nil
}
// customBuilder is a stateless builder for customBalancer instances.
type customBuilder struct{}

// Build returns a freshly allocated customBalancer as a selector.Balancer.
func (b *customBuilder) Build() selector.Balancer {
	var balancer selector.Balancer = new(customBalancer)
	return balancer
}
Filtering Nodes
import "github.com/go-kratos/kratos/v2/selector/filter"
// filterByVersion returns a NodeFilter that keeps only the nodes whose
// Version() equals version. The filter always returns a non-nil slice, which
// is empty when nothing matches.
func filterByVersion(version string) selector.NodeFilter {
	return func(ctx context.Context, nodes []selector.Node) []selector.Node {
		matched := make([]selector.Node, 0, len(nodes))
		for i := range nodes {
			if nodes[i].Version() != version {
				continue
			}
			matched = append(matched, nodes[i])
		}
		return matched
	}
}
// Usage with selector: pass the filter as a per-call select option.
// WithNodeFilter is the option that accepts selector.NodeFilter values,
// matching the type returned by filterByVersion.
node, done, err := sel.Select(
	ctx,
	selector.WithNodeFilter(filterByVersion("v1.0.0")),
)
Weighted Load Balancing
import "github.com/go-kratos/kratos/v2/selector/wrr"
// Create nodes with different weights
nodes := []selector.Node{
&customNode{addr: "192.168.1.10:9000", weight: 100},
&customNode{addr: "192.168.1.11:9000", weight: 200},
&customNode{addr: "192.168.1.12:9000", weight: 50},
}
// Use WRR selector
sel := wrr.NewBuilder().Build()
sel.Apply(nodes)
// The total weight is 100 + 200 + 50 = 350, which gives these expected shares:
// 192.168.1.11:9000 will receive ~57% of traffic
// 192.168.1.10:9000 will receive ~29% of traffic
// 192.168.1.12:9000 will receive ~14% of traffic
Monitoring Selection Results
// selectWithMetrics selects a node from sel, sends one request to it via
// makeRequest, reports the outcome through the done callback, and prints how
// long the request took.
//
// If selection fails the function returns without making a request. The
// reported duration covers only the makeRequest call, not the done callback.
func selectWithMetrics(sel selector.Selector) {
	ctx := context.Background()
	node, done, err := sel.Select(ctx)
	if err != nil {
		// Record error metric
		return
	}

	// Make request, stopping the clock before any bookkeeping runs so the
	// measurement reflects the request alone.
	start := time.Now()
	err = makeRequest(node)
	duration := time.Since(start)

	// Report metrics
	done(ctx, selector.DoneInfo{
		Err:           err,
		BytesSent:     true,
		BytesReceived: err == nil,
	})

	fmt.Printf("Request to %s took %v\n", node.Address(), duration)
}