From 5208d06298d2aae058ef6a862d396b0c40a1ad3d Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Fri, 17 Jan 2025 15:14:05 +0900
Subject: [PATCH 1/5] netsync: allow fetchHeaderBlocks and fetchUtreexoHeaders
 to accept a specified peer

Both functions currently default to the syncPeer for fetching blocks
and utreexo headers, but if we allow our peers to send over headers
instead of invs, we may not have a syncPeer since we're no longer in
IBD.

To allow these functions to be re-used when a peer sends us block
headers, we allow the caller to pass in the desired peer to request
the additional data from.
---
 netsync/manager.go | 60 ++++++++++++++++++++++++++++------------------
 1 file changed, 37 insertions(+), 23 deletions(-)

diff --git a/netsync/manager.go b/netsync/manager.go
index e87962a8..be6f5d98 100644
--- a/netsync/manager.go
+++ b/netsync/manager.go
@@ -396,11 +396,11 @@ func (sm *SyncManager) startSync() {
 			// should have all the previous headers as well.
 			_, have := sm.utreexoHeaders[*node.hash]
 			if !have {
-				sm.fetchUtreexoHeaders()
+				sm.fetchUtreexoHeaders(nil)
 				return
 			}
 		}
-		sm.fetchHeaderBlocks()
+		sm.fetchHeaderBlocks(nil)
 		return
 	}
@@ -414,11 +414,11 @@
 			// should have all the previous headers as well.
 			_, have := sm.utreexoHeaders[*node.hash]
 			if !have {
-				sm.fetchUtreexoHeaders()
+				sm.fetchUtreexoHeaders(nil)
 				return
 			}
 		}
-		sm.fetchHeaderBlocks()
+		sm.fetchHeaderBlocks(nil)
 		return
 	}
@@ -1030,7 +1030,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 	if bmsg.block.Height() < lastHeight {
 		if sm.startHeader != nil &&
 			len(state.requestedBlocks) < minInFlightBlocks {
-			sm.fetchHeaderBlocks()
+			sm.fetchHeaderBlocks(nil)
 		}
 		return
 	}
@@ -1050,7 +1050,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 	if !isCheckpointBlock {
 		if sm.startHeader != nil &&
 			len(state.requestedBlocks) < minInFlightBlocks {
-			sm.fetchHeaderBlocks()
+			sm.fetchHeaderBlocks(nil)
 		}
 		return
 	}
@@ -1090,16 +1090,23 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
 
 // fetchUtreexoHeaders creates and sends a request to the syncPeer for the next
 // list of utreexo headers to be downloaded based on the current list of headers.
-func (sm *SyncManager) fetchUtreexoHeaders() {
+// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
+func (sm *SyncManager) fetchUtreexoHeaders(peer *peerpkg.Peer) {
 	// Nothing to do if there is no start header.
 	if sm.startHeader == nil {
 		log.Warnf("fetchUtreexoHeaders called with no start header")
 		return
 	}
 
-	state, exists := sm.peerStates[sm.syncPeer]
+	// Use the given peer if it's not nil; otherwise default to the syncPeer.
+	reqPeer := peer
+	if reqPeer == nil {
+		reqPeer = sm.syncPeer
+	}
+
+	state, exists := sm.peerStates[reqPeer]
 	if !exists {
-		log.Warnf("Don't have peer state for sync peer %s", sm.syncPeer.String())
+		log.Warnf("Don't have peer state for request peer %s", reqPeer.String())
 		return
 	}
@@ -1115,7 +1122,7 @@ func (sm *SyncManager) fetchUtreexoHeaders() {
 		if !requested && !have {
 			state.requestedUtreexoHeaders[*node.hash] = struct{}{}
 			ghmsg := wire.NewMsgGetUtreexoHeader(*node.hash)
-			sm.syncPeer.QueueMessage(ghmsg, nil)
+			reqPeer.QueueMessage(ghmsg, nil)
 		}
 
 		if len(state.requestedUtreexoHeaders) > minInFlightBlocks {
@@ -1126,13 +1133,20 @@ func (sm *SyncManager) fetchUtreexoHeaders() {
 			break
 		}
 	}
 }
 
 // fetchHeaderBlocks creates and sends a request to the syncPeer for the next
 // list of blocks to be downloaded based on the current list of headers.
-func (sm *SyncManager) fetchHeaderBlocks() {
+// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
+func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {
 	// Nothing to do if there is no start header.
 	if sm.startHeader == nil {
 		log.Warnf("fetchHeaderBlocks called with no start header")
 		return
 	}
 
+	// Use the given peer if it's not nil; otherwise default to the syncPeer.
+	reqPeer := peer
+	if reqPeer == nil {
+		reqPeer = sm.syncPeer
+	}
+
 	// Build up a getdata request for the list of blocks the headers
 	// describe. The size hint will be limited to wire.MaxInvPerMsg by
 	// the function, so no need to double check it here.
@@ -1153,24 +1167,24 @@ func (sm *SyncManager) fetchHeaderBlocks() {
 				"fetch: %v", err)
 		}
 		if !haveInv {
-			syncPeerState := sm.peerStates[sm.syncPeer]
+			syncPeerState := sm.peerStates[reqPeer]
 			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
 
 			// If we're fetching from a witness enabled peer
 			// post-fork, then ensure that we receive all the
 			// witness data in the blocks.
-			if sm.syncPeer.IsWitnessEnabled() {
+			if reqPeer.IsWitnessEnabled() {
 				iv.Type = wire.InvTypeWitnessBlock
 
 				// If we're syncing from a utreexo enabled peer, also
 				// ask for the proofs.
-				if sm.syncPeer.IsUtreexoEnabled() {
+				if reqPeer.IsUtreexoEnabled() {
 					iv.Type = wire.InvTypeWitnessUtreexoBlock
 				}
 			} else {
 				// If we're syncing from a utreexo enabled peer, also
 				// ask for the proofs.
-				if sm.syncPeer.IsUtreexoEnabled() {
+				if reqPeer.IsUtreexoEnabled() {
 					iv.Type = wire.InvTypeUtreexoBlock
 				}
 			}
@@ -1184,7 +1198,7 @@ func (sm *SyncManager) fetchHeaderBlocks() {
 		}
 	}
 	if len(gdmsg.InvList) > 0 {
-		sm.syncPeer.QueueMessage(gdmsg, nil)
+		reqPeer.QueueMessage(gdmsg, nil)
 	}
 }
@@ -1359,11 +1373,11 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		if utreexoViewActive {
 			log.Infof("Received %v block headers: Fetching utreexo headers",
 				sm.headerList.Len())
-			sm.fetchUtreexoHeaders()
+			sm.fetchUtreexoHeaders(nil)
 		} else {
 			log.Infof("Received %v block headers: Fetching blocks",
 				sm.headerList.Len())
-			sm.fetchHeaderBlocks()
+			sm.fetchHeaderBlocks(nil)
 		}
 	}
@@ -1438,7 +1452,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		log.Infof("Received %v block headers: Fetching utreexo headers",
 			sm.headerList.Len())
 		sm.progressLogger.SetLastLogTime(time.Now())
-		sm.fetchUtreexoHeaders()
+		sm.fetchUtreexoHeaders(nil)
 		return
 	}
 	// Since the first entry of the list is always the final block
@@ -1449,7 +1463,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		log.Infof("Received %v block headers: Fetching blocks",
 			sm.headerList.Len())
 		sm.progressLogger.SetLastLogTime(time.Now())
-		sm.fetchHeaderBlocks()
+		sm.fetchHeaderBlocks(nil)
 		return
 	}
@@ -1510,20 +1524,20 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
 				log.Infof("Received utreexo headers to block "+
 					"%d/hash %s. Fetching blocks", node.height, node.hash)
-				sm.fetchHeaderBlocks()
+				sm.fetchHeaderBlocks(nil)
 				return
 			} else if node.height == peer.LastBlock() {
 				log.Infof("Received utreexo headers to block "+
 					"%d/hash %s. Fetching blocks", node.height, node.hash)
-				sm.fetchHeaderBlocks()
+				sm.fetchHeaderBlocks(nil)
 				return
 			}
 		}
 	}
 
 	if len(peerState.requestedUtreexoHeaders) < minInFlightBlocks {
-		sm.fetchUtreexoHeaders()
+		sm.fetchUtreexoHeaders(nil)
 	}
 
 	return

From af859771a5b73ad23d0ecf2238840cba8aa77a75 Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Fri, 17 Jan 2025 15:20:52 +0900
Subject: [PATCH 2/5] netsync: handle headers even when we're not in
 headers-first or headers-build mode

---
 netsync/manager.go | 74 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 65 insertions(+), 9 deletions(-)

diff --git a/netsync/manager.go b/netsync/manager.go
index be6f5d98..4898adb6 100644
--- a/netsync/manager.go
+++ b/netsync/manager.go
@@ -1212,21 +1212,79 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		return
 	}
 
-	// The remote peer is misbehaving if we didn't request headers.
 	msg := hmsg.headers
 	numHeaders := len(msg.Headers)
-	if !sm.headersFirstMode && !sm.headersBuildMode {
-		log.Warnf("Got %d unrequested headers from %s -- "+
-			"disconnecting", numHeaders, peer.Addr())
-		peer.Disconnect()
-		return
-	}
 
 	// Nothing to do for an empty headers message.
 	if numHeaders == 0 {
 		return
 	}
 
+	utreexoViewActive := sm.chain.IsUtreexoViewActive()
+
+	// If we're not in headers-first mode, it means that these headers are
+	// for new block announcements.
+	if !sm.headersFirstMode && !sm.headersBuildMode {
+		best := sm.chain.BestSnapshot()
+		sm.headerList.Init()
+		sm.resetHeaderState(&best.Hash, best.Height)
+
+		for _, blockHeader := range msg.Headers {
+			err := sm.chain.ProcessBlockHeader(blockHeader)
+			if err != nil {
+				log.Warnf("Block header from peer %v "+
+					"failed header verification -- disconnecting",
+					peer.Addr())
+				peer.Disconnect()
+				return
+			}
+
+			prevNodeEl := sm.headerList.Back()
+			if prevNodeEl == nil {
+				log.Warnf("Header list does not contain a previous " +
+					"element as expected -- disconnecting peer")
+				peer.Disconnect()
+				return
+			}
+
+			prevNode := prevNodeEl.Value.(*headerNode)
+			blockHash := blockHeader.BlockHash()
+			node := headerNode{hash: &blockHash}
+			if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
+				node.height = prevNode.height + 1
+				e := sm.headerList.PushBack(&node)
+				if sm.startHeader == nil {
+					sm.startHeader = e
+				}
+			} else {
+				log.Warnf("Received block header that does not "+
+					"properly connect to the chain from peer %s "+
+					"-- disconnecting", peer.Addr())
+				peer.Disconnect()
+				return
+			}
+		}
+
+		// Since the first entry of the list is always the final block
+		// that is already in the database and is only used to ensure
+		// the next header links properly, it must be removed before
+		// fetching the headers or the utreexo headers.
+		sm.headerList.Remove(sm.headerList.Front())
+		sm.progressLogger.SetLastLogTime(time.Now())
+
+		if utreexoViewActive {
+			log.Infof("Received %v block headers: Fetching utreexo headers",
+				sm.headerList.Len())
+			sm.fetchUtreexoHeaders(hmsg.peer)
+		} else {
+			log.Infof("Received %v block headers: Fetching blocks",
+				sm.headerList.Len())
+			sm.fetchHeaderBlocks(hmsg.peer)
+		}
+
+		return
+	}
+
 	if sm.headersBuildMode {
 		var finalHeader *wire.BlockHeader
 		var finalHeight int32
@@ -1308,8 +1366,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
 		return
 	}
 
-	utreexoViewActive := sm.chain.IsUtreexoViewActive()
-
 	// This means that we've run out of checkpoints and need to verify the headers that
 	// we've received.
 	if sm.nextCheckpoint == nil {

From bfd6e0974befde6bb8849a6c967d7cc9d4d779bc Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Fri, 17 Jan 2025 15:23:03 +0900
Subject: [PATCH 3/5] main: send MsgSendHeaders to peer on VerAck

On VerAck, we send MsgSendHeaders to notify the peer that we prefer
headers instead of invs for block announcements.
---
 server.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/server.go b/server.go
index 8d7f20a7..88fbc92f 100644
--- a/server.go
+++ b/server.go
@@ -521,6 +521,10 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej
 // to kick start communication with them.
 func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) {
 	sp.server.AddPeer(sp)
+
+	// Let the peer know that we prefer headers over invs for block announcements.
+	sendHeadersMsg := wire.NewMsgSendHeaders()
+	sp.QueueMessage(sendHeadersMsg, nil)
 }
 
 // OnMemPool is invoked when a peer receives a mempool bitcoin message.

From cd98878f035f34b6475f94a93e555fd310fe50cc Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Fri, 17 Jan 2025 16:32:50 +0900
Subject: [PATCH 4/5] netsync: remove sm.requestedUtreexoHeaders

Since we'll always use the per-peer requestedUtreexoHeaders maps, we
never need a global map. The global map is only useful during inv
requests, but since we'll be syncing off of headers instead, we don't
need it at all.
---
 netsync/manager.go | 50 +++++++++++++++++++---------------------------
 1 file changed, 20 insertions(+), 30 deletions(-)

diff --git a/netsync/manager.go b/netsync/manager.go
index 4898adb6..839c55c6 100644
--- a/netsync/manager.go
+++ b/netsync/manager.go
@@ -219,12 +219,11 @@ type SyncManager struct {
 	headersBuildMode bool
 
 	// The following fields are used for headers-first mode.
-	headersFirstMode        bool
-	headerList              *list.List
-	startHeader             *list.Element
-	nextCheckpoint          *chaincfg.Checkpoint
-	utreexoHeaders          map[chainhash.Hash]*wire.MsgUtreexoHeader
-	requestedUtreexoHeaders map[chainhash.Hash]struct{}
+	headersFirstMode bool
+	headerList       *list.List
+	startHeader      *list.Element
+	nextCheckpoint   *chaincfg.Checkpoint
+	utreexoHeaders   map[chainhash.Hash]*wire.MsgUtreexoHeader
 
 	// An optional fee estimator.
 	feeEstimator *mempool.FeeEstimator
@@ -366,7 +365,6 @@ func (sm *SyncManager) startSync() {
 	// during headersFirstMode.
 	if !sm.headersFirstMode {
 		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
-		sm.requestedUtreexoHeaders = make(map[chainhash.Hash]struct{})
 	}
 
 	log.Infof("Syncing to block height %d from peer %v",
@@ -693,12 +691,6 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
 		for blockHash := range state.requestedBlocks {
 			delete(sm.requestedBlocks, blockHash)
 		}
-
-		// Also remove requested utreexo headers from the global map so
-		// that they will be fetched from elsewhere next time we get an inv.
-		for blockHash := range state.requestedUtreexoHeaders {
-			delete(sm.requestedUtreexoHeaders, blockHash)
-		}
 	}
 }
@@ -1602,7 +1594,7 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
 	// We're not in headers-first mode. When we receive a utreexo header,
 	// immediately ask for the block.
 	sm.utreexoHeaders[msg.BlockHash] = hmsg.header
-	delete(sm.requestedUtreexoHeaders, msg.BlockHash)
+	delete(peerState.requestedUtreexoHeaders, msg.BlockHash)
 
 	log.Debugf("accepted utreexo header for block %v. have %v headers",
 		msg.BlockHash, len(sm.utreexoHeaders))
@@ -1946,7 +1938,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
 			if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
 				amUtreexoNode := sm.chain.IsUtreexoViewActive()
 				if amUtreexoNode {
-					sm.requestedUtreexoHeaders[iv.Hash] = struct{}{}
 					ghmsg := wire.NewMsgGetUtreexoHeader(iv.Hash)
 					peer.QueueMessage(ghmsg, nil)
 					continue
@@ -2432,21 +2423,20 @@ func (sm *SyncManager) Pause() chan<- struct{} {
 // block, tx, and inv updates.
 func New(config *Config) (*SyncManager, error) {
 	sm := SyncManager{
-		peerNotifier:            config.PeerNotifier,
-		chain:                   config.Chain,
-		txMemPool:               config.TxMemPool,
-		chainParams:             config.ChainParams,
-		rejectedTxns:            make(map[chainhash.Hash]struct{}),
-		requestedTxns:           make(map[chainhash.Hash]struct{}),
-		requestedBlocks:         make(map[chainhash.Hash]struct{}),
-		requestedUtreexoHeaders: make(map[chainhash.Hash]struct{}),
-		utreexoHeaders:          make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
-		peerStates:              make(map[*peerpkg.Peer]*peerSyncState),
-		progressLogger:          newBlockProgressLogger("Processed", log),
-		msgChan:                 make(chan interface{}, config.MaxPeers*3),
-		headerList:              list.New(),
-		quit:                    make(chan struct{}),
-		feeEstimator:            config.FeeEstimator,
+		peerNotifier:    config.PeerNotifier,
+		chain:           config.Chain,
+		txMemPool:       config.TxMemPool,
+		chainParams:     config.ChainParams,
+		rejectedTxns:    make(map[chainhash.Hash]struct{}),
+		requestedTxns:   make(map[chainhash.Hash]struct{}),
+		requestedBlocks: make(map[chainhash.Hash]struct{}),
+		utreexoHeaders:  make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
+		peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
+		progressLogger:  newBlockProgressLogger("Processed", log),
+		msgChan:         make(chan interface{}, config.MaxPeers*3),
+		headerList:      list.New(),
+		quit:            make(chan struct{}),
+		feeEstimator:    config.FeeEstimator,
 	}
 
 	best := sm.chain.BestSnapshot()

From dc23fd711584b4a040b94235b2439601e69fcf2c Mon Sep 17 00:00:00 2001
From: Calvin Kim
Date: Fri, 17 Jan 2025 16:34:16 +0900
Subject: [PATCH 5/5] netsync: use fetchHeaderBlocks for newly announced
 blocks as well

We'll only be syncing off of headers, so we can re-use
fetchHeaderBlocks even when we're not in headers-first mode.
---
 netsync/manager.go | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/netsync/manager.go b/netsync/manager.go
index 839c55c6..40ee9079 100644
--- a/netsync/manager.go
+++ b/netsync/manager.go
@@ -1598,15 +1598,8 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
 	log.Debugf("accepted utreexo header for block %v. have %v headers",
 		msg.BlockHash, len(sm.utreexoHeaders))
 
-	if !sm.headersFirstMode {
-		sm.requestedBlocks[msg.BlockHash] = struct{}{}
-	}
 	peerState.requestedBlocks[msg.BlockHash] = struct{}{}
-
-	gdmsg := wire.NewMsgGetData()
-	iv := wire.NewInvVect(wire.InvTypeWitnessUtreexoBlock, &msg.BlockHash)
-	gdmsg.AddInvVect(iv)
-	peer.QueueMessage(gdmsg, nil)
+	sm.fetchHeaderBlocks(peer)
 }
 
 // handleNotFoundMsg handles notfound messages from all peers.
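
As context for patch 1, the following is a minimal, self-contained Go sketch
of the peer-selection fallback that fetchUtreexoHeaders and fetchHeaderBlocks
use: prefer the peer the caller passed in, and only fall back to the sync
peer when none was given. This is not utreexod code; peerID and
pickRequestPeer are hypothetical stand-ins for *peerpkg.Peer and the inline
reqPeer logic in the diff above.

package main

import "fmt"

// peerID is a stand-in for *peerpkg.Peer; the zero value plays the
// role of a nil peer pointer.
type peerID string

// pickRequestPeer mirrors the fallback added in patch 1: use the peer
// the caller passed in, and only fall back to the sync peer when none
// was given.
func pickRequestPeer(requested, syncPeer peerID) peerID {
	if requested != "" {
		return requested
	}
	return syncPeer
}

func main() {
	syncPeer := peerID("syncpeer:8333")

	// Headers-first call sites pass nil (here the zero value), so
	// requests still go to the sync peer.
	fmt.Println(pickRequestPeer("", syncPeer))

	// When a header announcement arrives from some other peer, the
	// follow-up requests go to that announcing peer directly.
	fmt.Println(pickRequestPeer(peerID("announcer:8333"), syncPeer))
}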
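And as context for patch 2, this sketch illustrates the header-linking check
used for new block announcements: a header is accepted only when its
PrevBlock matches the hash at the back of the header list, in which case it
is appended at height+1; otherwise the announcing peer would be
disconnected. Again not utreexod code: headerNode is simplified to string
hashes and linkHeader is a hypothetical helper standing in for the inline
loop in handleHeadersMsg.

package main

import (
	"container/list"
	"fmt"
)

// headerNode is a simplified stand-in for the struct in the diffs
// above; the real code uses *chainhash.Hash.
type headerNode struct {
	height int32
	hash   string
}

// linkHeader accepts a newly announced header only when it builds on
// the last entry of the header list, appending it at height+1.
func linkHeader(headers *list.List, prevBlock, blockHash string) (*headerNode, error) {
	back := headers.Back()
	if back == nil {
		return nil, fmt.Errorf("header list has no previous element")
	}
	prev := back.Value.(*headerNode)
	if prev.hash != prevBlock {
		return nil, fmt.Errorf("header %s does not connect to tip %s",
			blockHash, prev.hash)
	}
	node := &headerNode{height: prev.height + 1, hash: blockHash}
	headers.PushBack(node)
	return node, nil
}

func main() {
	// Seed the list with the current chain tip, matching how the
	// header state is reset to the best snapshot before linking.
	headers := list.New()
	headers.PushBack(&headerNode{height: 100, hash: "aa"})

	if node, err := linkHeader(headers, "aa", "bb"); err == nil {
		fmt.Printf("accepted header %s at height %d\n", node.hash, node.height)
	}

	// A header that does not build on the tip is rejected; in the
	// real code this disconnects the announcing peer.
	if _, err := linkHeader(headers, "zz", "cc"); err != nil {
		fmt.Println("rejected:", err)
	}
}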