Skip to content

Commit

Permalink
Merge pull request #214 from kcalvinalvin/2024-11-05-add-leaves-in-ad…
Browse files Browse the repository at this point in the history
…dtransaction

mempool, netsync, electrum, main, peer: Use utreexotx
  • Loading branch information
kcalvinalvin authored Nov 11, 2024
2 parents 77f1b36 + 1eb08e4 commit 6368725
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 121 deletions.
2 changes: 1 addition & 1 deletion electrum/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func handleTransactionBroadcast(s *ElectrumServer, cmd *btcjson.Request, conn ne
}
tx.MsgTx().UData = udata

acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, false, false, 0)
acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, udata, false, false, 0)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
Expand Down
2 changes: 1 addition & 1 deletion mempool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type TxMempool interface {
// error is nil, the list will include the passed transaction itself
// along with any additional orphan transactions that were added as a
// result of the passed one being accepted.
ProcessTransaction(tx *btcutil.Tx, allowOrphan,
ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan,
rateLimit bool, tag Tag) ([]*TxDesc, error)

// RemoveTransaction removes the passed transaction from the mempool.
Expand Down
39 changes: 32 additions & 7 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,22 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil
return txD
}

// addUtreexoData add the passed leaves to the memory pool and caches the proof to the accumulator.
// It should not be called directly as it doesn't perform any validation.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) addUtreexoData(tx *btcutil.Tx, udata *wire.UData) error {
// Ingest the proof. Shouldn't error out with the proof being invalid
// here since we've already verified it above.
err := mp.cfg.VerifyUData(udata, tx.MsgTx().TxIn, true)
if err != nil {
return fmt.Errorf("error while ingesting proof. %v", err)
}
mp.poolLeaves[*tx.Hash()] = udata.LeafDatas

return nil
}

// checkPoolDoubleSpend checks whether or not the passed transaction is
// attempting to spend coins already spent by other transactions in the pool.
// If it does, we'll check whether each of those transactions are signaling for
Expand Down Expand Up @@ -1003,14 +1019,14 @@ func (mp *TxPool) validateReplacement(tx *btcutil.Tx,
// more details.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, isNew, rateLimit,
rejectDupOrphans bool) ([]*chainhash.Hash, *TxDesc, error) {

txHash := tx.Hash()

// Check for mempool acceptance.
r, err := mp.checkMempoolAcceptance(
tx, nil, isNew, rateLimit, rejectDupOrphans,
tx, utreexoData, isNew, rateLimit, rejectDupOrphans,
)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1038,6 +1054,13 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
// it for the ingestion.
mp.removeTransaction(conflict, false)
}
if utreexoData != nil {
err = mp.addUtreexoData(tx, utreexoData)
if err != nil {
return nil, nil, err
}
}

txD := mp.addTransaction(r.utxoView, tx, r.bestHeight, int64(r.TxFee))

log.Debugf("Accepted transaction %v (pool size: %v)", txHash,
Expand All @@ -1057,10 +1080,12 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit,
// be added to the orphan pool.
//
// This function is safe for concurrent access.
func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {
func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData,
isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) {

// Protect concurrent access.
mp.mtx.Lock()
hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true)
hashes, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, isNew, rateLimit, true)
mp.mtx.Unlock()

return hashes, txD, err
Expand Down Expand Up @@ -1103,7 +1128,7 @@ func (mp *TxPool) processOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
// Potentially accept an orphan into the tx pool.
for _, tx := range orphans {
missing, txD, err := mp.maybeAcceptTransaction(
tx, true, true, false)
tx, nil, true, true, false)
if err != nil {
// The orphan is now invalid, so there
// is no way any other orphans which
Expand Down Expand Up @@ -1178,15 +1203,15 @@ func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc {
// the passed one being accepted.
//
// This function is safe for concurrent access.
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
log.Tracef("Processing transaction %v", tx.Hash())

// Protect concurrent access.
mp.mtx.Lock()
defer mp.mtx.Unlock()

// Potentially accept the transaction to the memory pool.
missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
missingParents, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, true, rateLimit,
true)
if err != nil {
return nil, err
Expand Down
24 changes: 12 additions & 12 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (ctx *testContext) addSignedTx(inputs []spendableOutput,
ctx.harness.chain.SetMedianTimePast(time.Now())
} else {
acceptedTxns, err := ctx.harness.txPool.ProcessTransaction(
tx, true, false, 0,
tx, nil, true, false, 0,
)
if err != nil {
ctx.t.Fatalf("unable to process transaction: %v", err)
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand All @@ -498,7 +498,7 @@ func TestSimpleOrphanChain(t *testing.T) {
// all get accepted. Notice the accept orphans flag is also false here
// to ensure it has no bearing on whether or not already existing
// orphans in the pool are linked.
acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0],
acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], nil,
false, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -537,7 +537,7 @@ func TestOrphanReject(t *testing.T) {

// Ensure orphans are rejected when the allow orphans flag is not set.
for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, false,
false, 0)
if err == nil {
t.Fatalf("ProcessTransaction: did not fail on orphan "+
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestOrphanEviction(t *testing.T) {
// Add enough orphans to exceed the max allowed while ensuring they are
// all accepted. This will cause an eviction.
for _, tx := range chainedTxns[1:] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -658,7 +658,7 @@ func TestBasicOrphanRemoval(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestOrphanChainRemoval(t *testing.T) {
// Ensure the orphans are accepted (only up to the maximum allowed so
// none are evicted).
for _, tx := range chainedTxns[1 : maxOrphans+1] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand Down Expand Up @@ -796,7 +796,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
// Start by adding the orphan transactions from the generated chain
// except the final one.
for _, tx := range chainedTxns[1:maxOrphans] {
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true,
acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid "+
Expand All @@ -822,7 +822,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
if err != nil {
t.Fatalf("unable to create signed tx: %v", err)
}
acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx,
acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, nil,
true, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid orphan %v",
Expand All @@ -841,7 +841,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) {
//
// This will cause the shared output to become a concrete spend which
// will in turn must cause the double spending orphan to be removed.
acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0],
acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], nil,
false, false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err)
Expand Down Expand Up @@ -889,7 +889,7 @@ func TestCheckSpend(t *testing.T) {
t.Fatalf("unable to create transaction chain: %v", err)
}
for _, tx := range chainedTxns {
_, err := harness.txPool.ProcessTransaction(tx, true,
_, err := harness.txPool.ProcessTransaction(tx, nil, true,
false, 0)
if err != nil {
t.Fatalf("ProcessTransaction: failed to accept "+
Expand Down Expand Up @@ -1817,7 +1817,7 @@ func TestRBF(t *testing.T) {
// it's not a valid one, we should see the error
// expected by the test.
_, err = ctx.harness.txPool.ProcessTransaction(
replacementTx, false, false, 0,
replacementTx, nil, false, false, 0,
)
if testCase.err == "" && err != nil {
ctx.t.Fatalf("expected no error when "+
Expand Down
2 changes: 1 addition & 1 deletion mempool/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *MockTxMempool) HaveTransaction(hash *chainhash.Hash) bool {
// free-standing transactions into the memory pool. It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, allowOrphan,
func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan,
rateLimit bool, tag Tag) ([]*TxDesc, error) {

args := m.Called(tx, allowOrphan, rateLimit, tag)
Expand Down
44 changes: 34 additions & 10 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ type txMsg struct {
reply chan struct{}
}

// utreexoTxMsg packages a bitcoin utreexo tx message and the peer it came from together
// so the block handler has access to that information.
type utreexoTxMsg struct {
utreexoTx *btcutil.UtreexoTx
peer *peerpkg.Peer
reply chan struct{}
}

// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.
type getSyncPeerMsg struct {
Expand Down Expand Up @@ -591,8 +599,7 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
}

// handleTxMsg handles transaction messages from all peers.
func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
peer := tmsg.peer
func (sm *SyncManager) handleTxMsg(tx *btcutil.Tx, peer *peerpkg.Peer, utreexoData *wire.UData) {
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received tx message from unknown peer %s", peer)
Expand All @@ -607,7 +614,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
// spec to proliferate. While this is not ideal, there is no check here
// to disconnect peers for sending unsolicited transactions to provide
// interoperability.
txHash := tmsg.tx.Hash()
txHash := tx.Hash()

// Ignore transactions that we have already rejected. Do not
// send a reject message here because if the transaction was already
Expand All @@ -620,7 +627,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {

// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
acceptedTxs, err := sm.txMemPool.ProcessTransaction(tx, utreexoData,
true, true, mempool.Tag(peer.ID()))

// Remove transaction from request maps. Either the mempool/chain
Expand Down Expand Up @@ -1380,9 +1387,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// Add it to the request queue.
state.requestQueue = append(state.requestQueue, iv)

// If the inv is for a utreexo tx, then also pop off the utreexo
// proof hash invs and add it to the request queue.
if peer.IsUtreexoEnabled() {
if sm.chain.IsUtreexoViewActive() {
switch iv.Type {
case wire.InvTypeTx:
case wire.InvTypeWitnessTx:
Expand Down Expand Up @@ -1549,7 +1554,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}

// Add in the utreexo flag then add the tx inv.
iv.Type |= wire.InvUtreexoFlag
iv.Type = wire.InvTypeUtreexoTx
gdmsg.AddInvVect(iv)
numRequested++

Expand Down Expand Up @@ -1597,7 +1602,11 @@ out:
sm.handleNewPeerMsg(msg.peer)

case *txMsg:
sm.handleTxMsg(msg)
sm.handleTxMsg(msg.tx, msg.peer, nil)
msg.reply <- struct{}{}

case *utreexoTxMsg:
sm.handleTxMsg(&msg.utreexoTx.Tx, msg.peer, &msg.utreexoTx.MsgUtreexoTx().UData)
msg.reply <- struct{}{}

case *blockMsg:
Expand Down Expand Up @@ -1749,8 +1758,10 @@ func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Not

// Reinsert all of the transactions (except the coinbase) into
// the transaction pool.
//
// TODO handle txs here for utreexo nodes.
for _, tx := range block.Transactions()[1:] {
_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, nil,
false, false)
if err != nil {
// Remove the transaction and all transactions
Expand Down Expand Up @@ -1789,6 +1800,19 @@ func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan str
sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
}

// QueueUtreexoTx adds the passed transaction message and peer to the block handling
// queue. Responds to the done channel argument after the utreexo tx message is
// processed.
func (sm *SyncManager) QueueUtreexoTx(tx *btcutil.UtreexoTx, peer *peerpkg.Peer, done chan struct{}) {
// Don't accept more transactions if we're shutting down.
if atomic.LoadInt32(&sm.shutdown) != 0 {
done <- struct{}{}
return
}

sm.msgChan <- &utreexoTxMsg{utreexoTx: tx, peer: peer, reply: done}
}

// QueueBlock adds the passed block message and peer to the block handling
// queue. Responds to the done channel argument after the block message is
// processed.
Expand Down
12 changes: 12 additions & 0 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ type MessageListeners struct {
// OnTx is invoked when a peer receives a tx bitcoin message.
OnTx func(p *Peer, msg *wire.MsgTx)

// OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message.
OnUtreexoTx func(p *Peer, msg *wire.MsgUtreexoTx)

// OnBlock is invoked when a peer receives a block bitcoin message.
OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)

Expand Down Expand Up @@ -1175,6 +1178,7 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st
pendingResponses[wire.CmdBlock] = deadline
pendingResponses[wire.CmdMerkleBlock] = deadline
pendingResponses[wire.CmdTx] = deadline
pendingResponses[wire.CmdUtreexoTx] = deadline
pendingResponses[wire.CmdNotFound] = deadline

case wire.CmdGetHeaders:
Expand Down Expand Up @@ -1234,10 +1238,13 @@ out:
fallthrough
case wire.CmdTx:
fallthrough
case wire.CmdUtreexoTx:
fallthrough
case wire.CmdNotFound:
delete(pendingResponses, wire.CmdBlock)
delete(pendingResponses, wire.CmdMerkleBlock)
delete(pendingResponses, wire.CmdTx)
delete(pendingResponses, wire.CmdUtreexoTx)
delete(pendingResponses, wire.CmdNotFound)

default:
Expand Down Expand Up @@ -1439,6 +1446,11 @@ out:
p.cfg.Listeners.OnTx(p, msg)
}

case *wire.MsgUtreexoTx:
if p.cfg.Listeners.OnUtreexoTx != nil {
p.cfg.Listeners.OnUtreexoTx(p, msg)
}

case *wire.MsgBlock:
if p.cfg.Listeners.OnBlock != nil {
p.cfg.Listeners.OnBlock(p, msg, buf)
Expand Down
2 changes: 1 addition & 1 deletion rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4273,7 +4273,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
// rpcProcessTx checks that the tx is accepted into the mempool and relays it to peers
// and other processes.
func (s *rpcServer) rpcProcessTx(tx *btcutil.Tx, allowOrphan, rateLimit bool) error {
acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, allowOrphan, rateLimit, 0)
acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, nil, allowOrphan, rateLimit, 0)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
Expand Down
Loading

0 comments on commit 6368725

Please sign in to comment.