diff --git a/cmd/ronin/main.go b/cmd/ronin/main.go
index 811b631b4d..baaa04f9a4 100644
--- a/cmd/ronin/main.go
+++ b/cmd/ronin/main.go
@@ -178,6 +178,7 @@ var (
 		utils.DisableRoninProtocol,
 		utils.AdditionalChainEventFlag,
 		utils.DBEngineFlag,
+		utils.ConcurrentUpdateThresholdFlag,
 	}
 
 	rpcFlags = []cli.Flag{
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index a7c3b246fe..5e614e51b3 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1073,6 +1073,13 @@ var (
 		Usage:    "List of mock bls public keys which are reflect 1:1 with mock.validators",
 		Category: flags.MockCategory,
 	}
+
+	ConcurrentUpdateThresholdFlag = &cli.IntFlag{
+		Name:     "concurrent-update-threshold",
+		Usage:    "The threshold of concurrent update",
+		Value:    0, // disable concurrent update by default
+		Category: flags.EthCategory,
+	}
 )
 
 // MakeDataDir retrieves the currently requested data directory, terminating
@@ -2029,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
 	if ctx.Bool(MonitorFinalityVoteFlag.Name) {
 		cfg.EnableMonitorFinalityVote = true
 	}
+	// Set concurrent update threshold
+	if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) {
+		cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name)
+	}
 }
 
 // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
@@ -2250,13 +2261,14 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai
 		Fatalf("--%s must be either 'full' or 'archive'", GCModeFlag.Name)
 	}
 	cache := &core.CacheConfig{
-		TrieCleanLimit:      ethconfig.Defaults.TrieCleanCache,
-		TrieCleanNoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
-		TrieDirtyLimit:      ethconfig.Defaults.TrieDirtyCache,
-		TrieDirtyDisabled:   ctx.String(GCModeFlag.Name) == "archive",
-		TrieTimeLimit:       ethconfig.Defaults.TrieTimeout,
-		SnapshotLimit:       ethconfig.Defaults.SnapshotCache,
-		Preimages:           ctx.Bool(CachePreimagesFlag.Name),
+		TrieCleanLimit:            ethconfig.Defaults.TrieCleanCache,
+		TrieCleanNoPrefetch:       ctx.Bool(CacheNoPrefetchFlag.Name),
+		TrieDirtyLimit:            ethconfig.Defaults.TrieDirtyCache,
+		TrieDirtyDisabled:         ctx.String(GCModeFlag.Name) == "archive",
+		TrieTimeLimit:             ethconfig.Defaults.TrieTimeout,
+		SnapshotLimit:             ethconfig.Defaults.SnapshotCache,
+		Preimages:                 ctx.Bool(CachePreimagesFlag.Name),
+		ConcurrentUpdateThreshold: ctx.Int(ConcurrentUpdateThresholdFlag.Name),
 	}
 	if cache.TrieDirtyDisabled && !cache.Preimages {
 		cache.Preimages = true
diff --git a/core/blockchain.go b/core/blockchain.go
index 9d123fc9e8..17d066cfe7 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -139,6 +139,9 @@ type CacheConfig struct {
 	TriesInMemory       int           // The number of tries is kept in memory before pruning
 
 	SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
+
+	// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
+	ConcurrentUpdateThreshold int
 }
 
 // defaultCacheConfig are the default caching values if none are specified by the
@@ -1814,6 +1817,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
 			parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
 		}
 		statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
+		if err == nil {
+			// Only configure the state once construction is known to have
+			// succeeded; dereferencing statedb on the error path would panic
+			// if state.New hands back a nil *StateDB together with the error.
+			statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold
+		}
 		if err != nil {
 			return it.index, err
 		}
diff --git a/core/state/state_object.go b/core/state/state_object.go
index 138fcbdecd..6ba299a7fe 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -319,7 +319,7 @@ func (s *stateObject) finalise(prefetch bool) {
 
 // updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made -func (s *stateObject) updateTrie(db Database) Trie { +func (s *stateObject) updateTrie(db Database, skipTrieUpdate bool) Trie { // Make sure all dirty slots are finalized into the pending storage area s.finalise(false) // Don't prefetch anymore, pull directly if need be if len(s.pendingStorage) == 0 { @@ -332,25 +332,32 @@ func (s *stateObject) updateTrie(db Database) Trie { // The snapshot storage map for the object var storage map[common.Hash][]byte // Insert all the pending updates into the trie - tr := s.getTrie(db) + var tr Trie + if !skipTrieUpdate { + tr = s.getTrie(db) + } hasher := s.db.hasher - usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { continue } - s.originStorage[key] = value - + if !skipTrieUpdate { + s.originStorage[key] = value + } var v []byte if (value == common.Hash{}) { - s.setError(tr.TryDelete(key[:])) + if !skipTrieUpdate { + s.setError(tr.TryDelete(key[:])) + } s.db.StorageDeleted += 1 } else { // Encoding []byte cannot fail, ok to ignore the error. 
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) - s.setError(tr.TryUpdate(key[:], v)) + if !skipTrieUpdate { + s.setError(tr.TryUpdate(key[:], v)) + } s.db.StorageUpdated += 1 } // If state snapshotting is active, cache the data til commit @@ -369,16 +376,44 @@ func (s *stateObject) updateTrie(db Database) Trie { if s.db.prefetcher != nil { s.db.prefetcher.used(s.data.Root, usedStorage) } + if !skipTrieUpdate { + if len(s.pendingStorage) > 0 { + s.pendingStorage = make(Storage) + } + } + return tr +} + +func (s *stateObject) updateTrieConcurrent(db Database) { + tr := s.getTrie(db) + for key, value := range s.pendingStorage { + // Skip noop changes, persist actual changes + if value == s.originStorage[key] { + continue + } + s.originStorage[key] = value + + var v []byte + + if (value == common.Hash{}) { + s.setError(tr.TryDelete(key[:])) + } else { + v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) + s.setError(tr.TryUpdate(key[:], v)) + } + } if len(s.pendingStorage) > 0 { s.pendingStorage = make(Storage) } - return tr + if tr != nil { + s.data.Root = s.trie.Hash() + } } // UpdateRoot sets the trie root to the current root hash of func (s *stateObject) updateRoot(db Database) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if s.updateTrie(db, false) == nil { return } // Track the amount of time wasted on hashing the storage trie @@ -392,7 +427,7 @@ func (s *stateObject) updateRoot(db Database) { // This updates the trie root. 
func (s *stateObject) CommitTrie(db Database) (int, error) { // If nothing changed, don't bother with hashing anything - if s.updateTrie(db) == nil { + if s.updateTrie(db, false) == nil { return 0, nil } if s.dbErr != nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 014e8a0891..854b97396c 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,7 +21,9 @@ import ( "errors" "fmt" "math/big" + "runtime" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -122,6 +124,9 @@ type StateDB struct { StorageUpdated int AccountDeleted int StorageDeleted int + + // Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable + ConcurrentUpdateThreshold int } // New creates a new state from a given trie. @@ -358,7 +363,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie { return nil } cpy := stateObject.deepCopy(s) - cpy.updateTrie(s.db) + cpy.updateTrie(s.db, false) return cpy.getTrie(s.db) } @@ -855,11 +860,58 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // the account prefetcher. Instead, let's process all the storage updates // first, giving the account prefeches just a few more milliseconds of time // to pull useful data from disk. 
+ + // Get the stateObjects needed to be updated + updateObjs := []*stateObject{} for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { + updateObjs = append(updateObjs, obj) + } + } + + if len(updateObjs) < s.ConcurrentUpdateThreshold || s.ConcurrentUpdateThreshold == 0 { + // Update the state trie sequentially + for _, obj := range updateObjs { obj.updateRoot(s.db) } + } else { + // Update the state trie concurrently + for _, obj := range updateObjs { + obj.updateTrie(s.db, true) + } + + nGoroutines := runtime.NumCPU() + if nGoroutines > len(updateObjs) { + nGoroutines = len(updateObjs) + } + + if nGoroutines != 0 { + nObjectsPerRoutine := len(updateObjs) / nGoroutines + nObjectsRemaining := len(updateObjs) % nGoroutines + wg := sync.WaitGroup{} + wg.Add(nGoroutines) + i := 0 + for { + nObjects := nObjectsPerRoutine + if nObjectsRemaining > 0 { + nObjects++ + nObjectsRemaining-- + } + go func(objs []*stateObject) { + defer wg.Done() + for _, obj := range objs { + obj.updateTrieConcurrent(s.db) + } + }(updateObjs[i : i+nObjects]) + i += nObjects + if i == len(updateObjs) { + break + } + } + wg.Wait() + } } + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. 
diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go
index e9576d4dc4..d8adc91775 100644
--- a/core/state/statedb_test.go
+++ b/core/state/statedb_test.go
@@ -28,6 +28,7 @@ import (
 	"sync"
 	"testing"
 	"testing/quick"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
@@ -915,3 +916,48 @@ func TestStateDBAccessList(t *testing.T) {
 		t.Fatalf("expected empty, got %d", got)
 	}
 }
+
+// TestIntermediateUpdateConcurrently checks that the concurrent storage
+// update path of IntermediateRoot produces the same root as the sequential
+// path when both states receive identical changes.
+func TestIntermediateUpdateConcurrently(t *testing.T) {
+	// Log the seed so that a failing run can be reproduced.
+	seed := time.Now().UnixNano()
+	t.Logf("random seed: %d", seed)
+	rng := rand.New(rand.NewSource(seed))
+
+	// Create two empty states backed by independent databases
+	state1, err := New(common.Hash{}, NewDatabase(rawdb.NewMemoryDatabase()), nil)
+	if err != nil {
+		t.Fatalf("failed to create sequential state: %v", err)
+	}
+	state2, err := New(common.Hash{}, NewDatabase(rawdb.NewMemoryDatabase()), nil)
+	if err != nil {
+		t.Fatalf("failed to create concurrent state: %v", err)
+	}
+
+	// Apply the same random updates to both states
+	for i := int64(0); i < 1000; i++ {
+		addr := common.BigToAddress(big.NewInt(i))
+		balance := big.NewInt(rng.Int63())
+		nonce := rng.Uint64()
+		key := common.BigToHash(big.NewInt(rng.Int63()))
+		value := common.BigToHash(big.NewInt(rng.Int63()))
+		code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}
+		for _, sdb := range []*StateDB{state1, state2} {
+			sdb.SetBalance(addr, balance)
+			sdb.SetNonce(addr, nonce)
+			sdb.SetState(addr, key, value)
+			sdb.SetCode(addr, code)
+		}
+	}
+
+	state1.ConcurrentUpdateThreshold = 0 // sequential
+	state2.ConcurrentUpdateThreshold = 1 // concurrent
+
+	root1 := state1.IntermediateRoot(false)
+	root2 := state2.IntermediateRoot(false)
+	if root1 != root2 {
+		t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex())
+	}
+}
diff --git a/eth/backend.go b/eth/backend.go
index 4f8c10ec45..060a889ec6 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 			EnablePreimageRecording: config.EnablePreimageRecording,
 		}
 		cacheConfig = &core.CacheConfig{
-			TrieCleanLimit:      config.TrieCleanCache,
-			TrieCleanJournal:    stack.ResolvePath(config.TrieCleanCacheJournal),
-			TrieCleanRejournal:  config.TrieCleanCacheRejournal,
-			TrieCleanNoPrefetch: config.NoPrefetch,
-			TrieDirtyLimit:      config.TrieDirtyCache,
-			TrieDirtyDisabled:   config.NoPruning,
-			TrieTimeLimit:       config.TrieTimeout,
-			SnapshotLimit:       config.SnapshotCache,
-			Preimages:           config.Preimages,
-			TriesInMemory:       config.TriesInMemory,
+			TrieCleanLimit:            config.TrieCleanCache,
+			TrieCleanJournal:          stack.ResolvePath(config.TrieCleanCacheJournal),
+			TrieCleanRejournal:        config.TrieCleanCacheRejournal,
+			TrieCleanNoPrefetch:       config.NoPrefetch,
+			TrieDirtyLimit:            config.TrieDirtyCache,
+			TrieDirtyDisabled:         config.NoPruning,
+			TrieTimeLimit:             config.TrieTimeout,
+			SnapshotLimit:             config.SnapshotCache,
+			Preimages:                 config.Preimages,
+			TriesInMemory:             config.TriesInMemory,
+			ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold,
 		}
 	)
 	eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 5681bea965..d3c5d42d7c 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -222,6 +222,9 @@ type Config struct {
 
 	// Send additional chain event
 	EnableAdditionalChainEvent bool
+
+	// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
+	ConcurrentUpdateThreshold int
 }
 
 // CreateConsensusEngine creates a consensus engine for the given chain configuration.