diff --git a/electrum/server.go b/electrum/server.go index 69091c42f..774beb528 100644 --- a/electrum/server.go +++ b/electrum/server.go @@ -523,7 +523,7 @@ func handleTransactionBroadcast(s *ElectrumServer, cmd *btcjson.Request, conn ne } tx.MsgTx().UData = udata - acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, false, false, 0) + acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, udata, false, false, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, diff --git a/mempool/interface.go b/mempool/interface.go index 3b97c8aa9..e83e9f5e3 100644 --- a/mempool/interface.go +++ b/mempool/interface.go @@ -47,7 +47,7 @@ type TxMempool interface { // error is nil, the list will include the passed transaction itself // along with any additional orphan transactions that were added as a // result of the passed one being accepted. - ProcessTransaction(tx *btcutil.Tx, allowOrphan, + ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) // RemoveTransaction removes the passed transaction from the mempool. diff --git a/mempool/mempool.go b/mempool/mempool.go index 1ec39bcb5..910fcf81a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -586,6 +586,22 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil return txD } +// addUtreexoData add the passed leaves to the memory pool and caches the proof to the accumulator. +// It should not be called directly as it doesn't perform any validation. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *TxPool) addUtreexoData(tx *btcutil.Tx, udata *wire.UData) error { + // Ingest the proof. Shouldn't error out with the proof being invalid + // here since we've already verified it above. + err := mp.cfg.VerifyUData(udata, tx.MsgTx().TxIn, true) + if err != nil { + return fmt.Errorf("error while ingesting proof. %v", err) + } + mp.poolLeaves[*tx.Hash()] = udata.LeafDatas + + return nil +} + // checkPoolDoubleSpend checks whether or not the passed transaction is // attempting to spend coins already spent by other transactions in the pool. // If it does, we'll check whether each of those transactions are signaling for @@ -1003,14 +1019,14 @@ func (mp *TxPool) validateReplacement(tx *btcutil.Tx, // more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, +func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, isNew, rateLimit, rejectDupOrphans bool) ([]*chainhash.Hash, *TxDesc, error) { txHash := tx.Hash() // Check for mempool acceptance. r, err := mp.checkMempoolAcceptance( - tx, nil, isNew, rateLimit, rejectDupOrphans, + tx, utreexoData, isNew, rateLimit, rejectDupOrphans, ) if err != nil { return nil, nil, err @@ -1038,6 +1054,13 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, // it for the ingestion. mp.removeTransaction(conflict, false) } + if utreexoData != nil { + err = mp.addUtreexoData(tx, utreexoData) + if err != nil { + return nil, nil, err + } + } + txD := mp.addTransaction(r.utxoView, tx, r.bestHeight, int64(r.TxFee)) log.Debugf("Accepted transaction %v (pool size: %v)", txHash, @@ -1057,10 +1080,12 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, // be added to the orphan pool. // // This function is safe for concurrent access. -func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) { +func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, + isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) { + // Protect concurrent access. mp.mtx.Lock() - hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true) + hashes, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, isNew, rateLimit, true) mp.mtx.Unlock() return hashes, txD, err @@ -1103,7 +1128,7 @@ func (mp *TxPool) processOrphans(acceptedTx *btcutil.Tx) []*TxDesc { // Potentially accept an orphan into the tx pool. for _, tx := range orphans { missing, txD, err := mp.maybeAcceptTransaction( - tx, true, true, false) + tx, nil, true, true, false) if err != nil { // The orphan is now invalid, so there // is no way any other orphans which @@ -1178,7 +1203,7 @@ func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc { // the passed one being accepted. // // This function is safe for concurrent access. -func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { +func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { log.Tracef("Processing transaction %v", tx.Hash()) // Protect concurrent access. @@ -1186,7 +1211,7 @@ func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool defer mp.mtx.Unlock() // Potentially accept the transaction to the memory pool. - missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit, + missingParents, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, true, rateLimit, true) if err != nil { return nil, err diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 76c6f1214..972de30b9 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -406,7 +406,7 @@ func (ctx *testContext) addSignedTx(inputs []spendableOutput, ctx.harness.chain.SetMedianTimePast(time.Now()) } else { acceptedTxns, err := ctx.harness.txPool.ProcessTransaction( - tx, true, false, 0, + tx, nil, true, false, 0, ) if err != nil { ctx.t.Fatalf("unable to process transaction: %v", err) @@ -475,7 +475,7 @@ func TestSimpleOrphanChain(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -498,7 +498,7 @@ func TestSimpleOrphanChain(t *testing.T) { // all get accepted. Notice the accept orphans flag is also false here // to ensure it has no bearing on whether or not already existing // orphans in the pool are linked. - acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], + acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], nil, false, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -537,7 +537,7 @@ func TestOrphanReject(t *testing.T) { // Ensure orphans are rejected when the allow orphans flag is not set. for _, tx := range chainedTxns[1:] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, false, false, 0) if err == nil { t.Fatalf("ProcessTransaction: did not fail on orphan "+ @@ -594,7 +594,7 @@ func TestOrphanEviction(t *testing.T) { // Add enough orphans to exceed the max allowed while ensuring they are // all accepted. This will cause an eviction. for _, tx := range chainedTxns[1:] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -658,7 +658,7 @@ func TestBasicOrphanRemoval(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -733,7 +733,7 @@ func TestOrphanChainRemoval(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -796,7 +796,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { // Start by adding the orphan transactions from the generated chain // except the final one. for _, tx := range chainedTxns[1:maxOrphans] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -822,7 +822,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { if err != nil { t.Fatalf("unable to create signed tx: %v", err) } - acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, + acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid orphan %v", @@ -841,7 +841,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { // // This will cause the shared output to become a concrete spend which // will in turn must cause the double spending orphan to be removed. - acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], + acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], nil, false, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err) @@ -889,7 +889,7 @@ func TestCheckSpend(t *testing.T) { t.Fatalf("unable to create transaction chain: %v", err) } for _, tx := range chainedTxns { - _, err := harness.txPool.ProcessTransaction(tx, true, + _, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept "+ @@ -1817,7 +1817,7 @@ func TestRBF(t *testing.T) { // it's not a valid one, we should see the error // expected by the test. _, err = ctx.harness.txPool.ProcessTransaction( - replacementTx, false, false, 0, + replacementTx, nil, false, false, 0, ) if testCase.err == "" && err != nil { ctx.t.Fatalf("expected no error when "+ diff --git a/mempool/mocks.go b/mempool/mocks.go index 1a7624534..c27072706 100644 --- a/mempool/mocks.go +++ b/mempool/mocks.go @@ -73,7 +73,7 @@ func (m *MockTxMempool) HaveTransaction(hash *chainhash.Hash) bool { // free-standing transactions into the memory pool. It includes functionality // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. -func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, +func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { args := m.Called(tx, allowOrphan, rateLimit, tag) diff --git a/netsync/manager.go b/netsync/manager.go index 486bfe72e..e87b7c24c 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -100,6 +100,14 @@ type txMsg struct { reply chan struct{} } +// utreexoTxMsg packages a bitcoin utreexo tx message and the peer it came from together +// so the block handler has access to that information. +type utreexoTxMsg struct { + utreexoTx *btcutil.UtreexoTx + peer *peerpkg.Peer + reply chan struct{} +} + // getSyncPeerMsg is a message type to be sent across the message channel for // retrieving the current sync peer. type getSyncPeerMsg struct { @@ -591,8 +599,7 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) { } // handleTxMsg handles transaction messages from all peers. -func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { - peer := tmsg.peer +func (sm *SyncManager) handleTxMsg(tx *btcutil.Tx, peer *peerpkg.Peer, utreexoData *wire.UData) { state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received tx message from unknown peer %s", peer) @@ -607,7 +614,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // spec to proliferate. While this is not ideal, there is no check here // to disconnect peers for sending unsolicited transactions to provide // interoperability. - txHash := tmsg.tx.Hash() + txHash := tx.Hash() // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already @@ -620,7 +627,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := sm.txMemPool.ProcessTransaction(tx, utreexoData, true, true, mempool.Tag(peer.ID())) // Remove transaction from request maps. Either the mempool/chain @@ -1380,9 +1387,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { // Add it to the request queue. state.requestQueue = append(state.requestQueue, iv) - // If the inv is for a utreexo tx, then also pop off the utreexo - // proof hash invs and add it to the request queue. - if peer.IsUtreexoEnabled() { + if sm.chain.IsUtreexoViewActive() { switch iv.Type { case wire.InvTypeTx: case wire.InvTypeWitnessTx: @@ -1549,7 +1554,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } // Add in the utreexo flag then add the tx inv. - iv.Type |= wire.InvUtreexoFlag + iv.Type = wire.InvTypeUtreexoTx gdmsg.AddInvVect(iv) numRequested++ @@ -1597,7 +1602,11 @@ out: sm.handleNewPeerMsg(msg.peer) case *txMsg: - sm.handleTxMsg(msg) + sm.handleTxMsg(msg.tx, msg.peer, nil) + msg.reply <- struct{}{} + + case *utreexoTxMsg: + sm.handleTxMsg(&msg.utreexoTx.Tx, msg.peer, &msg.utreexoTx.MsgUtreexoTx().UData) msg.reply <- struct{}{} case *blockMsg: @@ -1749,8 +1758,10 @@ func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Not // Reinsert all of the transactions (except the coinbase) into // the transaction pool. + // + // TODO handle txs here for utreexo nodes. for _, tx := range block.Transactions()[1:] { - _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, + _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, nil, false, false) if err != nil { // Remove the transaction and all transactions @@ -1789,6 +1800,19 @@ func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan str sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } +// QueueUtreexoTx adds the passed transaction message and peer to the block handling +// queue. Responds to the done channel argument after the utreexo tx message is +// processed. +func (sm *SyncManager) QueueUtreexoTx(tx *btcutil.UtreexoTx, peer *peerpkg.Peer, done chan struct{}) { + // Don't accept more transactions if we're shutting down. + if atomic.LoadInt32(&sm.shutdown) != 0 { + done <- struct{}{} + return + } + + sm.msgChan <- &utreexoTxMsg{utreexoTx: tx, peer: peer, reply: done} +} + // QueueBlock adds the passed block message and peer to the block handling // queue. Responds to the done channel argument after the block message is // processed. diff --git a/peer/peer.go b/peer/peer.go index 497e8651f..e061c9e1f 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -117,6 +117,9 @@ type MessageListeners struct { // OnTx is invoked when a peer receives a tx bitcoin message. OnTx func(p *Peer, msg *wire.MsgTx) + // OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message. + OnUtreexoTx func(p *Peer, msg *wire.MsgUtreexoTx) + // OnBlock is invoked when a peer receives a block bitcoin message. OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte) @@ -1175,6 +1178,7 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdMerkleBlock] = deadline pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdUtreexoTx] = deadline pendingResponses[wire.CmdNotFound] = deadline case wire.CmdGetHeaders: @@ -1234,10 +1238,13 @@ out: fallthrough case wire.CmdTx: fallthrough + case wire.CmdUtreexoTx: + fallthrough case wire.CmdNotFound: delete(pendingResponses, wire.CmdBlock) delete(pendingResponses, wire.CmdMerkleBlock) delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdUtreexoTx) delete(pendingResponses, wire.CmdNotFound) default: @@ -1439,6 +1446,11 @@ out: p.cfg.Listeners.OnTx(p, msg) } + case *wire.MsgUtreexoTx: + if p.cfg.Listeners.OnUtreexoTx != nil { + p.cfg.Listeners.OnUtreexoTx(p, msg) + } + case *wire.MsgBlock: if p.cfg.Listeners.OnBlock != nil { p.cfg.Listeners.OnBlock(p, msg, buf) diff --git a/rpcserver.go b/rpcserver.go index 9a7f7f0bb..cffb743c3 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4273,7 +4273,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // rpcProcessTx checks that the tx is accepted into the mempool and relays it to peers // and other processes. func (s *rpcServer) rpcProcessTx(tx *btcutil.Tx, allowOrphan, rateLimit bool) error { - acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, allowOrphan, rateLimit, 0) + acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, nil, allowOrphan, rateLimit, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, diff --git a/server.go b/server.go index 610308aae..950368841 100644 --- a/server.go +++ b/server.go @@ -601,6 +601,33 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { <-sp.txProcessed } +// OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message. +// It blocks until the bitcoin transaction has been fully processed. Unlock the block +// handler this does not serialize all transactions through a single thread +// transactions don't rely on the previous one in a linear fashion like blocks. +func (sp *serverPeer) OnUtreexoTx(_ *peer.Peer, msg *wire.MsgUtreexoTx) { + if cfg.BlocksOnly { + peerLog.Tracef("Ignoring utreexo tx %v from %v - blocksonly enabled", + msg.TxHash(), sp) + return + } + + // Add the transaction to the known inventory for the peer. + // Convert the raw MsgUtreexoTx to a btcutil.UtreexoTx which provides some convenience + // methods and things such as hash caching. + tx := btcutil.NewUtreexoTx(msg) + iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + sp.AddKnownInventory(iv) + + // Queue the transaction up to be handled by the sync manager and + // intentionally block further receives until the transaction is fully + // processed and known good or bad. This helps prevent a malicious peer + // from queuing up a bunch of bad transactions before disconnecting (or + // being disconnected) and wasting memory. + sp.server.syncManager.QueueUtreexoTx(tx, sp.Peer, sp.txProcessed) + <-sp.txProcessed +} + // OnBlock is invoked when a peer receives a block bitcoin message. It // blocks until the bitcoin block has been fully processed. func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { @@ -1562,26 +1589,50 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions return err } - // If the requsted encoding is a utreexo encoding, then also grab the - // utreexo proof for the tx. - if encoding&wire.UtreexoEncoding == wire.UtreexoEncoding { - // If utreexo proof index is not present, we can't send the tx - // as we can't grab the proof for the tx. - if s.utreexoProofIndex == nil && s.flatUtreexoProofIndex == nil && cfg.NoUtreexo { - err := fmt.Errorf("UtreexoProofIndex and FlatUtreexoProofIndex is nil. " + - "Cannot fetch utreexo accumulator proofs.") - srvrLog.Debugf(err.Error()) + // If the requsted encoding is not a utreexo encoding, send the tx over and return. + if encoding&wire.UtreexoEncoding != wire.UtreexoEncoding { + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + + sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) + return nil + } + + // If utreexo proof index is not present, we can't send the tx + // as we can't grab the proof for the tx. + if s.utreexoProofIndex == nil && s.flatUtreexoProofIndex == nil && cfg.NoUtreexo { + err := fmt.Errorf("UtreexoProofIndex and FlatUtreexoProofIndex is nil. " + + "Cannot fetch utreexo accumulator proofs.") + srvrLog.Debugf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} + } + + return err + } + + var utreexoTx *wire.MsgUtreexoTx + // For compact state nodes. + if !cfg.NoUtreexo { + // Fetch the necessary leafdatas to create the utreexo data. + leafDatas, err := s.txMemPool.FetchLeafDatas(tx.Hash()) + if err != nil { + chanLog.Errorf(err.Error()) if doneChan != nil { doneChan <- struct{}{} } - return err } - // For compact state nodes. - if !cfg.NoUtreexo { - // Fetch the necessary leafdatas to create the utreexo data. - leafDatas, err := s.txMemPool.FetchLeafDatas(tx.Hash()) + btcdLog.Debugf("fetched %v for tx %s", leafDatas, tx.Hash()) + + // Packed positions may be nil or of a length 0 if the + // peer already has all the necessary proof hashes cached. + if packedPositions != nil || len(packedPositions) == 0 { + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.chain.GenerateUDataPartial(leafDatas, positions) if err != nil { chanLog.Errorf(err.Error()) if doneChan != nil { @@ -1590,69 +1641,63 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions return err } - btcdLog.Debugf("fetched %v for tx %s", leafDatas, tx.Hash()) - - // Packed positions may be nil or of a length 0 if the - // peer already has all the necessary proof hashes cached. - if packedPositions != nil || len(packedPositions) == 0 { - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.chain.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } } - // For bridge nodes. - if s.utreexoProofIndex != nil { - if packedPositions != nil || len(packedPositions) == 0 { - leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + } + + // For bridge nodes. + if s.utreexoProofIndex != nil { + if packedPositions != nil || len(packedPositions) == 0 { + leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.utreexoProofIndex.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.utreexoProofIndex.GenerateUDataPartial(leafDatas, positions) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } - } else if s.flatUtreexoProofIndex != nil { - if packedPositions != nil || len(packedPositions) == 0 { - leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + } + } else if s.flatUtreexoProofIndex != nil { + if packedPositions != nil || len(packedPositions) == 0 { + leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.flatUtreexoProofIndex.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + return err + } + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.flatUtreexoProofIndex.GenerateUDataPartial(leafDatas, positions) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } } } @@ -1662,7 +1707,7 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions <-waitChan } - sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) + sp.QueueMessageWithEncoding(utreexoTx, doneChan, wire.WitnessEncoding) return nil } @@ -2390,6 +2435,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnVerAck: sp.OnVerAck, OnMemPool: sp.OnMemPool, OnTx: sp.OnTx, + OnUtreexoTx: sp.OnUtreexoTx, OnBlock: sp.OnBlock, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders, diff --git a/wire/msgblock.go b/wire/msgblock.go index ad4b306f3..812e143aa 100644 --- a/wire/msgblock.go +++ b/wire/msgblock.go @@ -234,8 +234,15 @@ func (msg *MsgBlock) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) er return err } + // Unset UtreexoEncoding for the encoding that we'll pass off to the + // tx.BtcDecode(). This is done as tx.BtcDecode() expects Utreexo + // Proofs to be appended to each tx if UtreexoEncoding bit is turned on. + // However, this only applies to mempool txs and there are no separate + // Utreexo Proofs for individual txs as the MsgBlock contains a proof + // for all the txs. + txEncoding := enc &^ UtreexoEncoding for _, tx := range msg.Transactions { - err = tx.BtcEncode(w, pver, enc) + err = tx.BtcEncode(w, pver, txEncoding) if err != nil { return err } diff --git a/wire/msgtx.go b/wire/msgtx.go index 8a959a038..d88f74316 100644 --- a/wire/msgtx.go +++ b/wire/msgtx.go @@ -679,14 +679,6 @@ func (msg *MsgTx) BtcDecode(r io.Reader, pver uint32, enc MessageEncoding) error scriptPool.Return(pkScript) } - if enc&UtreexoEncoding == UtreexoEncoding { - msg.UData = new(UData) - err = msg.UData.DeserializeCompact(r) - if err != nil { - return err - } - } - return nil } @@ -786,17 +778,6 @@ func (msg *MsgTx) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) error return err } - if enc&UtreexoEncoding == UtreexoEncoding { - // AccProof can be nil for transactions that are included in - // a block. - if msg.UData != nil { - err = msg.UData.SerializeCompact(w) - if err != nil { - return err - } - } - } - return nil }