Skip to content

Commit 3e161d2

Browse files
author
James Phillips
committed
Adds a direct TCP fallback ping pipelined with the UDP indirect pings.
1 parent 534ba05 commit 3e161d2

File tree

4 files changed

+191
-11
lines changed

4 files changed

+191
-11
lines changed

net.go

+65-7
Original file line numberDiff line numberDiff line change
@@ -196,17 +196,19 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
196196
defer conn.Close()
197197
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
198198

199+
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
199200
msgType, bufConn, dec, err := m.readTCP(conn)
200201
if err != nil {
201202
m.logger.Printf("[ERR] memberlist: failed to receive: %s", err)
202203
return
203204
}
204205

205-
if msgType == userMsg {
206+
switch msgType {
207+
case userMsg:
206208
if err := m.readUserMsg(bufConn, dec); err != nil {
207209
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s", err)
208210
}
209-
} else if msgType == pushPullMsg {
211+
case pushPullMsg:
210212
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
211213
if err != nil {
212214
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s", err)
@@ -221,7 +223,31 @@ func (m *Memberlist) handleConn(conn *net.TCPConn) {
221223
if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
222224
return
223225
}
224-
} else {
226+
case pingMsg:
227+
var p ping
228+
if err := dec.Decode(&p); err != nil {
229+
m.logger.Printf("[ERR] memberlist: Failed to decode TCP ping: %s", err)
230+
return
231+
}
232+
233+
if p.Node != "" && p.Node != m.config.Name {
234+
m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s", p.Node)
235+
return
236+
}
237+
238+
ack := ackResp{p.SeqNo, nil}
239+
out, err := encode(ackRespMsg, &ack)
240+
if err != nil {
241+
m.logger.Printf("[ERR] memeberlist: Failed to encode TCP ack: %s", err)
242+
return
243+
}
244+
245+
err = m.rawSendMsgTCP(conn, out.Bytes())
246+
if err != nil {
247+
m.logger.Printf("[ERR] memberlist: Failed to send TCP ack: %s", err)
248+
return
249+
}
250+
default:
225251
m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d)", msgType)
226252
}
227253
}
@@ -654,6 +680,7 @@ func (m *Memberlist) sendAndReceiveState(addr []byte, port uint16, join bool) ([
654680
return nil, nil, err
655681
}
656682

683+
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
657684
msgType, bufConn, dec, err := m.readTCP(conn)
658685
if err != nil {
659686
return nil, nil, err
@@ -789,9 +816,6 @@ func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
789816
// readTCP is used to read the start of a TCP stream.
790817
// it decrypts and decompresses the stream if necessary
791818
func (m *Memberlist) readTCP(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
792-
// Setup a deadline
793-
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
794-
795819
// Created a buffered reader
796820
var bufConn io.Reader = bufio.NewReader(conn)
797821

@@ -934,7 +958,7 @@ func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, us
934958
return nil
935959
}
936960

937-
// readUserMsg is used to decode a UserMsg from a TCP stream
961+
// readUserMsg is used to decode a userMsg from a TCP stream
938962
func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
939963
// Read the user message header
940964
var header userMsgHeader
@@ -964,3 +988,37 @@ func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
964988

965989
return nil
966990
}
991+
992+
// sendPingAndWaitForAck uses the given TCP connection, sends a ping, and waits
993+
// for an ack. All of this is done in blocking fashion, and if the exchange is
994+
// good, a nil error will be returned to the caller.
995+
func (m *Memberlist) sendPingAndWaitForAck(conn net.Conn, ping ping) error {
996+
out, err := encode(pingMsg, &ping)
997+
if err != nil {
998+
return err
999+
}
1000+
1001+
if err = m.rawSendMsgTCP(conn, out.Bytes()); err != nil {
1002+
return err
1003+
}
1004+
1005+
msgType, _, dec, err := m.readTCP(conn)
1006+
if err != nil {
1007+
return err
1008+
}
1009+
1010+
if msgType != ackRespMsg {
1011+
return fmt.Errorf("unexpected msgType (%d) from TCP ping", msgType)
1012+
}
1013+
1014+
var ack ackResp
1015+
if err := dec.Decode(&ack); err != nil {
1016+
return err
1017+
}
1018+
1019+
if ack.SeqNo != ping.SeqNo {
1020+
return fmt.Errorf("sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
1021+
}
1022+
1023+
return nil
1024+
}

ping_delegate.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import "time"
44

55
// PingDelegate is used to notify an observer how long it took for a ping message to
66
// complete a round trip. It can also be used for writing arbitrary byte slices
7-
// into ack messages.
7+
// into ack messages. Note that in order to be meaningful for RTT estimates, this
8+
// delegate does not apply to indirect pings, nor fallback pings sent over TCP.
89
type PingDelegate interface {
910
// AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack
1011
AckPayload() []byte

state.go

+41
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ func (m *Memberlist) probeNode(node *nodeState) {
216216
// Setup an ack handler
217217
ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
218218
sent := time.Now()
219+
deadline := sent.Add(m.config.ProbeInterval)
219220
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
220221

221222
// Send the ping message
@@ -257,6 +258,37 @@ func (m *Memberlist) probeNode(node *nodeState) {
257258
}
258259
}
259260

261+
// Also make an attempt to contact the node directly over TCP. This
262+
// helps prevent confused clients who get isolated from UDP traffic
263+
// but can still speak TCP (which also means they can possibly report
264+
// misinformation to other nodes via anti-entropy), avoiding flapping in
265+
// the cluster.
266+
fallbackCh := make(chan bool)
267+
if node.PMax >= 3 {
268+
destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
269+
dialer := net.Dialer{Timeout: deadline.Sub(time.Now())}
270+
conn, err := dialer.Dial("tcp", destAddr.String())
271+
if err != nil {
272+
// This will fail if the node is actually dead, so we
273+
// shouldn't spam the logs with it.
274+
close(fallbackCh)
275+
} else {
276+
go func() {
277+
defer close(fallbackCh)
278+
defer conn.Close()
279+
280+
conn.SetDeadline(deadline)
281+
if err := m.sendPingAndWaitForAck(conn, ping); err != nil {
282+
m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
283+
} else {
284+
fallbackCh <- true
285+
}
286+
}()
287+
}
288+
} else {
289+
close(fallbackCh)
290+
}
291+
260292
// Wait for the acks or timeout
261293
select {
262294
case v := <-ackCh:
@@ -265,6 +297,15 @@ func (m *Memberlist) probeNode(node *nodeState) {
265297
}
266298
}
267299

300+
// Finally, poll the fallback channel - anything in there means that the
301+
// TCP fallback ping was successful. The timeouts are set such that the
302+
// channel will have something or be closed without having to wait any
303+
// additional time here.
304+
for _ = range fallbackCh {
305+
m.logger.Printf("memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
306+
return
307+
}
308+
268309
// No acks received from target, suspect
269310
m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
270311
s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}

state_test.go

+83-3
Original file line numberDiff line numberDiff line change
@@ -105,17 +105,97 @@ func TestMemberList_ProbeNode_Suspect(t *testing.T) {
105105
}
106106
}
107107

108-
func TestMemberList_ProbeNode(t *testing.T) {
108+
func TestMemberList_ProbeNode_FallbackTCP(t *testing.T) {
109109
addr1 := getBindAddr()
110110
addr2 := getBindAddr()
111+
addr3 := getBindAddr()
112+
addr4 := getBindAddr()
111113
ip1 := []byte(addr1)
112114
ip2 := []byte(addr2)
115+
ip3 := []byte(addr3)
116+
ip4 := []byte(addr4)
113117

114118
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
115119
c.ProbeTimeout = time.Millisecond
116120
c.ProbeInterval = 10 * time.Millisecond
117121
})
118122
m2 := HostMemberlist(addr2.String(), t, nil)
123+
m3 := HostMemberlist(addr3.String(), t, nil)
124+
m4 := HostMemberlist(addr4.String(), t, nil)
125+
126+
a1 := alive{Node: addr1.String(), Addr: ip1, Port: 7946, Incarnation: 1}
127+
m1.aliveNode(&a1, nil, true)
128+
a2 := alive{Node: addr2.String(), Addr: ip2, Port: 7946, Incarnation: 1}
129+
m1.aliveNode(&a2, nil, false)
130+
a3 := alive{Node: addr3.String(), Addr: ip3, Port: 7946, Incarnation: 1}
131+
m1.aliveNode(&a3, nil, false)
132+
133+
// Make sure m4 is configured with the same protocol version as m1 so
134+
// the TCP fallback behavior is enabled.
135+
a4 := alive{
136+
Node: addr4.String(),
137+
Addr: ip4,
138+
Port: 7946,
139+
Incarnation: 1,
140+
Vsn: []uint8{
141+
ProtocolVersionMin, ProtocolVersionMax,
142+
m1.config.ProtocolVersion, m1.config.DelegateProtocolMin,
143+
m1.config.DelegateProtocolMax, m1.config.DelegateProtocolVersion,
144+
},
145+
}
146+
m1.aliveNode(&a4, nil, false)
147+
148+
// Isolate m4 from all inbound UDP traffic to force the TCP fallback
149+
// path to get executed.
150+
if err := m4.udpListener.Close(); err != nil {
151+
t.Fatalf("could not close UDP listener")
152+
}
153+
n := m1.nodeMap[addr4.String()]
154+
m1.probeNode(n)
155+
if n.State != stateAlive {
156+
t.Fatalf("expect node to be alive")
157+
}
158+
159+
// Confirm the peers attempted an indirect probe.
160+
time.Sleep(10 * time.Millisecond)
161+
if m2.sequenceNum != 1 {
162+
t.Fatalf("bad seqno %v", m2.sequenceNum)
163+
}
164+
if m3.sequenceNum != 1 {
165+
t.Fatalf("bad seqno %v", m3.sequenceNum)
166+
}
167+
168+
// Now shutdown all inbound TCP traffic to make sure the TCP fallback
169+
// path properly fails when the node is really unreachable.
170+
if err := m4.tcpListener.Close(); err != nil {
171+
t.Fatalf("could not close TCP listener")
172+
}
173+
m1.probeNode(n)
174+
if n.State != stateSuspect {
175+
t.Fatalf("expect node to be suspect")
176+
}
177+
178+
// Confirm the peers attempted an indirect probe.
179+
time.Sleep(10 * time.Millisecond)
180+
if m2.sequenceNum != 2 {
181+
t.Fatalf("bad seqno %v", m2.sequenceNum)
182+
}
183+
if m3.sequenceNum != 2 {
184+
t.Fatalf("bad seqno %v", m3.sequenceNum)
185+
}
186+
}
187+
188+
func TestMemberList_ProbeNode(t *testing.T) {
189+
addr1 := getBindAddr()
190+
addr2 := getBindAddr()
191+
ip1 := []byte(addr1)
192+
ip2 := []byte(addr2)
193+
194+
m1 := HostMemberlist(addr1.String(), t, func(c *Config) {
195+
c.ProbeTimeout = time.Millisecond
196+
c.ProbeInterval = 10 * time.Millisecond
197+
})
198+
_ = HostMemberlist(addr2.String(), t, nil)
119199

120200
a1 := alive{Node: addr1.String(), Addr: ip1, Port: 7946, Incarnation: 1}
121201
m1.aliveNode(&a1, nil, true)
@@ -125,14 +205,14 @@ func TestMemberList_ProbeNode(t *testing.T) {
125205
n := m1.nodeMap[addr2.String()]
126206
m1.probeNode(n)
127207

128-
// Should be marked suspect
208+
// Should be marked alive
129209
if n.State != stateAlive {
130210
t.Fatalf("Expect node to be alive")
131211
}
132212

133213
// Should increment seqno
134214
if m1.sequenceNum != 1 {
135-
t.Fatalf("bad seqno %v", m2.sequenceNum)
215+
t.Fatalf("bad seqno %v", m1.sequenceNum)
136216
}
137217
}
138218

0 commit comments

Comments
 (0)