Skip to content

Commit

Permalink
Fix error handling in Redis server and update key metadata pool size
Browse files Browse the repository at this point in the history
  • Loading branch information
diiyw committed Apr 18, 2024
1 parent c958843 commit 880bf89
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 97 deletions.
8 changes: 4 additions & 4 deletions key.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ func (n *Nodis) ExpireLT(key string, seconds int64) int64 {
ms := seconds * 1000
if meta.key.expiration > time.Now().UnixMilli()-ms {
meta.key.expiration -= ms
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}
meta.commit()
Expand All @@ -185,8 +185,8 @@ func (n *Nodis) ExpireGT(key string, seconds int64) int64 {
ms := seconds * 1000
if meta.key.expiration < now+ms {
meta.key.expiration += ms
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}
meta.commit()
Expand Down Expand Up @@ -284,15 +284,15 @@ func (n *Nodis) ExpireAtGT(key string, timestamp time.Time) int64 {
func (n *Nodis) Keys(pattern string) []string {
var keys []string
now := time.Now().UnixMilli()
n.store.RLock()
n.store.mu.Lock()
n.store.keys.Scan(func(key string, k *Key) bool {
matched, _ := filepath.Match(pattern, key)
if matched && !k.expired(now) {
keys = append(keys, key)
}
return true
})
n.store.RUnlock()
n.store.mu.Unlock()
return keys
}

Expand Down
38 changes: 9 additions & 29 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,17 @@ import (
"github.com/diiyw/nodis/ds"
)

var metaPool = sync.Pool{
New: func() any {
return new(metadata)
},
}

type metadata struct {
key *Key
ds ds.DataStruct
locker *sync.RWMutex
ok bool
writable bool
}

func newEmptyMetadata(locker *sync.RWMutex, writable bool) *metadata {
m := metaPool.Get().(*metadata)
m.locker = locker
m.writable = writable
return m
*sync.Mutex
key *Key
ds ds.DataStruct
ok bool
}

func newMetadata(key *Key, d ds.DataStruct, writable bool, locker *sync.RWMutex) *metadata {
m := metaPool.Get().(*metadata)
m.locker = locker
func (m *metadata) set(key *Key, d ds.DataStruct) *metadata {
m.Lock()
m.key = key
m.ds = d
m.writable = writable
m.ok = true
return m
}
Expand All @@ -48,15 +32,11 @@ func (m *metadata) markChanged() {
}

func (m *metadata) commit() {
if m.writable {
m.locker.Unlock()
} else {
m.locker.RUnlock()
if !m.ok {
return
}
m.ds = nil
m.locker = nil
m.key = nil
m.ok = false
m.writable = false
metaPool.Put(m)
m.Unlock()
}
6 changes: 3 additions & 3 deletions nodis.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func Open(opt *Options) *Nodis {
if opt.Filesystem == nil {
opt.Filesystem = &fs.Memory{}
}
if opt.LockPoolSize == 0 {
opt.LockPoolSize = 10240
if opt.MetaPoolSize == 0 {
opt.MetaPoolSize = 10240
}
n := &Nodis{
options: opt,
Expand All @@ -50,7 +50,7 @@ func Open(opt *Options) *Nodis {
} else if !isDir {
panic("Path is not a directory")
}
n.store = newStore(opt.Path, opt.FileSize, opt.LockPoolSize, opt.Filesystem)
n.store = newStore(opt.Path, opt.FileSize, opt.MetaPoolSize, opt.Filesystem)
go func() {
if opt.TidyDuration != 0 {
for {
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type Options struct {
// Synchronizer is the synchronizer to use. The default is nil and no synchronization is performed.
Synchronizer sync.Synchronizer

// LockPoolSize is the key locks pool size
LockPoolSize int
// MetaPoolSize is the key metadata pool size
MetaPoolSize int
}

var DefaultOptions = &Options{
Path: "data",
FileSize: FileSizeGB,
TidyDuration: 60 * time.Second,
LockPoolSize: 10240,
MetaPoolSize: 10240,
}
6 changes: 3 additions & 3 deletions redis/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func Serve(addr string, handler func(cmd Value, args []Value) Value) error {
func handleConn(conn net.Conn, handler func(cmd Value, args []Value) Value) {
writer := NewWriter(conn)
defer func() {
if r := recover(); r != nil {
_ = writer.Write(ErrorValue(r.(error).Error()))
}
// if r := recover(); r != nil {
// _ = writer.Write(ErrorValue(r.(error).Error()))
// }
conn.Close()
}()
resp := NewResp(conn)
Expand Down
107 changes: 55 additions & 52 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,33 @@ import (
)

type store struct {
locks []*sync.RWMutex
keys btree.Map[string, *Key]
values btree.Map[string, ds.DataStruct]
sync.RWMutex
mu sync.Mutex
metaPool []*metadata
keys btree.Map[string, *Key]
values btree.Map[string, ds.DataStruct]
fileSize int64
fileId uint16
path string
current string
indexFile string
aof fs.File
filesystem fs.Fs
lockPoolSize int
metaPoolSize int
closed bool
}

func newStore(path string, fileSize int64, lockPoolSize int, filesystem fs.Fs) *store {
func newStore(path string, fileSize int64, metaPoolSize int, filesystem fs.Fs) *store {
s := &store{
path: path,
fileSize: fileSize,
indexFile: filepath.Join(path, "nodis.index"),
lockPoolSize: lockPoolSize,
locks: make([]*sync.RWMutex, lockPoolSize),
metaPoolSize: metaPoolSize,
metaPool: make([]*metadata, metaPoolSize),
}
for i := 0; i < lockPoolSize; i++ {
s.locks[i] = &sync.RWMutex{}
for i := 0; i < metaPoolSize; i++ {
s.metaPool[i] = &metadata{
Mutex: &sync.Mutex{},
}
}
_ = filesystem.MkdirAll(path)
indexFile, err := filesystem.OpenFile(s.indexFile, os.O_RDWR|os.O_CREATE|os.O_APPEND)
Expand Down Expand Up @@ -97,86 +99,89 @@ func fnv32(key string) uint32 {
return h
}

func (s *store) spread(hashCode uint32) *sync.RWMutex {
tableSize := uint32(s.lockPoolSize)
return s.locks[(tableSize-1)&hashCode]
func (s *store) spread(hashCode uint32) *metadata {
tableSize := uint32(s.metaPoolSize)
return s.metaPool[(tableSize-1)&hashCode]
}

func (s *store) getLocker(key string) *sync.RWMutex {
func (s *store) getMetadata(key string) *metadata {
return s.spread(fnv32(key))
}

func (s *store) writeKey(key string, newFn func() ds.DataStruct) *metadata {
locker := s.getLocker(key)
locker.Lock()
s.mu.Lock()
defer s.mu.Unlock()
meta := s.getMetadata(key)
k, ok := s.keys.Get(key)
if ok {
if k.expired(time.Now().UnixMilli()) {
if newFn == nil {
return newEmptyMetadata(locker, true)
return meta
}
}
d, ok := s.values.Get(key)
if ok {
return newMetadata(k, d, true, locker)
meta.set(k, d)
return meta
}
return s.fromStorage(k, true, locker)
return s.fromStorage(k, meta)
}
if newFn != nil {
s.Lock()
defer s.Unlock()
d := newFn()
if d == nil {
return newEmptyMetadata(locker, true)
return meta
}
k = newKey()
s.keys.Set(key, k)
s.values.Set(key, d)
meta := newMetadata(k, d, true, locker)
meta.set(k, d)
meta.markChanged()
return meta
}
return newEmptyMetadata(locker, true)
return meta
}

func (s *store) readKey(key string) *metadata {
locker := s.getLocker(key)
locker.RLock()
meta := s.getMetadata(key)
k, ok := s.keys.Get(key)
if ok {
if k.expired(time.Now().UnixMilli()) {
return newEmptyMetadata(locker, false)
return meta
}
d, ok := s.values.Get(key)
if !ok {
// read from storage
return s.fromStorage(k, false, locker)
return s.fromStorage(k, meta)
}
return newMetadata(k, d, false, locker)
meta.set(k, d)
return meta
}
return newEmptyMetadata(locker, false)
return meta
}

func (s *store) delKey(key string) {
s.mu.Lock()
s.keys.Delete(key)
s.values.Delete(key)
s.mu.Unlock()
}

func (s *store) fromStorage(k *Key, writable bool, locker *sync.RWMutex) *metadata {
func (s *store) fromStorage(k *Key, meta *metadata) *metadata {
// try get from storage
v, err := s.getKey(k)
if err == nil && len(v) > 0 {
key, value, err := s.parseDs(v)
if err != nil {
log.Println("Parse DataStruct:", err)
return newEmptyMetadata(locker, writable)
return meta
}
if value != nil {
s.values.Set(key, value)
return newMetadata(k, value, writable, locker)
meta.set(k, value)
return meta
}
}
return newEmptyMetadata(locker, writable)
return meta
}

func (s *store) parseEntry(data []byte) (*pb.Entry, error) {
Expand Down Expand Up @@ -227,9 +232,8 @@ func (s *store) parseDs(data []byte) (string, ds.DataStruct, error) {
func (s *store) flushChanges() {
now := time.Now().UnixMilli()
s.keys.Scan(func(key string, k *Key) bool {
locker := s.getLocker(key)
locker.RLock()
defer locker.RUnlock()
meta := s.getMetadata(key)
defer meta.commit()
if !k.changed || k.expired(now) {
return true
}
Expand All @@ -248,19 +252,19 @@ func (s *store) flushChanges() {

// tidy removes expired and unused keys
func (s *store) tidy(ms int64) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
now := time.Now().UnixMilli()
recycleTime := now - ms
s.keys.Scan(func(key string, k *Key) bool {
locker := s.getLocker(key)
locker.Lock()
defer locker.Unlock()
meta := s.getMetadata(key)
defer meta.commit()
if k.expired(now) {
s.delKey(key)
s.keys.Delete(key)
s.values.Delete(key)
return true
}
if k.lastUse != 0 && k.lastUse <= recycleTime {
Expand Down Expand Up @@ -416,8 +420,8 @@ func (s *store) getKey(key *Key) ([]byte, error) {

// snapshot the store
func (s *store) snapshot(path string) {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
snapshotDir := filepath.Join(path, "snapshots", time.Now().Format("20060102_150405"))
err := s.filesystem.MkdirAll(snapshotDir)
if err != nil {
Expand Down Expand Up @@ -450,8 +454,8 @@ func (s *store) snapshot(path string) {

// close the store
func (s *store) close() error {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
s.closed = true
s.flushChanges()
err := s.aof.Close()
Expand All @@ -472,13 +476,12 @@ func (s *store) close() error {
Items: make([]*pb.Index_Item, 0, s.keys.Len()),
}
s.keys.Scan(func(key string, k *Key) bool {
locker := s.getLocker(key)
locker.Lock()
meta := s.getMetadata(key)
indexData.Items = append(indexData.Items, &pb.Index_Item{
Key: key,
Data: k.marshal(),
})
locker.Unlock()
meta.commit()
return true
})
data, err := proto.Marshal(indexData)
Expand All @@ -501,8 +504,8 @@ func (s *store) close() error {

// clear the store
func (s *store) clear() error {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
err := s.aof.Truncate(0)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 880bf89

Please sign in to comment.