Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Jun 17, 2024
1 parent b2179ac commit 3ccaf7a
Show file tree
Hide file tree
Showing 10 changed files with 393 additions and 192 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ require (
github.com/sergi/go-diff v1.2.0
github.com/ulikunitz/xz v0.5.11
github.com/vektra/mockery/v2 v2.42.3
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240613214617-0be27c3bb950
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240617223016-4f5f89dbb8cd
gitlab.com/firelizzard/go-script v0.0.0-20240404234115-d5f0a716003d
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240613214617-0be27c3bb950 h1:dlYE/sYVb60AYrO2Tk4oP3UfmHDn3UVzhZGVXj/HjQs=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240613214617-0be27c3bb950/go.mod h1:FTl7W44SWhDenzAtvKkLu30Cin8DAr249mH4eg7BNLY=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240617214518-1eeb92c1340d h1:lvjgVrdQ3CPbP2QAW2kre7vUpu93EU2YWTWvKDCkOZI=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240617214518-1eeb92c1340d/go.mod h1:FTl7W44SWhDenzAtvKkLu30Cin8DAr249mH4eg7BNLY=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240617223016-4f5f89dbb8cd h1:hA4RkCHZNg9lkEMXchnKnwcPjOraa42T/DyFPHt8+7c=
gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240617223016-4f5f89dbb8cd/go.mod h1:FTl7W44SWhDenzAtvKkLu30Cin8DAr249mH4eg7BNLY=
gitlab.com/bosi/decorder v0.4.1 h1:VdsdfxhstabyhZovHafFw+9eJ6eU0d2CkFNJcZz/NU4=
gitlab.com/bosi/decorder v0.4.1/go.mod h1:jecSqWUew6Yle1pCr2eLWTensJMmsxHsBwt+PVbkAqA=
gitlab.com/ethan.reesor/vscode-notebooks/go-playbooks v0.0.0-20220417214602-1121b9fae118 h1:UnyYFTz6dWVMBzLUyqHPIQwMrdpiuE+CE7p/5kUfvmk=
Expand Down
233 changes: 89 additions & 144 deletions pkg/database/keyvalue/block/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,37 @@
package block

import (
"encoding/binary"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"slices"
"sort"
"sync"
"sync/atomic"

"gitlab.com/accumulatenetwork/accumulate/pkg/database"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/memory"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/record"
binary2 "gitlab.com/accumulatenetwork/core/schema/pkg/binary"
"golang.org/x/exp/slog"
)

type Database struct {
path string
commitMu sync.Mutex
files []*blockFile
records records
nextFile uint64
nextBlock uint64
fileLimit uint64
config
path string
commitMu sync.Mutex
files []*blockFile
records records
}

type config struct {
nextFile atomic.Uint64
nextBlock atomic.Uint64
fileLimit int64
nameFmt NameFormat
filterFn func(string) bool
}
Expand All @@ -40,12 +46,12 @@ type records = vmap[[32]byte, recordLocation]
type recordsView = vmapView[[32]byte, recordLocation]

type blockLocation struct {
file uint
file int
offset int64
}

type recordLocation struct {
file uint
file int
block uint64
header int64
offset int64
Expand All @@ -54,7 +60,7 @@ type recordLocation struct {

type Option func(*Database)

func WithFileLimit(limit uint64) Option {
func WithFileLimit(limit int64) Option {
return func(d *Database) {
d.fileLimit = limit
}
Expand Down Expand Up @@ -107,7 +113,7 @@ func Open(path string, options ...Option) (_ *Database, err error) {
continue
}

f, err := openFile(filepath.Join(path, e.Name()))
f, err := db.openFile(filepath.Join(path, e.Name()))
if err != nil {
return nil, err
}
Expand All @@ -116,8 +122,8 @@ func Open(path string, options ...Option) (_ *Database, err error) {
return nil, fmt.Errorf("%s does not have an ordinal", f.file.Name())
}

if f.header.Ordinal > db.nextFile {
db.nextFile = f.header.Ordinal
if db.nextFile.Load() < f.header.Ordinal {
db.nextFile.Store(f.header.Ordinal)
}

db.files = append(db.files, f)
Expand All @@ -144,68 +150,55 @@ func Open(path string, options ...Option) (_ *Database, err error) {
// Build the block index
blocks := map[uint64]blockLocation{}
records := db.records.View()
buf := new(buffer)
dec := new(binary2.Decoder)
for fileNo, f := range db.files {
for i, f := range db.files {
slog.Info("Indexing", "ordinal", f.header.Ordinal, "module", "database")
var offset int64
var block *uint64
for {
e, n, err := readEntryAt(f, offset, buf, dec)
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("reading entries from %v: %w", f.file.Name(), err)
}

// b, _ := json.Marshal(e)
// fmt.Println(string(b))

start := offset
offset += int64(n)

switch e := e.(type) {
it := f.entries()
it.Range(func(_ int, item entryPos) bool {
switch e := item.entry.(type) {
case *startBlockEntry:
if block != nil {
return nil, fmt.Errorf("%v is corrupted", f.file.Name())
it.err = fmt.Errorf("%v is corrupted", f.file.Name())
return false
}
if _, ok := blocks[e.ID]; ok {
return nil, fmt.Errorf("duplicate block %d", e.ID)
it.err = fmt.Errorf("duplicate block %d", e.ID)
return false
}
blocks[e.ID] = blockLocation{file: uint(fileNo), offset: start}
blocks[e.ID] = blockLocation{file: i, offset: item.Start}
block = &e.ID

if db.nextBlock < e.ID {
db.nextBlock = e.ID
if db.nextBlock.Load() < e.ID {
db.nextBlock.Store(e.ID)
}

case *endBlockEntry:
if block == nil {
return nil, fmt.Errorf("%v is corrupted", f.file.Name())
it.err = fmt.Errorf("%v is corrupted", f.file.Name())
return false
}
block = nil

case *recordEntry:
if block == nil {
return nil, fmt.Errorf("%v is corrupted", f.file.Name())
it.err = fmt.Errorf("%v is corrupted", f.file.Name())
return false
}

records.Put(e.KeyHash, recordLocation{
file: uint(fileNo),
file: i,
block: *block,
header: start + 2, // The header has a 2 byte length prefix
offset: offset,
header: item.Start,
offset: item.End,
length: e.Length,
})

if e.Length <= 0 {
continue
}

// Skip the record data
offset += e.Length
}
return true
})

if it.err != nil {
return nil, it.err
}
}
err = records.Commit()
Expand All @@ -224,16 +217,15 @@ func (db *Database) newFile() (*blockFile, error) {
}

// Create a new file
db.nextFile++
ordinal := db.nextFile
ordinal := db.nextFile.Add(1)
name := db.nameFmt.Format(ordinal)
if name == "" {
return nil, fmt.Errorf("invalid block file name: empty")
} else if filepath.Base(name) != name {
return nil, fmt.Errorf("invalid block file name: %q contains a slash or is empty", name)
}

f, err := newFile(ordinal, filepath.Join(db.path, name))
f, err := db.config.newFile(ordinal, filepath.Join(db.path, name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -325,7 +317,7 @@ func (d *Database) validLoc(loc recordLocation) bool {
// Corrupted offsets
return false

case loc.file >= uint(len(d.files)):
case loc.file >= len(d.files):
// loc.file is invalid
return false

Expand Down Expand Up @@ -365,112 +357,65 @@ func (d *Database) commit(view *recordsView, entries map[[32]byte]memory.Entry)
d.commitMu.Lock()
defer d.commitMu.Unlock()

// Seek to the end of the newest file or create a new file
fileNo := len(d.files) - 1
var f *blockFile
var offset int64
var err error
if fileNo < 0 {
fileNo = 0
f, err = d.newFile()
} else {
f = d.files[fileNo]
offset, err = f.file.Seek(0, io.SeekEnd)
}
if err != nil {
return err
// Construct an ordered list of entries
list := make([]*entryAndData, 0, len(entries))
for keyHash, entry := range entries {
n := int64(len(entry.Value))
if entry.Delete {
n = -1
}
list = append(list, &entryAndData{
recordEntry{
Key: entry.Key,
KeyHash: keyHash,
Length: n,
},
entry.Value,
})
}

var block uint64
var haveBlock bool
for kh, e := range entries {
// Time for a new file?
if offset >= int64(d.fileLimit) {
// Close the block
if haveBlock {
haveBlock = false
_, err = writeEntry(f.file, &endBlockEntry{})
if err != nil {
return err
}
}

// Remap the file
err := f.Remap()
if err != nil {
return err
}

// Open a new file
offset = 0
fileNo++
f, err = d.newFile()
if err != nil {
return err
// Sort without allocation (hopefully)
sort.Slice(list, func(i, j int) bool {
for b := 0; b < 32; b += 8 {
x := binary.BigEndian.Uint64(list[i].KeyHash[b : b+8])
y := binary.BigEndian.Uint64(list[j].KeyHash[b : b+8])
z := x - y
if z != 0 {
return z < 0
}
}
return false
})

// Time for a new block?
if !haveBlock {
// Open the block
d.nextBlock++
b := new(startBlockEntry)
b.ID = d.nextBlock
b.Parent = block
block = b.ID
haveBlock = true

n, err := writeEntry(f.file, b)
// Write all the entries
var file *blockFile
block := new(startBlockEntry)
block.ID = d.nextBlock.Add(1)
for len(list) > 0 {
// If it's the first time and there are existing files, get the last
// file. Otherwise make a new file.
var err error
if file == nil && len(d.files) > 0 {
file = d.files[len(d.files)-1]

} else {
file, err = d.newFile()
if err != nil {
return err
}
offset += int64(n)
}

l := int64(len(e.Value))
if e.Delete {
l = -1
}

// Write the entry
n, err := writeEntry(f.file, &recordEntry{Key: e.Key, Length: l, KeyHash: e.Key.Hash()})
n, err := file.writeEntries(len(d.files)-1, view, list, block)
if err != nil {
return err
}
offset += int64(n)
view.Put(kh, recordLocation{file: uint(fileNo), block: block, offset: offset, length: l})

if e.Delete {
if n == 0 {
continue
}

// Write the data
n, err = f.file.Write(e.Value)
if err != nil {
return err
}
offset += int64(n)
}
err = view.Commit()
if err != nil {
return err
}

if !haveBlock {
return nil
}

// Close the block
_, err = writeEntry(f.file, &endBlockEntry{})
if err != nil {
return err
}

// Remap the file
err = f.Remap()
if err != nil {
return err
list = list[n:]
block.Part++
}

return nil
return view.Commit()
}
2 changes: 1 addition & 1 deletion pkg/database/keyvalue/block/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func TestFileLimit(t *testing.T) {
files = append(files, ent.Name())
}
require.Equal(t, []string{
"0.blocks",
"1.blocks",
"2.blocks",
"3.blocks",
}, files)
}
Loading

0 comments on commit 3ccaf7a

Please sign in to comment.