Skip to content

Commit

Permalink
intermediateRoot: update concurrency handling
Browse files Browse the repository at this point in the history
statedb/intermediateRoot: update the concurrent path: collect the storage changes that have not been applied yet, then apply them concurrently
state_object/updateTrie: add a `concurrent` option: when set, record the pending storage updates and return them to the caller instead of applying them to the trie
  • Loading branch information
Francesco4203 committed May 22, 2024
1 parent 51fef19 commit 08fd7c3
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 63 deletions.
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2036,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.Bool(MonitorFinalityVoteFlag.Name) {
cfg.EnableMonitorFinalityVote = true
}
// Set concurrent update threshold
if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) {
cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name)
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down
56 changes: 44 additions & 12 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,17 @@ func (s *stateObject) finalise(prefetch bool) {

// updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
func (s *stateObject) updateTrie(db Database, concurrent bool) (Trie, []struct {
s *stateObject
key common.Hash
value common.Hash
v []byte
tr Trie
}) {
// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false) // Don't prefetch anymore, pull directly if need be
if len(s.pendingStorage) == 0 {
return s.trie
return s.trie, nil
}
// Track the amount of time wasted on updating the storage trie
if metrics.EnabledExpensive {
Expand All @@ -334,6 +340,14 @@ func (s *stateObject) updateTrie(db Database) Trie {
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher
// Storage saved for later update
var savedStorage []struct {
s *stateObject
key common.Hash
value common.Hash
v []byte
tr Trie
}

usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
Expand All @@ -344,15 +358,33 @@ func (s *stateObject) updateTrie(db Database) Trie {
s.originStorage[key] = value

var v []byte
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1

if concurrent {
if (value == common.Hash{}) {
s.db.StorageDeleted += 1
} else {
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.db.StorageUpdated += 1
}
savedStorage = append(savedStorage, struct {
s *stateObject
key common.Hash
value common.Hash
v []byte
tr Trie
}{s, key, value, v, tr})
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
}
}

// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
if storage == nil {
Expand All @@ -372,13 +404,13 @@ func (s *stateObject) updateTrie(db Database) Trie {
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
return tr
return tr, savedStorage
}

// updateRoot sets the account's storage root to the current root hash of its storage trie.
func (s *stateObject) updateRoot(db Database) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if trie, _ := s.updateTrie(db, false); trie == nil {
return
}
// Track the amount of time wasted on hashing the storage trie
Expand All @@ -392,7 +424,7 @@ func (s *stateObject) updateRoot(db Database) {
// This updates the trie root.
func (s *stateObject) CommitTrie(db Database) (int, error) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if trie, _ := s.updateTrie(db, false); trie == nil {
return 0, nil
}
if s.dbErr != nil {
Expand Down
91 changes: 81 additions & 10 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie {
return nil
}
cpy := stateObject.deepCopy(s)
cpy.updateTrie(s.db)
cpy.updateTrie(s.db, false)
return cpy.getTrie(s.db)
}

Expand Down Expand Up @@ -875,30 +875,101 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
obj.updateRoot(s.db)
}
} else {
// Declare min function
var pendingStorage []struct {
s *stateObject
key common.Hash
value common.Hash
v []byte
tr Trie
}
var pendingRootUpdate []*stateObject

for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
trie, savedStorage := obj.updateTrie(s.db, true)
pendingStorage = append(pendingStorage, savedStorage...)
if trie != nil {
pendingRootUpdate = append(pendingRootUpdate, obj)
}
}
}

// Update the account trie
min := func(a, b int) int {
if a < b {
return a
}
return b
}
// Update the state objects using goroutines, with maximum of NumCPU goroutines
nRoutines := min(runtime.NumCPU(), len(updateObjs))
nRoutines := min(runtime.NumCPU(), len(pendingStorage))
if nRoutines != 0 {
nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines
nUpdatePerRoutine := (len(pendingStorage) + nRoutines - 1) / nRoutines
wg := sync.WaitGroup{}
wg.Add(nRoutines)
for i := 0; i < len(updateObjs); i += nObjPerRoutine {
go func(objs []*stateObject) {
for i := 0; i < len(pendingStorage); i += nUpdatePerRoutine {
go func(stores []struct {
s *stateObject
key common.Hash
value common.Hash
v []byte
tr Trie
}) {
defer wg.Done()
for _, obj := range objs {
obj.updateRoot(s.db)
for _, store := range stores {
if (store.value == common.Hash{}) {
s.setError(store.tr.TryDelete(store.key[:]))
} else {
// Encoding []byte cannot fail, ok to ignore the error.
store.v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(store.value[:]))
s.setError(store.tr.TryUpdate(store.key[:], store.v))
}
}
}(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))])
}(pendingStorage[i:min(i+nUpdatePerRoutine, len(pendingStorage))])
}
wg.Wait()
}
for _, obj := range pendingRootUpdate {
obj.data.Root = obj.trie.Hash()
}
}
// // Get the stateObjects needed to be updated
// updateObjs := []*stateObject{}
// for addr := range s.stateObjectsPending {
// if obj := s.stateObjects[addr]; !obj.deleted {
// updateObjs = append(updateObjs, obj)
// }
// }

// if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 {
// // Update the state objects sequentially
// for _, obj := range updateObjs {
// obj.updateRoot(s.db)
// }
// } else {
// // Declare min function
// min := func(a, b int) int {
// if a < b {
// return a
// }
// return b
// }
// // Update the state objects using goroutines, with maximum of NumCPU goroutines
// nRoutines := min(runtime.NumCPU(), len(updateObjs))
// if nRoutines != 0 {
// nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines
// wg := sync.WaitGroup{}
// wg.Add(nRoutines)
// for i := 0; i < len(updateObjs); i += nObjPerRoutine {
// go func(objs []*stateObject) {
// defer wg.Done()
// for _, obj := range objs {
// obj.updateRoot(s.db)
// }
// }(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))])
// }
// wg.Wait()
// }
// }

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
Expand Down
45 changes: 4 additions & 41 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,48 +946,11 @@ func TestIntermediateUpdateConcurrently(t *testing.T) {
state1.ConcurrentUpdateThreshold = 0
state2.ConcurrentUpdateThreshold = 1

state1.IntermediateRoot(false) // sequential
state2.IntermediateRoot(false) // concurrent
root1 := state1.IntermediateRoot(false) // sequential
root2 := state2.IntermediateRoot(false) // concurrent

root1, err1 := state1.Commit(false)
root2, err2 := state2.Commit(false)

if err1 != nil {
t.Fatalf("sequential commit failed: %v", err1)
}
if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil {
t.Errorf("cannot commit trie %v to persistent database", root1.Hex())
}
if err2 != nil {
t.Fatalf("concurrent commit failed: %v", err2)
}
if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil {
t.Errorf("cannot commit trie %v to persistent database", root2.Hex())
if root1 != root2 {
t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex())
}

it1 := db1.NewIterator(nil, nil)
it2 := db2.NewIterator(nil, nil)
for it1.Next() {
if !it2.Next() {
t.Fatalf("concurrent iterator ended prematurely")
}
if !bytes.Equal(it1.Key(), it2.Key()) {
t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key()))
}
if !bytes.Equal(it1.Value(), it2.Value()) {
t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value()))
}
}
if it1.Error() != nil {
t.Fatalf("sequential iterator error: %v", it1.Error())
}
if it2.Error() != nil {
t.Fatalf("concurrent iterator error: %v", it2.Error())
}
if it1.Next() {
t.Fatalf("sequential iterator has extra data")
}
if it2.Next() {
t.Fatalf("concurrent iterator has extra data")
}
}

0 comments on commit 08fd7c3

Please sign in to comment.