Skip to content

Commit

Permalink
Add tests for etcd v2 provider
Browse files Browse the repository at this point in the history
  • Loading branch information
maksim.konovalov authored and KaymeKaydex committed Jan 20, 2025
1 parent d936797 commit be369fe
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
BUG FIXES:
- Fixed go.mod and go.sum files with go mod tidy.

TESTS:

- Added etcd v2 provider tests.

## v2.0.1

in the v2.0.0 release there was a bug that did not allow go-get. Updating to v2 requires changing the module name
Expand Down
13 changes: 10 additions & 3 deletions providers/etcd/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"go.etcd.io/etcd/client/v2"
)

var (
ErrNodesError = fmt.Errorf("etcd nodes err")
ErrClusterError = fmt.Errorf("etcd cluster err")
ErrInstancesError = fmt.Errorf("etcd instances err")
)

// Check that provider implements TopologyProvider interface
var _ vshardrouter.TopologyProvider = (*Provider)(nil)

Expand Down Expand Up @@ -63,6 +69,7 @@ func mapCluster2Instances(replicasets []vshardrouter.ReplicasetInfo,
return currentTopology
}

// nolint:contextcheck
func (p *Provider) GetTopology() (map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo, error) {
resp, err := p.kapi.Get(context.TODO(), p.path, &client.GetOptions{Recursive: true})
if err != nil {
Expand All @@ -71,7 +78,7 @@ func (p *Provider) GetTopology() (map[vshardrouter.ReplicasetInfo][]vshardrouter
nodes := resp.Node.Nodes

if nodes.Len() < 2 {
return nil, fmt.Errorf("etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", p.path)
return nil, fmt.Errorf("%w: etcd path %s subnodes <2; minimum 2 (/clusters & /instances)", ErrNodesError, p.path)
}

var replicasets []vshardrouter.ReplicasetInfo
Expand All @@ -83,7 +90,7 @@ func (p *Provider) GetTopology() (map[vshardrouter.ReplicasetInfo][]vshardrouter
switch filepath.Base(node.Key) {
case "clusters":
if len(node.Nodes) < 1 {
return nil, fmt.Errorf("etcd path %s has no clusters", node.Key)
return nil, fmt.Errorf("%w: etcd path %s has no clusters", ErrClusterError, node.Key)
}

for _, rsNode := range node.Nodes {
Expand All @@ -109,7 +116,7 @@ func (p *Provider) GetTopology() (map[vshardrouter.ReplicasetInfo][]vshardrouter
}
case "instances":
if len(node.Nodes) < 1 {
return nil, fmt.Errorf("etcd path %s has no instances", node.Key)
return nil, fmt.Errorf("%w: etcd path %s has no instances", ErrInstancesError, node.Key)
}

for _, instanceNode := range node.Nodes {
Expand Down
123 changes: 123 additions & 0 deletions providers/etcd/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package etcd

import (
"context"
"errors"
"log"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/server/v3/embed"
)

func parseEtcdUrls(strs []string) []url.URL {
urls := make([]url.URL, 0, len(strs))

for _, str := range strs {
u, err := url.Parse(str)
if err != nil {
log.Printf("Invalid url %s, error: %s", str, err.Error())
continue

}
urls = append(urls, *u)
}

return urls

}

func TestProvider_Init(t *testing.T) {
ctx := context.Background()
config := embed.NewConfig()

config.Name = "localhost"
config.Dir = "/tmp/my-embedded-ectd-cluster"

config.ListenPeerUrls = parseEtcdUrls([]string{"http://0.0.0.0:2380"})
config.ListenClientUrls = parseEtcdUrls([]string{"http://0.0.0.0:2379"})
config.AdvertisePeerUrls = parseEtcdUrls([]string{"http://localhost:2380"})
config.AdvertiseClientUrls = parseEtcdUrls([]string{"http://localhost:2379"})
config.InitialCluster = "localhost=http://localhost:2380"
config.LogLevel = "panic"

// enable v2
config.EnableV2 = true

etcd, err := embed.StartEtcd(config)
require.NoError(t, err)

defer etcd.Close()

clientConfig := client.Config{Endpoints: []string{"http://127.0.0.1:2379"}}

c, err := client.New(clientConfig)
require.NoError(t, err)

kapi := client.NewKeysAPI(c)
require.NotNil(t, kapi)

time.Sleep(1 * time.Second)

t.Run("provider creates ok", func(t *testing.T) {
t.Parallel()

p, err := NewProvider(ctx, Config{EtcdConfig: clientConfig})
require.NoError(t, err)
require.NotNil(t, p)
})

t.Run("provider invalid config error", func(t *testing.T) {
t.Parallel()

invalidClientConfig := client.Config{Endpoints: []string{"http://0.0.0.0:23803"}}

p, err := NewProvider(ctx, Config{EtcdConfig: invalidClientConfig})
require.NoError(t, err)
require.NotNil(t, p)

_, err = p.GetTopology()
require.Error(t, err)
})

t.Run("nodes error", func(t *testing.T) {

nodesErrorPath := "/no-nodes"

p, err := NewProvider(ctx, Config{EtcdConfig: clientConfig, Path: nodesErrorPath})
require.NoError(t, err)
require.NotNil(t, p)

_, err = kapi.Set(ctx, nodesErrorPath, "test", &client.SetOptions{})
require.NoError(t, err)

_, err = p.GetTopology()
require.ErrorIs(t, err, ErrNodesError)
})

t.Run("no clusters or instances nodes error", func(t *testing.T) {
nodesClusterErrPath := "/test-nodes-cluster-instances-error"

p, err := NewProvider(ctx, Config{EtcdConfig: clientConfig, Path: nodesClusterErrPath})
require.NoError(t, err)
require.NotNil(t, p)

_, err = kapi.Set(ctx, nodesClusterErrPath, "", &client.SetOptions{Dir: true})
require.NoError(t, err)

_, err = kapi.Set(ctx, nodesClusterErrPath+"/clusters", "test", &client.SetOptions{})

require.NoError(t, err)

_, err = kapi.Set(ctx, nodesClusterErrPath+"/instances", "test", &client.SetOptions{})

require.NoError(t, err)

_, err = p.GetTopology()
require.NotErrorIs(t, err, ErrNodesError)
require.True(t, errors.Is(err, ErrClusterError) || errors.Is(err, ErrInstancesError))
})
}

0 comments on commit be369fe

Please sign in to comment.