Skip to content

Commit b4c3d39

Browse files
stevenlandersudpatil
authored andcommitted
[OCC] Fix hang where abort channel blocks iterator (#379)
## Describe your changes and provide context - instead of assuming one thing will arrive to the abort channel, drain it ## Testing performed to validate your change - new unit test captures situation (tests iterator)
1 parent e879ae1 commit b4c3d39

File tree

2 files changed

+139
-30
lines changed

2 files changed

+139
-30
lines changed

tasks/scheduler.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,6 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
388388
defer eSpan.End()
389389
task.Ctx = eCtx
390390

391-
s.prepareTask(task)
392391
s.executeTask(task)
393392

394393
s.DoValidate(func() {
@@ -441,27 +440,58 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
441440
task.Ctx = ctx
442441
}
443442

444-
// executeTask executes a single task
445443
func (s *scheduler) executeTask(task *deliverTxTask) {
446-
dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerDeliverTx", task)
444+
dCtx, dSpan := s.traceSpan(task.Ctx, "SchedulerExecuteTask", task)
447445
defer dSpan.End()
448446
task.Ctx = dCtx
449447

450-
resp := s.deliverTx(task.Ctx, task.Request)
448+
s.prepareTask(task)
449+
450+
// Channel to signal the completion of deliverTx
451+
doneCh := make(chan types.ResponseDeliverTx)
452+
453+
// Run deliverTx in a separate goroutine
454+
go func() {
455+
doneCh <- s.deliverTx(task.Ctx, task.Request)
456+
}()
457+
458+
// Flag to mark if abort has happened
459+
var abortOccurred bool
460+
461+
var wg sync.WaitGroup
462+
wg.Add(1)
463+
464+
var abort *occ.Abort
465+
// Drain the AbortCh in a non-blocking way
466+
go func() {
467+
defer wg.Done()
468+
for abt := range task.AbortCh {
469+
if !abortOccurred {
470+
abortOccurred = true
471+
abort = &abt
472+
}
473+
}
474+
}()
475+
476+
// Wait for deliverTx to complete
477+
resp := <-doneCh
451478

452479
close(task.AbortCh)
453480

454-
if abt, ok := <-task.AbortCh; ok {
481+
wg.Wait()
482+
483+
// If abort has occurred, return, else set the response and status
484+
if abortOccurred {
455485
task.Status = statusAborted
456-
task.Abort = &abt
486+
task.Abort = abort
457487
return
458488
}
459489

490+
task.Status = statusExecuted
491+
task.Response = &resp
492+
460493
// write from version store to multiversion stores
461494
for _, v := range task.VersionStores {
462495
v.WriteToMultiVersionStore()
463496
}
464-
465-
task.Status = statusExecuted
466-
task.Response = &resp
467497
}

tasks/scheduler_test.go

Lines changed: 100 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"net/http"
8+
_ "net/http/pprof"
9+
"runtime"
710
"testing"
811

912
"github.com/stretchr/testify/require"
@@ -52,30 +55,106 @@ func initTestCtx(injectStores bool) sdk.Context {
5255
return ctx
5356
}
5457

58+
func generateTasks(count int) []*deliverTxTask {
59+
var res []*deliverTxTask
60+
for i := 0; i < count; i++ {
61+
res = append(res, &deliverTxTask{Index: i})
62+
}
63+
return res
64+
}
65+
5566
func TestProcessAll(t *testing.T) {
67+
runtime.SetBlockProfileRate(1)
68+
69+
go func() {
70+
http.ListenAndServe("localhost:6060", nil)
71+
}()
72+
5673
tests := []struct {
5774
name string
5875
workers int
5976
runs int
77+
before func(ctx sdk.Context)
6078
requests []*sdk.DeliverTxEntry
6179
deliverTxFunc mockDeliverTxFunc
6280
addStores bool
6381
expectedErr error
6482
assertions func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx)
6583
}{
6684
{
67-
name: "Test every tx accesses same key",
85+
name: "Test zero txs does not hang",
86+
workers: 20,
87+
runs: 10,
88+
addStores: true,
89+
requests: requestList(0),
90+
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
91+
panic("should not deliver")
92+
},
93+
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
94+
require.Len(t, res, 0)
95+
},
96+
expectedErr: nil,
97+
},
98+
{
99+
name: "Test tx writing to a store that another tx is iterating",
68100
workers: 50,
69-
runs: 50,
101+
runs: 1,
102+
requests: requestList(500),
70103
addStores: true,
71-
requests: requestList(100),
104+
before: func(ctx sdk.Context) {
105+
kv := ctx.MultiStore().GetKVStore(testStoreKey)
106+
// initialize 100 test values in the base kv store so iterating isn't too fast
107+
for i := 0; i < 10; i++ {
108+
kv.Set([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i)))
109+
}
110+
},
111+
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
112+
kv := ctx.MultiStore().GetKVStore(testStoreKey)
113+
if ctx.TxIndex()%2 == 0 {
114+
// For even-indexed transactions, write to the store
115+
kv.Set(req.Tx, req.Tx)
116+
return types.ResponseDeliverTx{
117+
Info: "write",
118+
}
119+
} else {
120+
// For odd-indexed transactions, iterate over the store
121+
122+
// just write so we have more writes going on
123+
kv.Set(req.Tx, req.Tx)
124+
iterator := kv.Iterator(nil, nil)
125+
defer iterator.Close()
126+
for ; iterator.Valid(); iterator.Next() {
127+
// Do nothing, just iterate
128+
}
129+
return types.ResponseDeliverTx{
130+
Info: "iterate",
131+
}
132+
}
133+
},
134+
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
135+
for idx, response := range res {
136+
if idx%2 == 0 {
137+
require.Equal(t, "write", response.Info)
138+
} else {
139+
require.Equal(t, "iterate", response.Info)
140+
}
141+
}
142+
},
143+
expectedErr: nil,
144+
},
145+
{
146+
name: "Test no overlap txs",
147+
workers: 20,
148+
runs: 10,
149+
addStores: true,
150+
requests: requestList(1000),
72151
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
73152
// all txs read and write to the same key to maximize conflicts
74153
kv := ctx.MultiStore().GetKVStore(testStoreKey)
75-
val := string(kv.Get(itemKey))
76154

77155
// write to the store with this tx's index
78-
kv.Set(itemKey, req.Tx)
156+
kv.Set(req.Tx, req.Tx)
157+
val := string(kv.Get(req.Tx))
79158

80159
// return what was read from the store (final attempt should be index-1)
81160
return types.ResponseDeliverTx{
@@ -84,26 +163,22 @@ func TestProcessAll(t *testing.T) {
84163
},
85164
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
86165
for idx, response := range res {
87-
if idx == 0 {
88-
require.Equal(t, "", response.Info)
89-
} else {
90-
// the info is what was read from the kv store by the tx
91-
// each tx writes its own index, so the info should be the index of the previous tx
92-
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
93-
}
166+
require.Equal(t, fmt.Sprintf("%d", idx), response.Info)
167+
}
168+
store := ctx.MultiStore().GetKVStore(testStoreKey)
169+
for i := 0; i < len(res); i++ {
170+
val := store.Get([]byte(fmt.Sprintf("%d", i)))
171+
require.Equal(t, []byte(fmt.Sprintf("%d", i)), val)
94172
}
95-
// confirm last write made it to the parent store
96-
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
97-
require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
98173
},
99174
expectedErr: nil,
100175
},
101176
{
102-
name: "Test few workers many txs",
103-
workers: 5,
104-
runs: 10,
177+
name: "Test every tx accesses same key",
178+
workers: 50,
179+
runs: 1,
105180
addStores: true,
106-
requests: requestList(50),
181+
requests: requestList(1000),
107182
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
108183
// all txs read and write to the same key to maximize conflicts
109184
kv := ctx.MultiStore().GetKVStore(testStoreKey)
@@ -136,9 +211,9 @@ func TestProcessAll(t *testing.T) {
136211
{
137212
name: "Test no stores on context should not panic",
138213
workers: 50,
139-
runs: 1,
214+
runs: 10,
140215
addStores: false,
141-
requests: requestList(50),
216+
requests: requestList(10),
142217
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
143218
return types.ResponseDeliverTx{
144219
Info: fmt.Sprintf("%d", ctx.TxIndex()),
@@ -167,6 +242,10 @@ func TestProcessAll(t *testing.T) {
167242
s := NewScheduler(tt.workers, ti, tt.deliverTxFunc)
168243
ctx := initTestCtx(tt.addStores)
169244

245+
if tt.before != nil {
246+
tt.before(ctx)
247+
}
248+
170249
res, err := s.ProcessAll(ctx, tt.requests)
171250
require.Len(t, res, len(tt.requests))
172251

0 commit comments

Comments
 (0)