Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send one sample event, response per label set in the configured duration #5298

Merged
merged 71 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
19e9cfb
chore: send multiple reports in a single request to reporting
vamsikrishnakandi Nov 6, 2024
5c3cab8
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Nov 7, 2024
1744bbc
chore: default batch size set to 10
vamsikrishnakandi Nov 7, 2024
6eaa4fd
chore: added test
vamsikrishnakandi Nov 7, 2024
ff63e49
fix: failing tests
vamsikrishnakandi Nov 7, 2024
98ed866
fix: failing tests
vamsikrishnakandi Nov 7, 2024
816b213
fix: failing tests
vamsikrishnakandi Nov 7, 2024
0b926dd
fix: failing tests
vamsikrishnakandi Nov 7, 2024
e3185a1
chore: using config from outside
vamsikrishnakandi Nov 11, 2024
676ec7a
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Nov 11, 2024
22a89e8
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Nov 14, 2024
3c645f8
feat: send sample event and response in one report per label set in t…
vamsikrishnakandi Nov 14, 2024
9e6814f
fix: lint
vamsikrishnakandi Nov 14, 2024
b22ffc6
fix: minor
vamsikrishnakandi Nov 14, 2024
bc0acdd
fix: minor
vamsikrishnakandi Nov 15, 2024
e77447d
feat: aggregate reports based on the configured interval before sendi…
itsmihir Nov 26, 2024
7ba6924
chore: rename aggregationInterval to aggregationIntervalMinutes
itsmihir Nov 26, 2024
a7008e4
feat: modify query to get reports based on aggregation time
itsmihir Nov 27, 2024
bd8c140
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir Nov 27, 2024
57454fe
fix: formatting
itsmihir Nov 27, 2024
36021e0
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir Nov 28, 2024
58b4f0a
fix: add aggregation time to the grouping identifiers
itsmihir Nov 28, 2024
308d9ca
Merge branch 'master' into feat.reports-bactching
itsmihir Nov 28, 2024
2b27e81
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir Nov 28, 2024
ff8fdbd
fix: use aggregationIntervalMin value that was used to query the repo…
itsmihir Dec 2, 2024
c1c89e7
chore: rename aggregationInterval to aggregationIntervalMin
itsmihir Dec 2, 2024
ae9fcdf
chore: addressed review comments
vamsikrishnakandi Dec 3, 2024
4af228c
fix: tests
vamsikrishnakandi Dec 3, 2024
8644364
chore: addressed review comments
vamsikrishnakandi Dec 3, 2024
b3ea878
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Dec 3, 2024
35b24a2
Merge branch 'chore.test' into feat.reports-bactching
vamsikrishnakandi Dec 3, 2024
15d3139
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Dec 4, 2024
931ff6c
chore: add validation for aggregationIntervalMinutes
itsmihir Dec 4, 2024
4ef47f5
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir Dec 4, 2024
2d904d6
chore: check if aggregationIntervalMin is less than or equal to 0
itsmihir Dec 4, 2024
dca10bd
chore: add in memory cache implementation
vamsikrishnakandi Dec 4, 2024
11286d6
Merge branch 'feat.reports-bactching' into feat.event-sampling-badger
vamsikrishnakandi Dec 4, 2024
5e0e7e3
fix: minor
vamsikrishnakandi Dec 4, 2024
7ae7513
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi Dec 5, 2024
0c8a3b4
chore: add bucket to label set and send it to reporting
vamsikrishnakandi Dec 5, 2024
5f212c7
chore: addressed review comments
vamsikrishnakandi Dec 5, 2024
84a3cff
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Dec 5, 2024
0b56a89
chore: fix tests
vamsikrishnakandi Dec 5, 2024
cae8d5d
Merge branch 'feat.reports-bactching' of github.com:rudderlabs/rudder…
vamsikrishnakandi Dec 5, 2024
828ae60
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi Dec 5, 2024
10e2931
Merge branch 'feat.reports-bactching' into feat.event-sampling-badger
vamsikrishnakandi Dec 5, 2024
e738f73
fix: tests
vamsikrishnakandi Dec 5, 2024
24671a7
Merge branch 'master' into feature/obs-415-support-aggregation-time-a…
vamsikrishnakandi Dec 5, 2024
6dab700
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi Dec 5, 2024
7492a63
fix: conflicts
vamsikrishnakandi Dec 5, 2024
568207c
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir Dec 5, 2024
c9e235a
feat: round aggregation interval to the nearest factor of 60
itsmihir Dec 6, 2024
da3a2b9
Merge branch 'master' into feature/obs-415-support-aggregation-time-a…
itsmihir Dec 6, 2024
b1bf779
chore: addressed review comments
vamsikrishnakandi Dec 6, 2024
d447a33
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi Dec 6, 2024
177b83f
fix: minor
vamsikrishnakandi Dec 6, 2024
fad3dda
fix: tests
vamsikrishnakandi Dec 6, 2024
6f9665c
chore: addressed review comments
vamsikrishnakandi Dec 6, 2024
c3d0db9
chore: remove bucketStart from getReports and delete query
itsmihir Dec 6, 2024
e74a241
chore: add comment to explain the rounding of intervalMs to the neare…
itsmihir Dec 6, 2024
1e7e49d
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
itsmihir Dec 6, 2024
75b6b6c
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi Dec 6, 2024
8be59d6
fix: tests
vamsikrishnakandi Dec 6, 2024
1507ca7
fix: revert
vamsikrishnakandi Dec 6, 2024
11ee199
Revert "chore: remove bucketStart from getReports and delete query"
itsmihir Dec 6, 2024
06fd221
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi Dec 6, 2024
8b3239b
chore: merge master
vamsikrishnakandi Dec 6, 2024
8428484
chore: addressed review comments
vamsikrishnakandi Dec 6, 2024
8e51ecc
fix: tests
vamsikrishnakandi Dec 6, 2024
fa4f074
chore: addressed review comments
vamsikrishnakandi Dec 6, 2024
b040e13
Merge branch 'master' into feat.event-sampling-badger
vamsikrishnakandi Dec 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package event_sampler
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests for both badger samples and in memory samplers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are present in event_sampler_test.go


import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
)

type BadgerEventSampler struct {
db *badger.DB
mu sync.Mutex
ttl config.ValueLoader[time.Duration]
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func DefaultPath(pathName string) (string, error) {
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
return "", err
}

Check warning on line 32 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L31-L32

Added lines #L31 - L32 were not covered by tests
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil
}

func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(pathName)
if err != nil || dbPath == "" {
return nil, err
}

Check warning on line 40 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L39-L40

Added lines #L39 - L40 were not covered by tests

opts := badger.DefaultOptions(dbPath).
WithLogger(badgerLogger{log}).
WithCompression(options.None).
WithIndexCacheSize(16 << 20). // 16mb
WithNumGoroutines(1).
WithBlockCacheSize(0).
WithNumVersionsToKeep(1).
WithNumMemtables(conf.GetInt("Reporting.eventSampling.badgerDB.numMemtable", 5)).
WithValueThreshold(conf.GetInt64("Reporting.eventSampling.badgerDB.valueThreshold", 1048576)).
WithNumLevelZeroTables(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTables", 5)).
WithNumLevelZeroTablesStall(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTablesStall", 15)).
WithSyncWrites(conf.GetBool("Reporting.eventSampling.badgerDB.syncWrites", false)).
WithDetectConflicts(conf.GetBool("Reporting.eventSampling.badgerDB.detectConflicts", false))

ctx, cancel := context.WithCancel(ctx)

db, err := badger.Open(opts)

es := &BadgerEventSampler{
db: db,
ttl: ttl,
ctx: ctx,
cancel: cancel,
wg: sync.WaitGroup{},
}

if err != nil {
return nil, err
}

Check warning on line 70 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L69-L70

Added lines #L69 - L70 were not covered by tests

es.wg.Add(1)
rruntime.Go(func() {
defer es.wg.Done()
es.gcLoop()
})

return es, nil
}

func (es *BadgerEventSampler) Get(key string) (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()

var found bool

err := es.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
}

found = item != nil
return nil
})

if err == badger.ErrKeyNotFound {
return false, nil
} else if err != nil {
return false, err
}

Check warning on line 101 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L100-L101

Added lines #L100 - L101 were not covered by tests

return found, nil
}

func (es *BadgerEventSampler) Put(key string) error {
es.mu.Lock()
defer es.mu.Unlock()

return es.db.Update(func(txn *badger.Txn) error {
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load())
return txn.SetEntry(entry)
})
}

func (es *BadgerEventSampler) gcLoop() {
for {
select {
case <-es.ctx.Done():
_ = es.db.RunValueLogGC(0.5)
return
case <-time.After(5 * time.Minute):

Check warning on line 122 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L122

Added line #L122 was not covered by tests
}
again:
if es.ctx.Err() != nil {
return
}

Check warning on line 127 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L125-L127

Added lines #L125 - L127 were not covered by tests
// One call would only result in removal of at max one log file.
// As an optimization, you could also immediately re-run it whenever it returns nil error
// (this is why `goto again` is used).
err := es.db.RunValueLogGC(0.5)
if err == nil {
goto again

Check warning on line 133 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L131-L133

Added lines #L131 - L133 were not covered by tests
}
}
}

func (es *BadgerEventSampler) Close() {
es.cancel()
es.wg.Wait()
if es.db != nil {
_ = es.db.Close()
}
}

type badgerLogger struct {
logger.Logger
}

func (badgerLogger) Errorf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)

Check warning on line 151 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L150-L151

Added lines #L150 - L151 were not covered by tests
}

func (badgerLogger) Warningf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)

Check warning on line 155 in enterprise/reporting/event_sampler/badger_event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/badger_event_sampler.go#L154-L155

Added lines #L154 - L155 were not covered by tests
}
45 changes: 45 additions & 0 deletions enterprise/reporting/event_sampler/event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package event_sampler

import (
"context"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

const (
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
BadgerEventSamplerPathName = "/reporting-badger"
)

type EventSampler interface {
Put(key string) error
Get(key string) (bool, error)
Close()
}

func NewEventSampler(
ctx context.Context,
ttl config.ValueLoader[time.Duration],
eventSamplerType config.ValueLoader[string],
eventSamplingCardinality config.ValueLoader[int],
conf *config.Config,
log logger.Logger,
) (es EventSampler, err error) {
switch eventSamplerType.Load() {
case BadgerTypeEventSampler:
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log)
case InMemoryCacheTypeEventSampler:
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality)
default:
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load())
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log)

Check warning on line 38 in enterprise/reporting/event_sampler/event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/event_sampler.go#L36-L38

Added lines #L36 - L38 were not covered by tests
}

if err != nil {
return nil, err
}

Check warning on line 43 in enterprise/reporting/event_sampler/event_sampler.go

View check run for this annotation

Codecov / codecov/patch

enterprise/reporting/event_sampler/event_sampler.go#L42-L43

Added lines #L42 - L43 were not covered by tests
return es, nil
}
178 changes: 178 additions & 0 deletions enterprise/reporting/event_sampler/event_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package event_sampler

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

func TestBadger(t *testing.T) {
ctx := context.Background()
conf := config.New()
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality")
log := logger.NewLogger()

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be present")
es.Close()
})

t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())

es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
defer es.Close()

_ = es.Put("key1")

require.Eventually(t, func() bool {
val1, _ := es.Get("key1")
return !val1
}, 1*time.Second, 50*time.Millisecond)
})
}
vamsikrishnakandi marked this conversation as resolved.
Show resolved Hide resolved

func TestInMemoryCache(t *testing.T) {
ctx := context.Background()
conf := config.New()
eventSamplerType := conf.GetReloadableStringVar("in_memory_cache", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(3, 1, "Reporting.eventSampling.cardinality")
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes")
log := logger.NewLogger()

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be present")
})

t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")

require.Eventually(t, func() bool {
val1, _ := es.Get("key1")
return !val1
}, 1*time.Second, 50*time.Millisecond)
})
vamsikrishnakandi marked this conversation as resolved.
Show resolved Hide resolved

t.Run("should not add keys if length exceeds", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 3000)
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
_ = es.Put("key4")
_ = es.Put("key5")

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")
val5, _ := es.Get("key5")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be added")
assert.False(t, val5, "Expected key5 to not be added")
})
}

func BenchmarkEventSampler(b *testing.B) {
testCases := []struct {
name string
eventSamplerType string
}{
{
name: "Badger",
eventSamplerType: "badger",
},
{
name: "InMemoryCache",
eventSamplerType: "in_memory_cache",
},
}

ctx := context.Background()
conf := config.New()
ttl := conf.GetReloadableDurationVar(1, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("default", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality")
log := logger.NewLogger()

for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
conf.Set("Reporting.eventSampling.type", tc.eventSamplerType)

eventSampler, err := NewEventSampler(
ctx,
ttl,
eventSamplerType,
eventSamplingCardinality,
conf,
log,
)
require.NoError(b, err)

b.Run("Put", func(b *testing.B) {
for i := 0; i < b.N; i++ {
key := uuid.New().String()
err := eventSampler.Put(key)
require.NoError(b, err)
}
})

b.Run("Get", func(b *testing.B) {
for i := 0; i < b.N; i++ {
key := uuid.New().String()

err := eventSampler.Put(key)
require.NoError(b, err)

_, err = eventSampler.Get(key)
require.NoError(b, err)
}
})

eventSampler.Close()
})
}
}
Loading
Loading