diff --git a/balancer/randomsubsetting/randomsubsetting.go b/balancer/randomsubsetting/randomsubsetting.go new file mode 100644 index 000000000000..9b3a9ed54948 --- /dev/null +++ b/balancer/randomsubsetting/randomsubsetting.go @@ -0,0 +1,201 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package randomsubsetting defines a random subsetting balancer. +// +// To install random subsetting balancer, import this package as: +// +// import _ "google.golang.org/grpc/balancer/randomsubsetting" +package randomsubsetting + +import ( + "encoding/json" + "errors" + "fmt" + "sort" + "time" + + "github.com/cespare/xxhash/v2" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/balancer/gracefulswitch" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" +) + +const ( + // Name is the name of the random subsetting load balancer. + Name = "random_subsetting" +) + +var ( + logger = grpclog.Component(Name) +) + +func prefixLogger(p *subsettingBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[random-subsetting-lb %p] ", p)) +} + +func init() { + balancer.Register(bb{}) +} + +type bb struct{} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &subsettingBalancer{ + cc: cc, + hashf: xxhash.NewWithSeed(uint64(time.Now().UnixNano())), + } + // Create a logger with a prefix specific to this balancer instance. + b.logger = prefixLogger(b) + + b.logger.Infof("Created") + b.child = gracefulswitch.NewBalancer(cc, bOpts) + return b +} + +// LBConfig is the config for the outlier detection balancer. +type LBConfig struct { + serviceconfig.LoadBalancingConfig `json:"-"` + + SubsetSize uint64 `json:"subset_size,omitempty"` + + ChildPolicy *iserviceconfig.BalancerConfig `json:"child_policy,omitempty"` +} + +func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + lbCfg := &LBConfig{ + // Default top layer values. + SubsetSize: 10, + } + + if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well. + return nil, fmt.Errorf("subsetting: unable to unmarshal LBconfig: %s, error: %v", string(s), err) + } + + // if someonw needs subsetSize == 1, he should use pick_first instead + if lbCfg.SubsetSize < 2 { + return nil, errors.New("subsetting: subsetSize must be >= 2") + } + + return lbCfg, nil +} + +func (bb) Name() string { + return Name +} + +type subsettingBalancer struct { + cc balancer.ClientConn + logger *internalgrpclog.PrefixLogger + cfg *LBConfig + hashf *xxhash.Digest + child *gracefulswitch.Balancer +} + +func (b *subsettingBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lbCfg, ok := s.BalancerConfig.(*LBConfig) + if !ok { + b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + return balancer.ErrBadResolverState + } + + // Reject whole config if child policy doesn't exist, don't persist it for + // later. + bb := balancer.Get(lbCfg.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("subsetting: child balancer %q not registered", lbCfg.ChildPolicy.Name) + } + + if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { + err := b.child.SwitchTo(bb) + if err != nil { + return fmt.Errorf("subsetting: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) + } + } + b.cfg = lbCfg + + err := b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: b.prepareChildResolverState(s.ResolverState), + BalancerConfig: b.cfg.ChildPolicy.Config, + }) + + return err +} + +type AddressWithHash struct { + hash uint64 + addr resolver.Address +} + +// implements the subsetting algorithm, as described in A68: https://github.com/grpc/proposal/pull/423 +func (b *subsettingBalancer) prepareChildResolverState(s resolver.State) resolver.State { + addresses := s.Addresses + backendCount := len(addresses) + if backendCount <= int(b.cfg.SubsetSize) { + return s + } + + addressesSet := make([]AddressWithHash, backendCount) + // calculate hash for each endpoint + for i, endpoint := range addresses { + + b.hashf.Write([]byte(s.Addresses[0].String())) + addressesSet[i] = AddressWithHash{ + hash: b.hashf.Sum64(), + addr: endpoint, + } + } + // sort addresses by hash + sort.Slice(addressesSet, func(i, j int) bool { + return addressesSet[i].hash < addressesSet[j].hash + }) + + b.logger.Infof("resulting subset: %v", addressesSet[:b.cfg.SubsetSize]) + + // Convert back to resolver.addresses + addressesSubset := make([]resolver.Address, b.cfg.SubsetSize) + for _, eh := range addressesSet[:b.cfg.SubsetSize] { + addressesSubset = append(addressesSubset, eh.addr) + } + + return resolver.State{ + Addresses: addressesSubset, + ServiceConfig: s.ServiceConfig, + Attributes: s.Attributes, + } +} + +func (b *subsettingBalancer) ResolverError(err error) { + b.child.ResolverError(err) +} + +func (b *subsettingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.child.UpdateSubConnState(sc, state) +} + +func (b *subsettingBalancer) Close() { + b.child.Close() +} + +func (b *subsettingBalancer) ExitIdle() { + b.child.ExitIdle() +} diff --git a/balancer/randomsubsetting/randomsubsetting_test.go b/balancer/randomsubsetting/randomsubsetting_test.go new file mode 100644 index 000000000000..77be56dd48be --- /dev/null +++ b/balancer/randomsubsetting/randomsubsetting_test.go @@ -0,0 +1,222 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package e2e_test contains e2e test cases for the Subsetting LB Policy. +package randomsubsetting_test + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + + _ "google.golang.org/grpc/balancer/randomsubsetting" +) + +var defaultTestTimeout = 5 * time.Second + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func setupBackends(t *testing.T, backendsCount int) ([]resolver.Address, func()) { + t.Helper() + + backends := make([]*stubserver.StubServer, backendsCount) + addresses := make([]resolver.Address, backendsCount) + for i := 0; i < backendsCount; i++ { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started good TestService backend at: %q", backend.Address) + backends[i] = backend + addresses[i] = resolver.Address{ + Addr: backend.Address, + } + } + + cancel := func() { + for _, backend := range backends { + backend.Stop() + } + } + return addresses, cancel +} + +func setupClients(t *testing.T, clientsCount int, subsetSize int, addresses []resolver.Address) ([]testgrpc.TestServiceClient, func()) { + t.Helper() + + clients := make([]testgrpc.TestServiceClient, clientsCount) + ccs := make([]*grpc.ClientConn, clientsCount) + var err error + + for i := 0; i < clientsCount; i++ { + mr := manual.NewBuilderWithScheme("subsetting-e2e") + jsonConfig := fmt.Sprintf(` + { + "loadBalancingConfig": [ + { + "random_subsetting": { + "subset_size": %d, + "child_policy": [{"round_robin": {}}] + } + } + ] + }`, subsetSize) + + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(string(jsonConfig)) + mr.InitialState(resolver.State{ + Addresses: addresses, + ServiceConfig: sc, + }) + + ccs[i], err = grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + clients[i] = testgrpc.NewTestServiceClient(ccs[i]) + } + + cancel := func() { + for _, cc := range ccs { + cc.Close() + } + } + return clients, cancel +} + +func checkRoundRobinRPCs(t *testing.T, ctx context.Context, clients []testgrpc.TestServiceClient, subsetSize int, maxDiff int) { + clientsPerBackend := map[string]map[int]struct{}{} + + for clientIdx, client := range clients { + // make sure that every client send exactly 1 request to each server in its subset + for i := 0; i < subsetSize; i++ { + var peer peer.Peer + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)) + if err != nil { + t.Fatalf("failed to call server: %v", err) + } + if peer.Addr != nil { + if m, ok := clientsPerBackend[peer.Addr.String()]; !ok { + clientsPerBackend[peer.Addr.String()] = map[int]struct{}{clientIdx: {}} + } else if _, ok := m[clientIdx]; !ok { + m[clientIdx] = struct{}{} + } else { + // The backend receives a second request from the same client. This could happen if the client have 1 backend in READY + // state while the other are CONNECTING. In this case round_robbin will pick the same address twice. + // We are going to retry after short timeout. + time.Sleep(10 * time.Microsecond) + i-- + } + } else { + t.Fatalf("peer.Addr == nil, peer: %v", peer) + } + } + } + + minClientsPerBackend := math.MaxInt + maxClientsPerBackend := 0 + for _, v := range clientsPerBackend { + if len(v) < minClientsPerBackend { + minClientsPerBackend = len(v) + } + if len(v) > maxClientsPerBackend { + maxClientsPerBackend = len(v) + } + } + + if maxClientsPerBackend > minClientsPerBackend+maxDiff { + t.Fatalf("the difference between min and max clients per backend should be <= %d, clientsPerBackend: %v", maxDiff, clientsPerBackend) + } +} + +func (s) TestSubsettingE2E(t *testing.T) { + tests := []struct { + name string + subsetSize int + clients int + backends int + maxDiff int + }{ + { + name: "backends could be evenly distributed between clients", + backends: 12, + clients: 8, + subsetSize: 3, + maxDiff: 0, + }, + { + name: "backends could NOT be evenly distributed between clients", + backends: 37, + clients: 22, + subsetSize: 5, + maxDiff: 2, + }, + { + name: "Nbackends %% subsetSize == 0, but there are not enough clients to fill the last round", + backends: 20, + clients: 7, + subsetSize: 5, + maxDiff: 1, + }, + { + name: "last round is completely filled, but there are some excluded backends on every round", + backends: 21, + clients: 8, + subsetSize: 5, + maxDiff: 1, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + addresses, stopBackends := setupBackends(t, test.backends) + defer stopBackends() + + clients, stopClients := setupClients(t, test.clients, test.subsetSize, addresses) + defer stopClients() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + checkRoundRobinRPCs(t, ctx, clients, test.subsetSize, test.maxDiff) + }) + } +}