Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pruning and WAL capability for SS store #66

Merged
merged 14 commits into from
Jun 26, 2024
4 changes: 2 additions & 2 deletions common/utils/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func GetStateStorePath(homePath string, backend string) string {
return filepath.Join(homePath, "data", backend)
}

func GetChangelogPath(commitStorePath string) string {
return filepath.Join(commitStorePath, "changelog")
func GetChangelogPath(dbPath string) string {
return filepath.Join(dbPath, "changelog")
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type StateStoreConfig struct {
// default to empty
DBDirectory string `mapstructure:"db-directory"`

// DedicatedChangelog defines whether the SS store should use its own separate changelog rather than sharing the one used by SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
// defaults to pebbledb
Expand Down
91 changes: 86 additions & 5 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
"math"
"strings"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
errorutils "github.com/sei-protocol/sei-db/common/errors"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/sei-protocol/sei-db/ss/types"
"github.com/sei-protocol/sei-db/stream/changelog"
"golang.org/x/exp/slices"
)

Expand All @@ -40,14 +44,26 @@
)

type Database struct {
storage *pebble.DB
config config.StateStoreConfig
storage *pebble.DB
asyncWriteWG sync.WaitGroup
config config.StateStoreConfig
// Earliest version for db after pruning
earliestVersion int64

// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
storeKeyDirty sync.Map

// Changelog used to support async write
streamHandler *changelog.Stream

// Pending changes to be written to the DB
pendingChanges chan VersionedChangesets
}

// VersionedChangesets bundles the named changesets that belong to one version
// so they can be passed around (e.g. queued on a channel) as a single unit.
type VersionedChangesets struct {
// Version is the version the changesets are applied at.
Version int64
// Changesets lists the named changesets to apply at Version.
Changesets []*proto.NamedChangeSet
}

func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
Expand Down Expand Up @@ -93,12 +109,28 @@
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

return &Database{
database := &Database{
storage: db,
asyncWriteWG: sync.WaitGroup{},
config: config,
earliestVersion: earliestVersion,
}, nil
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}
if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
Expand All @@ -108,6 +140,13 @@
}

func (db *Database) Close() error {
if db.streamHandler != nil {
db.streamHandler.Close()
db.streamHandler = nil
close(db.pendingChanges)
}
// Wait for the async writes to finish
db.asyncWriteWG.Wait()
err := db.storage.Close()
db.storage = nil
return err
Expand Down Expand Up @@ -252,6 +291,48 @@
return b.Write()
}

// ApplyChangesetAsync records the changesets for version and schedules them to
// be written to the database.
//
// When a changelog stream is configured (db.streamHandler is non-nil), the
// entry is appended to the changelog first and then queued on
// db.pendingChanges for the background writer; the send blocks while the
// queue is full. If the changelog write fails, its error is returned and
// nothing is queued.
//
// When no changelog stream is configured, the changesets are applied
// synchronously instead of being queued. As far as this file shows, the
// background writer that drains db.pendingChanges is only started together
// with the changelog stream, so queueing without it would never apply the
// changes and would block forever once the buffer fills up.
func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
	if db.streamHandler == nil {
		// No WAL and no consumer for the queue: fall back to a direct write.
		for _, cs := range changesets {
			if err := db.ApplyChangeset(version, cs); err != nil {
				return err
			}
		}
		return db.SetLatestVersion(version)
	}

	// Write to the WAL first so the entry can be replayed if the process
	// stops before the background writer gets to it.
	entry := proto.ChangelogEntry{
		Version:    version,
		Changesets: changesets,
		Upgrades:   nil,
	}
	if err := db.streamHandler.WriteNextEntry(entry); err != nil {
		return err
	}

	// Then hand the changes over to the background writer.
	db.pendingChanges <- VersionedChangesets{
		Version:    version,
		Changesets: changesets,
	}
	return nil
}

// writeAsyncInBackground drains db.pendingChanges until the channel is closed.
// For every queued item it applies each changeset at the item's version and
// then records that version as the latest one. Items received while
// db.streamHandler is nil are discarded without being applied. Any error from
// applying a changeset or from updating the latest version panics.
//
// NOTE(review): the WaitGroup counter is incremented inside this function, so
// a caller that reaches asyncWriteWG.Wait() before this goroutine has started
// running would not wait for it — confirm this cannot race with Close.
func (db *Database) writeAsyncInBackground() {
	db.asyncWriteWG.Add(1)
	defer db.asyncWriteWG.Done()

	for pending := range db.pendingChanges {
		// Skip the item entirely when the changelog stream is gone.
		if db.streamHandler == nil {
			continue
		}
		for _, changeset := range pending.Changesets {
			if err := db.ApplyChangeset(pending.Version, changeset); err != nil {
				panic(err)
			}
		}
		if err := db.SetLatestVersion(pending.Version); err != nil {
			panic(err)
		}
	}
}

// Prune attempts to prune all versions up to and including the current version
// Get the range of keys, manually iterate over them and delete them
// We add a heuristic to skip over a module's keys during pruning if it hasn't been updated
Expand Down
46 changes: 32 additions & 14 deletions ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/sei-protocol/sei-db/ss/pruning"
"github.com/sei-protocol/sei-db/ss/types"
"github.com/sei-protocol/sei-db/stream/changelog"
)
Expand Down Expand Up @@ -34,32 +35,39 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) {
}

// NewStateStore Create a new state store with the specified backend type
func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
initializer, ok := backends[BackendType(ssConfig.Backend)]
if !ok {
return nil, fmt.Errorf("unsupported backend: %s", ssConfig.Backend)
}
db, err := initializer(homeDir, ssConfig)
stateStore, err := initializer(homeDir, ssConfig)
if err != nil {
return nil, err
}
return db, nil
// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
}
// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
return stateStore, nil
}

// RecoverStateStore will be called during initialization to recover the state from rlog
func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.StateStore) error {
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion, err := stateStore.GetLatestVersion()
if err != nil {
return err
}
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(
logger,
utils.GetChangelogPath(utils.GetChangelogPath(utils.GetCommitStorePath(homePath))),
changelog.Config{},
)
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
}
Expand All @@ -71,16 +79,26 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S
if lastOffset <= 0 || errLast != nil {
return err
}
firstEntry, errRead := streamHandler.ReadAt(firstOffset)
lastEntry, errRead := streamHandler.ReadAt(lastOffset)
if errRead != nil {
return err
}
firstVersion := firstEntry.Version
delta := uint64(firstVersion) - firstOffset
targetStartOffset := uint64(ssLatestVersion) - delta
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
// Replay from the offset where the version is larger than SS store latest version
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
return streamHandler.Replay(targetStartOffset+1, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
return streamHandler.Replay(targetStartOffset, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
// commit to state store
for _, cs := range entry.Changesets {
if err := stateStore.ApplyChangeset(entry.Version, cs); err != nil {
Expand Down
58 changes: 58 additions & 0 deletions ss/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ss

import (
"fmt"
"os"
"testing"

"github.com/cosmos/iavl"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/proto"
"github.com/stretchr/testify/require"
)

func TestNewStateStore(t *testing.T) {
tempDir := os.TempDir()
ssConfig := config.StateStoreConfig{
DedicatedChangelog: true,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 10,
KeepRecent: 100,
}
stateStore, err := NewStateStore(logger.NewNopLogger(), tempDir, ssConfig)
require.NoError(t, err)
for i := 1; i < 10; i++ {
var changesets []*proto.NamedChangeSet
kvPair := &iavl.KVPair{
Delete: false,
Key: []byte(fmt.Sprintf("key%d", i)),
Value: []byte(fmt.Sprintf("value%d", i)),
}
var pairs []*iavl.KVPair
pairs = append(pairs, kvPair)
cs := iavl.ChangeSet{Pairs: pairs}
ncs := &proto.NamedChangeSet{
Name: "storeA",
Changeset: cs,
}
changesets = append(changesets, ncs)
err := stateStore.ApplyChangesetAsync(int64(i), changesets)
require.NoError(t, err)
}
// Closing the state store without waiting for data to be fully flushed
err = stateStore.Close()
require.NoError(t, err)

// Reopen a new state store
stateStore, err = NewStateStore(logger.NewNopLogger(), tempDir, ssConfig)
require.NoError(t, err)

// Make sure key and values can be found
for i := 1; i < 10; i++ {
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("value%d", i), string(value))
}

}
3 changes: 3 additions & 0 deletions ss/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type StateStore interface {
// the version should be latest version plus one.
ApplyChangeset(version int64, cs *proto.NamedChangeSet) error

// ApplyChangesetAsync Write changesets into WAL file first and apply later for async writes
ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error

// Import the initial state of the store
Import(version int64, ch <-chan SnapshotNode) error

Expand Down
Loading
Loading