Skip to content

Commit

Permalink
add filterExpired for ascend/descend keys
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Sep 13, 2023
1 parent ebb47cb commit 93ddbb4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 11 deletions.
45 changes: 38 additions & 7 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte)
}

// AscendKeys calls handleFn for each key in the db in ascending order.
func (db *DB) AscendKeys(pattern []byte, handleFn func(k []byte) (bool, error)) {
// Since our expiry time is stored in the value, if you want to filter expired keys,
// you need to set parameter filterExpired to true. But the performance will be affected.
// Because we need to read the value of each key to determine if it is expired.
func (db *DB) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()

Expand All @@ -404,8 +407,21 @@ func (db *DB) AscendKeys(pattern []byte, handleFn func(k []byte) (bool, error))
reg = regexp.MustCompile(string(pattern))
}

db.index.Ascend(func(key []byte, _ *wal.ChunkPosition) (bool, error) {
db.index.Ascend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if reg == nil || reg.Match(key) {
var invalid bool
if filterExpired {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value == nil {
invalid = true
}
}
if invalid {
return true, nil
}
return handleFn(key)
}
return true, nil
Expand Down Expand Up @@ -464,7 +480,10 @@ func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (
}

// DescendKeys calls handleFn for each key in the db in descending order.
func (db *DB) DescendKeys(pattern []byte, handleFn func(k []byte) (bool, error)) {
// Since our expiry time is stored in the value, if you want to filter expired keys,
// you need to set parameter filterExpired to true. But the performance will be affected.
// Because we need to read the value of each key to determine if it is expired.
func (db *DB) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()

Expand All @@ -473,8 +492,21 @@ func (db *DB) DescendKeys(pattern []byte, handleFn func(k []byte) (bool, error))
reg = regexp.MustCompile(string(pattern))
}

db.index.Descend(func(key []byte, _ *wal.ChunkPosition) (bool, error) {
db.index.Descend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if reg == nil || reg.Match(key) {
var invalid bool
if filterExpired {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value == nil {
invalid = true
}
}
if invalid {
return true, nil
}
return handleFn(key)
}
return true, nil
Expand Down Expand Up @@ -575,9 +607,6 @@ func (db *DB) loadIndexFromWAL() error {
// It is a time-consuming operation, so we need to specify a timeout
// to prevent the DB from being unavailable for a long time.
func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
db.mu.Lock()
defer db.mu.Unlock()

// set timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand All @@ -586,6 +615,8 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
var innerErr error
now := time.Now().UnixNano()
go func(ctx context.Context) {
db.mu.Lock()
defer db.mu.Unlock()
for {
// select 100 keys from the db.index
positions := make([]*wal.ChunkPosition, 0, 100)
Expand Down
57 changes: 55 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestDB_AscendKeys(t *testing.T) {

validate := func(target [][]byte, pattern []byte) {
var keys [][]byte
db.AscendKeys(pattern, func(k []byte) (bool, error) {
db.AscendKeys(pattern, false, func(k []byte) (bool, error) {
keys = append(keys, k)
return true, nil
})
Expand All @@ -370,6 +370,32 @@ func TestDB_AscendKeys(t *testing.T) {
validate([][]byte{[]byte("aacd"), []byte("bbde"), []byte("bcae"), []byte("cdea")}, nil)
}

func TestDB_AscendKeysExpired(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

validate := func(target [][]byte, pattern []byte) {
var keys [][]byte
db.AscendKeys(pattern, true, func(k []byte) (bool, error) {
keys = append(keys, k)
return true, nil
})
assert.Equal(t, keys, target)
}

err = db.PutWithTTL([]byte("bbde"), utils.RandomValue(10), time.Millisecond*500)
assert.Nil(t, err)
err = db.Put([]byte("cdea"), utils.RandomValue(10))
assert.Nil(t, err)
err = db.Put([]byte("bcae"), utils.RandomValue(10))
assert.Nil(t, err)
time.Sleep(time.Millisecond * 600)

validate([][]byte{[]byte("bcae"), []byte("cdea")}, nil)
}

func TestDB_DescendKeys(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
Expand All @@ -381,7 +407,7 @@ func TestDB_DescendKeys(t *testing.T) {

validate := func(target [][]byte, pattern []byte) {
var keys [][]byte
db.DescendKeys(pattern, func(k []byte) (bool, error) {
db.DescendKeys(pattern, false, func(k []byte) (bool, error) {
keys = append(keys, k)
return true, nil
})
Expand All @@ -400,6 +426,33 @@ func TestDB_DescendKeys(t *testing.T) {
validate([][]byte{[]byte("cdea"), []byte("bcae"), []byte("bbde"), []byte("aacd")}, nil)
}

func TestDB_DescendKeysExpired(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
assert.Nil(t, err)
defer destroyDB(db)

validate := func(target [][]byte, pattern []byte) {
var keys [][]byte
db.DescendKeys(pattern, true, func(k []byte) (bool, error) {
keys = append(keys, k)
return true, nil
})
assert.Equal(t, keys, target)
}

err = db.Put([]byte("bbde"), utils.RandomValue(10))
assert.Nil(t, err)
err = db.PutWithTTL([]byte("cdea"), utils.RandomValue(10), time.Millisecond*500)
assert.Nil(t, err)
err = db.PutWithTTL([]byte("bcae"), utils.RandomValue(10), time.Millisecond*500)
assert.Nil(t, err)

time.Sleep(time.Millisecond * 600)

validate([][]byte{[]byte("bbde")}, nil)
}

func TestDB_PutWithTTL(t *testing.T) {
options := DefaultOptions
db, err := Open(options)
Expand Down
4 changes: 2 additions & 2 deletions examples/iterate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
_ = db.Put([]byte("key41"), []byte("value41"))

// iterate all keys in order
db.AscendKeys(nil, func(k []byte) (bool, error) {
db.AscendKeys(nil, true, func(k []byte) (bool, error) {
fmt.Println("key = ", string(k))
return true, nil
})
Expand All @@ -41,7 +41,7 @@ func main() {
})

// iterate all keys in reverse order
db.DescendKeys(nil, func(k []byte) (bool, error) {
db.DescendKeys(nil, true, func(k []byte) (bool, error) {
fmt.Println("key = ", string(k))
return true, nil
})
Expand Down

0 comments on commit 93ddbb4

Please sign in to comment.