triedb/pathdb: introduce lookup structure to optimize node query #30557

Closed
4 changes: 3 additions & 1 deletion core/blockchain.go
@@ -78,6 +78,7 @@ var (

snapshotCommitTimer = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil)
triedbCommitTimer = metrics.NewRegisteredResettingTimer("chain/triedb/commits", nil)
prefetchWaitTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/wait", nil)

blockInsertTimer = metrics.NewRegisteredResettingTimer("chain/inserts", nil)
blockValidationTimer = metrics.NewRegisteredResettingTimer("chain/validation", nil)
@@ -1949,12 +1950,13 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
if statedb.StorageLoaded != 0 {
storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded))
}
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
accountUpdateTimer.Update(statedb.AccountUpdates - statedb.PrefetcherWait) // Account updates are complete(in validation)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
triehash := statedb.AccountHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing
prefetchWaitTimer.Update(statedb.PrefetcherWait) // The time spent waiting for the prefetcher to finish preload tasks
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation

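For context on the metric change above: the new chain/prefetch/wait timer reports how long block processing waits for the trie prefetcher, and that wait is subtracted from the account-update measurement so the two timers cover disjoint intervals. A minimal, hypothetical sketch of that arithmetic (the helper name and figures are illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"time"
)

// splitUpdateTimes is a hypothetical helper mirroring the accounting above:
// the prefetcher wait is carved out of the raw account-update duration so
// that the update timer and the prefetch-wait timer measure disjoint time.
func splitUpdateTimes(accountUpdates, prefetcherWait time.Duration) (pureUpdates, wait time.Duration) {
	return accountUpdates - prefetcherWait, prefetcherWait
}

func main() {
	// Example numbers only: 30ms spent in the account-update phase,
	// of which 12ms was waiting for the prefetcher to finish preloads.
	pure, wait := splitUpdateTimes(30*time.Millisecond, 12*time.Millisecond)
	fmt.Println("account updates:", pure, "prefetch wait:", wait)
}
```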
17 changes: 10 additions & 7 deletions core/state/statedb.go
@@ -139,13 +139,15 @@ type StateDB struct {
witness *stateless.Witness

// Measurements gathered during execution for debugging purposes
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration
AccountReads time.Duration
AccountHashes time.Duration
AccountUpdates time.Duration
AccountCommits time.Duration
StorageReads time.Duration
StorageUpdates time.Duration
StorageCommits time.Duration

PrefetcherWait time.Duration
SnapshotCommits time.Duration
TrieDBCommits time.Duration

@@ -867,6 +869,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
} else {
s.trie = trie
}
s.PrefetcherWait = time.Since(start)
}
// Perform updates before deletions. This prevents resolution of unnecessary trie nodes
// in circumstances similar to the following:
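The statedb side records the wait itself. A rough, self-contained sketch of the measurement pattern, assuming the only thing timed is the hand-off from the prefetcher (the fakePrefetcher type below is a stand-in, not the real prefetcher):

```go
package main

import (
	"fmt"
	"time"
)

// fakePrefetcher stands in for the real trie prefetcher; waitForTrie blocks
// until the preloaded trie is available (simulated here with a sleep).
type fakePrefetcher struct{ delay time.Duration }

func (p *fakePrefetcher) waitForTrie() { time.Sleep(p.delay) }

func main() {
	var prefetcherWait time.Duration

	p := &fakePrefetcher{delay: 5 * time.Millisecond}

	// Mirror of the pattern in IntermediateRoot: note the time before asking
	// the prefetcher for the account trie, then record the elapsed wait once
	// the trie (or a fallback) has been obtained.
	start := time.Now()
	p.waitForTrie()
	prefetcherWait = time.Since(start)

	fmt.Println("prefetcher wait:", prefetcherWait)
}
```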
19 changes: 19 additions & 0 deletions core/state/trie_prefetcher.go
@@ -19,6 +19,7 @@ package state
import (
"errors"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
@@ -55,13 +56,15 @@ type triePrefetcher struct {
accountDupWriteMeter *metrics.Meter
accountDupCrossMeter *metrics.Meter
accountWasteMeter *metrics.Meter
accountLoadTimer *metrics.ResettingTimer

storageLoadReadMeter *metrics.Meter
storageLoadWriteMeter *metrics.Meter
storageDupReadMeter *metrics.Meter
storageDupWriteMeter *metrics.Meter
storageDupCrossMeter *metrics.Meter
storageWasteMeter *metrics.Meter
storageLoadTimer *metrics.ResettingTimer
}

func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads bool) *triePrefetcher {
@@ -78,13 +81,15 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string, noreads

accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/account/load/time", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),

storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageLoadTimer: metrics.GetOrRegisterResettingTimer(prefix+"/storage/load/time", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
@@ -121,6 +126,10 @@ func (p *triePrefetcher) report() {
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr)))

total := len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr)
if total > 0 {
p.accountLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
@@ -134,6 +143,10 @@ func (p *triePrefetcher) report() {
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot)))

total := len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot)
if total > 0 {
p.storageLoadTimer.Update(fetcher.readTime / time.Duration(total))
}
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
@@ -241,6 +254,7 @@ type subfetcher struct {
seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations
seenReadSlot map[common.Hash]struct{} // Tracks the storage already loaded via read operations
seenWriteSlot map[common.Hash]struct{} // Tracks the storage already loaded via write operations
readTime time.Duration // Total time spent on state resolving

dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
@@ -388,6 +402,7 @@ func (sf *subfetcher) loop() {
sf.tasks = nil
sf.lock.Unlock()

start := time.Now()
for _, task := range tasks {
if task.addr != nil {
key := *task.addr
@@ -451,6 +466,10 @@ func (sf *subfetcher) loop() {
}
}
}
// Count the time spent on state resolving. It's not perfectly accurate
// due to some additional operations (e.g., filtering out duplicated
// tasks), but it's good enough for monitoring.
sf.readTime += time.Since(start)

case <-sf.stop:
// Termination is requested, abort if no more tasks are pending. If
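The report() additions above turn the accumulated readTime of each subfetcher into an average per loaded item. A small illustrative sketch of that calculation, with hypothetical names and a guard against a zero count:

```go
package main

import (
	"fmt"
	"time"
)

// averageLoadTime mirrors the report() logic sketched above: given the total
// time a subfetcher spent resolving state and the number of distinct items it
// loaded (reads plus writes), it returns the per-item average, or zero when
// nothing was loaded so we never divide by zero.
func averageLoadTime(readTime time.Duration, loadedRead, loadedWrite int) time.Duration {
	total := loadedRead + loadedWrite
	if total == 0 {
		return 0
	}
	return readTime / time.Duration(total)
}

func main() {
	// Example only: 42ms spent loading 120 slots via reads and 30 via writes.
	fmt.Println(averageLoadTime(42*time.Millisecond, 120, 30))
}
```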
3 changes: 3 additions & 0 deletions triedb/pathdb/database.go
@@ -105,6 +105,9 @@ type layer interface {
// This is meant to be used during shutdown to persist the layer without
// flattening everything down (bad for reorgs).
journal(w io.Writer) error

// isStale returns whether this layer has become stale or if it's still live.
isStale() bool
}

// Config contains the settings for database.
29 changes: 26 additions & 3 deletions triedb/pathdb/difflayer.go
@@ -38,7 +38,8 @@ type diffLayer struct {
states *StateSetWithOrigin // Associated state changes along with origin value

parent layer // Parent layer modified by this one, never nil, **can be changed**
lock sync.RWMutex // Lock used to protect parent
stale bool // Signals that the layer became stale (referenced disk layer became stale)
lock sync.RWMutex // Lock used to protect parent and stale fields
}

// newDiffLayer creates a new diff layer on top of an existing layer.
@@ -77,6 +78,25 @@ func (dl *diffLayer) parentLayer() layer {
return dl.parent
}

// isStale returns whether this layer has become stale or if it's still live.
func (dl *diffLayer) isStale() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.stale
}

// markStale sets the stale flag as true.
func (dl *diffLayer) markStale() {
dl.lock.Lock()
defer dl.lock.Unlock()

if dl.stale {
panic("triedb diff layer is stale")
}
dl.stale = true
}

// node implements the layer interface, retrieving the trie node blob with the
// provided node information. No error will be returned if the node is not found.
func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) {
@@ -85,6 +105,9 @@ func (dl *diffLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
dl.lock.RLock()
defer dl.lock.RUnlock()

if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// If the trie node is known locally, return it
n, ok := dl.nodes.node(owner, path)
if ok {
@@ -156,7 +179,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes *no
}

// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
@@ -183,7 +206,7 @@ func (dl *diffLayer) size() uint64 {

// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
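The stale-flag handling added to diffLayer follows the usual snapshot-style pattern: reads take the read lock and abort once the layer has been marked stale, and marking a layer stale twice is treated as a programming error. A condensed, self-contained sketch of that pattern (not the PR's code; types and values are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errLayerStale = errors.New("layer is stale")

// layer is a stripped-down stand-in for a pathdb diff layer: reads take the
// read lock and bail out once the layer has been marked stale.
type layer struct {
	lock  sync.RWMutex
	stale bool
	nodes map[string][]byte
}

func (l *layer) isStale() bool {
	l.lock.RLock()
	defer l.lock.RUnlock()
	return l.stale
}

// markStale flags the layer; marking it twice indicates a programming error,
// mirroring the panic in the PR.
func (l *layer) markStale() {
	l.lock.Lock()
	defer l.lock.Unlock()
	if l.stale {
		panic("layer already stale")
	}
	l.stale = true
}

func (l *layer) node(path string) ([]byte, error) {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.stale {
		return nil, errLayerStale
	}
	return l.nodes[path], nil
}

func main() {
	l := &layer{nodes: map[string][]byte{"0x01": {0xde, 0xad}}}
	if blob, err := l.node("0x01"); err == nil {
		fmt.Printf("live read: %x\n", blob)
	}
	l.markStale()
	if _, err := l.node("0x01"); err != nil {
		fmt.Println("after markStale:", err) // reads now fail with errLayerStale
	}
}
```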
2 changes: 1 addition & 1 deletion triedb/pathdb/disklayer.go
@@ -72,7 +72,7 @@ func (dl *diskLayer) parentLayer() layer {
return nil
}

// isStale return whether this layer has become stale (was flattened across) or if
// isStale returns whether this layer has become stale (was flattened across) or if
// it's still live.
func (dl *diskLayer) isStale() bool {
dl.lock.RLock()