Skip to content

Commit

Permalink
Fix bugs and optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
diiyw committed Apr 17, 2024
1 parent 91c4986 commit c958843
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 57 deletions.
5 changes: 5 additions & 0 deletions examples/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package main

import (
"fmt"
"net/http"
_ "net/http/pprof"

"github.com/diiyw/nodis"
)

func main() {
opt := nodis.DefaultOptions
n := nodis.Open(opt)
go func() {
_ = http.ListenAndServe("0.0.0.0:6060", nil)
}()
if err := n.Serve(":6380"); err != nil {
fmt.Printf("Serve() = %v", err)
}
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
6 changes: 5 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package nodis

import (
"github.com/tidwall/btree"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/tidwall/btree"

"github.com/diiyw/nodis/redis"
)

Expand Down Expand Up @@ -355,6 +356,9 @@ func getString(n *Nodis, cmd redis.Value, args []redis.Value) redis.Value {
}
key := args[0].Bulk
v := n.Get(key)
if v == nil {
return redis.NullValue()
}
return redis.StringValue(string(v))
}

Expand Down
17 changes: 9 additions & 8 deletions key.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (n *Nodis) Expire(key string, seconds int64) int64 {
meta.key.expiration = time.Now().UnixMilli()
}
meta.key.expiration += seconds * 1000
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand All @@ -111,8 +111,8 @@ func (n *Nodis) ExpirePX(key string, milliseconds int64) int64 {
meta.key.expiration = time.Now().UnixMilli()
}
meta.key.expiration += milliseconds
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand All @@ -127,8 +127,8 @@ func (n *Nodis) ExpireNX(key string, seconds int64) int64 {
return 0
}
meta.key.expiration = time.Now().UnixMilli() + seconds*1000
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand All @@ -144,8 +144,8 @@ func (n *Nodis) ExpireXX(key string, seconds int64) int64 {
return 0
}
meta.key.expiration += seconds * 1000
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand Down Expand Up @@ -201,8 +201,8 @@ func (n *Nodis) ExpireAt(key string, timestamp time.Time) int64 {
return 0
}
meta.key.expiration = timestamp.UnixMilli()
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand All @@ -218,8 +218,8 @@ func (n *Nodis) ExpireAtNX(key string, timestamp time.Time) int64 {
return 0
}
meta.key.expiration = timestamp.UnixMilli()
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand All @@ -235,8 +235,8 @@ func (n *Nodis) ExpireAtXX(key string, timestamp time.Time) int64 {
return 0
}
meta.key.expiration = timestamp.UnixMilli()
meta.commit()
n.notify(pb.NewOp(pb.OpType_Expire, key).Expiration(meta.key.expiration))
meta.commit()
return 1
}

Expand Down Expand Up @@ -341,8 +341,9 @@ func (n *Nodis) Type(key string) string {
meta.commit()
return "none"
}
v := ds.DataTypeMap[meta.ds.Type()]
meta.commit()
return ds.DataTypeMap[meta.ds.Type()]
return v
}

// Scan the keys
Expand Down
56 changes: 33 additions & 23 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,57 @@ import (
"github.com/diiyw/nodis/ds"
)

var metaPool = sync.Pool{
New: func() any {
return new(metadata)
},
}

type metadata struct {
locker *sync.RWMutex
key *Key
ds ds.DataStruct
locker *sync.RWMutex
ok bool
writable bool
}

func newEmptyMetadata(locker *sync.RWMutex, writable bool) *metadata {
return &metadata{
locker: locker,
ok: false,
writable: writable,
}
m := metaPool.Get().(*metadata)
m.locker = locker
m.writable = writable
return m
}

func newMetadata(key *Key, d ds.DataStruct, writable bool, locker *sync.RWMutex) *metadata {
meta := &metadata{
locker: locker,
key: key,
ds: d,
writable: writable,
ok: true,
}
return meta
m := metaPool.Get().(*metadata)
m.locker = locker
m.key = key
m.ds = d
m.writable = writable
m.ok = true
return m
}

func (t *metadata) isOk() bool {
return t.ok
func (m *metadata) isOk() bool {
return m.ok
}

func (t *metadata) markChanged() {
if t.ok {
t.key.changed = true
func (m *metadata) markChanged() {
if m.ok {
m.key.changed = true
}
}

func (t *metadata) commit() {
if t.writable {
t.locker.Unlock()
func (m *metadata) commit() {
if m.writable {
m.locker.Unlock()
} else {
t.locker.RUnlock()
m.locker.RUnlock()
}
m.ds = nil
m.locker = nil
m.key = nil
m.ok = false
m.writable = false
metaPool.Put(m)
}
1 change: 1 addition & 0 deletions nodis.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (n *Nodis) notify(ops ...*pb.Op) {
for _, op := range ops {
if w.Matched(op.Key) {
w.Push(op.Operation)
op.Reset()
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions pb/op.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package pb

import sync "sync"

type Op struct {
*Operation
}

var opPool = sync.Pool{
New: func() any {
return &Op{
Operation: &Operation{},
}
},
}

func NewOp(typ OpType, key string) *Op {
return &Op{
Operation: &Operation{
Type: typ,
Key: key,
},
}
op := opPool.Get().(*Op)
op.Type = typ
op.Key = key
return op
}

func (o *Op) Value(v []byte) *Op {
Expand Down Expand Up @@ -125,3 +133,8 @@ func (o *Op) Before(before bool) *Op {
o.Operation.Before = before
return o
}

func (o *Op) Reset() {
o.Operation.Reset()
opPool.Put(o)
}
27 changes: 12 additions & 15 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ import (
)

type store struct {
locks []*sync.RWMutex
keys btree.Map[string, *Key]
values btree.Map[string, ds.DataStruct]
sync.RWMutex
fileSize int64
fileId uint16
path string
current string
indexFile string
aof fs.File
keys btree.Map[string, *Key]
values btree.Map[string, ds.DataStruct]
filesystem fs.Fs
locks []*sync.RWMutex
lockPoolSize int
closed bool
}
Expand Down Expand Up @@ -162,21 +162,18 @@ func (s *store) delKey(key string) {
s.values.Delete(key)
}

func (s *store) fromStorage(key *Key, writable bool, locker *sync.RWMutex) *metadata {
func (s *store) fromStorage(k *Key, writable bool, locker *sync.RWMutex) *metadata {
// try get from storage
v, err := s.getKey(key)
v, err := s.getKey(k)
if err == nil && len(v) > 0 {
key, d, expiration, err := s.parseDs(v)
key, value, err := s.parseDs(v)
if err != nil {
log.Println("Parse DataStruct:", err)
return newEmptyMetadata(locker, writable)
}
if d != nil {
s.values.Set(key, d)
k := newKey()
k.expiration = expiration
s.keys.Set(key, k)
return newMetadata(k, d, writable, locker)
if value != nil {
s.values.Set(key, value)
return newMetadata(k, value, writable, locker)
}
}
return newEmptyMetadata(locker, writable)
Expand All @@ -195,10 +192,10 @@ func (s *store) parseEntry(data []byte) (*pb.Entry, error) {
}

// parseDs the data
func (s *store) parseDs(data []byte) (string, ds.DataStruct, int64, error) {
func (s *store) parseDs(data []byte) (string, ds.DataStruct, error) {
var entity, err = s.parseEntry(data)
if err != nil {
return "", nil, 0, err
return "", nil, err
}
var dataStruct ds.DataStruct
switch ds.DataType(entity.Type) {
Expand All @@ -223,7 +220,7 @@ func (s *store) parseDs(data []byte) (string, ds.DataStruct, int64, error) {
v.SetValue(entity.GetSetValue().Values)
dataStruct = v
}
return entity.Key, dataStruct, entity.Expiration, nil
return entity.Key, dataStruct, nil
}

// flushChanges flush changed keys to disk
Expand Down
2 changes: 1 addition & 1 deletion store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestStore_parseDs(t *testing.T) {
n := Open(opt)
n.Set("test", []byte("test"))
data := n.GetEntry("test")
k, d, _, err := n.store.parseDs(data)
k, d, err := n.store.parseDs(data)
if err != nil {
t.Errorf("parseDs() = %v, want %v", err, nil)
}
Expand Down
2 changes: 1 addition & 1 deletion str.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func (n *Nodis) SetEX(key string, value []byte, seconds int64) {
}
meta.key.expiration += seconds * 1000
meta.ds.(*str.String).Set(value)
meta.commit()
n.notify(pb.NewOp(pb.OpType_Set, key).Value(value).Expiration(meta.key.expiration))
meta.commit()
}

// SetPX set a key with specified expire time, in milliseconds (a positive integer).
Expand Down

0 comments on commit c958843

Please sign in to comment.