Skip to content
This repository was archived by the owner on Jun 6, 2025. It is now read-only.

Commit 324ccbd

Browse files
committed
IntermediateRoot: add flag for threshold to update concurrently
Divide the root updating of stateObjects into goroutines if number of stateObjects is at least the threshold statedb_test.go/TestIntermediateUpdateConcurrently: add test to check if the states after processed with both options are identical
1 parent 9e55df7 commit 324ccbd

File tree

2 files changed

+116
-0
lines changed

2 files changed

+116
-0
lines changed

core/state/statedb.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ import (
2121
"errors"
2222
"fmt"
2323
"math/big"
24+
"runtime"
2425
"sort"
26+
"sync"
2527
"time"
2628

2729
"github.com/ethereum/go-ethereum/common"
@@ -122,6 +124,9 @@ type StateDB struct {
122124
StorageUpdated int
123125
AccountDeleted int
124126
StorageDeleted int
127+
128+
// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
129+
ConcurrentUpdateThreshold int
125130
}
126131

127132
// New creates a new state from a given trie.
@@ -855,11 +860,46 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
855860
// the account prefetcher. Instead, let's process all the storage updates
856861
// first, giving the account prefeches just a few more milliseconds of time
857862
// to pull useful data from disk.
863+
864+
// Get the stateObjects needed to be updated
865+
updateObjs := []*stateObject{}
858866
for addr := range s.stateObjectsPending {
859867
if obj := s.stateObjects[addr]; !obj.deleted {
868+
updateObjs = append(updateObjs, obj)
869+
}
870+
}
871+
872+
if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 {
873+
// Update the state objects sequentially
874+
for _, obj := range updateObjs {
860875
obj.updateRoot(s.db)
861876
}
877+
} else {
878+
// Declare min function
879+
min := func(a, b int) int {
880+
if a < b {
881+
return a
882+
}
883+
return b
884+
}
885+
// Update the state objects using goroutines, with maximum of NumCPU goroutines
886+
nRoutines := min(runtime.NumCPU(), len(updateObjs))
887+
if nRoutines != 0 {
888+
nObjPerRoutine := (len(updateObjs) + nRoutines - 1) / nRoutines
889+
wg := sync.WaitGroup{}
890+
wg.Add(nRoutines)
891+
for i := 0; i < len(updateObjs); i += nObjPerRoutine {
892+
go func(objs []*stateObject) {
893+
defer wg.Done()
894+
for _, obj := range objs {
895+
obj.updateRoot(s.db)
896+
}
897+
}(updateObjs[i:min(i+nObjPerRoutine, len(updateObjs))])
898+
}
899+
wg.Wait()
900+
}
862901
}
902+
863903
// Now we're about to start to write changes to the trie. The trie is so far
864904
// _untouched_. We can check with the prefetcher, if it can give us a trie
865905
// which has the same root, but also has some content loaded into it.

core/state/statedb_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"sync"
2929
"testing"
3030
"testing/quick"
31+
"time"
3132

3233
"github.com/ethereum/go-ethereum/common"
3334
"github.com/ethereum/go-ethereum/core/rawdb"
@@ -915,3 +916,78 @@ func TestStateDBAccessList(t *testing.T) {
915916
t.Fatalf("expected empty, got %d", got)
916917
}
917918
}
919+
920+
func TestIntermediateUpdateConcurrently(t *testing.T) {
921+
rng := rand.New(rand.NewSource(time.Now().Unix()))
922+
// Create an empty state
923+
db1 := rawdb.NewMemoryDatabase()
924+
db2 := rawdb.NewMemoryDatabase()
925+
state1, _ := New(common.Hash{}, NewDatabase(db1), nil)
926+
state2, _ := New(common.Hash{}, NewDatabase(db2), nil)
927+
928+
// Update it with random data
929+
for i := int64(0); i < 1000; i++ {
930+
addr := common.BigToAddress(big.NewInt(i))
931+
balance := big.NewInt(int64(rng.Int63()))
932+
nonce := rng.Uint64()
933+
key := common.BigToHash(big.NewInt(int64(rng.Int63())))
934+
value := common.BigToHash(big.NewInt(int64(rng.Int63())))
935+
code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}
936+
state1.SetBalance(addr, balance)
937+
state2.SetBalance(addr, balance)
938+
state1.SetNonce(addr, nonce)
939+
state2.SetNonce(addr, nonce)
940+
state1.SetState(addr, key, value)
941+
state2.SetState(addr, key, value)
942+
state1.SetCode(addr, code)
943+
state2.SetCode(addr, code)
944+
}
945+
946+
state1.ConcurrentUpdateThreshold = 0
947+
state2.ConcurrentUpdateThreshold = 1
948+
949+
state1.IntermediateRoot(false) // sequential
950+
state2.IntermediateRoot(false) // concurrent
951+
952+
root1, err1 := state1.Commit(false)
953+
root2, err2 := state2.Commit(false)
954+
955+
if err1 != nil {
956+
t.Fatalf("sequential commit failed: %v", err1)
957+
}
958+
if err1 = state1.Database().TrieDB().Commit(root1, false, nil); err1 != nil {
959+
t.Errorf("cannot commit trie %v to persistent database", root1.Hex())
960+
}
961+
if err2 != nil {
962+
t.Fatalf("concurrent commit failed: %v", err2)
963+
}
964+
if err2 = state2.Database().TrieDB().Commit(root2, false, nil); err2 != nil {
965+
t.Errorf("cannot commit trie %v to persistent database", root2.Hex())
966+
}
967+
968+
it1 := db1.NewIterator(nil, nil)
969+
it2 := db2.NewIterator(nil, nil)
970+
for it1.Next() {
971+
if !it2.Next() {
972+
t.Fatalf("concurrent iterator ended prematurely")
973+
}
974+
if !bytes.Equal(it1.Key(), it2.Key()) {
975+
t.Fatalf("concurrent iterator key mismatch: " + string(it1.Key()) + " != " + string(it2.Key()))
976+
}
977+
if !bytes.Equal(it1.Value(), it2.Value()) {
978+
t.Fatalf("concurrent iterator value mismatch: " + string(it1.Value()) + " != " + string(it2.Value()))
979+
}
980+
}
981+
if it1.Error() != nil {
982+
t.Fatalf("sequential iterator error: %v", it1.Error())
983+
}
984+
if it2.Error() != nil {
985+
t.Fatalf("concurrent iterator error: %v", it2.Error())
986+
}
987+
if it1.Next() {
988+
t.Fatalf("sequential iterator has extra data")
989+
}
990+
if it2.Next() {
991+
t.Fatalf("concurrent iterator has extra data")
992+
}
993+
}

0 commit comments

Comments
 (0)