Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IntermediateRoot: add flag for threshold to update concurrently #453

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ronin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ var (
utils.DisableRoninProtocol,
utils.AdditionalChainEventFlag,
utils.DBEngineFlag,
utils.ConcurrentUpdateThresholdFlag,
}

rpcFlags = []cli.Flag{
Expand Down
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,13 @@ var (
Usage: "List of mock bls public keys which are reflect 1:1 with mock.validators",
Category: flags.MockCategory,
}

// ConcurrentUpdateThresholdFlag is the integer CLI flag
// "concurrent-update-threshold". When it is set on the command line,
// SetEthConfig copies its value into cfg.ConcurrentUpdateThreshold, which is
// described there as the minimum number of stateObjects (updating accounts)
// required to apply concurrent updates. The default of 0 leaves concurrent
// updates disabled.
ConcurrentUpdateThresholdFlag = &cli.IntFlag{
Name: "concurrent-update-threshold",
Usage: "The threshold of concurrent update",
Value: 0, // disable concurrent update by default
Category: flags.EthCategory,
}
)

// MakeDataDir retrieves the currently requested data directory, terminating
Expand Down Expand Up @@ -2029,6 +2036,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.Bool(MonitorFinalityVoteFlag.Name) {
cfg.EnableMonitorFinalityVote = true
}
// Set concurrent update threshold
if ctx.IsSet(ConcurrentUpdateThresholdFlag.Name) {
cfg.ConcurrentUpdateThreshold = ctx.Int(ConcurrentUpdateThresholdFlag.Name)
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down
4 changes: 4 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type CacheConfig struct {
TriesInMemory int // The number of tries is kept in memory before pruning

SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// defaultCacheConfig are the default caching values if none are specified by the
Expand Down Expand Up @@ -1814,6 +1817,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
}
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
statedb.ConcurrentUpdateThreshold = bc.cacheConfig.ConcurrentUpdateThreshold
if err != nil {
return it.index, err
}
Expand Down
61 changes: 49 additions & 12 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *stateObject) finalise(prefetch bool) {

// updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
func (s *stateObject) updateTrie(db Database, concurrent bool) Trie {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think renaming this variable to skipTrieUpdate would be more reasonable.

// Make sure all dirty slots are finalized into the pending storage area
s.finalise(false) // Don't prefetch anymore, pull directly if need be
if len(s.pendingStorage) == 0 {
Expand All @@ -332,26 +332,33 @@ func (s *stateObject) updateTrie(db Database) Trie {
// The snapshot storage map for the object
var storage map[common.Hash][]byte
// Insert all the pending updates into the trie
tr := s.getTrie(db)
var tr Trie = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there is no need to initialize this to nil here; the zero value is already nil.

if !concurrent {
tr = s.getTrie(db)
}
hasher := s.db.hasher

usedStorage := make([][]byte, 0, len(s.pendingStorage))
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
continue
}
s.originStorage[key] = value

if !concurrent {
s.originStorage[key] = value
}
var v []byte
if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1
if !concurrent {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1
}
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
if !concurrent {
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
}
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
Expand All @@ -369,16 +376,46 @@ func (s *stateObject) updateTrie(db Database) Trie {
if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
if !concurrent {
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
}
return tr
}

func (s *stateObject) updateTrieConcurrent(db Database) {
tr := s.getTrie(db)
for key, value := range s.pendingStorage {
// Skip noop changes, persist actual changes
if value == s.originStorage[key] {
continue
}
s.originStorage[key] = value

var v []byte

if (value == common.Hash{}) {
s.setError(tr.TryDelete(key[:]))
s.db.StorageDeleted += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a shared resource, so it is not safe to update concurrently. Moreover, it is already updated in updateTrie.

} else {
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment above.

}
}
if len(s.pendingStorage) > 0 {
s.pendingStorage = make(Storage)
}
return tr
if tr != nil {
s.data.Root = s.trie.Hash()
}
}

// updateRoot sets the trie root to the current root hash of the storage trie.
func (s *stateObject) updateRoot(db Database) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if s.updateTrie(db, false) == nil {
return
}
// Track the amount of time wasted on hashing the storage trie
Expand All @@ -392,7 +429,7 @@ func (s *stateObject) updateRoot(db Database) {
// This updates the trie root.
func (s *stateObject) CommitTrie(db Database) (int, error) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
if s.updateTrie(db, false) == nil {
return 0, nil
}
if s.dbErr != nil {
Expand Down
53 changes: 51 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"sort"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -122,6 +124,12 @@ type StateDB struct {
StorageUpdated int
AccountDeleted int
StorageDeleted int

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int

// Lock for concurrent access
lock sync.RWMutex
}

// New creates a new state from a given trie.
Expand Down Expand Up @@ -358,7 +366,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie {
return nil
}
cpy := stateObject.deepCopy(s)
cpy.updateTrie(s.db)
cpy.updateTrie(s.db, false)
return cpy.getTrie(s.db)
}

Expand Down Expand Up @@ -855,11 +863,52 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.

// ---------------------------------------------- DEBUGGING -------------------------------------------------------------------------------------------
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clean this up


// Get the stateObjects needed to be updated
updateObjs := []*stateObject{}
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
obj.updateRoot(s.db)
updateObjs = append(updateObjs, obj)
obj.updateTrie(s.db, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the ConcurrentUpdateThreshold config to determine whether we need to update the trie concurrently.

}
}
min := func(a, b int) int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new way of calculating the number of objects per goroutine, this function is only called once, so I think it would be better to remove it and use a plain if statement at that call site instead.

if a < b {
return a
}
return b
}
nGoroutines := min(runtime.NumCPU(), len(updateObjs))
if nGoroutines != 0 {
nObjectsPerRoutine := len(updateObjs) / nGoroutines
nObjectsRemaining := len(updateObjs) % nGoroutines
wg := sync.WaitGroup{}
wg.Add(nGoroutines)
i := 0
for {
nObjects := nObjectsPerRoutine
if nObjectsRemaining > 0 {
nObjects++
nObjectsRemaining--
}
go func(objs []*stateObject) {
defer wg.Done()
for _, obj := range objs {
obj.updateTrieConcurrent(s.db)
}
}(updateObjs[i : i+nObjects])
i += nObjects
if i == len(updateObjs) {
break
}
}
wg.Wait()
}

// ---------------------------------------------- DEBUGGING -------------------------------------------------------------------------------------------

// Now we're about to start to write changes to the trie. The trie is so far
// _untouched_. We can check with the prefetcher, if it can give us a trie
// which has the same root, but also has some content loaded into it.
Expand Down
39 changes: 39 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync"
"testing"
"testing/quick"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -915,3 +916,41 @@ func TestStateDBAccessList(t *testing.T) {
t.Fatalf("expected empty, got %d", got)
}
}

// TestIntermediateUpdateConcurrently checks that IntermediateRoot yields the
// same root hash for two states holding identical data when one of them keeps
// ConcurrentUpdateThreshold at 0 (sequential path) and the other sets it to 1
// (concurrent path).
//
// Both states are backed by separate in-memory databases and are filled with
// the same random accounts, so a root mismatch can only come from the update
// strategy.
func TestIntermediateUpdateConcurrently(t *testing.T) {
	// Log the seed so that a failing run can be reproduced.
	seed := time.Now().Unix()
	t.Logf("random seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	// Create two empty states. A failed construction must abort the test here
	// rather than surface later as a nil-pointer panic.
	db1 := rawdb.NewMemoryDatabase()
	db2 := rawdb.NewMemoryDatabase()
	state1, err := New(common.Hash{}, NewDatabase(db1), nil)
	if err != nil {
		t.Fatalf("creating sequential state: %v", err)
	}
	state2, err := New(common.Hash{}, NewDatabase(db2), nil)
	if err != nil {
		t.Fatalf("creating concurrent state: %v", err)
	}

	// Apply exactly the same random updates to both states.
	for i := int64(0); i < 1000; i++ {
		addr := common.BigToAddress(big.NewInt(i))
		balance := big.NewInt(rng.Int63())
		nonce := rng.Uint64()
		key := common.BigToHash(big.NewInt(rng.Int63()))
		value := common.BigToHash(big.NewInt(rng.Int63()))
		code := []byte{byte(rng.Uint64()), byte(rng.Uint64()), byte(rng.Uint64())}

		state1.SetBalance(addr, balance)
		state2.SetBalance(addr, balance)
		state1.SetNonce(addr, nonce)
		state2.SetNonce(addr, nonce)
		state1.SetState(addr, key, value)
		state2.SetState(addr, key, value)
		state1.SetCode(addr, code)
		state2.SetCode(addr, code)
	}

	state1.ConcurrentUpdateThreshold = 0
	state2.ConcurrentUpdateThreshold = 1

	root1 := state1.IntermediateRoot(false) // sequential
	root2 := state2.IntermediateRoot(false) // concurrent

	if root1 != root2 {
		t.Fatalf("intermediate roots mismatch: %v != %v", root1.Hex(), root2.Hex())
	}
}
21 changes: 11 additions & 10 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnablePreimageRecording: config.EnablePreimageRecording,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
TrieCleanLimit: config.TrieCleanCache,
TrieCleanJournal: stack.ResolvePath(config.TrieCleanCacheJournal),
TrieCleanRejournal: config.TrieCleanCacheRejournal,
TrieCleanNoPrefetch: config.NoPrefetch,
TrieDirtyLimit: config.TrieDirtyCache,
TrieDirtyDisabled: config.NoPruning,
TrieTimeLimit: config.TrieTimeout,
SnapshotLimit: config.SnapshotCache,
Preimages: config.Preimages,
TriesInMemory: config.TriesInMemory,
ConcurrentUpdateThreshold: config.ConcurrentUpdateThreshold,
}
)
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type Config struct {

// Send additional chain event
EnableAdditionalChainEvent bool

// Minimum stateObjects (updating accounts) to apply concurrent updates, 0 to disable
ConcurrentUpdateThreshold int
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
Loading