Skip to content

Commit

Permalink
File index (saving work)
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Jun 19, 2024
1 parent dfc17dd commit 691dc0d
Show file tree
Hide file tree
Showing 6 changed files with 367 additions and 33 deletions.
20 changes: 10 additions & 10 deletions pkg/database/keyvalue/block/block_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,23 @@ type blockFile struct {
*config
file *file
header *fileHeader
dec binary.Decoder
}

// entryAndData pairs a record's entry (its metadata/header) with the raw
// value bytes it describes, so the two can be passed around as one unit.
type entryAndData struct {
	recordEntry
	Value []byte // the record's value, written immediately after the entry
}

func newBlockFile(c *config, name string) (*blockFile, error) {
func newBlockFile(c *config, name string) (_ *blockFile, err error) {
f := new(blockFile)
f.config = c
f.header = new(fileHeader)

var err error
f.file, err = openFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL)
if err != nil {
return nil, err
}
defer closeIfError(&err, f)

b, err := f.header.MarshalBinary()
if err != nil {
Expand All @@ -56,24 +55,24 @@ func newBlockFile(c *config, name string) (*blockFile, error) {
}
b = append(b, make([]byte, fileHeaderSize-len(b))...)

_, err = f.file.WriteAt(0, b)
_, err = f.file.WriteAt(b, 0)
if err != nil {
return nil, err
}

return f, nil
}

func (c *config) openFile(name string) (*blockFile, error) {
func (c *config) openFile(name string) (_ *blockFile, err error) {
f := new(blockFile)
f.config = c
f.header = new(fileHeader)

var err error
f.file, err = openFile(name, os.O_RDWR)
if err != nil {
return nil, err
}
defer closeIfError(&err, f)

if f.file.Len() < fileHeaderSize {
return nil, errors.New("file header is missing or corrupted")
Expand All @@ -88,9 +87,10 @@ func (c *config) openFile(name string) (*blockFile, error) {
}

// ReadHeader decodes the record entry (the header, not the value) located at
// l, reading exactly the [l.Offset, l.Offset+l.HeaderLen) byte range of the
// underlying file.
//
// A fresh decoder is allocated per call so concurrent readers do not share
// decoder state (the previous design reused a single f.dec field, which is
// unsafe under concurrent reads).
func (f *blockFile) ReadHeader(l *recordLocation) (*recordEntry, error) {
	rd := f.file.ReadRange(l.Offset, l.Offset+l.HeaderLen)
	dec := binary.NewDecoder(rd)
	e := new(recordEntry)
	err := e.UnmarshalBinaryV2(dec)
	return e, err
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func (f *blockFile) writeEntries(fileIndex int, view *recordIndexView, entries [

// Write the buffer
if w.mainBuf.Len() > bufLimit {
_, err = f.file.WriteAt(offset, w.mainBuf.Bytes())
_, err = f.file.WriteAt(w.mainBuf.Bytes(), offset)
if err != nil {
return 0, err
}
Expand All @@ -180,7 +180,7 @@ func (f *blockFile) writeEntries(fileIndex int, view *recordIndexView, entries [
}

// Write the buffer
_, err = f.file.WriteAt(offset, w.mainBuf.Bytes())
_, err = f.file.WriteAt(w.mainBuf.Bytes(), offset)
if err != nil {
return 0, err
}
Expand Down
57 changes: 35 additions & 22 deletions pkg/database/keyvalue/block/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ type file struct {
func openFile(name string, flags int) (_ *file, err error) {
f := new(file)
f.mu = new(sync.RWMutex)

defer func() {
if err != nil {
_ = f.Close()
}
}()
defer closeIfError(&err, f)

f.file, err = os.OpenFile(name, flags, 0600)
if err != nil {
Expand Down Expand Up @@ -82,34 +77,52 @@ func (f *file) ReadRange(start, end int64) *fileReader {
return &fileReader{f, start, end}
}

func (f *file) WriteAt(offset int64, b []byte) (int, error) {
err := f.unmap()
if err != nil {
return 0, err
}

n := offset + int64(len(b))
err = f.file.Truncate(n)
if err != nil {
return 0, err
}
// WriteRange returns a writer restricted to the half-open byte range
// [start, end) of the file.
func (f *file) WriteRange(start, end int64) *fileWriter {
	w := &fileWriter{file: f, offset: start, end: end}
	return w
}

f.data, err = mmap.MapRegion(f.file, int(n), mmap.RDWR, 0, 0)
// WriteAt copies b into the memory-mapped file at the given offset, first
// growing (and remapping) the file if the write would extend past the
// current mapping.
func (f *file) WriteAt(b []byte, offset int64) (int, error) {
	// Ensure the mapping covers [0, offset+len(b)). Grow takes the write
	// lock internally only when it actually has to remap.
	err := f.Grow(offset + int64(len(b)))
	if err != nil {
		return 0, err
	}

	// Hold the read lock while touching f.data so a concurrent Grow cannot
	// unmap the region mid-copy. After Grow succeeded above, the mapping is
	// large enough, so the copy transfers all of b.
	f.mu.RLock()
	defer f.mu.RUnlock()
	m := copy(f.data[offset:], b)
	return m, nil
}

func (f *file) unmap() error {
if f.data == nil {
func (f *file) Grow(size int64) error {
// Fast path
f.mu.RLock()
if len(f.data) >= int(size) {
f.mu.RUnlock()
return nil
}

err := f.data.Unmap()
f.data = nil
// Upgrade to exclusive lock
f.mu.RUnlock()
f.mu.Lock()
defer f.mu.Unlock()

if len(f.data) >= int(size) {
return nil
}

if f.data != nil {
err := f.data.Unmap()
if err != nil {
return err
}
}

err := f.file.Truncate(size)
if err != nil {
return err
}

f.data, err = mmap.MapRegion(f.file, int(size), mmap.RDWR, 0, 0)
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package block

import "io"

var _ io.ReaderAt = (*fileReader)(nil)
var _ io.WriterAt = (*fileWriter)(nil)

type fileReader struct {
*file
offset int64
Expand Down Expand Up @@ -35,7 +38,7 @@ func (r *fileReader) Read(b []byte) (int, error) {
func (r *fileReader) ReadByte() (byte, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.Len() <= 0 {
if r.Len() <= 0 || len(r.data) <= int(r.offset) {
return 0, io.EOF
}
b := r.data[r.offset]
Expand All @@ -50,3 +53,27 @@ func (r *fileReader) UnreadByte() error {
r.offset--
return nil
}

// fileWriter writes sequentially to a bounded region of a memory-mapped
// file.
type fileWriter struct {
	*file
	offset int64 // current write position
	end    int64 // exclusive upper bound; 0 means "to the end of the mapping" (see Len)
}

// Len reports how many bytes remain writable before the region's upper
// bound — the explicit end when one was set, or the end of the mapping.
func (w *fileWriter) Len() int {
	limit := w.end
	if limit <= 0 {
		limit = int64(len(w.data))
	}
	return int(limit - w.offset)
}

// Write writes b at the current position, implementing io.Writer. It
// returns io.EOF when the region has no space left. If b is larger than the
// remaining space, only the prefix that fits is written and
// io.ErrShortWrite is returned, as the io.Writer contract requires whenever
// n < len(b) — the original returned a nil error for a short write, which
// let callers (e.g. indexFile.write) silently truncate an entry.
func (w *fileWriter) Write(b []byte) (int, error) {
	space := w.Len()
	if space <= 0 {
		return 0, io.EOF
	}

	short := false
	if len(b) > space {
		b, short = b[:space], true
	}

	n, err := w.WriteAt(b, w.offset)
	w.offset += int64(n)
	if err == nil && short {
		err = io.ErrShortWrite
	}
	return n, err
}
165 changes: 165 additions & 0 deletions pkg/database/keyvalue/block/index_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package block

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"sort"
"sync/atomic"

"gitlab.com/accumulatenetwork/accumulate/pkg/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/record"
"gitlab.com/accumulatenetwork/core/schema/pkg/binary"
)

// Layout constants for index files: each file is a fixed-size table of
// fixed-size entries (a 32-byte key hash followed by the encoded record
// location).
const indexFileEntrySize = 64
const indexFileEntryCount = 1 << 10
const indexFileSize = indexFileEntrySize * indexFileEntryCount

// errNoSpace is returned by indexFile.Insert when the table already holds
// indexFileEntryCount entries.
var errNoSpace = errors.New("no space left")

// indexFile is a fixed-size, memory-mapped table of key-hash → record
// location entries, kept sorted by key hash so lookups can binary search.
type indexFile struct {
	file  *file
	count atomic.Int64 // number of occupied entries at the front of the table
}

// newIndexFile creates a new index file at the given path, pre-allocating
// space for the full entry table. It fails if the file already exists.
//
// The error return is named so the deferred closeIfError sees the final
// error, matching the pattern used by openIndexFile and the other
// constructors in this package.
func newIndexFile(name string) (_ *indexFile, err error) {
	f := new(indexFile)
	f.file, err = openFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL)
	if err != nil {
		return nil, err
	}

	// Close (and thus release) the file if any later step fails.
	defer closeIfError(&err, f)

	// Always allocate the full table (1024 entries) up front so Insert
	// never has to grow the mapping.
	err = f.file.Grow(indexFileSize)
	if err != nil {
		return nil, err
	}

	return f, nil
}

// openIndexFile opens an existing index file and recovers the entry count
// by locating the zeroed tail of the table.
func openIndexFile(name string) (_ *indexFile, err error) {
	f := new(indexFile)
	f.file, err = openFile(name, os.O_RDWR)
	if err != nil {
		return nil, err
	}
	// Close the file if any of the following steps fail.
	defer closeIfError(&err, f)

	// Index files are created at a fixed size, so any other size indicates
	// corruption or a foreign file.
	if len(f.file.data) != indexFileSize {
		return nil, fmt.Errorf("invalid size: want %d, got %d", indexFileSize, len(f.file.data))
	}

	// Find the empty region at the end of the file and use that to determine
	// the number of entries. Entries are sorted and the zero key is
	// forbidden (Insert panics on it), so the first all-zero key hash marks
	// the end of the occupied region.
	f.count.Store(int64(sort.Search(indexFileEntryCount, func(i int) bool {
		offset := int64(i) * indexFileEntrySize
		return [32]byte(f.file.data[offset:]) == [32]byte{}
	})))

	return f, err
}

// Close closes the underlying file.
func (f *indexFile) Close() error {
	return f.file.Close()
}

// Insert adds or updates the location recorded for the given key hash,
// keeping the table sorted by hash. It returns errNoSpace when the table is
// full. The zero hash is reserved as the end-of-table sentinel (see
// openIndexFile) and must never be inserted.
func (f *indexFile) Insert(key *record.KeyHash, loc *recordLocation) error {
	if *key == [32]byte{} {
		panic("cannot insert the zero key")
	}

	// NOTE(review): this takes the file's *read* lock yet mutates
	// f.file.data below (via copy and writeAt), so two concurrent Inserts
	// could race with each other. Taking the write lock here would deadlock
	// (the writeAt path re-acquires the read lock) — confirm callers
	// serialize Insert externally.
	f.file.mu.RLock()
	defer f.file.mu.RUnlock()

	count := f.count.Load()
	i := f.find(count, *key)
	offset := i * indexFileEntrySize

	// An entry with this exact hash already exists: overwrite it in place.
	if i < count && [32]byte(f.file.data[offset:offset+32]) == *key {
		return f.writeAt(key, loc, offset)
	}

	// Do we need extra space?
	if int64(len(f.file.data))/indexFileEntrySize <= count {
		return errNoSpace
	}

	// Inserting before the end: shift the tail down one entry slot to keep
	// the table sorted (Go's copy handles the overlapping ranges).
	if i < count {
		end := count * indexFileEntrySize
		copy(f.file.data[offset+64:], f.file.data[offset:end])
	}
	// NOTE(review): the count is bumped before writeAt — if writeAt fails,
	// the table contains a counted but garbage entry.
	f.count.Add(1)
	return f.writeAt(key, loc, offset)
}

// writeAt serializes an entry (key hash plus location) into the fixed
// 64-byte slot starting at offset. Insert calls this while holding the
// file's read lock.
func (f *indexFile) writeAt(key *record.KeyHash, loc *recordLocation, offset int64) error {
	wr := f.file.WriteRange(offset, offset+64)
	return f.write(key, loc, wr)
}

// write serializes the 32-byte key hash, the binary-encoded location, and a
// trailing empty-object terminator to wr. Running out of room while writing
// the location is an error (the entry must hold it entirely); running out
// of room for the terminator alone is tolerated.
func (f *indexFile) write(key *record.KeyHash, loc *recordLocation, wr io.Writer) error {
	if _, err := wr.Write(key[:]); err != nil {
		return err
	}

	if err := loc.MarshalBinaryV2(binary.NewEncoder(wr)); err != nil {
		// A truncated location would corrupt the entry, so surface EOF as
		// an unexpected end of input.
		if errors.Is(err, io.EOF) {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	// The terminator is optional: when the entry is exactly full, skip it.
	_, err := wr.Write([]byte{binary.EmptyObject})
	if errors.Is(err, io.EOF) {
		return nil
	}
	return err
}

// Find looks up the location recorded for key, returning a NotFoundError
// when the key's hash is not present in the table.
func (f *indexFile) Find(key *record.Key) (*recordLocation, error) {
	hash := key.Hash()

	// Snapshot the count and locate the candidate slot under the read lock.
	f.file.mu.RLock()
	count := f.count.Load()
	index := f.find(count, hash)
	f.file.mu.RUnlock()
	if index >= count {
		return nil, (*database.NotFoundError)(key)
	}

	// NOTE(review): f.file.data is read below after the lock was released;
	// a concurrent Grow could unmap the region in the meantime. Confirm
	// index files are never grown after creation, or hold the lock across
	// this read.
	offset := int64(index) * indexFileEntrySize
	if [32]byte(f.file.data[offset:offset+32]) != hash {
		return nil, (*database.NotFoundError)(key)
	}

	return f.readAt(offset)
}

// find binary-searches the first count entries and returns the index of the
// first entry whose key hash is >= hash (count if every entry is smaller).
// The caller must hold the file's lock.
func (f *indexFile) find(count int64, hash record.KeyHash) int64 {
	isNotBelow := func(i int) bool {
		start := i * indexFileEntrySize
		entry := f.file.data[start : start+32]
		return bytes.Compare(entry, hash[:]) >= 0
	}
	return int64(sort.Search(int(count), isNotBelow))
}

// readAt decodes the record location stored in the entry at offset,
// skipping past the 32-byte key hash at the start of the entry.
func (f *indexFile) readAt(offset int64) (*recordLocation, error) {
	loc := new(recordLocation)
	reader := f.file.ReadRange(offset+32, offset+64)
	if err := loc.UnmarshalBinaryV2(binary.NewDecoder(reader)); err != nil {
		return loc, err
	}
	return loc, nil
}
Loading

0 comments on commit 691dc0d

Please sign in to comment.