Skip to content

Commit 064dd25

Browse files
committed
Change part4 to send Command to subscriptions in service
Similar to a change in part5
1 parent cc6b010 commit 064dd25

File tree

1 file changed

+20
-41
lines changed

1 file changed

+20
-41
lines changed

part4kv/kvservice/kvservice.go

Lines changed: 20 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type KVService struct {
3535

3636
// commitSubs are the commit subscriptions currently active in this service.
3737
// See the createCommitSubscription method for more details.
38-
commitSubs map[int]chan raft.CommitEntry
38+
commitSubs map[int]chan Command
3939

4040
// ds is the underlying data store implementing the KV DB.
4141
ds *DataStore
@@ -69,7 +69,7 @@ func New(id int, peerIds []int, storage raft.Storage, readyChan <-chan any) *KVS
6969
rs: rs,
7070
commitChan: commitChan,
7171
ds: NewDataStore(),
72-
commitSubs: make(map[int]chan raft.CommitEntry),
72+
commitSubs: make(map[int]chan Command),
7373
httpResponsesEnabled: true,
7474
}
7575

@@ -178,16 +178,15 @@ func (kvs *KVService) handlePut(w http.ResponseWriter, req *http.Request) {
178178
// also select on the request context - if the request is canceled, this
179179
// handler aborts without sending data back to the client.
180180
select {
181-
case entry := <-sub:
181+
case commitCmd := <-sub:
182182
// If this is our command, all is good! If it's some other server's command,
183183
// this means we lost leadership at some point and should return an error
184184
// to the client.
185-
entryCmd := entry.Command.(Command)
186-
if entryCmd.Id == kvs.id {
185+
if commitCmd.Id == kvs.id {
187186
kvs.sendHTTPResponse(w, api.PutResponse{
188187
RespStatus: api.StatusOK,
189-
KeyFound: entryCmd.ResultFound,
190-
PrevValue: entryCmd.ResultValue,
188+
KeyFound: commitCmd.ResultFound,
189+
PrevValue: commitCmd.ResultValue,
191190
})
192191
} else {
193192
kvs.sendHTTPResponse(w, api.PutResponse{RespStatus: api.StatusFailedCommit})
@@ -197,9 +196,9 @@ func (kvs *KVService) handlePut(w http.ResponseWriter, req *http.Request) {
197196
}
198197
}
199198

199+
// The details of these handlers are very similar to handlePut: refer to that
200+
// function for detailed comments.
200201
func (kvs *KVService) handleGet(w http.ResponseWriter, req *http.Request) {
201-
// The details of this handler are very similar to handleGet: refer to that
202-
// function for detailed comments.
203202
gr := &api.GetRequest{}
204203
if err := readRequestJSON(req, gr); err != nil {
205204
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -213,31 +212,20 @@ func (kvs *KVService) handleGet(w http.ResponseWriter, req *http.Request) {
213212
Id: kvs.id,
214213
}
215214
logIndex := kvs.rs.Submit(cmd)
216-
// If we're not the Raft leader, send an appropriate status
217215
if logIndex < 0 {
218216
kvs.sendHTTPResponse(w, api.GetResponse{RespStatus: api.StatusNotLeader})
219217
return
220218
}
221219

222-
// Subsribe for a commit update for our log index. Then wait for it to
223-
// be delivered.
224220
sub := kvs.createCommitSubscription(logIndex)
225221

226-
// Wait on the sub channel: the updater will deliver a value when the Raft
227-
// log has a commit at logIndex. To ensure clean shutdown of the service,
228-
// also select on the request context - if the request is canceled, this
229-
// handler aborts without sending data back to the client.
230222
select {
231-
case entry := <-sub:
232-
// If this is our command, all is good! If it's some other server's command,
233-
// this means we lost leadership at some point and should return an error
234-
// to the client.
235-
entryCmd := entry.Command.(Command)
236-
if entryCmd.Id == kvs.id {
223+
case commitCmd := <-sub:
224+
if commitCmd.Id == kvs.id {
237225
kvs.sendHTTPResponse(w, api.GetResponse{
238226
RespStatus: api.StatusOK,
239-
KeyFound: entryCmd.ResultFound,
240-
Value: entryCmd.ResultValue,
227+
KeyFound: commitCmd.ResultFound,
228+
Value: commitCmd.ResultValue,
241229
})
242230
} else {
243231
kvs.sendHTTPResponse(w, api.GetResponse{RespStatus: api.StatusFailedCommit})
@@ -271,13 +259,12 @@ func (kvs *KVService) handleCAS(w http.ResponseWriter, req *http.Request) {
271259
sub := kvs.createCommitSubscription(logIndex)
272260

273261
select {
274-
case entry := <-sub:
275-
entryCmd := entry.Command.(Command)
276-
if entryCmd.Id == kvs.id {
262+
case commitCmd := <-sub:
263+
if commitCmd.Id == kvs.id {
277264
kvs.sendHTTPResponse(w, api.CASResponse{
278265
RespStatus: api.StatusOK,
279-
KeyFound: entryCmd.ResultFound,
280-
PrevValue: entryCmd.ResultValue,
266+
KeyFound: commitCmd.ResultFound,
267+
PrevValue: commitCmd.ResultValue,
281268
})
282269
} else {
283270
kvs.sendHTTPResponse(w, api.CASResponse{RespStatus: api.StatusFailedCommit})
@@ -307,18 +294,10 @@ func (kvs *KVService) runUpdater() {
307294
panic(fmt.Errorf("unexpected command %v", cmd))
308295
}
309296

310-
// We're modifying the command to include results from the datastore,
311-
// so clone an entry with the update command for the subscribers.
312-
newEntry := raft.CommitEntry{
313-
Command: cmd,
314-
Index: entry.Index,
315-
Term: entry.Term,
316-
}
317-
318297
// Forward this entry to the subscriber interested in its index, and
319298
// close the subscription - it's single-use.
320299
if sub := kvs.popCommitSubscription(entry.Index); sub != nil {
321-
sub <- newEntry
300+
sub <- cmd
322301
close(sub)
323302
}
324303
}
@@ -331,20 +310,20 @@ func (kvs *KVService) runUpdater() {
331310
// an entry is committed at this index in the Raft log". The entry is delivered
332311
// on the returend (buffered) channel by the updater goroutine, after which
333312
// the channel is closed and the subscription is automatically canceled.
334-
func (kvs *KVService) createCommitSubscription(logIndex int) chan raft.CommitEntry {
313+
func (kvs *KVService) createCommitSubscription(logIndex int) chan Command {
335314
kvs.Lock()
336315
defer kvs.Unlock()
337316

338317
if _, exists := kvs.commitSubs[logIndex]; exists {
339318
panic(fmt.Sprintf("duplicate commit subscription for logIndex=%d", logIndex))
340319
}
341320

342-
ch := make(chan raft.CommitEntry, 1)
321+
ch := make(chan Command, 1)
343322
kvs.commitSubs[logIndex] = ch
344323
return ch
345324
}
346325

347-
func (kvs *KVService) popCommitSubscription(logIndex int) chan raft.CommitEntry {
326+
func (kvs *KVService) popCommitSubscription(logIndex int) chan Command {
348327
kvs.Lock()
349328
defer kvs.Unlock()
350329

0 commit comments

Comments
 (0)