Skip to content

Commit 7e5e58c

Browse files
committed
Add a docker based etcdintegration package
PR 2 for #4144 High level approach is as described in #4144 . This PR adds: - Functions to spin up a 1 node etcd cluster using docker (in `dockerexternal`) - A drop in replacement for the etcd/integration package using `dockerexternal` commit-id:e4e80f1d
1 parent 459e831 commit 7e5e58c

File tree

12 files changed

+1215
-5
lines changed

12 files changed

+1215
-5
lines changed

docker-compose.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@ services:
88
volumes:
99
- .:/go/src/github.com/m3db/m3
1010
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
11+
# Support running docker within docker. That is, buildkite jobs themselves run in a container; that container
12+
# needs to be able to spin up functioning docker containers.
13+
- /var/run/docker.sock:/var/run/docker.sock
14+
extra_hosts:
15+
# Allow routing from the buildkite container to the host machine, as host.docker.internal. This allows us to do
16+
# the following:
17+
# - Spin up an etcd container with ports published to the host machine
18+
# - Connect to the etcd container from the buildkite test process using host.docker.internal
19+
# See
20+
# https://medium.com/@TimvanBaarsen/how-to-connect-to-the-docker-host-from-inside-a-docker-container-112b4c71bc66
21+
- "host.docker.internal:host-gateway"
1122
environment:
1223
- CI
1324
- BUILDKITE

src/cluster/client/etcd/client_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,23 @@ import (
2525
"testing"
2626
"time"
2727

28+
"github.com/m3db/m3/src/cluster/kv"
29+
"github.com/m3db/m3/src/cluster/services"
30+
integration "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration"
31+
"github.com/m3db/m3/src/x/retry"
32+
2833
"github.com/stretchr/testify/assert"
2934
"github.com/stretchr/testify/require"
3035
clientv3 "go.etcd.io/etcd/client/v3"
31-
"go.etcd.io/etcd/tests/v3/framework/integration"
3236
"google.golang.org/grpc"
33-
34-
"github.com/m3db/m3/src/cluster/kv"
35-
"github.com/m3db/m3/src/cluster/services"
3637
)
3738

3839
func TestETCDClientGen(t *testing.T) {
39-
cs, err := NewConfigServiceClient(testOptions())
40+
cs, err := NewConfigServiceClient(
41+
testOptions().
42+
// These are error cases; don't retry for no reason.
43+
SetRetryOptions(retry.NewOptions().SetMaxRetries(0)),
44+
)
4045
require.NoError(t, err)
4146

4247
c := cs.(*csclient)

src/cluster/client/etcd/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ func (c ClusterConfig) NewCluster() Cluster {
6161

6262
if c.AutoSyncInterval > 0 {
6363
cluster = cluster.SetAutoSyncInterval(c.AutoSyncInterval)
64+
} else if c.AutoSyncInterval < 0 {
65+
// Autosync should *always* be on, unless the user very explicitly requests it to be off, via a negative value
66+
// (in which case we can assume they know what they're doing).
67+
// Therefore, only update if it's nonzero.
68+
cluster = cluster.SetAutoSyncInterval(0)
6469
}
6570

6671
if c.DialTimeout > 0 {

src/cluster/client/etcd/options.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ func (c cluster) AutoSyncInterval() time.Duration {
464464
}
465465

466466
func (c cluster) SetAutoSyncInterval(autoSyncInterval time.Duration) Cluster {
467+
if autoSyncInterval < 0 {
468+
autoSyncInterval = 0
469+
}
467470
c.autoSyncInterval = autoSyncInterval
468471
return c
469472
}

src/cluster/client/etcd/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ type Cluster interface {
159159
SetTLSOptions(TLSOptions) Cluster
160160

161161
AutoSyncInterval() time.Duration
162+
// SetAutoSyncInterval sets the etcd client to autosync cluster endpoints periodically. This defaults to
163+
// 1 minute (defaultAutoSyncInterval). If negative
162164
SetAutoSyncInterval(value time.Duration) Cluster
163165

164166
DialTimeout() time.Duration
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
// Copyright (c) 2022 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package dockerexternal
22+
23+
import (
24+
"context"
25+
"errors"
26+
"fmt"
27+
"math/rand"
28+
"net"
29+
"strconv"
30+
"time"
31+
32+
//nolint:gci
33+
"github.com/ory/dockertest/v3"
34+
"github.com/ory/dockertest/v3/docker"
35+
clientv3 "go.etcd.io/etcd/client/v3"
36+
"go.uber.org/zap"
37+
"google.golang.org/grpc"
38+
39+
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge"
40+
xdockertest "github.com/m3db/m3/src/x/dockertest"
41+
xerrors "github.com/m3db/m3/src/x/errors"
42+
"github.com/m3db/m3/src/x/instrument"
43+
"github.com/m3db/m3/src/x/retry"
44+
)
45+
46+
var (
47+
etcdImage = xdockertest.Image{
48+
Name: "bitnami/etcd",
49+
Tag: "3.5.4",
50+
}
51+
)
52+
53+
// NewEtcd constructs a single etcd node, running in a docker container.
54+
func NewEtcd(
55+
pool *dockertest.Pool,
56+
instrumentOpts instrument.Options,
57+
options ...EtcdClusterOption,
58+
) (*EtcdNode, error) {
59+
logger := instrumentOpts.Logger()
60+
if logger == nil {
61+
logger = zap.NewNop()
62+
instrumentOpts = instrumentOpts.SetLogger(logger)
63+
}
64+
65+
var opts etcdClusterOptions
66+
for _, o := range options {
67+
o.apply(&opts)
68+
}
69+
70+
return &EtcdNode{
71+
pool: pool,
72+
instrumentOpts: instrumentOpts,
73+
logger: logger,
74+
opts: opts,
75+
// Solely for mocking in tests--unfortunately we don't want to take in the etcd client as a dependency here
76+
// (we don't know the endpoints, and therefore need to construct it ourselves).
77+
// Thus, we do two hops (mock newClient returning mock memberClient)
78+
newClient: func(config clientv3.Config) (memberClient, error) {
79+
return clientv3.New(config)
80+
},
81+
}, nil
82+
}
83+
84+
// EtcdNode is a single etcd node, running via a docker container.
85+
//nolint:maligned
86+
type EtcdNode struct {
87+
instrumentOpts instrument.Options
88+
logger *zap.Logger
89+
pool *dockertest.Pool
90+
opts etcdClusterOptions
91+
92+
// namePrefix is used to name the cluster. Exists solely for unittests in this package; otherwise a const
93+
namePrefix string
94+
newClient func(config clientv3.Config) (memberClient, error)
95+
96+
// initialized by Setup
97+
address string
98+
resource *xdockertest.Resource
99+
etcdCli *clientv3.Client
100+
bridge *bridge.Bridge
101+
102+
stopped bool
103+
}
104+
105+
// Setup starts the docker container.
106+
func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) {
107+
if c.resource != nil {
108+
return errors.New("etcd cluster already started")
109+
}
110+
111+
// nolint:gosec
112+
id := rand.New(rand.NewSource(time.Now().UnixNano())).Int()
113+
114+
namePrefix := "m3-test-etcd-"
115+
if c.namePrefix != "" {
116+
// support overriding for tests
117+
namePrefix = c.namePrefix
118+
}
119+
120+
// Roughly, runs:
121+
// docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd
122+
// Port 2379 on the container is bound to a free port on the host
123+
resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{
124+
OverrideDefaults: false,
125+
// TODO: what even is this?
126+
Source: "etcd",
127+
ContainerName: fmt.Sprintf("%s%d", namePrefix, id),
128+
Image: etcdImage,
129+
Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"},
130+
InstrumentOpts: c.instrumentOpts,
131+
PortMappings: map[docker.Port][]docker.PortBinding{
132+
"2379/tcp": {{
133+
HostIP: "0.0.0.0",
134+
HostPort: strconv.Itoa(c.opts.port),
135+
}},
136+
},
137+
NoNetworkOverlay: true,
138+
})
139+
140+
if err != nil {
141+
return fmt.Errorf("starting etcd container: %w", err)
142+
}
143+
144+
container := resource.Resource().Container
145+
c.logger.Info("etcd container started",
146+
zap.String("containerID", container.ID),
147+
zap.Any("ports", container.NetworkSettings.Ports),
148+
// Uncomment if you need gory details about the container printed; equivalent of `docker inspect <id>
149+
// zap.Any("container", container),
150+
)
151+
// Extract the port on which we are listening.
152+
// This is coming from the equivalent of docker inspect <container_id>
153+
portBinds := container.NetworkSettings.Ports["2379/tcp"]
154+
155+
// If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine.
156+
// See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16
157+
ipAddr := "127.0.0.1"
158+
_, err = net.ResolveIPAddr("ip4", "host.docker.internal")
159+
if err == nil {
160+
c.logger.Info("Running tests within a docker container (e.g. for buildkite. " +
161+
"Using host.docker.internal to talk to etcd")
162+
ipAddr = "host.docker.internal"
163+
}
164+
165+
c.resource = resource
166+
c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort)
167+
168+
etcdCli, err := clientv3.New(
169+
clientv3.Config{
170+
Endpoints: []string{c.address},
171+
DialTimeout: 5 * time.Second,
172+
DialOptions: []grpc.DialOption{grpc.WithBlock()},
173+
Logger: c.logger,
174+
},
175+
)
176+
if err != nil {
177+
return fmt.Errorf("constructing etcd client: %w", err)
178+
}
179+
180+
defer func() {
181+
if err := etcdCli.Close(); err != nil {
182+
var merr xerrors.MultiError
183+
closeErr = merr.
184+
Add(closeErr).
185+
Add(fmt.Errorf("closing etcd client: %w", err)).
186+
FinalError()
187+
}
188+
}()
189+
190+
return c.waitForHealth(ctx, etcdCli)
191+
}
192+
193+
func (c *EtcdNode) containerHostPort() string {
194+
portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"]
195+
196+
return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort)
197+
}
198+
199+
func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error {
200+
retrier := retry.NewRetrier(retry.NewOptions().
201+
SetForever(true).
202+
SetMaxBackoff(5 * time.Second),
203+
)
204+
205+
var timeout time.Duration
206+
deadline, ok := ctx.Deadline()
207+
if ok {
208+
timeout = deadline.Sub(time.Now())
209+
}
210+
c.logger.Info(
211+
"Waiting for etcd to report healthy (via member list)",
212+
zap.String("timeout", timeout.String()),
213+
)
214+
err := retrier.AttemptContext(ctx, func() error {
215+
_, err := memberCli.MemberList(ctx)
216+
if err != nil {
217+
c.logger.Info(
218+
"Failed connecting to etcd while waiting for container to come up",
219+
zap.Error(err),
220+
zap.String("endpoints", c.address),
221+
)
222+
}
223+
return err
224+
})
225+
if err == nil {
226+
c.logger.Info("etcd is healthy")
227+
return nil
228+
}
229+
return fmt.Errorf("waiting for etcd to become healthy: %w", err)
230+
}
231+
232+
// Close stops the etcd node, and removes it.
233+
func (c *EtcdNode) Close(ctx context.Context) error {
234+
var err xerrors.MultiError
235+
err = err.
236+
Add(c.resource.Close())
237+
return err.FinalError()
238+
}
239+
240+
// Address is the host:port of the etcd node for use by etcd clients.
241+
func (c *EtcdNode) Address() string {
242+
return c.address
243+
}
244+
245+
// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart.
246+
func (c *EtcdNode) Stop(ctx context.Context) error {
247+
if c.stopped {
248+
return errors.New("etcd node is already stopped")
249+
}
250+
if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil {
251+
return err
252+
}
253+
c.stopped = true
254+
return nil
255+
}
256+
257+
// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then
258+
// started; else it will just be start.
259+
func (c *EtcdNode) Restart(ctx context.Context) error {
260+
if !c.stopped {
261+
c.logger.Info("Stopping etcd node")
262+
263+
if err := c.Stop(ctx); err != nil {
264+
return fmt.Errorf("stopping etcd node for Restart: %w", err)
265+
}
266+
}
267+
err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx)
268+
if err != nil {
269+
return fmt.Errorf("starting etcd node for Restart: %w", err)
270+
}
271+
c.stopped = false
272+
return nil
273+
}
274+
275+
var _ memberClient = (*clientv3.Client)(nil)
276+
277+
// memberClient exposes just one method of *clientv3.Client, for purposes of tests.
278+
type memberClient interface {
279+
MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
280+
}

0 commit comments

Comments
 (0)