Skip to content

Commit 731048e

Browse files
DSET-1556: Add test to check that each event is processed once (#12)
* DSET-1556: Add test to check that each event is processed once
1 parent f8ce8c1 commit 731048e

File tree

9 files changed

+350
-81
lines changed

9 files changed

+350
-81
lines changed

.gitignore

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,10 @@
1212
*.out
1313

1414
# Dependency directories (remove the comment below to include it)
15-
# vendor/
15+
vendor/
16+
17+
# log files
18+
out*.log
19+
20+
.idea
21+
coverage*

Makefile

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ endif
2424

2525
GO_BUILD_TAGS=""
2626
GOTEST_OPT?= -race -timeout 300s -parallel 4 -count=1 --tags=$(GO_BUILD_TAGS)
27-
GOTEST_INTEGRATION_OPT?= -race -timeout 360s -parallel 4
27+
GOTEST_LONG_RUNNING_OPT?= -race -timeout 360s -parallel 4 -count=1 -tags=long_running,$(GO_BUILD_TAGS)
2828
GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -coverprofile=coverage.txt -covermode=atomic
29-
GOTEST_OPT_WITH_INTEGRATION=$(GOTEST_INTEGRATION_OPT) -tags=integration,$(GO_BUILD_TAGS) -run=Integration -coverprofile=integration-coverage.txt -covermode=atomic
29+
GOTEST_OPT_WITH_COVERAGE_LONG_RUNNING=$(GOTEST_LONG_RUNNING_OPT) -coverprofile=coverage.txt -covermode=atomic
3030
GOCMD?= go
3131
GOTEST=$(GOCMD) test
3232

@@ -45,9 +45,16 @@ build:
4545
echo "Done"
4646

4747
.PHONY: test
48-
test:
48+
test: test-all
49+
50+
.PHONY: test-unit
51+
test-unit:
4952
$(GOTEST) $(GOTEST_OPT) ./...
5053

54+
.PHONY: test-all
55+
test-all:
56+
$(GOTEST) $(GOTEST_LONG_RUNNING_OPT) ./...
57+
5158
.PHONY: test-many-times
5259
test-many-times:
5360
set -e; \
@@ -58,11 +65,20 @@ test-many-times:
5865
fi; \
5966
for i in `seq 1 $${COUNT}`; do \
6067
echo "Running test $${i} / $${COUNT}"; \
61-
make test; \
68+
make test 2>&1 | tee out-test-$${i}.log; \
6269
done;
6370

6471

6572
.PHONY: coverage
66-
coverage:
73+
coverage: coverage-all
74+
75+
.PHONY: coverage-unit
76+
coverage-unit:
6777
$(GOTEST) $(GOTEST_OPT_WITH_COVERAGE) ./...
6878
$(GOCMD) tool cover -html=coverage.txt -o coverage.html
79+
80+
81+
.PHONY: coverage-all
82+
coverage-all:
83+
$(GOTEST) $(GOTEST_OPT_WITH_COVERAGE_LONG_RUNNING) ./...
84+
$(GOCMD) tool cover -html=coverage.txt -o coverage.html

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Release Notes
22

3+
## 0.0.4 Fix Concurrency Issues
4+
5+
* sometimes not all events have been delivered exactly once
6+
37
## 0.0.3 Fix Data Races
48

59
* fixed [data races](https://go.dev/doc/articles/race_detector)

pkg/buffer/buffer.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package buffer
1919
import (
2020
"encoding/json"
2121
"fmt"
22+
"sync"
2223
"sync/atomic"
2324
"time"
2425

@@ -41,12 +42,17 @@ type Status uint32
4142
const (
4243
Initialising = Status(iota)
4344
Ready
45+
AddingBundles
4446
Publishing
4547
Retrying
4648
)
4749

4850
func (s Status) String() string {
49-
return [...]string{"Initialising", "Ready", "Publishing", "Retrying"}[s]
51+
return [...]string{"Initialising", "Ready", "AddingBundles", "Publishing", "Retrying"}[s]
52+
}
53+
54+
func (s Status) IsActive() bool {
55+
return s == Ready || s == AddingBundles
5056
}
5157

5258
type AddStatus uint8
@@ -74,14 +80,16 @@ type Buffer struct {
7480
Session string
7581
Token string
7682

77-
Status atomic.Uint32
78-
Attempt uint
79-
createdAt atomic.Int64
83+
Attempt uint
84+
createdAt atomic.Int64
85+
status atomic.Uint32
86+
PublishAsap atomic.Bool
8087

8188
sessionInfo *add_events.SessionInfo
8289
threads map[string]*add_events.Thread
8390
logs map[string]*add_events.Log
8491
events []*add_events.Event
92+
dataMutex sync.Mutex
8593

8694
lenSessionInfo int
8795
lenThreads atomic.Int32
@@ -100,8 +108,9 @@ func NewEmptyBuffer(session string, token string) *Buffer {
100108
Id: id,
101109
Session: session,
102110
Token: token,
103-
Status: atomic.Uint32{},
111+
status: atomic.Uint32{},
104112
Attempt: 0,
113+
PublishAsap: atomic.Bool{},
105114
countThreads: atomic.Int32{},
106115
countLogs: atomic.Int32{},
107116
countEvents: atomic.Int32{},
@@ -119,16 +128,17 @@ func NewBuffer(session string, token string, sessionInfo *add_events.SessionInfo
119128
}
120129

121130
func (buffer *Buffer) Initialise(sessionInfo *add_events.SessionInfo) error {
122-
status := Status(buffer.Status.Load())
131+
status := buffer.Status()
123132
if status != Initialising {
124133
panic(fmt.Sprintf("NewBuffer was already initialised: %s", status))
125134
}
126135
buffer.threads = map[string]*add_events.Thread{}
127136
buffer.logs = map[string]*add_events.Log{}
128137
buffer.events = []*add_events.Event{}
138+
buffer.dataMutex = sync.Mutex{}
129139

130140
buffer.createdAt.Store(time.Now().UnixNano())
131-
buffer.Status.Store(uint32(Ready))
141+
buffer.SetStatus(Ready)
132142

133143
err := buffer.SetSessionInfo(sessionInfo)
134144
if err != nil {
@@ -171,8 +181,10 @@ func (buffer *Buffer) SessionInfo() *add_events.SessionInfo {
171181
}
172182

173183
func (buffer *Buffer) AddBundle(bundle *add_events.EventBundle) (AddStatus, error) {
174-
status := Status(buffer.Status.Load())
175-
if status != Ready {
184+
buffer.dataMutex.Lock()
185+
defer buffer.dataMutex.Unlock()
186+
status := buffer.Status()
187+
if status != Ready && status != AddingBundles {
176188
return TooMuch, &NotAcceptingError{status: status}
177189
}
178190
// append thread
@@ -404,7 +416,7 @@ func (buffer *Buffer) ZapStats(fields ...zap.Field) []zap.Field {
404416
res := []zap.Field{
405417
zap.String("uuid", buffer.Id.String()),
406418
zap.String("session", buffer.Session),
407-
zap.Uint32("status", buffer.Status.Load()),
419+
zap.String("status", buffer.Status().String()),
408420
zap.Uint("attempt", buffer.Attempt),
409421
zap.Int32("logs", buffer.countLogs.Load()),
410422
zap.Int32("threads", buffer.countThreads.Load()),
@@ -416,3 +428,15 @@ func (buffer *Buffer) ZapStats(fields ...zap.Field) []zap.Field {
416428
res = append(res, fields...)
417429
return res
418430
}
431+
432+
func (buffer *Buffer) SetStatus(status Status) {
433+
buffer.status.Store(uint32(status))
434+
}
435+
436+
func (buffer *Buffer) Status() Status {
437+
return Status(buffer.status.Load())
438+
}
439+
440+
func (buffer *Buffer) HasStatus(status Status) bool {
441+
return buffer.status.Load() == uint32(status)
442+
}

pkg/client/add_events.go

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -53,48 +53,70 @@ func (client *DataSetClient) AddEvents(bundles []*add_events.EventBundle) error
5353
grouped := client.groupBundles(bundles)
5454

5555
for key, bundles := range grouped {
56+
client.addBundle(key, bundles)
57+
}
58+
59+
return nil
60+
}
5661

62+
func (client *DataSetClient) addBundle(key string, bundles []*add_events.EventBundle) {
63+
// this function has to be called from AddEvents - inner loop
64+
// it assumes that all bundles have the same key
65+
getBuffer := func(key string) *buffer.Buffer {
5766
buf := client.Buffer(key, client.SessionInfo)
67+
// change state to mark that bundles are being added
68+
buf.SetStatus(buffer.AddingBundles)
69+
return buf
70+
}
71+
72+
publish := func(key string, buf *buffer.Buffer) *buffer.Buffer {
73+
client.PublishBuffer(buf)
74+
return getBuffer(key)
75+
}
76+
77+
buf := getBuffer(key)
78+
for _, bundle := range bundles {
79+
added, err := buf.AddBundle(bundle)
80+
if err != nil {
81+
if errors.Is(err, &buffer.NotAcceptingError{}) {
82+
buf = getBuffer(key)
83+
} else {
84+
client.Logger.Error("Cannot add bundle", zap.Error(err))
85+
// TODO: what to do? For now, lets skip it
86+
continue
87+
}
88+
}
89+
90+
if buf.ShouldSendSize() || added == buffer.TooMuch && buf.HasEvents() {
91+
buf = publish(key, buf)
92+
}
5893

59-
for _, bundle := range bundles {
60-
added, err := buf.AddBundle(bundle)
94+
if added == buffer.TooMuch {
95+
added, err = buf.AddBundle(bundle)
6196
if err != nil {
6297
if errors.Is(err, &buffer.NotAcceptingError{}) {
63-
buf = client.Buffer(key, client.SessionInfo)
98+
buf = getBuffer(key)
6499
} else {
65100
client.Logger.Error("Cannot add bundle", zap.Error(err))
66-
// TODO: what to do? For now, lets skip it
67101
continue
68102
}
69103
}
70-
71-
if buf.ShouldSendSize() || added == buffer.TooMuch && buf.HasEvents() {
72-
client.PublishBuffer(buf)
73-
buf = client.Buffer(key, client.SessionInfo)
104+
if buf.ShouldSendSize() {
105+
buf = publish(key, buf)
74106
}
75-
76107
if added == buffer.TooMuch {
77-
added, err = buf.AddBundle(bundle)
78-
if err != nil {
79-
if errors.Is(err, &buffer.NotAcceptingError{}) {
80-
buf = client.Buffer(key, client.SessionInfo)
81-
} else {
82-
client.Logger.Error("Cannot add bundle", zap.Error(err))
83-
continue
84-
}
85-
}
86-
if buf.ShouldSendSize() {
87-
client.PublishBuffer(buf)
88-
buf = client.Buffer(key, client.SessionInfo)
89-
}
90-
if added == buffer.TooMuch {
91-
client.Logger.Fatal("Bundle was not added for second time!", buf.ZapStats()...)
92-
}
108+
client.Logger.Fatal("Bundle was not added for second time!", buf.ZapStats()...)
93109
}
94110
}
95111
}
112+
buf.SetStatus(buffer.Ready)
96113

97-
return nil
114+
// it could happen that the buffer could have been published
115+
// by buffer sweeper, but it was skipped, because we have been
116+
// adding events, so lets check it and publish it if needed
117+
if buf.PublishAsap.Load() {
118+
client.PublishBuffer(buf)
119+
}
98120
}
99121

100122
// IsProcessingData returns True if there are still some unprocessed data.
@@ -223,14 +245,19 @@ func (client *DataSetClient) apiCall(req *http.Request, response response.SetRes
223245
}
224246

225247
func (client *DataSetClient) SendAllAddEventsBuffers() {
248+
buffers := client.getBuffers()
226249
client.Logger.Debug("Send all AddEvents buffers")
227-
client.buffer.Range(func(k, v interface{}) bool {
228-
buf, ok := v.(*buffer.Buffer)
229-
if ok {
230-
client.PublishBuffer(buf)
231-
} else {
232-
client.Logger.Error("Unable to convert message to buffer")
233-
}
234-
return true
235-
})
250+
for _, buf := range buffers {
251+
client.PublishBuffer(buf)
252+
}
253+
}
254+
255+
func (client *DataSetClient) getBuffers() []*buffer.Buffer {
256+
client.buffersMutex.Lock()
257+
defer client.buffersMutex.Unlock()
258+
buffers := make([]*buffer.Buffer, 0)
259+
for _, buf := range client.buffer {
260+
buffers = append(buffers, buf)
261+
}
262+
return buffers
236263
}

0 commit comments

Comments
 (0)