Skip to content

[SeiDB] Fix MemIAVL Race Condition during snapshot reload #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions sc/memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,11 @@ func (db *DB) reload() error {
}

func (db *DB) reloadMultiTree(mtree *MultiTree) error {
if err := db.MultiTree.Close(); err != nil {
// catch-up the pending changes
if err := mtree.apply(db.pendingLogEntry); err != nil {
return err
}

db.MultiTree = *mtree
// catch-up the pending changes
return db.MultiTree.apply(db.pendingLogEntry)
return db.MultiTree.ReplaceWith(mtree)
}

// rewriteIfApplicable execute the snapshot rewrite strategy according to current height
Expand Down
13 changes: 10 additions & 3 deletions sc/memiavl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ func TestRewriteSnapshotBackground(t *testing.T) {
})
require.NoError(t, err)

// spin up goroutine to keep querying the tree
stopped := false
go func() {
for !stopped {
value := db.TreeByName("test").Get([]byte("hello1"))
require.True(t, value == nil || string(value) == "world1")
}
}()

for i, changes := range ChangeSets {
cs := []*proto.NamedChangeSet{
{
Expand All @@ -111,15 +120,12 @@ func TestRewriteSnapshotBackground(t *testing.T) {
require.NoError(t, err)
require.Equal(t, i+1, int(v))
require.Equal(t, RefHashes[i], db.lastCommitInfo.StoreInfos[0].CommitId.Hash)

_ = db.RewriteSnapshotBackground()
time.Sleep(time.Millisecond * 20)
}

for db.snapshotRewriteChan != nil {
require.NoError(t, db.checkAsyncTasks())
}

db.pruneSnapshotLock.Lock()
defer db.pruneSnapshotLock.Unlock()

Expand All @@ -128,6 +134,7 @@ func TestRewriteSnapshotBackground(t *testing.T) {

// three files: snapshot, current link, rlog, LOCK
require.Equal(t, 4, len(entries))
stopped = true
}

func TestRlog(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions sc/memiavl/multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,17 @@ func (t *MultiTree) Close() error {
return errors.Join(errs...)
}

func (t *MultiTree) ReplaceWith(other *MultiTree) error {
errs := make([]error, 0, len(t.trees))
for _, entry := range t.trees {
errs = append(errs, entry.Tree.ReplaceWith(other.TreeByName(entry.Name)))
}
t.treesByName = other.treesByName
t.lastCommitInfo = other.lastCommitInfo
t.metadata = other.metadata
return errors.Join(errs...)
}

func readMetadata(dir string) (*proto.MultiTreeMetadata, error) {
// load commit info
bz, err := os.ReadFile(filepath.Join(dir, MetadataFileName))
Expand Down
40 changes: 37 additions & 3 deletions sc/memiavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Tree struct {
// when true, the get and iterator methods could return a slice pointing to mmaped blob files.
zeroCopy bool

// sync.RWMutex is used to protect the cache for thread safety
// sync.RWMutex is used to protect the tree for thread safety during snapshot reload
mtx *sync.RWMutex
}

Expand Down Expand Up @@ -93,13 +93,14 @@ func (t *Tree) SetInitialVersion(initialVersion int64) error {

// Copy returns a snapshot of the tree which won't be modified by further modifications on the main tree,
// the returned new tree can be accessed concurrently with the main tree.
func (t *Tree) Copy(cacheSize int) *Tree {
func (t *Tree) Copy(_ int) *Tree {
t.mtx.RLock()
defer t.mtx.RUnlock()
if _, ok := t.root.(*MemNode); ok {
// protect the existing `MemNode`s from get modified in-place
t.cowVersion = t.version
}
newTree := *t
// cache is not copied along because it's not thread-safe to access
newTree.mtx = &sync.RWMutex{}
return &newTree
}
Expand All @@ -116,6 +117,8 @@ func (t *Tree) ApplyChangeSet(changeSet iavl.ChangeSet) {
}

func (t *Tree) Set(key, value []byte) {
t.mtx.Lock()
defer t.mtx.Unlock()
if value == nil {
// the value could be nil when replaying changes from write-ahead-log because of protobuf decoding
value = []byte{}
Expand All @@ -124,6 +127,8 @@ func (t *Tree) Set(key, value []byte) {
}

func (t *Tree) Remove(key []byte) {
t.mtx.Lock()
defer t.mtx.Unlock()
_, t.root, _ = removeRecursive(t.root, key, t.version+1, t.cowVersion)
}

Expand All @@ -150,13 +155,17 @@ func (t *Tree) Version() int64 {
// RootHash updates the hashes and return the current root hash,
// it clones the persisted node's bytes, so the returned bytes is safe to retain.
func (t *Tree) RootHash() []byte {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.root == nil {
return emptyHash
}
return t.root.SafeHash()
}

func (t *Tree) GetWithIndex(key []byte) (int64, []byte) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.root == nil {
return 0, nil
}
Expand All @@ -169,6 +178,8 @@ func (t *Tree) GetWithIndex(key []byte) (int64, []byte) {
}

func (t *Tree) GetByIndex(index int64) ([]byte, []byte) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if index > math.MaxUint32 {
return nil, nil
}
Expand All @@ -195,12 +206,16 @@ func (t *Tree) Has(key []byte) bool {
}

func (t *Tree) Iterator(start, end []byte, ascending bool) dbm.Iterator {
t.mtx.RLock()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterator is tricker, it supposed the hold the RLock before the iterator is closed

Copy link
Collaborator Author

@yzang2019 yzang2019 Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm yeah that's a good point, iterator is a bit tricky, this lock only covers the creation of the iterator, might not be enough. I think for now it's probably fine since we don't have a lot of use cases using iterators in EVM, we definitely can add more locking coverage in the future

defer t.mtx.RUnlock()
return NewIterator(start, end, ascending, t.root, t.zeroCopy)
}

// ScanPostOrder scans the tree in post-order, and call the callback function on each node.
// If the callback function returns false, the scan will be stopped.
func (t *Tree) ScanPostOrder(callback func(node Node) bool) {
t.mtx.RLock()
defer t.mtx.RUnlock()
if t.root == nil {
return
}
Expand Down Expand Up @@ -248,6 +263,8 @@ func (t *Tree) Export() *Exporter {
}

func (t *Tree) Close() error {
t.mtx.Lock()
defer t.mtx.Unlock()
var err error
if t.snapshot != nil {
err = t.snapshot.Close()
Expand All @@ -257,6 +274,23 @@ func (t *Tree) Close() error {
return err
}

// ReplaceWith is used during reload to replace the current tree with the newly loaded snapshot
func (t *Tree) ReplaceWith(other *Tree) error {
t.mtx.Lock()
defer t.mtx.Unlock()
snapshot := t.snapshot
t.version = other.version
t.root = other.root
t.snapshot = other.snapshot
t.initialVersion = other.initialVersion
t.cowVersion = other.cowVersion
t.zeroCopy = other.zeroCopy
if snapshot != nil {
return snapshot.Close()
}
return nil
}

// nextVersionU32 is compatible with existing golang iavl implementation.
// see: https://github.com/cosmos/iavl/pull/660
func nextVersionU32(v uint32, initialVersion uint32) uint32 {
Expand Down