Skip to content

Commit

Permalink
Fix file ordinals
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Jun 13, 2024
1 parent 4a70abb commit 7ae1521
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 11 deletions.
39 changes: 32 additions & 7 deletions pkg/database/keyvalue/block/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io/fs"
"os"
"path/filepath"
"slices"
"sync"

"gitlab.com/accumulatenetwork/accumulate/pkg/database"
Expand All @@ -28,7 +29,8 @@ type Database struct {
records records
next uint64
fileLimit uint64
nameFn func(i int) string
nameFmt NameFormat
filterFn func(string) bool
}

type records = vmap[[32]byte, recordLocation]
Expand All @@ -55,9 +57,15 @@ func WithFileLimit(limit uint64) Option {
}
}

func WithNameFunc(fn func(i int) string) Option {
func WithNameFormat(fmt NameFormat) Option {
return func(d *Database) {
d.nameFn = fn
d.nameFmt = fmt
}
}

func FilterFiles(fn func(string) bool) Option {
return func(d *Database) {
d.filterFn = fn
}
}

Expand All @@ -83,7 +91,7 @@ func Open(path string, options ...Option) (_ *Database, err error) {

db.path = path
db.fileLimit = 1 << 30
db.nameFn = func(i int) string { return fmt.Sprintf("%d.blocks", i) }
db.nameFmt = DefaultNameFormat

for _, o := range options {
o(db)
Expand All @@ -92,14 +100,30 @@ func Open(path string, options ...Option) (_ *Database, err error) {
// Open all the files
db.files = make([]*blockFile, 0, len(entries))
for _, e := range entries {
f, err := openFile(filepath.Join(path, e.Name()), os.O_RDWR, 0)
if db.filterFn != nil && !db.filterFn(e.Name()) {
continue
}

// Extract the ordinal from the filename
number, err := db.nameFmt.Parse(e.Name())
if err != nil {
return nil, fmt.Errorf("parse index of %q: %w", e.Name(), err)
}

f, err := openFile(number, filepath.Join(path, e.Name()), os.O_RDWR, 0)
if err != nil {
return nil, err
}

db.files = append(db.files, f)
}

// Sort the files by number, since who knows what order the filesystem
// returns them in
slices.SortFunc(db.files, func(a, b *blockFile) int {
return a.number - b.number
})

// Build the block index
blocks := map[uint64]blockLocation{}
records := db.records.View()
Expand Down Expand Up @@ -177,15 +201,16 @@ func (db *Database) newFile() (*blockFile, error) {
}

// Create a new file
name := db.nameFn(len(db.files))
number := len(db.files)
name := db.nameFmt.Format(number)
if name == "" {
return nil, fmt.Errorf("invalid block file name: empty")
} else if filepath.Base(name) != name {
return nil, fmt.Errorf("invalid block file name: %q contains a slash or is empty", name)
}
name = filepath.Join(db.path, name)

f, err := openFile(name, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0600)
f, err := openFile(number, name, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/database/keyvalue/block/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
)

type blockFile struct {
file *os.File
mu *sync.RWMutex
data mmap.MMap
number int
mu *sync.RWMutex
file *os.File
data mmap.MMap
}

func openFile(name string, flag int, perm fs.FileMode) (*blockFile, error) {
func openFile(number int, name string, flag int, perm fs.FileMode) (*blockFile, error) {
f := new(blockFile)
f.number = number
f.mu = new(sync.RWMutex)

var err error
Expand Down
32 changes: 32 additions & 0 deletions pkg/database/keyvalue/block/format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package block

import (
"fmt"
"strconv"
"strings"
)

var DefaultNameFormat = simpleNameFormat{}

type NameFormat interface {
Format(int) string
Parse(string) (int, error)
}

type simpleNameFormat struct{}

func (simpleNameFormat) Format(i int) string {
return fmt.Sprintf("%d.blocks", i)
}

func (simpleNameFormat) Parse(s string) (int, error) {
s = strings.TrimSuffix(s, ".blocks")
i, err := strconv.ParseInt(s, 10, 64)
return int(i), err
}

0 comments on commit 7ae1521

Please sign in to comment.