From 070729b5ef866351ce89e2a7add098494bef9998 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Wed, 20 Nov 2024 10:45:34 +0700 Subject: [PATCH] eth/fetcher: don't skip block/header when parent is not found Currently, we simply skip importing block/header when parent block/header is not found. However, since multiple blocks can be imported in parallel, the not found parent might be due to the fact that the parent import does not finish yet. This leads to a suitation that the correct block in canonical chain is skipped and the node gets stuck until the peer timeout. We observe this behavior when there are reorgs and block import is time consuming. This commit fixes it by creating a new queue for those missing parent blocks and re-import them after the parent is imported. --- eth/fetcher/block_fetcher.go | 142 ++++++++++++++++++++++++++--------- 1 file changed, 105 insertions(+), 37 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index fd547ebce..d0d8094dd 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -20,6 +20,7 @@ package fetcher import ( "errors" "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -32,10 +33,11 @@ import ( ) const ( - lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested - arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested - gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches - fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction + cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping ) const ( @@ -183,6 +185,10 @@ type BlockFetcher struct { queues map[string]int // Per peer block counts to prevent memory exhaustion queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) + missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use + missingParent map[common.Hash]common.Hash // Mapping from parent hash to block hash of missing parent blocks + importMissingParent chan *blockOrHeaderInject + // Callbacks getHeader HeaderRetrievalFn // Retrieves a header from the local chain getBlock blockRetrievalFn // Retrieves a block from the local chain @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr ) *BlockFetcher { return &BlockFetcher{ - light: light, - notify: make(chan *blockAnnounce), - inject: make(chan *blockOrHeaderInject), - headerFilter: make(chan chan *headerFilterTask), - bodyFilter: make(chan chan *bodyFilterTask), - done: make(chan common.Hash), - quit: make(chan struct{}), - announces: make(map[string]int), - announced: make(map[common.Hash][]*blockAnnounce), - fetching: make(map[common.Hash]*blockAnnounce), - fetched: make(map[common.Hash][]*blockAnnounce), - completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New(nil), - queues: make(map[string]int), - queued: make(map[common.Hash]*blockOrHeaderInject), - getHeader: getHeader, - getBlock: getBlock, - verifyHeader: verifyHeader, - verifyBlobHeader: verifyBlobHeader, - broadcastBlock: broadcastBlock, - chainHeight: chainHeight, - insertHeaders: insertHeaders, - insertChain: insertChain, - dropPeer: dropPeer, + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New(nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + missingParent: make(map[common.Hash]common.Hash), + importMissingParent: make(chan *blockOrHeaderInject), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + verifyBlobHeader: verifyBlobHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac func (f *BlockFetcher) loop() { // Iterate the block fetching until a quit is requested var ( - fetchTimer = time.NewTimer(0) - completeTimer = time.NewTimer(0) + fetchTimer = time.NewTimer(0) + completeTimer = time.NewTimer(0) + cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval) ) <-fetchTimer.C // clear out the channel <-completeTimer.C defer fetchTimer.Stop() defer completeTimer.Stop() + defer cleanMissingParentTicker.Stop() for { // Clean up any expired block fetches @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() { } // Otherwise if fresh and still unknown, try and import if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() continue } if f.light { @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() { case hash := <-f.done: // A pending import finished, remove all traces of the notification f.forgetHash(hash) + f.missingParentLock.Lock() f.forgetBlock(hash) + f.missingParentLock.Unlock() case <-fetchTimer.C: // At least one block's timer ran out, check for needing retrieval @@ -684,6 +698,24 @@ func (f *BlockFetcher) loop() { f.enqueue(announce.origin, nil, block, sidecars) } } + + case op := <-f.importMissingParent: + if f.light { + f.importHeaders(op.origin, op.header) + } else { + f.importBlocks(op.origin, op.block, op.sidecars) + } + case <-cleanMissingParentTicker.C: + height := f.chainHeight() + f.missingParentLock.Lock() + for _, hash := range f.missingParent { + if op := f.queued[hash]; op != nil { + if op.number()+maxUncleDist < height { + f.forgetBlock(hash) + } + } + } + f.missingParentLock.Unlock() } } } @@ -780,13 +812,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) go func() { - defer func() { f.done <- hash }() - // If the parent's unknown, abort insertion + // If the parent's unknown, queue for later processing when parent block is imported parent := f.getHeader(header.ParentHash) if parent == nil { log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + f.missingParentLock.Lock() + f.missingParent[header.ParentHash] = hash + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Validate the header and if something went wrong, drop the peer if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) @@ -798,6 +833,18 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) return } + f.missingParentLock.Lock() + nextBlockHash, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + op := f.queued[nextBlockHash] + if op != nil { + f.importMissingParent <- op + } else { + // Something must be very wrong here + log.Warn("Missing parent block is not in f.queued", "hash", nextBlockHash, "parent", hash) + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(header, nil) @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] // Run the import on a new thread log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash) go func() { - defer func() { f.done <- hash }() - - // If the parent's unknown, abort insertion - parent := f.getBlock(block.ParentHash()) + // If the parent's unknown, queue for later processing when parent block is imported + parentHash := block.ParentHash() + parent := f.getBlock(parentHash) if parent == nil { - log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) + log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash) + f.missingParentLock.Lock() + f.missingParent[parentHash] = hash + f.missingParentLock.Unlock() return } + defer func() { f.done <- hash }() // Quickly validate the header and propagate the block if it passes err := f.verifyHeader(block.Header()) if err == nil { @@ -853,6 +903,18 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars [] blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, nil, false) + f.missingParentLock.Lock() + nextBlockHash, ok := f.missingParent[hash] + f.missingParentLock.Unlock() + if ok { + op := f.queued[nextBlockHash] + if op != nil { + f.importMissingParent <- op + } else { + // Something must be very wrong here + log.Warn("Missing parent block is not in f.queued", "hash", nextBlockHash, "parent", hash) + } + } // Invoke the testing hook if needed if f.importedHook != nil { f.importedHook(nil, block) @@ -906,6 +968,7 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. +// The caller must hold the missingParentLock. func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]-- @@ -913,5 +976,10 @@ func (f *BlockFetcher) forgetBlock(hash common.Hash) { delete(f.queues, insert.origin) } delete(f.queued, hash) + if f.light { + delete(f.missingParent, insert.header.ParentHash) + } else { + delete(f.missingParent, insert.block.ParentHash()) + } } }