
Commit

Merge pull request #240 from kcalvinalvin/2025-01-16-always-download-headers-for-blocks

netsync, main: always download headers for blocks
kcalvinalvin authored Jan 17, 2025
2 parents 0616196 + dc23fd7 commit fbbb32b
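
The diff below moves tracking of requested utreexo headers from a single SyncManager-wide map into the per-peer sync state, and gives fetchUtreexoHeaders and fetchHeaderBlocks an optional peer argument to send the request to. A minimal, self-contained sketch of the selection logic both helpers now share; Peer and syncManager here are simplified stand-ins, not the real peerpkg/netsync types:

	package main

	import "fmt"

	// Peer is a stand-in for the real peerpkg.Peer type.
	type Peer struct{ addr string }

	// syncManager is a stripped-down stand-in for netsync.SyncManager.
	type syncManager struct {
		syncPeer *Peer
	}

	// pickRequestPeer mirrors the selection in the new fetchUtreexoHeaders and
	// fetchHeaderBlocks: requests go to the sync peer when one is set, and fall
	// back to the caller-supplied peer otherwise.
	func (sm *syncManager) pickRequestPeer(peer *Peer) *Peer {
		reqPeer := sm.syncPeer
		if reqPeer == nil {
			reqPeer = peer
		}
		return reqPeer
	}

	func main() {
		sm := &syncManager{}
		announcer := &Peer{addr: "198.51.100.7:8333"}

		// No sync peer yet: the request is sent to the announcing peer.
		fmt.Println(sm.pickRequestPeer(announcer).addr)

		// Once a sync peer is chosen, requests keep going to it.
		sm.syncPeer = &Peer{addr: "203.0.113.5:8333"}
		fmt.Println(sm.pickRequestPeer(announcer).addr)
	}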
Showing 2 changed files with 127 additions and 70 deletions.
193 changes: 123 additions & 70 deletions netsync/manager.go
@@ -219,12 +219,11 @@ type SyncManager struct {
headersBuildMode bool

// The following fields are used for headers-first mode.
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader
requestedUtreexoHeaders map[chainhash.Hash]struct{}
headersFirstMode bool
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
@@ -366,7 +365,6 @@ func (sm *SyncManager) startSync() {
// during headersFirstMode.
if !sm.headersFirstMode {
sm.requestedBlocks = make(map[chainhash.Hash]struct{})
sm.requestedUtreexoHeaders = make(map[chainhash.Hash]struct{})
}

log.Infof("Syncing to block height %d from peer %v",
@@ -396,11 +394,11 @@ func (sm *SyncManager) startSync() {
// should have all the previous headers as well.
_, have := sm.utreexoHeaders[*node.hash]
if !have {
sm.fetchUtreexoHeaders()
sm.fetchUtreexoHeaders(nil)
return
}
}
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
return
}

@@ -414,11 +412,11 @@ func (sm *SyncManager) startSync() {
// should have all the previous headers as well.
_, have := sm.utreexoHeaders[*node.hash]
if !have {
sm.fetchUtreexoHeaders()
sm.fetchUtreexoHeaders(nil)
return
}
}
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
return
}

@@ -693,12 +691,6 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
}

// Also remove requested utreexo headers from the global map so
// that they will be fetched from elsewhere next time we get an inv.
for blockHash := range state.requestedUtreexoHeaders {
delete(sm.requestedUtreexoHeaders, blockHash)
}
}
}

@@ -1030,7 +1022,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
if bmsg.block.Height() < lastHeight {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
}
return
}
@@ -1050,7 +1042,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
if !isCheckpointBlock {
if sm.startHeader != nil &&
len(state.requestedBlocks) < minInFlightBlocks {
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
}
return
}
@@ -1090,16 +1082,23 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {

// fetchUtreexoHeaders creates and sends a request to the syncPeer for the next
// list of utreexo headers to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchUtreexoHeaders() {
// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
func (sm *SyncManager) fetchUtreexoHeaders(peer *peerpkg.Peer) {
// Nothing to do if there is no start header.
if sm.startHeader == nil {
log.Warnf("fetchUtreexoHeaders called with no start header")
return
}

state, exists := sm.peerStates[sm.syncPeer]
// Default to the syncPeer unless we're given a peer by the caller.
reqPeer := sm.syncPeer
if reqPeer == nil {
reqPeer = peer
}

state, exists := sm.peerStates[reqPeer]
if !exists {
log.Warnf("Don't have peer state for sync peer %s", sm.syncPeer.String())
log.Warnf("Don't have peer state for request peer %s", reqPeer.String())
return
}

@@ -1115,7 +1114,7 @@ func (sm *SyncManager) fetchUtreexoHeaders() {
if !requested && !have {
state.requestedUtreexoHeaders[*node.hash] = struct{}{}
ghmsg := wire.NewMsgGetUtreexoHeader(*node.hash)
sm.syncPeer.QueueMessage(ghmsg, nil)
reqPeer.QueueMessage(ghmsg, nil)
}

if len(state.requestedUtreexoHeaders) > minInFlightBlocks {
@@ -1126,13 +1125,20 @@

// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
// list of blocks to be downloaded based on the current list of headers.
func (sm *SyncManager) fetchHeaderBlocks() {
// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {
// Nothing to do if there is no start header.
if sm.startHeader == nil {
log.Warnf("fetchHeaderBlocks called with no start header")
return
}

// Default to the syncPeer unless we're given a peer by the caller.
reqPeer := sm.syncPeer
if reqPeer == nil {
reqPeer = peer
}

// Build up a getdata request for the list of blocks the headers
// describe. The size hint will be limited to wire.MaxInvPerMsg by
// the function, so no need to double check it here.
@@ -1153,24 +1159,24 @@ func (sm *SyncManager) fetchHeaderBlocks() {
"fetch: %v", err)
}
if !haveInv {
syncPeerState := sm.peerStates[sm.syncPeer]
syncPeerState := sm.peerStates[reqPeer]
syncPeerState.requestedBlocks[*node.hash] = struct{}{}

// If we're fetching from a witness enabled peer
// post-fork, then ensure that we receive all the
// witness data in the blocks.
if sm.syncPeer.IsWitnessEnabled() {
if reqPeer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessBlock

// If we're syncing from a utreexo enabled peer, also
// ask for the proofs.
if sm.syncPeer.IsUtreexoEnabled() {
if reqPeer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeWitnessUtreexoBlock
}
} else {
// If we're syncing from a utreexo enabled peer, also
// ask for the proofs.
if sm.syncPeer.IsUtreexoEnabled() {
if reqPeer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeUtreexoBlock
}
}
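
The inventory type requested for each block depends on what the request peer advertises. A compact sketch of that selection as it appears in fetchHeaderBlocks above, with plain booleans standing in for the real peer capability methods and string constants standing in for the wire.InvType* values:

	package main

	import "fmt"

	// Stand-ins for the wire.InvType* constants used in fetchHeaderBlocks.
	const (
		invTypeBlock               = "block"
		invTypeWitnessBlock        = "witness-block"
		invTypeUtreexoBlock        = "utreexo-block"
		invTypeWitnessUtreexoBlock = "witness-utreexo-block"
	)

	// invTypeFor picks the inventory type the same way the diff does: witness
	// support upgrades the request to a witness block, and utreexo support
	// additionally asks for the accumulator proof.
	func invTypeFor(witnessEnabled, utreexoEnabled bool) string {
		invType := invTypeBlock
		if witnessEnabled {
			invType = invTypeWitnessBlock
			if utreexoEnabled {
				invType = invTypeWitnessUtreexoBlock
			}
		} else if utreexoEnabled {
			invType = invTypeUtreexoBlock
		}
		return invType
	}

	func main() {
		fmt.Println(invTypeFor(true, true))   // witness-utreexo-block
		fmt.Println(invTypeFor(true, false))  // witness-block
		fmt.Println(invTypeFor(false, true))  // utreexo-block
		fmt.Println(invTypeFor(false, false)) // block
	}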
@@ -1184,7 +1190,7 @@
}
}
if len(gdmsg.InvList) > 0 {
sm.syncPeer.QueueMessage(gdmsg, nil)
reqPeer.QueueMessage(gdmsg, nil)
}
}

@@ -1198,21 +1204,79 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
return
}

// The remote peer is misbehaving if we didn't request headers.
msg := hmsg.headers
numHeaders := len(msg.Headers)
if !sm.headersFirstMode && !sm.headersBuildMode {
log.Warnf("Got %d unrequested headers from %s -- "+
"disconnecting", numHeaders, peer.Addr())
peer.Disconnect()
return
}

// Nothing to do for an empty headers message.
if numHeaders == 0 {
return
}

utreexoViewActive := sm.chain.IsUtreexoViewActive()

// If we're not in headers first, it means that these headers are for new
// block announcements.
if !sm.headersFirstMode && !sm.headersBuildMode {
best := sm.chain.BestSnapshot()
sm.headerList.Init()
sm.resetHeaderState(&best.Hash, best.Height)

for _, blockHeader := range msg.Headers {
err := sm.chain.ProcessBlockHeader(blockHeader)
if err != nil {
log.Warnf("Received block header from peer %v "+
"failed header verification -- disconnecting",
peer.Addr())
peer.Disconnect()
return
}

prevNodeEl := sm.headerList.Back()
if prevNodeEl == nil {
log.Warnf("Header list does not contain a previous" +
"element as expected -- disconnecting peer")
peer.Disconnect()
return
}

prevNode := prevNodeEl.Value.(*headerNode)
blockHash := blockHeader.BlockHash()
node := headerNode{hash: &blockHash}
if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
node.height = prevNode.height + 1
e := sm.headerList.PushBack(&node)
if sm.startHeader == nil {
sm.startHeader = e
}
} else {
log.Warnf("Received block header that does not "+
"properly connect to the chain from peer %s "+
"-- disconnecting", peer.Addr())
peer.Disconnect()
return
}
}

// Since the first entry of the list is always the final block
// that is already in the database and is only used to ensure
// the next header links properly, it must be removed before
// fetching the headers or the utreexo headers.
sm.headerList.Remove(sm.headerList.Front())
sm.progressLogger.SetLastLogTime(time.Now())

if utreexoViewActive {
log.Infof("Received %v block headers: Fetching utreexo headers",
sm.headerList.Len())
sm.fetchUtreexoHeaders(hmsg.peer)
} else {
log.Infof("Received %v block headers: Fetching blocks",
sm.headerList.Len())
sm.fetchHeaderBlocks(hmsg.peer)
}

return
}

if sm.headersBuildMode {
var finalHeader *wire.BlockHeader
var finalHeight int32
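
Outside of headers-first and headers-build mode, the new block above treats a headers message as a block announcement: it rebuilds the header list from the best snapshot and only accepts headers whose PrevBlock matches the list's tail. A small sketch of that linking rule, using container/list the same way the manager does; headerNode and the string hashes are simplified stand-ins:

	package main

	import (
		"container/list"
		"errors"
		"fmt"
	)

	// headerNode is a simplified stand-in for the netsync headerNode type.
	type headerNode struct {
		hash   string
		height int32
	}

	// appendHeader accepts a header only if it connects to the back of the list,
	// mirroring the PrevBlock check in handleHeadersMsg.
	func appendHeader(l *list.List, hash, prevHash string) error {
		back := l.Back()
		if back == nil {
			return errors.New("header list has no previous element")
		}
		prev := back.Value.(*headerNode)
		if prev.hash != prevHash {
			return errors.New("header does not connect to the chain")
		}
		l.PushBack(&headerNode{hash: hash, height: prev.height + 1})
		return nil
	}

	func main() {
		l := list.New()
		// Seed with the current best block, as resetHeaderState does.
		l.PushBack(&headerNode{hash: "best", height: 100})

		if err := appendHeader(l, "next", "best"); err == nil {
			fmt.Println("accepted header at height", l.Back().Value.(*headerNode).height)
		}
		if err := appendHeader(l, "orphan", "unknown"); err != nil {
			fmt.Println("rejected:", err) // the peer would be disconnected here
		}
	}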
@@ -1294,8 +1358,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
return
}

utreexoViewActive := sm.chain.IsUtreexoViewActive()

// This means that we've ran out of checkpoints and need to verify the headers that
// we've received.
if sm.nextCheckpoint == nil {
@@ -1359,11 +1421,11 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
if utreexoViewActive {
log.Infof("Received %v block headers: Fetching utreexo headers",
sm.headerList.Len())
sm.fetchUtreexoHeaders()
sm.fetchUtreexoHeaders(nil)
} else {
log.Infof("Received %v block headers: Fetching blocks",
sm.headerList.Len())
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
}
}

@@ -1438,7 +1500,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Infof("Received %v block headers: Fetching utreexo headers",
sm.headerList.Len())
sm.progressLogger.SetLastLogTime(time.Now())
sm.fetchUtreexoHeaders()
sm.fetchUtreexoHeaders(nil)
return
}
// Since the first entry of the list is always the final block
@@ -1449,7 +1511,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Infof("Received %v block headers: Fetching blocks",
sm.headerList.Len())
sm.progressLogger.SetLastLogTime(time.Now())
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
return
}

@@ -1510,20 +1572,20 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
log.Infof("Received utreexo headers to block "+
"%d/hash %s. Fetching blocks",
node.height, node.hash)
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
return
} else if node.height == peer.LastBlock() {
log.Infof("Received utreexo headers to block "+
"%d/hash %s. Fetching blocks",
node.height, node.hash)
sm.fetchHeaderBlocks()
sm.fetchHeaderBlocks(nil)
return
}
}
}

if len(peerState.requestedUtreexoHeaders) < minInFlightBlocks {
sm.fetchUtreexoHeaders()
sm.fetchUtreexoHeaders(nil)
}

return
@@ -1532,19 +1594,12 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
// We're not in headers-first mode. When we receive a utreexo header,
// immediately ask for the block.
sm.utreexoHeaders[msg.BlockHash] = hmsg.header
delete(sm.requestedUtreexoHeaders, msg.BlockHash)
delete(peerState.requestedUtreexoHeaders, msg.BlockHash)
log.Debugf("accepted utreexo header for block %v. have %v headers",
msg.BlockHash, len(sm.utreexoHeaders))

if !sm.headersFirstMode {
sm.requestedBlocks[msg.BlockHash] = struct{}{}
}
peerState.requestedBlocks[msg.BlockHash] = struct{}{}

gdmsg := wire.NewMsgGetData()
iv := wire.NewInvVect(wire.InvTypeWitnessUtreexoBlock, &msg.BlockHash)
gdmsg.AddInvVect(iv)
peer.QueueMessage(gdmsg, nil)
sm.fetchHeaderBlocks(peer)
}

// handleNotFoundMsg handles notfound messages from all peers.
@@ -1876,7 +1931,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
amUtreexoNode := sm.chain.IsUtreexoViewActive()
if amUtreexoNode {
sm.requestedUtreexoHeaders[iv.Hash] = struct{}{}
ghmsg := wire.NewMsgGetUtreexoHeader(iv.Hash)
peer.QueueMessage(ghmsg, nil)
continue
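
Together with the handleUtreexoHeaderMsg change above, this hunk gives the flow the commit title describes: a utreexo node answers a block inv by requesting the utreexo header first, and only asks for the block once that header has arrived. A schematic sketch of the sequence; the message strings and handlers are illustrative stand-ins, and the real code routes the block request through fetchHeaderBlocks:

	package main

	import "fmt"

	// node is a toy model of the announcement flow after this commit.
	type node struct {
		utreexoHeaders  map[string]bool // utreexo headers we hold, keyed by block hash
		requestedBlocks map[string]bool // blocks we have already asked for
	}

	// onInv mirrors handleInvMsg for a utreexo node: an unrequested block
	// announcement triggers a utreexo header request, never a direct getdata.
	func (n *node) onInv(blockHash string) string {
		if n.requestedBlocks[blockHash] {
			return "" // already being fetched
		}
		return "getutreexoheader " + blockHash
	}

	// onUtreexoHeader mirrors handleUtreexoHeaderMsg outside headers-first mode:
	// store the header, mark the block as requested, then fetch the block.
	func (n *node) onUtreexoHeader(blockHash string) string {
		n.utreexoHeaders[blockHash] = true
		n.requestedBlocks[blockHash] = true
		return "getdata witness-utreexo-block " + blockHash
	}

	func main() {
		n := &node{
			utreexoHeaders:  make(map[string]bool),
			requestedBlocks: make(map[string]bool),
		}
		hash := "000000...abcd"
		fmt.Println(n.onInv(hash))           // the header is requested first
		fmt.Println(n.onUtreexoHeader(hash)) // then the block itself
		fmt.Println(n.onInv(hash))           // a repeated inv is ignored (prints an empty line)
	}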
@@ -2362,21 +2416,20 @@ func (sm *SyncManager) Pause() chan<- struct{} {
// block, tx, and inv updates.
func New(config *Config) (*SyncManager, error) {
sm := SyncManager{
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedUtreexoHeaders: make(map[chainhash.Hash]struct{}),
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
peerNotifier: config.PeerNotifier,
chain: config.Chain,
txMemPool: config.TxMemPool,
chainParams: config.ChainParams,
rejectedTxns: make(map[chainhash.Hash]struct{}),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
}

best := sm.chain.BestSnapshot()