@@ -15,43 +15,58 @@ import (
15
15
"github.com/splitio/go-split-commons/v6/provisional/strategy"
16
16
"github.com/splitio/go-split-commons/v6/service/api"
17
17
"github.com/splitio/go-split-commons/v6/storage"
18
+ "github.com/splitio/go-split-commons/v6/storage/filter"
18
19
"github.com/splitio/go-split-commons/v6/storage/inmemory"
19
20
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap"
20
21
"github.com/splitio/go-split-commons/v6/synchronizer"
21
22
"github.com/splitio/go-split-commons/v6/synchronizer/worker/impressionscount"
22
23
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
23
24
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
24
25
"github.com/splitio/go-split-commons/v6/tasks"
26
+ "github.com/splitio/go-split-commons/v6/telemetry"
25
27
"github.com/splitio/go-toolkit/v5/logging"
26
28
)
27
29
30
+ const (
31
+ bfExpectedElemenets = 10000000
32
+ bfFalsePositiveProbability = 0.01
33
+ bfCleaningPeriod = 86400 // 24 hours
34
+ uniqueKeysPeriodTaskInMemory = 900 // 15 min
35
+ uniqueKeysPeriodTaskRedis = 300 // 5 min
36
+ impressionsCountPeriodTaskInMemory = 1800 // 30 min
37
+ impressionsCountPeriodTaskRedis = 300 // 5 min
38
+ impressionsBulkSizeRedis = 100
39
+ )
40
+
28
41
func setupWorkers (
29
42
logger logging.LoggerInterface ,
30
43
api * api.SplitAPI ,
31
44
str * storages ,
32
45
hc application.MonitorProducerInterface ,
33
46
cfg * sdkConf.Config ,
34
47
flagSetsFilter flagsets.FlagSetFilter ,
48
+ md dtos.Metadata ,
49
+ impComponents impComponents ,
35
50
) * synchronizer.Workers {
36
51
return & synchronizer.Workers {
37
- SplitUpdater : split .NewSplitUpdater (str .splits , api .SplitFetcher , logger , str .telemetry , hc , flagSetsFilter ),
38
- SegmentUpdater : segment .NewSegmentUpdater (str .splits , str .segments , api .SegmentFetcher , logger , str .telemetry , hc ),
39
- ImpressionRecorder : workers .NewImpressionsWorker (logger , str .telemetry , api .ImpressionRecorder , str .impressions , & cfg .Impressions ),
40
- EventRecorder : workers .NewEventsWorker (logger , str .telemetry , api .EventRecorder , str .events , & cfg .Events ),
52
+ SplitUpdater : split .NewSplitUpdater (str .splits , api .SplitFetcher , logger , str .telemetry , hc , flagSetsFilter ),
53
+ SegmentUpdater : segment .NewSegmentUpdater (str .splits , str .segments , api .SegmentFetcher , logger , str .telemetry , hc ),
54
+ ImpressionRecorder : workers .NewImpressionsWorker (logger , str .telemetry , api .ImpressionRecorder , str .impressions , & cfg .Impressions ),
55
+ EventRecorder : workers .NewEventsWorker (logger , str .telemetry , api .EventRecorder , str .events , & cfg .Events ),
56
+ ImpressionsCountRecorder : impressionscount .NewRecorderSingle (impComponents .counter , api .ImpressionRecorder , md , logger , str .telemetry ),
57
+ TelemetryRecorder : telemetry .NewTelemetrySynchronizer (str .telemetry , api .TelemetryRecorder , str .splits , str .segments , logger , md , str .telemetry ),
41
58
}
42
59
}
43
60
44
61
func setupTasks (
45
62
cfg * sdkConf.Config ,
46
- str * storages ,
47
63
logger logging.LoggerInterface ,
48
64
workers * synchronizer.Workers ,
49
65
impComponents impComponents ,
50
- md dtos.Metadata ,
51
- api * api.SplitAPI ,
52
66
) * synchronizer.SplitTasks {
53
67
impCfg := cfg .Impressions
54
68
evCfg := cfg .Events
69
+ dummyHC := & application.Dummy {}
55
70
tg := & synchronizer.SplitTasks {
56
71
SplitSyncTask : tasks .NewFetchSplitsTask (workers .SplitUpdater , int (cfg .Splits .SyncPeriod .Seconds ()), logger ),
57
72
SegmentSyncTask : tasks .NewFetchSegmentsTask (
@@ -60,21 +75,18 @@ func setupTasks(
60
75
cfg .Segments .WorkerCount ,
61
76
cfg .Segments .QueueSize ,
62
77
logger ,
78
+ dummyHC ,
63
79
),
64
- ImpressionSyncTask : tasks .NewRecordImpressionsTask (workers .ImpressionRecorder , int (impCfg .SyncPeriod .Seconds ()), logger , 5000 ),
65
- EventSyncTask : tasks .NewRecordEventsTask (workers .EventRecorder , 5000 , int (evCfg .SyncPeriod .Seconds ()), logger ),
66
- TelemetrySyncTask : & NoOpTask {},
67
- UniqueKeysTask : & NoOpTask {},
68
- CleanFilterTask : & NoOpTask {},
69
- ImpsCountConsumerTask : & NoOpTask {},
70
- }
71
-
72
- if impCfg .Mode == "optimized" {
73
- tg .ImpressionsCountSyncTask = tasks .NewRecordImpressionsCountTask (
74
- impressionscount .NewRecorderSingle (impComponents .counter , api .ImpressionRecorder , md , logger , str .telemetry ),
80
+ ImpressionSyncTask : tasks .NewRecordImpressionsTask (workers .ImpressionRecorder , int (impCfg .SyncPeriod .Seconds ()), logger , 5000 ),
81
+ EventSyncTask : tasks .NewRecordEventsTask (workers .EventRecorder , 5000 , int (evCfg .SyncPeriod .Seconds ()), logger ),
82
+ TelemetrySyncTask : & NoOpTask {},
83
+ UniqueKeysTask : tasks .NewRecordUniqueKeysTask (workers .TelemetryRecorder , * impComponents .tracker , uniqueKeysPeriodTaskInMemory , logger ),
84
+ CleanFilterTask : tasks .NewCleanFilterTask (* impComponents .filter , logger , bfCleaningPeriod ),
85
+ ImpsCountConsumerTask : tasks .NewRecordImpressionsCountTask (
86
+ workers .ImpressionsCountRecorder ,
75
87
logger ,
76
88
int (impCfg .CountSyncPeriod .Seconds ()),
77
- )
89
+ ),
78
90
}
79
91
80
92
return tg
@@ -83,6 +95,8 @@ func setupTasks(
83
95
type impComponents struct {
84
96
manager provisional.ImpressionManager
85
97
counter * strategy.ImpressionsCounter
98
+ tracker * strategy.UniqueKeysTracker
99
+ filter * storage.Filter
86
100
}
87
101
88
102
func setupImpressionsComponents (c * sdkConf.Impressions , telemetry storage.TelemetryRuntimeProducer ) (impComponents , error ) {
@@ -92,20 +106,28 @@ func setupImpressionsComponents(c *sdkConf.Impressions, telemetry storage.Teleme
92
106
return impComponents {}, fmt .Errorf ("error building impressions observer: %w" , err )
93
107
}
94
108
109
+ counter := strategy .NewImpressionsCounter ()
110
+ bf := filter .NewBloomFilter (bfExpectedElemenets , bfFalsePositiveProbability )
111
+ tracker := strategy .NewUniqueKeysTracker (bf )
112
+ none := strategy .NewNoneImpl (counter , tracker , false )
113
+
95
114
var s strategy.ProcessStrategyInterface
96
- var counter * strategy.ImpressionsCounter
97
115
switch c .Mode {
98
116
case conf .ImpressionsModeDebug :
99
117
s = strategy .NewDebugImpl (observer , false )
100
118
case conf .ImpressionsModeNone :
119
+ s = none
101
120
default : // optimized
102
- counter = strategy .NewImpressionsCounter ()
103
121
s = strategy .NewOptimizedImpl (observer , counter , telemetry , false )
104
122
}
105
123
124
+ impManager := provisional .NewImpressionManagerImp (none , s )
125
+
106
126
return impComponents {
107
- manager : provisional . NewImpressionManager ( s ) ,
127
+ manager : impManager ,
108
128
counter : counter ,
129
+ tracker : & tracker ,
130
+ filter : & bf ,
109
131
}, nil
110
132
}
111
133
0 commit comments