Skip to content

Commit 92ab3d5

Browse files
committed
fix delete expired keys
1 parent d7ab6f5 commit 92ab3d5

File tree

1 file changed

+31
-40
lines changed

1 file changed

+31
-40
lines changed

db.go

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package rosedb
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
@@ -354,11 +353,10 @@ func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) {
354353
if err != nil {
355354
return false, err
356355
}
357-
value, err := db.checkValue(chunk)
358-
if err != nil {
359-
return false, err
356+
if value := db.checkValue(chunk); value != nil {
357+
return handleFn(key, value)
360358
}
361-
return handleFn(key, value)
359+
return true, nil
362360
})
363361
}
364362

@@ -372,11 +370,10 @@ func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []b
372370
if err != nil {
373371
return false, nil
374372
}
375-
value, err := db.checkValue(chunk)
376-
if err != nil {
377-
return false, err
373+
if value := db.checkValue(chunk); value != nil {
374+
return handleFn(key, value)
378375
}
379-
return handleFn(key, value)
376+
return true, nil
380377
})
381378
}
382379

@@ -390,11 +387,10 @@ func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte)
390387
if err != nil {
391388
return false, nil
392389
}
393-
value, err := db.checkValue(chunk)
394-
if err != nil {
395-
return false, err
390+
if value := db.checkValue(chunk); value != nil {
391+
return handleFn(key, value)
396392
}
397-
return handleFn(key, value)
393+
return true, nil
398394
})
399395
}
400396

@@ -426,11 +422,10 @@ func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) {
426422
if err != nil {
427423
return false, nil
428424
}
429-
value, err := db.checkValue(chunk)
430-
if err != nil {
431-
return false, err
425+
if value := db.checkValue(chunk); value != nil {
426+
return handleFn(key, value)
432427
}
433-
return handleFn(key, value)
428+
return true, nil
434429
})
435430
}
436431

@@ -444,11 +439,10 @@ func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []
444439
if err != nil {
445440
return false, nil
446441
}
447-
value, err := db.checkValue(chunk)
448-
if err != nil {
449-
return false, err
442+
if value := db.checkValue(chunk); value != nil {
443+
return handleFn(key, value)
450444
}
451-
return handleFn(key, value)
445+
return true, nil
452446
})
453447
}
454448

@@ -462,11 +456,10 @@ func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (
462456
if err != nil {
463457
return false, nil
464458
}
465-
value, err := db.checkValue(chunk)
466-
if err != nil {
467-
return false, err
459+
if value := db.checkValue(chunk); value != nil {
460+
return handleFn(key, value)
468461
}
469-
return handleFn(key, value)
462+
return true, nil
470463
})
471464
}
472465

@@ -488,13 +481,13 @@ func (db *DB) DescendKeys(pattern []byte, handleFn func(k []byte) (bool, error))
488481
})
489482
}
490483

491-
func (db *DB) checkValue(chunk []byte) ([]byte, error) {
484+
func (db *DB) checkValue(chunk []byte) []byte {
492485
record := decodeLogRecord(chunk)
493486
now := time.Now().UnixNano()
494-
if record.Type == LogRecordDeleted || record.IsExpired(now) {
495-
return nil, ErrKeyNotFound
487+
if record.Type != LogRecordDeleted && !record.IsExpired(now) {
488+
return record.Value
496489
}
497-
return record.Value, nil
490+
return nil
498491
}
499492

500493
func checkOptions(options Options) error {
@@ -585,21 +578,18 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
585578
db.mu.Lock()
586579
defer db.mu.Unlock()
587580

588-
// set expiration time
581+
// set timeout
589582
ctx, cancel := context.WithTimeout(context.Background(), timeout)
590583
defer cancel()
584+
done := make(chan struct{}, 1)
591585

592-
innerErrs := make([]error, 0) // record anonymous func error
586+
var innerErr error
593587
now := time.Now().UnixNano()
594588
go func(ctx context.Context) {
595589
for {
596-
// get 100 key's positions
590+
// select 100 keys from the db.index
597591
positions := make([]*wal.ChunkPosition, 0, 100)
598592
db.index.AscendGreaterOrEqual(db.expiredCursorKey, func(k []byte, pos *wal.ChunkPosition) (bool, error) {
599-
// filter processed key
600-
if bytes.Compare(k, db.expiredCursorKey) == 0 {
601-
return true, nil
602-
}
603593
positions = append(positions, pos)
604594
if len(positions) >= 100 {
605595
return false, nil
@@ -610,14 +600,16 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
610600
// If keys in the db.index has been traversed, len(positions) will be 0.
611601
if len(positions) == 0 {
612602
db.expiredCursorKey = nil
603+
done <- struct{}{}
613604
return
614605
}
615606

616607
// delete from index if the key is expired.
617608
for _, pos := range positions {
618609
chunk, err := db.dataFiles.Read(pos)
619610
if err != nil {
620-
innerErrs = append(innerErrs, err)
611+
innerErr = err
612+
done <- struct{}{}
621613
return
622614
}
623615
record := decodeLogRecord(chunk)
@@ -631,9 +623,8 @@ func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
631623

632624
select {
633625
case <-ctx.Done():
634-
if len(innerErrs) > 0 {
635-
return innerErrs[0]
636-
}
626+
return innerErr
627+
case <-done:
637628
return nil
638629
}
639630
}

0 commit comments

Comments
 (0)