From 220d9d54d0476fac94ef8608547a1af0c910967c Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 11:37:38 -0700 Subject: [PATCH 01/13] Add pruning and WAL capability for SS store --- common/utils/path.go | 4 +- config/config.go | 5 ++- ss/pebbledb/db.go | 70 +++++++++++++++++++++++++++++++++-- ss/store.go | 45 +++++++++++++++------- ss/types/store.go | 3 ++ stream/changelog/changelog.go | 28 ++++++++++++++ 6 files changed, 135 insertions(+), 20 deletions(-) diff --git a/common/utils/path.go b/common/utils/path.go index 8d6a8b6..aad7207 100644 --- a/common/utils/path.go +++ b/common/utils/path.go @@ -10,6 +10,6 @@ func GetStateStorePath(homePath string, backend string) string { return filepath.Join(homePath, "data", backend) } -func GetChangelogPath(commitStorePath string) string { - return filepath.Join(commitStorePath, "changelog") +func GetChangelogPath(dbPath string) string { + return filepath.Join(dbPath, "changelog") } diff --git a/config/config.go b/config/config.go index fd4a7ac..c7c48bc 100644 --- a/config/config.go +++ b/config/config.go @@ -56,7 +56,10 @@ type StateStoreConfig struct { // DBDirectory defines the directory to store the state store db files // If not explicitly set, default to application home directory // default to empty - DBDirectory string `mapstructure:"db-directory"` + DBDirectory string `mapstructure:"db-dir"` + + // DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC + DedicatedChangelog bool `mapstructure:"dedicated-changelog"` // Backend defines the backend database used for state-store // Supported backends: pebbledb, rocksdb diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index d802af4..5b9d6c3 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -5,6 +5,9 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/sei-protocol/sei-db/common/logger" + "github.com/sei-protocol/sei-db/common/utils" + "github.com/sei-protocol/sei-db/stream/changelog" "math" "strings" "sync" @@ -48,6 +51,17 @@ type Database struct { // Map of module to when each was last updated // Used in pruning to skip over stores that have not been updated recently storeKeyDirty sync.Map + + // Changelog used to support async write + streamHandler *changelog.Stream + + // Pending changes to be written to the DB + pendingChanges chan VersionedChangesets +} + +type VersionedChangesets struct { + Version int64 + Changesets []*proto.NamedChangeSet } func New(dataDir string, config config.StateStoreConfig) (*Database, error) { @@ -93,12 +107,22 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { if err != nil { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) } - - return &Database{ + database := &Database{ storage: db, config: config, earliestVersion: earliestVersion, - }, nil + pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), + } + if config.DedicatedChangelog { + streamHandler, _ := changelog.NewStream( + logger.NewNopLogger(), + utils.GetChangelogPath(dataDir), + changelog.Config{DisableFsync: true, ZeroCopy: true}, + ) + database.streamHandler = streamHandler + go database.writeAsync() + } + return database, nil } func NewWithDB(storage *pebble.DB) *Database { @@ -110,6 +134,11 @@ func NewWithDB(storage *pebble.DB) *Database { func (db *Database) Close() error { err := db.storage.Close() db.storage = nil + if db.streamHandler != nil { + db.streamHandler.Close() + db.streamHandler = nil + close(db.pendingChanges) + } return err } @@ -252,6 +281,41 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro return b.Write() } +func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + // Write to WAL first + if db.streamHandler != nil { + entry := proto.ChangelogEntry{ + Version: version, + } + entry.Changesets = changesets + err := db.streamHandler.WriteNextEntry(entry) + if err != nil { + return err + } + } + // Then write to pending changes + db.pendingChanges <- VersionedChangesets{ + Version: version, + Changesets: changesets, + } + return nil +} + +func (db *Database) writeAsync() { + for db.streamHandler != nil { + for nextChange := range db.pendingChanges { + version := nextChange.Version + for _, cs := range nextChange.Changesets { + err := db.ApplyChangeset(version, cs) + if err != nil { + panic(panic) + } + } + db.SetLatestVersion(version) + } + } +} + // Prune attempts to prune all versions up to and including the current version // Get the range of keys, manually iterate over them and delete them // We add a heuristic to skip over a module's keys during pruning if it hasn't been updated diff --git a/ss/store.go b/ss/store.go index e30fc23..d6f0fd6 100644 --- a/ss/store.go +++ b/ss/store.go @@ -2,6 +2,7 @@ package ss import ( "fmt" + "github.com/sei-protocol/sei-db/ss/pruning" "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/common/utils" @@ -34,20 +35,31 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) { } // NewStateStore Create a new state store with the specified backend type -func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) { +func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) { initializer, ok := backends[BackendType(ssConfig.Backend)] if !ok { return nil, fmt.Errorf("unsupported backend: %s", ssConfig.Backend) } - db, err := initializer(homeDir, ssConfig) + stateStore, err := initializer(homeDir, ssConfig) if err != nil { return nil, err } - return db, nil + // Handle auto recovery for DB running with async mode + if ssConfig.DedicatedChangelog { + changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend)) + err := RecoverStateStore(logger, changelogPath, stateStore) + if err != nil { + return nil, err + } + } + // Start the pruning manager for DB + pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds)) + pruningManager.Start() + return stateStore, nil } // RecoverStateStore will be called during initialization to recover the state from rlog -func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.StateStore) error { +func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error { ssLatestVersion, err := stateStore.GetLatestVersion() if err != nil { return err @@ -55,11 +67,7 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S if ssLatestVersion <= 0 { return nil } - streamHandler, err := changelog.NewStream( - logger, - utils.GetChangelogPath(utils.GetChangelogPath(utils.GetCommitStorePath(homePath))), - changelog.Config{}, - ) + streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{}) if err != nil { return err } @@ -71,16 +79,25 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S if lastOffset <= 0 || errLast != nil { return err } - firstEntry, errRead := streamHandler.ReadAt(firstOffset) + lastEntry, errRead := streamHandler.ReadAt(lastOffset) if errRead != nil { return err } - firstVersion := firstEntry.Version - delta := uint64(firstVersion) - firstOffset - targetStartOffset := uint64(ssLatestVersion) - delta + // Look backward to find where we should start replay from + curVersion := lastEntry.Version + curOffset := lastOffset + for curVersion > ssLatestVersion && curOffset >= firstOffset { + curOffset-- + curEntry, errRead := streamHandler.ReadAt(curOffset) + if errRead != nil { + return err + } + curVersion = curEntry.Version + } + targetStartOffset := curOffset logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset)) if targetStartOffset < lastOffset { - return streamHandler.Replay(targetStartOffset+1, lastOffset, func(index uint64, entry proto.ChangelogEntry) error { + return streamHandler.Replay(targetStartOffset, lastOffset, func(index uint64, entry proto.ChangelogEntry) error { // commit to state store for _, cs := range entry.Changesets { if err := stateStore.ApplyChangeset(entry.Version, cs); err != nil { diff --git a/ss/types/store.go b/ss/types/store.go index a5d1099..171fcfb 100644 --- a/ss/types/store.go +++ b/ss/types/store.go @@ -24,6 +24,9 @@ type StateStore interface { // the version should be latest version plus one. ApplyChangeset(version int64, cs *proto.NamedChangeSet) error + // ApplyChangesetAsync Write changesets into WAL file first and apply later for async writes + ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error + // Import the initial state of the store Import(version int64, ch <-chan SnapshotNode) error diff --git a/stream/changelog/changelog.go b/stream/changelog/changelog.go index f236ca8..8eebc73 100644 --- a/stream/changelog/changelog.go +++ b/stream/changelog/changelog.go @@ -21,6 +21,7 @@ type Stream struct { logger logger.Logger writeChannel chan *Message errSignal chan error + nextOffset uint64 } type Message struct { @@ -32,6 +33,7 @@ type Config struct { DisableFsync bool ZeroCopy bool WriteBufferSize int + KeepLast int } // NewStream creates a new changelog stream that persist the changesets in the log @@ -42,6 +44,15 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error) }) if err != nil { return nil, err + } + firstEntry, err := log.FirstIndex() + if err != nil { + return nil, err + } + if firstEntry <= 0 { + } + if config.KeepLast > 0 { + } return &Stream{ log: log, @@ -75,6 +86,18 @@ func (stream *Stream) Write(offset uint64, entry proto.ChangelogEntry) error { return nil } +// WriteNextEntry will write a new entry to the last index of the log. +// Whether the writes is in blocking or async manner depends on the buffer size. +func (stream *Stream) WriteNextEntry(entry proto.ChangelogEntry) error { + nextOffset := stream.nextOffset + err := stream.Write(nextOffset, entry) + if err != nil { + return err + } + stream.nextOffset++ + return nil +} + // startWriteGoroutine will start a goroutine to write entries to the log. // This should only be called on initialization if async write is enabled func (stream *Stream) startWriteGoroutine() { @@ -172,6 +195,11 @@ func (stream *Stream) Replay(start uint64, end uint64, processFn func(index uint return nil } +// +func (stream *Stream) Pruning() { + +} + func (stream *Stream) Close() error { if stream.writeChannel == nil { return nil From 7eedc76a550328810e38b330ef7018afff951fbb Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 12:35:48 -0700 Subject: [PATCH 02/13] Add WAL pruning logic --- stream/changelog/changelog.go | 43 +++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/stream/changelog/changelog.go b/stream/changelog/changelog.go index 8eebc73..83541cd 100644 --- a/stream/changelog/changelog.go +++ b/stream/changelog/changelog.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "time" errorutils "github.com/sei-protocol/sei-db/common/errors" "github.com/sei-protocol/sei-db/common/logger" @@ -16,12 +17,14 @@ import ( var _ types.Stream[proto.ChangelogEntry] = (*Stream)(nil) type Stream struct { + dir string log *wal.Log config Config logger logger.Logger writeChannel chan *Message errSignal chan error nextOffset uint64 + isClosed bool } type Message struct { @@ -33,7 +36,8 @@ type Config struct { DisableFsync bool ZeroCopy bool WriteBufferSize int - KeepLast int + KeepRecent uint64 + PruneInterval time.Duration } // NewStream creates a new changelog stream that persist the changesets in the log @@ -45,20 +49,22 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error) if err != nil { return nil, err } - firstEntry, err := log.FirstIndex() + stream := &Stream{ + dir: dir, + log: log, + config: config, + logger: logger, + isClosed: false, + } + startIndex, err := log.FirstIndex() if err != nil { return nil, err } - if firstEntry <= 0 { - } - if config.KeepLast > 0 { - + stream.nextOffset = startIndex + 1 + if config.KeepRecent > 0 { + go stream.StartPruning(config.KeepRecent, config.PruneInterval) } - return &Stream{ - log: log, - config: config, - logger: logger, - }, nil + return stream, nil } @@ -195,9 +201,17 @@ func (stream *Stream) Replay(start uint64, end uint64, processFn func(index uint return nil } -// -func (stream *Stream) Pruning() { - +func (stream *Stream) StartPruning(keepRecent uint64, pruneInterval time.Duration) { + for !stream.isClosed { + lastIndex, _ := stream.log.LastIndex() + firstIndex, _ := stream.log.FirstIndex() + if lastIndex > keepRecent && (lastIndex-keepRecent) > firstIndex { + prunePos := lastIndex - keepRecent + err := stream.TruncateBefore(prunePos) + stream.logger.Error(fmt.Sprintf("failed to prune changelog till index %d", prunePos), "err", err) + } + time.Sleep(pruneInterval) + } } func (stream *Stream) Close() error { @@ -209,6 +223,7 @@ func (stream *Stream) Close() error { stream.writeChannel = nil stream.errSignal = nil errClose := stream.log.Close() + stream.isClosed = true return errorutils.Join(err, errClose) } From ddd967b46b1ae23b49d35812faaacf98eaa25c88 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 12:38:08 -0700 Subject: [PATCH 03/13] Fix config --- config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index c7c48bc..4327aad 100644 --- a/config/config.go +++ b/config/config.go @@ -56,7 +56,7 @@ type StateStoreConfig struct { // DBDirectory defines the directory to store the state store db files // If not explicitly set, default to application home directory // default to empty - DBDirectory string `mapstructure:"db-dir"` + DBDirectory string `mapstructure:"db-directory"` // DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC DedicatedChangelog bool `mapstructure:"dedicated-changelog"` From 11fa0229950e13c6f7f1fe8bb0d247b63aee2b31 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 12:53:49 -0700 Subject: [PATCH 04/13] Fix edge case --- ss/pebbledb/db.go | 6 +++--- ss/store.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 5b9d6c3..88b93b7 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -5,9 +5,6 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/sei-protocol/sei-db/common/logger" - "github.com/sei-protocol/sei-db/common/utils" - "github.com/sei-protocol/sei-db/stream/changelog" "math" "strings" "sync" @@ -15,9 +12,12 @@ import ( "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" errorutils "github.com/sei-protocol/sei-db/common/errors" + "github.com/sei-protocol/sei-db/common/logger" + "github.com/sei-protocol/sei-db/common/utils" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/proto" "github.com/sei-protocol/sei-db/ss/types" + "github.com/sei-protocol/sei-db/stream/changelog" "golang.org/x/exp/slices" ) diff --git a/ss/store.go b/ss/store.go index d6f0fd6..057e864 100644 --- a/ss/store.go +++ b/ss/store.go @@ -2,12 +2,12 @@ package ss import ( "fmt" - "github.com/sei-protocol/sei-db/ss/pruning" "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/common/utils" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/proto" + "github.com/sei-protocol/sei-db/ss/pruning" "github.com/sei-protocol/sei-db/ss/types" "github.com/sei-protocol/sei-db/stream/changelog" ) @@ -86,7 +86,7 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty // Look backward to find where we should start replay from curVersion := lastEntry.Version curOffset := lastOffset - for curVersion > ssLatestVersion && curOffset >= firstOffset { + for curVersion > ssLatestVersion && curOffset > firstOffset { curOffset-- curEntry, errRead := streamHandler.ReadAt(curOffset) if errRead != nil { From 1c4ddd0cf6275f7d14a4332071d29d0851b63e52 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 12:56:19 -0700 Subject: [PATCH 05/13] Add comment --- ss/store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ss/store.go b/ss/store.go index 057e864..671e428 100644 --- a/ss/store.go +++ b/ss/store.go @@ -94,6 +94,7 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty } curVersion = curEntry.Version } + // Replay from the offset where the offset where the version is larger than SS store latest version targetStartOffset := curOffset logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset)) if targetStartOffset < lastOffset { From 7e3f2f5ac845f112d26b01130e6c6af109a74e8d Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 13:05:59 -0700 Subject: [PATCH 06/13] Fix error handling --- ss/pebbledb/db.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 88b93b7..af4ddf0 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -306,10 +306,8 @@ func (db *Database) writeAsync() { for nextChange := range db.pendingChanges { version := nextChange.Version for _, cs := range nextChange.Changesets { - err := db.ApplyChangeset(version, cs) - if err != nil { - panic(panic) - } + db.ApplyChangeset(version, cs) + } db.SetLatestVersion(version) } From 40cd25b88a1e73b7c484548daae624d8db746283 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 21 Jun 2024 13:26:23 -0700 Subject: [PATCH 07/13] Add comment --- ss/pebbledb/db.go | 8 +++++++- stream/changelog/changelog.go | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index af4ddf0..877f43b 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -8,6 +8,7 @@ import ( "math" "strings" "sync" + "time" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" @@ -117,7 +118,12 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { streamHandler, _ := changelog.NewStream( logger.NewNopLogger(), utils.GetChangelogPath(dataDir), - changelog.Config{DisableFsync: true, ZeroCopy: true}, + changelog.Config{ + DisableFsync: true, + ZeroCopy: true, + KeepRecent: uint64(config.KeepRecent), + PruneInterval: 300 * time.Second, + }, ) database.streamHandler = streamHandler go database.writeAsync() diff --git a/stream/changelog/changelog.go b/stream/changelog/changelog.go index 83541cd..176a764 100644 --- a/stream/changelog/changelog.go +++ b/stream/changelog/changelog.go @@ -56,11 +56,13 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error) logger: logger, isClosed: false, } + // Finding the nextOffset to write startIndex, err := log.FirstIndex() if err != nil { return nil, err } stream.nextOffset = startIndex + 1 + // Start the auto pruning goroutine if config.KeepRecent > 0 { go stream.StartPruning(config.KeepRecent, config.PruneInterval) } From 013b484a1edd9dcd83fb3600aa825a9e24df1b04 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 25 Jun 2024 10:20:18 -0700 Subject: [PATCH 08/13] Add wait group to fix race condition --- ss/pebbledb/db.go | 30 ++++++++++++------ ss/store_test.go | 58 +++++++++++++++++++++++++++++++++++ stream/changelog/changelog.go | 4 +-- 3 files changed, 80 insertions(+), 12 deletions(-) create mode 100644 ss/store_test.go diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 877f43b..c514146 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -44,8 +44,9 @@ var ( ) type Database struct { - storage *pebble.DB - config config.StateStoreConfig + storage *pebble.DB + asyncWriteWG sync.WaitGroup + config config.StateStoreConfig // Earliest version for db after pruning earliestVersion int64 @@ -110,6 +111,7 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { } database := &Database{ storage: db, + asyncWriteWG: sync.WaitGroup{}, config: config, earliestVersion: earliestVersion, pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), @@ -126,7 +128,7 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { }, ) database.streamHandler = streamHandler - go database.writeAsync() + go database.writeAsyncInBackground() } return database, nil } @@ -138,13 +140,15 @@ func NewWithDB(storage *pebble.DB) *Database { } func (db *Database) Close() error { - err := db.storage.Close() - db.storage = nil if db.streamHandler != nil { db.streamHandler.Close() db.streamHandler = nil close(db.pendingChanges) } + // Wait for the async writes to finish + db.asyncWriteWG.Wait() + err := db.storage.Close() + db.storage = nil return err } @@ -294,6 +298,7 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named Version: version, } entry.Changesets = changesets + entry.Upgrades = nil err := db.streamHandler.WriteNextEntry(entry) if err != nil { return err @@ -307,17 +312,22 @@ func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.Named return nil } -func (db *Database) writeAsync() { - for db.streamHandler != nil { - for nextChange := range db.pendingChanges { +func (db *Database) writeAsyncInBackground() { + db.asyncWriteWG.Add(1) + defer db.asyncWriteWG.Done() + for nextChange := range db.pendingChanges { + if db.streamHandler != nil { version := nextChange.Version for _, cs := range nextChange.Changesets { - db.ApplyChangeset(version, cs) - + err := db.ApplyChangeset(version, cs) + if err != nil { + panic(err) + } } db.SetLatestVersion(version) } } + } // Prune attempts to prune all versions up to and including the current version diff --git a/ss/store_test.go b/ss/store_test.go new file mode 100644 index 0000000..861a4a1 --- /dev/null +++ b/ss/store_test.go @@ -0,0 +1,58 @@ +package ss + +import ( + "fmt" + "os" + "testing" + + "github.com/cosmos/iavl" + "github.com/sei-protocol/sei-db/common/logger" + "github.com/sei-protocol/sei-db/config" + "github.com/sei-protocol/sei-db/proto" + "github.com/stretchr/testify/require" +) + +func TestNewStateStore(t *testing.T) { + tempDir := os.TempDir() + ssConfig := config.StateStoreConfig{ + DedicatedChangelog: true, + Backend: string(PebbleDBBackend), + AsyncWriteBuffer: 10, + KeepRecent: 100, + } + stateStore, err := NewStateStore(logger.NewNopLogger(), tempDir, ssConfig) + require.NoError(t, err) + for i := 1; i < 10; i++ { + var changesets []*proto.NamedChangeSet + kvPair := &iavl.KVPair{ + Delete: false, + Key: []byte(fmt.Sprintf("key%d", i)), + Value: []byte(fmt.Sprintf("value%d", i)), + } + var pairs []*iavl.KVPair + pairs = append(pairs, kvPair) + cs := iavl.ChangeSet{Pairs: pairs} + ncs := &proto.NamedChangeSet{ + Name: "storeA", + Changeset: cs, + } + changesets = append(changesets, ncs) + err := stateStore.ApplyChangesetAsync(int64(i), changesets) + require.NoError(t, err) + } + // Closing the state store without waiting for data to be fully flushed + err = stateStore.Close() + require.NoError(t, err) + + // Reopen a new state store + stateStore, err = NewStateStore(logger.NewNopLogger(), tempDir, ssConfig) + require.NoError(t, err) + + // Make sure key and values can be found + for i := 1; i < 10; i++ { + value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i))) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("value%d", i), string(value)) + } + +} diff --git a/stream/changelog/changelog.go b/stream/changelog/changelog.go index 176a764..1b53636 100644 --- a/stream/changelog/changelog.go +++ b/stream/changelog/changelog.go @@ -57,11 +57,11 @@ func NewStream(logger logger.Logger, dir string, config Config) (*Stream, error) isClosed: false, } // Finding the nextOffset to write - startIndex, err := log.FirstIndex() + lastIndex, err := log.LastIndex() if err != nil { return nil, err } - stream.nextOffset = startIndex + 1 + stream.nextOffset = lastIndex + 1 // Start the auto pruning goroutine if config.KeepRecent > 0 { go stream.StartPruning(config.KeepRecent, config.PruneInterval) From a3be48c663c0c9501ca7b5d0dada61000497d07e Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 25 Jun 2024 10:27:36 -0700 Subject: [PATCH 09/13] Fix lint --- tools/cmd/seidb/benchmark/iteration.go | 3 ++- tools/cmd/seidb/benchmark/random_read.go | 3 ++- tools/cmd/seidb/benchmark/reverse_iteration.go | 3 ++- tools/cmd/seidb/benchmark/write.go | 3 ++- tools/cmd/seidb/operations/dump_db.go | 3 ++- tools/cmd/seidb/operations/prune.go | 3 ++- 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/tools/cmd/seidb/benchmark/iteration.go b/tools/cmd/seidb/benchmark/iteration.go index f2e24bd..fe51393 100644 --- a/tools/cmd/seidb/benchmark/iteration.go +++ b/tools/cmd/seidb/benchmark/iteration.go @@ -3,6 +3,7 @@ package benchmark import ( "fmt" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/tools/dbbackend" @@ -62,7 +63,7 @@ func DBIteration(inputKVDir string, numVersions int, outputDir string, dbBackend fmt.Printf("Iterating Over DB at %s\n", outputDir) ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(outputDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), outputDir, ssConfig) if err != nil { panic(err) } diff --git a/tools/cmd/seidb/benchmark/random_read.go b/tools/cmd/seidb/benchmark/random_read.go index 94a771c..e373f9a 100644 --- a/tools/cmd/seidb/benchmark/random_read.go +++ b/tools/cmd/seidb/benchmark/random_read.go @@ -5,6 +5,7 @@ import ( "io/fs" "os" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/tools/dbbackend" @@ -67,7 +68,7 @@ func DBRandomRead(inputKVDir string, numVersions int, outputDir string, dbBacken fmt.Printf("Reading Raw Keys and Values from %s\n", inputKVDir) ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(outputDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), outputDir, ssConfig) if err != nil { panic(err) } diff --git a/tools/cmd/seidb/benchmark/reverse_iteration.go b/tools/cmd/seidb/benchmark/reverse_iteration.go index e1ac725..863fcfd 100644 --- a/tools/cmd/seidb/benchmark/reverse_iteration.go +++ b/tools/cmd/seidb/benchmark/reverse_iteration.go @@ -3,6 +3,7 @@ package benchmark import ( "fmt" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/tools/dbbackend" @@ -62,7 +63,7 @@ func DBReverseIteration(inputKVDir string, numVersions int, outputDir string, db fmt.Printf("Iterating Over DB at %s\n", outputDir) ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(outputDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), outputDir, ssConfig) if err != nil { panic(err) } diff --git a/tools/cmd/seidb/benchmark/write.go b/tools/cmd/seidb/benchmark/write.go index 2d2c011..6f3d495 100644 --- a/tools/cmd/seidb/benchmark/write.go +++ b/tools/cmd/seidb/benchmark/write.go @@ -5,6 +5,7 @@ import ( "io/fs" "os" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/tools/dbbackend" @@ -67,7 +68,7 @@ func DBWrite(inputKVDir string, numVersions int, outputDir string, dbBackend str fmt.Printf("Reading Raw Keys and Values from %s\n", inputKVDir) ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(outputDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), outputDir, ssConfig) if err != nil { panic(err) } diff --git a/tools/cmd/seidb/operations/dump_db.go b/tools/cmd/seidb/operations/dump_db.go index f218e71..8b4728f 100644 --- a/tools/cmd/seidb/operations/dump_db.go +++ b/tools/cmd/seidb/operations/dump_db.go @@ -5,6 +5,7 @@ import ( "io/fs" "os" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" "github.com/sei-protocol/sei-db/tools/cmd/seidb/benchmark" @@ -74,7 +75,7 @@ func DumpDbData(dbBackend string, module string, outputDir string, dbDir string) // TODO: Defer Close Db ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(outputDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), outputDir, ssConfig) if err != nil { panic(err) } diff --git a/tools/cmd/seidb/operations/prune.go b/tools/cmd/seidb/operations/prune.go index 52580c8..5fd95ea 100644 --- a/tools/cmd/seidb/operations/prune.go +++ b/tools/cmd/seidb/operations/prune.go @@ -2,6 +2,7 @@ package operations import ( "fmt" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" "github.com/sei-protocol/sei-db/ss" @@ -53,7 +54,7 @@ func PruneDB(dbBackend string, dbDir string, version int64) { // TODO: Defer Close Db ssConfig := config.DefaultStateStoreConfig() ssConfig.Backend = dbBackend - backend, err := ss.NewStateStore(dbDir, ssConfig) + backend, err := ss.NewStateStore(logger.NewNopLogger(), dbDir, ssConfig) if err != nil { panic(err) } From 9959509c36edc17e58b8963a4b67870078b6ead1 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 25 Jun 2024 10:30:36 -0700 Subject: [PATCH 10/13] Fix lint --- ss/pebbledb/db.go | 5 ++++- tools/cmd/seidb/operations/prune.go | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index c514146..374f3b2 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -324,7 +324,10 @@ func (db *Database) writeAsyncInBackground() { panic(err) } } - db.SetLatestVersion(version) + err := db.SetLatestVersion(version) + if err != nil { + panic(err) + } } } diff --git a/tools/cmd/seidb/operations/prune.go b/tools/cmd/seidb/operations/prune.go index 5fd95ea..43cc795 100644 --- a/tools/cmd/seidb/operations/prune.go +++ b/tools/cmd/seidb/operations/prune.go @@ -2,6 +2,7 @@ package operations import ( "fmt" + "github.com/sei-protocol/sei-db/common/logger" "github.com/sei-protocol/sei-db/config" From 8dc4818c6dceee5fa4764457d3849500ce409a82 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 25 Jun 2024 15:20:15 -0700 Subject: [PATCH 11/13] Add debug log --- ss/pebbledb/db.go | 6 ++++++ ss/pebbledb_init.go | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 374f3b2..3e3a03b 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -279,6 +279,9 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro return err } } else { + if cs.Name == "receipt" { + fmt.Printf("[Debug] PebbleDB setting key %X, value %X\n", kvPair.Key, kvPair.Value) + } if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { return err } @@ -317,9 +320,11 @@ func (db *Database) writeAsyncInBackground() { defer db.asyncWriteWG.Done() for nextChange := range db.pendingChanges { if db.streamHandler != nil { + fmt.Printf("[Debug] Found new changes in pending writes: %d\n", nextChange.Version) version := nextChange.Version for _, cs := range nextChange.Changesets { err := db.ApplyChangeset(version, cs) + fmt.Printf("[Debug] Applied new changes: %v\n", nextChange) if err != nil { panic(err) } @@ -328,6 +333,7 @@ func (db *Database) writeAsyncInBackground() { if err != nil { panic(err) } + } } diff --git a/ss/pebbledb_init.go b/ss/pebbledb_init.go index 8202fd1..4bc7391 100644 --- a/ss/pebbledb_init.go +++ b/ss/pebbledb_init.go @@ -9,11 +9,11 @@ import ( func init() { initializer := func(dir string, configs config.StateStoreConfig) (types.StateStore, error) { - dbHome := dir + dbHome := utils.GetStateStorePath(dir, configs.Backend) if configs.DBDirectory != "" { dbHome = configs.DBDirectory } - return pebbledb.New(utils.GetStateStorePath(dbHome, configs.Backend), configs) + return pebbledb.New(dbHome, configs) } RegisterBackend(PebbleDBBackend, initializer) } From 7b5d29dd8848dd539dec225864b16b165781d175 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 25 Jun 2024 17:01:39 -0700 Subject: [PATCH 12/13] Remove debug log --- ss/pebbledb/db.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ss/pebbledb/db.go b/ss/pebbledb/db.go index 3e3a03b..374f3b2 100644 --- a/ss/pebbledb/db.go +++ b/ss/pebbledb/db.go @@ -279,9 +279,6 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro return err } } else { - if cs.Name == "receipt" { - fmt.Printf("[Debug] PebbleDB setting key %X, value %X\n", kvPair.Key, kvPair.Value) - } if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { return err } @@ -320,11 +317,9 @@ func (db *Database) writeAsyncInBackground() { defer db.asyncWriteWG.Done() for nextChange := range db.pendingChanges { if db.streamHandler != nil { - fmt.Printf("[Debug] Found new changes in pending writes: %d\n", nextChange.Version) version := nextChange.Version for _, cs := range nextChange.Changesets { err := db.ApplyChangeset(version, cs) - fmt.Printf("[Debug] Applied new changes: %v\n", nextChange) if err != nil { panic(err) } @@ -333,7 +328,6 @@ func (db *Database) writeAsyncInBackground() { if err != nil { panic(err) } - } } From aaf6f86aa64b4b895770f94328f0437b7d2d48c5 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Wed, 26 Jun 2024 07:11:28 -0700 Subject: [PATCH 13/13] Fix rocksdb and sqlite --- ss/rocksdb/db.go | 4 ++++ ss/sqlite/db.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ss/rocksdb/db.go b/ss/rocksdb/db.go index dccd9da..e1875af 100644 --- a/ss/rocksdb/db.go +++ b/ss/rocksdb/db.go @@ -179,6 +179,10 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro return b.Write() } +func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return fmt.Errorf("not implemented") +} + // Prune attempts to prune all versions up to and including the provided version. // This is done internally by updating the full_history_ts_low RocksDB value on // the column families, s.t. all versions less than full_history_ts_low will be diff --git a/ss/sqlite/db.go b/ss/sqlite/db.go index e2b27cb..f2b602c 100644 --- a/ss/sqlite/db.go +++ b/ss/sqlite/db.go @@ -210,6 +210,10 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro return b.Write() } +func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return fmt.Errorf("not implemented") +} + func (db *Database) Prune(version int64) error { stmt := "DELETE FROM state_storage WHERE version <= ? AND store_key != ?;"