@@ -25,22 +25,16 @@ type KV interface {
25
25
var _ KV = new (CometKV )
26
26
27
27
type CometKV struct {
28
- mem memtable.IMemtable
29
- sst sst.IO
30
-
31
- localInsertCounter int64
32
- globalLongRangeScanCount atomic.Int64
33
- globalLongRangeScanDuration time.Duration
28
+ mem memtable.IMemtable
29
+ sst sst.IO
30
+ localInsertCounter int64
34
31
}
35
32
36
33
func NewCometKV (ctx context.Context , mTyp memtable.Typ , dTyp sst.Type , gcInterval , ttl , flushInterval time.Duration ) KV {
37
34
kv := CometKV {
38
- mem : NewMemtable (mTyp , gcInterval , ttl , true , ctx ),
39
- sst : sst .NewSstIO (dTyp ),
40
-
41
- localInsertCounter : 0 ,
42
- globalLongRangeScanCount : atomic.Int64 {},
43
- globalLongRangeScanDuration : time .Duration (0 ),
35
+ mem : NewMemtable (mTyp , gcInterval , ttl , true , ctx ),
36
+ sst : sst .NewSstIO (dTyp ),
37
+ localInsertCounter : 0 ,
44
38
}
45
39
kv .startFlushThread (flushInterval , ctx )
46
40
return & kv
@@ -79,6 +73,7 @@ func (c *CometKV) Delete(key string) {
79
73
func (c * CometKV ) Close () {
80
74
c .mem .Close ()
81
75
c .sst .Destroy ()
76
+ c .localInsertCounter = 0
82
77
}
83
78
84
79
func (c * CometKV ) startFlushThread (flushInterval time.Duration , ctx context.Context ) {
@@ -92,17 +87,8 @@ func (c *CometKV) startFlushThread(flushInterval time.Duration, ctx context.Cont
92
87
return
93
88
case <- ticker .C :
94
89
totalInsertsForLongRangeDuration := c .atomicCasLocalInsertCounter ()
95
-
96
- startTs := time .Now ()
97
-
98
90
records := c .mem .Scan ("" , int (totalInsertsForLongRangeDuration ), memtable.ScanOptions {SnapshotTs : time .Now ()})
99
91
_ = c .sst .Create (records )
100
-
101
- endTs := time .Now ()
102
- diff := endTs .Sub (startTs )
103
-
104
- c .globalLongRangeScanCount .Add (int64 (len (records )))
105
- c .globalLongRangeScanDuration += diff
106
92
}
107
93
}
108
94
}()
@@ -122,6 +108,7 @@ func (c *CometKV) atomicCasLocalInsertCounter() int64 {
122
108
func (c * CometKV ) MemTableName () string {
123
109
return c .mem .Name ()
124
110
}
111
+
125
112
func (c * CometKV ) SstStorageName () string {
126
113
return c .sst .Name ()
127
114
}
0 commit comments