Skip to content

Commit

Permalink
eth/fetcher: don't skip block/header when parent is not found
Browse files Browse the repository at this point in the history
Currently, we simply skip importing block/header when parent block/header is not
found. However, since multiple blocks can be imported in parallel, the not
found parent might be due to the fact that the parent import does not finish
yet. This leads to a suitation that the correct block in canonical chain is
skipped and the node gets stuck until the peer timeout. We observe this behavior
when there are reorgs and block import is time consuming.

This commit fixes it by creating a new queue for those missing parent blocks and
re-import them after the parent is imported.
  • Loading branch information
minh-bq committed Nov 21, 2024
1 parent 584db0f commit 070729b
Showing 1 changed file with 105 additions and 37 deletions.
142 changes: 105 additions & 37 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package fetcher
import (
"errors"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -32,10 +33,11 @@ import (
)

const (
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping
)

const (
Expand Down Expand Up @@ -183,6 +185,10 @@ type BlockFetcher struct {
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use
missingParent map[common.Hash]common.Hash // Mapping from parent hash to block hash of missing parent blocks
importMissingParent chan *blockOrHeaderInject

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
Expand All @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
) *BlockFetcher {

return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
missingParent: make(map[common.Hash]common.Hash),
importMissingParent: make(chan *blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}

Expand Down Expand Up @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
func (f *BlockFetcher) loop() {
// Iterate the block fetching until a quit is requested
var (
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval)
)
<-fetchTimer.C // clear out the channel
<-completeTimer.C
defer fetchTimer.Stop()
defer completeTimer.Stop()
defer cleanMissingParentTicker.Stop()

for {
// Clean up any expired block fetches
Expand Down Expand Up @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() {
}
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()
continue
}
if f.light {
Expand Down Expand Up @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() {
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
f.forgetHash(hash)
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()

case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
Expand Down Expand Up @@ -684,6 +698,24 @@ func (f *BlockFetcher) loop() {
f.enqueue(announce.origin, nil, block, sidecars)
}
}

case op := <-f.importMissingParent:
if f.light {
f.importHeaders(op.origin, op.header)
} else {
f.importBlocks(op.origin, op.block, op.sidecars)
}
case <-cleanMissingParentTicker.C:
height := f.chainHeight()
f.missingParentLock.Lock()
for _, hash := range f.missingParent {
if op := f.queued[hash]; op != nil {
if op.number()+maxUncleDist < height {
f.forgetBlock(hash)
}
}
}
f.missingParentLock.Unlock()
}
}
}
Expand Down Expand Up @@ -780,13 +812,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
// If the parent's unknown, queue for later processing when parent block is imported
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
f.missingParentLock.Lock()
f.missingParent[header.ParentHash] = hash
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
Expand All @@ -798,6 +833,18 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
f.missingParentLock.Lock()
nextBlockHash, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
op := f.queued[nextBlockHash]
if op != nil {
f.importMissingParent <- op
} else {
// Something must be very wrong here
log.Warn("Missing parent block is not in f.queued", "hash", nextBlockHash, "parent", hash)
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
Expand All @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()

// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
// If the parent's unknown, queue for later processing when parent block is imported
parentHash := block.ParentHash()
parent := f.getBlock(parentHash)
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash)
f.missingParentLock.Lock()
f.missingParent[parentHash] = hash
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Quickly validate the header and propagate the block if it passes
err := f.verifyHeader(block.Header())
if err == nil {
Expand Down Expand Up @@ -853,6 +903,18 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, nil, false)

f.missingParentLock.Lock()
nextBlockHash, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
op := f.queued[nextBlockHash]
if op != nil {
f.importMissingParent <- op
} else {
// Something must be very wrong here
log.Warn("Missing parent block is not in f.queued", "hash", nextBlockHash, "parent", hash)
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(nil, block)
Expand Down Expand Up @@ -906,12 +968,18 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
// The caller must hold the missingParentLock.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
if f.queues[insert.origin] == 0 {
delete(f.queues, insert.origin)
}
delete(f.queued, hash)
if f.light {
delete(f.missingParent, insert.header.ParentHash)
} else {
delete(f.missingParent, insert.block.ParentHash())
}
}
}

0 comments on commit 070729b

Please sign in to comment.