Skip to content

Commit

Permalink
new storage
Browse files Browse the repository at this point in the history
  • Loading branch information
diiyw committed Aug 11, 2024
1 parent 93f1523 commit 6f66847
Show file tree
Hide file tree
Showing 4 changed files with 467 additions and 0 deletions.
254 changes: 254 additions & 0 deletions storage/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package storage

import (
"encoding/binary"
"io"
"log"
"os"
"path/filepath"
"strconv"

"github.com/diiyw/nodis/ds"
"github.com/tidwall/btree"
)

type Key struct {
offset int64
size uint32
fileId uint16
}

type Disk struct {
fileSize int64
fileId uint16
path string
current string
indexFile string
aof *os.File
keys btree.Map[string, *Key]
}

func NewDisk(path string, fileSize int64) *Disk {
return &Disk{
path: path,
fileSize: fileSize,
indexFile: filepath.Join(path, "nodid.index"),
}
}

// Open initializes the storage.
func (d *Disk) Open() error {
err := os.MkdirAll(d.path, 0755)
if err != nil {
return err
}
return nil
}

// Get returns a value from the storage.
func (d *Disk) Get(key string) (ds.Value, error) {
k, ok := d.keys.Get(key)
if !ok {
return nil, ErrKeyNotFound
}
if k.fileId == d.fileId {
data := make([]byte, k.size)
_, err := d.aof.ReadAt(data, k.offset)
if err != nil {
return nil, err
}
return parseValue(data)
}
// read from other file
file := filepath.Join(d.path, "nodid."+strconv.Itoa(int(k.fileId))+".aof")
f, err := os.OpenFile(file, os.O_RDONLY, 0755)
if err != nil {
return nil, err
}
defer func() {
err = f.Close()
if err != nil {
log.Println("Close file error: ", err)
}
}()
data := make([]byte, k.size)
_, err = f.ReadAt(data, k.offset)
if err != nil {
return nil, err
}
return parseValue(data)
}

// Put sets a value in the storage.
func (d *Disk) Put(key string, value ds.Value, expiration int64) error {
data := NewValueEntry(key, value, expiration).encode()
offset, err := d.check()
if err != nil {
return err
}
k := &Key{
fileId: d.fileId,
offset: offset,
size: uint32(len(data)),
}
d.keys.Set(key, k)
_, err = d.aof.Write(data)
if err != nil {
return err
}
return nil
}

// Delete removes a value from the storage.
func (d *Disk) Delete(key string) error {
d.keys.Delete(key)
return nil
}

func (d *Disk) GetIndex() []byte {
fi, err := os.OpenFile(d.indexFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
panic(err)
}
defer fi.Close()
data, err := d.readAll(fi)
if err != nil {
panic(err)
}
if len(data) > 2 {
d.fileId = binary.LittleEndian.Uint16(data[:2])
data = data[2:]
}
d.current = filepath.Join(d.path, "nodid."+strconv.Itoa(int(d.fileId))+".aof")
d.aof, err = os.OpenFile(d.current, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
panic(err)
}
return data
}

// PutIndex sets the index.
func (d *Disk) PutIndex(index []byte) error {
idxFile, err := os.OpenFile(d.indexFile+"~", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
return err
}
var header = make([]byte, 2)
binary.LittleEndian.PutUint16(header, d.fileId)
_, err = idxFile.Write(header)
if err != nil {
return err
}
defer func() {
if err = idxFile.Close(); err != nil {
log.Println("Close sync error: ", err)
}
}()
n, err := idxFile.Write(index)
if err != nil {
return err
}
if n != len(index) {
return io.ErrShortWrite
}
err = os.Rename(d.indexFile+"~", d.indexFile)
if err != nil {
return err
}
return nil
}

// Close closes the storage.
func (d *Disk) Close() error {
err := d.aof.Close()
if err != nil {
return err
}
return nil
}

func (d *Disk) readAll(fi *os.File) ([]byte, error) {
l, err := d.getFileSize(fi)
if err != nil {
return nil, err
}
size := int(l)
if size == 0 {
return nil, nil
}
data := make([]byte, size)
_, err = d.readAt(fi, data, 0)
if err != nil {
return nil, err
}
return data, nil
}

func (d *Disk) readAt(fi *os.File, b []byte, off int64) (n int, err error) {
n, err = fi.ReadAt(b, off)
if err == io.EOF {
err = nil
}
return n, err
}

func (d *Disk) getFileSize(fi *os.File) (int64, error) {
fs, err := fi.Stat()
if err != nil {
return 0, err
}
return fs.Size(), nil
}

func (d *Disk) check() (int64, error) {
var offset, err = d.getFileSize(d.aof)
if err != nil {
return 0, err
}
if offset >= d.fileSize {
err = d.aof.Close()
if err != nil {
return 0, err
}
// open file with new file id
d.fileId++
d.current = filepath.Join(d.path, "nodid."+strconv.Itoa(int(d.fileId))+".aof")
d.aof, err = os.OpenFile(d.current, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
return 0, err
}
// update index file
idxFi, err := os.OpenFile(d.indexFile, os.O_CREATE|os.O_RDWR, 0755)
if err != nil {
return 0, err
}
defer func() {
err := idxFi.Close()
if err != nil {
log.Println("Close index file error: ", err)
}
}()
var header = make([]byte, 4)
binary.LittleEndian.PutUint16(header, d.fileId)
_, err = idxFi.WriteAt(header, 0)
if err != nil {
return 0, err
}
offset = 0
}
return offset, nil
}

func (d *Disk) Reset() error {
err := d.aof.Close()
if err != nil {
return err
}
d.keys.Clear()
err = os.Truncate(d.aof.Name(), 0)
if err != nil {
return err
}
d.aof, err = os.OpenFile(d.aof.Name(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
return err
}
116 changes: 116 additions & 0 deletions storage/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package storage

import (
"encoding/binary"
"errors"
"hash/crc32"

"github.com/diiyw/nodis/ds"
"github.com/diiyw/nodis/ds/hash"
"github.com/diiyw/nodis/ds/list"
"github.com/diiyw/nodis/ds/set"
"github.com/diiyw/nodis/ds/str"
"github.com/diiyw/nodis/ds/zset"
)

var (
ErrCorruptedData = errors.New("corrupted data")
)

// ValueEntry is the entry of the value
type ValueEntry struct {
Type uint8
Expiration int64
Key string
Value []byte
}

func (v *ValueEntry) encode() []byte {
var keyLen = len(v.Key)
var b = make([]byte, 1+8+1+keyLen+len(v.Value))
b[0] = v.Type
n := binary.PutVarint(b[1:], v.Expiration)
b[n+1+1] = byte(keyLen)
copy(b[n+1+1+1:], v.Key)
copy(b[n+1+1+1+keyLen:], v.Value)
b = b[:n+1+1+1+keyLen+len(v.Value)]
c32 := crc32.ChecksumIEEE(b)
var buf = make([]byte, len(b)+4)
binary.LittleEndian.PutUint32(buf, c32)
copy(buf[4:], b)
return buf
}

func (v *ValueEntry) decode(b []byte) error {
if len(b) < 4 {
return ErrCorruptedData
}
c32 := binary.LittleEndian.Uint32(b)
b = b[4:]
if c32 != crc32.ChecksumIEEE(b) {
return ErrCorruptedData
}
v.Type = b[0]
i, n := binary.Varint(b[1:])
v.Expiration = i
// type+expiration+keyIndex
keyLen := int(b[1+n+1])
if len(b) < keyLen {
return ErrCorruptedData
}
v.Key = string(b[1+n+1+1 : 1+n+1+1+keyLen])
v.Value = b[1+n+1+1+keyLen:]
return nil
}

// NewValueEntry creates a new entity
func NewValueEntry(key string, v ds.Value, expiration int64) *ValueEntry {
e := &ValueEntry{
Key: key,
Expiration: expiration,
Type: uint8(v.Type()),
}
e.Value = v.GetValue()
return e
}

func parseValueEntry(data []byte) (*ValueEntry, error) {
var entry = &ValueEntry{}
if err := entry.decode(data); err != nil {
return nil, err
}
return entry, nil
}

func parseValue(data []byte) (ds.Value, error) {
var entry, err = parseValueEntry(data)
if err != nil {
return nil, err
}
var value ds.Value
switch ds.ValueType(entry.Type) {
case ds.String:
v := str.NewString()
v.SetValue(entry.Value)
value = v
case ds.ZSet:
z := zset.NewSortedSet()
z.SetValue(entry.Value)
value = z
case ds.List:
l := list.NewLinkedList()
l.SetValue(entry.Value)
value = l
case ds.Hash:
h := hash.NewHashMap()
h.SetValue(entry.Value)
value = h
case ds.Set:
v := set.NewSet()
v.SetValue(entry.Value)
value = v
default:
panic("unhandled default case")
}
return value, nil
}
Loading

0 comments on commit 6f66847

Please sign in to comment.