-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfiguration.go
363 lines (330 loc) · 11.8 KB
/
configuration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
package raft
import "fmt"
// ServerSuffrage determines whether a Server in a Configuration gets a vote.
type ServerSuffrage int

// Note: Don't renumber these, since the numbers are written into the log.
const (
	// Voter is a server whose vote is counted in elections and whose match index
	// is used in advancing the leader's commit index.
	Voter ServerSuffrage = iota
	// Nonvoter is a server that receives log entries but is not considered for
	// elections or commitment purposes.
	Nonvoter
	// Staging is a server that acts like a nonvoter with one exception: once a
	// staging server receives enough log entries to be sufficiently caught up to
	// the leader's log, the leader will invoke a membership change to change
	// the Staging server to a Voter.
	Staging
)

// String returns the name of the suffrage constant. Any value that is not one
// of the declared constants is rendered as the generic "ServerSuffrage".
func (s ServerSuffrage) String() string {
	// Indexed by the constant's own value so the table follows the const block.
	labels := [...]string{
		Voter:    "Voter",
		Nonvoter: "Nonvoter",
		Staging:  "Staging",
	}
	if s >= Voter && int(s) < len(labels) {
		return labels[s]
	}
	return "ServerSuffrage"
}
// ConfigurationStore provides an interface that can optionally be implemented by FSMs
// to store configuration updates made in the replicated log. In general this is only
// necessary for FSMs that mutate durable state directly instead of applying changes
// in memory and snapshotting periodically. By storing configuration changes, the
// persistent FSM state can behave as a complete snapshot, and be able to recover
// without an external snapshot just for persisting the raft configuration.
type ConfigurationStore interface {
// ConfigurationStore is a superset of the FSM functionality: FSM is embedded,
// so every implementation must also satisfy the FSM interface.
FSM
// StoreConfiguration is invoked once a log entry containing a configuration
// change is committed. It takes the index at which the configuration was
// written and the configuration value. It returns nothing, so an
// implementation has no way to report a failure through this call.
StoreConfiguration(index uint64, configuration Configuration)
}
// nopConfigurationStore is an empty type whose StoreConfiguration method
// discards its arguments.
// NOTE(review): presumably a stand-in for FSMs that do not implement
// ConfigurationStore themselves — confirm against the callers.
type nopConfigurationStore struct{}
// StoreConfiguration ignores both the index and the configuration and does
// nothing.
func (s nopConfigurationStore) StoreConfiguration(_ uint64, _ Configuration) {}
// ServerID is a unique string identifying a server for all time.
type ServerID string
// ServerAddress is a network address for a server that a transport can contact.
type ServerAddress string
// Server tracks the information about a single server in a configuration:
// its voting status, its permanent identity, and the address to reach it at.
type Server struct {
// Suffrage determines whether the server gets a vote.
Suffrage ServerSuffrage
// ID is a unique string identifying this server for all time.
ID ServerID
// Address is its network address that a transport can contact.
Address ServerAddress
}
// Configuration tracks which servers are in the cluster, and whether they have
// votes. This should include the local server, if it's a member of the cluster.
// The servers are listed in no particular order, but each should only appear
// once. These entries are appended to the log during membership changes.
type Configuration struct {
	Servers []Server
}

// Clone makes a deep copy of a Configuration. The returned value owns a fresh
// Servers slice, so changing, appending to or removing from it never affects
// the receiver. A receiver with no servers yields a nil Servers slice.
func (c *Configuration) Clone() Configuration {
	// Appending to a nil slice allocates a new backing array and keeps the
	// result nil when there is nothing to copy.
	var servers []Server
	servers = append(servers, c.Servers...)
	return Configuration{Servers: servers}
}
// ConfigurationChangeCommand is the different ways to change the cluster
// configuration.
type ConfigurationChangeCommand uint8

const (
	// AddStaging makes a server Staging unless it's a Voter.
	AddStaging ConfigurationChangeCommand = iota
	// AddNonvoter makes a server Nonvoter unless it's Staging or Voter.
	AddNonvoter
	// DemoteVoter makes a server Nonvoter unless it's absent.
	DemoteVoter
	// RemoveServer removes a server entirely from the cluster membership.
	RemoveServer
	// Promote is created automatically by a leader; it turns a Staging server
	// into a Voter.
	Promote
)

// String returns the name of the command constant. Any value that is not one
// of the declared commands is rendered as "ConfigurationChangeCommand".
func (c ConfigurationChangeCommand) String() string {
	// Start from the fallback and overwrite it when the value is recognised.
	label := "ConfigurationChangeCommand"
	switch c {
	case AddStaging:
		label = "AddStaging"
	case AddNonvoter:
		label = "AddNonvoter"
	case DemoteVoter:
		label = "DemoteVoter"
	case RemoveServer:
		label = "RemoveServer"
	case Promote:
		label = "Promote"
	}
	return label
}
// configurationChangeRequest describes a change that a leader would like to
// make to its current configuration. It's used only within a single server
// (never serialized into the log), as part of `configurationChangeFuture`.
type configurationChangeRequest struct {
// command selects which kind of membership change to apply.
command ConfigurationChangeCommand
// serverID identifies the server that the change targets.
serverID ServerID
serverAddress ServerAddress // only present for AddStaging, AddNonvoter
// prevIndex, if nonzero, is the index of the only configuration upon which
// this change may be applied; if another configuration entry has been
// added in the meantime, this request will fail.
prevIndex uint64
}
// configurations is state tracked on every server about its Configurations.
// Note that, per Diego's dissertation, there can be at most one uncommitted
// configuration at a time (the next configuration may not be created until the
// prior one has been committed).
//
// One downside to storing just two configurations is that if you try to take a
// snapshot when your state machine hasn't yet applied the committedIndex, we
// have no record of the configuration that would logically fit into that
// snapshot. We disallow snapshots in that case now. An alternative approach,
// which LogCabin uses, is to track every configuration change in the
// log.
type configurations struct {
	// committed is the latest configuration in the log/snapshot that has been
	// committed (the one with the largest index).
	committed Configuration
	// committedIndex is the log index where 'committed' was written.
	committedIndex uint64
	// latest is the latest configuration in the log/snapshot (may be committed
	// or uncommitted)
	latest Configuration
	// latestIndex is the log index where 'latest' was written.
	latestIndex uint64
}

// Clone makes a deep copy of a configurations object. Both embedded
// Configuration values are cloned through Configuration.Clone, and the two
// indexes are copied by value.
func (c *configurations) Clone() configurations {
	return configurations{
		committed:      c.committed.Clone(),
		committedIndex: c.committedIndex,
		latest:         c.latest.Clone(),
		latestIndex:    c.latestIndex,
	}
}
// hasVote reports whether the server identified by 'id' is a Voter in the
// provided Configuration. Only the first entry with a matching ID is
// considered; an ID that is not present yields false.
func hasVote(configuration Configuration, id ServerID) bool {
	for i := range configuration.Servers {
		candidate := &configuration.Servers[i]
		if candidate.ID != id {
			continue
		}
		return candidate.Suffrage == Voter
	}
	return false
}
// checkConfiguration tests a cluster membership configuration for common
// errors. Servers are examined in order and the first problem found is
// returned: an empty ID, an empty address, an ID already seen, or an address
// already seen. After all servers pass, a configuration without any Voter is
// rejected. It returns nil when the configuration is acceptable.
func checkConfiguration(configuration Configuration) error {
	seenIDs := make(map[ServerID]struct{})
	seenAddrs := make(map[ServerAddress]struct{})
	hasVoter := false

	for _, member := range configuration.Servers {
		switch {
		case member.ID == "":
			return fmt.Errorf("Empty ID in configuration: %v", configuration)
		case member.Address == "":
			return fmt.Errorf("Empty address in configuration: %v", member)
		}

		if _, dup := seenIDs[member.ID]; dup {
			return fmt.Errorf("Found duplicate ID in configuration: %v", member.ID)
		}
		seenIDs[member.ID] = struct{}{}

		if _, dup := seenAddrs[member.Address]; dup {
			return fmt.Errorf("Found duplicate address in configuration: %v", member.Address)
		}
		seenAddrs[member.Address] = struct{}{}

		hasVoter = hasVoter || member.Suffrage == Voter
	}

	if !hasVoter {
		return fmt.Errorf("Need at least one voter in configuration: %v", configuration)
	}
	return nil
}
// nextConfiguration generates a new Configuration from the current one and a
// configuration change request. It's split from appendConfigurationEntry so
// that it can be unit tested easily.
//
// The change is applied to a clone, so 'current' is never modified. If
// change.prevIndex is nonzero and differs from currentIndex the request is
// rejected. The resulting configuration is validated with checkConfiguration
// before being returned; on any error the zero Configuration is returned.
func nextConfiguration(current Configuration, currentIndex uint64, change configurationChangeRequest) (Configuration, error) {
	if change.prevIndex > 0 && change.prevIndex != currentIndex {
		return Configuration{}, fmt.Errorf("Configuration changed since %v (latest is %v)", change.prevIndex, currentIndex)
	}

	next := current.Clone()

	// Position of the first entry whose ID matches the request, or -1.
	target := -1
	for i := range next.Servers {
		if next.Servers[i].ID == change.serverID {
			target = i
			break
		}
	}

	switch change.command {
	case AddStaging:
		// TODO: barf on new address?
		added := Server{
			// TODO: This should add the server as Staging, to be automatically
			// promoted to Voter later. However, the promotion to Voter is not yet
			// implemented, and doing so is not trivial with the way the leader loop
			// coordinates with the replication goroutines today. So, for now, the
			// server will have a vote right away, and the Promote case below is
			// unused.
			Suffrage: Voter,
			ID:       change.serverID,
			Address:  change.serverAddress,
		}
		switch {
		case target < 0:
			next.Servers = append(next.Servers, added)
		case next.Servers[target].Suffrage == Voter:
			// Already a voter: only refresh the address.
			next.Servers[target].Address = change.serverAddress
		default:
			next.Servers[target] = added
		}

	case AddNonvoter:
		added := Server{
			Suffrage: Nonvoter,
			ID:       change.serverID,
			Address:  change.serverAddress,
		}
		switch {
		case target < 0:
			next.Servers = append(next.Servers, added)
		case next.Servers[target].Suffrage != Nonvoter:
			// Staging or Voter keeps its suffrage; only the address changes.
			next.Servers[target].Address = change.serverAddress
		default:
			next.Servers[target] = added
		}

	case DemoteVoter:
		if target >= 0 {
			next.Servers[target].Suffrage = Nonvoter
		}

	case RemoveServer:
		if target >= 0 {
			next.Servers = append(next.Servers[:target], next.Servers[target+1:]...)
		}

	case Promote:
		// Promote needs the first entry that matches on ID and is Staging, so
		// it scans with its own combined predicate.
		for i := range next.Servers {
			if entry := &next.Servers[i]; entry.ID == change.serverID && entry.Suffrage == Staging {
				entry.Suffrage = Voter
				break
			}
		}
	}

	// Make sure we didn't do something bad like remove the last voter
	if err := checkConfiguration(next); err != nil {
		return Configuration{}, err
	}
	return next, nil
}
// encodePeers is used to serialize a Configuration into the old peers format.
// This is here for backwards compatibility when operating with a mix of old
// servers and should be removed once we deprecate support for protocol version 1.
//
// Only servers with Voter suffrage are written; each is encoded through
// trans.EncodePeer and the resulting list is packed with encodeMsgPack. An
// encoding failure panics.
func encodePeers(configuration Configuration, trans Transport) []byte {
	// Gather up all the voters, other suffrage types are not supported by
	// this data format.
	var voters [][]byte
	for i := range configuration.Servers {
		member := configuration.Servers[i]
		if member.Suffrage != Voter {
			continue
		}
		voters = append(voters, trans.EncodePeer(member.ID, member.Address))
	}

	// Encode the entire array.
	packed, err := encodeMsgPack(voters)
	if err == nil {
		return packed.Bytes()
	}
	panic(fmt.Errorf("failed to encode peers: %v", err))
}
// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
//
// Every decoded peer becomes a Voter whose ID and Address are both derived
// from the value returned by trans.DecodePeer. A decoding failure panics.
func decodePeers(buf []byte, trans Transport) Configuration {
	// Decode the buffer first.
	var rawPeers [][]byte
	err := decodeMsgPack(buf, &rawPeers)
	if err != nil {
		panic(fmt.Errorf("failed to decode peers: %v", err))
	}

	// Deserialize each peer.
	var members []Server
	for i := range rawPeers {
		peer := trans.DecodePeer(rawPeers[i])
		member := Server{
			Suffrage: Voter,
			ID:       ServerID(peer),
			Address:  ServerAddress(peer),
		}
		members = append(members, member)
	}

	var decoded Configuration
	decoded.Servers = members
	return decoded
}
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on
// errors. The returned slice is the content of the buffer produced by
// encodeMsgPack.
func EncodeConfiguration(configuration Configuration) []byte {
	packed, err := encodeMsgPack(configuration)
	if err == nil {
		return packed.Bytes()
	}
	panic(fmt.Errorf("failed to encode configuration: %v", err))
}
// DecodeConfiguration deserializes a Configuration using MsgPack, or panics on
// errors. The buffer is decoded into a fresh Configuration by decodeMsgPack.
func DecodeConfiguration(buf []byte) Configuration {
	decoded := Configuration{}
	err := decodeMsgPack(buf, &decoded)
	if err != nil {
		panic(fmt.Errorf("failed to decode configuration: %v", err))
	}
	return decoded
}