Skip to content

Commit

Permalink
Cache fanout candidates to optimize txreconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
naumenkogs committed Jan 9, 2024
1 parent d595e74 commit c7c6bcc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
55 changes: 42 additions & 13 deletions src/node/txreconciliation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ class TxReconciliationTracker::Impl
*/
std::unordered_map<NodeId, std::variant<uint64_t, TxReconciliationState>> m_states GUARDED_BY(m_txreconciliation_mutex);

/*
* A least-recently-added cache tracking which peers we should fanout a transaction to.
*
* Since the time between cache accesses is on the order of seconds, returning an outdated
* set of peers is not a concern (especially since we fanout to outbound peers, which should
* be hard to manipulate).
*
* No need to use LRU (bump transaction order upon access) because in most cases
* transactions are processed almost-sequentially.
*/
std::deque<Wtxid> tx_fanout_targes_cache_order;
std::map<Wtxid, std::set<NodeId>> tx_fanout_targets_cache_data GUARDED_BY(m_txreconciliation_mutex);

public:
explicit Impl(uint32_t recon_version) : m_recon_version(recon_version) {}

Expand Down Expand Up @@ -204,10 +217,19 @@ class TxReconciliationTracker::Impl
return IsPeerRegistered(peer_id);
}

bool IsFanoutTarget(const CSipHasher& deterministic_randomizer_with_wtxid,
bool IsFanoutTarget(CSipHasher&& deterministic_randomizer,
bool we_initiate, double limit,
NodeId peer_id) const EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
NodeId peer_id, const Wtxid& wtxid) EXCLUSIVE_LOCKS_REQUIRED(m_txreconciliation_mutex)
{
auto fanout_candidates = tx_fanout_targets_cache_data.find(wtxid);
if (fanout_candidates != tx_fanout_targets_cache_data.end()) {
return fanout_candidates->second.find(peer_id) != fanout_candidates->second.end();
}

// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
deterministic_randomizer.Write(wtxid.ToUint256());

// The algorithm works as follows. We iterate through the peers (of a given direction)
// hashing them with the given wtxid, and sort them by this hash.
// We then consider top `limit` peers to be low-fanout flood targets.
Expand All @@ -217,15 +239,15 @@ class TxReconciliationTracker::Impl
// The fractional part of `limit` is stored in the lower 32 bits, and then we check
// whether adding a random lower 32-bit value (first element) would end up modifying
// the higher bits.
const size_t targets_size = ((deterministic_randomizer_with_wtxid.Finalize() & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32;
const size_t targets_size = ((deterministic_randomizer.Finalize() & 0xFFFFFFFF) + uint64_t(limit * 0x100000000)) >> 32;

std::vector<std::pair<uint64_t, NodeId>> best_peers;
best_peers.reserve(m_states.size());

for (const auto& indexed_state : m_states) {
const auto cur_state = std::get_if<TxReconciliationState>(&indexed_state.second);
if (cur_state && cur_state->m_we_initiate == we_initiate) {
uint64_t hash_key = CSipHasher(deterministic_randomizer_with_wtxid).Write(cur_state->m_k0).Finalize();
uint64_t hash_key = CSipHasher(deterministic_randomizer).Write(cur_state->m_k0).Finalize();
best_peers.emplace_back(hash_key, indexed_state.first);
}
}
Expand All @@ -235,16 +257,26 @@ class TxReconciliationTracker::Impl
};
std::sort(best_peers.begin(), best_peers.end(), cmp_by_key);

std::set<NodeId> new_fanout_candidates;
auto it = best_peers.begin();
for (size_t i = 0; i < targets_size && it != best_peers.end(); ++i, ++it) {
if (it->second == peer_id) return true;
new_fanout_candidates.insert(it->second);
}

tx_fanout_targets_cache_data.emplace(wtxid, new_fanout_candidates);
// Replace the oldest cache item with this new one.
if (tx_fanout_targes_cache_order.size () == 3000) {
auto expired_tx = tx_fanout_targes_cache_order.front();
tx_fanout_targets_cache_data.erase(expired_tx);
tx_fanout_targes_cache_order.pop_front();
tx_fanout_targes_cache_order.push_back(wtxid);
}
return false;
return new_fanout_candidates.find(peer_id) != new_fanout_candidates.end();
}

bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
const EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
EXCLUSIVE_LOCKS_REQUIRED(!m_txreconciliation_mutex)
{
AssertLockNotHeld(m_txreconciliation_mutex);
LOCK(m_txreconciliation_mutex);
Expand Down Expand Up @@ -281,10 +313,7 @@ class TxReconciliationTracker::Impl
return false;
}

// We use the pre-determined randomness to give a consistent result per transaction,
// thus making sure that no transaction gets "unlucky" if every per-peer roll fails.
deterministic_randomizer.Write(wtxid.ToUint256());
return IsFanoutTarget(std::move(deterministic_randomizer), recon_state.m_we_initiate, destinations, peer_id);
return IsFanoutTarget(std::move(deterministic_randomizer), recon_state.m_we_initiate, destinations, peer_id, wtxid);
}
};

Expand Down Expand Up @@ -323,8 +352,8 @@ bool TxReconciliationTracker::IsPeerRegistered(NodeId peer_id) const
return m_impl->IsPeerRegisteredExternal(peer_id);
}

bool TxReconciliationTracker::ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const
bool TxReconciliationTracker::ShouldFanoutTo(const Wtxid& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay)
{
return m_impl->ShouldFanoutTo(wtxid, std::move(deterministic_randomizer), peer_id,
inbounds_nonrcncl_tx_relay, outbounds_nonrcncl_tx_relay);
Expand Down
4 changes: 2 additions & 2 deletions src/node/txreconciliation.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class TxReconciliationTracker
/**
* Returns whether the peer is chosen as a low-fanout destination for a given tx.
*/
bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher&& deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay) const;
bool ShouldFanoutTo(const Wtxid& wtxid, CSipHasher deterministic_randomizer, NodeId peer_id,
size_t inbounds_nonrcncl_tx_relay, size_t outbounds_nonrcncl_tx_relay);
};

#endif // BITCOIN_NODE_TXRECONCILIATION_H

0 comments on commit c7c6bcc

Please sign in to comment.