Skip to content

Commit 93a8d76

Browse files
gbaranskidevzbysiu
andauthored
Non-blocking libdrop event handling (#699)
Co-authored-by: Bartosz Zbytniewski <[email protected]>
1 parent 2cc46a6 commit 93a8d76

File tree

4 files changed

+253
-61
lines changed

4 files changed

+253
-61
lines changed

fileshare/event_manager.go

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type EventManager struct {
5656
filesystem Filesystem
5757
notificationManager *NotificationManager
5858
defaultDownloadDir string
59+
60+
events chan []Event
5961
}
6062

6163
// NewEventManager loads transfer state from storage, or creates empty state if loading fails.
@@ -66,14 +68,50 @@ func NewEventManager(
6668
filesystem Filesystem,
6769
defaultDownloadDir string,
6870
) *EventManager {
69-
return &EventManager{
71+
em := &EventManager{
7072
isProd: isProd,
7173
liveTransfers: map[string]*LiveTransfer{},
7274
transferSubscriptions: map[string]chan TransferProgressInfo{},
7375
meshClient: meshClient,
7476
osInfo: osInfo,
7577
filesystem: filesystem,
7678
defaultDownloadDir: defaultDownloadDir,
79+
events: make(chan []Event, 32),
80+
}
81+
go em.process()
82+
83+
return em
84+
}
85+
86+
func (em *EventManager) process() {
87+
fn := func(ev []Event) {
88+
em.mutex.Lock()
89+
defer em.mutex.Unlock()
90+
for _, e := range ev {
91+
em.handleEvent(e)
92+
}
93+
}
94+
95+
for {
96+
events, ok := <-em.events
97+
if !ok {
98+
log.Println(internal.WarningPrefix, "events channel closed")
99+
return
100+
}
101+
fn(events)
102+
}
103+
}
104+
105+
// Event sends an event to the event manager in an asynchronous manner
106+
//
107+
// This function should return immediately,
108+
// unless the Events channel is full, in which case it will block until there is space
109+
func (em *EventManager) Event(event ...Event) {
110+
select {
111+
case em.events <- event:
112+
default:
113+
log.Println(internal.WarningPrefix, "async events channel is full. Event() will block until there is space")
114+
em.events <- event
77115
}
78116
}
79117

@@ -126,11 +164,7 @@ func (em *EventManager) DisableNotifications() error {
126164
return nil
127165
}
128166

129-
// OnEvent processes events and handles live transfer state.
130-
func (em *EventManager) OnEvent(event Event) {
131-
em.mutex.Lock()
132-
defer em.mutex.Unlock()
133-
167+
func (em *EventManager) handleEvent(event Event) {
134168
if !em.isProd {
135169
log.Printf(internal.InfoPrefix+" DROP EVENT: %s\n", EventToString(event))
136170
}
@@ -206,6 +240,29 @@ func (em *EventManager) handleRequestReceivedEvent(event EventKindRequestReceive
206240
}
207241
}
208242

243+
func (em *EventManager) withProgressCh(transferID string, fn func(ch chan TransferProgressInfo)) {
244+
if ch, ok := em.transferSubscriptions[transferID]; ok {
245+
fn(ch)
246+
}
247+
}
248+
249+
func (em *EventManager) reportProgress(transferID string, status pb.Status, transferred uint32) {
250+
em.withProgressCh(transferID, func(ch chan TransferProgressInfo) {
251+
progress := TransferProgressInfo{
252+
TransferID: transferID,
253+
Transferred: transferred,
254+
Status: status,
255+
}
256+
select {
257+
case ch <- progress:
258+
default:
259+
log.Println(internal.WarningPrefix, " progress channel is full. removing oldest item and sending")
260+
<-ch
261+
ch <- progress
262+
}
263+
})
264+
}
265+
209266
func (em *EventManager) handleFileProgressEvent(event EventKindFileProgress) {
210267
transfer, err := em.getLiveTransfer(event.TransferId)
211268
if err != nil {
@@ -223,17 +280,11 @@ func (em *EventManager) handleFileProgressEvent(event EventKindFileProgress) {
223280
transfer.TotalTransferred += event.Transferred - file.Transferred // add only delta
224281
file.Transferred = event.Transferred
225282

226-
if progressCh, ok := em.transferSubscriptions[transfer.ID]; ok {
227-
var progressPercent uint32
228-
if transfer.TotalSize > 0 { // transfer progress percentage should be reported to subscriber
229-
progressPercent = uint32(float64(transfer.TotalTransferred) / float64(transfer.TotalSize) * 100)
230-
}
231-
progressCh <- TransferProgressInfo{
232-
TransferID: event.TransferId,
233-
Transferred: progressPercent,
234-
Status: pb.Status_ONGOING,
235-
}
283+
var progressPercent uint32
284+
if transfer.TotalSize > 0 { // transfer progress percentage should be reported to subscriber
285+
progressPercent = uint32(float64(transfer.TotalTransferred) / float64(transfer.TotalSize) * 100)
236286
}
287+
em.reportProgress(transfer.ID, pb.Status_ONGOING, progressPercent)
237288
}
238289

239290
func (em *EventManager) handleFileDownloadedEvent(event EventKindFileDownloaded) {
@@ -367,16 +418,13 @@ func (em *EventManager) handleTransferFinalizedEvent(event EventKindTransferFina
367418
}
368419

369420
func (em *EventManager) finalizeTransfer(transfer *LiveTransfer, status pb.Status) {
370-
if progressCh, ok := em.transferSubscriptions[transfer.ID]; ok {
371-
progressCh <- TransferProgressInfo{
372-
TransferID: transfer.ID,
373-
Status: status,
374-
}
421+
em.reportProgress(transfer.ID, status, 0)
422+
em.withProgressCh(transfer.ID, func(ch chan TransferProgressInfo) {
375423
// unsubscribe finished transfer
376-
close(progressCh)
377-
delete(em.transferSubscriptions, transfer.ID)
378-
}
424+
close(ch)
425+
})
379426

427+
delete(em.transferSubscriptions, transfer.ID)
380428
delete(em.liveTransfers, transfer.ID)
381429
}
382430

@@ -601,7 +649,7 @@ func (em *EventManager) Subscribe(id string) <-chan TransferProgressInfo {
601649
em.mutex.Lock()
602650
defer em.mutex.Unlock()
603651

604-
em.transferSubscriptions[id] = make(chan TransferProgressInfo)
652+
em.transferSubscriptions[id] = make(chan TransferProgressInfo, 32) // use buffered channels, because we don't want to block the event processing
605653

606654
return em.transferSubscriptions[id]
607655
}

0 commit comments

Comments
 (0)