feat: send one sample event, response per label set in the configured duration #5298
Merged
71 commits
19e9cfb
chore: send multiple reports in a single request to reporting
vamsikrishnakandi 5c3cab8
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 1744bbc
chore: default batch size set to 10
vamsikrishnakandi 6eaa4fd
chore: added test
vamsikrishnakandi ff63e49
fix: failing tests
vamsikrishnakandi 98ed866
fix: failing tests
vamsikrishnakandi 816b213
fix: failing tests
vamsikrishnakandi 0b926dd
fix: failing tests
vamsikrishnakandi e3185a1
chore: using config from outside
vamsikrishnakandi 676ec7a
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 22a89e8
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 3c645f8
feat: send sample event and response in one report per label set in t…
vamsikrishnakandi 9e6814f
fix: lint
vamsikrishnakandi b22ffc6
fix: minor
vamsikrishnakandi bc0acdd
fix: minor
vamsikrishnakandi e77447d
feat: aggregate reports based on the configured interval before sendi…
itsmihir 7ba6924
chore: rename aggregationInterval to aggregationIntervalMinutes
itsmihir a7008e4
feat: modify query to get reports based on aggregation time
itsmihir bd8c140
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir 57454fe
fix: formatting
itsmihir 36021e0
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir 58b4f0a
fix: add aggregation time to the grouping identifiers
itsmihir 308d9ca
Merge branch 'master' into feat.reports-bactching
itsmihir 2b27e81
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir ff8fdbd
fix: use aggregationIntervalMin value that was used to query the repo…
itsmihir c1c89e7
chore: rename aggregationInterval to aggregationIntervalMin
itsmihir ae9fcdf
chore: addressed review comments
vamsikrishnakandi 4af228c
fix: tests
vamsikrishnakandi 8644364
chore: addressed review comments
vamsikrishnakandi b3ea878
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 35b24a2
Merge branch 'chore.test' into feat.reports-bactching
vamsikrishnakandi 15d3139
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 931ff6c
chore: add validation for aggregationIntervalMinutes
itsmihir 4ef47f5
Merge branch 'feat.reports-bactching' of https://github.com/rudderlab…
itsmihir 2d904d6
chore: check if aggregationIntervalMin is less than or equal to 0
itsmihir dca10bd
chore: add in memory cache implementation
vamsikrishnakandi 11286d6
Merge branch 'feat.reports-bactching' into feat.event-sampling-badger
vamsikrishnakandi 5e0e7e3
fix: minor
vamsikrishnakandi 7ae7513
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi 0c8a3b4
chore: add bucket to label set and send it to reporting
vamsikrishnakandi 5f212c7
chore: addressed review comments
vamsikrishnakandi 84a3cff
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 0b56a89
chore: fix tests
vamsikrishnakandi cae8d5d
Merge branch 'feat.reports-bactching' of github.com:rudderlabs/rudder…
vamsikrishnakandi 828ae60
Merge branch 'master' into feat.reports-bactching
vamsikrishnakandi 10e2931
Merge branch 'feat.reports-bactching' into feat.event-sampling-badger
vamsikrishnakandi e738f73
fix: tests
vamsikrishnakandi 24671a7
Merge branch 'master' into feature/obs-415-support-aggregation-time-a…
vamsikrishnakandi 6dab700
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi 7492a63
fix: conflicts
vamsikrishnakandi 568207c
Merge branch 'master' of https://github.com/rudderlabs/rudder-server …
itsmihir c9e235a
feat: round aggregation interval to the nearest factor of 60
itsmihir da3a2b9
Merge branch 'master' into feature/obs-415-support-aggregation-time-a…
itsmihir b1bf779
chore: addressed review comments
vamsikrishnakandi d447a33
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi 177b83f
fix: minor
vamsikrishnakandi fad3dda
fix: tests
vamsikrishnakandi 6f9665c
chore: addressed review comments
vamsikrishnakandi c3d0db9
chore: remove bucketStart from getReports and delete query
itsmihir e74a241
chore: add comment to explain the rounding of intervalMs to the neare…
itsmihir 1e7e49d
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
itsmihir 75b6b6c
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi 8be59d6
fix: tests
vamsikrishnakandi 1507ca7
fix: revert
vamsikrishnakandi 11ee199
Revert "chore: remove bucketStart from getReports and delete query"
itsmihir 06fd221
Merge branch 'feature/obs-415-support-aggregation-time-as-a-configura…
vamsikrishnakandi 8b3239b
chore: merge master
vamsikrishnakandi 8428484
chore: addressed review comments
vamsikrishnakandi 8e51ecc
fix: tests
vamsikrishnakandi fa4f074
chore: addressed review comments
vamsikrishnakandi b040e13
Merge branch 'master' into feat.event-sampling-badger
vamsikrishnakandi
156 changes: 156 additions & 0 deletions
enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
package event_sampler | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/dgraph-io/badger/v4" | ||
"github.com/dgraph-io/badger/v4/options" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/config" | ||
"github.com/rudderlabs/rudder-go-kit/logger" | ||
"github.com/rudderlabs/rudder-server/rruntime" | ||
"github.com/rudderlabs/rudder-server/utils/misc" | ||
) | ||
|
||
type BadgerEventSampler struct { | ||
db *badger.DB | ||
mu sync.Mutex | ||
ttl config.ValueLoader[time.Duration] | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
wg sync.WaitGroup | ||
} | ||
|
||
func DefaultPath(pathName string) (string, error) { | ||
tmpDirPath, err := misc.CreateTMPDIR() | ||
if err != nil { | ||
return "", err | ||
} | ||
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil | ||
} | ||
|
||
func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) { | ||
dbPath, err := DefaultPath(pathName) | ||
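if err != nil { | ||
return nil, err | ||
} | ||
// An empty path with a nil error would otherwise return (nil, nil) to the caller. | ||
if dbPath == "" { | ||
return nil, fmt.Errorf("event sampler: empty badger path for %q", pathName) | ||
} | ||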
|
||
opts := badger.DefaultOptions(dbPath). | ||
WithLogger(badgerLogger{log}). | ||
WithCompression(options.None). | ||
WithIndexCacheSize(16 << 20). // 16mb | ||
WithNumGoroutines(1). | ||
WithBlockCacheSize(0). | ||
WithNumVersionsToKeep(1). | ||
WithNumMemtables(conf.GetInt("Reporting.eventSampling.badgerDB.numMemtable", 5)). | ||
WithValueThreshold(conf.GetInt64("Reporting.eventSampling.badgerDB.valueThreshold", 1048576)). | ||
WithNumLevelZeroTables(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTables", 5)). | ||
WithNumLevelZeroTablesStall(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTablesStall", 15)). | ||
WithSyncWrites(conf.GetBool("Reporting.eventSampling.badgerDB.syncWrites", false)). | ||
WithDetectConflicts(conf.GetBool("Reporting.eventSampling.badgerDB.detectConflicts", false)) | ||
|
||
db, err := badger.Open(opts) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Derive the cancellable context only after the DB is open, so a failed Open cannot leak it. | ||
ctx, cancel := context.WithCancel(ctx) | ||
|
||
es := &BadgerEventSampler{ | ||
db: db, | ||
ttl: ttl, | ||
ctx: ctx, | ||
cancel: cancel, | ||
wg: sync.WaitGroup{}, | ||
} | ||
|
||
es.wg.Add(1) | ||
rruntime.Go(func() { | ||
defer es.wg.Done() | ||
es.gcLoop() | ||
}) | ||
|
||
return es, nil | ||
} | ||
|
||
func (es *BadgerEventSampler) Get(key string) (bool, error) { | ||
es.mu.Lock() | ||
defer es.mu.Unlock() | ||
|
||
var found bool | ||
|
||
err := es.db.View(func(txn *badger.Txn) error { | ||
item, err := txn.Get([]byte(key)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
found = item != nil | ||
return nil | ||
}) | ||
|
||
if err == badger.ErrKeyNotFound { | ||
return false, nil | ||
} else if err != nil { | ||
return false, err | ||
} | ||
|
||
return found, nil | ||
} | ||
|
||
func (es *BadgerEventSampler) Put(key string) error { | ||
es.mu.Lock() | ||
defer es.mu.Unlock() | ||
|
||
return es.db.Update(func(txn *badger.Txn) error { | ||
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load()) | ||
return txn.SetEntry(entry) | ||
}) | ||
} | ||
|
||
func (es *BadgerEventSampler) gcLoop() { | ||
for { | ||
select { | ||
case <-es.ctx.Done(): | ||
_ = es.db.RunValueLogGC(0.5) | ||
return | ||
case <-time.After(5 * time.Minute): | ||
} | ||
again: | ||
if es.ctx.Err() != nil { | ||
return | ||
} | ||
// A single call removes at most one value-log file. | ||
// As an optimization, re-run it immediately whenever it returns a nil error | ||
// (this is why `goto again` is used). | ||
err := es.db.RunValueLogGC(0.5) | ||
if err == nil { | ||
goto again | ||
} | ||
} | ||
} | ||
|
||
func (es *BadgerEventSampler) Close() { | ||
es.cancel() | ||
es.wg.Wait() | ||
if es.db != nil { | ||
_ = es.db.Close() | ||
} | ||
} | ||
|
||
type badgerLogger struct { | ||
logger.Logger | ||
} | ||
|
||
func (badgerLogger) Errorf(format string, a ...interface{}) { | ||
_, _ = fmt.Fprintf(os.Stderr, format, a...) | ||
} | ||
|
||
func (badgerLogger) Warningf(format string, a ...interface{}) { | ||
_, _ = fmt.Fprintf(os.Stderr, format, a...) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package event_sampler | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/config" | ||
"github.com/rudderlabs/rudder-go-kit/logger" | ||
) | ||
|
||
const ( | ||
BadgerTypeEventSampler = "badger" | ||
InMemoryCacheTypeEventSampler = "in_memory_cache" | ||
BadgerEventSamplerPathName = "/reporting-badger" | ||
) | ||
|
||
type EventSampler interface { | ||
Put(key string) error | ||
Get(key string) (bool, error) | ||
Close() | ||
} | ||
|
||
func NewEventSampler( | ||
ctx context.Context, | ||
ttl config.ValueLoader[time.Duration], | ||
eventSamplerType config.ValueLoader[string], | ||
eventSamplingCardinality config.ValueLoader[int], | ||
conf *config.Config, | ||
log logger.Logger, | ||
) (es EventSampler, err error) { | ||
switch eventSamplerType.Load() { | ||
case BadgerTypeEventSampler: | ||
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log) | ||
case InMemoryCacheTypeEventSampler: | ||
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality) | ||
default: | ||
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load()) | ||
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log) | ||
} | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
return es, nil | ||
} |
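For reviewers, a minimal sketch of how a caller might use the `EventSampler` interface to attach one sample per label set within the TTL window. The interface is copied from the diff above; `mapSampler`, `shouldAttachSample`, and the key strings are hypothetical names introduced only for illustration and are not part of this PR.

```go
package main

import "fmt"

// EventSampler mirrors the interface defined in the diff above.
type EventSampler interface {
	Put(key string) error
	Get(key string) (bool, error)
	Close()
}

// mapSampler is a hypothetical stand-in without TTL support,
// used only to keep this sketch runnable without Badger.
type mapSampler struct{ seen map[string]struct{} }

func (m *mapSampler) Put(key string) error { m.seen[key] = struct{}{}; return nil }

func (m *mapSampler) Get(key string) (bool, error) {
	_, ok := m.seen[key]
	return ok, nil
}

func (m *mapSampler) Close() {}

// shouldAttachSample (hypothetical) returns true the first time a label-set
// key is seen and records the key, so later reports with the same key skip
// the sample until the sampler forgets it.
func shouldAttachSample(es EventSampler, labelSetKey string) (bool, error) {
	found, err := es.Get(labelSetKey)
	if err != nil {
		return false, err
	}
	if found {
		return false, nil
	}
	if err := es.Put(labelSetKey); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	es := &mapSampler{seen: map[string]struct{}{}}
	defer es.Close()

	// The key format here is made up; any stable string per label set works.
	for _, key := range []string{"src1:dst1", "src1:dst1", "src2:dst1"} {
		attach, err := shouldAttachSample(es, key)
		if err != nil {
			fmt.Println("sampler error:", err)
			continue
		}
		fmt.Println(key, "attach sample:", attach)
	}
}
```

Note that the Get-then-Put sequence in this sketch is not atomic across the two calls, so two concurrent callers with the same key could both attach a sample.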
178 changes: 178 additions & 0 deletions
enterprise/reporting/event_sampler/event_sampler_test.go
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package event_sampler | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/config" | ||
"github.com/rudderlabs/rudder-go-kit/logger" | ||
) | ||
|
||
func TestBadger(t *testing.T) { | ||
ctx := context.Background() | ||
conf := config.New() | ||
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes") | ||
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type") | ||
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality") | ||
log := logger.NewLogger() | ||
|
||
t.Run("should put and get keys", func(t *testing.T) { | ||
assert.Equal(t, 3000*time.Millisecond, ttl.Load()) | ||
es, err := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) | ||
require.NoError(t, err) | ||
_ = es.Put("key1") | ||
_ = es.Put("key2") | ||
_ = es.Put("key3") | ||
val1, _ := es.Get("key1") | ||
val2, _ := es.Get("key2") | ||
val3, _ := es.Get("key3") | ||
val4, _ := es.Get("key4") | ||
|
||
assert.True(t, val1, "Expected key1 to be present") | ||
assert.True(t, val2, "Expected key2 to be present") | ||
assert.True(t, val3, "Expected key3 to be present") | ||
assert.False(t, val4, "Expected key4 to not be present") | ||
es.Close() | ||
}) | ||
|
||
t.Run("should not get evicted keys", func(t *testing.T) { | ||
conf.Set("Reporting.eventSampling.durationInMinutes", 100) | ||
assert.Equal(t, 100*time.Millisecond, ttl.Load()) | ||
|
||
es, err := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) | ||
require.NoError(t, err) | ||
defer es.Close() | ||
|
||
_ = es.Put("key1") | ||
|
||
require.Eventually(t, func() bool { | ||
val1, _ := es.Get("key1") | ||
return !val1 | ||
}, 1*time.Second, 50*time.Millisecond) | ||
}) | ||
} | ||
|
||
|
||
func TestInMemoryCache(t *testing.T) { | ||
ctx := context.Background() | ||
conf := config.New() | ||
eventSamplerType := conf.GetReloadableStringVar("in_memory_cache", "Reporting.eventSampling.type") | ||
eventSamplingCardinality := conf.GetReloadableIntVar(3, 1, "Reporting.eventSampling.cardinality") | ||
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes") | ||
log := logger.NewLogger() | ||
|
||
t.Run("should put and get keys", func(t *testing.T) { | ||
assert.Equal(t, 3000*time.Millisecond, ttl.Load()) | ||
es, err := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) | ||
require.NoError(t, err) | ||
defer es.Close() | ||
_ = es.Put("key1") | ||
_ = es.Put("key2") | ||
_ = es.Put("key3") | ||
val1, _ := es.Get("key1") | ||
val2, _ := es.Get("key2") | ||
val3, _ := es.Get("key3") | ||
val4, _ := es.Get("key4") | ||
|
||
assert.True(t, val1, "Expected key1 to be present") | ||
assert.True(t, val2, "Expected key2 to be present") | ||
assert.True(t, val3, "Expected key3 to be present") | ||
assert.False(t, val4, "Expected key4 to not be present") | ||
}) | ||
|
||
t.Run("should not get evicted keys", func(t *testing.T) { | ||
conf.Set("Reporting.eventSampling.durationInMinutes", 100) | ||
assert.Equal(t, 100*time.Millisecond, ttl.Load()) | ||
es, err := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) | ||
require.NoError(t, err) | ||
defer es.Close() | ||
_ = es.Put("key1") | ||
|
||
require.Eventually(t, func() bool { | ||
val1, _ := es.Get("key1") | ||
return !val1 | ||
}, 1*time.Second, 50*time.Millisecond) | ||
}) | ||
|
||
|
||
t.Run("should not add keys if length exceeds", func(t *testing.T) { | ||
conf.Set("Reporting.eventSampling.durationInMinutes", 3000) | ||
assert.Equal(t, 3000*time.Millisecond, ttl.Load()) | ||
es, err := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log) | ||
require.NoError(t, err) | ||
defer es.Close() | ||
_ = es.Put("key1") | ||
_ = es.Put("key2") | ||
_ = es.Put("key3") | ||
_ = es.Put("key4") | ||
_ = es.Put("key5") | ||
|
||
val1, _ := es.Get("key1") | ||
val2, _ := es.Get("key2") | ||
val3, _ := es.Get("key3") | ||
val4, _ := es.Get("key4") | ||
val5, _ := es.Get("key5") | ||
|
||
assert.True(t, val1, "Expected key1 to be present") | ||
assert.True(t, val2, "Expected key2 to be present") | ||
assert.True(t, val3, "Expected key3 to be present") | ||
assert.False(t, val4, "Expected key4 to not be added") | ||
assert.False(t, val5, "Expected key5 to not be added") | ||
}) | ||
} | ||
|
||
func BenchmarkEventSampler(b *testing.B) { | ||
testCases := []struct { | ||
name string | ||
eventSamplerType string | ||
}{ | ||
{ | ||
name: "Badger", | ||
eventSamplerType: "badger", | ||
}, | ||
{ | ||
name: "InMemoryCache", | ||
eventSamplerType: "in_memory_cache", | ||
}, | ||
} | ||
|
||
ctx := context.Background() | ||
conf := config.New() | ||
ttl := conf.GetReloadableDurationVar(1, time.Minute, "Reporting.eventSampling.durationInMinutes") | ||
eventSamplerType := conf.GetReloadableStringVar("default", "Reporting.eventSampling.type") | ||
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality") | ||
log := logger.NewLogger() | ||
|
||
for _, tc := range testCases { | ||
b.Run(tc.name, func(b *testing.B) { | ||
conf.Set("Reporting.eventSampling.type", tc.eventSamplerType) | ||
|
||
eventSampler, err := NewEventSampler( | ||
ctx, | ||
ttl, | ||
eventSamplerType, | ||
eventSamplingCardinality, | ||
conf, | ||
log, | ||
) | ||
require.NoError(b, err) | ||
|
||
b.Run("Put", func(b *testing.B) { | ||
for i := 0; i < b.N; i++ { | ||
key := uuid.New().String() | ||
err := eventSampler.Put(key) | ||
require.NoError(b, err) | ||
} | ||
}) | ||
|
||
b.Run("Get", func(b *testing.B) { | ||
for i := 0; i < b.N; i++ { | ||
key := uuid.New().String() | ||
|
||
err := eventSampler.Put(key) | ||
require.NoError(b, err) | ||
|
||
_, err = eventSampler.Get(key) | ||
require.NoError(b, err) | ||
} | ||
}) | ||
|
||
eventSampler.Close() | ||
}) | ||
} | ||
} |
Add tests for both the Badger sampler and the in-memory sampler.
These are present in event_sampler_test.go.