diff --git a/cmd/accumulated-bootstrap/main.go b/cmd/accumulated-bootstrap/main.go index 6d46fdf63..22687f992 100644 --- a/cmd/accumulated-bootstrap/main.go +++ b/cmd/accumulated-bootstrap/main.go @@ -14,7 +14,6 @@ import ( "github.com/spf13/cobra" . "gitlab.com/accumulatenetwork/accumulate/cmd/accumulated/run" . "gitlab.com/accumulatenetwork/accumulate/internal/util/cmd" - cmdutil "gitlab.com/accumulatenetwork/accumulate/internal/util/cmd" ) func main() { @@ -35,7 +34,7 @@ var flag = struct { Peers []multiaddr.Multiaddr External multiaddr.Multiaddr }{ - Key: cmdutil.PrivateKeyFlag{Value: &TransientPrivateKey{}}, + Key: PrivateKeyFlag{Value: &TransientPrivateKey{}}, PromListen: []multiaddr.Multiaddr{ multiaddr.StringCast("/ip4/0.0.0.0/tcp/8081/http"), }, @@ -65,7 +64,7 @@ func run(*cobra.Command, []string) { }, } - ctx := cmdutil.ContextForMainProcess(context.Background()) + ctx := ContextForMainProcess(context.Background()) inst, err := Start(ctx, cfg) Check(err) diff --git a/cmd/accumulated/cmd_init_devnet.go b/cmd/accumulated/cmd_init_devnet.go index bb2cf12ab..eba35b970 100644 --- a/cmd/accumulated/cmd_init_devnet.go +++ b/cmd/accumulated/cmd_init_devnet.go @@ -7,17 +7,20 @@ package main import ( + "bufio" "crypto/ed25519" "crypto/rand" "fmt" "os" "path/filepath" + "strings" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/cmd/accumulated/run" "gitlab.com/accumulatenetwork/accumulate/pkg/types/address" "golang.org/x/exp/slices" + "golang.org/x/term" ) func initDevNet(cmd *cobra.Command) *run.Config { @@ -56,7 +59,20 @@ func initDevNet(cmd *cobra.Command) *run.Config { cmd.Flag("followers").Changed && dev.Followers != uint64(flagRunDevnet.NumFollowers) || cmd.Flag("globals").Changed && !flagRunDevnet.Globals.Equal(dev.Globals) if wantReset && !flagMain.Reset { - fatalf("the configuration and flags do not match; use --reset if you wish to override (and reset) the existing configuration") + if !term.IsTerminal(int(os.Stdout.Fd())) || !term.IsTerminal(int(os.Stderr.Fd())) { + fatalf("the configuration and flags do not match; use --reset if you wish to override (and reset) the existing configuration") + } + fmt.Fprint(os.Stderr, "Configuration and flags do not match. Reset? [yN] ") + s, err := bufio.NewReader(os.Stdin).ReadString('\n') + check(err) + s = strings.TrimSpace(s) + s = strings.ToLower(s) + switch s { + case "y", "yes": + flagMain.Reset = true + default: + os.Exit(0) + } } applyDevNetFlags(cmd, cfg, dev, true) diff --git a/exp/ioutil/mapped_file.go b/exp/ioutil/mapped_file.go new file mode 100644 index 000000000..95d04f266 --- /dev/null +++ b/exp/ioutil/mapped_file.go @@ -0,0 +1,261 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
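+//
+// MappedFile wraps a memory-mapped file. Callers access it through a
+// MappedFileRange, which holds a read lock on the mapping for as long as it
+// is in use; Truncate temporarily upgrades to a write lock so the file can
+// be grown and remapped safely.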
+
+package ioutil
+
+import (
+	"errors"
+	"io"
+	"io/fs"
+	"os"
+	"sync"
+	"sync/atomic"
+
+	"github.com/edsrzf/mmap-go"
+	"gitlab.com/accumulatenetwork/core/schema/pkg/binary"
+)
+
+type MappedFile struct {
+	mu   *sync.RWMutex
+	file *os.File
+	data mmap.MMap
+	pool *binary.Pool[*MappedFileRange]
+}
+
+func OpenMappedFile(name string, flags int, perm fs.FileMode) (_ *MappedFile, err error) {
+	f := new(MappedFile)
+	f.mu = new(sync.RWMutex)
+	f.pool = binary.NewPointerPool[MappedFileRange]()
+	defer func() {
+		if err != nil {
+			_ = f.Close()
+		}
+	}()
+
+	f.file, err = os.OpenFile(name, flags, perm)
+	if err != nil {
+		return nil, err
+	}
+
+	st, err := f.file.Stat()
+	if err != nil {
+		return nil, err
+	}
+
+	if st.Size() == 0 {
+		return f, nil
+	}
+
+	f.data, err = mmap.MapRegion(f.file, int(st.Size()), mmap.RDWR, 0, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	return f, nil
+}
+
+func (f *MappedFile) Name() string {
+	return f.file.Name()
+}
+
+func (f *MappedFile) Close() error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	var errs []error
+	if f.data != nil {
+		errs = append(errs, f.data.Unmap())
+	}
+	if f.file != nil {
+		errs = append(errs, f.file.Close())
+	}
+
+	f.data = nil
+	f.file = nil
+	return errors.Join(errs...)
+}
+
+type MappedFileRange struct {
+	f        *MappedFile
+	released atomic.Bool
+	Offset   int64
+	End      int64
+}
+
+func (f *MappedFile) acquire() *MappedFileRange {
+	f.mu.RLock()
+	h := f.pool.Get()
+	h.f = f
+	h.released.Store(false)
+	return h
+}
+
+func (f *MappedFile) Acquire() *MappedFileRange {
+	h := f.acquire()
+	h.SetRange(0, int64(len(f.data)))
+	return h
+}
+
+func (f *MappedFile) AcquireRange(start, end int64) *MappedFileRange {
+	h := f.acquire()
+	h.SetRange(start, end)
+	return h
+}
+
+func (f *MappedFileRange) AcquireRange(start, end int64) *MappedFileRange {
+	return f.f.AcquireRange(start, end)
+}
+
+func (f *MappedFileRange) Release() {
+	// Only release once
+	if f.released.Swap(true) {
+		return
+	}
+
+	f.f.mu.RUnlock()
+	f.f.pool.Put(f)
+	f.f = nil
+}
+
+func (f *MappedFileRange) SetRange(start, end int64) {
+	f.Offset = max(start, 0)
+	f.End = min(end, int64(len(f.f.data)))
+}
+
+func (f *MappedFileRange) Len() int {
+	return int(f.End - f.Offset)
+}
+
+func (f *MappedFileRange) Raw() []byte {
+	return f.f.data[f.Offset:f.End]
+}
+
+func (f *MappedFileRange) Seek(offset int64, whence int) (int64, error) {
+	switch whence {
+	case io.SeekStart:
+		// Ok
+	case io.SeekCurrent:
+		offset += f.Offset
+	case io.SeekEnd:
+		offset += f.End
+	default:
+		return 0, errors.New("invalid whence")
+	}
+	if offset < 0 || offset > f.End {
+		return 0, io.EOF
+	}
+	f.Offset = offset
+	return offset, nil
+}
+
+func (f *MappedFileRange) Read(b []byte) (int, error) {
+	n, err := f.ReadAt(b, f.Offset)
+	f.Offset += int64(n)
+	return n, err
+}
+
+func (f *MappedFileRange) Write(b []byte) (int, error) {
+	n, err := f.WriteAt(b, f.Offset)
+	f.Offset += int64(n)
+	return n, err
+}
+
+func (f *MappedFileRange) ReadAt(b []byte, offset int64) (int, error) {
+	if offset > int64(len(f.f.data)) {
+		return 0, io.EOF
+	}
+
+	n := copy(b, f.f.data[offset:])
+	if n < len(b) {
+		return n, io.EOF
+	}
+	return n, nil
+}
+
+func (f *MappedFileRange) WriteAt(b []byte, offset int64) (int, error) {
+	err := f.Truncate(offset + int64(len(b)))
+	if err != nil {
+		return 0, err
+	}
+
+	m := copy(f.f.data[offset:], b)
+	return m, nil
+}
+
+func (f *MappedFileRange) ReadByte() (byte, error) {
+	if f.Offset >= f.End {
+		return 0, io.EOF
+	}
+	b := f.f.data[f.Offset]
+	f.Offset++
+	return b, nil
+}
+
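+// UnreadByte steps the read offset back by one byte. It returns io.EOF if
+// the offset is already at the start.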
+func (f *MappedFileRange) UnreadByte() error { + if f.Offset <= 0 { + return io.EOF + } + f.Offset-- + return nil +} + +func (f *MappedFileRange) Truncate(size int64) error { + for size > int64(len(f.f.data)) { + err := f.truncateInner(size) + if err != nil { + return err + } + } + return nil +} + +func (f *MappedFileRange) truncateInner(size int64) error { + // Upgrade the lock + demote := f.promote() + defer demote() + + // Is the mapped region smaller than we want? + if size <= int64(len(f.f.data)) { + return nil + } + + // Is the file smaller than we want? + st, err := f.f.file.Stat() + if err != nil { + return err + } + + if st.Size() >= size { + size = st.Size() + } else { + + // Expand the file + err = f.f.file.Truncate(size) + if err != nil { + return err + } + } + + // Remove the old mapping + if f.f.data != nil { + err = f.f.data.Unmap() + if err != nil { + return err + } + } + + // Remap + f.f.data, err = mmap.MapRegion(f.f.file, int(size), mmap.RDWR, 0, 0) + return err +} + +func (f *MappedFileRange) promote() (demote func()) { + f.f.mu.RUnlock() + f.f.mu.Lock() + return func() { + f.f.mu.Unlock() + f.f.mu.RLock() + } +} diff --git a/go.mod b/go.mod index b49ee4c60..437b2a5b2 100644 --- a/go.mod +++ b/go.mod @@ -71,7 +71,7 @@ require ( github.com/sergi/go-diff v1.2.0 github.com/ulikunitz/xz v0.5.11 github.com/vektra/mockery/v2 v2.42.3 - gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240618183058-91669372cfa3 + gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240620224759-19369030b361 gitlab.com/firelizzard/go-script v0.0.0-20240404234115-d5f0a716003d go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 diff --git a/go.sum b/go.sum index 1f9ae0157..362691836 100644 --- a/go.sum +++ b/go.sum @@ -1166,8 +1166,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240618183058-91669372cfa3 h1:RyNDul6B2nebcjA8+KSBR79MRUftOWWO0NRqN3x0Vh4= -gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240618183058-91669372cfa3/go.mod h1:FTl7W44SWhDenzAtvKkLu30Cin8DAr249mH4eg7BNLY= +gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240620224759-19369030b361 h1:J9FcjE7osYPinvoEzWrkpagukyqDwzLqH9fplznBA+M= +gitlab.com/accumulatenetwork/core/schema v0.1.1-0.20240620224759-19369030b361/go.mod h1:FTl7W44SWhDenzAtvKkLu30Cin8DAr249mH4eg7BNLY= gitlab.com/bosi/decorder v0.4.1 h1:VdsdfxhstabyhZovHafFw+9eJ6eU0d2CkFNJcZz/NU4= gitlab.com/bosi/decorder v0.4.1/go.mod h1:jecSqWUew6Yle1pCr2eLWTensJMmsxHsBwt+PVbkAqA= gitlab.com/ethan.reesor/vscode-notebooks/go-playbooks v0.0.0-20220417214602-1121b9fae118 h1:UnyYFTz6dWVMBzLUyqHPIQwMrdpiuE+CE7p/5kUfvmk= diff --git a/pkg/database/bpt/bpt.go b/pkg/database/bpt/bpt.go index f0d088c96..42a26d0b6 100644 --- a/pkg/database/bpt/bpt.go +++ b/pkg/database/bpt/bpt.go @@ -19,7 +19,7 @@ type KeyValuePair struct { } // New returns a new BPT. 
-func New(parent database.Record, logger log.Logger, store database.Store, key *database.Key) *BPT { +func New(_ database.Record, logger log.Logger, store database.Store, key *database.Key) *BPT { b := new(BPT) b.logger.Set(logger) b.store = store diff --git a/pkg/database/bpt/params.go b/pkg/database/bpt/params.go index 4c43b1cf1..c8af47d26 100644 --- a/pkg/database/bpt/params.go +++ b/pkg/database/bpt/params.go @@ -20,12 +20,13 @@ const paramsV2Magic = "\xC0\xFF\xEE" // paramsStateSize is the marshaled size of [parameters]. const paramsStateSize = 1 + 2 + 2 + 32 -func (b *BPT) SetParams(p parameters) error { +func (b *BPT) SetParams(p Parameters) error { if b.loadedState == nil { s, err := b.getState().Get() switch { case err == nil: b.loadedState = s + s.Mask = s.Power - 1 case errors.Is(err, errors.NotFound): b.loadedState = new(stateData) default: @@ -33,12 +34,12 @@ func (b *BPT) SetParams(p parameters) error { } } - if b.loadedState.parameters == (parameters{}) { - b.loadedState.parameters = p + if b.loadedState.Parameters == (Parameters{}) { + b.loadedState.Parameters = p return b.storeState() } - if !b.loadedState.parameters.Equal(&p) { + if !b.loadedState.Parameters.Equal(&p) { return errors.BadRequest.With("BPT parameters cannot be modified") } return nil @@ -55,6 +56,7 @@ func (b *BPT) loadState() (*stateData, error) { switch { case err == nil: b.loadedState = s + s.Mask = s.Power - 1 return s, nil case !errors.Is(err, errors.NotFound): return nil, errors.UnknownError.Wrap(err) diff --git a/pkg/database/bpt/schema.yml b/pkg/database/bpt/schema.yml index 13030d975..d52724f77 100644 --- a/pkg/database/bpt/schema.yml +++ b/pkg/database/bpt/schema.yml @@ -16,16 +16,18 @@ stateData: - name: MaxHeight type: uint - type: - name: parameters + name: Parameters class: composite fields: - name: Power type: uint - - name: Mask - type: uint - name: ArbitraryValues type: boolean + transients: + - name: Mask + type: uint + node: class: union discriminator: diff --git a/pkg/database/bpt/schema_gen.go b/pkg/database/bpt/schema_gen.go index 4014c1ef4..69405477a 100644 --- a/pkg/database/bpt/schema_gen.go +++ b/pkg/database/bpt/schema_gen.go @@ -10,18 +10,40 @@ import ( ) var ( + sParameters schema.Methods[*Parameters, *Parameters, *schema.CompositeType] sbranch schema.Methods[*branch, *branch, *schema.CompositeType] semptyNode schema.Methods[*emptyNode, *emptyNode, *schema.CompositeType] sleaf schema.Methods[*leaf, *leaf, *schema.CompositeType] snode schema.Methods[node, *node, *schema.UnionType] snodeType schema.EnumMethods[nodeType] - sparameters schema.Methods[*parameters, *parameters, *schema.CompositeType] sstateData schema.Methods[*stateData, *stateData, *schema.CompositeType] ) func init() { var deferredTypes schema.ResolverSet + sParameters = schema.WithMethods[*Parameters, *Parameters](&schema.CompositeType{ + TypeBase: schema.TypeBase{ + Name: "Parameters", + }, + Fields: []*schema.Field{ + { + Name: "Power", + Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + }, + { + Name: "ArbitraryValues", + Type: &schema.SimpleType{Type: schema.SimpleTypeBool}, + }, + }, + Transients: []*schema.Field{ + { + Name: "Mask", + Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + }, + }, + }).SetGoType() + sbranch = schema.WithMethods[*branch, *branch](&schema.CompositeType{ TypeBase: schema.TypeBase{ Name: "branch", @@ -175,26 +197,6 @@ func init() { }, }).SetGoType() - sparameters = schema.WithMethods[*parameters, *parameters](&schema.CompositeType{ - TypeBase: schema.TypeBase{ - Name: 
"parameters", - }, - Fields: []*schema.Field{ - { - Name: "Power", - Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, - }, - { - Name: "Mask", - Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, - }, - { - Name: "ArbitraryValues", - Type: &schema.SimpleType{Type: schema.SimpleTypeBool}, - }, - }, - }).SetGoType() - sstateData = schema.WithMethods[*stateData, *stateData](&schema.CompositeType{ TypeBase: schema.TypeBase{ Name: "stateData", @@ -208,17 +210,17 @@ func init() { Name: "MaxHeight", Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, }, - (&schema.Field{}).ResolveTo(&deferredTypes, "parameters"), + (&schema.Field{}).ResolveTo(&deferredTypes, "Parameters"), }, }).SetGoType() s, err := schema.New( + sParameters.Type, sbranch.Type, semptyNode.Type, sleaf.Type, snode.Type, snodeType.Type, - sparameters.Type, sstateData.Type, ) if err != nil { diff --git a/pkg/database/bpt/types_gen.go b/pkg/database/bpt/types_gen.go index e57270bf7..365c5cd01 100644 --- a/pkg/database/bpt/types_gen.go +++ b/pkg/database/bpt/types_gen.go @@ -7,6 +7,29 @@ import ( "gitlab.com/accumulatenetwork/core/schema/pkg/widget" ) +type Parameters struct { + Power uint64 + ArbitraryValues bool + Mask uint64 +} + +var wParameters = widget.ForCompositePtr(widget.Fields[Parameters]{ + {Name: "power", ID: 1, Widget: widget.ForUint(func(v *Parameters) *uint64 { return &v.Power })}, + {Name: "arbitraryValues", ID: 2, Widget: widget.ForBool(func(v *Parameters) *bool { return &v.ArbitraryValues })}, +}, widget.Identity[**Parameters]) + +// Copy returns a copy of the Parameters. +func (v *Parameters) Copy() *Parameters { + var u *Parameters + wParameters.CopyTo(&u, &v) + return u +} + +// EqualParameters returns true if V is equal to U. +func (v *Parameters) Equal(u *Parameters) bool { + return wParameters.Equal(&v, &u) +} + type branch struct { Height uint64 Key [32]byte @@ -144,40 +167,16 @@ func (v nodeType) String() string { return snodeType.String(v) } -type parameters struct { - Power uint64 - Mask uint64 - ArbitraryValues bool -} - -var wparameters = widget.ForCompositePtr(widget.Fields[parameters]{ - {Name: "power", ID: 1, Widget: widget.ForUint(func(v *parameters) *uint64 { return &v.Power })}, - {Name: "mask", ID: 2, Widget: widget.ForUint(func(v *parameters) *uint64 { return &v.Mask })}, - {Name: "arbitraryValues", ID: 3, Widget: widget.ForBool(func(v *parameters) *bool { return &v.ArbitraryValues })}, -}, widget.Identity[**parameters]) - -// Copy returns a copy of the parameters. -func (v *parameters) Copy() *parameters { - var u *parameters - wparameters.CopyTo(&u, &v) - return u -} - -// Equalparameters returns true if V is equal to U. -func (v *parameters) Equal(u *parameters) bool { - return wparameters.Equal(&v, &u) -} - type stateData struct { RootHash [32]byte MaxHeight uint64 - parameters + Parameters } var wstateData = widget.ForCompositePtr(widget.Fields[stateData]{ {Name: "rootHash", ID: 1, Widget: widget.ForHash(func(v *stateData) *[32]byte { return &v.RootHash })}, {Name: "maxHeight", ID: 2, Widget: widget.ForUint(func(v *stateData) *uint64 { return &v.MaxHeight })}, - {ID: 3, Widget: widget.ForComposite(wparameters.Fields, func(v *stateData) *parameters { return &v.parameters })}, + {ID: 3, Widget: widget.ForComposite(wParameters.Fields, func(v *stateData) *Parameters { return &v.Parameters })}, }, widget.Identity[**stateData]) // Copy returns a copy of the stateData. 
diff --git a/pkg/database/bpt/values_test.go b/pkg/database/bpt/values_test.go index 3956bf908..81dbaf482 100644 --- a/pkg/database/bpt/values_test.go +++ b/pkg/database/bpt/values_test.go @@ -29,7 +29,7 @@ func TestNonHashValues(t *testing.T) { store := memory.New(nil).Begin(nil, true) model := new(ChangeSet) model.store = keyvalue.RecordStore{Store: store} - err := model.BPT().SetParams(parameters{ + err := model.BPT().SetParams(Parameters{ ArbitraryValues: true, }) require.NoError(t, err) diff --git a/pkg/database/keyvalue/badger/v4_test.go b/pkg/database/keyvalue/badger/v4_test.go index 6d0db51b9..08ca4435c 100644 --- a/pkg/database/keyvalue/badger/v4_test.go +++ b/pkg/database/keyvalue/badger/v4_test.go @@ -17,6 +17,10 @@ func BenchmarkV4Commit(b *testing.B) { kvtest.BenchmarkCommit(b, newOpenerV4(b)) } +func BenchmarkV4Open(b *testing.B) { + kvtest.BenchmarkOpen(b, newOpenerV4(b)) +} + func BenchmarkV4ReadRandom(b *testing.B) { kvtest.BenchmarkReadRandom(b, newOpenerV4(b)) } diff --git a/pkg/database/keyvalue/block/database.go b/pkg/database/keyvalue/block/database.go index 6c91d9875..53fb4313f 100644 --- a/pkg/database/keyvalue/block/database.go +++ b/pkg/database/keyvalue/block/database.go @@ -7,53 +7,43 @@ package block import ( - "fmt" - "io" - "io/fs" - "log/slog" - "os" - "path/filepath" + "bytes" + "errors" "slices" "sync" + "sync/atomic" - "gitlab.com/accumulatenetwork/accumulate/pkg/database" "gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue" "gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/memory" - "gitlab.com/accumulatenetwork/accumulate/pkg/errors" "gitlab.com/accumulatenetwork/accumulate/pkg/types/record" - binary2 "gitlab.com/accumulatenetwork/core/schema/pkg/binary" + "gitlab.com/accumulatenetwork/core/schema/pkg/binary" ) +var poolEncoder = binary.NewPointerPool[binary.Encoder]() +var poolDecoder = binary.NewPointerPool[binary.Decoder]() +var poolLocation = binary.NewPointerPool[recordLocation]() +var poolBuffer = binary.NewPointerPool[bytes.Buffer]() + type Database struct { + config + commitMu sync.Mutex + records *recordFileSet + indexFiles *indexFileTree + blockIndex vmap[blockID, int] + recordLocation vmap[[32]byte, *recordLocation] +} + +type config struct { path string - commitMu sync.Mutex - files []*blockFile - records records - next uint64 - fileLimit uint64 + nextBlock atomic.Uint64 + fileLimit int64 nameFmt NameFormat filterFn func(string) bool } -type records = vmap[[32]byte, recordLocation] -type recordsView = vmapView[[32]byte, recordLocation] - -type blockLocation struct { - file uint - offset int64 -} - -type recordLocation struct { - file uint - block uint64 - header int64 - offset int64 - length int64 -} - type Option func(*Database) -func WithFileLimit(limit uint64) Option { +func WithFileLimit(limit int64) Option { return func(d *Database) { d.fileLimit = limit } @@ -72,18 +62,6 @@ func FilterFiles(fn func(string) bool) Option { } func Open(path string, options ...Option) (_ *Database, err error) { - // List all the entries - entries, err := os.ReadDir(path) - switch { - case err == nil, - errors.Is(err, fs.ErrNotExist): - // Directory exists, or doesn't - - default: - // Some other error - return nil, err - } - db := new(Database) defer func() { if err != nil { @@ -99,151 +77,75 @@ func Open(path string, options ...Option) (_ *Database, err error) { o(db) } - // Open all the files - db.files = make([]*blockFile, 0, len(entries)) - for _, e := range entries { - if db.filterFn != nil && !db.filterFn(e.Name()) { - 
continue - } - - // Extract the ordinal from the filename - number, err := db.nameFmt.Parse(e.Name()) - if err != nil { - return nil, fmt.Errorf("parse index of %q: %w", e.Name(), err) - } - - f, err := openFile(number, filepath.Join(path, e.Name()), os.O_RDWR, 0) - if err != nil { - return nil, err - } - - db.files = append(db.files, f) + // Load record files + db.records, err = openRecordFileSet(&db.config) + if err != nil { + return nil, err } - // Sort the files by number, since who knows what order the filesystem - // returns them in - slices.SortFunc(db.files, func(a, b *blockFile) int { - return a.number - b.number - }) - - // Build the block index - blocks := map[uint64]blockLocation{} - records := db.records.View() - buf := new(buffer) - dec := new(binary2.Decoder) - for fileNo, f := range db.files { - slog.Info("Indexing", "ordinal", f.number, "module", "database") - var offset int64 - var block *uint64 - for { - e, n, err := readEntryAt(f, offset, buf, dec) - if err != nil { - if errors.Is(err, io.EOF) { - break - } - return nil, fmt.Errorf("reading entries from %v: %w", f.file.Name(), err) - } - - // b, _ := json.Marshal(e) - // fmt.Println(string(b)) - - start := offset - offset += int64(n) - - switch e := e.(type) { - case *startBlockEntry: - if block != nil { - return nil, fmt.Errorf("%v is corrupted", f.file.Name()) - } - if _, ok := blocks[e.ID]; ok { - return nil, fmt.Errorf("duplicate block %d", e.ID) - } - blocks[e.ID] = blockLocation{file: uint(fileNo), offset: start} - block = &e.ID - - if db.next < e.ID { - db.next = e.ID - } - - case *endBlockEntry: - if block == nil { - return nil, fmt.Errorf("%v is corrupted", f.file.Name()) - } - block = nil - - case *recordEntry: - if block == nil { - return nil, fmt.Errorf("%v is corrupted", f.file.Name()) - } - - records.Put(e.KeyHash, recordLocation{ - file: uint(fileNo), - block: *block, - header: start + 2, // The header has a 2 byte length prefix - offset: offset, - length: e.Length, - }) - - if e.Length <= 0 { - continue - } - - // Skip the record data - offset += e.Length - } - } - } - err = records.Commit() + // Load index files + db.indexFiles, err = openIndexFileTree(&db.config) if err != nil { return nil, err } - return db, nil -} + db.recordLocation.fn.forEach = db.indexFiles.ForEach + db.recordLocation.fn.commit = db.indexFiles.Commit -func (db *Database) newFile() (*blockFile, error) { - // Ensure the directory exists - err := os.Mkdir(db.path, 0700) - if err != nil && !errors.Is(err, fs.ErrExist) { - return nil, err + // Determine the next block number + if len(db.records.files) > 0 { + it := db.records.files[len(db.records.files)-1].entries(func(typ entryType) bool { + return typ == entryTypeStartBlock + }) + it.Range(func(_ int, item recordPos) bool { + s := item.Entry.(*startBlockEntry) + if db.nextBlock.Load() < s.ID { + db.nextBlock.Store(s.ID) + } + return true + }) + if it.err != nil { + return nil, it.err + } } - // Create a new file - number := len(db.files) - name := db.nameFmt.Format(number) - if name == "" { - return nil, fmt.Errorf("invalid block file name: empty") - } else if filepath.Base(name) != name { - return nil, fmt.Errorf("invalid block file name: %q contains a slash or is empty", name) + // Index blocks + err = db.indexBlocks() + if err != nil { + return nil, err } - name = filepath.Join(db.path, name) - f, err := openFile(number, name, os.O_RDWR|os.O_EXCL|os.O_CREATE, 0600) + // Build the block index + err = db.indexRecords() if err != nil { return nil, err } - db.files = append(db.files, 
f) - return f, nil + return db, nil } func (db *Database) Close() error { - var errs []error - for _, f := range db.files { - errs = append(errs, f.Close()) - } - return errors.Join(errs...) + return errors.Join( + db.records.Close(), + db.indexFiles.Close(), + ) } // Begin begins a change set. func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { - view := d.records.View() + view := &databaseView{ + recordFiles: d.records, + indexFiles: d.indexFiles, + blocks: d.blockIndex.View(), + records: d.recordLocation.View(), + } + get := func(key *record.Key) ([]byte, error) { - return d.get(view, key) + return view.Get(key) } forEach := func(fn func(*record.Key, []byte) error) error { - return d.forEach(view, fn) + return view.ForEach(fn) } var commit memory.CommitFunc @@ -264,198 +166,40 @@ func (d *Database) Begin(prefix *record.Key, writable bool) keyvalue.ChangeSet { }) } -func (d *Database) get(view *recordsView, key *record.Key) ([]byte, error) { - loc, ok := view.Get(key.Hash()) - if !ok || loc.length < 0 { - return nil, (*database.NotFoundError)(key) - } - if !d.validLoc(loc) { - return nil, errors.InternalError.WithFormat("record is corrupted") - } - return d.read(loc) -} - -func (d *Database) forEach(view *recordsView, fn func(*record.Key, []byte) error) error { - return view.ForEach(func(key [32]byte, loc recordLocation) error { - // Skip deleted entries - if loc.length < 0 { - return nil - } - if !d.validLoc(loc) { - return errors.InternalError.WithFormat("record is corrupted") - } - - header, err := d.readHeader(loc) - if err != nil { - return err - } - - b, err := d.read(loc) - if err != nil { - return err - } - - return fn(header.Key, b) - }) -} - -func (d *Database) validLoc(loc recordLocation) bool { - // If any of these conditions fail, there's a bug. Record locations are - // initialized from disk, so any issue there indicates corruption or an - // initialization bug. Record locations are only ever added by Commit, which - // writes the records to disk and remaps them before committing the record - // locations, so any issue there indicates a bug. 
- switch { - case loc.header < 0 || loc.offset < 0 || loc.header > loc.offset: - // Corrupted offsets - return false - - case loc.file >= uint(len(d.files)): - // loc.file is invalid - return false - - case loc.offset+loc.length > int64(d.files[loc.file].Len()): - // File is not memory mapped or requested range is outside the memory - // mapped region - return false - } - return true -} - -func (d *Database) read(loc recordLocation) ([]byte, error) { - b := make([]byte, loc.length) - _, err := d.files[loc.file].ReadAt(b, loc.offset) - if errors.Is(err, io.EOF) { - return b, io.ErrUnexpectedEOF - } - return b, err -} - -func (d *Database) readHeader(loc recordLocation) (*recordEntry, error) { - b := make([]byte, loc.offset-loc.header) - _, err := d.files[loc.file].ReadAt(b, loc.header) - if err != nil { - return nil, err - } - - e := new(recordEntry) - err = e.UnmarshalBinary(b) - return e, err -} - -func (d *Database) commit(view *recordsView, entries map[[32]byte]memory.Entry) error { +func (d *Database) commit(view *databaseView, entries map[[32]byte]memory.Entry) error { defer view.Discard() // Commits must be serialized d.commitMu.Lock() defer d.commitMu.Unlock() - // Seek to the end of the newest file or create a new file - fileNo := len(d.files) - 1 - var f *blockFile - var offset int64 - var err error - if fileNo < 0 { - fileNo = 0 - f, err = d.newFile() - } else { - f = d.files[fileNo] - offset, err = f.file.Seek(0, io.SeekEnd) - } - if err != nil { - return err - } - - var block uint64 - var haveBlock bool - for kh, e := range entries { - // Time for a new file? - if offset >= int64(d.fileLimit) { - // Close the block - if haveBlock { - haveBlock = false - _, err = writeEntry(f.file, &endBlockEntry{}) - if err != nil { - return err - } - } - - // Remap the file - err := f.Remap() - if err != nil { - return err - } - - // Open a new file - offset = 0 - fileNo++ - f, err = d.newFile() - if err != nil { - return err - } - } - - // Time for a new block? 
- if !haveBlock { - // Open the block - d.next++ - b := new(startBlockEntry) - b.ID = d.next - b.Parent = block - block = b.ID - haveBlock = true - - n, err := writeEntry(f.file, b) - if err != nil { - return err - } - offset += int64(n) - } - - l := int64(len(e.Value)) - if e.Delete { - l = -1 - } - - // Write the entry - n, err := writeEntry(f.file, &recordEntry{Key: e.Key, Length: l, KeyHash: e.Key.Hash()}) - if err != nil { - return err - } - offset += int64(n) - view.Put(kh, recordLocation{file: uint(fileNo), block: block, offset: offset, length: l}) - - if e.Delete { - continue - } - - // Write the data - n, err = f.file.Write(e.Value) - if err != nil { - return err - } - offset += int64(n) - } - err = view.Commit() - if err != nil { - return err - } - - if !haveBlock { - return nil - } - - // Close the block - _, err = writeEntry(f.file, &endBlockEntry{}) - if err != nil { - return err - } + // Construct an ordered list of entries + list := make([]*entryAndData, 0, len(entries)) + for keyHash, entry := range entries { + n := int64(len(entry.Value)) + if entry.Delete { + n = -1 + } + list = append(list, &entryAndData{ + recordEntry{ + Key: entry.Key, + KeyHash: keyHash, + Length: n, + }, + entry.Value, + }) + } + + // Sort to remove non-deterministic weirdness + slices.SortFunc(list, func(a, b *entryAndData) int { + return bytes.Compare(a.KeyHash[:], b.KeyHash[:]) + }) - // Remap the file - err = f.Remap() + // Write all the entries + err := d.records.Commit(view, list) if err != nil { return err } - return nil + return view.Commit() } diff --git a/pkg/database/keyvalue/block/database_test.go b/pkg/database/keyvalue/block/database_test.go index da89fcc26..d2b6722c6 100644 --- a/pkg/database/keyvalue/block/database_test.go +++ b/pkg/database/keyvalue/block/database_test.go @@ -11,6 +11,8 @@ import ( "fmt" "math/big" "os" + "path/filepath" + "strings" "testing" "time" @@ -20,6 +22,13 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/types/record" ) +func TestFoo(t *testing.T) { + t.Skip("Manual") + db, err := Open("../../../../.nodes/devnet/bvn1-1/bvnn/data/accumulate.db") + require.NoError(t, err) + require.NoError(t, db.Close()) +} + func BenchmarkCommit(b *testing.B) { kvtest.BenchmarkCommit(b, newOpener(b)) } @@ -84,13 +93,13 @@ func TestDelete(t *testing.T) { func newOpener(t testing.TB) kvtest.Opener { path := t.TempDir() return func() (keyvalue.Beginner, error) { - return Open(path) + return Open(filepath.Join(path, "test.db")) } } func TestFileLimit(t *testing.T) { dir := t.TempDir() - db, err := Open(dir, WithFileLimit(1<<10)) + db, err := Open(dir, WithFileLimit(2<<10)) require.NoError(t, err) defer db.Close() @@ -98,24 +107,43 @@ func TestFileLimit(t *testing.T) { defer batch.Discard() const N = 16 + var keys []string for i := 0; i < N; i++ { k := record.NewKey(i) + keys = append(keys, k.String()) v := make([]byte, 128) _, _ = rand.Read(v) err = batch.Put(k, v) require.NoError(t, err, "Put") } require.NoError(t, batch.Commit()) + require.NoError(t, db.Close()) var files []string ent, err := os.ReadDir(dir) require.NoError(t, err) for _, ent := range ent { - files = append(files, ent.Name()) + if strings.HasSuffix(ent.Name(), dotBlocks) { + files = append(files, ent.Name()) + } } - require.Equal(t, []string{ - "0.blocks", + require.ElementsMatch(t, []string{ "1.blocks", - "2.blocks", + "1-1.blocks", + "1-2.blocks", }, files) + + db, err = Open(dir, WithFileLimit(1<<10)) + require.NoError(t, err) + defer db.Close() + + batch = db.Begin(nil, true) + defer batch.Discard() 
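+
+	// Re-read everything to verify that the records written before the
+	// close survive a reopen with a different file limit and that the
+	// index still resolves every key.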
+ + var keys2 []string + require.NoError(t, batch.ForEach(func(key *record.Key, _ []byte) error { + keys2 = append(keys2, key.String()) + return nil + })) + require.ElementsMatch(t, keys, keys2) } diff --git a/pkg/database/keyvalue/block/database_view.go b/pkg/database/keyvalue/block/database_view.go new file mode 100644 index 000000000..708025043 --- /dev/null +++ b/pkg/database/keyvalue/block/database_view.go @@ -0,0 +1,186 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package block + +import ( + "fmt" + "path/filepath" + + "gitlab.com/accumulatenetwork/accumulate/pkg/database" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/record" + "golang.org/x/exp/slog" +) + +type databaseView struct { + recordFiles *recordFileSet + indexFiles *indexFileTree + blocks *vmapView[blockID, int] + records *vmapView[[32]byte, *recordLocation] +} + +func (db *Database) indexBlocks() error { + blocks := db.blockIndex.View() + it := db.records.entries(func(typ entryType) bool { + return typ == entryTypeStartBlock + }) + var prev *recordFile + it.Range(func(_ int, item recordFilePos) bool { + if item.File != prev { + slog.Info("Indexing blocks", "file", filepath.Base(item.File.file.Name()), "module", "database") + prev = item.File + } + + s := item.Entry.(*startBlockEntry) + if _, ok := blocks.Get(s.blockID); ok { + it.err = fmt.Errorf("%v is corrupted: duplicate block %v", filepath.Base(item.File.file.Name()), s.blockID.String()) + return false + } + + blocks.Put(s.blockID, item.FileIndex) + return true + }) + if it.err != nil { + return it.err + } + + return blocks.Commit() +} + +func (db *Database) indexRecords() error { + // Assume the database has already been indexed + if _, ok := db.indexFiles.root.(emptyIndexFileTree); !ok { + return nil + } + + records := db.recordLocation.View() + it := db.records.entries(nil) + + var prev *recordFile + var block *blockID + it.Range(func(_ int, item recordFilePos) bool { + if item.File != prev { + slog.Info("Indexing entries", "file", filepath.Base(item.File.file.Name()), "module", "database") + block = nil + prev = item.File + } + + switch e := item.Entry.(type) { + case *startBlockEntry: + if block != nil { + it.err = fmt.Errorf("%v is corrupted", filepath.Base(item.File.file.Name())) + return false + } + block = &e.blockID + slog.Info("Indexing block", "file", filepath.Base(item.File.file.Name()), "block", block, "module", "database") + + case *endBlockEntry: + if block == nil { + it.err = fmt.Errorf("%v is corrupted", filepath.Base(item.File.file.Name())) + return false + } + block = nil + + // Commit at the end of each block to keep the memory usage under + // control for large databases + err := records.Commit() + if err != nil { + it.err = err + return false + } + records = db.recordLocation.View() + + case *recordEntry: + if block == nil { + it.err = fmt.Errorf("%v is corrupted", filepath.Base(item.File.file.Name())) + return false + } + + records.Put(e.KeyHash, &recordLocation{ + Block: block, + Offset: item.Start, + HeaderLen: item.End - item.Start, + RecordLen: e.Length, + }) + } + return true + }) + if it.err != nil { + return it.err + } + + return records.Commit() +} + +func (r *databaseView) Get(key *record.Key) ([]byte, error) { + loc, ok := r.records.Get(key.Hash()) + if !ok { + loc, ok = r.indexFiles.Get(key.Hash()) + if ok { + defer 
poolLocation.Put(loc) + } + } + if !ok || loc.RecordLen < 0 { + return nil, (*database.NotFoundError)(key) + } + + f, err := r.getFile(loc) + if err != nil { + return nil, err + } + + return f.ReadRecord(loc) +} + +func (r *databaseView) ForEach(fn func(*record.Key, []byte) error) error { + return r.records.ForEach(func(key [32]byte, loc *recordLocation) error { + // Skip deleted entries + if loc.RecordLen < 0 { + return nil + } + + f, err := r.getFile(loc) + if err != nil { + return err + } + + header, err := f.ReadHeader(loc) + if err != nil { + return err + } + + record, err := f.ReadRecord(loc) + if err != nil { + return err + } + + return fn(header.Key, record) + }) +} + +func (r *databaseView) getFile(l *recordLocation) (*recordFile, error) { + i, ok := r.blocks.Get(*l.Block) + if !ok { + return nil, errors.InternalError.WithFormat("corrupted: cannot locate block %v", l.Block) + } + if i >= len(r.recordFiles.files) || r.recordFiles.files[i] == nil { + return nil, errors.InternalError.With("corrupted: invalid block index entry") + } + return r.recordFiles.files[i], nil +} + +func (r *databaseView) Discard() { + r.blocks.Discard() + r.records.Discard() +} + +func (r *databaseView) Commit() error { + return errors.Join( + r.blocks.Commit(), + r.records.Commit(), + ) +} diff --git a/pkg/database/keyvalue/block/entries.go b/pkg/database/keyvalue/block/entries.go deleted file mode 100644 index 2aab78845..000000000 --- a/pkg/database/keyvalue/block/entries.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2024 The Accumulate Authors -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -package block - -import ( - "encoding/binary" - "errors" - "io" - "math" - - binary2 "gitlab.com/accumulatenetwork/core/schema/pkg/binary" -) - -func readEntryAt(rd io.ReaderAt, offset int64, buf *buffer, dec *binary2.Decoder) (entry, int, error) { - var lb [2]byte - n, err := rd.ReadAt(lb[:], offset) - switch { - case err == nil: - // Ok - case !errors.Is(err, io.EOF): - return nil, n, err - case n == 0: - return nil, n, io.EOF - default: - return nil, n, io.ErrUnexpectedEOF - } - - l := int(binary.BigEndian.Uint16(lb[:])) - err = buf.load(rd, l, offset+2) - n += l - if err != nil { - if errors.Is(err, io.EOF) { - return nil, n, io.EOF - } - return nil, n, err - } - - dec.Reset(buf) - e, err := unmarshalEntryBinaryV2(dec) - return e, n, err -} - -func writeEntry(wr io.Writer, e entry) (int, error) { - b, err := e.MarshalBinary() - if err != nil { - return 0, err - } - if len(b) > math.MaxUint16 { - panic("entry is larger than 64 KiB") - } - - var lenBuf [2]byte - binary.BigEndian.PutUint16(lenBuf[:], uint16(len(b))) - n, err := wr.Write(lenBuf[:]) - if err != nil { - return n, err - } - - m, err := wr.Write(b) - return n + m, err -} - -type buffer struct { - bytes []byte - offset int -} - -func (b *buffer) load(rd io.ReaderAt, n int, offset int64) error { - b.bytes = b.bytes[:cap(b.bytes)] - b.offset = 0 - - if n <= len(b.bytes) { - b.bytes = b.bytes[:n] - - } else if n <= 64 { - b.bytes = make([]byte, n, 64) - - } else if n < 2*len(b.bytes) { - b.bytes = append(b.bytes, make([]byte, len(b.bytes))...) - b.bytes = b.bytes[:n] - - } else { - b.bytes = append(b.bytes, make([]byte, n-len(b.bytes))...) 
- b.bytes = b.bytes[:n] - } - - _, err := rd.ReadAt(b.bytes, offset) - return err -} - -func (b *buffer) reset() { - b.bytes = b.bytes[:0] - b.offset = 0 -} - -func (b *buffer) Read(c []byte) (int, error) { - if len(b.bytes) <= b.offset { - b.reset() - if len(c) == 0 { - return 0, nil - } - return 0, io.EOF - } - n := copy(c, b.bytes[b.offset:]) - b.offset += n - return n, nil -} - -func (b *buffer) ReadByte() (byte, error) { - if len(b.bytes) <= b.offset { - b.reset() - return 0, io.EOF - } - c := b.bytes[b.offset] - b.offset++ - return c, nil -} diff --git a/pkg/database/keyvalue/block/enums.yml b/pkg/database/keyvalue/block/enums.yml deleted file mode 100644 index b6dab1648..000000000 --- a/pkg/database/keyvalue/block/enums.yml +++ /dev/null @@ -1,7 +0,0 @@ -entryType: - StartBlock: - value: 1 - Record: - value: 2 - EndBlock: - value: 3 diff --git a/pkg/database/keyvalue/block/file.go b/pkg/database/keyvalue/block/file.go deleted file mode 100644 index 669299c92..000000000 --- a/pkg/database/keyvalue/block/file.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2024 The Accumulate Authors -// -// Use of this source code is governed by an MIT-style -// license that can be found in the LICENSE file or at -// https://opensource.org/licenses/MIT. - -package block - -import ( - "errors" - "io" - "io/fs" - "os" - "sync" - - "github.com/edsrzf/mmap-go" -) - -type blockFile struct { - number int - mu *sync.RWMutex - file *os.File - data mmap.MMap -} - -func openFile(number int, name string, flag int, perm fs.FileMode) (*blockFile, error) { - f := new(blockFile) - f.number = number - f.mu = new(sync.RWMutex) - - var err error - f.file, err = os.OpenFile(name, flag, perm) - if err != nil { - return nil, err - } - - if flag&os.O_CREATE == 0 { - f.data, err = mmap.Map(f.file, mmap.RDWR, 0) - if err != nil { - return nil, err - } - } - - return f, nil -} - -func (f *blockFile) Len() int { - f.mu.RLock() - defer f.mu.RUnlock() - return len(f.data) -} - -func (f *blockFile) ReadAt(b []byte, off int64) (int, error) { - f.mu.RLock() - defer f.mu.RUnlock() - - if int(off) >= len(f.data) { - return 0, io.EOF - } - - n := copy(b, f.data[off:]) - if n < len(b) { - return n, io.EOF - } - return n, nil -} - -func (f *blockFile) Remap() error { - f.mu.Lock() - defer f.mu.Unlock() - - var err error - if f.data != nil { - err = f.data.Unmap() - f.data = nil - } - if err != nil { - return err - } - - f.data, err = mmap.Map(f.file, mmap.RDWR, 0) - return err -} - -func (f *blockFile) Close() error { - f.mu.Lock() - defer f.mu.Unlock() - - var errs []error - if f.data != nil { - errs = append(errs, f.data.Unmap()) - } - errs = append(errs, f.file.Close()) - - f.data = nil - f.file = nil - return errors.Join(errs...) 
-} diff --git a/pkg/database/keyvalue/block/format.go b/pkg/database/keyvalue/block/format.go index 05cbb55bc..10765ec34 100644 --- a/pkg/database/keyvalue/block/format.go +++ b/pkg/database/keyvalue/block/format.go @@ -15,18 +15,42 @@ import ( var DefaultNameFormat = simpleNameFormat{} type NameFormat interface { - Format(int) string - Parse(string) (int, error) + Format(*blockID) string + Parse(string) (*blockID, error) } type simpleNameFormat struct{} -func (simpleNameFormat) Format(i int) string { - return fmt.Sprintf("%d.blocks", i) +const dotBlocks = ".blocks" + +func (simpleNameFormat) Format(b *blockID) string { + if b.Part == 0 { + return fmt.Sprintf("%d"+dotBlocks, b.ID) + } + return fmt.Sprintf("%d-%d"+dotBlocks, b.ID, b.Part) } -func (simpleNameFormat) Parse(s string) (int, error) { - s = strings.TrimSuffix(s, ".blocks") - i, err := strconv.ParseInt(s, 10, 64) - return int(i), err +func (simpleNameFormat) Parse(s string) (*blockID, error) { + if !strings.HasSuffix(s, dotBlocks) { + return nil, fmt.Errorf("%q is not a block file", s) + } + + b := strings.Split(s[:len(s)-len(dotBlocks)], "-") + if len(b) > 2 { + return nil, fmt.Errorf("%q is not a block file", s) + } + + i, err := strconv.ParseUint(b[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("%q is not a block file: %w", s, err) + } + if len(b) == 1 { + return &blockID{ID: i}, nil + } + + j, err := strconv.ParseUint(b[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("%q is not a block file: %w", s, err) + } + return &blockID{ID: i, Part: j}, nil } diff --git a/pkg/database/keyvalue/block/index_file.go b/pkg/database/keyvalue/block/index_file.go new file mode 100644 index 000000000..1cc4e4f1b --- /dev/null +++ b/pkg/database/keyvalue/block/index_file.go @@ -0,0 +1,258 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
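+//
+// An index file is a sorted array of fixed-size, 64-byte entries: a 32-byte
+// key hash followed by a packed record location (see writeLocation and
+// readLocation). Keeping entries sorted by hash lets lookups search the file
+// directly and lets openIndexFile recover the entry count by finding the
+// zeroed tail of the file.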
+
+package block
+
+import (
+	stdbin "encoding/binary"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync/atomic"
+
+	"gitlab.com/accumulatenetwork/accumulate/exp/ioutil"
+	"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
+)
+
+type indexFile struct {
+	level int
+	file  *ioutil.MappedFile
+	count atomic.Int64
+}
+
+func newIndexFile(name string, level int) (_ *indexFile, err error) {
+	// Ensure the directory exists
+	err = os.Mkdir(filepath.Dir(name), 0700)
+	if err != nil && !errors.Is(err, fs.ErrExist) {
+		return nil, err
+	}
+
+	f := new(indexFile)
+	f.level = level
+	f.file, err = ioutil.OpenMappedFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
+	if err != nil {
+		return nil, err
+	}
+	defer closeIfError(&err, f)
+
+	// Always allocate 1024 entries
+	h := f.file.Acquire()
+	defer h.Release()
+	err = h.Truncate(indexFileSize)
+	if err != nil {
+		return nil, err
+	}
+
+	return f, err
+}
+
+func openIndexFile(name string, level int) (_ *indexFile, err error) {
+	f := new(indexFile)
+	f.level = level
+	f.file, err = ioutil.OpenMappedFile(name, os.O_RDWR, 0600)
+	if err != nil {
+		return nil, err
+	}
+	defer closeIfError(&err, f)
+
+	h := f.file.Acquire()
+	defer h.Release()
+	if h.Len() != indexFileSize {
+		return nil, fmt.Errorf("invalid size: want %d, got %d", indexFileSize, h.Len())
+	}
+
+	// Find the empty region at the end of the file and use that to determine
+	// the number of entries
+	data := h.Raw()
+	f.count.Store(int64(sort.Search(indexFileEntryCount, func(i int) bool {
+		offset := int64(i) * indexFileEntrySize
+		return [32]byte(data[offset:]) == [32]byte{}
+	})))
+
+	return f, err
+}
+
+func (f *indexFile) Close() error {
+	return f.file.Close()
+}
+
+func (f *indexFile) get(hash [32]byte) (*recordLocation, bool) {
+	h := f.file.Acquire()
+	defer h.Release()
+	data := h.Raw()
+
+	count := f.count.Load()
+	index, match := searchIndex(data, 0, count, hash)
+	if !match {
+		return nil, false
+	}
+
+	loc := poolLocation.Get()
+	offset := index * indexFileEntrySize
+	readLocation((*[32]byte)(data[offset+32:]), loc)
+	return loc, true
+}
+
+func (f *indexFile) forEach(fn func([32]byte, *recordLocation) error) error {
+	h := f.file.Acquire()
+	defer h.Release()
+
+	var hash [64]byte
+	loc := new(recordLocation)
+	for i, n := 0, f.count.Load(); i < int(n); i++ {
+		offset := int64(i) * indexFileEntrySize
+		_, err := h.ReadAt(hash[:], offset)
+		if err != nil {
+			return err
+		}
+
+		readLocation((*[32]byte)(hash[32:]), loc)
+		err = fn([32]byte(hash[:]), loc)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (f *indexFile) commit(entries []*locationAndHash) (indexFileNode, error) {
+	h := f.file.Acquire()
+	defer h.Release()
+	data := h.Raw()
+
+	// Overwrite existing entries and determine where to insert new ones
+	indices, entries := f.findAndWrite(data, entries)
+	if len(entries) == 0 {
+		return f, nil
+	}
+
+	// Does the file need to be split?
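+	// A file holds at most indexFileEntryCount entries; once the existing
+	// entries plus the insertions exceed that, the file is replaced by a
+	// 16-way indexFileSet.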
+ if int(f.count.Load())+len(indices) <= len(data)/indexFileEntrySize { + // No, insert the new entries + f.insert(data, indices, entries) + return f, nil + } + + set, err := newIndexFileSet(f) + if err != nil { + return nil, err + } + + // We have to release before closing the file to avoid a deadlock + h.Release() + + path := f.file.Name() + err = errors.Join( + f.Close(), + os.Remove(path), + ) + if err != nil { + return nil, err + } + + return set.commit(entries) +} + +func (f *indexFile) findAndWrite(data []byte, entries []*locationAndHash) ([]int64, []*locationAndHash) { + var indices []int64 + var insert []*locationAndHash + count := f.count.Load() + for _, e := range entries { + // Find the entry or the insertion point + index, matches := searchIndex(data, 0, count, e.Hash) + if matches { + // Overwrite the previous value + offset := index * indexFileEntrySize + writeLocation((*[32]byte)(data[offset+32:]), e.Location) + continue + } + + // Sanity check + if len(indices) > 0 && index < indices[len(indices)-1] { + // The entries and/or the index is out of order + panic("data is not sane") + } + + // Value must be inserted + indices = append(indices, index) + insert = append(insert, e) + } + return indices, insert +} + +func (f *indexFile) insert(data []byte, indices []int64, entries []*locationAndHash) { + // Relocate existing entries to make space for the new ones + var ranges [][2]int64 + count := f.count.Load() + for _, i := range indices { + if i >= count { + continue + } + if len(ranges) == 0 || ranges[len(ranges)-1][0] != i { + ranges = append(ranges, [2]int64{i, 1}) + } else { + ranges[len(ranges)-1][1]++ + } + } + + // b := unsafe.Slice((*[64]byte)(unsafe.Pointer(unsafe.SliceData(data))), len(data)/64) + // _ = b + + if len(ranges) > 0 { + for i := range ranges[1:] { + ranges[i+1][1] += ranges[i][1] + } + } + + // Aggregate neighbors + last := count + for i := len(ranges) - 1; i >= 0; i-- { + x, n := ranges[i][0], ranges[i][1] + start, end := x*indexFileEntrySize, last*indexFileEntrySize + dest := n * indexFileEntrySize + copy(data[start+dest:end+dest], data[start:end]) + last = x + } + + // Insert the new entries + for i, e := range entries { + offset := (int64(i) + indices[i]) * indexFileEntrySize + copy(data[offset:], e.Hash[:]) + writeLocation((*[32]byte)(data[offset+32:]), e.Location) + } + + f.count.Add(int64(len(entries))) +} + +func writeLocation(buf *[32]byte, loc *recordLocation) { + stdbin.BigEndian.PutUint64(buf[:], loc.Block.ID) + stdbin.BigEndian.PutUint64(buf[8:], uint64(loc.Offset)) + stdbin.BigEndian.PutUint16(buf[16:], uint16(loc.Block.Part)) + stdbin.BigEndian.PutUint16(buf[18:], uint16(loc.HeaderLen)) + stdbin.BigEndian.PutUint32(buf[20:], uint32(loc.RecordLen)) +} + +func readLocation(buf *[32]byte, loc *recordLocation) { + if loc.Block == nil { + loc.Block = new(blockID) + } + loc.Block.ID = stdbin.BigEndian.Uint64(buf[:]) + loc.Offset = int64(stdbin.BigEndian.Uint64(buf[8:])) + loc.Block.Part = uint64(stdbin.BigEndian.Uint16(buf[16:])) + loc.HeaderLen = int64(int16(stdbin.BigEndian.Uint16(buf[18:]))) + loc.RecordLen = int64(int32(stdbin.BigEndian.Uint32(buf[20:]))) +} diff --git a/pkg/database/keyvalue/block/index_file_set.go b/pkg/database/keyvalue/block/index_file_set.go new file mode 100644 index 000000000..26e4dc768 --- /dev/null +++ b/pkg/database/keyvalue/block/index_file_set.go @@ -0,0 +1,131 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file 
or at +// https://opensource.org/licenses/MIT. + +package block + +import ( + "errors" + "fmt" + "path/filepath" + "strings" +) + +type indexFileSet struct { + level int + children [16]indexFileNode +} + +func newIndexFileSet(f *indexFile) (_ *indexFileSet, err error) { + dir := filepath.Dir(f.file.Name()) + prefix := strings.TrimPrefix(filepath.Base(f.file.Name()), indexFilePrefix) + set := new(indexFileSet) + set.level = f.level + defer closeIfError(&err, set) + + src := f.file.Acquire() + defer src.Release() + + var j int + var entry [64]byte + var entryOk bool + count := f.count.Load() + + for i := range set.children { + name := fmt.Sprintf("%s%s%x", indexFilePrefix, prefix, i) + g, err := newIndexFile(filepath.Join(dir, name), f.level+1) + if err != nil { + return nil, err + } + set.children[i] = g + + dst := g.file.Acquire() + defer dst.Release() + + for j < int(count) { + if !entryOk { + entryOk = true + j++ + _, err = src.Read(entry[:]) + if err != nil { + return nil, err + } + } + + if indexFor(entry[:], f.level) == byte(i) { + entryOk = false + g.count.Add(1) + _, err = dst.Write(entry[:]) + if err != nil { + return nil, err + } + } else { + break + } + } + } + + return set, nil +} + +func (s *indexFileSet) Close() error { + var errs []error + for _, n := range s.children { + if n != nil { + errs = append(errs, n.Close()) + } + } + return errors.Join(errs...) +} + +func (s *indexFileSet) get(key [32]byte) (*recordLocation, bool) { + i := indexFor(key[:], s.level) + return s.children[i].get(key) +} + +func (s *indexFileSet) forEach(fn func([32]byte, *recordLocation) error) error { + for _, n := range s.children { + err := n.forEach(fn) + if err != nil { + return err + } + } + return nil +} + +func (s *indexFileSet) commit(entries []*locationAndHash) (indexFileNode, error) { + type Range struct { + Index byte + Start int + End int + } + + var ranges []Range + for i, e := range entries { + x := indexFor(e.Hash[:], s.level) + if len(ranges) == 0 || ranges[len(ranges)-1].Index != x { + ranges = append(ranges, Range{Index: x, Start: i, End: i + 1}) + } else { + ranges[len(ranges)-1].End = i + 1 + } + } + + for _, r := range ranges { + n, err := s.children[r.Index].commit(entries[r.Start:r.End]) + if err != nil { + return nil, err + } + s.children[r.Index] = n + } + return s, nil +} + +func indexFor(hash []byte, level int) byte { + b := hash[level/2] + if level%2 == 0 { + return b >> 4 + } + return b & 0xF +} diff --git a/pkg/database/keyvalue/block/index_file_test.go b/pkg/database/keyvalue/block/index_file_test.go new file mode 100644 index 000000000..83b256f18 --- /dev/null +++ b/pkg/database/keyvalue/block/index_file_test.go @@ -0,0 +1,130 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
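+//
+// TestIndex commits roughly 20,000 random keys one at a time, occasionally
+// re-committing an existing key, then verifies every location via Get both
+// before and after closing and reopening the tree.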
+ +package block + +import ( + "bytes" + "fmt" + "path/filepath" + "slices" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/accumulatenetwork/accumulate/internal/database/smt/common" +) + +func TestIndex(t *testing.T) { + dir := t.TempDir() + x, err := openIndexFileTree(&config{path: dir}) + require.NoError(t, err) + + var rh common.RandHash + const N = 20000 + hashes := make([][32]byte, 0, N) + for len(hashes) < N { + // Make a new hash or pick a random old hash + var hash [32]byte + if len(hashes) > N/10 && rh.GetIntN(3) == 0 { + hash = hashes[rh.GetIntN(len(hashes))] + } else { + hash = rh.NextA() + hashes = append(hashes, hash) + } + + err = x.Commit(map[[32]byte]*recordLocation{hash: { + Block: &blockID{}, + Offset: int64(hash[0]), + }}) + if err != nil { + break + } + } + require.NoError(t, err) + + for _, h := range hashes { + loc, ok := x.Get(h) + require.Truef(t, ok, "Get %x", h) + require.EqualValuesf(t, h[0], loc.Offset, "Check %x") + } + + require.NoError(t, x.Close()) + x, err = openIndexFileTree(&config{path: dir}) + require.NoError(t, err) + + for _, h := range hashes { + loc, ok := x.Get(h) + require.True(t, ok) + require.Truef(t, ok, "Get %x", h) + require.EqualValuesf(t, h[0], loc.Offset, "Check %x") + } + require.NoError(t, x.Close()) +} + +func BenchmarkPut(b *testing.B) { + N := []int{7, 8, 9, 10} + for _, N := range N { + N := 1 << N + b.Run(fmt.Sprint(N), func(b *testing.B) { + var rh common.RandHash + for i := range b.N { + f, err := newIndexFile(filepath.Join(b.TempDir(), fmt.Sprintf("index-%d", i)), 0) + require.NoError(b, err) + + b.SetBytes(int64(N)) + for range N { + _, err = f.commit([]*locationAndHash{{ + Hash: rh.NextA(), + Location: &recordLocation{Block: &blockID{}}, + }}) + if err != nil { + break + } + } + require.NoError(b, err) + } + }) + } +} + +func BenchmarkGet(b *testing.B) { + N := []int{1, 10, 100, 1000} + for _, N := range N { + b.Run(fmt.Sprint(N), func(b *testing.B) { + var rh common.RandHash + hashes := make([][32]byte, N) + for i := range hashes { + hashes[i] = rh.NextA() + } + + f, err := newIndexFile(filepath.Join(b.TempDir(), "index"), 0) + require.NoError(b, err) + + slices.SortFunc(hashes, func(a, b [32]byte) int { + return bytes.Compare(a[:], b[:]) + }) + + for _, h := range hashes { + _, err = f.commit([]*locationAndHash{{ + Hash: h, + Location: &recordLocation{Block: &blockID{}}, + }}) + require.NoError(b, err) + } + + b.ResetTimer() + var ok bool + for range b.N { + h := hashes[rh.GetIntN(N)] + _, ok = f.get(h) + if !ok { + break + } + } + require.True(b, ok) + }) + } +} diff --git a/pkg/database/keyvalue/block/index_file_tree.go b/pkg/database/keyvalue/block/index_file_tree.go new file mode 100644 index 000000000..474e265b3 --- /dev/null +++ b/pkg/database/keyvalue/block/index_file_tree.go @@ -0,0 +1,188 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
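+//
+// The index is a tree of fixed-size files. It starts out as a single file;
+// when a file fills up, commit replaces it with sixteen children keyed on
+// successive hex nibbles of the key hash (see indexFor), and
+// openIndexFileNode reconstructs the tree from the file names on startup.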
+ +package block + +import ( + "bytes" + "io/fs" + "os" + "path/filepath" + "regexp" + "slices" + "strconv" + "strings" + "sync" + + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" +) + +const indexFileEntrySize = 64 +const indexFileEntryCount = 1 << 10 +const indexFileSize = indexFileEntrySize * indexFileEntryCount +const indexFilePrefix = "index-" + +type indexFileTree struct { + mu *sync.RWMutex + config *config + root indexFileNode +} + +type indexFileNode interface { + Close() error + get(key [32]byte) (*recordLocation, bool) + forEach(fn func([32]byte, *recordLocation) error) error + commit(centries []*locationAndHash) (indexFileNode, error) +} + +type emptyIndexFileTree string + +type locationAndHash struct { + Hash [32]byte + Location *recordLocation +} + +var reHex = regexp.MustCompile(`^-?[0-9a-f]*`) + +func openIndexFileTree(cfg *config) (_ *indexFileTree, err error) { + s := new(indexFileTree) + s.mu = new(sync.RWMutex) + s.config = cfg + defer closeIfError(&err, s) + + entries, err := os.ReadDir(cfg.path) + switch { + case err == nil, + errors.Is(err, fs.ErrNotExist): + // Directory exists, or doesn't + + default: + // Some other error + return nil, err + } + + emap := map[string]bool{} + for _, e := range entries { + s := strings.TrimPrefix(e.Name(), indexFilePrefix) + if s == e.Name() || !reHex.MatchString(s) { + continue + } + emap[s] = true + } + + s.root, err = openIndexFileNode(cfg.path, 0, emap) + if err != nil { + return nil, err + } + + return s, nil +} + +func openIndexFileNode(dir string, level int, files map[string]bool) (indexFileNode, error) { + if level == 0 && len(files) == 0 { + return emptyIndexFileTree(dir), nil + } + + if len(files) == 1 { + var name string + for name = range files { + } + return openIndexFile(filepath.Join(dir, indexFilePrefix+name), level) + } + + if len(files) < 16 { + return nil, errors.InternalError.With("corrupted index") + } + + // Partition the file list + parts := [16]map[string]bool{} + for i := range parts { + parts[i] = map[string]bool{} + } + for file := range files { + // If there's an old index file hanging around that didn't get deleted + // somehow, just ignore it + if len(file) <= level { + continue + } + + i, err := strconv.ParseUint(file[level:level+1], 16, 4) + if err != nil { + panic("internal error: invalid file name") + } + parts[i][file] = true + } + + s := &indexFileSet{level: level} + for i, files := range parts { + var err error + s.children[i], err = openIndexFileNode(dir, level+1, files) + if err != nil { + return nil, err + } + } + return s, nil +} + +func (s *indexFileTree) Close() error { + if s == nil || s.root == nil { + return nil + } + return s.root.Close() +} + +func (s *indexFileTree) Get(key [32]byte) (*recordLocation, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + return s.root.get(key) +} + +func (s *indexFileTree) ForEach(fn func([32]byte, *recordLocation) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.root.forEach(fn) +} + +func (s *indexFileTree) Commit(entries map[[32]byte]*recordLocation) error { + if len(entries) == 0 { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + list := make([]*locationAndHash, 0, len(entries)) + for hash, loc := range entries { + if loc.Block == nil { + panic("invalid location: missing block") + } + list = append(list, &locationAndHash{Hash: hash, Location: loc}) + } + + slices.SortFunc(list, func(a, b *locationAndHash) int { + return bytes.Compare(a.Hash[:], b.Hash[:]) + }) + + n, err := s.root.commit(list) + if err != nil { + return 
err + } + + s.root = n + return nil +} + +func (emptyIndexFileTree) Close() error { return nil } +func (emptyIndexFileTree) get(key [32]byte) (*recordLocation, bool) { return nil, false } +func (emptyIndexFileTree) forEach(fn func([32]byte, *recordLocation) error) error { return nil } + +func (e emptyIndexFileTree) commit(entries []*locationAndHash) (indexFileNode, error) { + f, err := newIndexFile(filepath.Join(string(e), indexFilePrefix), 0) + if err != nil { + return nil, err + } + return f.commit(entries) +} diff --git a/pkg/database/keyvalue/block/record_file.go b/pkg/database/keyvalue/block/record_file.go new file mode 100644 index 000000000..4172cb348 --- /dev/null +++ b/pkg/database/keyvalue/block/record_file.go @@ -0,0 +1,126 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package block + +import ( + stdbin "encoding/binary" + "os" + "sync/atomic" + + "gitlab.com/accumulatenetwork/accumulate/exp/ioutil" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/core/schema/pkg/binary" +) + +const recordFileHeaderSize = 1024 +const recordFilePageSize = 1 << 16 + +type recordFile struct { + file *ioutil.MappedFile + used atomic.Int64 +} + +func newRecordFile(name string) (_ *recordFile, err error) { + f := new(recordFile) + f.file, err = ioutil.OpenMappedFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) + if err != nil { + return nil, err + } + defer closeIfError(&err, f) + + // Write the header + h := f.file.Acquire() + defer h.Release() + err = h.Truncate(recordFileHeaderSize) + if err != nil { + return nil, err + } + + f.used.Store(recordFileHeaderSize) + return f, (&recordFileWriter{&f.used, h}).Close() +} + +func openRecordFile(name string) (_ *recordFile, err error) { + f := new(recordFile) + f.file, err = ioutil.OpenMappedFile(name, os.O_RDWR, 0600) + if err != nil { + return nil, err + } + defer closeIfError(&err, f) + + // Decode the header + b := make([]byte, recordFileHeaderSize) + h := f.file.Acquire() + defer h.Release() + _, err = h.ReadAt(b, 0) + if err != nil { + return nil, err + } + header := new(fileHeader) + err = header.UnmarshalBinary(b) + if err != nil { + return nil, err + } + + // Determine the end of the file + h.Offset = recordFileHeaderSize + for h.Len() > 4 { + n := stdbin.BigEndian.Uint32(h.Raw()) + if n == 0 { + break + } + h.Offset += int64(4 + n) + } + f.used.Store(h.Offset) + + return f, nil +} + +func (f *recordFile) Close() error { + return f.file.Close() +} + +func (f *recordFile) ReadHeader(l *recordLocation) (*recordEntry, error) { + h := f.file.Acquire() + defer h.Release() + if l.end() >= f.used.Load() { + return nil, errors.InternalError.With("corrupted: record is past the end of the file") + } + + rd := h.AcquireRange(l.Offset, l.Offset+l.HeaderLen) + defer rd.Release() + + dec := poolDecoder.Get() + defer poolDecoder.Put(dec) + dec.Reset(rd, binary.LeaveTrailing()) + + e := new(recordEntry) + err := e.UnmarshalBinaryV2(dec) + return e, err +} + +func (f *recordFile) ReadRecord(l *recordLocation) ([]byte, error) { + if l.RecordLen < 0 { + panic("record was deleted") + } + h := f.file.Acquire() + defer h.Release() + if l.end() >= f.used.Load() { + return nil, errors.InternalError.With("corrupted: record is past the end of the file") + } + b := make([]byte, l.RecordLen) + _, err := h.ReadAt(b, l.Offset+l.HeaderLen) + return b, err +} + +func (l *recordLocation) 
end() int64 { + x := l.Offset + l.HeaderLen + if l.RecordLen > 0 { + x += l.RecordLen + } + return x +} diff --git a/pkg/database/keyvalue/block/record_file_set.go b/pkg/database/keyvalue/block/record_file_set.go new file mode 100644 index 000000000..7d172dc59 --- /dev/null +++ b/pkg/database/keyvalue/block/record_file_set.go @@ -0,0 +1,186 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package block + +import ( + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" +) + +type recordFileSet struct { + config *config + files []*recordFile +} + +type entryAndData struct { + recordEntry + Value []byte +} + +func openRecordFileSet(cfg *config) (_ *recordFileSet, err error) { + s := new(recordFileSet) + s.config = cfg + defer closeIfError(&err, s) + + // List all the entries + entries, err := os.ReadDir(cfg.path) + switch { + case err == nil, + errors.Is(err, fs.ErrNotExist): + // Directory exists, or doesn't + + default: + // Some other error + return nil, err + } + + // Open each file + for _, e := range entries { + if cfg.filterFn != nil && !cfg.filterFn(e.Name()) { + continue + } + + // Skip files that aren't block files + _, err := cfg.nameFmt.Parse(e.Name()) + if err != nil { + continue + } + + f, err := openRecordFile(filepath.Join(cfg.path, e.Name())) + if err != nil { + return nil, err + } + + s.files = append(s.files, f) + } + return s, nil +} + +func (s *recordFileSet) Close() error { + if s == nil { + return nil + } + var errs []error + for _, f := range s.files { + errs = append(errs, f.Close()) + } + return errors.Join(errs...) +} + +func (s *recordFileSet) Commit(view *databaseView, entries []*entryAndData) error { + rw := new(recordWriter) + rw.encBuf = poolBuffer.Get() + rw.enc = poolEncoder.Get() + defer poolBuffer.Put(rw.encBuf) + defer poolEncoder.Put(rw.enc) + + block := new(blockID) + block.ID = s.config.nextBlock.Add(1) + + write := func(f *recordFile) (int, error) { + // Check the limit + if f.used.Load() > s.config.fileLimit { + return 0, nil + } + + // Write the start of block marker + h := f.file.Acquire() + defer h.Release() + fw := &recordFileWriter{&f.used, h} + _, err := rw.Write(fw, &startBlockEntry{blockID: *block}, nil) + if err != nil { + return 0, err + } + + var n int + for ; n < len(entries) && f.used.Load() < s.config.fileLimit; n++ { + e := entries[n] + + // Write the entry + loc, err := rw.Write(fw, &e.recordEntry, e.Value) + if err != nil { + return 0, err + } + + // Update the index + loc.RecordLen = e.Length + loc.Block = block.Copy() + view.records.Put(e.KeyHash, loc) + } + + // Close the block + _, err = rw.Write(fw, &endBlockEntry{}, nil) + if err != nil { + return 0, err + } + + // Update the header + err = fw.Close() + if err != nil { + return 0, err + } + + return n, nil + } + + var file *recordFile + for len(entries) > 0 { + // If it's the first time and there are existing files, get the last + // file. Otherwise make a new file. 
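+		// If the chosen file turns out to be full, write returns n == 0
+		// without consuming any entries, and the next pass rolls over to a
+		// fresh file for the same block.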
+ var err error + if file == nil && len(s.files) > 0 { + file = s.files[len(s.files)-1] + + } else { + file, err = s.new(block) + if err != nil { + return err + } + } + + n, err := write(file) + if err != nil { + return err + } + if n == 0 { + continue + } + + view.blocks.Put(*block, len(s.files)-1) + entries = entries[n:] + block.Part++ + } + + return nil +} + +func (s *recordFileSet) new(block *blockID) (*recordFile, error) { + // Ensure the directory exists + err := os.Mkdir(s.config.path, 0700) + if err != nil && !errors.Is(err, fs.ErrExist) { + return nil, err + } + + // Create a new file + name := s.config.nameFmt.Format(block) + if name == "" { + return nil, fmt.Errorf("invalid file name: empty") + } else if filepath.Base(name) != name { + return nil, fmt.Errorf("invalid file name: %q contains a slash or is empty", name) + } + + f, err := newRecordFile(filepath.Join(s.config.path, name)) + if err != nil { + return nil, err + } + + s.files = append(s.files, f) + return f, nil +} diff --git a/pkg/database/keyvalue/block/record_file_writer.go b/pkg/database/keyvalue/block/record_file_writer.go new file mode 100644 index 000000000..15c4ac91d --- /dev/null +++ b/pkg/database/keyvalue/block/record_file_writer.go @@ -0,0 +1,60 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package block + +import ( + "fmt" + "sync/atomic" + + "gitlab.com/accumulatenetwork/accumulate/exp/ioutil" + "gitlab.com/accumulatenetwork/core/schema/pkg/binary" +) + +type recordFileWriter struct { + used *atomic.Int64 + file *ioutil.MappedFileRange +} + +func (w *recordFileWriter) Len() int { + return int(w.used.Load()) +} + +func (w *recordFileWriter) Write(b []byte) (int, error) { + offset := w.used.Load() + size := offset + int64(len(b)) + if r := size % recordFilePageSize; r != 0 { + size += recordFilePageSize - r + } + + err := w.file.Truncate(size) + if err != nil { + return 0, err + } + + m, err := w.file.WriteAt(b, offset) + if err != nil { + return 0, err + } + + w.used.Add(int64(m)) + return m, nil +} + +func (w *recordFileWriter) Close() error { + header := &fileHeader{} + b, err := header.MarshalBinary() + if err != nil { + panic(fmt.Errorf("encode header: %w", err)) + } + b = append(b, binary.EmptyObject) + if len(b) > recordFileHeaderSize { + panic("header is too big") + } + + _, err = w.file.WriteAt(b, 0) + return err +} diff --git a/pkg/database/keyvalue/block/record_iterator.go b/pkg/database/keyvalue/block/record_iterator.go new file mode 100644 index 000000000..cc2e38c7a --- /dev/null +++ b/pkg/database/keyvalue/block/record_iterator.go @@ -0,0 +1,134 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
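recordFileWriter.Write (above) grows the mapped file in recordFilePageSize (64 KiB) steps rather than per write, so appending many small entries rarely remaps the file. A sketch of the rounding it performs, with illustrative numbers:

	// size is rounded up to the next page boundary before the write.
	func pageAlignedSize(used, n int64) int64 {
		size := used + n
		if r := size % recordFilePageSize; r != 0 {
			size += recordFilePageSize - r
		}
		return size
	}

	// For example, with only the 1024-byte header written (used == 1024),
	// appending 100 bytes truncates (grows) the file to
	// pageAlignedSize(1024, 100) == 65536, while used advances to just 1124.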
+ +package block + +import ( + stdbin "encoding/binary" + "errors" + "io" + + "gitlab.com/accumulatenetwork/core/schema/pkg/binary" +) + +type recordFileSetIterator struct { + set *recordFileSet + want func(entryType) bool + err error +} + +type recordFileIterator struct { + file *recordFile + want func(entryType) bool + err error +} + +type recordFilePos struct { + File *recordFile + FileIndex int + recordPos +} + +type recordPos struct { + Entry entry + Start int64 + End int64 +} + +func (f *recordFileSet) entries(want func(typ entryType) bool) *recordFileSetIterator { + return &recordFileSetIterator{f, want, nil} +} + +func (f *recordFile) entries(want func(typ entryType) bool) *recordFileIterator { + return &recordFileIterator{f, want, nil} +} + +func (it *recordFileSetIterator) Range(yield func(int, recordFilePos) bool) { + var j int + for i, file := range it.set.files { + fit := file.entries(it.want) + cont := fit.Range(func(_ int, pos recordPos) bool { + cont := yield(j, recordFilePos{file, i, pos}) + j++ + return cont + }) + if !cont || fit.err != nil { + it.err = fit.err + break + } + } +} + +func (it *recordFileIterator) Range(yield func(int, recordPos) bool) bool { + rd := it.file.file.AcquireRange(0, recordFileHeaderSize) + defer rd.Release() + + dec := poolDecoder.Get() + defer poolDecoder.Put(dec) + + var typ entryType + for i := 0; rd.Offset < it.file.used.Load(); i++ { + rd.Offset = rd.End + rd.End += 4 + + // Read the length + n := stdbin.BigEndian.Uint32(rd.Raw()) + rd.Offset = rd.End + start := rd.Offset + rd.End = rd.Offset + int64(n) + + if it.want != nil { + // Read the entry type + dec.Reset(rd, binary.LeaveTrailing()) + err := dec.StartObject() + if err != nil { + it.err = err + return false + } + id, err := dec.Field() + switch { + case err == nil && id == 1: + // Ok + case err == nil /* and id != 1 */ || errors.Is(err, io.EOF): + it.err = errors.New("field Type is missing") + return false + default: + it.err = err + return false + } + + err = typ.UnmarshalBinaryV2(dec) + if err != nil { + it.err = err + return false + } + + if !it.want(typ) { + continue + } + } + + // Read the entry + rd.Offset = start + dec.Reset(rd, binary.LeaveTrailing()) + entry, err := unmarshalEntryBinaryV2(dec) + if err != nil { + it.err = err + return false + } + + // Yield + pos := recordPos{ + Entry: entry, + Start: start, + End: rd.Offset, + } + if !yield(i, pos) { + return false + } + } + return true +} diff --git a/pkg/database/keyvalue/block/record_writer.go b/pkg/database/keyvalue/block/record_writer.go new file mode 100644 index 000000000..2d778e1a0 --- /dev/null +++ b/pkg/database/keyvalue/block/record_writer.go @@ -0,0 +1,53 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
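The iterators above decode each entry's type first and skip entries the caller does not want, avoiding a full decode of filtered-out entries. A sketch of collecting every start-of-block entry from a record file with this API; the helper name is illustrative:

	func startBlocks(f *recordFile) ([]*startBlockEntry, error) {
		var blocks []*startBlockEntry
		it := f.entries(func(typ entryType) bool {
			return typ == entryTypeStartBlock
		})
		it.Range(func(_ int, pos recordPos) bool {
			if e, ok := pos.Entry.(*startBlockEntry); ok {
				blocks = append(blocks, e)
			}
			return true // continue iterating
		})
		return blocks, it.err
	}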
+ +package block + +import ( + "bytes" + stdbin "encoding/binary" + "errors" + "io" + + "gitlab.com/accumulatenetwork/core/schema/pkg/binary" +) + +type lenWriter interface { + io.Writer + Len() int +} + +type recordWriter struct { + encBuf *bytes.Buffer + enc *binary.Encoder + lenBuf [4]byte +} + +func (w *recordWriter) Write(wr lenWriter, e entry, record []byte) (*recordLocation, error) { + // Encode the entry + w.encBuf.Reset() + w.enc.Reset(w.encBuf) + err := w.enc.Encode(e) + if err != nil { + return nil, err + } + if len(record) > 0 { + _ = w.encBuf.WriteByte(binary.EmptyObject) + } + + // Encode the length + loc := new(recordLocation) + loc.HeaderLen = int64(w.encBuf.Len()) + loc.RecordLen = int64(len(record)) + stdbin.BigEndian.PutUint32(w.lenBuf[:], uint32(loc.HeaderLen+loc.RecordLen)) + + // Write the length + _, e1 := wr.Write(w.lenBuf[:]) + loc.Offset = int64(wr.Len()) + _, e2 := io.Copy(wr, w.encBuf) + _, e3 := wr.Write(record) + return loc, errors.Join(e1, e2, e3) +} diff --git a/pkg/database/keyvalue/block/schema.yml b/pkg/database/keyvalue/block/schema.yml index 85ee89b65..dd9b23bba 100644 --- a/pkg/database/keyvalue/block/schema.yml +++ b/pkg/database/keyvalue/block/schema.yml @@ -2,7 +2,6 @@ $generate: widgets: true methods: - json: true binary: true import: @@ -12,6 +11,23 @@ $generate: schema: s widget: w +fileHeader: + class: composite + fields: [] + +recordLocation: + class: composite + generate: { methods: { binary: false } } + fields: + - name: Block + type: '*blockID' + - name: Offset + type: int + - name: HeaderLen + type: int + - name: RecordLen + type: int + entry: class: union discriminator: @@ -32,10 +48,14 @@ entry: - name: startBlockEntry class: composite fields: - - name: ID - type: uint - - name: Parent - type: uint + - type: + name: blockID + class: composite + fields: + - name: ID + type: uint + - name: Part + type: uint - name: recordEntry class: composite diff --git a/pkg/database/keyvalue/block/schema_gen.go b/pkg/database/keyvalue/block/schema_gen.go index 1aa618f42..7a6de6acd 100644 --- a/pkg/database/keyvalue/block/schema_gen.go +++ b/pkg/database/keyvalue/block/schema_gen.go @@ -10,16 +10,35 @@ import ( ) var ( + sblockID schema.Methods[*blockID, *blockID, *schema.CompositeType] sendBlockEntry schema.Methods[*endBlockEntry, *endBlockEntry, *schema.CompositeType] sentry schema.Methods[entry, *entry, *schema.UnionType] sentryType schema.EnumMethods[entryType] + sfileHeader schema.Methods[*fileHeader, *fileHeader, *schema.CompositeType] srecordEntry schema.Methods[*recordEntry, *recordEntry, *schema.CompositeType] + srecordLocation schema.Methods[*recordLocation, *recordLocation, *schema.CompositeType] sstartBlockEntry schema.Methods[*startBlockEntry, *startBlockEntry, *schema.CompositeType] ) func init() { var deferredTypes schema.ResolverSet + sblockID = schema.WithMethods[*blockID, *blockID](&schema.CompositeType{ + TypeBase: schema.TypeBase{ + Name: "blockID", + }, + Fields: []*schema.Field{ + { + Name: "ID", + Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + }, + { + Name: "Part", + Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + }, + }, + }).SetGoType() + sendBlockEntry = schema.WithMethods[*endBlockEntry, *endBlockEntry](&schema.CompositeType{ TypeBase: schema.TypeBase{ Name: "endBlockEntry", @@ -71,6 +90,12 @@ func init() { }, }).SetGoType() + sfileHeader = schema.WithMethods[*fileHeader, *fileHeader](&schema.CompositeType{ + TypeBase: schema.TypeBase{ + Name: "fileHeader", + }, + }).SetGoType() + srecordEntry = 
schema.WithMethods[*recordEntry, *recordEntry](&schema.CompositeType{ TypeBase: schema.TypeBase{ Name: "recordEntry", @@ -94,27 +119,55 @@ func init() { }, }).SetGoType() - sstartBlockEntry = schema.WithMethods[*startBlockEntry, *startBlockEntry](&schema.CompositeType{ + srecordLocation = schema.WithMethods[*recordLocation, *recordLocation](&schema.CompositeType{ TypeBase: schema.TypeBase{ - Name: "startBlockEntry", + Name: "recordLocation", + Generate: schema.MapValue{ + "methods": schema.MapValue{ + "binary": schema.BooleanValue(false), + }, + }, }, Fields: []*schema.Field{ { - Name: "ID", - Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + Name: "Block", + Type: (&schema.PointerType{ + TypeBase: schema.TypeBase{}, + }). + ResolveElemTo(&deferredTypes, "blockID"), }, { - Name: "Parent", - Type: &schema.SimpleType{Type: schema.SimpleTypeUint}, + Name: "Offset", + Type: &schema.SimpleType{Type: schema.SimpleTypeInt}, + }, + { + Name: "HeaderLen", + Type: &schema.SimpleType{Type: schema.SimpleTypeInt}, + }, + { + Name: "RecordLen", + Type: &schema.SimpleType{Type: schema.SimpleTypeInt}, }, }, }).SetGoType() + sstartBlockEntry = schema.WithMethods[*startBlockEntry, *startBlockEntry](&schema.CompositeType{ + TypeBase: schema.TypeBase{ + Name: "startBlockEntry", + }, + Fields: []*schema.Field{ + (&schema.Field{}).ResolveTo(&deferredTypes, "blockID"), + }, + }).SetGoType() + s, err := schema.New( + sblockID.Type, sendBlockEntry.Type, sentry.Type, sentryType.Type, + sfileHeader.Type, srecordEntry.Type, + srecordLocation.Type, sstartBlockEntry.Type, ) if err != nil { @@ -127,7 +180,6 @@ func init() { }, "methods": schema.MapValue{ "binary": schema.BooleanValue(true), - "json": schema.BooleanValue(true), }, "varPrefix": schema.MapValue{ "schema": schema.StringValue("s"), diff --git a/pkg/database/keyvalue/block/types.go b/pkg/database/keyvalue/block/types.go index b584bd109..3fba1f1b6 100644 --- a/pkg/database/keyvalue/block/types.go +++ b/pkg/database/keyvalue/block/types.go @@ -6,7 +6,10 @@ package block -import "encoding" +import ( + "encoding" + "fmt" +) //go:generate go run gitlab.com/accumulatenetwork/core/schema/cmd/generate schema schema.yml -w schema_gen.go //go:generate go run gitlab.com/accumulatenetwork/core/schema/cmd/generate types schema.yml -w types_gen.go @@ -16,3 +19,17 @@ type entry interface { Type() entryType encoding.BinaryMarshaler } + +func (b *blockID) String() string { + if b.Part == 0 { + return fmt.Sprint(b.ID) + } + return fmt.Sprintf("%d{%d}", b.ID, b.Part) +} + +func (b *blockID) Compare(c *blockID) int { + if b.ID != c.ID { + return int(b.ID - c.ID) + } + return int(b.Part - c.Part) +} diff --git a/pkg/database/keyvalue/block/types.yml b/pkg/database/keyvalue/block/types.yml deleted file mode 100644 index 25e6bcbd3..000000000 --- a/pkg/database/keyvalue/block/types.yml +++ /dev/null @@ -1,21 +0,0 @@ -startBlockEntry: - union: { type: entry, private: true } - fields: - - name: ID - type: uint - - name: Parent - type: uint - -recordEntry: - union: { type: entry, private: true } - fields: - - name: Key - type: record.Key - marshal-as: reference - pointer: true - - name: Length - type: int - -endBlockEntry: - union: { type: entry, private: true } - fields: diff --git a/pkg/database/keyvalue/block/types_gen.go b/pkg/database/keyvalue/block/types_gen.go index 3d5d29634..da563d4c1 100644 --- a/pkg/database/keyvalue/block/types_gen.go +++ b/pkg/database/keyvalue/block/types_gen.go @@ -8,55 +8,87 @@ import ( "gitlab.com/accumulatenetwork/core/schema/pkg/widget" ) 
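// The handwritten helpers in types.go (above) give blockID a compact string
// form and an ordering by ID and then Part; for example (illustrative values):
//
//	(&blockID{ID: 3}).String()          // "3"
//	(&blockID{ID: 3, Part: 2}).String() // "3{2}"
//	(&blockID{ID: 3, Part: 2}).Compare(&blockID{ID: 3, Part: 5}) // negative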
+type blockID struct { + ID uint64 + Part uint64 +} + +var wblockID = widget.ForComposite(widget.Fields[blockID]{ + {Name: "iD", ID: 1, Widget: widget.ForUint(func(v *blockID) *uint64 { return &v.ID })}, + {Name: "part", ID: 2, Widget: widget.ForUint(func(v *blockID) *uint64 { return &v.Part })}, +}, widget.Identity[*blockID]) + +// Copy returns a copy of the blockID. +func (v *blockID) Copy() *blockID { + var u = new(blockID) + wblockID.CopyTo(u, v) + return u +} + +// EqualblockID returns true if V is equal to U. +func (v *blockID) Equal(u *blockID) bool { + return wblockID.Equal(v, u) +} + +// MarshalBinary marshals the blockID to bytes using [binary]. +func (v *blockID) MarshalBinary() ([]byte, error) { + return widget.MarshalBinary(v, wblockID) +} + +// MarshalBinary marshals the blockID to a [binary.Encoder]. +func (v *blockID) MarshalBinaryV2(enc *binary.Encoder) error { + return wblockID.MarshalBinary(enc, v) +} + +// UnmarshalBinary unmarshals the blockID from bytes using [binary]. +func (v *blockID) UnmarshalBinary(b []byte) error { + return widget.UnmarshalBinary(v, wblockID, b) +} + +// UnmarshalBinary unmarshals the blockID from a [binary.Decoder]. +func (v *blockID) UnmarshalBinaryV2(dec *binary.Decoder) error { + return wblockID.UnmarshalBinary(dec, v) +} + type endBlockEntry struct { } -var wendBlockEntry = widget.ForCompositePtr(widget.Fields[endBlockEntry]{ +var wendBlockEntry = widget.ForComposite(widget.Fields[endBlockEntry]{ {Name: "type", ID: 1, Widget: widget.ForTag[*entryType]("type", (*endBlockEntry).Type)}, -}, widget.Identity[**endBlockEntry]) +}, widget.Identity[*endBlockEntry]) func (endBlockEntry) Type() entryType { return entryTypeEndBlock } // Copy returns a copy of the endBlockEntry. func (v *endBlockEntry) Copy() *endBlockEntry { - var u *endBlockEntry - wendBlockEntry.CopyTo(&u, &v) + var u = new(endBlockEntry) + wendBlockEntry.CopyTo(u, v) return u } // EqualendBlockEntry returns true if V is equal to U. func (v *endBlockEntry) Equal(u *endBlockEntry) bool { - return wendBlockEntry.Equal(&v, &u) -} - -// MarshalBinary marshals the endBlockEntry to JSON. -func (v *endBlockEntry) MarshalJSON() ([]byte, error) { - return widget.MarshalJSON(&v, wendBlockEntry) -} - -// UnmarshalJSON unmarshals the endBlockEntry from JSON. -func (v *endBlockEntry) UnmarshalJSON(b []byte) error { - return widget.UnmarshalJSON(&v, wendBlockEntry, b) + return wendBlockEntry.Equal(v, u) } // MarshalBinary marshals the endBlockEntry to bytes using [binary]. func (v *endBlockEntry) MarshalBinary() ([]byte, error) { - return widget.MarshalBinary(&v, wendBlockEntry) + return widget.MarshalBinary(v, wendBlockEntry) } // MarshalBinary marshals the endBlockEntry to a [binary.Encoder]. func (v *endBlockEntry) MarshalBinaryV2(enc *binary.Encoder) error { - return wendBlockEntry.MarshalBinary(enc, &v) + return wendBlockEntry.MarshalBinary(enc, v) } // UnmarshalBinary unmarshals the endBlockEntry from bytes using [binary]. func (v *endBlockEntry) UnmarshalBinary(b []byte) error { - return widget.UnmarshalBinary(&v, wendBlockEntry, b) + return widget.UnmarshalBinary(v, wendBlockEntry, b) } // UnmarshalBinary unmarshals the endBlockEntry from a [binary.Decoder]. 
func (v *endBlockEntry) UnmarshalBinaryV2(dec *binary.Decoder) error { - return wendBlockEntry.UnmarshalBinary(dec, &v) + return wendBlockEntry.UnmarshalBinary(dec, v) } // TODO type entry interface {} @@ -89,13 +121,6 @@ func equalEntry(a, b entry) bool { return wentry.Equal(&a, &b) } -// unmarshalEntryJSON unmarshals a entry from JSON. -func unmarshalEntryJSON(b []byte) (entry, error) { - var v entry - err := widget.UnmarshalJSON(&v, wentry, b) - return v, err -} - // unmarshalEntryBinary unmarshals a entry from bytes using [binary]. func unmarshalEntryBinary(b []byte) (entry, error) { var v entry @@ -137,16 +162,6 @@ func (v entryType) String() string { return sentryType.String(v) } -// MarshalBinary marshals the entryType to JSON. -func (v entryType) MarshalJSON() ([]byte, error) { - return widget.MarshalJSON(&v, wentryType) -} - -// UnmarshalJSON unmarshals the entryType from JSON. -func (v *entryType) UnmarshalJSON(b []byte) error { - return widget.UnmarshalJSON(v, wentryType, b) -} - // MarshalBinary marshals the entryType to bytes using [binary]. func (v entryType) MarshalBinary() ([]byte, error) { return widget.MarshalBinary(&v, wentryType) @@ -167,114 +182,155 @@ func (v *entryType) UnmarshalBinaryV2(dec *binary.Decoder) error { return wentryType.UnmarshalBinary(dec, v) } +type fileHeader struct { +} + +var wfileHeader = widget.ForComposite(widget.Fields[fileHeader]{}, widget.Identity[*fileHeader]) + +// Copy returns a copy of the fileHeader. +func (v *fileHeader) Copy() *fileHeader { + var u = new(fileHeader) + wfileHeader.CopyTo(u, v) + return u +} + +// EqualfileHeader returns true if V is equal to U. +func (v *fileHeader) Equal(u *fileHeader) bool { + return wfileHeader.Equal(v, u) +} + +// MarshalBinary marshals the fileHeader to bytes using [binary]. +func (v *fileHeader) MarshalBinary() ([]byte, error) { + return widget.MarshalBinary(v, wfileHeader) +} + +// MarshalBinary marshals the fileHeader to a [binary.Encoder]. +func (v *fileHeader) MarshalBinaryV2(enc *binary.Encoder) error { + return wfileHeader.MarshalBinary(enc, v) +} + +// UnmarshalBinary unmarshals the fileHeader from bytes using [binary]. +func (v *fileHeader) UnmarshalBinary(b []byte) error { + return widget.UnmarshalBinary(v, wfileHeader, b) +} + +// UnmarshalBinary unmarshals the fileHeader from a [binary.Decoder]. +func (v *fileHeader) UnmarshalBinaryV2(dec *binary.Decoder) error { + return wfileHeader.UnmarshalBinary(dec, v) +} + type recordEntry struct { Key *record.Key KeyHash [32]byte Length int64 } -var wrecordEntry = widget.ForCompositePtr(widget.Fields[recordEntry]{ +var wrecordEntry = widget.ForComposite(widget.Fields[recordEntry]{ {Name: "type", ID: 1, Widget: widget.ForTag[*entryType]("type", (*recordEntry).Type)}, {Name: "key", ID: 2, Widget: widget.ForValue(func(v *recordEntry) **record.Key { return &v.Key })}, {Name: "keyHash", ID: 3, Widget: widget.ForHash(func(v *recordEntry) *[32]byte { return &v.KeyHash })}, {Name: "length", ID: 4, Widget: widget.ForInt(func(v *recordEntry) *int64 { return &v.Length })}, -}, widget.Identity[**recordEntry]) +}, widget.Identity[*recordEntry]) func (recordEntry) Type() entryType { return entryTypeRecord } // Copy returns a copy of the recordEntry. func (v *recordEntry) Copy() *recordEntry { - var u *recordEntry - wrecordEntry.CopyTo(&u, &v) + var u = new(recordEntry) + wrecordEntry.CopyTo(u, v) return u } // EqualrecordEntry returns true if V is equal to U. 
func (v *recordEntry) Equal(u *recordEntry) bool { - return wrecordEntry.Equal(&v, &u) -} - -// MarshalBinary marshals the recordEntry to JSON. -func (v *recordEntry) MarshalJSON() ([]byte, error) { - return widget.MarshalJSON(&v, wrecordEntry) -} - -// UnmarshalJSON unmarshals the recordEntry from JSON. -func (v *recordEntry) UnmarshalJSON(b []byte) error { - return widget.UnmarshalJSON(&v, wrecordEntry, b) + return wrecordEntry.Equal(v, u) } // MarshalBinary marshals the recordEntry to bytes using [binary]. func (v *recordEntry) MarshalBinary() ([]byte, error) { - return widget.MarshalBinary(&v, wrecordEntry) + return widget.MarshalBinary(v, wrecordEntry) } // MarshalBinary marshals the recordEntry to a [binary.Encoder]. func (v *recordEntry) MarshalBinaryV2(enc *binary.Encoder) error { - return wrecordEntry.MarshalBinary(enc, &v) + return wrecordEntry.MarshalBinary(enc, v) } // UnmarshalBinary unmarshals the recordEntry from bytes using [binary]. func (v *recordEntry) UnmarshalBinary(b []byte) error { - return widget.UnmarshalBinary(&v, wrecordEntry, b) + return widget.UnmarshalBinary(v, wrecordEntry, b) } // UnmarshalBinary unmarshals the recordEntry from a [binary.Decoder]. func (v *recordEntry) UnmarshalBinaryV2(dec *binary.Decoder) error { - return wrecordEntry.UnmarshalBinary(dec, &v) + return wrecordEntry.UnmarshalBinary(dec, v) +} + +type recordLocation struct { + Block *blockID + Offset int64 + HeaderLen int64 + RecordLen int64 +} + +var wrecordLocation = widget.ForComposite(widget.Fields[recordLocation]{ + {Name: "block", ID: 1, Widget: widget.ForCompositePtr(wblockID.Fields, func(v *recordLocation) **blockID { return &v.Block })}, + {Name: "offset", ID: 2, Widget: widget.ForInt(func(v *recordLocation) *int64 { return &v.Offset })}, + {Name: "headerLen", ID: 3, Widget: widget.ForInt(func(v *recordLocation) *int64 { return &v.HeaderLen })}, + {Name: "recordLen", ID: 4, Widget: widget.ForInt(func(v *recordLocation) *int64 { return &v.RecordLen })}, +}, widget.Identity[*recordLocation]) + +// Copy returns a copy of the recordLocation. +func (v *recordLocation) Copy() *recordLocation { + var u = new(recordLocation) + wrecordLocation.CopyTo(u, v) + return u +} + +// EqualrecordLocation returns true if V is equal to U. +func (v *recordLocation) Equal(u *recordLocation) bool { + return wrecordLocation.Equal(v, u) } type startBlockEntry struct { - ID uint64 - Parent uint64 + blockID } -var wstartBlockEntry = widget.ForCompositePtr(widget.Fields[startBlockEntry]{ +var wstartBlockEntry = widget.ForComposite(widget.Fields[startBlockEntry]{ {Name: "type", ID: 1, Widget: widget.ForTag[*entryType]("type", (*startBlockEntry).Type)}, - {Name: "iD", ID: 2, Widget: widget.ForUint(func(v *startBlockEntry) *uint64 { return &v.ID })}, - {Name: "parent", ID: 3, Widget: widget.ForUint(func(v *startBlockEntry) *uint64 { return &v.Parent })}, -}, widget.Identity[**startBlockEntry]) + {ID: 2, Widget: widget.ForComposite(wblockID.Fields, func(v *startBlockEntry) *blockID { return &v.blockID })}, +}, widget.Identity[*startBlockEntry]) func (startBlockEntry) Type() entryType { return entryTypeStartBlock } // Copy returns a copy of the startBlockEntry. func (v *startBlockEntry) Copy() *startBlockEntry { - var u *startBlockEntry - wstartBlockEntry.CopyTo(&u, &v) + var u = new(startBlockEntry) + wstartBlockEntry.CopyTo(u, v) return u } // EqualstartBlockEntry returns true if V is equal to U. 
func (v *startBlockEntry) Equal(u *startBlockEntry) bool { - return wstartBlockEntry.Equal(&v, &u) -} - -// MarshalBinary marshals the startBlockEntry to JSON. -func (v *startBlockEntry) MarshalJSON() ([]byte, error) { - return widget.MarshalJSON(&v, wstartBlockEntry) -} - -// UnmarshalJSON unmarshals the startBlockEntry from JSON. -func (v *startBlockEntry) UnmarshalJSON(b []byte) error { - return widget.UnmarshalJSON(&v, wstartBlockEntry, b) + return wstartBlockEntry.Equal(v, u) } // MarshalBinary marshals the startBlockEntry to bytes using [binary]. func (v *startBlockEntry) MarshalBinary() ([]byte, error) { - return widget.MarshalBinary(&v, wstartBlockEntry) + return widget.MarshalBinary(v, wstartBlockEntry) } // MarshalBinary marshals the startBlockEntry to a [binary.Encoder]. func (v *startBlockEntry) MarshalBinaryV2(enc *binary.Encoder) error { - return wstartBlockEntry.MarshalBinary(enc, &v) + return wstartBlockEntry.MarshalBinary(enc, v) } // UnmarshalBinary unmarshals the startBlockEntry from bytes using [binary]. func (v *startBlockEntry) UnmarshalBinary(b []byte) error { - return widget.UnmarshalBinary(&v, wstartBlockEntry, b) + return widget.UnmarshalBinary(v, wstartBlockEntry, b) } // UnmarshalBinary unmarshals the startBlockEntry from a [binary.Decoder]. func (v *startBlockEntry) UnmarshalBinaryV2(dec *binary.Decoder) error { - return wstartBlockEntry.UnmarshalBinary(dec, &v) + return wstartBlockEntry.UnmarshalBinary(dec, v) } diff --git a/pkg/database/keyvalue/block/utils.go b/pkg/database/keyvalue/block/utils.go new file mode 100644 index 000000000..4e347cec2 --- /dev/null +++ b/pkg/database/keyvalue/block/utils.go @@ -0,0 +1,31 @@ +// Copyright 2024 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. 
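The file below adds searchIndex, a binary search over an index buffer of fixed 64-byte entries whose first 32 bytes are the key hash; it returns the entry's position and whether the hash at that position matches. A sketch of a lookup over the first count entries; the wrapper name is illustrative:

	func findEntry(index []byte, count int64, hash [32]byte) (int64, bool) {
		// Entries are sorted by their leading 32-byte hash, so a binary
		// search over [0, count) finds where hash is or would be inserted.
		return searchIndex(index, 0, count, hash)
	}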
+ +package block + +import ( + "bytes" + "io" + "sort" +) + +func closeIfError(err *error, closer io.Closer) { + if *err != nil { + _ = closer.Close() + } +} + +func searchIndex(index []byte, start, end int64, hash [32]byte) (int64, bool) { + i := start + int64(sort.Search(int(end-start), func(i int) bool { + offset := (start + int64(i)) * indexFileEntrySize + return bytes.Compare(hash[:], index[offset:offset+32]) <= 0 + })) + offset := i * indexFileEntrySize + if i >= end { + return i, false + } + return i, [32]byte(index[offset:offset+32]) == hash +} diff --git a/pkg/database/keyvalue/block/vmap.go b/pkg/database/keyvalue/block/vmap.go index b487286af..4047aba50 100644 --- a/pkg/database/keyvalue/block/vmap.go +++ b/pkg/database/keyvalue/block/vmap.go @@ -15,6 +15,87 @@ type vmap[K comparable, V any] struct { mu sync.Mutex stack []map[K]V refs []int + fn struct { + forEach func(func(K, V) error) error + commit func(map[K]V) error + } +} + +func (v *vmap[K, V]) View() *vmapView[K, V] { + v.mu.Lock() + l := len(v.stack) - 1 + if l < 0 { + l = 0 + v.stack = append(v.stack, map[K]V{}) + v.refs = append(v.refs, 0) + } + v.refs[l]++ + v.mu.Unlock() + + u := new(vmapView[K, V]) + u.vm = v + u.level = l + u.mine = map[K]V{} + return u +} + +func (v *vmap[K, V]) commit() error { + if v.fn.commit == nil { + return nil + } + + v.mu.Lock() + defer v.mu.Unlock() + + // TODO: I think it's ok to commit to disk as long as the ref count on the + // base layer is zero, regardless of whether there are higher layers + if len(v.stack) != 1 || v.refs[0] > 0 { + return nil + } + + values := v.stack[0] + v.stack[0] = nil + v.stack = v.stack[:0] + v.refs = v.refs[:0] + return v.fn.commit(values) +} + +func (v *vmap[K, V]) get(level int, k K) (V, bool) { + for i := level; i >= 0; i-- { + if v, ok := v.stack[i][k]; ok { + return v, true + } + } + + var z V + return z, false +} + +func (v *vmap[K, V]) forEach(level int, seen map[K]bool, fn func(K, V) error) error { + for i := level; i >= 0; i-- { + for k, v := range v.stack[i] { + if seen[k] { + continue + } + seen[k] = true + err := fn(k, v) + if err != nil { + return err + } + } + } + + if v.fn.forEach != nil { + return v.fn.forEach(func(k K, v V) error { + if seen[k] { + return nil + } + seen[k] = true + return fn(k, v) + }) + } + + return nil } func (v *vmap[K, V]) release(level int, values map[K]V) { @@ -64,24 +145,6 @@ func (v *vmap[K, V]) release(level int, values map[K]V) { v.refs = v.refs[:i] } -func (v *vmap[K, V]) View() *vmapView[K, V] { - v.mu.Lock() - l := len(v.stack) - 1 - if l < 0 { - l = 0 - v.stack = append(v.stack, map[K]V{}) - v.refs = append(v.refs, 0) - } - v.refs[l]++ - v.mu.Unlock() - - u := new(vmapView[K, V]) - u.vm = v - u.level = l - u.mine = map[K]V{} - return u -} - // vmapView is a view into a specific version of a vmap. 
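// Reads check the view's private map first and then fall back through the
// vmap's stack of committed layers, so a view observes a stable snapshot as
// of its level plus its own uncommitted writes.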
type vmapView[K comparable, V any] struct { vm *vmap[K, V] @@ -94,13 +157,7 @@ func (v *vmapView[K, V]) Get(k K) (V, bool) { if v, ok := v.mine[k]; ok { return v, true } - for i := v.level; i >= 0; i-- { - if v, ok := v.vm.stack[i][k]; ok { - return v, true - } - } - var z V - return z, false + return v.vm.get(v.level, k) } func (v *vmapView[K, V]) ForEach(fn func(K, V) error) error { @@ -113,19 +170,7 @@ func (v *vmapView[K, V]) ForEach(fn func(K, V) error) error { } } - for i := v.level; i >= 0; i-- { - for k, v := range v.vm.stack[i] { - if seen[k] { - continue - } - seen[k] = true - err := fn(k, v) - if err != nil { - return err - } - } - } - return nil + return v.vm.forEach(v.level, seen, fn) } func (v *vmapView[K, V]) Put(k K, u V) { @@ -138,5 +183,5 @@ func (v *vmapView[K, V]) Discard() { func (v *vmapView[K, V]) Commit() error { v.done.Do(func() { v.vm.release(v.level, v.mine) }) - return nil + return v.vm.commit() } diff --git a/pkg/database/keyvalue/kvtest/test.go b/pkg/database/keyvalue/kvtest/test.go index b1e4dc519..b778904ff 100644 --- a/pkg/database/keyvalue/kvtest/test.go +++ b/pkg/database/keyvalue/kvtest/test.go @@ -39,6 +39,7 @@ func (c *closableDb) Close() { } func openDb(t testing.TB, open Opener) *closableDb { + t.Helper() db, err := open() require.NoError(t, err) c := &closableDb{db, t, false} @@ -47,65 +48,79 @@ func openDb(t testing.TB, open Opener) *closableDb { } func TestDatabase(t *testing.T, open Opener) { - const N = 10000 + const N, M = 100, 100 // Open and write changes db := openDb(t, open) - batch := db.Begin(nil, true) - defer batch.Discard() - // Read when nothing exists - _, err := batch.Get(record.NewKey("answer", 0)) - require.Error(t, err) - require.ErrorAs(t, err, new(*database.NotFoundError)) + doBatch(t, db, func(batch keyvalue.ChangeSet) { + _, err := batch.Get(record.NewKey("answer", 0)) + require.Error(t, err) + require.ErrorAs(t, err, new(*database.NotFoundError)) + }) // Write values := map[record.KeyHash]string{} - for i := 0; i < N; i++ { - key := record.NewKey("answer", i) - value := fmt.Sprintf("%x this much data ", i) - values[key.Hash()] = value - err := batch.Put(key, []byte(value)) - require.NoError(t, err, "Put") + for i := range N { + if i > 0 { + doBatch(t, db, func(batch keyvalue.ChangeSet) { + _, err := batch.Get(record.NewKey("answer", i-1, 0)) + require.NoError(t, err) + }) + } + doBatch(t, db, func(batch keyvalue.ChangeSet) { + for j := range M { + key := record.NewKey("answer", i, j) + value := fmt.Sprintf("%x/%x this much data ", i, j) + values[key.Hash()] = value + err := batch.Put(key, []byte(value)) + require.NoError(t, err, "Put") + } + }) + if i > 0 { + doBatch(t, db, func(batch keyvalue.ChangeSet) { + _, err := batch.Get(record.NewKey("answer", i-1, 0)) + require.NoError(t, err) + }) + } } - // Commit - require.NoError(t, batch.Commit()) - // Verify with a new batch - batch = db.Begin(nil, false) - defer batch.Discard() - - for i := 0; i < N; i++ { - val, err := batch.Get(record.NewKey("answer", i)) - require.NoError(t, err, "Get") - require.Equal(t, fmt.Sprintf("%x this much data ", i), string(val)) - } - - batch.Discard() + doBatch(t, db, func(batch keyvalue.ChangeSet) { + for i := range N { + for j := range M { + val, err := batch.Get(record.NewKey("answer", i, j)) + require.NoError(t, err, "Get") + require.Equal(t, fmt.Sprintf("%x/%x this much data ", i, j), string(val)) + } + } + }) // Verify with a fresh instance db.Close() db = openDb(t, open) - batch = db.Begin(nil, false) - defer batch.Discard() - - for i := 
0; i < N; i++ { - val, err := batch.Get(record.NewKey("answer", i)) - require.NoError(t, err, "Get") - require.Equal(t, fmt.Sprintf("%x this much data ", i), string(val)) - } + doBatch(t, db, func(batch keyvalue.ChangeSet) { + for i := range N { + for j := range M { + val, err := batch.Get(record.NewKey("answer", i, j)) + require.NoError(t, err, "Get") + require.Equal(t, fmt.Sprintf("%x/%x this much data ", i, j), string(val)) + } + } + }) // Verify ForEach - require.NoError(t, batch.ForEach(func(key *record.Key, value []byte) error { - expect, ok := values[key.Hash()] - require.Truef(t, ok, "%v should exist", key) - require.Equalf(t, expect, string(value), "%v should match", key) - delete(values, key.Hash()) - return nil - })) + doBatch(t, db, func(batch keyvalue.ChangeSet) { + require.NoError(t, batch.ForEach(func(key *record.Key, value []byte) error { + expect, ok := values[key.Hash()] + require.Truef(t, ok, "%v should exist", key) + require.Equalf(t, expect, string(value), "%v should match", key) + delete(values, key.Hash()) + return nil + })) + }) require.Empty(t, values, "All values should be iterated over") } @@ -226,3 +241,11 @@ func TestDelete(t *testing.T, open Opener) { _, err = batch.Get(record.NewKey("foo")) require.ErrorIs(t, err, errors.NotFound) } + +func doBatch(t testing.TB, db keyvalue.Beginner, fn func(batch keyvalue.ChangeSet)) { + t.Helper() + batch := db.Begin(nil, true) + defer batch.Discard() + fn(batch) + require.NoError(t, batch.Commit()) +} diff --git a/pkg/database/keyvalue/leveldb/database.go b/pkg/database/keyvalue/leveldb/database.go index ac1b1785e..ae3a6736c 100644 --- a/pkg/database/keyvalue/leveldb/database.go +++ b/pkg/database/keyvalue/leveldb/database.go @@ -29,7 +29,7 @@ type Database struct { type opts struct { } -type Option func(*opts) error +type Option func(*opts) func Open(filepath string, o ...Option) (*Database, error) { // Make sure all directories exist @@ -43,17 +43,18 @@ func Open(filepath string, o ...Option) (*Database, error) { return nil, errors.UnknownError.WithFormat("open %q: %w", filepath, err) } + return New(db, o...), nil +} + +func New(db *leveldb.DB, o ...Option) *Database { d := new(Database) d.leveldb = db d.open = new(sync.WaitGroup) for _, o := range o { - err = o(&d.opts) - if err != nil { - return nil, errors.UnknownError.Wrap(err) - } + o(&d.opts) } - return d, nil + return d } func (d *Database) key(key *record.Key) []byte { diff --git a/pkg/types/record/key.go b/pkg/types/record/key.go index cbc1465db..fe58c3202 100644 --- a/pkg/types/record/key.go +++ b/pkg/types/record/key.go @@ -218,12 +218,57 @@ func (k *Key) UnmarshalBinary(b []byte) error { return k.UnmarshalBinaryFrom(bytes.NewBuffer(b)) } +func (k *Key) MarshalBinaryV2(enc *binary2.Encoder) error { + err := enc.StartObject() + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + + enc.WithOptions(binary2.IgnoreFieldOrder()) + + // Write the key length (not prefixed with a field number) + err = enc.NoField() + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + err = enc.EncodeUint(uint64(len(k.values))) + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + + // Write each field using the encoding writer, but prefix values with their + // type code instead of with a field number. This is an abuse but 🤷 it + // works. 
+ for _, v := range k.values { + p, err := asKeyPart(v) + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + err = enc.Field(uint(p.Type())) + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + err = p.WriteBinary2(enc) + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + } + + err = enc.EndObject() + if err != nil { + return errors.UnknownError.WithFormat("encode Key: %w", err) + } + return nil +} + func (k *Key) UnmarshalBinaryV2(dec *binary2.Decoder) error { err := dec.StartObject() if err != nil { - return err + return errors.UnknownError.WithFormat("decode Key: %w", err) } + dec.WithOptions(binary2.IgnoreFieldOrder()) + // Read the key length (not prefixed with a field number) err = dec.NoField() if err != nil { @@ -241,14 +286,9 @@ func (k *Key) UnmarshalBinaryV2(dec *binary2.Decoder) error { // reader expects values to be prefixed with field numbers, and has certain // requirements for those field numbers, so this approach requires a certain // amount of hackiness. This is an abuse but 🤷 it works. - for i := range k.values { // Read the type code - err = dec.NoField() - if err != nil { - return errors.UnknownError.WithFormat("decode Key: %w", err) - } - v, err := dec.DecodeUint() + v, err := dec.Field() if err != nil { return errors.UnknownError.WithFormat("decode Key: %w", err) } @@ -260,10 +300,6 @@ func (k *Key) UnmarshalBinaryV2(dec *binary2.Decoder) error { } // Read the value using the encoding reader - err = dec.NoField() - if err != nil { - return errors.UnknownError.WithFormat("decode Key: %w", err) - } err = p.ReadBinary2(dec) if err != nil { return errors.UnknownError.WithFormat("decode Key: %w", err) diff --git a/pkg/types/record/key_part.go b/pkg/types/record/key_part.go index 1960d8bed..c438d25cc 100644 --- a/pkg/types/record/key_part.go +++ b/pkg/types/record/key_part.go @@ -34,7 +34,7 @@ type keyPart interface { // treating the type code as if it was a field number. This is a violation // of the field-number tagged encoding scheme, but the writer doesn't pay // any attention to the field numbers so this still works. - WriteBinary(w *enc.Writer) + WriteBinary(*enc.Writer) // ReadBinary reads the key part from the binary reader. // @@ -45,12 +45,14 @@ type keyPart interface { // number logic. [Key.UnmarshalBinaryFrom] handles reading the type codes, // so here we tell the binary reader we want to read field zero, which // instructs it to skip the field number logic and read the value directly. - ReadBinary(r *enc.Reader) + ReadBinary(*enc.Reader) - ReadBinary2(r *binary2.Decoder) error + WriteBinary2(*binary2.Encoder) error + ReadBinary2(*binary2.Decoder) error } -type dec = binary2.Decoder +type enc2 = binary2.Encoder +type dec2 = binary2.Decoder // These methods are here to keep them close to the explanation above. 
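Together, MarshalBinaryV2 and UnmarshalBinaryV2 (above) make a Key self-describing: the part count comes first with no field tag, then each part is tagged with its key-part type code in place of a field number. A round-trip sketch; the NewEncoder/NewDecoder constructors are assumptions here, since this change only uses pooled codecs via Reset:

	func roundTripKey(k *Key) (*Key, error) {
		// Encode the key into a buffer.
		buf := new(bytes.Buffer)
		err := k.MarshalBinaryV2(binary2.NewEncoder(buf)) // assumed constructor
		if err != nil {
			return nil, err
		}

		// Decode it back; the type-code "field numbers" are tolerated
		// because field-order checking is disabled on both sides.
		u := new(Key)
		err = u.UnmarshalBinaryV2(binary2.NewDecoder(buf)) // assumed constructor
		return u, err
	}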
@@ -72,23 +74,32 @@ func (k *urlKeyPart) ReadBinary(r *enc.Reader) { ReadBinary(&k.URL, r.ReadUrl func (k *txidKeyPart) ReadBinary(r *enc.Reader) { ReadBinary(&k.TxID, r.ReadTxid) } func (k *timeKeyPart) ReadBinary(r *enc.Reader) { ReadBinary(&k.Time, r.ReadTime) } -func (k *intKeyPart) ReadBinary2(r *dec) error { return rdBin2((*int64)(k), r.DecodeInt) } -func (k *uintKeyPart) ReadBinary2(r *dec) error { return rdBin2((*uint64)(k), r.DecodeUint) } -func (k *stringKeyPart) ReadBinary2(r *dec) error { return rdBin2((*string)(k), r.DecodeString) } -func (k *hashKeyPart) ReadBinary2(r *dec) error { return rdBin2((*[32]byte)(k), r.DecodeHash) } -func (k *bytesKeyPart) ReadBinary2(r *dec) error { return rdBin2((*[]byte)(k), r.DecodeBytes) } - -func (k *urlKeyPart) ReadBinary2(r *dec) error { +func (k intKeyPart) WriteBinary2(w *enc2) error { return w.EncodeInt(int64(k)) } +func (k uintKeyPart) WriteBinary2(w *enc2) error { return w.EncodeUint(uint64(k)) } +func (k stringKeyPart) WriteBinary2(w *enc2) error { return w.EncodeString(string(k)) } +func (k hashKeyPart) WriteBinary2(w *enc2) error { return w.EncodeHash(k) } +func (k bytesKeyPart) WriteBinary2(w *enc2) error { return w.EncodeBytes(k) } +func (k urlKeyPart) WriteBinary2(w *enc2) error { return w.EncodeString(k.URL.String()) } +func (k txidKeyPart) WriteBinary2(w *enc2) error { return w.EncodeString(k.TxID.String()) } +func (k timeKeyPart) WriteBinary2(w *enc2) error { return w.EncodeInt(k.Time.UTC().Unix()) } + +func (k *intKeyPart) ReadBinary2(r *dec2) error { return rdBin2((*int64)(k), r.DecodeInt) } +func (k *uintKeyPart) ReadBinary2(r *dec2) error { return rdBin2((*uint64)(k), r.DecodeUint) } +func (k *stringKeyPart) ReadBinary2(r *dec2) error { return rdBin2((*string)(k), r.DecodeString) } +func (k *hashKeyPart) ReadBinary2(r *dec2) error { return rdBin2((*[32]byte)(k), r.DecodeHash) } +func (k *bytesKeyPart) ReadBinary2(r *dec2) error { return rdBin2((*[]byte)(k), r.DecodeBytes) } + +func (k *urlKeyPart) ReadBinary2(r *dec2) error { k.URL = new(url.URL) return r.DecodeValueV2(k.URL) } -func (k *txidKeyPart) ReadBinary2(r *dec) error { +func (k *txidKeyPart) ReadBinary2(r *dec2) error { k.TxID = new(url.TxID) return r.DecodeValueV2(k.TxID) } -func (k *timeKeyPart) ReadBinary2(r *dec) error { +func (k *timeKeyPart) ReadBinary2(r *dec2) error { v, err := r.DecodeInt() k.Time = time.Unix(v, 0).UTC() return err diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go index 62d5fae7b..97ab67756 100644 --- a/tools/cmd/debug/heal_common.go +++ b/tools/cmd/debug/heal_common.go @@ -127,7 +127,7 @@ func (h *healer) heal(args []string) { node, err := p2p.New(p2p.Options{ Network: ni.Network, - BootstrapPeers: accumulate.BootstrapServers, + BootstrapPeers: bootstrap, PeerDatabase: peerDb, EnablePeerTracker: true, diff --git a/tools/cmd/debug/main.go b/tools/cmd/debug/main.go index 08bb03886..aca55f3f7 100644 --- a/tools/cmd/debug/main.go +++ b/tools/cmd/debug/main.go @@ -15,6 +15,8 @@ import ( "time" "github.com/spf13/cobra" + . 
"gitlab.com/accumulatenetwork/accumulate/internal/util/cmd" + "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" "gitlab.com/accumulatenetwork/accumulate/pkg/errors" ) @@ -31,6 +33,7 @@ var ( only string pprof string healSinceDuration time.Duration + bootstrap = accumulate.BootstrapServers ) var cmd = &cobra.Command{ @@ -38,6 +41,10 @@ var cmd = &cobra.Command{ Short: "Accumulate debug utilities", } +func init() { + cmd.PersistentFlags().Var((*MultiaddrSliceFlag)(&bootstrap), "bootstrap", "Set the bootstrap servers") +} + var currentUser = func() *user.User { u, err := user.Current() if err != nil { diff --git a/tools/cmd/debug/peer_db.go b/tools/cmd/debug/peer_db.go index 99f69ea93..90e5b115b 100644 --- a/tools/cmd/debug/peer_db.go +++ b/tools/cmd/debug/peer_db.go @@ -16,7 +16,6 @@ import ( "time" "github.com/spf13/cobra" - "gitlab.com/accumulatenetwork/accumulate/pkg/accumulate" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p/dial" @@ -61,7 +60,7 @@ func scanPeers(_ *cobra.Command, args []string) { node, err := p2p.New(p2p.Options{ Key: sk, Network: args[1], - BootstrapPeers: accumulate.BootstrapServers, + BootstrapPeers: bootstrap, PeerDatabase: args[0], PeerScanFrequency: -1, }) diff --git a/tools/cmd/debug/sequence.go b/tools/cmd/debug/sequence.go index f2cd02123..4940675e8 100644 --- a/tools/cmd/debug/sequence.go +++ b/tools/cmd/debug/sequence.go @@ -63,7 +63,7 @@ func sequence(cmd *cobra.Command, args []string) { node, err := p2p.New(p2p.Options{ Network: ni.Network, - BootstrapPeers: accumulate.BootstrapServers, + BootstrapPeers: bootstrap, }) check(err) defer func() { _ = node.Close() }()