Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pruning without batch #2944

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
2 changes: 1 addition & 1 deletion libs/iavl/mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func (tree *MutableTree) deleteVersion(batch dbm.Batch, version int64, versions
return err
}

tree.ndb.deleteVersion(batch, version, true)
tree.ndb.deleteVersion(batch, version, true, false)

return nil
}
Expand Down
1 change: 1 addition & 0 deletions libs/iavl/mutable_tree_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (tm *TreeMap) addNewTree(tree *MutableTree) {
defer tm.mtx.Unlock()
if _, ok := tm.mutableTreeSavedMap[tree.GetModuleName()]; !ok {
tm.mutableTreeSavedMap[tree.GetModuleName()] = tree
tree.ndb.cleanPruningInDB()
go tree.commitSchedule()
if EnablePruningHistoryState {
go tree.pruningSchedule()
Expand Down
3 changes: 2 additions & 1 deletion libs/iavl/mutable_tree_oec.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,10 @@ func (tree *MutableTree) pruningSchedule() {
for event := range tree.pruneCh {
if event.version >= 0 {
trc := trace.NewTracer("pruningSchedule")
noBatch := IavlCommitAsyncNoBatch
batch := tree.ndb.NewBatch()
trc.Pin("deleteVersion")
tree.ndb.deleteVersion(batch, event.version, true)
tree.ndb.deleteVersion(batch, event.version, true, noBatch)
trc.Pin("Commit")
if err := tree.ndb.Commit(batch); err != nil {
panic(err)
Expand Down
57 changes: 54 additions & 3 deletions libs/iavl/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
// Using semantic versioning: https://semver.org/
defaultStorageVersionValue = "1.0.0"
fastStorageVersionValue = "1.1.0"

pruningVersionKey = "pruning_version"
)

var (
Expand Down Expand Up @@ -629,7 +631,7 @@ func (ndb *nodeDB) deleteOrphans(batch dbm.Batch, version int64) {
if predecessor < fromVersion || fromVersion == toVersion {
ndb.log(IavlDebug, "DELETE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash)
batch.Delete(ndb.nodeKey(hash))
ndb.syncUnCacheNode(hash)
ndb.uncacheNode(hash)
ndb.state.increaseDeletedCount()
} else {
ndb.log(IavlDebug, "MOVE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash)
Expand All @@ -638,6 +640,48 @@ func (ndb *nodeDB) deleteOrphans(batch dbm.Batch, version int64) {
})
}

func (ndb *nodeDB) deleteOrphansFromDB(version int64) {
// Will be zero if there is no previous version.
predecessor := ndb.getPreviousVersion(version)

// Traverse orphans with a lifetime ending at the version specified.
// TODO optimize.
ndb.traverseOrphansVersion(version, func(key, hash []byte) {
var fromVersion, toVersion int64

// See comment on `orphanKeyFmt`. Note that here, `version` and
// `toVersion` are always equal.
orphanKeyFormat.Scan(key, &toVersion, &fromVersion)

// Delete orphan key and reverse-lookup key.
err := ndb.db.Delete(key)
cwbhhjl marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}

// If there is no predecessor, or the predecessor is earlier than the
// beginning of the lifetime (ie: negative lifetime), or the lifetime
// spans a single version and that version is the one being deleted, we
// can delete the orphan. Otherwise, we shorten its lifetime, by
// moving its endpoint to the previous version.
if predecessor < fromVersion || fromVersion == toVersion {
ndb.log(IavlDebug, "DELETE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash)
err = ndb.db.Delete(ndb.nodeKey(hash))
if err != nil {
panic(err)
}
ndb.uncacheNode(hash)
ndb.state.increaseDeletedCount()
} else {
ndb.log(IavlDebug, "MOVE", "predecessor", predecessor, "fromVersion", fromVersion, "toVersion", toVersion, "hash", hash)
err = ndb.saveOrphanToDB(hash, fromVersion, predecessor)
if err != nil {
panic(err)
}
}
})
}

func (ndb *nodeDB) nodeKey(hash []byte) []byte {
return nodeKeyFormat.KeyBytes(hash)
}
Expand Down Expand Up @@ -710,11 +754,18 @@ func (ndb *nodeDB) getPreviousVersion(version int64) int64 {
}

// deleteRoot deletes the root entry from disk, but not the node it points to.
func (ndb *nodeDB) deleteRoot(batch dbm.Batch, version int64, checkLatestVersion bool) {
func (ndb *nodeDB) deleteRoot(batch dbm.Batch, version int64, checkLatestVersion bool, writeToDB bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rename writeToDB a better name. batch is writeToDB too ;P

if checkLatestVersion && version == ndb.getLatestVersion() {
panic("Tried to delete latest version")
}
batch.Delete(ndb.rootKey(version))
if !writeToDB {
batch.Delete(ndb.rootKey(version))
} else {
err := ndb.db.Delete(ndb.rootKey(version))
if err != nil {
panic(err)
}
}
}

func (ndb *nodeDB) traverseOrphans(fn func(k, v []byte)) {
Expand Down
63 changes: 60 additions & 3 deletions libs/iavl/nodedb_oec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"container/list"
"encoding/binary"
"fmt"
"strconv"

cmap "github.com/orcaman/concurrent-map"

Expand Down Expand Up @@ -269,13 +270,69 @@ func (ndb *nodeDB) DeleteVersion(batch dbm.Batch, version int64, checkLatestVers
return err
}

ndb.deleteVersion(batch, version, checkLatestVersion)
ndb.deleteVersion(batch, version, checkLatestVersion, false)
return nil
}

func (ndb *nodeDB) deleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool) {
func (ndb *nodeDB) deleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool, writeToDB bool) {
if !writeToDB {
ndb.deleteOrphans(batch, version)
ndb.deleteRoot(batch, version, checkLatestVersion, writeToDB)
} else {
cwbhhjl marked this conversation as resolved.
Show resolved Hide resolved
ndb.setPruningRoot(version, checkLatestVersion)
ndb.deleteRoot(batch, version, checkLatestVersion, writeToDB)
ndb.deleteOrphansFromDB(version)
ndb.deletePruningRoot()
}
}

func (ndb *nodeDB) cleanPruningInDB() {
version, exist := ndb.getPruningRoot()
if !exist {
return
}
ndb.log(IavlErr, "start cleanPruningInDB", "name", ndb.name, "version", version)
ndb.deleteRoot(nil, version, false, true)
batch := ndb.db.NewBatch()
defer batch.Close()
ndb.deleteOrphans(batch, version)
ndb.deleteRoot(batch, version, checkLatestVersion)
if err := batch.Write(); err != nil {
panic(err)
}
ndb.deletePruningRoot()
ndb.log(IavlErr, "cleanPruningInDB is done", "name", ndb.name, "version", version)
}

func (ndb *nodeDB) setPruningRoot(version int64, checkLatestVersion bool) {
if checkLatestVersion && version == ndb.getLatestVersion() {
panic("Tried to delete latest version")
}
err := ndb.db.Set(metadataKeyFormat.Key([]byte(pruningVersionKey)), []byte(strconv.FormatInt(version, 10)))
if err != nil {
panic(err)
}
}

func (ndb *nodeDB) deletePruningRoot() {
err := ndb.db.Delete(metadataKeyFormat.Key([]byte(pruningVersionKey)))
if err != nil {
panic(err)
}
}

func (ndb *nodeDB) getPruningRoot() (int64, bool) {
bz, err := ndb.db.Get(metadataKeyFormat.Key([]byte(pruningVersionKey)))
if err != nil {
panic(err)
}
if len(bz) == 0 {
return 0, false
}
v, err := strconv.ParseInt(string(bz), 10, 64)
if err != nil {
panic(err)
}
return v, true
}

func (ndb *nodeDB) checkoutVersionReaders(version int64) error {
Expand Down