Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Datastore interfaces replaced with corekv #3298

Draft
wants to merge 11 commits into
base: develop
Choose a base branch
from
6 changes: 4 additions & 2 deletions cli/server_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

"github.com/spf13/cobra"

badgerkv "github.com/sourcenetwork/corekv/badger"

"github.com/sourcenetwork/defradb/config"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/logging"
Expand Down Expand Up @@ -50,10 +51,11 @@ func MakeServerDumpCmd(cfg *config.Config) *cobra.Command {
))
}
log.FeedbackInfo(cmd.Context(), "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path))
rootstore, err = badgerds.NewDatastore(cfg.Datastore.Badger.Path, cfg.Datastore.Badger.Options)
store, err := badgerkv.NewDatastore(cfg.Datastore.Badger.Path, *cfg.Datastore.Badger.Options)
if err != nil {
return errors.Wrap("could not open badger datastore", err)
}
rootstore = store.(ds.RootStore)
} else {
return errors.New("server-side dump is only supported for the Badger datastore")
}
Expand Down
19 changes: 11 additions & 8 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"strings"
"syscall"

badger "github.com/sourcenetwork/badger/v4"
badger "github.com/dgraph-io/badger/v4"
"github.com/sourcenetwork/corekv"
badgerkv "github.com/sourcenetwork/corekv/badger"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/config"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
httpapi "github.com/sourcenetwork/defradb/http"
Expand Down Expand Up @@ -187,25 +188,27 @@ func (di *defraInstance) close(ctx context.Context) {
func start(ctx context.Context, cfg *config.Config) (*defraInstance, error) {
log.FeedbackInfo(ctx, "Starting DefraDB service...")

var rootstore ds.RootStore
var store corekv.Store

var err error
if cfg.Datastore.Store == badgerDatastoreName {
log.FeedbackInfo(ctx, "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path))
rootstore, err = badgerds.NewDatastore(
store, err = badgerkv.NewDatastore(
cfg.Datastore.Badger.Path,
cfg.Datastore.Badger.Options,
*cfg.Datastore.Badger.Options,
)
} else if cfg.Datastore.Store == "memory" {
log.FeedbackInfo(ctx, "Building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
opts := badger.DefaultOptions("").WithInMemory(true)
store, err = badgerkv.NewDatastore("", opts)
}

if err != nil {
return nil, errors.Wrap("failed to open datastore", err)
}

rootstore := store.(datastore.RootStore)

options := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
Expand Down
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type DB interface {
// Peerstore returns the peerstore where known host information is stored.
//
// It sits within the rootstore returned by [Root].
Peerstore() datastore.DSBatching
Peerstore() datastore.DSReaderWriter

// Close closes the database instance and releases any resources held.
//
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ import (
"github.com/spf13/viper"
"golang.org/x/net/idna"

badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
badgerds "github.com/dgraph-io/badger/v4"

"github.com/sourcenetwork/defradb/logging"
)

Expand Down Expand Up @@ -262,7 +263,7 @@ type MemoryConfig struct {

func defaultDatastoreConfig() *DatastoreConfig {
// create a copy of the default badger options
opts := badgerds.DefaultOptions
opts := badgerds.DefaultOptions("data")
return &DatastoreConfig{
Store: "badger",
Badger: BadgerConfig{
Expand Down
4 changes: 2 additions & 2 deletions core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func (base baseCRDT) setPriority(
return ErrEncodingPriority
}

return base.store.Put(ctx, prioK.ToDS(), buf[0:n])
return base.store.Set(ctx, prioK.ToDS().Bytes(), buf[0:n])
}

// get the current priority for given key
func (base baseCRDT) getPriority(ctx context.Context, key core.DataStoreKey) (uint64, error) {
pKey := key.WithPriorityFlag()
pbuf, err := base.store.Get(ctx, pKey.ToDS())
pbuf, err := base.store.Get(ctx, pKey.ToDS().Bytes())
if err != nil {
if errors.Is(err, ds.ErrNotFound) {
return 0, nil
Expand Down
5 changes: 2 additions & 3 deletions core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ import (
"context"
"testing"

ds "github.com/ipfs/go-datastore"

"github.com/sourcenetwork/corekv/memory"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
)

func newDS() datastore.DSReaderWriter {
return datastore.AsDSReaderWriter(ds.NewMapDatastore())
return memory.NewDatastore(context.TODO())
}

func newSeededDS() datastore.DSReaderWriter {
Expand Down
26 changes: 10 additions & 16 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (

dag "github.com/ipfs/boxo/ipld/merkledag"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
ipld "github.com/ipfs/go-ipld-format"
"github.com/ugorji/go/codec"

"github.com/sourcenetwork/corekv"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
dagDelta, isDagDelta := delta.(*CompositeDAGDelta)

if isDagDelta && dagDelta.Status.IsDeleted() {
err := c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.DeletedObjectMarker})
err := c.store.Set(ctx, c.key.ToPrimaryDataStoreKey().ToDS().Bytes(), []byte{base.DeletedObjectMarker})
if err != nil {
return err
}
Expand All @@ -141,7 +141,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
// reflected in `dagDelta.Status` if sourced via P2P. Updates synced via P2P should not undelete
// the local reperesentation of the document.
versionKey := c.key.WithValueFlag().WithFieldId(core.DATASTORE_DOC_VERSION_FIELD_ID)
objectMarker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
objectMarker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS().Bytes())
hasObjectMarker := !errors.Is(err, ds.ErrNotFound)
if err != nil && hasObjectMarker {
return err
Expand All @@ -161,41 +161,35 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
schemaVersionId = c.schemaVersionKey.SchemaVersionId
}

err = c.store.Put(ctx, versionKey.ToDS(), []byte(schemaVersionId))
err = c.store.Set(ctx, versionKey.ToDS().Bytes(), []byte(schemaVersionId))
if err != nil {
return err
}

if !hasObjectMarker {
// ensure object marker exists
return c.store.Put(ctx, c.key.ToPrimaryDataStoreKey().ToDS(), []byte{base.ObjectMarker})
return c.store.Set(ctx, c.key.ToPrimaryDataStoreKey().ToDS().Bytes(), []byte{base.ObjectMarker})
}

return nil
}

func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
q := query.Query{
Prefix: key.ToString(),
}
res, err := c.store.Query(ctx, q)
for e := range res.Next() {
if e.Error != nil {
return err
}
dsKey, err := core.NewDataStoreKey(e.Key)
iter := c.store.Iterator(ctx, corekv.IterOptions{Prefix: key.Bytes()})
for ; iter.Valid(); iter.Next() {
dsKey, err := core.NewDataStoreKey(string(iter.Key()))
if err != nil {
return err
}

if dsKey.InstanceType == core.ValueKey {
err = c.store.Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value)
err = c.store.Set(ctx, dsKey.WithDeletedFlag().ToDS().Bytes(), iter.Value())
if err != nil {
return err
}
}

err = c.store.Delete(ctx, dsKey.ToDS())
err = c.store.Delete(ctx, dsKey.ToDS().Bytes())
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewLWWRegister(
// RETURN STATE
func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
valueK := reg.key.WithValueFlag()
buf, err := reg.store.Get(ctx, valueK.ToDS())
buf, err := reg.store.Get(ctx, valueK.ToDS().Bytes())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
// else if the current value is lexicographically
// greater than the new then ignore
key := reg.key.WithValueFlag()
marker, err := reg.store.Get(ctx, reg.key.ToPrimaryDataStoreKey().ToDS())
marker, err := reg.store.Get(ctx, reg.key.ToPrimaryDataStoreKey().ToDS().Bytes())
if err != nil && !errors.Is(err, ds.ErrNotFound) {
return err
}
Expand All @@ -144,7 +144,7 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
if priority < curPrio {
return nil
} else if priority == curPrio {
curValue, err := reg.store.Get(ctx, key.ToDS())
curValue, err := reg.store.Get(ctx, key.ToDS().Bytes())
if err != nil {
return err
}
Expand All @@ -154,7 +154,7 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
}
}

err = reg.store.Put(ctx, key.ToDS(), val)
err = reg.store.Set(ctx, key.ToDS().Bytes(), val)
if err != nil {
return NewErrFailedToStoreValue(err)
}
Expand Down
8 changes: 1 addition & 7 deletions core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ import (

dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
mh "github.com/multiformats/go-multihash"
"github.com/ugorji/go/codec"

"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
)

func newMockStore() datastore.DSReaderWriter {
return datastore.AsDSReaderWriter(ds.NewMapDatastore())
}

func setupLWWRegister() LWWRegister {
store := newMockStore()
store := newDS()
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
}
Expand Down
16 changes: 0 additions & 16 deletions datastore/badger/v4/compat_logger.go

This file was deleted.

Loading
Loading