Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use events to test network logic #2700

Merged
merged 42 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b3f45bc
wip
nasdf Jun 5, 2024
10c522b
move event names and types to event package
nasdf Jun 5, 2024
8f870bd
refactor net events
nasdf Jun 6, 2024
138bb84
add event messages. use separate event bus for user subscriptions
nasdf Jun 7, 2024
144b392
update tests with event bus
nasdf Jun 7, 2024
243775a
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 7, 2024
6b8d97e
merge push log and dag merge events
nasdf Jun 7, 2024
05f1579
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 11, 2024
98e77d4
rename events package to event
nasdf Jun 11, 2024
2da9153
add event bus timeout
nasdf Jun 11, 2024
3725fca
add waitForMerge test util
nasdf Jun 11, 2024
45f6edd
merge event bus and simple channel implementations
nasdf Jun 11, 2024
938954e
resubscribe to events after restart
nasdf Jun 11, 2024
6e98699
update event names
nasdf Jun 12, 2024
1a1ce8f
update event bus publish logic
nasdf Jun 12, 2024
9ce688f
update command buffer size
nasdf Jun 12, 2024
7e1045f
document p2p waitForMerge require
nasdf Jun 12, 2024
573090e
add event bus interface
nasdf Jun 12, 2024
ab56124
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 12, 2024
d6a1be1
add event.Name type
nasdf Jun 12, 2024
23d4d95
fix event bus isClosed logic
nasdf Jun 12, 2024
64ebf82
remove WARNING text from bufferedBus docs
nasdf Jun 12, 2024
42c52d0
update mocks
nasdf Jun 12, 2024
7ac87b9
fix waitForMerge node address
nasdf Jun 12, 2024
311a37d
revert p2p waitForMerge change. add more info to cli peer info panic
nasdf Jun 13, 2024
cfa0ef9
fix node constructor
nasdf Jun 13, 2024
0357385
skip waitForMerge when no waitForSync present
nasdf Jun 13, 2024
3513390
revert skipWaitForMerge. use peer info from state
nasdf Jun 13, 2024
184f0cc
test db merge sequential
nasdf Jun 13, 2024
38e0c48
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
55aac76
add mutex to buffered bus
nasdf Jun 17, 2024
e6128ef
document bus mutex
nasdf Jun 17, 2024
801354c
add buffered bus tests
nasdf Jun 17, 2024
2dfa4d2
fix buffered bus test names
nasdf Jun 17, 2024
56136c5
remove event.Bus interface
nasdf Jun 17, 2024
726be78
make mocks
nasdf Jun 17, 2024
61ab97f
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
9fb2eeb
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
72f44ce
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
d0ad720
update test names
nasdf Jun 18, 2024
aeec15d
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 20, 2024
7680133
use session for dag sync
nasdf Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func MakeStartCommand() *cobra.Command {
node.WithACPType(node.LocalACPType),
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
4 changes: 2 additions & 2 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/event"
)

type CollectionName = string
Expand Down Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() events.Events
Events() *event.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
16 changes: 9 additions & 7 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datastore/mocks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func prepareDAGStore(t *testing.T) *DAGStore {
func NewTxnWithMultistore(t *testing.T) *MultiStoreTxn {
txn := NewTxn(t)
txn.EXPECT().OnSuccess(mock.Anything).Maybe()
txn.EXPECT().OnSuccessAsync(mock.Anything).Maybe()

result := &MultiStoreTxn{
Txn: txn,
Expand Down
163 changes: 163 additions & 0 deletions event/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"sync"
"sync/atomic"
)

type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

type closeCommand struct{}

// Bus uses a buffered channel to manage subscribers and publish messages.
type Bus struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: I thought we agreed to keep this behind an interface? Happy to resume that conversation if need be :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewSisley, where were you thinking of defining the interface?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already exists in the events package, this PR deleted it. I think it would just need a rename, removal of the generic, and adding the ...string param to Subscribe, otherwise it looks identical to the *Bus signature.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make sense to define the interface in the event package has it doesn't need/use it. We could define it in client though for the purpose of the client.DB interface method Events() Events.

Copy link
Contributor

@AndrewSisley AndrewSisley Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to move it, I'd also suggest moving Subscription there too, and then the event package can become internal.

Otherwise, I do slightly disagree with you. It is quite common for packages to define their public interfaces within themselves. Especially given event is not really Defra specific, and could quite happily be moved outside of this repo for general use.

Bus can also be made private if we want (like it is in develop), would be my minor personal preference too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packages defining public interfaces that are not used internally is bad practice though. We should try to avoid it. The goal of an interface is to make something more flexible. Like if we want db to be able to use different event bus implementation. I'd argue that we are building the Bus specifically for our needs so I don't think it flexibility makes sense here. I much prefer the concrete type as it is now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the interface because I will be using it to create a JavaScript event bus for the WASM build.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: Doesn't have to be in this PR, but the OnFooAsync funcs added in #2708 can now be removed IMO - they are dead code (unless we want to keep them to allow users to call them).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to remember to bring this up in the stand up

// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[Name]map[uint64]struct{}
// commandChannel manages all commands sent to the bufferedBus.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed bool
// closeMutex is only locked when the bus is closing.
closeMutex sync.RWMutex
}

// NewBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBus(commandBufferSize int, eventBufferSize int) *Bus {
bus := Bus{
subs: make(map[uint64]*Subscription),
events: make(map[Name]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

// Publish broadcasts the given message to the bus subscribers. Non-blocking.
func (b *Bus) Publish(msg Message) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return
}
b.commandChannel <- publishCommand(msg)
}

// Subscribe returns a new subscription that will receive all of the events
// contained in the given list of events.
func (b *Bus) Subscribe(events ...Name) (*Subscription, error) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return nil, ErrSubscribedToClosedChan

Check warning on line 83 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L83

Added line #L83 was not covered by tests
}
sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

// Unsubscribe removes all event subscriptions and closes the subscription channel.
//
// Will do nothing if this object is already closed.
func (b *Bus) Unsubscribe(sub *Subscription) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

// Close unsubscribes all active subscribers and closes the command channel.
func (b *Bus) Close() {
b.closeMutex.Lock()
defer b.closeMutex.Unlock()

if b.isClosed {
return
}
b.isClosed = true
b.commandChannel <- closeCommand{}
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

func (b *Bus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardName] {
b.subs[id].value <- Message(t)
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardName][id]; ok {
continue
}
b.subs[id].value <- Message(t)
}
}
}
}
Loading
Loading