Skip to content

Commit 7f09211

Browse files
authored
Merge pull request #11 from KyberNetwork/ft/batcher-flush
feat: batcher can now be flushed manually
2 parents 4260526 + 253651b commit 7f09211

File tree

2 files changed

+69
-22
lines changed

2 files changed

+69
-22
lines changed

batcher.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ func (c *ChanTask[R]) Resolve(ret R, err error) {
9292
type Batcher[T BatchableTask[R], R any] interface {
9393
// Batch submits a BatchableTask to the batcher.
9494
Batch(task T)
95+
// Flush executes tasks currently waiting in queue immediately.
96+
Flush()
9597
// Close should stop Batch from being called and clean up any background resources.
9698
Close()
9799
}
@@ -108,6 +110,7 @@ type ChanBatcher[T BatchableTask[R], R any] struct {
108110
batchCfg BatchCfg
109111
batchFn BatchFn[T]
110112
taskCh chan T
113+
flushCh chan struct{}
111114
closed atomic.Bool
112115
}
113116

@@ -117,6 +120,7 @@ func NewChanBatcher[T BatchableTask[R], R any](batchCfg BatchCfg, batchFn BatchF
117120
batchCfg: batchCfg,
118121
batchFn: batchFn,
119122
taskCh: make(chan T, 16*batchCnt),
123+
flushCh: make(chan struct{}, 1),
120124
}
121125
go chanBatcher.worker()
122126
return chanBatcher
@@ -131,6 +135,16 @@ func (b *ChanBatcher[T, R]) Batch(task T) {
131135
}
132136
}
133137

138+
// Flush executes tasks currently waiting in queue immediately.
139+
func (b *ChanBatcher[T, R]) Flush() {
140+
if !b.closed.Load() {
141+
select {
142+
case b.flushCh <- struct{}{}:
143+
default:
144+
}
145+
}
146+
}
147+
134148
// Close closes this chanBatcher to prevents Batch-ing new BatchableTask's and tell the worker goroutine to finish up.
135149
func (b *ChanBatcher[_, _]) Close() {
136150
if !b.closed.Swap(true) {
@@ -185,6 +199,13 @@ func (b *ChanBatcher[T, R]) worker() {
185199
klog.Debugf(tasks[0].Ctx(), "ChanBatcher.worker|timer|%d tasks", len(tasks))
186200
go b.batchFnWithRecover(tasks)
187201
tasks = tasks[:0:0]
202+
case <-b.flushCh:
203+
if len(tasks) == 0 {
204+
break
205+
}
206+
klog.Debugf(tasks[0].Ctx(), "ChanBatcher.worker|flush|%d tasks", len(tasks))
207+
go b.batchFnWithRecover(tasks)
208+
tasks = tasks[:0:0]
188209
case task, ok := <-b.taskCh:
189210
if !ok {
190211
ctx := context.Background()

batcher_test.go

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestChanBatcher(t *testing.T) {
2121
return batchRate, 2
2222
}, func(tasks []*ChanTask[time.Duration]) { batchFn(tasks) })
2323
var cnt atomic.Uint32
24-
start := time.Now()
24+
var start time.Time
2525
batchFn = func(tasks []*ChanTask[time.Duration]) {
2626
cnt.Add(1)
2727
for _, task := range tasks {
@@ -31,29 +31,54 @@ func TestChanBatcher(t *testing.T) {
3131
task0 := NewChanTask[time.Duration](ctx)
3232
task1 := NewChanTask[time.Duration](ctx)
3333
task2 := NewChanTask[time.Duration](ctx)
34+
task3 := NewChanTask[time.Duration](ctx)
3435

3536
t.Run("happy", func(t *testing.T) {
36-
batcher.Batch(task0)
37-
batcher.Batch(task1)
38-
_, _ = task0.Result()
39-
assert.EqualValues(t, 1, cnt.Load())
40-
assert.NoError(t, task0.Err)
41-
assert.Less(t, task0.Ret, batchRate)
42-
ret, err := task1.Result()
43-
assert.NoError(t, err)
44-
assert.Less(t, ret, batchRate)
45-
time.Sleep(batchRate * 11 / 10)
46-
runtime.Gosched()
37+
t.Run("trigger max", func(t *testing.T) {
38+
start = time.Now()
39+
batcher.Batch(task0)
40+
batcher.Batch(task1)
41+
_, _ = task0.Result()
42+
assert.EqualValues(t, 1, cnt.Load())
43+
assert.NoError(t, task0.Err)
44+
assert.Less(t, task0.Ret, batchRate)
45+
ret, err := task1.Result()
46+
assert.NoError(t, err)
47+
assert.Less(t, ret, batchRate)
48+
time.Sleep(batchRate * 11 / 10)
49+
runtime.Gosched()
50+
})
4751

48-
batcher.Batch(task2)
49-
assert.False(t, task2.IsDone())
50-
ret, err = task2.Result()
51-
assert.True(t, task2.IsDone())
52-
assert.EqualValues(t, 2, cnt.Load())
53-
assert.Equal(t, task2.Err, err)
54-
assert.NoError(t, task2.Err)
55-
assert.Equal(t, task2.Ret, ret)
56-
assert.Greater(t, ret, batchRate)
52+
t.Run("trigger timer after blocked by .Result()", func(t *testing.T) {
53+
start = time.Now()
54+
batcher.Batch(task2)
55+
assert.False(t, task2.IsDone())
56+
ret, err := task2.Result()
57+
assert.True(t, task2.IsDone())
58+
assert.EqualValues(t, 2, cnt.Load())
59+
assert.Equal(t, task2.Err, err)
60+
assert.NoError(t, task2.Err)
61+
assert.Equal(t, task2.Ret, ret)
62+
assert.Greater(t, ret, batchRate)
63+
})
64+
65+
t.Run("trigger flush", func(t *testing.T) {
66+
start = time.Now()
67+
batcher.Batch(task3)
68+
assert.False(t, task3.IsDone())
69+
batcher.Flush()
70+
batcher.Flush()
71+
ret, err := task3.Result()
72+
assert.True(t, task3.IsDone())
73+
assert.EqualValues(t, 3, cnt.Load())
74+
assert.Equal(t, task3.Err, err)
75+
assert.NoError(t, task3.Err)
76+
assert.Equal(t, task3.Ret, ret)
77+
assert.Less(t, ret, batchRate)
78+
batcher.Flush()
79+
batcher.Flush()
80+
assert.EqualValues(t, 3, cnt.Load())
81+
})
5782
})
5883

5984
t.Run("spam", func(t *testing.T) {
@@ -111,13 +136,14 @@ func TestChanBatcher(t *testing.T) {
111136
assert.ErrorIs(t, task0.Err, panicErr)
112137
assert.ErrorIs(t, task1.Err, panicErr)
113138

139+
start = time.Now()
114140
batchFn = oldBatchFn
115141
task2 = NewChanTask[time.Duration](nil) // nolint:staticcheck
116142
batcher.Batch(task2)
117143
batcher.Batch(task2)
118144
ret, err := task2.Result()
119145
assert.NoError(t, err)
120-
assert.Greater(t, ret, batchRate)
146+
assert.Less(t, ret, batchRate)
121147
})
122148

123149
t.Run("cancelled task", func(t *testing.T) {

0 commit comments

Comments
 (0)