diff --git a/ds/ds.go b/ds/ds.go index 0c6e750..49117df 100644 --- a/ds/ds.go +++ b/ds/ds.go @@ -1,5 +1,13 @@ package ds +import ( + "errors" +) + +var ( + ErrCorruptedData = errors.New("corrupted data") +) + type Key struct { Name string Expiration int64 diff --git a/examples/watch/client/main.go b/examples/watch/client/main.go index 700ce00..5abd458 100644 --- a/examples/watch/client/main.go +++ b/examples/watch/client/main.go @@ -2,14 +2,13 @@ package main import ( "fmt" + "github.com/diiyw/nodis" - "github.com/diiyw/nodis/fs" "github.com/diiyw/nodis/patch" ) func main() { var opt = nodis.DefaultOptions - opt.Filesystem = &fs.Memory{} opt.Synchronizer = nodis.NewWebsocket() n := nodis.Open(opt) n.WatchKey([]string{"*"}, func(op patch.Op) { diff --git a/examples/watch/server/main.go b/examples/watch/server/main.go index bbe0583..8a98b28 100644 --- a/examples/watch/server/main.go +++ b/examples/watch/server/main.go @@ -2,9 +2,10 @@ package main import ( "fmt" + "time" + "github.com/diiyw/nodis" "github.com/diiyw/nodis/patch" - "time" ) func main() { @@ -20,7 +21,7 @@ func main() { n.Set("test", []byte(time.Now().Format("2006-01-02 15:04:05")), false) } }() - err := n.Publish("127.0.0.1:6380", []string{"*"}) + err := n.Broadcast("127.0.0.1:6380", []string{"*"}) if err != nil { panic(err) } diff --git a/fs/disk.go b/fs/disk.go deleted file mode 100644 index 7ae3f8b..0000000 --- a/fs/disk.go +++ /dev/null @@ -1,106 +0,0 @@ -//go:build !js - -package fs - -import ( - "io" - "os" -) - -type Disk struct { -} - -type DiskFile struct { - *os.File - flag int -} - -func (d *Disk) OpenFile(filename string, flag int) (File, error) { - fi, err := os.OpenFile(filename, flag, 0644) - if err != nil { - return nil, err - } - return &DiskFile{File: fi, flag: flag}, nil -} - -func (d *Disk) MkdirAll(path string) error { - return os.MkdirAll(path, 0755) -} - -func (d *Disk) Rename(oldpath, newpath string) error { - return os.Rename(oldpath, newpath) -} - -func (d *Disk) IsDir(path string) (bool, error) { - fi, err := os.Stat(path) - if err != nil { - return false, err - } - return fi.IsDir(), nil -} - -func (d *Disk) RemoveAll(path string) error { - return os.RemoveAll(path) -} - -func (d *Disk) Remove(filename string) error { - return os.Remove(filename) -} - -func (d *DiskFile) ReadAt(b []byte, off int64) (n int, err error) { - n, err = d.File.ReadAt(b, off) - if err == io.EOF { - err = nil - } - return n, err -} - -func (d *DiskFile) Write(b []byte) (n int, err error) { - return d.File.Write(b) -} - -func (d *DiskFile) Close() error { - err := d.File.Sync() - if err != nil { - return err - } - return d.File.Close() -} - -func (d *DiskFile) FileSize() (int64, error) { - fi, err := d.File.Stat() - if err != nil { - return 0, err - } - return fi.Size(), nil -} - -func (d *DiskFile) Truncate(size int64) error { - err := d.File.Close() - if err != nil { - return err - } - err = os.Truncate(d.Name(), size) - if err != nil { - return err - } - d.File, err = os.OpenFile(d.Name(), d.flag, 0644) - return err -} - -func (d *DiskFile) ReadAll() ([]byte, error) { - l, err := d.FileSize() - if err != nil { - return nil, err - } - size := int(l) - if size == 0 { - return nil, nil - } - data := make([]byte, size) - _, err = d.ReadAt(data, 0) - if err != nil { - return nil, err - } - return data, nil -} diff --git a/fs/fs.go b/fs/fs.go deleted file mode 100644 index 4efb197..0000000 --- a/fs/fs.go +++ /dev/null @@ -1,21 +0,0 @@ -package fs - -type Fs interface { - OpenFile(filename string, flag int) (File, error) - MkdirAll(path string) error - Rename(oldpath, newpath string) error - // IsDir if not exist, return false whith os.ErrNotExist - IsDir(path string) (bool, error) - RemoveAll(path string) error - Remove(filename string) error -} - -type File interface { - ReadAt(b []byte, off int64) (n int, err error) - Write(b []byte) (n int, err error) - WriteAt(b []byte, off int64) (n int, err error) - Close() error - FileSize() (int64, error) - Truncate(size int64) error - ReadAll() ([]byte, error) -} diff --git a/fs/memory.go b/fs/memory.go deleted file mode 100644 index f4ed740..0000000 --- a/fs/memory.go +++ /dev/null @@ -1,116 +0,0 @@ -package fs - -import ( - "sync" - - "github.com/tidwall/btree" -) - -type Memory struct { - sync.RWMutex - files btree.Map[string, *MemoryFile] -} - -func (m *Memory) OpenFile(filename string, flag int) (File, error) { - m.Lock() - defer m.Unlock() - if f, ok := m.files.Get(filename); ok { - return f, nil - } - mf := &MemoryFile{data: make([]byte, 0, 2048)} - m.files.Set(filename, mf) - return mf, nil -} - -func (m *Memory) MkdirAll(path string) error { - return nil -} - -func (m *Memory) Rename(oldpath, newpath string) error { - m.Lock() - defer m.Unlock() - if f, ok := m.files.Get(oldpath); ok { - m.files.Set(newpath, f) - m.files.Delete(oldpath) - } - return nil -} - -func (m *Memory) IsDir(path string) (bool, error) { - return true, nil -} - -func (m *Memory) RemoveAll(path string) error { - m.Lock() - defer m.Unlock() - m.files.Clear() - return nil -} - -func (m *Memory) Remove(filename string) error { - m.Lock() - defer m.Unlock() - m.files.Delete(filename) - return nil -} - -type MemoryFile struct { - data []byte -} - -func (m *MemoryFile) ReadAt(b []byte, off int64) (n int, err error) { - if off >= int64(len(m.data)) { - return 0, nil - } - return copy(b, m.data[off:]), nil -} - -func (m *MemoryFile) Write(b []byte) (n int, err error) { - m.data = append(m.data, b...) - return len(b), nil -} - -func (m *MemoryFile) WriteAt(b []byte, off int64) (n int, err error) { - if off < 0 { - return 0, nil - } - dataLen := int64(len(m.data)) - writeLen := int64(len(b)) - if off >= dataLen { - m.data = append(m.data, b...) - return len(b), nil - } - if off+int64(len(b)) > int64(len(m.data)) { - m.data = append(m.data, make([]byte, off+writeLen-dataLen)...) - } - return copy(m.data[off:], b), nil -} - -func (m *MemoryFile) Close() error { - return nil -} - -func (m *MemoryFile) FileSize() (int64, error) { - return int64(len(m.data)), nil -} - -func (m *MemoryFile) Truncate(size int64) error { - if size < 0 { - return nil - } - if size == 0 { - m.data = make([]byte, 0, 2048) - return nil - } - dataLen := int64(len(m.data)) - if size < dataLen { - m.data = m.data[:size] - return nil - } - m.data = append(m.data, make([]byte, size-dataLen)...) - return nil -} - -func (m *MemoryFile) ReadAll() ([]byte, error) { - return m.data, nil -} diff --git a/internal/notifier/notifier.go b/internal/listener/listener.go similarity index 66% rename from internal/notifier/notifier.go rename to internal/listener/listener.go index 8312336..619b130 100644 --- a/internal/notifier/notifier.go +++ b/internal/listener/listener.go @@ -1,24 +1,25 @@ -package notifier +package listener import ( - "github.com/diiyw/nodis/patch" "path/filepath" + + "github.com/diiyw/nodis/patch" ) -type Notifier struct { +type Listener struct { pattern []string fn func(op patch.Op) } -func New(pattern []string, fn func(op patch.Op)) *Notifier { - return &Notifier{ +func New(pattern []string, fn func(op patch.Op)) *Listener { + return &Listener{ pattern: pattern, fn: fn, } } // Matched checks if the key matches the pattern -func (w *Notifier) Matched(key string) bool { +func (w *Listener) Matched(key string) bool { for _, p := range w.pattern { matched, err := filepath.Match(p, key) if err != nil { @@ -32,6 +33,6 @@ func (w *Notifier) Matched(key string) bool { } // Push sends the operation to the watcher -func (w *Notifier) Push(op patch.Op) { +func (w *Listener) Push(op patch.Op) { w.fn(op) } diff --git a/internal/notifier/notifier_test.go b/internal/listener/listener_test.go similarity index 69% rename from internal/notifier/notifier_test.go rename to internal/listener/listener_test.go index 2288413..dfad656 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/listener/listener_test.go @@ -1,4 +1,4 @@ -package notifier +package listener import ( "testing" @@ -6,18 +6,18 @@ import ( "github.com/diiyw/nodis/patch" ) -func TestNotifier_Matched(t *testing.T) { +func TestListener_Matched(t *testing.T) { pattern := []string{"test"} w := New(pattern, nil) if w == nil { - t.Errorf("NewNotifier() = %v, want %v", w, "Notifier{}") + t.Errorf("NewListener() = %v, want %v", w, "Listener{}") } if !w.Matched("test") { t.Errorf("Matched() = %v, want %v", false, true) } } -func TestNotifier_Push(t *testing.T) { +func TestListener_Push(t *testing.T) { pattern := []string{"test"} w := New(pattern, func(op patch.Op) { if op.Data.GetKey() != "test" { @@ -25,7 +25,7 @@ func TestNotifier_Push(t *testing.T) { } }) if w == nil { - t.Errorf("NewNotifier() = %v, want %v", w, "Notifier{}") + t.Errorf("NewListener() = %v, want %v", w, "Listener{}") } w.Push(patch.Op{patch.OpTypeSet, &patch.OpSet{Key: "test", Value: []byte("test")}}) } diff --git a/key.go b/key.go index e5152e5..85775d1 100644 --- a/key.go +++ b/key.go @@ -2,16 +2,16 @@ package nodis import ( "errors" - "github.com/diiyw/nodis/ds" - "github.com/diiyw/nodis/ds/list" - "github.com/diiyw/nodis/redis" - "github.com/diiyw/nodis/storage" "math/rand" "path/filepath" "runtime" "sync" "time" + "github.com/diiyw/nodis/ds" + "github.com/diiyw/nodis/ds/list" + "github.com/diiyw/nodis/redis" + "github.com/diiyw/nodis/patch" ) @@ -449,7 +449,7 @@ func (n *Nodis) Rename(key, dstKey string) error { tx.delKey(key) if !dstMeta.isOk() { dstMeta.RWMutex = new(sync.RWMutex) - dstMeta.key = storage.NewKey(dstKey, meta.key.Expiration) + dstMeta.key = ds.NewKey(dstKey, meta.key.Expiration) n.store.mu.Lock() n.store.metadata.Set(dstKey, dstMeta) n.store.mu.Unlock() @@ -477,7 +477,7 @@ func (n *Nodis) RenameNX(key, dstKey string) error { } tx.delKey(key) dstMeta.RWMutex = new(sync.RWMutex) - dstMeta.key = storage.NewKey(dstKey, meta.key.Expiration) + dstMeta.key = ds.NewKey(dstKey, meta.key.Expiration) dstMeta.setValue(meta.value) n.store.mu.Lock() n.store.metadata.Set(dstKey, dstMeta) diff --git a/nodis.go b/nodis.go index ec73fab..104bbad 100644 --- a/nodis.go +++ b/nodis.go @@ -2,7 +2,6 @@ package nodis import ( "errors" - "github.com/diiyw/nodis/storage" "log" "os" "os/signal" @@ -10,8 +9,10 @@ import ( "syscall" "time" + "github.com/diiyw/nodis/storage" + "github.com/diiyw/nodis/ds/list" - "github.com/diiyw/nodis/internal/notifier" + "github.com/diiyw/nodis/internal/listener" "github.com/diiyw/nodis/patch" "github.com/diiyw/nodis/redis" "github.com/tidwall/btree" @@ -23,7 +24,7 @@ var ( type Nodis struct { store *store - notifiers []*notifier.Notifier + listeners []*listener.Listener blockingKeysMutex sync.RWMutex blockingKeys btree.Map[string, *list.LinkedListG[chan string]] // blocking keys options *Options @@ -79,11 +80,11 @@ func (n *Nodis) Clear() { } func (n *Nodis) notify(f func() []patch.Op) { - if len(n.notifiers) == 0 { + if len(n.listeners) == 0 { return } go func() { - for _, w := range n.notifiers { + for _, w := range n.listeners { for _, op := range f() { if w.Matched(op.Data.GetKey()) { w.Push(op) @@ -94,18 +95,18 @@ func (n *Nodis) notify(f func() []patch.Op) { } func (n *Nodis) WatchKey(pattern []string, fn func(op patch.Op)) int { - w := notifier.New(pattern, fn) - n.notifiers = append(n.notifiers, w) - return len(n.notifiers) - 1 + w := listener.New(pattern, fn) + n.listeners = append(n.listeners, w) + return len(n.listeners) - 1 } func (n *Nodis) UnWatchKey(id int) { - n.notifiers = append(n.notifiers[:id], n.notifiers[id+1:]...) + n.listeners = append(n.listeners[:id], n.listeners[id+1:]...) } -func (n *Nodis) Patch(ops ...patch.Op) error { +func (n *Nodis) ApplyPatch(ops ...patch.Op) error { for _, op := range ops { - err := n.patch(op) + err := n.appylyPatch(op) if err != nil { return err } @@ -113,7 +114,7 @@ func (n *Nodis) Patch(ops ...patch.Op) error { return nil } -func (n *Nodis) patch(p patch.Op) error { +func (n *Nodis) appylyPatch(p patch.Op) error { switch op := p.Data.(type) { case *patch.OpClear: n.Clear() @@ -185,7 +186,7 @@ func (n *Nodis) patch(p patch.Op) error { return nil } -func (n *Nodis) Publish(addr string, pattern []string) error { +func (n *Nodis) Broadcast(addr string, pattern []string) error { return n.options.Synchronizer.Publish(addr, func(s SyncConn) { id := n.WatchKey(pattern, func(op patch.Op) { err := s.Send(op) @@ -200,7 +201,7 @@ func (n *Nodis) Publish(addr string, pattern []string) error { func (n *Nodis) Subscribe(addr string) error { return n.options.Synchronizer.Subscribe(addr, func(o patch.Op) { - n.Patch(o) + n.ApplyPatch(o) }) } diff --git a/nodis_test.go b/nodis_test.go index 59b0730..9d20abf 100644 --- a/nodis_test.go +++ b/nodis_test.go @@ -149,7 +149,7 @@ func TestNodis_Patch(t *testing.T) { }, } n := Open(opt) - err := n.Patch(ops...) + err := n.ApplyPatch(ops...) if err != nil { t.Errorf("Patch() = %v, want %v", err, nil) } diff --git a/storage/entry.go b/storage/entry.go index 0164787..690f724 100644 --- a/storage/entry.go +++ b/storage/entry.go @@ -21,19 +21,15 @@ var ( type ValueEntry struct { Type uint8 Expiration int64 - Key string Value []byte } func (v *ValueEntry) encode() []byte { - var keyLen = len(v.Key) - var b = make([]byte, 1+8+1+keyLen+len(v.Value)) + var b = make([]byte, 1+8+len(v.Value)) b[0] = v.Type n := binary.PutVarint(b[1:], v.Expiration) - b[n+1+1] = byte(keyLen) - copy(b[n+1+1+1:], v.Key) - copy(b[n+1+1+1+keyLen:], v.Value) - b = b[:n+1+1+1+keyLen+len(v.Value)] + copy(b[n+1:], v.Value) + b = b[:n+1+len(v.Value)] c32 := crc32.ChecksumIEEE(b) var buf = make([]byte, len(b)+4) binary.LittleEndian.PutUint32(buf, c32) @@ -53,20 +49,13 @@ func (v *ValueEntry) decode(b []byte) error { v.Type = b[0] i, n := binary.Varint(b[1:]) v.Expiration = i - // type+Expiration+keyIndex - keyLen := int(b[1+n+1]) - if len(b) < keyLen { - return ErrCorruptedData - } - v.Key = string(b[1+n+1+1 : 1+n+1+1+keyLen]) - v.Value = b[1+n+1+1+keyLen:] + v.Value = b[n+1:] return nil } // NewValueEntry creates a new entity -func NewValueEntry(key string, v ds.Value, expiration int64) *ValueEntry { +func NewValueEntry(v ds.Value, expiration int64) *ValueEntry { e := &ValueEntry{ - Key: key, Expiration: expiration, Type: uint8(v.Type()), } @@ -82,10 +71,10 @@ func parseValueEntry(data []byte) (*ValueEntry, error) { return entry, nil } -func parseValue(data []byte) (ds.Value, error) { +func parseValue(data []byte) (*ValueEntry, ds.Value, error) { var entry, err = parseValueEntry(data) if err != nil { - return nil, err + return nil, nil, err } var value ds.Value switch ds.ValueType(entry.Type) { @@ -112,5 +101,5 @@ func parseValue(data []byte) (ds.Value, error) { default: panic("unhandled default case") } - return value, nil + return entry, value, nil } diff --git a/storage/entry_test.go b/storage/entry_test.go new file mode 100644 index 0000000..74178e3 --- /dev/null +++ b/storage/entry_test.go @@ -0,0 +1,61 @@ +package storage + +import ( + "reflect" + "testing" + + "github.com/diiyw/nodis/ds" + "github.com/diiyw/nodis/ds/str" +) + +func TestValueEntry_EncodeDecode(t *testing.T) { + value := str.NewString() + value.Set([]byte("test value")) + expiration := int64(1234567890) + entry := NewValueEntry(value, expiration) + + encoded := entry.encode() + + decoded := &ValueEntry{} + err := decoded.decode(encoded) + if err != nil { + t.Errorf("decode failed: %v", err) + } + + if decoded.Type != entry.Type { + t.Errorf("decoded Type = %v, want %v", decoded.Type, entry.Type) + } + + if decoded.Expiration != entry.Expiration { + t.Errorf("decoded Expiration = %v, want %v", decoded.Expiration, entry.Expiration) + } + + if !reflect.DeepEqual(decoded.Value, entry.Value) { + t.Errorf("decoded Value = %v, want %v", decoded.Value, entry.Value) + } +} + +func TestParseValue(t *testing.T) { + value := str.NewString() + value.Set([]byte("test value")) + expiration := int64(1234567890) + entry := NewValueEntry(value, expiration) + encoded := entry.encode() + + parsedEntry, parsedValue, err := parseValue(encoded) + if err != nil { + t.Errorf("parseValue failed: %v", err) + } + + if !reflect.DeepEqual(parsedEntry, entry) { + t.Errorf("parsedEntry = %v, want %v", parsedEntry, entry) + } + + if parsedValue.Type() != ds.ValueType(entry.Type) { + t.Errorf("parsedValue.Type() = %v, want %v", parsedValue.Type(), entry.Type) + } + + if !reflect.DeepEqual(parsedValue.GetValue(), entry.Value) { + t.Errorf("parsedValue.GetValue() = %v, want %v", parsedValue.GetValue(), entry.Value) + } +} diff --git a/storage/memory.go b/storage/memory.go index 0a71fe5..13e68cb 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -7,10 +7,18 @@ import ( "github.com/tidwall/btree" ) +type KeyValue struct { + key *ds.Key + value ds.Value +} + type Memory struct { sync.RWMutex - keys btree.Map[string, *Key] - values btree.Map[string, ds.Value] + data btree.Map[string, KeyValue] +} + +func NewMemory() *Memory { + return &Memory{} } // Open initializes the storage. @@ -22,19 +30,21 @@ func (m *Memory) Open() error { func (m *Memory) Get(key string) (ds.Value, error) { m.RLock() defer m.RUnlock() - v, ok := m.values.Get(key) + v, ok := m.data.Get(key) if !ok { return nil, ErrKeyNotFound } - return v, nil + return v.value, nil } // Put sets a value in the storage. -func (m *Memory) Put(key *Key, value ds.Value) error { +func (m *Memory) Put(key *ds.Key, value ds.Value) error { m.Lock() defer m.Unlock() - m.values.Set(key.Name, value) - m.keys.Set(key.Name, key) + m.data.Set(key.Name, KeyValue{ + key: key, + value: value, + }) return nil } @@ -42,8 +52,7 @@ func (m *Memory) Put(key *Key, value ds.Value) error { func (m *Memory) Delete(key string) error { m.Lock() defer m.Unlock() - m.values.Delete(key) - m.keys.Delete(key) + m.data.Delete(key) return nil } @@ -58,19 +67,18 @@ func (m *Memory) Snapshot() error { } // ScanKeys returns the keys in the storage. -func (m *Memory) ScanKeys(f func(*Key) bool) { +func (m *Memory) ScanKeys(f func(*ds.Key) bool) { m.RLock() defer m.RUnlock() - m.keys.Scan(func(key string, value *Key) bool { - return f(value) + m.data.Scan(func(_ string, kv KeyValue) bool { + return f(kv.key) }) } -// Reset clears the storage. -func (m *Memory) Reset() error { +// Clear the storage. +func (m *Memory) Clear() error { m.Lock() defer m.Unlock() - m.values.Clear() - m.keys.Clear() + m.data.Clear() return nil } diff --git a/storage/pebble.go b/storage/pebble.go index c166d00..e156763 100644 --- a/storage/pebble.go +++ b/storage/pebble.go @@ -1,12 +1,17 @@ package storage import ( + "os" + "path/filepath" + "time" + "github.com/cockroachdb/pebble" "github.com/diiyw/nodis/ds" ) type Pebble struct { - db *pebble.DB + path string + db *pebble.DB } func (p *Pebble) Open(path string) error { @@ -14,6 +19,7 @@ func (p *Pebble) Open(path string) error { if err != nil { return err } + p.path = path p.db = db return nil } @@ -25,16 +31,13 @@ func (p *Pebble) Get(key string) (ds.Value, error) { return nil, err } defer closer.Close() - entry, err := parseValueEntry(v) - if err != nil { - return nil, err - } - return parseValue(entry.Value) + _, dv, err := parseValue(v) + return dv, err } // Put the value to the storage func (p *Pebble) Put(key *ds.Key, value ds.Value) error { - entry := NewValueEntry(key.Name, value, key.Expiration) + entry := NewValueEntry(value, key.Expiration) data := entry.encode() return p.db.Set([]byte(key.Name), data, pebble.Sync) } @@ -44,9 +47,22 @@ func (p *Pebble) Delete(key string) error { return p.db.Delete([]byte(key), pebble.Sync) } -// Reset the storage -func (p *Pebble) Reset() error { - return p.db.Flush() +// Clear the storage +func (p *Pebble) Clear() error { + err := p.db.Close() + if err != nil { + return err + } + err = os.RemoveAll(p.path) + if err != nil { + return err + } + db, err := pebble.Open(p.path, &pebble.Options{}) + if err != nil { + return err + } + p.db = db + return nil } // Close the storage @@ -56,7 +72,8 @@ func (p *Pebble) Close() error { // Snapshot the storage func (p *Pebble) Snapshot() error { - return nil + dstDir := time.Now().Format("20060102150405") + return p.db.Checkpoint(filepath.Join(p.path, dstDir)) } // ScanKeys returns the keys in the storage @@ -76,7 +93,7 @@ func (p *Pebble) ScanKeys(fn func(*ds.Key) bool) { continue } key := &ds.Key{ - Name: entry.Key, + Name: string(iter.Key()), Expiration: entry.Expiration, } if !fn(key) { diff --git a/storage/storage.go b/storage/storage.go index f9a6cc3..c8e0b82 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -15,7 +15,7 @@ type Storage interface { Get(key string) (ds.Value, error) Put(key *ds.Key, value ds.Value) error Delete(key string) error - Reset() error + Clear() error Close() error Snapshot() error ScanKeys(func(*ds.Key) bool) diff --git a/store.go b/store.go index 9e9678e..7e7b1b8 100644 --- a/store.go +++ b/store.go @@ -1,11 +1,13 @@ package nodis import ( - "github.com/diiyw/nodis/storage" "log" "sync" "time" + "github.com/diiyw/nodis/ds" + "github.com/diiyw/nodis/storage" + "github.com/diiyw/nodis/ds/list" "github.com/diiyw/nodis/redis" "github.com/tidwall/btree" @@ -69,7 +71,6 @@ func (s *store) gc() { return true } if m.modified() { - // sync to disk err := s.sg.Put(m.key, m.value) if err != nil { log.Println("GC: ", err) @@ -97,5 +98,5 @@ func (s *store) clear() error { s.mu.Lock() defer s.mu.Unlock() s.metadata.Clear() - return s.sg.Reset() + return s.sg.Clear() } diff --git a/store_test.go b/store_test.go index 29f7469..799c523 100644 --- a/store_test.go +++ b/store_test.go @@ -1,9 +1 @@ package nodis - -import ( - "github.com/diiyw/nodis/fs" -) - -var ( - driver = &fs.Memory{} -) diff --git a/tx.go b/tx.go index e4926c2..62b0cbc 100644 --- a/tx.go +++ b/tx.go @@ -1,7 +1,6 @@ package nodis import ( - "github.com/diiyw/nodis/storage" "sync" "time" @@ -54,7 +53,7 @@ func (tx *Tx) newKey(m *metadata, key string, newFn func() ds.Value) *metadata { m.RWMutex = new(sync.RWMutex) } value := newFn() - m.key = storage.NewKey(key, 0) + m.key = ds.NewKey(key, 0) m.setValue(value) m.state |= KeyStateModified tx.store.metadata.Set(key, m)