-
Notifications
You must be signed in to change notification settings - Fork 5
/
node.go
321 lines (289 loc) · 8.6 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package easyraft
import (
"errors"
"fmt"
"github.com/Jille/raft-grpc-transport"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/ksrichard/easyraft/discovery"
"github.com/ksrichard/easyraft/fsm"
"github.com/ksrichard/easyraft/grpc"
"github.com/ksrichard/easyraft/serializer"
"github.com/ksrichard/easyraft/util"
"github.com/zemirco/uid"
ggrpc "google.golang.org/grpc"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"sync/atomic"
"syscall"
"time"
)
// Node is a single EasyRaft cluster member. It owns a Raft instance, the gRPC
// server on which both the Raft transport and the client service are
// registered (see Start), and a memberlist-based discovery layer.
// Create one with NewNode and run it with Start.
type Node struct {
// ID is the identifier generated by NewNode; it is also used as the Raft LocalID.
ID string
// RaftPort is the TCP port the gRPC server (Raft transport + client service) listens on.
RaftPort int
// DiscoveryPort is the port memberlist binds to.
DiscoveryPort int
// address is "0.0.0.0:<RaftPort>": the gRPC listen address and the address given to the Raft transport.
address string
// dataDir holds the BoltDB Raft store (store.boltdb).
dataDir string
// Raft is the underlying hashicorp/raft instance.
Raft *raft.Raft
// GrpcServer is created in Start and gracefully stopped in Stop.
GrpcServer *ggrpc.Server
// DiscoveryMethod reports candidate peers; it is started in Start and stopped in Stop.
DiscoveryMethod discovery.DiscoveryMethod
// TransportManager is the gRPC-based Raft transport.
TransportManager *transport.Manager
// Serializer encodes requests passed to RaftApply (and is handed to the FSM in NewNode).
Serializer serializer.Serializer
// mList is the memberlist instance, created in Start.
mList *memberlist.Memberlist
// discoveryConfig is prepared in NewNode; Start sets its Events delegate to this Node.
discoveryConfig *memberlist.Config
// stopped is 0 while running and 1 once Stop has begun; accessed via sync/atomic.
stopped *uint32
// logger is the logger used for node lifecycle messages.
logger *log.Logger
// stoppedCh is created in Start and receives a value when Stop completes.
stoppedCh chan interface{}
// snapshotEnabled makes Stop attempt a Raft snapshot before shutting down.
snapshotEnabled bool
}
// NewNode returns an EasyRaft node that is configured but not yet running;
// call Start to bootstrap and serve it.
//
// Parameters:
//   - raftPort: TCP port for the gRPC Raft transport (bound on 0.0.0.0).
//   - discoveryPort: port memberlist binds to (WAN default config).
//   - dataDir: directory for the Raft store. It is created if it is not a
//     directory, and any existing store.boltdb inside it is deleted, so Raft
//     state does not survive a restart.
//   - services: FSM services wrapped by the routing FSM.
//   - serializer: handed to the FSM and stored for RaftApply.
//   - discoveryMethod: source of candidate peers.
//   - snapshotEnabled: must be false; snapshots are not implemented yet.
//
// It returns an error if snapshotEnabled is true (checked before anything is
// touched on disk), or if the data directory, BoltDB store, log cache, or Raft
// instance cannot be created. The BoltDB store is closed on every error path
// after it has been opened.
func NewNode(raftPort, discoveryPort int, dataDir string, services []fsm.FSMService, serializer serializer.Serializer, discoveryMethod discovery.DiscoveryMethod, snapshotEnabled bool) (*Node, error) {
	// Reject the unsupported option first so that no directory is created, no
	// store file is deleted, and no BoltDB handle is opened for a call that can
	// never succeed.
	if snapshotEnabled {
		// TODO: implement: snapshotStore = NewLogsOnlySnapshotStore(serializer)
		return nil, errors.New("snapshots are not supported at the moment")
	}

	// default raft config
	// NOTE(review): this address is also advertised to peers via the transport
	// and bootstrap configuration; 0.0.0.0 is not dialable by other hosts —
	// confirm whether a routable address should be used here.
	addr := fmt.Sprintf("%s:%d", "0.0.0.0", raftPort)
	nodeID := uid.New(50)
	raftConf := raft.DefaultConfig()
	raftConf.LocalID = raft.ServerID(nodeID)
	raftConf.LogLevel = "Info"
	raftLogCacheSize := 512

	// stable/log/snapshot store config
	if !util.IsDir(dataDir) {
		if err := util.RemoveCreateDir(dataDir); err != nil {
			return nil, err
		}
	}
	stableStoreFile := filepath.Join(dataDir, "store.boltdb")
	if util.FileExists(stableStoreFile) {
		if err := os.Remove(stableStoreFile); err != nil {
			return nil, err
		}
	}
	stableStore, err := raftboltdb.NewBoltStore(stableStoreFile)
	if err != nil {
		return nil, err
	}
	// From here on the BoltDB file is open and must be closed on failure.
	// The close error is ignored: the original failure is what the caller needs.
	logStore, err := raft.NewLogCache(raftLogCacheSize, stableStore)
	if err != nil {
		_ = stableStore.Close()
		return nil, err
	}
	var snapshotStore raft.SnapshotStore = raft.NewDiscardSnapshotStore()

	// grpc transport
	grpcTransport := transport.New(raft.ServerAddress(addr), []ggrpc.DialOption{ggrpc.WithInsecure()})

	// init FSM
	sm := fsm.NewRoutingFSM(services)
	sm.Init(serializer)

	// memberlist config; the "<nodeID>:<raftPort>" name format is parsed back
	// in the memberlist event callbacks.
	mlConfig := memberlist.DefaultWANConfig()
	mlConfig.BindPort = discoveryPort
	mlConfig.Name = fmt.Sprintf("%s:%d", nodeID, raftPort)

	// raft server
	raftServer, err := raft.NewRaft(raftConf, sm, logStore, stableStore, snapshotStore, grpcTransport.Transport())
	if err != nil {
		_ = stableStore.Close()
		return nil, err
	}

	// logging
	// NOTE(review): log.Default() is the process-wide standard logger, so this
	// prefix applies to every log.* call in the program, not just this node.
	logger := log.Default()
	logger.SetPrefix("[EasyRaft] ")

	// initial stopped flag (0 = not stopped)
	var stopped uint32
	return &Node{
		ID:               nodeID,
		RaftPort:         raftPort,
		address:          addr,
		dataDir:          dataDir,
		Raft:             raftServer,
		TransportManager: grpcTransport,
		Serializer:       serializer,
		DiscoveryPort:    discoveryPort,
		DiscoveryMethod:  discoveryMethod,
		discoveryConfig:  mlConfig,
		logger:           logger,
		stopped:          &stopped,
		snapshotEnabled:  snapshotEnabled,
	}, nil
}
// Start starts the Node and returns a channel that receives a value once Stop
// has finished shutting the node down.
//
// Startup steps, in order:
//  1. Bootstrap Raft with this node as the only voter.
//  2. Create the memberlist with this Node as its event delegate.
//  3. Listen on the Raft address and register the Raft transport and the
//     client service on a new gRPC server.
//  4. Start the discovery method and consume its peers in the background.
//  5. Serve gRPC in the background.
//  6. Install a SIGINT/SIGTERM/SIGABRT handler that calls Stop.
//
// If a step fails, resources acquired by earlier steps (memberlist, listener)
// are released and the error is returned. The process is never terminated.
func (n *Node) Start() (chan interface{}, error) {
	n.logger.Println("Starting Node...")
	// Mark the node as running again; Stop flips this to 1.
	atomic.StoreUint32(n.stopped, 0)
	// Create the stop channel before any goroutine can call Stop. Stop sends on
	// it, and a send on a nil channel blocks forever. One slot of buffer lets
	// Stop finish even if nobody is receiving yet.
	n.stoppedCh = make(chan interface{}, 1)

	// raft server
	configuration := raft.Configuration{
		Servers: []raft.Server{{
			ID:      raft.ServerID(n.ID),
			Address: n.TransportManager.Transport().LocalAddr(),
		}},
	}
	if err := n.Raft.BootstrapCluster(configuration).Error(); err != nil {
		return nil, err
	}

	// memberlist discovery
	n.discoveryConfig.Events = n
	list, err := memberlist.Create(n.discoveryConfig)
	if err != nil {
		return nil, err
	}
	n.mList = list

	// grpc server
	grpcListen, err := net.Listen("tcp", n.address)
	if err != nil {
		// A library must not exit the host process on a bind failure; undo the
		// discovery setup and report the error to the caller instead.
		_ = n.mList.Shutdown()
		return nil, err
	}
	grpcServer := ggrpc.NewServer()
	n.GrpcServer = grpcServer
	// register management services
	n.TransportManager.Register(grpcServer)
	// register client services
	grpc.RegisterRaftServer(grpcServer, NewClientGrpcService(n))

	// discovery method
	discoveryChan, err := n.DiscoveryMethod.Start(n.ID, n.RaftPort)
	if err != nil {
		_ = grpcListen.Close()
		_ = n.mList.Shutdown()
		return nil, err
	}
	go n.handleDiscoveredNodes(discoveryChan)

	// serve grpc; Serve returns nil after Stop/GracefulStop, so only
	// unexpected failures reach the Fatal call.
	go func() {
		if err := grpcServer.Serve(grpcListen); err != nil {
			n.logger.Fatal(err)
		}
	}()

	// handle interruption (SIGKILL cannot be caught, so it is not registered)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGABRT)
	go func() {
		<-sigs
		n.Stop()
	}()

	n.logger.Printf("Node started on port %d and discovery port %d\n", n.RaftPort, n.DiscoveryPort)
	return n.stoppedCh, nil
}
// Stop stops the node and then notifies on the stopped channel returned by
// Start.
//
// Shutdown order:
//  1. Take an optional Raft snapshot.
//  2. Stop the discovery method.
//  3. Leave memberlist (10s timeout), then shut it down.
//  4. Shut down Raft.
//  5. Gracefully stop the gRPC server.
//
// Failures are logged and do not abort the remaining steps.
//
// The stopped flag is claimed with a compare-and-swap, so only one caller
// (for example the signal handler or application code) runs the shutdown
// sequence; other calls return immediately. Components that were never
// created (Stop before Start, or after a failed Start) are skipped rather
// than dereferenced.
func (n *Node) Stop() {
	if !atomic.CompareAndSwapUint32(n.stopped, 0, 1) {
		return // already stopped or stopping
	}
	if n.snapshotEnabled {
		n.logger.Println("Creating snapshot...")
		if err := n.Raft.Snapshot().Error(); err != nil {
			n.logger.Println("Failed to create snapshot!")
		}
	}
	n.logger.Println("Stopping Node...")
	n.DiscoveryMethod.Stop()
	if n.mList != nil {
		if err := n.mList.Leave(10 * time.Second); err != nil {
			n.logger.Printf("Failed to leave from discovery: %q\n", err.Error())
		}
		if err := n.mList.Shutdown(); err != nil {
			n.logger.Printf("Failed to shutdown discovery: %q\n", err.Error())
		}
	}
	n.logger.Println("Discovery stopped")
	if err := n.Raft.Shutdown().Error(); err != nil {
		n.logger.Printf("Failed to shutdown Raft: %q\n", err.Error())
	}
	n.logger.Println("Raft stopped")
	if n.GrpcServer != nil {
		n.GrpcServer.GracefulStop()
	}
	n.logger.Println("Raft Server stopped")
	n.logger.Println("Node Stopped!")
	// A nil channel means Start never ran, so nobody can be waiting; sending
	// would block forever.
	if n.stoppedCh != nil {
		n.stoppedCh <- true
	}
}
// handleDiscoveredNodes consumes peer addresses from discoveryChan until the
// channel is closed.
//
// Peers are given in host:port form and compared against Raft server
// addresses. For each peer it:
//   - fetches the peer's details;
//   - skips the peer if its server ID or address is already in the Raft
//     configuration;
//   - otherwise joins the peer's memberlist at host:DiscoveryPort.
//
// The Raft membership change itself is made by NotifyJoin once memberlist
// reports the join. Peers whose details cannot be fetched are skipped.
func (n *Node) handleDiscoveredNodes(discoveryChan chan string) {
	for peer := range discoveryChan {
		detailsResp, err := GetPeerDetails(peer)
		if err != nil {
			continue
		}
		serverID := raft.ServerID(detailsResp.ServerId)

		alreadyMember := false
		for _, server := range n.Raft.GetConfiguration().Configuration().Servers {
			if server.ID == serverID || string(server.Address) == peer {
				alreadyMember = true
				break
			}
		}
		if alreadyMember {
			continue
		}

		// net.SplitHostPort understands bracketed IPv6 literals, which a naive
		// split on ':' would truncate. Without a port, the whole string is the host.
		peerHost, _, splitErr := net.SplitHostPort(peer)
		if splitErr != nil {
			peerHost = peer
		}
		peerDiscoveryAddr := net.JoinHostPort(peerHost, fmt.Sprint(detailsResp.DiscoveryPort))
		if _, err := n.mList.Join([]string{peerDiscoveryAddr}); err != nil {
			n.logger.Printf("failed to join to cluster using discovery address: %s: %v\n", peerDiscoveryAddr, err)
		}
	}
}
// NotifyJoin is called by memberlist (this Node is installed as its event
// delegate in Start) when a node joins the discovery layer. If this node is
// the Raft leader, the joined node is added to the Raft cluster as a voter.
//
// Member names are expected in the "<nodeID>:<raftPort>" form that NewNode
// assigns. The Raft address is built from the member's IP and that port.
// Members whose name does not match that form are logged and ignored.
func (n *Node) NotifyJoin(node *memberlist.Node) {
	nameParts := strings.Split(node.Name, ":")
	if len(nameParts) < 2 {
		// Without a port there is no Raft address to add, and indexing
		// nameParts[1] would panic inside the memberlist callback.
		n.logger.Printf("ignoring discovery member with unexpected name %q\n", node.Name)
		return
	}
	nodeID, nodePort := nameParts[0], nameParts[1]
	// JoinHostPort brackets IPv6 addresses so the result remains host:port.
	nodeAddr := net.JoinHostPort(node.Addr.String(), nodePort)

	// Only the leader may change cluster membership.
	if err := n.Raft.VerifyLeader().Error(); err != nil {
		return
	}
	if err := n.Raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(nodeAddr), 0, 0).Error(); err != nil {
		n.logger.Println(err.Error())
	}
}
// NotifyLeave is called by memberlist when a member is considered gone.
//
// It removes that member from the Raft cluster, but only when both of these hold:
//   - the discovery method supports automatic node removal;
//   - this node is currently the Raft leader.
//
// The Raft server ID is the part of the member name before the first ':'.
func (n *Node) NotifyLeave(node *memberlist.Node) {
	if !n.DiscoveryMethod.SupportsNodeAutoRemoval() {
		return
	}
	id := strings.SplitN(node.Name, ":", 2)[0]
	if n.Raft.VerifyLeader().Error() != nil {
		return
	}
	removal := n.Raft.RemoveServer(raft.ServerID(id), 0, 0)
	if removeErr := removal.Error(); removeErr != nil {
		log.Println(removeErr.Error())
	}
}
// NotifyUpdate is the memberlist "member updated" callback. Member updates
// do not change Raft membership, so the node deliberately ignores them.
func (n *Node) NotifyUpdate(updated *memberlist.Node) {
	// intentionally a no-op
}
// RaftApply serializes request with the node's Serializer and applies it to
// the Raft log, forwarding to the leader when necessary.
//
// If this node verifies as leader, the entry is applied locally with the given
// timeout. If the local FSM response is itself an error, that error is
// returned. Otherwise the serialized payload is forwarded via ApplyOnLeader.
// Forwarding also happens when Raft rejects the local Apply with
// raft.ErrNotLeader, i.e. leadership was lost between the check and the Apply.
func (n *Node) RaftApply(request interface{}, timeout time.Duration) (interface{}, error) {
	payload, err := n.Serializer.Serialize(request)
	if err != nil {
		return nil, err
	}
	if n.Raft.VerifyLeader().Error() == nil {
		future := n.Raft.Apply(payload, timeout)
		applyErr := future.Error()
		if applyErr == nil {
			response := future.Response()
			if respErr, isErr := response.(error); isErr {
				return nil, respErr
			}
			return response, nil
		}
		// ErrNotLeader is reported before the entry is appended, so retrying on
		// the leader cannot apply it twice. Any other error (e.g. leadership lost
		// after the entry was appended) may mean it was already replicated, so
		// it is returned as-is.
		if !errors.Is(applyErr, raft.ErrNotLeader) {
			return nil, applyErr
		}
	}
	response, err := ApplyOnLeader(n, payload)
	if err != nil {
		return nil, err
	}
	return response, nil
}