diff --git a/netsync/manager.go b/netsync/manager.go index e87962a8..40ee9079 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -219,12 +219,11 @@ type SyncManager struct { headersBuildMode bool // The following fields are used for headers-first mode. - headersFirstMode bool - headerList *list.List - startHeader *list.Element - nextCheckpoint *chaincfg.Checkpoint - utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader - requestedUtreexoHeaders map[chainhash.Hash]struct{} + headersFirstMode bool + headerList *list.List + startHeader *list.Element + nextCheckpoint *chaincfg.Checkpoint + utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader // An optional fee estimator. feeEstimator *mempool.FeeEstimator @@ -366,7 +365,6 @@ func (sm *SyncManager) startSync() { // during headersFirstMode. if !sm.headersFirstMode { sm.requestedBlocks = make(map[chainhash.Hash]struct{}) - sm.requestedUtreexoHeaders = make(map[chainhash.Hash]struct{}) } log.Infof("Syncing to block height %d from peer %v", @@ -396,11 +394,11 @@ func (sm *SyncManager) startSync() { // should have all the previous headers as well. _, have := sm.utreexoHeaders[*node.hash] if !have { - sm.fetchUtreexoHeaders() + sm.fetchUtreexoHeaders(nil) return } } - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) return } @@ -414,11 +412,11 @@ func (sm *SyncManager) startSync() { // should have all the previous headers as well. _, have := sm.utreexoHeaders[*node.hash] if !have { - sm.fetchUtreexoHeaders() + sm.fetchUtreexoHeaders(nil) return } } - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) return } @@ -693,12 +691,6 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) { for blockHash := range state.requestedBlocks { delete(sm.requestedBlocks, blockHash) } - - // Also remove requested utreexo headers from the global map so - // that they will be fetched from elsewhere next time we get an inv. - for blockHash := range state.requestedUtreexoHeaders { - delete(sm.requestedUtreexoHeaders, blockHash) - } } } @@ -1030,7 +1022,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { if bmsg.block.Height() < lastHeight { if sm.startHeader != nil && len(state.requestedBlocks) < minInFlightBlocks { - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) } return } @@ -1050,7 +1042,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { if !isCheckpointBlock { if sm.startHeader != nil && len(state.requestedBlocks) < minInFlightBlocks { - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) } return } @@ -1090,16 +1082,23 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { // fetchUtreexoHeaders creates and sends a request to the syncPeer for the next // list of utreexo headers to be downloaded based on the current list of headers. -func (sm *SyncManager) fetchUtreexoHeaders() { +// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer. +func (sm *SyncManager) fetchUtreexoHeaders(peer *peerpkg.Peer) { // Nothing to do if there is no start header. if sm.startHeader == nil { log.Warnf("fetchUtreexoHeaders called with no start header") return } - state, exists := sm.peerStates[sm.syncPeer] + // Default to the syncPeer unless we're given a peer by the caller. + reqPeer := sm.syncPeer + if reqPeer == nil { + reqPeer = peer + } + + state, exists := sm.peerStates[reqPeer] if !exists { - log.Warnf("Don't have peer state for sync peer %s", sm.syncPeer.String()) + log.Warnf("Don't have peer state for request peer %s", reqPeer.String()) return } @@ -1115,7 +1114,7 @@ func (sm *SyncManager) fetchUtreexoHeaders() { if !requested && !have { state.requestedUtreexoHeaders[*node.hash] = struct{}{} ghmsg := wire.NewMsgGetUtreexoHeader(*node.hash) - sm.syncPeer.QueueMessage(ghmsg, nil) + reqPeer.QueueMessage(ghmsg, nil) } if len(state.requestedUtreexoHeaders) > minInFlightBlocks { @@ -1126,13 +1125,20 @@ func (sm *SyncManager) fetchUtreexoHeaders() { // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. -func (sm *SyncManager) fetchHeaderBlocks() { +// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer. +func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) { // Nothing to do if there is no start header. if sm.startHeader == nil { log.Warnf("fetchHeaderBlocks called with no start header") return } + // Default to the syncPeer unless we're given a peer by the caller. + reqPeer := sm.syncPeer + if reqPeer == nil { + reqPeer = peer + } + // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. @@ -1153,24 +1159,24 @@ func (sm *SyncManager) fetchHeaderBlocks() { "fetch: %v", err) } if !haveInv { - syncPeerState := sm.peerStates[sm.syncPeer] + syncPeerState := sm.peerStates[reqPeer] syncPeerState.requestedBlocks[*node.hash] = struct{}{} // If we're fetching from a witness enabled peer // post-fork, then ensure that we receive all the // witness data in the blocks. - if sm.syncPeer.IsWitnessEnabled() { + if reqPeer.IsWitnessEnabled() { iv.Type = wire.InvTypeWitnessBlock // If we're syncing from a utreexo enabled peer, also // ask for the proofs. - if sm.syncPeer.IsUtreexoEnabled() { + if reqPeer.IsUtreexoEnabled() { iv.Type = wire.InvTypeWitnessUtreexoBlock } } else { // If we're syncing from a utreexo enabled peer, also // ask for the proofs. - if sm.syncPeer.IsUtreexoEnabled() { + if reqPeer.IsUtreexoEnabled() { iv.Type = wire.InvTypeUtreexoBlock } } @@ -1184,7 +1190,7 @@ func (sm *SyncManager) fetchHeaderBlocks() { } } if len(gdmsg.InvList) > 0 { - sm.syncPeer.QueueMessage(gdmsg, nil) + reqPeer.QueueMessage(gdmsg, nil) } } @@ -1198,21 +1204,79 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { return } - // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) - if !sm.headersFirstMode && !sm.headersBuildMode { - log.Warnf("Got %d unrequested headers from %s -- "+ - "disconnecting", numHeaders, peer.Addr()) - peer.Disconnect() - return - } // Nothing to do for an empty headers message. if numHeaders == 0 { return } + utreexoViewActive := sm.chain.IsUtreexoViewActive() + + // If we're not in headers first, it means that these headers are for new + // block announcements. + if !sm.headersFirstMode && !sm.headersBuildMode { + best := sm.chain.BestSnapshot() + sm.headerList.Init() + sm.resetHeaderState(&best.Hash, best.Height) + + for _, blockHeader := range msg.Headers { + err := sm.chain.ProcessBlockHeader(blockHeader) + if err != nil { + log.Warnf("Received block header from peer %v "+ + "failed header verification -- disconnecting", + peer.Addr()) + peer.Disconnect() + return + } + + prevNodeEl := sm.headerList.Back() + if prevNodeEl == nil { + log.Warnf("Header list does not contain a previous" + + "element as expected -- disconnecting peer") + peer.Disconnect() + return + } + + prevNode := prevNodeEl.Value.(*headerNode) + blockHash := blockHeader.BlockHash() + node := headerNode{hash: &blockHash} + if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { + node.height = prevNode.height + 1 + e := sm.headerList.PushBack(&node) + if sm.startHeader == nil { + sm.startHeader = e + } + } else { + log.Warnf("Received block header that does not "+ + "properly connect to the chain from peer %s "+ + "-- disconnecting", peer.Addr()) + peer.Disconnect() + return + } + } + + // Since the first entry of the list is always the final block + // that is already in the database and is only used to ensure + // the next header links properly, it must be removed before + // fetching the headers or the utreexo headers. + sm.headerList.Remove(sm.headerList.Front()) + sm.progressLogger.SetLastLogTime(time.Now()) + + if utreexoViewActive { + log.Infof("Received %v block headers: Fetching utreexo headers", + sm.headerList.Len()) + sm.fetchUtreexoHeaders(hmsg.peer) + } else { + log.Infof("Received %v block headers: Fetching blocks", + sm.headerList.Len()) + sm.fetchHeaderBlocks(hmsg.peer) + } + + return + } + if sm.headersBuildMode { var finalHeader *wire.BlockHeader var finalHeight int32 @@ -1294,8 +1358,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { return } - utreexoViewActive := sm.chain.IsUtreexoViewActive() - // This means that we've ran out of checkpoints and need to verify the headers that // we've received. if sm.nextCheckpoint == nil { @@ -1359,11 +1421,11 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { if utreexoViewActive { log.Infof("Received %v block headers: Fetching utreexo headers", sm.headerList.Len()) - sm.fetchUtreexoHeaders() + sm.fetchUtreexoHeaders(nil) } else { log.Infof("Received %v block headers: Fetching blocks", sm.headerList.Len()) - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) } } @@ -1438,7 +1500,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { log.Infof("Received %v block headers: Fetching utreexo headers", sm.headerList.Len()) sm.progressLogger.SetLastLogTime(time.Now()) - sm.fetchUtreexoHeaders() + sm.fetchUtreexoHeaders(nil) return } // Since the first entry of the list is always the final block @@ -1449,7 +1511,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { log.Infof("Received %v block headers: Fetching blocks", sm.headerList.Len()) sm.progressLogger.SetLastLogTime(time.Now()) - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) return } @@ -1510,20 +1572,20 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) { log.Infof("Received utreexo headers to block "+ "%d/hash %s. Fetching blocks", node.height, node.hash) - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) return } else if node.height == peer.LastBlock() { log.Infof("Received utreexo headers to block "+ "%d/hash %s. Fetching blocks", node.height, node.hash) - sm.fetchHeaderBlocks() + sm.fetchHeaderBlocks(nil) return } } } if len(peerState.requestedUtreexoHeaders) < minInFlightBlocks { - sm.fetchUtreexoHeaders() + sm.fetchUtreexoHeaders(nil) } return @@ -1532,19 +1594,12 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) { // We're not in headers-first mode. When we receive a utreexo header, // immediately ask for the block. sm.utreexoHeaders[msg.BlockHash] = hmsg.header - delete(sm.requestedUtreexoHeaders, msg.BlockHash) + delete(peerState.requestedUtreexoHeaders, msg.BlockHash) log.Debugf("accepted utreexo header for block %v. have %v headers", msg.BlockHash, len(sm.utreexoHeaders)) - if !sm.headersFirstMode { - sm.requestedBlocks[msg.BlockHash] = struct{}{} - } peerState.requestedBlocks[msg.BlockHash] = struct{}{} - - gdmsg := wire.NewMsgGetData() - iv := wire.NewInvVect(wire.InvTypeWitnessUtreexoBlock, &msg.BlockHash) - gdmsg.AddInvVect(iv) - peer.QueueMessage(gdmsg, nil) + sm.fetchHeaderBlocks(peer) } // handleNotFoundMsg handles notfound messages from all peers. @@ -1876,7 +1931,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { if _, exists := sm.requestedBlocks[iv.Hash]; !exists { amUtreexoNode := sm.chain.IsUtreexoViewActive() if amUtreexoNode { - sm.requestedUtreexoHeaders[iv.Hash] = struct{}{} ghmsg := wire.NewMsgGetUtreexoHeader(iv.Hash) peer.QueueMessage(ghmsg, nil) continue @@ -2362,21 +2416,20 @@ func (sm *SyncManager) Pause() chan<- struct{} { // block, tx, and inv updates. func New(config *Config) (*SyncManager, error) { sm := SyncManager{ - peerNotifier: config.PeerNotifier, - chain: config.Chain, - txMemPool: config.TxMemPool, - chainParams: config.ChainParams, - rejectedTxns: make(map[chainhash.Hash]struct{}), - requestedTxns: make(map[chainhash.Hash]struct{}), - requestedBlocks: make(map[chainhash.Hash]struct{}), - requestedUtreexoHeaders: make(map[chainhash.Hash]struct{}), - utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader), - peerStates: make(map[*peerpkg.Peer]*peerSyncState), - progressLogger: newBlockProgressLogger("Processed", log), - msgChan: make(chan interface{}, config.MaxPeers*3), - headerList: list.New(), - quit: make(chan struct{}), - feeEstimator: config.FeeEstimator, + peerNotifier: config.PeerNotifier, + chain: config.Chain, + txMemPool: config.TxMemPool, + chainParams: config.ChainParams, + rejectedTxns: make(map[chainhash.Hash]struct{}), + requestedTxns: make(map[chainhash.Hash]struct{}), + requestedBlocks: make(map[chainhash.Hash]struct{}), + utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader), + peerStates: make(map[*peerpkg.Peer]*peerSyncState), + progressLogger: newBlockProgressLogger("Processed", log), + msgChan: make(chan interface{}, config.MaxPeers*3), + headerList: list.New(), + quit: make(chan struct{}), + feeEstimator: config.FeeEstimator, } best := sm.chain.BestSnapshot() diff --git a/server.go b/server.go index 8d7f20a7..88fbc92f 100644 --- a/server.go +++ b/server.go @@ -521,6 +521,10 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgRej // to kick start communication with them. func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) { sp.server.AddPeer(sp) + + // Let the peer know that we prefer headers over invs for block annoucements. + sendHeadersMsg := wire.NewMsgSendHeaders() + sp.QueueMessage(sendHeadersMsg, nil) } // OnMemPool is invoked when a peer receives a mempool bitcoin message.