Skip to content

Commit

Permalink
fix: API to have AsyncEvent, now called Event()
Browse files Browse the repository at this point in the history
  • Loading branch information
gbaranski committed Dec 13, 2024
1 parent a6e7f8d commit 30b0ca8
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 85 deletions.
47 changes: 14 additions & 33 deletions fileshare/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ type EventManager struct {
notificationManager *NotificationManager
defaultDownloadDir string

syncEvents chan []Event
syncDoneCh chan struct{}
asyncEvents chan []Event
events chan []Event
}

// NewEventManager loads transfer state from storage, or creates empty state if loading fails.
Expand All @@ -78,9 +76,7 @@ func NewEventManager(
osInfo: osInfo,
filesystem: filesystem,
defaultDownloadDir: defaultDownloadDir,
syncEvents: make(chan []Event),
syncDoneCh: make(chan struct{}),
asyncEvents: make(chan []Event, 32),
events: make(chan []Event, 32),
}
go em.process()

Expand All @@ -90,50 +86,35 @@ func NewEventManager(
func (em *EventManager) process() {
fn := func(ev []Event) {
em.mutex.Lock()
defer em.mutex.Unlock()
for _, e := range ev {
em.handleEvent(e)
}
em.mutex.Unlock()
}

for {
select {
case e, ok := <-em.asyncEvents:
if !ok {
log.Println(internal.WarningPrefix, "asyncEvents channel closed")
return
}
fn(e)
case e, ok := <-em.syncEvents:
if !ok {
log.Println(internal.WarningPrefix, "syncEvents channel closed")
return
}
fn(e)
em.syncDoneCh <- struct{}{}
events, ok := <-em.events
if !ok {
log.Println(internal.WarningPrefix, "events channel closed")
return
}
fn(events)
}
}

// AsyncEvent sends an event to the event manager in an asynchronous manner
// Event sends an event to the event manager in an asynchronous manner
//
// This function should return immediately,
// unless the asyncEvents channel is full, in which case it will block until there is space
func (em *EventManager) AsyncEvent(event ...Event) {
// unless the Events channel is full, in which case it will block until there is space
func (em *EventManager) Event(event ...Event) {
select {
case em.asyncEvents <- event:
case em.events <- event:
default:
log.Println(internal.WarningPrefix, " async events channel is full. AsyncEvent() will block until there is space")
em.asyncEvents <- event
log.Println(internal.WarningPrefix, " async events channel is full. Event() will block until there is space")
em.events <- event
}
}

// SyncEvent sends an event to the event manager in a synchronous manner, and waits for the event to be fully processed
func (em *EventManager) SyncEvent(event ...Event) {
em.syncEvents <- event
<-em.syncDoneCh
}

// SetFileshare must be called before using event manager.
// Necessary because of circular dependency between event manager and libDrop.
func (em *EventManager) SetFileshare(fileshare Fileshare) {
Expand Down
101 changes: 52 additions & 49 deletions fileshare/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,36 @@ type mockNotification struct {
type mockNotifier struct {
notifications []mockNotification
nextID uint32
updateCh chan struct{}
}

func (mn *mockNotifier) Wait() {
timeout := make(chan bool, 1)
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()

select {
case <-timeout:
case <-mn.updateCh:
}
}

func (mn *mockNotifier) SendNotification(summary string, body string, actions []Action) (uint32, error) {
notificationID := mn.nextID
mn.notifications = append(mn.notifications, mockNotification{id: notificationID, summary: summary, body: body, actions: actions})
mn.nextID++

if mn.updateCh != nil {
select {
case mn.updateCh <- struct{}{}:
default:
<-mn.updateCh
mn.updateCh <- struct{}{}

Check failure on line 71 in fileshare/event_manager_test.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}
}
return notificationID, nil
}

Expand Down Expand Up @@ -315,35 +339,23 @@ func TestTransferProgress(t *testing.T) {
},
}

eventManager.SyncEvent(
eventManager.Event(
Event{
Kind: EventKindRequestQueued{
Peer: peer,
TransferId: transferID,
Files: []QueuedFile{
{
Id: file1ID,
Path: file1,
Size: file1sz,
},
{
Id: file2ID,
Path: file2,
Size: file2sz,
},
{
Id: file3ID,
Path: file3,
Size: file3sz,
},
{Id: file1ID, Path: file1, Size: file1sz},
{Id: file2ID, Path: file2, Size: file2sz},
{Id: file3ID, Path: file3, Size: file3sz},
},
},
},
)

progCh := eventManager.Subscribe(transferID)

eventManager.SyncEvent(
eventManager.Event(
Event{
Kind: EventKindFileStarted{
TransferId: transferID,
Expand All @@ -361,7 +373,7 @@ func TestTransferProgress(t *testing.T) {

transferredBytes := file1sz
go func() {
eventManager.SyncEvent(
eventManager.Event(
Event{
Kind: EventKindFileProgress{
TransferId: transferID,
Expand All @@ -380,7 +392,7 @@ func TestTransferProgress(t *testing.T) {
waitGroup := sync.WaitGroup{}
waitGroup.Add(1)
go func() {
eventManager.SyncEvent(
eventManager.Event(
Event{
Kind: EventKindFileDownloaded{
TransferId: transferID,
Expand All @@ -404,7 +416,7 @@ func TestTransferProgress(t *testing.T) {
// Final transfer state is determined from storage
storage.transfers[transferID].Status = pb.Status_SUCCESS

eventManager.SyncEvent(
eventManager.Event(
Event{
Timestamp: 0,
Kind: EventKindTransferFinalized{
Expand Down Expand Up @@ -572,6 +584,7 @@ func TestTransferFinishedNotifications(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: 0,
updateCh: make(chan struct{}, 1),
}
notificationManager := NewMockNotificationManager(&mockEventManagerOsInfo{})
notificationManager.notifier = &notifier
Expand Down Expand Up @@ -676,9 +689,10 @@ func TestTransferFinishedNotifications(t *testing.T) {
eventManager, notifier := initializeEventManager(test.direction)

t.Run(test.name, func(t *testing.T) {
eventManager.SyncEvent(
eventManager.Event(
test.event,
)
notifier.Wait()

assert.Equal(t, 1, len(notifier.notifications),
"TransferFinished event was received, but EventManager did not send any notifications.")
Expand All @@ -703,6 +717,7 @@ func TestTransferFinishedNotificationsOpenFile(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: 0,
updateCh: make(chan struct{}, 1),
}

openedFiles := []string{}
Expand Down Expand Up @@ -734,14 +749,15 @@ func TestTransferFinishedNotificationsOpenFile(t *testing.T) {
Direction: pb.Direction_INCOMING,
}

eventManager.SyncEvent(Event{
eventManager.Event(Event{
Kind: EventKindFileDownloaded{
TransferId: transferID,
FileId: fileID,
FinalPath: filePath,
},
})

notifier.Wait()
notification := notifier.notifications[0]

notificationManager.OpenFile(notification.id)
Expand All @@ -758,6 +774,7 @@ func TestTransferRequestNotification(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: 0,
updateCh: make(chan struct{}, 1),
}

openedFiles := []string{}
Expand Down Expand Up @@ -800,7 +817,8 @@ func TestTransferRequestNotification(t *testing.T) {
},
}

eventManager.SyncEvent(event)
eventManager.Event(event)
notifier.Wait()

assert.Equal(t, 1, len(notifier.notifications),
"Transfer request notification was not sent after transfer request event was received.")
Expand Down Expand Up @@ -868,6 +886,7 @@ func TestTransferRequestNotificationAccept(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: pendingTransferNotificationID,
updateCh: make(chan struct{}, 1),
}

osInfo := mockEventManagerOsInfo{
Expand Down Expand Up @@ -1039,6 +1058,7 @@ func TestTransterRequestNotificationAcceptInvalidTransfer(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: transferNotificationID,
updateCh: make(chan struct{}, 1),
}

notificationManager := NewMockNotificationManager(&mockOsEnvironment.mockEventManagerOsInfo)
Expand Down Expand Up @@ -1100,6 +1120,7 @@ func TestTransferRequestNotificationCancel(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: pendingTransferNotificationID,
// updateCh: make(chan struct{}),
}

notificationManager := NewMockNotificationManager(&mockEventManagerOsInfo{})
Expand Down Expand Up @@ -1223,6 +1244,7 @@ func TestAutoaccept(t *testing.T) {
notifier := mockNotifier{
notifications: []mockNotification{},
nextID: 0,
updateCh: make(chan struct{}, 1),
}

notificationManager := NewMockNotificationManager(&mockOsEnvironment.mockEventManagerOsInfo)
Expand Down Expand Up @@ -1334,7 +1356,8 @@ func TestAutoaccept(t *testing.T) {

eventManager.fileshare = &mockFileshare
eventManager.defaultDownloadDir = test.defaultDownloadDirectory
eventManager.SyncEvent(event)
eventManager.Event(event)
notifier.Wait()

if test.acceptedTransferID != "" {
assert.NotEmpty(t, mockFileshare.acceptedTransferIDS,
Expand All @@ -1358,7 +1381,7 @@ func TestAutoaccept(t *testing.T) {
}
}

func TestAsyncEvents(t *testing.T) {
func TestEventsFlow(t *testing.T) {
category.Set(t, category.Unit)

const (
Expand Down Expand Up @@ -1421,20 +1444,20 @@ func TestAsyncEvents(t *testing.T) {
fn func(t *testing.T, em *EventManager, progCh chan TransferProgressInfo)
}{
{
name: "AsyncEvent() should not block with progCh buffer = 1 and 1024 events",
name: "Event() should not block with progCh buffer = 1 and 1024 events",
progChSize: 32,
fn: func(t *testing.T, em *EventManager, progCh chan TransferProgressInfo) {
for _, chunk := range chunks {
em.AsyncEvent(chunk...)
em.Event(chunk...)
}
},
},
{
name: "AsyncEvent() events should be in order & all received",
name: "Event() events should be in order & all received",
progChSize: int(numEvents * 2),
fn: func(t *testing.T, em *EventManager, progCh chan TransferProgressInfo) {
for _, chunk := range chunks {
em.AsyncEvent(chunk...)
em.Event(chunk...)
}
lastProgress := uint32(0)
for i := uint64(0); i < numEvents-2; i++ {
Expand All @@ -1450,26 +1473,6 @@ func TestAsyncEvents(t *testing.T) {
}
},
},
{
name: "SyncEvent() events should block",
progChSize: 32,
fn: func(t *testing.T, em *EventManager, progCh chan TransferProgressInfo) {
done := make(chan struct{})
go func() {
for _, chunk := range chunks {
em.SyncEvent(chunk...)
}
<-done
}()

timeout := time.After(time.Millisecond * 100)
select {
case <-done:
t.Fatalf("SyncEvent() should block, but it didn't")
case <-timeout:
}
},
},
}

for _, test := range tests {
Expand Down
3 changes: 1 addition & 2 deletions fileshare/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
)

type EventCallback interface {
AsyncEvent(event ...Event)
SyncEvent(event ...Event)
Event(event ...Event)
}

type Event struct {
Expand Down
2 changes: 1 addition & 1 deletion fileshare/libdrop/libdrop.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type libdropEventCallback struct {

func (lec libdropEventCallback) OnEvent(nev norddrop.Event) {
ev := libdropEventToInternalEvent(nev)
lec.eventCallback.AsyncEvent(ev)
lec.eventCallback.Event(ev)
}

func libdropEventToInternalEvent(nev norddrop.Event) fileshare.Event {
Expand Down

0 comments on commit 30b0ca8

Please sign in to comment.