Skip to content

Commit

Permalink
merge event bus and simple channel implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 11, 2024
1 parent 3725fca commit 45f6edd
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 161 deletions.
168 changes: 101 additions & 67 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@
package event

import (
"sync"
"time"
"sync/atomic"
)

type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

type closeCommand struct{}

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Expand All @@ -30,7 +37,7 @@ func NewMessage(name string, data any) Message {

// Subscription is a read-only event stream.
type Subscription struct {
id int
id uint64
value chan Message
events []string
}
Expand All @@ -48,98 +55,125 @@ func (s *Subscription) Events() []string {
// Bus is used to broadcast events to subscribers.
type Bus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID int
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[int]*Subscription
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[string]map[int]struct{}
// mutex is used to lock reads and writes to the subs and events maps.
mutex sync.RWMutex
// timeout is the amount of time to wait for a blocking channel before a message is discarded.
timeout time.Duration
events map[string]map[uint64]struct{}
// commandChannel manages all commands sent to this simpleChannel.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// WARNING: This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed atomic.Bool
}

// NewBus returns a new event bus.
func NewBus(timeout time.Duration) *Bus {
return &Bus{
timeout: timeout,
subs: make(map[int]*Subscription),
events: make(map[string]map[int]struct{}),
// NewBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled subsequent calls to functions on this object may start to block.
func NewBus(commandBufferSize int, eventBufferSize int) *Bus {
bus := Bus{
subs: make(map[uint64]*Subscription),
events: make(map[string]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

// Publish publishes the given event message to all subscribers.
func (b *Bus) Publish(msg Message) {
b.mutex.RLock()
defer b.mutex.RUnlock()

subscribers := make(map[int]struct{})
for id := range b.events[msg.Name] {
subscribers[id] = struct{}{}
}
for id := range b.events[WildCardEventName] {
subscribers[id] = struct{}{}
}

for id := range subscribers {
select {
case b.subs[id].value <- msg:
// published message
case <-time.After(b.timeout):
// discarded message
}
if b.isClosed.Load() {
return
}
b.commandChannel <- publishCommand(msg)
}

// Subscribe returns a new channel that will receive all of the subscribed events.
func (b *Bus) Subscribe(size int, events ...string) *Subscription {
b.mutex.Lock()
defer b.mutex.Unlock()
func (b *Bus) Subscribe(events ...string) (*Subscription, error) {
if b.isClosed.Load() {
return nil, ErrSubscribedToClosedChan

Check warning on line 103 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L103

Added line #L103 was not covered by tests
}

sub := &Subscription{
id: b.subID,
value: make(chan Message, size),
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}

for _, event := range events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[int]struct{})
}
b.events[event][sub.id] = struct{}{}
}

b.subID++
b.subs[sub.id] = sub
return sub
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

// Unsubscribe unsubscribes from all events and closes the event channel of the given subscription.
func (b *Bus) Unsubscribe(sub *Subscription) {
b.mutex.Lock()
defer b.mutex.Unlock()

if _, ok := b.subs[sub.id]; !ok {
return // not subscribed
if b.isClosed.Load() {
return
}
for _, event := range sub.events {
delete(b.events[event], sub.id)
}

delete(b.subs, sub.id)
close(sub.value)
b.commandChannel <- unsubscribeCommand(sub)
}

// Close closes the event bus by unsubscribing all subscribers.
func (b *Bus) Close() {
b.mutex.RLock()
subs := make([]*Subscription, 0, len(b.subs))
for _, sub := range b.subs {
subs = append(subs, sub)
if b.isClosed.Load() {
return
}
b.mutex.RUnlock()
b.isClosed.Store(true)
b.commandChannel <- closeCommand{}

for _, sub := range subs {
b.Unsubscribe(sub)
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

func (b *Bus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed

Check warning on line 158 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L158

Added line #L158 was not covered by tests
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
subscribers := make(map[uint64]struct{})
for id := range b.events[t.Name] {
subscribers[id] = struct{}{}
}
for id := range b.events[WildCardEventName] {
subscribers[id] = struct{}{}

Check warning on line 172 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L172

Added line #L172 was not covered by tests
}
for id := range subscribers {
b.subs[id].value <- Message(t)
}
}
}
}
145 changes: 104 additions & 41 deletions event/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,74 +11,137 @@
package event

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestBusSubscribeThenPublish(t *testing.T) {
bus := NewBus(100 * time.Millisecond)
func TestSimplePushIsNotBlockedWithoutSubscribers(t *testing.T) {
bus := NewBus(0, 0)
defer bus.Close()

sub1 := bus.Subscribe(1, "test")
sub2 := bus.Subscribe(1, WildCardEventName, "test")
msg := NewMessage("test", 1)
bus.Publish(msg)

// just assert that we reach this line, for the sake of having an assert
assert.True(t, true)
}

assert.ElementsMatch(t, sub1.Events(), []string{"test"})
assert.ElementsMatch(t, sub2.Events(), []string{WildCardEventName, "test"})
func TestSimpleSubscribersAreNotBlockedAfterClose(t *testing.T) {
bus := NewBus(0, 0)
defer bus.Close()

msg := NewMessage("test", "hello")
go bus.Publish(msg)
sub, err := bus.Subscribe("test")
assert.Nil(t, err)

event := <-sub1.Message()
assert.Equal(t, msg, event)
bus.Close()

event = <-sub2.Message()
assert.Equal(t, msg, event)
<-sub.Message()

select {
case <-sub2.Message():
t.Fatalf("subscriber should not recieve duplicate message")
case <-time.After(150 * time.Millisecond):
// wait for publish timeout + skew
}
// just assert that we reach this line, for the sake of having an assert
assert.True(t, true)
}

func TestBusPublishThenSubscribe(t *testing.T) {
bus := NewBus(100 * time.Millisecond)
func TestSimpleEachSubscribersRecievesEachItem(t *testing.T) {
bus := NewBus(0, 0)
defer bus.Close()

msg := NewMessage("test", "hello")
bus.Publish(msg)
msg1 := NewMessage("test", 1)
msg2 := NewMessage("test", 2)

sub1, err := bus.Subscribe("test")
assert.Nil(t, err)

sub2, err := bus.Subscribe("test")
assert.Nil(t, err)

// ordering of publish is not deterministic
// so capture each in a go routine
var wg sync.WaitGroup
var event1 Message
var event2 Message

go func() {
event1 = <-sub1.Message()
wg.Done()
}()

go func() {
event2 = <-sub2.Message()
wg.Done()
}()

wg.Add(2)
bus.Publish(msg1)
wg.Wait()

assert.Equal(t, msg1, event1)
assert.Equal(t, msg1, event2)

go func() {
event1 = <-sub1.Message()
wg.Done()
}()

sub := bus.Subscribe(1, "test")
select {
case <-sub.Message():
t.Fatalf("subscriber should not recieve message")
case <-time.After(150 * time.Millisecond):
// wait for publish timeout + skew
}
go func() {
event2 = <-sub2.Message()
wg.Done()
}()

wg.Add(2)
bus.Publish(msg2)
wg.Wait()

assert.Equal(t, msg2, event1)
assert.Equal(t, msg2, event2)
}

func TestBusSubscribeThenUnsubscribeThenPublish(t *testing.T) {
bus := NewBus(100 * time.Millisecond)
func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing.T) {
bus := NewBus(0, 2)
defer bus.Close()

sub := bus.Subscribe(1, "test")
bus.Unsubscribe(sub)
msg1 := NewMessage("test", 1)
msg2 := NewMessage("test", 2)

msg := NewMessage("test", "hello")
bus.Publish(msg)
sub1, err := bus.Subscribe("test")
assert.Nil(t, err)
sub2, err := bus.Subscribe("test")
assert.Nil(t, err)

// both inputs are added first before read, using the internal chan buffer
bus.Publish(msg1)
bus.Publish(msg2)

output1Ch1 := <-sub1.Message()
output1Ch2 := <-sub2.Message()

output2Ch1 := <-sub1.Message()
output2Ch2 := <-sub2.Message()

_, ok := <-sub.Message()
assert.False(t, ok, "channel should be closed")
assert.Equal(t, msg1, output1Ch1)
assert.Equal(t, msg1, output1Ch2)

assert.Equal(t, msg2, output2Ch1)
assert.Equal(t, msg2, output2Ch2)
}

func TestBusUnsubscribeTwice(t *testing.T) {
bus := NewBus(100 * time.Millisecond)
func TestSimpleSubscribersDontRecieveItemsAfterUnsubscribing(t *testing.T) {
bus := NewBus(0, 0)
defer bus.Close()

sub := bus.Subscribe(1, "test")
bus.Unsubscribe(sub)
sub, err := bus.Subscribe("test")
assert.Nil(t, err)
bus.Unsubscribe(sub)

msg := NewMessage("test", 1)
bus.Publish(msg)

// tiny delay to try and make sure the internal logic would have had time
// to do its thing with the pushed item.
time.Sleep(5 * time.Millisecond)

// closing the channel will result in reads yielding the default value
assert.Equal(t, Message{}, <-sub.Message())
}
Loading

0 comments on commit 45f6edd

Please sign in to comment.