Skip to content

Commit

Permalink
Events: type Batcher value & ensure queue order (#89)
Browse files Browse the repository at this point in the history
* Events: type Batcher value & ensure queue order

Update Batcher to allow for typed value types.

Update Batcher and Queue to execute values in order they were added.

Signed-off-by: joshvanl <[email protected]>

* Delay batcher to ensure key is sent in order

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL authored Mar 26, 2024
1 parent e33fbab commit 6c3b2ee
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 61 deletions.
111 changes: 80 additions & 31 deletions events/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package batcher

import (
"context"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,14 +24,20 @@ import (
"github.com/dapr/kit/events/queue"
)

type eventCh[T any] struct {
id int
ch chan<- T
}

// Batcher is a one to many event batcher. It batches events and sends them to
// the added event channel subscribers. Events are sent to the channels after
// the interval has elapsed. If events with the same key are received within
// the interval, the timer is reset.
type Batcher[T comparable] struct {
interval time.Duration
eventChs []chan<- struct{}
queue *queue.Processor[T, *item[T]]
type Batcher[K comparable, T any] struct {
interval time.Duration
eventChs []*eventCh[T]
queue *queue.Processor[K, *item[K, T]]
currentID int

clock clock.Clock
lock sync.Mutex
Expand All @@ -40,85 +47,127 @@ type Batcher[T comparable] struct {
}

// New creates a new Batcher with the given interval and key type.
func New[T comparable](interval time.Duration) *Batcher[T] {
b := &Batcher[T]{
func New[K comparable, T any](interval time.Duration) *Batcher[K, T] {
b := &Batcher[K, T]{
interval: interval,
clock: clock.RealClock{},
closeCh: make(chan struct{}),
}

b.queue = queue.NewProcessor[T, *item[T]](b.execute)
b.queue = queue.NewProcessor[K, *item[K, T]](b.execute)

return b
}

// WithClock sets the clock used by the batcher. Used for testing.
func (b *Batcher[T]) WithClock(clock clock.Clock) {
func (b *Batcher[K, T]) WithClock(clock clock.Clock) {
b.queue.WithClock(clock)
b.clock = clock
}

// Subscribe adds a new event channel subscriber. If the batcher is closed, the
// subscriber is silently dropped.
func (b *Batcher[T]) Subscribe(eventCh ...chan<- struct{}) {
func (b *Batcher[K, T]) Subscribe(ctx context.Context, ch ...chan<- T) {
b.lock.Lock()
defer b.lock.Unlock()
for _, c := range ch {
b.subscribe(ctx, c)
}
}

func (b *Batcher[K, T]) subscribe(ctx context.Context, ch chan<- T) {
if b.closed.Load() {
return
}
b.eventChs = append(b.eventChs, eventCh...)

id := b.currentID
b.currentID++
bufferedCh := make(chan T, 50)
b.eventChs = append(b.eventChs, &eventCh[T]{
id: id,
ch: bufferedCh,
})

b.wg.Add(1)
go func() {
defer func() {
b.lock.Lock()
close(ch)
for i, eventCh := range b.eventChs {
if eventCh.id == id {
b.eventChs = append(b.eventChs[:i], b.eventChs[i+1:]...)
break
}
}
b.lock.Unlock()
b.wg.Done()
}()

for {
select {
case <-ctx.Done():
return
case <-b.closeCh:
return
case env := <-bufferedCh:
select {
case ch <- env:
case <-ctx.Done():
case <-b.closeCh:
}
}
}
}()
}

func (b *Batcher[T]) execute(_ *item[T]) {
func (b *Batcher[K, T]) execute(i *item[K, T]) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed.Load() {
return
}
b.wg.Add(len(b.eventChs))
for _, eventCh := range b.eventChs {
go func(eventCh chan<- struct{}) {
defer b.wg.Done()
select {
case eventCh <- struct{}{}:
case <-b.closeCh:
}
}(eventCh)
for _, ev := range b.eventChs {
select {
case ev.ch <- i.value:
case <-b.closeCh:
}
}
}

// Batch adds the given key to the batcher. If an event for this key is already
// active, the timer is reset. If the batcher is closed, the key is silently
// dropped.
func (b *Batcher[T]) Batch(key T) {
b.queue.Enqueue(&item[T]{
key: key,
ttl: b.clock.Now().Add(b.interval),
func (b *Batcher[K, T]) Batch(key K, value T) {
b.queue.Enqueue(&item[K, T]{
key: key,
value: value,
ttl: b.clock.Now().Add(b.interval),
})
}

// Close closes the batcher. It blocks until all events have been sent to the
// subscribers. The batcher will be a no-op after this call.
func (b *Batcher[T]) Close() {
func (b *Batcher[K, T]) Close() {
defer b.wg.Wait()
b.queue.Close()
b.lock.Lock()
if b.closed.CompareAndSwap(false, true) {
close(b.closeCh)
}
b.lock.Unlock()
b.queue.Close()
}

// item implements queue.queueable.
type item[T comparable] struct {
key T
ttl time.Time
type item[K comparable, T any] struct {
key K
value T
ttl time.Time
}

func (b *item[T]) Key() T {
func (b *item[K, T]) Key() K {
return b.key
}

func (b *item[T]) ScheduledTime() time.Time {
func (b *item[K, T]) ScheduledTime() time.Time {
return b.ttl
}
73 changes: 52 additions & 21 deletions events/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package batcher

import (
"context"
"testing"
"time"

Expand All @@ -25,13 +26,13 @@ func TestNew(t *testing.T) {
t.Parallel()

interval := time.Millisecond * 10
b := New[string](interval)
b := New[string, struct{}](interval)
assert.Equal(t, interval, b.interval)
assert.False(t, b.closed.Load())
}

func TestWithClock(t *testing.T) {
b := New[string](time.Millisecond * 10)
b := New[string, struct{}](time.Millisecond * 10)
fakeClock := testingclock.NewFakeClock(time.Now())
b.WithClock(fakeClock)
assert.Equal(t, fakeClock, b.clock)
Expand All @@ -40,32 +41,32 @@ func TestWithClock(t *testing.T) {
func TestSubscribe(t *testing.T) {
t.Parallel()

b := New[string](time.Millisecond * 10)
b := New[string, struct{}](time.Millisecond * 10)
ch := make(chan struct{})
b.Subscribe(ch)
b.Subscribe(context.Background(), ch)
assert.Len(t, b.eventChs, 1)
}

func TestBatch(t *testing.T) {
t.Parallel()

fakeClock := testingclock.NewFakeClock(time.Now())
b := New[string](time.Millisecond * 10)
b := New[string, struct{}](time.Millisecond * 10)
b.WithClock(fakeClock)
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
b.Subscribe(ch1, ch2)
b.Subscribe(ch3)

b.Batch("key1")
b.Batch("key1")
b.Batch("key1")
b.Batch("key1")
b.Batch("key2")
b.Batch("key2")
b.Batch("key3")
b.Batch("key3")
b.Subscribe(context.Background(), ch1, ch2)
b.Subscribe(context.Background(), ch3)

b.Batch("key1", struct{}{})
b.Batch("key1", struct{}{})
b.Batch("key1", struct{}{})
b.Batch("key1", struct{}{})
b.Batch("key2", struct{}{})
b.Batch("key2", struct{}{})
b.Batch("key3", struct{}{})
b.Batch("key3", struct{}{})

assert.Eventually(t, func() bool {
return fakeClock.HasWaiters()
Expand Down Expand Up @@ -100,26 +101,56 @@ func TestBatch(t *testing.T) {
}
}
}

t.Run("ensure items are received in order with latest value", func(t *testing.T) {
fakeClock := testingclock.NewFakeClock(time.Now())
b := New[int, int](time.Millisecond * 10)
b.WithClock(fakeClock)
t.Cleanup(b.Close)
ch1 := make(chan int, 10)
ch2 := make(chan int, 10)
ch3 := make(chan int, 10)
b.Subscribe(context.Background(), ch1, ch2)
b.Subscribe(context.Background(), ch3)

for i := 0; i < 10; i++ {
b.Batch(i, i)
b.Batch(i, i+1)
b.Batch(i, i+2)
fakeClock.Step(time.Millisecond * 10)
}

for _, ch := range []chan int{ch1} {
for i := 0; i < 10; i++ {
select {
case v := <-ch:
assert.Equal(t, i+2, v)
case <-time.After(time.Second):
assert.Fail(t, "should be triggered")
}
}
}
})
}

func TestClose(t *testing.T) {
t.Parallel()

b := New[string](time.Millisecond * 10)
b := New[string, struct{}](time.Millisecond * 10)
ch := make(chan struct{})
b.Subscribe(ch)
b.Subscribe(context.Background(), ch)
assert.Len(t, b.eventChs, 1)
b.Batch("key1")
b.Batch("key1", struct{}{})
b.Close()
assert.True(t, b.closed.Load())
}

func TestSubscribeAfterClose(t *testing.T) {
t.Parallel()

b := New[string](time.Millisecond * 10)
b := New[string, struct{}](time.Millisecond * 10)
b.Close()
ch := make(chan struct{})
b.Subscribe(ch)
b.Subscribe(context.Background(), ch)
assert.Empty(t, b.eventChs)
}
2 changes: 1 addition & 1 deletion events/queue/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,5 +226,5 @@ func (p *Processor[K, T]) execute(r T) {
return
}

go p.executeFn(r)
p.executeFn(r)
}
8 changes: 4 additions & 4 deletions fswatcher/fswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Options struct {
type FSWatcher struct {
w *fsnotify.Watcher
running atomic.Bool
batcher *batcher.Batcher[string]
batcher *batcher.Batcher[string, struct{}]
}

func New(opts Options) (*FSWatcher, error) {
Expand All @@ -71,7 +71,7 @@ func New(opts Options) (*FSWatcher, error) {
w: w,
// Often the case, writes to files are not atomic and involve multiple file system events.
// We want to hold off on sending events until we are sure that the file has been written to completion. We do this by waiting for a period of time after the last event has been received for a file name.
batcher: batcher.New[string](interval),
batcher: batcher.New[string, struct{}](interval),
}, nil
}

Expand All @@ -81,7 +81,7 @@ func (f *FSWatcher) Run(ctx context.Context, eventCh chan<- struct{}) error {
}
defer f.batcher.Close()

f.batcher.Subscribe(eventCh)
f.batcher.Subscribe(ctx, eventCh)

for {
select {
Expand All @@ -90,7 +90,7 @@ func (f *FSWatcher) Run(ctx context.Context, eventCh chan<- struct{}) error {
case err := <-f.w.Errors:
return errors.Join(fmt.Errorf("watcher error: %w", err), f.w.Close())
case event := <-f.w.Events:
f.batcher.Batch(event.Name)
f.batcher.Batch(event.Name, struct{}{})
}
}
}
4 changes: 2 additions & 2 deletions fswatcher/fswatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
)

func TestFSWatcher(t *testing.T) {
runWatcher := func(t *testing.T, opts Options, bacher *batcher.Batcher[string]) <-chan struct{} {
runWatcher := func(t *testing.T, opts Options, bacher *batcher.Batcher[string, struct{}]) <-chan struct{} {
t.Helper()

f, err := New(opts)
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestFSWatcher(t *testing.T) {

t.Run("should batch events of the same file for multiple events", func(t *testing.T) {
clock := clocktesting.NewFakeClock(time.Time{})
batcher := batcher.New[string](time.Millisecond * 500)
batcher := batcher.New[string, struct{}](time.Millisecond * 500)
batcher.WithClock(clock)
dir1 := t.TempDir()
dir2 := t.TempDir()
Expand Down
2 changes: 1 addition & 1 deletion fswatcher/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/dapr/kit/events/batcher"
)

func (f *FSWatcher) WithBatcher(b *batcher.Batcher[string]) *FSWatcher {
func (f *FSWatcher) WithBatcher(b *batcher.Batcher[string, struct{}]) *FSWatcher {
f.batcher = b
return f
}
2 changes: 1 addition & 1 deletion fswatcher/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestWithBatcher(t *testing.T) {
b := batcher.New[string](time.Millisecond * 10)
b := batcher.New[string, struct{}](time.Millisecond * 10)
f, err := New(Options{})
require.NoError(t, err)
f.WithBatcher(b)
Expand Down

0 comments on commit 6c3b2ee

Please sign in to comment.