Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pruning and WAL capability for SS store #66

Merged
merged 14 commits into from
Jun 26, 2024
4 changes: 2 additions & 2 deletions common/utils/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func GetStateStorePath(homePath string, backend string) string {
return filepath.Join(homePath, "data", backend)
}

func GetChangelogPath(commitStorePath string) string {
return filepath.Join(commitStorePath, "changelog")
func GetChangelogPath(dbPath string) string {
return filepath.Join(dbPath, "changelog")
}
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ type StateStoreConfig struct {
// DBDirectory defines the directory to store the state store db files
// If not explicitly set, default to application home directory
// default to empty
DBDirectory string `mapstructure:"db-directory"`
DBDirectory string `mapstructure:"db-dir"`

// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
Expand Down
70 changes: 67 additions & 3 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
"encoding/binary"
"errors"
"fmt"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
"github.com/sei-protocol/sei-db/stream/changelog"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -48,6 +51,17 @@
// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
storeKeyDirty sync.Map

// Changelog used to support async write
streamHandler *changelog.Stream

// Pending changes to be written to the DB
pendingChanges chan VersionedChangesets
}

type VersionedChangesets struct {
Version int64
Changesets []*proto.NamedChangeSet
}

func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
Expand Down Expand Up @@ -93,12 +107,22 @@
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

return &Database{
database := &Database{
storage: db,
config: config,
earliestVersion: earliestVersion,
}, nil
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}
if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{DisableFsync: true, ZeroCopy: true},
)
database.streamHandler = streamHandler
go database.writeAsync()
Fixed Show fixed Hide fixed
}
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
Expand All @@ -110,6 +134,11 @@
func (db *Database) Close() error {
err := db.storage.Close()
db.storage = nil
if db.streamHandler != nil {
db.streamHandler.Close()
db.streamHandler = nil
close(db.pendingChanges)
}
return err
}

Expand Down Expand Up @@ -252,6 +281,41 @@
return b.Write()
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
// Write to WAL first
if db.streamHandler != nil {
entry := proto.ChangelogEntry{
Version: version,
}
entry.Changesets = changesets
err := db.streamHandler.WriteNextEntry(entry)
if err != nil {
return err
}
}
// Then write to pending changes
db.pendingChanges <- VersionedChangesets{
Version: version,
Changesets: changesets,
}
return nil
}

func (db *Database) writeAsync() {
for db.streamHandler != nil {
for nextChange := range db.pendingChanges {
version := nextChange.Version
for _, cs := range nextChange.Changesets {
err := db.ApplyChangeset(version, cs)
if err != nil {
panic(panic)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / lint

panic (built-in) must be called (typecheck)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / lint

panic (built-in) must be called) (typecheck)

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / Analyze

panic (built-in) must be called

Check failure on line 311 in ss/pebbledb/db.go

View workflow job for this annotation

GitHub Actions / tests

panic (built-in) must be called
}
}
db.SetLatestVersion(version)
}
}
}

// Prune attempts to prune all versions up to and including the current version
// Get the range of keys, manually iterate over them and delete them
// We add a heuristic to skip over a module's keys during pruning if it hasn't been updated
Expand Down
45 changes: 31 additions & 14 deletions ss/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ss

import (
"fmt"
"github.com/sei-protocol/sei-db/ss/pruning"

"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/common/utils"
Expand Down Expand Up @@ -34,32 +35,39 @@ func RegisterBackend(backendType BackendType, initializer BackendInitializer) {
}

// NewStateStore Create a new state store with the specified backend type
func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) {
initializer, ok := backends[BackendType(ssConfig.Backend)]
if !ok {
return nil, fmt.Errorf("unsupported backend: %s", ssConfig.Backend)
}
db, err := initializer(homeDir, ssConfig)
stateStore, err := initializer(homeDir, ssConfig)
if err != nil {
return nil, err
}
return db, nil
// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
}
// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
return stateStore, nil
}

// RecoverStateStore will be called during initialization to recover the state from rlog
func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.StateStore) error {
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion, err := stateStore.GetLatestVersion()
if err != nil {
return err
}
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(
logger,
utils.GetChangelogPath(utils.GetChangelogPath(utils.GetCommitStorePath(homePath))),
changelog.Config{},
)
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
}
Expand All @@ -71,16 +79,25 @@ func RecoverStateStore(homePath string, logger logger.Logger, stateStore types.S
if lastOffset <= 0 || errLast != nil {
return err
}
firstEntry, errRead := streamHandler.ReadAt(firstOffset)
lastEntry, errRead := streamHandler.ReadAt(lastOffset)
if errRead != nil {
return err
}
firstVersion := firstEntry.Version
delta := uint64(firstVersion) - firstOffset
targetStartOffset := uint64(ssLatestVersion) - delta
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset >= firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
return streamHandler.Replay(targetStartOffset+1, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
return streamHandler.Replay(targetStartOffset, lastOffset, func(index uint64, entry proto.ChangelogEntry) error {
// commit to state store
for _, cs := range entry.Changesets {
if err := stateStore.ApplyChangeset(entry.Version, cs); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions ss/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type StateStore interface {
// the version should be latest version plus one.
ApplyChangeset(version int64, cs *proto.NamedChangeSet) error

// ApplyChangesetAsync Write changesets into WAL file first and apply later for async writes
ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error

// Import the initial state of the store
Import(version int64, ch <-chan SnapshotNode) error

Expand Down
28 changes: 28 additions & 0 deletions stream/changelog/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
logger logger.Logger
writeChannel chan *Message
errSignal chan error
nextOffset uint64
}

type Message struct {
Expand All @@ -32,6 +33,7 @@
DisableFsync bool
ZeroCopy bool
WriteBufferSize int
KeepLast int
}

// NewStream creates a new changelog stream that persist the changesets in the log
Expand All @@ -42,6 +44,15 @@
})
if err != nil {
return nil, err
}
firstEntry, err := log.FirstIndex()
if err != nil {
return nil, err
}
if firstEntry <= 0 {

Check failure on line 52 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
}
if config.KeepLast > 0 {

Check failure on line 54 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)

}
return &Stream{
log: log,
Expand Down Expand Up @@ -75,6 +86,18 @@
return nil
}

// WriteNextEntry will write a new entry to the last index of the log.
// Whether the writes is in blocking or async manner depends on the buffer size.
func (stream *Stream) WriteNextEntry(entry proto.ChangelogEntry) error {
nextOffset := stream.nextOffset
err := stream.Write(nextOffset, entry)
if err != nil {
return err
}
stream.nextOffset++
return nil
}

// startWriteGoroutine will start a goroutine to write entries to the log.
// This should only be called on initialization if async write is enabled
func (stream *Stream) startWriteGoroutine() {
Expand Down Expand Up @@ -172,6 +195,11 @@
return nil
}

//

Check failure on line 198 in stream/changelog/changelog.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
func (stream *Stream) Pruning() {

}

func (stream *Stream) Close() error {
if stream.writeChannel == nil {
return nil
Expand Down
Loading