-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtransport.go
263 lines (247 loc) · 6.38 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package flotilla
import (
"bytes"
"github.com/hashicorp/go-msgpack/codec"
"github.com/jbooth/raft"
"log"
"net"
"sync"
"time"
)
// flotilla client connection
type connToLeader struct {
c net.Conn
e *codec.Encoder
d *codec.Decoder
l *sync.Mutex
lg *log.Logger
pending chan *commandCallback
}
// joins the raft leader and sets up infrastructure for
// processing commands
// can return ErrNotLeader
func newConnToLeader(conn net.Conn, advertiseAddr string, lg *log.Logger) (*connToLeader, error) {
// send join command
h := &codec.MsgpackHandle{}
ret := &connToLeader{
c: conn,
e: codec.NewEncoder(conn, h),
d: codec.NewDecoder(conn, h),
l: new(sync.Mutex),
lg: lg,
pending: make(chan *commandCallback, 64),
}
join := &joinReq{
PeerAddr: advertiseAddr,
}
err := ret.e.Encode(join)
if err != nil {
ret.c.Close()
return nil, err
}
joinResp := &joinResp{}
err = ret.d.Decode(joinResp)
if err != nil {
ret.lg.Printf("Error connecting to leader at %s : %s", conn.RemoteAddr().String(), err)
ret.c.Close()
return nil, err
}
go ret.readResponses()
return ret, nil
}
// sends the command for remote execution.
// returns error if we couldn't communicate with leader
func (c *connToLeader) forwardCommand(cb *commandCallback, cmdName string, args [][]byte) error {
c.l.Lock()
defer c.l.Unlock()
// marshal log object
lg := logForCommand(cb.originAddr, cb.reqNo, cmdName, args)
// put response chan in pending
// send
err := c.e.Encode(lg)
if err != nil {
cb.cancel()
cb.result <- Result{nil, err}
return err
} else {
// so our responseReader will forward appropriately
c.pending <- cb
}
return nil
}
func (c *connToLeader) remoteAddr() net.Addr {
return c.c.RemoteAddr()
}
func (c *connToLeader) readResponses() {
resp := &commandResp{}
for cb := range c.pending {
err := c.d.Decode(resp)
if err != nil {
cb.cancel()
cb.result <- Result{nil, err}
c.lg.Printf("Error reading response: %s, closing and giving err to all pending requests", err)
c.c.Close()
for {
select {
case cb1 := <-c.pending:
cb1.cancel()
cb1.result <- Result{nil, err}
default:
return
}
}
}
}
c.lg.Printf("Closing leaderConn to %s", c.c.RemoteAddr().String())
c.c.Close()
return
}
func bytesForCommand(host string, reqno uint64, cmdName string, args [][]byte) []byte {
cmd := &commandReq{}
cmd.Args = args
cmd.Cmd = cmdName
// no callback
cmd.OriginAddr = host
cmd.Reqno = reqno
b, err := encodeMsgPack(cmd)
if err != nil {
panic(err)
}
return b.Bytes()
}
func logForCommand(host string, reqno uint64, cmdName string, args [][]byte) *raft.Log {
ret := &raft.Log{
Index: 0,
Term: 0,
Type: raft.LogCommand,
Data: bytesForCommand(host, reqno, cmdName, args),
}
return ret
}
// serves a follower from a leader server
// we tell all servers to go elsewhere if we are not leader
func serveFollower(lg *log.Logger, follower net.Conn, leader *server) {
ch := &codec.MsgpackHandle{}
decode := codec.NewDecoder(follower, ch)
encode := codec.NewEncoder(follower, ch)
jReq := &joinReq{}
jResp := &joinResp{}
err := decode.Decode(jReq)
if err != nil {
lg.Printf("Error serving follower at %s : %s", follower.RemoteAddr(), err)
return
}
// register with leader
isLeader := true
if leader.IsLeader() {
lf := leader.raft.VerifyLeader()
err := lf.Error()
if err != nil {
lg.Printf("Error while verifying leader on host %s : %s", leader.rpcLayer.Addr().String(), err)
isLeader = false
}
peerAddr, err := net.ResolveTCPAddr("tcp", jReq.PeerAddr)
if err != nil {
lg.Printf("Couldn't resolve pathname %s processing join from %s", jReq.PeerAddr, follower.RemoteAddr().String())
follower.Close()
return
}
addFuture := leader.raft.AddPeer(peerAddr)
err = addFuture.Error()
if err == raft.ErrKnownPeer {
lg.Printf("Tried to add already existing peer %s, continuing", peerAddr)
}
if err != nil && err != raft.ErrKnownPeer {
lg.Printf("Error adding peer %s : %s, terminating conn", peerAddr, err)
follower.Close()
return
}
} else {
isLeader = false
}
if !isLeader {
// send response indicating leader is someone else, then return
lg.Printf("Node %s not leader, refusing connection to peer %s", leader.rpcLayer.Addr().String(), jReq.PeerAddr)
leaderAddr := leader.raft.Leader()
if leaderAddr != nil {
jResp.LeaderHost = leaderAddr.String()
}
encode.Encode(jResp)
follower.Close()
return
}
// send join resp
err = encode.Encode(jResp)
if err != nil {
lg.Printf("Error sending joinResp : %s", err)
follower.Close()
return
}
// read commands
err = nil
futures := make(chan raft.ApplyFuture, 16)
defer func() {
// die
follower.Close()
close(futures)
}()
go sendResponses(futures, lg, encode, follower)
for {
cmdReq := &raft.Log{}
err = decode.Decode(cmdReq)
if err != nil {
lg.Printf("Error reading command from node %s : '%s', closing conn", follower.RemoteAddr().String(), err.Error())
follower.Close()
return
}
// exec with leader
future := leader.raft.Apply(cmdReq.Data, 1*time.Minute)
futures <- future
}
}
// runs alongside serveFollower to send actual responses
func sendResponses(futures chan raft.ApplyFuture, lg *log.Logger, e *codec.Encoder, conn net.Conn) {
resp := &commandResp{}
for f := range futures {
err := f.Error()
resp.Err = err
err = e.Encode(resp)
if err != nil {
lg.Printf("Error writing response %s to host %s : %s", resp, conn.RemoteAddr().String(), err)
conn.Close()
return
}
}
}
type joinReq struct {
PeerAddr string
}
type joinResp struct {
LeaderHost string // "" is success, "addr:port" of leader if we're not leader
}
type commandReq struct {
Reqno uint64
OriginAddr string
Cmd string
Args [][]byte
}
// commandResp has an error if the leader had a non-command-caused error
// while applying the command. can be ErrNotLeader.
type commandResp struct {
Err error
}
// Decode reverses the encode operation on a byte slice input
func decodeMsgPack(buf []byte, out interface{}) error {
r := bytes.NewBuffer(buf)
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(r, &hd)
return dec.Decode(out)
}
// Encode writes an encoded object to a new bytes buffer
func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
buf := bytes.NewBuffer(nil)
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(buf, &hd)
err := enc.Encode(in)
return buf, err
}