-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: Use events to test network logic #2700
Changes from all commits
b3f45bc
10c522b
8f870bd
138bb84
144b392
243775a
6b8d97e
05f1579
98e77d4
2da9153
3725fca
45f6edd
938954e
6e98699
1a1ce8f
9ce688f
7e1045f
573090e
ab56124
d6a1be1
23d4d95
64ebf82
42c52d0
7ac87b9
311a37d
cfa0ef9
0357385
3513390
184f0cc
38e0c48
55aac76
e6128ef
801354c
2dfa4d2
56136c5
726be78
61ab97f
9fb2eeb
72f44ce
d0ad720
aeec15d
7680133
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
// Copyright 2024 Democratized Data Foundation | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package event | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
type subscribeCommand *Subscription | ||
|
||
type unsubscribeCommand *Subscription | ||
|
||
type publishCommand Message | ||
|
||
type closeCommand struct{} | ||
|
||
// Bus uses a buffered channel to manage subscribers and publish messages. | ||
type Bus struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. todo: Doesn't have to be in this PR, but the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll try to remember to bring this up in the stand up |
||
// subID is incremented for each subscriber and used to set subscriber ids. | ||
subID atomic.Uint64 | ||
// subs is a mapping of subscriber ids to subscriptions. | ||
subs map[uint64]*Subscription | ||
// events is a mapping of event names to subscriber ids. | ||
events map[Name]map[uint64]struct{} | ||
// commandChannel manages all commands sent to the bufferedBus. | ||
// | ||
// It is important that all stuff gets sent through this single channel to ensure | ||
// that the order of operations is preserved. | ||
// | ||
// This does mean that non-event commands can block the database if the buffer | ||
// size is breached (e.g. if many subscribe commands occupy the buffer). | ||
commandChannel chan any | ||
eventBufferSize int | ||
hasClosedChan chan struct{} | ||
isClosed bool | ||
// closeMutex is only locked when the bus is closing. | ||
closeMutex sync.RWMutex | ||
} | ||
|
||
// NewBus creates a new event bus with the given commandBufferSize and | ||
// eventBufferSize. | ||
// | ||
// Should the buffers be filled, subsequent calls on this bus will block. | ||
func NewBus(commandBufferSize int, eventBufferSize int) *Bus { | ||
bus := Bus{ | ||
subs: make(map[uint64]*Subscription), | ||
events: make(map[Name]map[uint64]struct{}), | ||
commandChannel: make(chan any, commandBufferSize), | ||
hasClosedChan: make(chan struct{}), | ||
eventBufferSize: eventBufferSize, | ||
} | ||
go bus.handleChannel() | ||
return &bus | ||
} | ||
|
||
// Publish broadcasts the given message to the bus subscribers. Non-blocking. | ||
func (b *Bus) Publish(msg Message) { | ||
b.closeMutex.RLock() | ||
defer b.closeMutex.RUnlock() | ||
|
||
if b.isClosed { | ||
return | ||
} | ||
b.commandChannel <- publishCommand(msg) | ||
} | ||
|
||
// Subscribe returns a new subscription that will receive all of the events | ||
// contained in the given list of events. | ||
func (b *Bus) Subscribe(events ...Name) (*Subscription, error) { | ||
b.closeMutex.RLock() | ||
defer b.closeMutex.RUnlock() | ||
|
||
if b.isClosed { | ||
return nil, ErrSubscribedToClosedChan | ||
} | ||
sub := &Subscription{ | ||
id: b.subID.Add(1), | ||
value: make(chan Message, b.eventBufferSize), | ||
events: events, | ||
} | ||
b.commandChannel <- subscribeCommand(sub) | ||
return sub, nil | ||
} | ||
|
||
// Unsubscribe removes all event subscriptions and closes the subscription channel. | ||
// | ||
// Will do nothing if this object is already closed. | ||
func (b *Bus) Unsubscribe(sub *Subscription) { | ||
b.closeMutex.RLock() | ||
defer b.closeMutex.RUnlock() | ||
|
||
if b.isClosed { | ||
return | ||
} | ||
b.commandChannel <- unsubscribeCommand(sub) | ||
} | ||
|
||
// Close unsubscribes all active subscribers and closes the command channel. | ||
func (b *Bus) Close() { | ||
b.closeMutex.Lock() | ||
defer b.closeMutex.Unlock() | ||
|
||
if b.isClosed { | ||
return | ||
} | ||
b.isClosed = true | ||
b.commandChannel <- closeCommand{} | ||
// Wait for the close command to be handled, in order, before returning | ||
<-b.hasClosedChan | ||
} | ||
|
||
func (b *Bus) handleChannel() { | ||
for cmd := range b.commandChannel { | ||
switch t := cmd.(type) { | ||
case closeCommand: | ||
for _, subscriber := range b.subs { | ||
close(subscriber.value) | ||
} | ||
close(b.commandChannel) | ||
close(b.hasClosedChan) | ||
return | ||
|
||
case subscribeCommand: | ||
for _, event := range t.events { | ||
if _, ok := b.events[event]; !ok { | ||
b.events[event] = make(map[uint64]struct{}) | ||
} | ||
b.events[event][t.id] = struct{}{} | ||
} | ||
b.subs[t.id] = t | ||
|
||
case unsubscribeCommand: | ||
if _, ok := b.subs[t.id]; !ok { | ||
continue // not subscribed | ||
} | ||
for _, event := range t.events { | ||
delete(b.events[event], t.id) | ||
} | ||
delete(b.subs, t.id) | ||
close(t.value) | ||
|
||
case publishCommand: | ||
for id := range b.events[WildCardName] { | ||
b.subs[id].value <- Message(t) | ||
} | ||
for id := range b.events[t.Name] { | ||
if _, ok := b.events[WildCardName][id]; ok { | ||
continue | ||
} | ||
b.subs[id].value <- Message(t) | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: I thought we agreed to keep this behind an interface? Happy to resume that conversation if need be :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AndrewSisley, where were you thinking of defining the interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It already exists in the
events
package, this PR deleted it. I think it would just need a rename, removal of the generic, and adding the...string
param toSubscribe
, otherwise it looks identical to the*Bus
signature.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't make sense to define the interface in the
event
package has it doesn't need/use it. We could define it inclient
though for the purpose of theclient.DB
interface methodEvents() Events
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are going to move it, I'd also suggest moving
Subscription
there too, and then theevent
package can become internal.Otherwise, I do slightly disagree with you. It is quite common for packages to define their public interfaces within themselves. Especially given
event
is not really Defra specific, and could quite happily be moved outside of this repo for general use.Bus
can also be made private if we want (like it is in develop), would be my minor personal preference too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Packages defining public interfaces that are not used internally is bad practice though. We should try to avoid it. The goal of an interface is to make something more flexible. Like if we want
db
to be able to use different event bus implementation. I'd argue that we are building theBus
specifically for our needs so I don't think it flexibility makes sense here. I much prefer the concrete type as it is now.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the interface because I will be using it to create a JavaScript event bus for the WASM build.