Skip to content

Commit 68a857c

Browse files
DSET-4449: Do not count dropped buffer as processed (#52)
* DSET-4449: Do not count dropped buffer as processed * Add more tests to verify statistics * Fix the test * Fix failing test in CI/CD * Do not drop event * Improve comment explaining the fix * Update comments * Increase version to 0.13.0
1 parent 696f662 commit 68a857c

File tree

8 files changed

+304
-53
lines changed

8 files changed

+304
-53
lines changed

RELEASE_NOTES.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Release Notes
22

3+
## 0.13.0
4+
5+
* fix: do not drop event when buffer is being published
6+
* fix: do not double count dropped buffers
7+
38
## 0.12.0
49

510
* fix: make client shutdown timeout configurable.

pkg/client/add_events.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,12 @@ func (client *DataSetClient) listenAndSendBundlesForKey(key string, ch chan inte
170170
buf = getBuffer(key)
171171
} else {
172172
client.Logger.Error("Cannot add bundle", zap.Error(err))
173-
// TODO: what to do? For now, lets skip it
174-
continue
173+
// TODO: what to do?
174+
// this error happens when the buffer is already publishing
175+
// we can solve it by waiting little bit
176+
// since we are also returning TooMuch we will get new buffer
177+
// few lines below
178+
time.Sleep(10 * time.Millisecond)
175179
}
176180
}
177181

pkg/client/add_events_long_running_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,21 @@ func TestAddEventsManyLogsShouldSucceed(t *testing.T) {
157157
err = sc.Shutdown()
158158
assert.Nil(t, err, err)
159159

160+
stats := sc.Statistics()
161+
assert.Equal(t, uint64(ExpectedLogs), stats.Events.Enqueued())
162+
assert.Equal(t, uint64(ExpectedLogs), stats.Events.Processed())
163+
assert.Equal(t, uint64(0), stats.Events.Waiting())
164+
assert.Equal(t, uint64(0), stats.Events.Dropped())
165+
assert.Equal(t, uint64(0), stats.Events.Broken())
166+
assert.Equal(t, 1.0, stats.Events.SuccessRate())
167+
168+
assert.Equal(t, uint64(0), stats.Buffers.Waiting())
169+
assert.Equal(t, uint64(0), stats.Buffers.Dropped())
170+
assert.Equal(t, uint64(0), stats.Buffers.Broken())
171+
assert.Equal(t, 1.0, stats.Buffers.SuccessRate())
172+
173+
assert.Equal(t, 1.0, stats.Transfer.SuccessRate())
174+
160175
assert.Equal(t, seenKeys, expectedKeys)
161176
assert.Equal(t, int(processedEvents.Load()), int(ExpectedLogs), "processed items")
162177
assert.Equal(t, int(len(seenKeys)), int(ExpectedLogs), "unique items")

pkg/client/add_events_test.go

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,29 @@ func TestAddEventsRetry(t *testing.T) {
135135
assert.Nil(t, err)
136136
err = sc.Shutdown()
137137
assert.Nil(t, err)
138-
139138
assert.True(t, wasSuccessful.Load())
140-
assert.Nil(t, err)
141-
142139
assert.Equal(t, attempt.Load(), succeedInAttempt)
140+
141+
stats := sc.Statistics()
142+
assert.Equal(t, uint64(1), stats.Events.Enqueued())
143+
assert.Equal(t, uint64(1), stats.Events.Processed())
144+
assert.Equal(t, uint64(0), stats.Events.Waiting())
145+
assert.Equal(t, uint64(0), stats.Events.Dropped())
146+
assert.Equal(t, uint64(0), stats.Events.Broken())
147+
assert.Equal(t, 1.0, stats.Events.SuccessRate())
148+
assert.Equal(t, uint64(1), stats.Buffers.Enqueued())
149+
assert.Equal(t, uint64(1), stats.Buffers.Processed())
150+
assert.Equal(t, uint64(0), stats.Buffers.Waiting())
151+
assert.Equal(t, uint64(0), stats.Buffers.Dropped())
152+
assert.Equal(t, uint64(0), stats.Buffers.Broken())
153+
assert.Equal(t, 1.0, stats.Buffers.SuccessRate())
154+
assert.Equal(t, 1.0/float64(succeedInAttempt), stats.Transfer.SuccessRate())
155+
assert.Equal(t, uint64(1), stats.Transfer.BuffersProcessed())
156+
/* TODO: on my Mac it's 337 in GitHub action on ubuntu-latest it's 339
157+
assert.Equal(t, uint64(0x3f3), stats.Transfer.BytesSent())
158+
assert.Equal(t, uint64(0x151), stats.Transfer.BytesAccepted())
159+
assert.Equal(t, 337.0, stats.Transfer.AvgBufferBytes())
160+
*/
143161
}
144162

145163
func TestAddEventsRetryAfterSec(t *testing.T) {
@@ -238,8 +256,11 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
238256
wasSuccessful.Store(false)
239257
assert.Nil(t, err2)
240258
assert.Nil(t, sc.LastError())
241-
// info2 := httpmock.GetCallCountInfo()
242-
// assert.CmpDeeply(info2, map[string]int{"POST https://example.com/api/addEvents": 3})
259+
260+
stats := sc.Statistics()
261+
assert.Equal(t, uint64(2), stats.Buffers.Enqueued())
262+
assert.Equal(t, uint64(0), stats.Buffers.Waiting())
263+
assert.Equal(t, uint64(0), stats.Buffers.Dropped())
243264
}
244265

245266
func TestAddEventsRetryAfterTime(t *testing.T) {
@@ -388,12 +409,27 @@ func TestAddEventsLargeEvent(t *testing.T) {
388409
assert.Nil(t, err)
389410
err = sc.Shutdown()
390411
assert.Nil(t, err)
391-
392412
assert.True(t, wasSuccessful.Load())
393-
assert.Nil(t, err)
394413
assert.Nil(t, sc.LastError())
395-
// info := httpmock.GetCallCountInfo()
396-
// assert.CmpDeeply(info, map[string]int{"POST https://example.com/api/addEvents": 1})
414+
415+
stats := sc.Statistics()
416+
assert.Equal(t, uint64(1), stats.Events.Enqueued())
417+
assert.Equal(t, uint64(1), stats.Events.Processed())
418+
assert.Equal(t, uint64(0), stats.Events.Waiting())
419+
assert.Equal(t, uint64(0), stats.Events.Dropped())
420+
assert.Equal(t, uint64(0), stats.Events.Broken())
421+
assert.Equal(t, 1.0, stats.Events.SuccessRate())
422+
assert.Equal(t, uint64(2), stats.Buffers.Enqueued())
423+
assert.Equal(t, uint64(2), stats.Buffers.Processed())
424+
assert.Equal(t, uint64(0), stats.Buffers.Waiting())
425+
assert.Equal(t, uint64(0), stats.Buffers.Dropped())
426+
assert.Equal(t, uint64(0), stats.Buffers.Broken())
427+
assert.Equal(t, 1.0, stats.Buffers.SuccessRate())
428+
assert.Equal(t, 1.0, stats.Transfer.SuccessRate())
429+
assert.Equal(t, uint64(2), stats.Transfer.BuffersProcessed())
430+
assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesSent())
431+
assert.Equal(t, uint64(0x5f006d), stats.Transfer.BytesAccepted())
432+
assert.Equal(t, 3113014.5, stats.Transfer.AvgBufferBytes())
397433
}
398434

399435
func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
@@ -579,6 +615,21 @@ func TestAddEventsDoNotRetryForever(t *testing.T) {
579615
assert.Nil(t, err)
580616
err = sc.Shutdown()
581617

618+
stats := sc.Statistics()
619+
assert.Equal(t, uint64(1), stats.Events.Enqueued())
620+
assert.Equal(t, uint64(1), stats.Events.Processed())
621+
assert.Equal(t, uint64(0), stats.Events.Waiting())
622+
assert.Equal(t, uint64(0), stats.Events.Dropped())
623+
assert.Equal(t, uint64(0), stats.Events.Broken())
624+
assert.Equal(t, 1.0, stats.Events.SuccessRate())
625+
assert.Equal(t, uint64(1), stats.Buffers.Enqueued())
626+
assert.Equal(t, uint64(0), stats.Buffers.Processed())
627+
assert.Equal(t, uint64(0), stats.Buffers.Waiting())
628+
assert.Equal(t, uint64(1), stats.Buffers.Dropped())
629+
assert.Equal(t, uint64(0), stats.Buffers.Broken())
630+
assert.Equal(t, 0.0, stats.Buffers.SuccessRate())
631+
assert.Equal(t, 0.0, stats.Transfer.SuccessRate())
632+
582633
assert.NotNil(t, err)
583634
assert.Errorf(t, err, "some buffers were dropped during finishing - 1")
584635
assert.GreaterOrEqual(t, attempt.Load(), int32(2))

pkg/client/client.go

Lines changed: 84 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,9 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
307307
client.buffersProcessed.Add(1)
308308
continue
309309
}
310-
client.sendBufferWithRetryPolicy(buf)
311-
client.buffersProcessed.Add(1)
310+
if client.sendBufferWithRetryPolicy(buf) {
311+
client.buffersProcessed.Add(1)
312+
}
312313
} else {
313314
client.Logger.Error(
314315
"Cannot convert message to Buffer",
@@ -322,7 +323,7 @@ func (client *DataSetClient) listenAndSendBufferForSession(session string, ch ch
322323
}
323324

324325
// Sends buffer to DataSet. If not succeeds and try is possible (it retryable), try retry until possible (timeout)
325-
func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
326+
func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) bool {
326327
// Do not use NewExponentialBackOff since it calls Reset and the code here must
327328
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
328329
expBackoff := backoff.ExponentialBackOff{
@@ -347,7 +348,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
347348
lastHttpStatus = HttpErrorHasErrorMessage
348349
client.LastHttpStatus.Store(lastHttpStatus)
349350
client.onBufferDrop(buf, lastHttpStatus, err)
350-
break // exit loop (failed to send buffer)
351+
return false // exit loop (failed to send buffer)
351352
}
352353
lastHttpStatus = HttpErrorCannotConnect
353354
client.LastHttpStatus.Store(lastHttpStatus)
@@ -375,7 +376,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
375376
if isOkStatus(lastHttpStatus) {
376377
// everything was fine, there is no need for retries
377378
client.bytesAPIAccepted.Add(uint64(payloadLen))
378-
break // exit loop (buffer sent)
379+
return true // exit loop (buffer sent)
379380
}
380381

381382
backoffDelay := expBackoff.NextBackOff()
@@ -384,7 +385,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
384385
// throw away the batch
385386
err = fmt.Errorf("max elapsed time expired %w", err)
386387
client.onBufferDrop(buf, lastHttpStatus, err)
387-
break // exit loop (failed to send buffer)
388+
return false // exit loop (failed to send buffer)
388389
}
389390

390391
if isRetryableStatus(lastHttpStatus) {
@@ -406,7 +407,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
406407
} else {
407408
err = fmt.Errorf("non recoverable error %w", err)
408409
client.onBufferDrop(buf, lastHttpStatus, err)
409-
break // exit loop (failed to send buffer)
410+
return false // exit loop (failed to send buffer)
410411
}
411412
retryNum++
412413
}
@@ -420,9 +421,8 @@ func (client *DataSetClient) statisticsSweeper() {
420421
}
421422
}
422423

423-
func (client *DataSetClient) logStatistics() {
424-
mb := float64(1024 * 1024)
425-
424+
// Statistics returns statistics about events, buffers processing from the start time
425+
func (client *DataSetClient) Statistics() *Statistics {
426426
// for how long are events being processed
427427
firstAt := time.Unix(0, client.firstReceivedAt.Load())
428428
lastAt := time.Unix(0, client.lastAcceptedAt.Load())
@@ -431,54 +431,100 @@ func (client *DataSetClient) logStatistics() {
431431

432432
// if nothing was processed, do not log statistics
433433
if processingInSec <= 0 {
434-
return
434+
return nil
435435
}
436436

437437
// log buffer stats
438438
bProcessed := client.buffersProcessed.Load()
439439
bEnqueued := client.buffersEnqueued.Load()
440440
bDropped := client.buffersDropped.Load()
441441
bBroken := client.buffersBroken.Load()
442-
client.Logger.Info(
443-
"Buffers' Queue Stats:",
444-
zap.Uint64("processed", bProcessed),
445-
zap.Uint64("enqueued", bEnqueued),
446-
zap.Uint64("dropped", bDropped),
447-
zap.Uint64("broken", bBroken),
448-
zap.Uint64("waiting", bEnqueued-bProcessed-bDropped-bBroken),
449-
zap.Float64("processingS", processingInSec),
450-
)
442+
443+
buffersStats := QueueStats{
444+
bEnqueued,
445+
bProcessed,
446+
bDropped,
447+
bBroken,
448+
processingDur,
449+
}
451450

452451
// log events stats
453452
eProcessed := client.eventsProcessed.Load()
454453
eEnqueued := client.eventsEnqueued.Load()
455454
eDropped := client.eventsDropped.Load()
456455
eBroken := client.eventsBroken.Load()
456+
457+
eventsStats := QueueStats{
458+
eEnqueued,
459+
eProcessed,
460+
eDropped,
461+
eBroken,
462+
processingDur,
463+
}
464+
465+
// log transferred stats
466+
bAPISent := client.bytesAPISent.Load()
467+
bAPIAccepted := client.bytesAPIAccepted.Load()
468+
transferStats := TransferStats{
469+
bAPISent,
470+
bAPIAccepted,
471+
bProcessed,
472+
processingDur,
473+
}
474+
475+
return &Statistics{
476+
Buffers: buffersStats,
477+
Events: eventsStats,
478+
Transfer: transferStats,
479+
}
480+
}
481+
482+
func (client *DataSetClient) logStatistics() {
483+
stats := client.Statistics()
484+
if stats == nil {
485+
return
486+
}
487+
488+
b := stats.Buffers
489+
client.Logger.Info(
490+
"Buffers' Queue Stats:",
491+
zap.Uint64("processed", b.Processed()),
492+
zap.Uint64("enqueued", b.Enqueued()),
493+
zap.Uint64("dropped", b.Dropped()),
494+
zap.Uint64("broken", b.Broken()),
495+
zap.Uint64("waiting", b.Waiting()),
496+
zap.Float64("successRate", b.SuccessRate()),
497+
zap.Float64("processingS", b.ProcessingTime().Seconds()),
498+
zap.Duration("processing", b.ProcessingTime()),
499+
)
500+
501+
// log events stats
502+
e := stats.Events
457503
client.Logger.Info(
458504
"Events' Queue Stats:",
459-
zap.Uint64("processed", eProcessed),
460-
zap.Uint64("enqueued", eEnqueued),
461-
zap.Uint64("dropped", eDropped),
462-
zap.Uint64("broken", eBroken),
463-
zap.Uint64("waiting", eEnqueued-eProcessed-eDropped-eBroken),
464-
zap.Float64("processingS", processingInSec),
505+
zap.Uint64("processed", e.Processed()),
506+
zap.Uint64("enqueued", e.Enqueued()),
507+
zap.Uint64("dropped", e.Dropped()),
508+
zap.Uint64("broken", e.Broken()),
509+
zap.Uint64("waiting", e.Waiting()),
510+
zap.Float64("successRate", e.SuccessRate()),
511+
zap.Float64("processingS", e.ProcessingTime().Seconds()),
512+
zap.Duration("processing", e.ProcessingTime()),
465513
)
466514

467515
// log transferred stats
468-
bAPISent := float64(client.bytesAPISent.Load())
469-
bAPIAccepted := float64(client.bytesAPIAccepted.Load())
470-
throughput := bAPIAccepted / mb / processingInSec
471-
successRate := (bAPIAccepted + 1) / (bAPISent + 1)
472-
perBuffer := (bAPIAccepted) / float64(bProcessed)
516+
mb := float64(1024 * 1024)
517+
t := stats.Transfer
473518
client.Logger.Info(
474519
"Transfer Stats:",
475-
zap.Float64("bytesSentMB", bAPISent/mb),
476-
zap.Float64("bytesAcceptedMB", bAPIAccepted/mb),
477-
zap.Float64("throughputMBpS", throughput),
478-
zap.Float64("perBufferMB", perBuffer/mb),
479-
zap.Float64("successRate", successRate),
480-
zap.Float64("processingS", processingInSec),
481-
zap.Duration("processing", processingDur),
520+
zap.Float64("bytesSentMB", float64(t.BytesSent())/mb),
521+
zap.Float64("bytesAcceptedMB", float64(t.BytesAccepted())/mb),
522+
zap.Float64("throughputMBpS", t.ThroughputBpS()/mb),
523+
zap.Uint64("buffersProcessed", t.BuffersProcessed()),
524+
zap.Float64("perBufferMB", t.AvgBufferBytes()/mb),
525+
zap.Float64("successRate", t.SuccessRate()),
526+
zap.Float64("processingS", t.ProcessingTime().Seconds()),
527+
zap.Duration("processing", t.ProcessingTime()),
482528
)
483529
}
484530

0 commit comments

Comments
 (0)