[AV-1919] addTrieInProgress before segments to the work chan (#860)
* addTrieInProgress before segments to the work chan

* fix

* more explicit fix

* rm unneeded file

* add DrainAcceptorQueue to test

* add CopyBytes for db invariant

* nit

* use channel as semaphore

* Add comments

* Remove bad comment
darioush authored Jul 19, 2022
1 parent 50e7dfd commit 812a9ba
Showing 5 changed files with 58 additions and 47 deletions.
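The change reads most clearly as an ordering fix: a trie must be registered as in progress (addTrieInProgress) before its segments are pushed onto the shared work channel, otherwise a fast worker could finish a segment and report completion for a trie the syncer is not yet tracking, which the new error path in removeTrieInProgress now surfaces. A minimal sketch of that ordering under simplified assumptions follows; tracker and work are illustrative names, not coreth types.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical stand-in for stateSync's triesInProgress map:
// a unit of work is registered here before it is handed to the workers, so a
// worker that finishes quickly can always find the entry it reports against.
type tracker struct {
	lock       sync.Mutex
	inProgress map[string]bool
}

func (t *tracker) add(id string) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.inProgress[id] = true
}

func (t *tracker) remove(id string) error {
	t.lock.Lock()
	defer t.lock.Unlock()
	if !t.inProgress[id] {
		return fmt.Errorf("remove for unexpected id: %s", id)
	}
	delete(t.inProgress, id)
	return nil
}

func main() {
	t := &tracker{inProgress: make(map[string]bool)}
	work := make(chan string, 4)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for id := range work {
			if err := t.remove(id); err != nil { // worker reports completion
				fmt.Println("error:", err)
				return
			}
		}
	}()

	// Register the work *before* putting it on the channel; reversing these
	// two lines is the race the commit title is about.
	t.add("trie-1")
	work <- "trie-1"

	close(work)
	wg.Wait()
}
```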
1 change: 1 addition & 0 deletions plugin/evm/syncervm_test.go
@@ -178,6 +178,7 @@ func TestStateSyncToggleEnabledToDisabled(t *testing.T) {
if err := syncDisabledVM.blockChain.Snapshots().Verify(lastRoot); err != nil {
t.Fatal(err)
}
syncDisabledVM.blockChain.DrainAcceptorQueue()

// Create a new VM from the same database with state sync enabled.
syncReEnabledVM := &VM{}
83 changes: 39 additions & 44 deletions sync/statesync/state_syncer.go
@@ -5,6 +5,7 @@ package statesync

import (
"context"
"fmt"
"sync"

"github.com/ava-labs/coreth/core/state/snapshot"
@@ -53,10 +54,10 @@ type stateSync struct {
triesInProgress map[common.Hash]*trieToSync

// track completion and progress of work
mainTrieDone chan struct{}
trieCompleted chan struct{}
done chan error
stats *trieSyncStats
mainTrieDone chan struct{}
triesInProgressSem chan struct{}
done chan error
stats *trieSyncStats
}

func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) {
@@ -70,13 +71,16 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) {
stats: newTrieSyncStats(),
triesInProgress: make(map[common.Hash]*trieToSync),

// [triesInProgressSem] is used to keep the number of tries syncing
// less than or equal to [defaultNumThreads].
triesInProgressSem: make(chan struct{}, defaultNumThreads),

// Each [trieToSync] will have a maximum of [numSegments] segments.
// We set the capacity of [segments] such that [defaultNumThreads]
// storage tries can sync concurrently.
segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments),
mainTrieDone: make(chan struct{}),
trieCompleted: make(chan struct{}, 1),
done: make(chan error, 1),
segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments),
mainTrieDone: make(chan struct{}),
done: make(chan error, 1),
}
ss.syncer = syncclient.NewCallbackLeafSyncer(config.Client, ss.segments)
ss.codeSyncer = newCodeSyncer(CodeSyncerConfig{
@@ -98,14 +102,19 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) {
return nil, err
}
ss.addTrieInProgress(ss.root, ss.mainTrie)
ss.mainTrie.startSyncing() // start syncing after tracking the trie as in progress
return ss, nil
}

// onStorageTrieFinished is called after a storage trie finishes syncing.
func (t *stateSync) onStorageTrieFinished(root common.Hash) error {
<-t.triesInProgressSem // allow another trie to start (release the semaphore)
// mark the storage trie as done in trieQueue
if err := t.trieQueue.StorageTrieDone(root); err != nil {
return err
}
// track the completion of this storage trie
t.removeTrieInProgress(root)
return t.trieQueue.StorageTrieDone(root)
return t.removeTrieInProgress(root)
}

// onMainTrieFinished is called after the main trie finishes syncing.
@@ -119,10 +128,9 @@ func (t *stateSync) onMainTrieFinished() error {
}
t.stats.setTriesRemaining(numStorageTries)

// mark the main trie done and check if the sync operation is complete
// mark the main trie done
close(t.mainTrieDone)
t.removeTrieInProgress(t.root)
return nil
return t.removeTrieInProgress(t.root)
}

// onSyncComplete is called after the account trie and
@@ -147,22 +155,7 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
return ctx.Err()
}

// Each storage trie may split up to [numStorageTrieSegments],
// so we keep the number of storage tries in progress limited
// to ensure the segments channel has capacity when segments
// are created.
maxStorageTriesInProgess := cap(t.segments) / numStorageTrieSegments
for {

for t.countTriesInProgress() >= maxStorageTriesInProgess {
// wait for a trie to complete to avoid spinning
select {
case <-t.trieCompleted:
case <-ctx.Done():
return ctx.Err()
}
}

// check ctx here to exit the loop early
if err := ctx.Err(); err != nil {
return err
@@ -172,15 +165,26 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
if err != nil {
return err
}
// it is possible there are no storage tries.
// If there are no storage tries, then root will be the empty hash on the first pass.
if root != (common.Hash{}) {
// acquire semaphore (to keep number of tries in progress limited)
select {
case t.triesInProgressSem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}

// Arbitrarily use the first account for making requests to the server.
// Note: getNextTrie guarantees that if a non-nil storage root is returned, then the
// slice of account hashes is non-empty.
syncAccount := accounts[0]
// create a trieToSync for the storage trie and mark it as in progress.
syncAccount := accounts[0] // arbitrarily use the first account for making requests to the server
storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts))
if err != nil {
return err
}
t.addTrieInProgress(root, storageTrie)
storageTrie.startSyncing() // start syncing after tracking the trie as in progress
}
// if there are no more storage tries, close
// the task queue and exit the producer.
@@ -232,25 +236,16 @@ func (t *stateSync) addTrieInProgress(root common.Hash, trie *trieToSync) {
// tries in progress and notifies the storage root producer
// so it can continue in case it was paused due to the
// maximum number of tries in progress being previously reached.
func (t *stateSync) removeTrieInProgress(root common.Hash) {
func (t *stateSync) removeTrieInProgress(root common.Hash) error {
t.lock.Lock()
defer t.lock.Unlock()

t.stats.trieDone(root)
delete(t.triesInProgress, root)

select {
case t.trieCompleted <- struct{}{}:
default:
if _, ok := t.triesInProgress[root]; !ok {
return fmt.Errorf("removeTrieInProgress for unexpected root: %s", root)
}
}

// countTriesInProgress returns the number of tries in progress.
func (t *stateSync) countTriesInProgress() int {
t.lock.RLock()
defer t.lock.RUnlock()

return len(t.triesInProgress)
delete(t.triesInProgress, root)
return nil
}

// onSyncFailure is called if the sync fails, this writes all
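The wait loop built around countTriesInProgress and the trieCompleted channel is replaced by triesInProgressSem, a buffered channel used as a counting semaphore: the producer sends into it before starting a storage trie (blocking once defaultNumThreads tries are in flight, with a ctx.Done case for cancellation) and onStorageTrieFinished receives from it to release the slot. Below is a minimal sketch of the pattern in isolation, with maxInFlight standing in for defaultNumThreads and a sleep standing in for the actual sync work; the real code also handles cancellation, which this sketch omits.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxInFlight = 4 // plays the role of defaultNumThreads

	// Buffered channel used as a counting semaphore: a send acquires a slot
	// and blocks once maxInFlight slots are taken; a receive releases one.
	sem := make(chan struct{}, maxInFlight)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		sem <- struct{}{} // acquire before starting, as storageTrieProducer now does
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release on completion, as onStorageTrieFinished does

			time.Sleep(10 * time.Millisecond) // stand-in for syncing one storage trie
			fmt.Println("finished trie", i)
		}(i)
	}
	wg.Wait()
}
```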
9 changes: 9 additions & 0 deletions sync/statesync/trie_queue.go
@@ -59,6 +59,9 @@ func (t *trieQueue) StorageTrieDone(root common.Hash) error {

// getNextTrie returns the next storage trie to sync, along with a slice
// of accounts that point to the returned storage trie.
// Returns true if there are more storage tries to sync and false otherwise.
// Note: if a non-nil root is returned, getNextTrie guarantees that there will be at least
// one account hash in the returned slice.
func (t *trieQueue) getNextTrie() (common.Hash, []common.Hash, bool, error) {
it := rawdb.NewSyncStorageTriesIterator(t.db, t.nextStorageRoot)
defer it.Release()
@@ -69,16 +72,22 @@ func (t *trieQueue) getNextTrie() (common.Hash, []common.Hash, bool, error) {
more bool
)

// Iterate over the keys to find the next storage trie root and all of the account hashes that contain the same storage root.
for it.Next() {
// Unpack the state root and account hash from the current key
nextRoot, nextAccount := rawdb.UnpackSyncStorageTrieKey(it.Key())
// Set the root for the first pass
if root == (common.Hash{}) {
root = nextRoot
}
// If the next root is different than the originally set root, then we've iterated over all of the account hashes that
// have the same storage trie root. Set more to be true, since there is at least one more storage trie.
if root != nextRoot {
t.nextStorageRoot = nextRoot[:]
more = true
break
}
// If we found another account with the same root, add the accountHash.
accounts = append(accounts, nextAccount)
}

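The new comments describe getNextTrie's iteration: keys unpack into (storage root, account hash) pairs, all accounts sharing a root are consumed together, and more is set as soon as a different root is seen. A small stand-alone paraphrase of that loop over plain Go values, assuming only that entries arrive grouped by root; pair and nextTrie are illustrative names, not the trieQueue API.

```go
package main

import "fmt"

// pair stands in for an unpacked sync storage trie key; entries are assumed
// to be ordered by root, so accounts sharing a root are adjacent.
type pair struct {
	root, account string
}

// nextTrie mirrors the shape of the documented iteration: return the first
// root seen, every account that shares it, and whether more roots remain.
func nextTrie(entries []pair) (root string, accounts []string, more bool) {
	for _, e := range entries {
		if root == "" {
			root = e.root // set the root on the first pass
		}
		if e.root != root {
			more = true // at least one more storage trie follows
			break
		}
		accounts = append(accounts, e.account)
	}
	return root, accounts, more
}

func main() {
	entries := []pair{{"rootA", "acct1"}, {"rootA", "acct2"}, {"rootB", "acct3"}}
	root, accounts, more := nextTrie(entries)
	fmt.Println(root, accounts, more) // rootA [acct1 acct2] true
}
```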
10 changes: 8 additions & 2 deletions sync/statesync/trie_segments.go
@@ -121,19 +121,25 @@ func (t *trieToSync) loadSegments() error {
// don't go past the end of the segment
break
}
lastKey = it.Key()
lastKey = common.CopyBytes(it.Key())
segment.leafs++
}
if lastKey != nil {
utils.IncrOne(lastKey)
segment.pos = lastKey // syncing will start from this key
}
log.Debug("statesync: loading segment", "segment", segment)
t.sync.segments <- segment // this will queue the segment for syncing
}
return it.Error()
}

// startSyncing adds the trieToSync's segments to the work queue
func (t *trieToSync) startSyncing() {
for _, segment := range t.segments {
t.sync.segments <- segment // this will queue the segment for syncing
}
}

// addSegment appends a newly created segment specified by [start] and
// [end] to [t.segments] and returns it.
// note: addSegment does not take a lock and therefore is called only
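The functional change in this file is copying the iterator key before retaining it as lastKey. ethdb-style iterators typically only guarantee the Key slice until the next call to Next, so holding a reference across iterations can see it silently overwritten; that is the "db invariant" the commit message refers to. A contrived sketch of the hazard, with fakeIter and copyBytes standing in for the real iterator and common.CopyBytes:

```go
package main

import "fmt"

// fakeIter mimics an iterator that reuses its key buffer between calls to
// Next, which is the invariant the CopyBytes change respects.
type fakeIter struct {
	keys [][]byte
	buf  []byte
	i    int
}

func (it *fakeIter) Next() bool {
	if it.i >= len(it.keys) {
		return false
	}
	it.buf = append(it.buf[:0], it.keys[it.i]...) // overwrite the shared buffer
	it.i++
	return true
}

func (it *fakeIter) Key() []byte { return it.buf }

// copyBytes is a stand-in for common.CopyBytes.
func copyBytes(b []byte) []byte {
	c := make([]byte, len(b))
	copy(c, b)
	return c
}

func main() {
	it := &fakeIter{keys: [][]byte{[]byte("aa"), []byte("zz")}}

	it.Next()
	aliased := it.Key()           // aliases the iterator's reused buffer
	copied := copyBytes(it.Key()) // safe to retain across calls to Next

	it.Next() // the shared buffer is overwritten with the next key

	fmt.Println(string(aliased), string(copied)) // "zz aa": the alias changed silently
}
```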
2 changes: 1 addition & 1 deletion sync/statesync/trie_sync_tasks.go
@@ -70,7 +70,7 @@ func (m *mainTrieTask) OnLeafs(db ethdb.KeyValueWriter, keys, vals [][]byte) err
}

// persist the account data
writeAccountSnapshot(db, common.BytesToHash(key), acc)
writeAccountSnapshot(db, accountHash, acc)

// check if this account has storage root that we need to fetch
if acc.Root != (common.Hash{}) && acc.Root != types.EmptyRootHash {
