Skip to content

Commit 043548a

Browse files
Merge #6842: fix: use std::atomic to protect connected manager/signer pointers, check before queueman access, update P2P message map in tests
24f9357 fix: update P2P message map in functional test framework (Kittywhiskers Van Gogh) a432a95 fix: check if `queueman` is initialized before accessing it (Kittywhiskers Van Gogh) 0444e59 trivial: use `std::atomic` to protect connected manager pointers (Kittywhiskers Van Gogh) b7700b3 trivial: use `std::atomic` to protect connected signer pointers (Kittywhiskers Van Gogh) Pull request description: ## Additional Information * Dependency for #6838 This pull request contains the following: * Minor follow-up to [dash#6828](#6828) based on feedback received during review also extended to similar connections introduced in [dash#5980](#5980) ([commit](a5be37c#diff-c065d4cd2398ad0dbcef393c5dfc53f465bf44723348892395fffd2fb3bac522R350-R355)) and [dash#6030](#6030) ([commit](805537e#diff-59f8e9f1b35c1428332caab753a818e3b2146e73bb6c998a2aed5f7eddbc6fa1R357-R363)) * A bugfix to avoid potential crash caused by missing `nullptr` check after `CCoinJoinClientQueueManager` was conditionally initialized in [dash#5163](#5163) ([commit](2d2814e#diff-b1e19192258d83199d8adaa5ac31f067af98f63554bfdd679bd8e8073815e69dR2308-R2310)) * Updating the Python mapping of Dash-specific P2P messages to avoid unexpected test failures ([build](https://github.com/dashpay/dash/actions/runs/17707917238/job/50323243018#step:6:328)), observed when working on [dash#6838](#6838) ## Breaking Changes None expected. ## Checklist - [x] I have performed a self-review of my own code - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)** - [x] I have added or updated relevant unit/integration/functional/e2e tests **(note: N/A)** - [x] I have made corresponding changes to the documentation - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_ ACKs for top commit: UdjinM6: utACK 24f9357 Tree-SHA512: 90b0b2536a7704e3f3da4ece05b6ad09c393a4348aaff87682b7547f6a7d6ffede504176fa1f63bd9ad88961fb1e3b113aae5df357c0dfb70df2fc55500c2b5f
2 parents 24064da + 24f9357 commit 043548a

File tree

11 files changed

+142
-92
lines changed

11 files changed

+142
-92
lines changed

src/chainlock/chainlock.cpp

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,18 @@ CChainLocksHandler::~CChainLocksHandler()
6464

6565
void CChainLocksHandler::Start(const llmq::CInstantSendManager& isman)
6666
{
67-
if (m_signer) {
68-
m_signer->Start();
67+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
68+
signer->Start();
6969
}
7070
scheduler->scheduleEvery(
7171
[&]() {
72+
auto signer = m_signer.load(std::memory_order_acquire);
7273
CheckActiveState();
7374
EnforceBestChainLock();
7475
Cleanup();
7576
// regularly retry signing the current chaintip as it might have failed before due to missing islocks
76-
if (m_signer) {
77-
m_signer->TrySignChainTip(isman);
77+
if (signer) {
78+
signer->TrySignChainTip(isman);
7879
}
7980
},
8081
std::chrono::seconds{5});
@@ -83,8 +84,8 @@ void CChainLocksHandler::Start(const llmq::CInstantSendManager& isman)
8384
void CChainLocksHandler::Stop()
8485
{
8586
scheduler->stop();
86-
if (m_signer) {
87-
m_signer->Stop();
87+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
88+
signer->Stop();
8889
}
8990
}
9091

@@ -218,11 +219,12 @@ void CChainLocksHandler::UpdatedBlockTip(const llmq::CInstantSendManager& isman)
218219
if (bool expected = false; tryLockChainTipScheduled.compare_exchange_strong(expected, true)) {
219220
scheduler->scheduleFromNow(
220221
[&]() {
222+
auto signer = m_signer.load(std::memory_order_acquire);
221223
CheckActiveState();
222224
EnforceBestChainLock();
223225
Cleanup();
224-
if (m_signer) {
225-
m_signer->TrySignChainTip(isman);
226+
if (signer) {
227+
signer->TrySignChainTip(isman);
226228
}
227229
tryLockChainTipScheduled = false;
228230
},
@@ -274,16 +276,16 @@ void CChainLocksHandler::BlockConnected(const std::shared_ptr<const CBlock>& pbl
274276
}
275277

276278
// We need this information later when we try to sign a new tip, so that we can determine if all included TXs are safe.
277-
if (m_signer) {
278-
m_signer->UpdateBlockHashTxidMap(pindex->GetBlockHash(), pblock->vtx);
279+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
280+
signer->UpdateBlockHashTxidMap(pindex->GetBlockHash(), pblock->vtx);
279281
}
280282
}
281283

282284
void CChainLocksHandler::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock,
283285
gsl::not_null<const CBlockIndex*> pindexDisconnected)
284286
{
285-
if (m_signer) {
286-
m_signer->EraseFromBlockHashTxidMap(pindexDisconnected->GetBlockHash());
287+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
288+
signer->EraseFromBlockHashTxidMap(pindexDisconnected->GetBlockHash());
287289
}
288290
}
289291

@@ -451,8 +453,8 @@ void CChainLocksHandler::Cleanup()
451453
}
452454
}
453455

454-
if (m_signer) {
455-
const auto cleanup_txes{m_signer->Cleanup()};
456+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
457+
const auto cleanup_txes{signer->Cleanup()};
456458
LOCK(cs);
457459
for (const auto& tx : cleanup_txes) {
458460
for (const auto& txid : *tx) {

src/chainlock/chainlock.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class CChainLocksHandler final : public chainlock::ChainLockSignerParent
4646
std::unique_ptr<CScheduler> scheduler;
4747
std::unique_ptr<std::thread> scheduler_thread;
4848

49-
chainlock::ChainLockSigner* m_signer{nullptr};
49+
std::atomic<chainlock::ChainLockSigner*> m_signer{nullptr};
5050

5151
mutable Mutex cs;
5252
std::atomic<bool> tryLockChainTipScheduled{false};
@@ -73,10 +73,10 @@ class CChainLocksHandler final : public chainlock::ChainLockSignerParent
7373
void ConnectSigner(gsl::not_null<chainlock::ChainLockSigner*> signer)
7474
{
7575
// Prohibit double initialization
76-
assert(m_signer == nullptr);
77-
m_signer = signer;
76+
assert(m_signer.load(std::memory_order_acquire) == nullptr);
77+
m_signer.store(signer, std::memory_order_release);
7878
}
79-
void DisconnectSigner() { m_signer = nullptr; }
79+
void DisconnectSigner() { m_signer.store(nullptr, std::memory_order_release); }
8080

8181
void Start(const llmq::CInstantSendManager& isman);
8282
void Stop();

src/evo/mnhftx.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,12 @@ static bool extractSignals(const ChainstateManager& chainman, const llmq::CQuoru
201201

202202
std::optional<CMNHFManager::Signals> CMNHFManager::ProcessBlock(const CBlock& block, const CBlockIndex* const pindex, bool fJustCheck, BlockValidationState& state)
203203
{
204-
assert(m_chainman && m_qman);
204+
auto chainman = Assert(m_chainman.load(std::memory_order_acquire));
205+
auto qman = Assert(m_qman.load(std::memory_order_acquire));
205206

206207
try {
207208
std::vector<uint8_t> new_signals;
208-
if (!extractSignals(*m_chainman, *m_qman, block, pindex, new_signals, state)) {
209+
if (!extractSignals(*chainman, *qman, block, pindex, new_signals, state)) {
209210
// state is set inside extractSignals
210211
return std::nullopt;
211212
}
@@ -252,11 +253,12 @@ std::optional<CMNHFManager::Signals> CMNHFManager::ProcessBlock(const CBlock& bl
252253

253254
bool CMNHFManager::UndoBlock(const CBlock& block, const CBlockIndex* const pindex)
254255
{
255-
assert(m_chainman && m_qman);
256+
auto chainman = Assert(m_chainman.load(std::memory_order_acquire));
257+
auto qman = Assert(m_qman.load(std::memory_order_acquire));
256258

257259
std::vector<uint8_t> excluded_signals;
258260
BlockValidationState state;
259-
if (!extractSignals(*m_chainman, *m_qman, block, pindex, excluded_signals, state)) {
261+
if (!extractSignals(*chainman, *qman, block, pindex, excluded_signals, state)) {
260262
LogPrintf("CMNHFManager::%s: failed to extract signals\n", __func__);
261263
return false;
262264
}
@@ -372,19 +374,28 @@ void CMNHFManager::AddSignal(const CBlockIndex* const pindex, int bit)
372374
void CMNHFManager::ConnectManagers(gsl::not_null<ChainstateManager*> chainman, gsl::not_null<llmq::CQuorumManager*> qman)
373375
{
374376
// Do not allow double-initialization
375-
assert(m_chainman == nullptr && m_qman == nullptr);
376-
m_chainman = chainman;
377-
m_qman = qman;
377+
assert(m_chainman.load(std::memory_order_acquire) == nullptr);
378+
m_chainman.store(chainman, std::memory_order_release);
379+
assert(m_qman.load(std::memory_order_acquire) == nullptr);
380+
m_qman.store(qman, std::memory_order_release);
381+
}
382+
383+
void CMNHFManager::DisconnectManagers()
384+
{
385+
m_chainman.store(nullptr, std::memory_order_release);
386+
m_qman.store(nullptr, std::memory_order_release);
378387
}
379388

380389
bool CMNHFManager::ForceSignalDBUpdate()
381390
{
391+
auto chainman = Assert(m_chainman.load(std::memory_order_acquire));
392+
382393
// force ehf signals db update
383394
auto dbTx = m_evoDb.BeginTransaction();
384395

385396
const bool last_legacy = bls::bls_legacy_scheme.load();
386397
bls::bls_legacy_scheme.store(false);
387-
GetSignalsStage(m_chainman->ActiveChainstate().m_chain.Tip());
398+
GetSignalsStage(chainman->ActiveChainstate().m_chain.Tip());
388399
bls::bls_legacy_scheme.store(last_legacy);
389400

390401
dbTx->Commit();

src/evo/mnhftx.h

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55
#ifndef BITCOIN_EVO_MNHFTX_H
66
#define BITCOIN_EVO_MNHFTX_H
77

8-
#include <bls/bls.h>
9-
#include <gsl/pointers.h>
8+
#include <saltedhasher.h>
109
#include <sync.h>
1110
#include <threadsafety.h>
11+
#include <versionbits.h>
12+
13+
#include <bls/bls.h>
14+
#include <unordered_lru_cache.h>
15+
16+
#include <gsl/pointers.h>
1217
#include <univalue.h>
1318

19+
#include <atomic>
1420
#include <optional>
15-
#include <saltedhasher.h>
16-
#include <unordered_lru_cache.h>
17-
#include <versionbits.h>
1821

1922
class BlockValidationState;
2023
class CBlock;
@@ -91,8 +94,8 @@ class CMNHFManager : public AbstractEHFManager
9194
{
9295
private:
9396
CEvoDB& m_evoDb;
94-
ChainstateManager* m_chainman{nullptr};
95-
llmq::CQuorumManager* m_qman{nullptr};
97+
std::atomic<ChainstateManager*> m_chainman{nullptr};
98+
std::atomic<llmq::CQuorumManager*> m_qman{nullptr};
9699

97100
static constexpr size_t MNHFCacheSize = 1000;
98101
Mutex cs_cache;
@@ -144,7 +147,7 @@ class CMNHFManager : public AbstractEHFManager
144147
*
145148
* @pre Must be called before LLMQContext (containing llmq::CQuorumManager) is destroyed.
146149
*/
147-
void DisconnectManagers() { m_chainman = nullptr; m_qman = nullptr; };
150+
void DisconnectManagers();
148151

149152
bool ForceSignalDBUpdate() EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
150153

src/instantsend/instantsend.cpp

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ void CInstantSendManager::Start(PeerManager& peerman)
7575

7676
workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); });
7777

78-
if (m_signer) {
79-
m_signer->Start();
78+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
79+
signer->Start();
8080
}
8181
}
8282

8383
void CInstantSendManager::Stop()
8484
{
85-
if (m_signer) {
86-
m_signer->Stop();
85+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
86+
signer->Stop();
8787
}
8888

8989
// make sure to call InterruptWorkerThread() first
@@ -348,8 +348,8 @@ MessageProcessingResult CInstantSendManager::ProcessInstantSendLock(NodeId from,
348348
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n",
349349
__func__, islock->txid.ToString(), hash.ToString(), from);
350350

351-
if (m_signer) {
352-
m_signer->ClearLockFromQueue(islock);
351+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
352+
signer->ClearLockFromQueue(islock);
353353
}
354354
if (db.KnownInstantSendLock(hash)) {
355355
return {};
@@ -449,8 +449,8 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
449449
}
450450

451451
if (islock == nullptr) {
452-
if (m_signer) {
453-
m_signer->ProcessTx(*tx, false, Params().GetConsensus());
452+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
453+
signer->ProcessTx(*tx, false, Params().GetConsensus());
454454
}
455455
// TX is not locked, so make sure it is tracked
456456
AddNonLockedTx(tx, nullptr);
@@ -491,8 +491,8 @@ void CInstantSendManager::BlockConnected(const std::shared_ptr<const CBlock>& pb
491491
}
492492

493493
if (!IsLocked(tx->GetHash()) && !has_chainlock) {
494-
if (m_signer) {
495-
m_signer->ProcessTx(*tx, true, Params().GetConsensus());
494+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
495+
signer->ProcessTx(*tx, true, Params().GetConsensus());
496496
}
497497
// TX is not locked, so make sure it is tracked
498498
AddNonLockedTx(tx, pindex);
@@ -597,16 +597,16 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild
597597
void CInstantSendManager::RemoveConflictedTx(const CTransaction& tx)
598598
{
599599
RemoveNonLockedTx(tx.GetHash(), false);
600-
if (m_signer) {
601-
m_signer->ClearInputsFromQueue(GetIdsFromLockable(tx.vin));
600+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
601+
signer->ClearInputsFromQueue(GetIdsFromLockable(tx.vin));
602602
}
603603
}
604604

605605
void CInstantSendManager::TruncateRecoveredSigsForInputs(const instantsend::InstantSendLock& islock)
606606
{
607607
auto ids = GetIdsFromLockable(islock.inputs);
608-
if (m_signer) {
609-
m_signer->ClearInputsFromQueue(ids);
608+
if (auto signer = m_signer.load(std::memory_order_acquire); signer) {
609+
signer->ClearInputsFromQueue(ids);
610610
}
611611
for (const auto& id : ids) {
612612
sigman.TruncateRecoveredSig(Params().GetConsensus().llmqTypeDIP0024InstantSend, id);
@@ -925,7 +925,8 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
925925
for (auto& [node_id, mpr] : peer_activity) {
926926
peerman.PostProcessMessage(std::move(mpr), node_id);
927927
}
928-
if (!m_signer) return more_work;
928+
auto signer = m_signer.load(std::memory_order_acquire);
929+
if (!signer) return more_work;
929930
// Construct set of non-locked transactions that are pending to retry
930931
std::vector<CTransactionRef> txns{};
931932
{
@@ -942,7 +943,7 @@ void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
942943
}
943944
}
944945
// Retry processing them
945-
m_signer->ProcessPendingRetryLockTxs(txns);
946+
signer->ProcessPendingRetryLockTxs(txns);
946947
return more_work;
947948
}();
948949

src/instantsend/instantsend.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
5959
CTxMemPool& mempool;
6060
const CMasternodeSync& m_mn_sync;
6161

62-
instantsend::InstantSendSigner* m_signer{nullptr};
62+
std::atomic<instantsend::InstantSendSigner*> m_signer{nullptr};
6363

6464
std::thread workThread;
6565
CThreadInterrupt workInterrupt;
@@ -97,10 +97,10 @@ class CInstantSendManager final : public instantsend::InstantSendSignerParent
9797
void ConnectSigner(gsl::not_null<instantsend::InstantSendSigner*> signer)
9898
{
9999
// Prohibit double initialization
100-
assert(m_signer == nullptr);
101-
m_signer = signer;
100+
assert(m_signer.load(std::memory_order_acquire) == nullptr);
101+
m_signer.store(signer, std::memory_order_release);
102102
}
103-
void DisconnectSigner() { m_signer = nullptr; }
103+
void DisconnectSigner() { m_signer.store(nullptr, std::memory_order_release); }
104104

105105
void Start(PeerManager& peerman);
106106
void Stop();

src/net_processing.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2276,11 +2276,11 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv)
22762276
case MSG_ISDLOCK:
22772277
return m_llmq_ctx->isman->AlreadyHave(inv);
22782278
case MSG_DSQ:
2279+
return
22792280
#ifdef ENABLE_WALLET
2280-
return m_cj_ctx->queueman->HasQueue(inv.hash) || (m_active_ctx && m_active_ctx->cj_server->HasQueue(inv.hash));
2281-
#else
2282-
return m_active_ctx && m_active_ctx->cj_server->HasQueue(inv.hash);
2283-
#endif
2281+
(m_cj_ctx->queueman && m_cj_ctx->queueman->HasQueue(inv.hash)) ||
2282+
#endif // ENABLE_WALLET
2283+
(m_active_ctx && m_active_ctx->cj_server->HasQueue(inv.hash));
22842284
}
22852285

22862286

@@ -2885,7 +2885,7 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic
28852885
if (!push && inv.type == MSG_DSQ) {
28862886
auto opt_dsq = m_active_ctx ? m_active_ctx->cj_server->GetQueueFromHash(inv.hash) : std::nullopt;
28872887
#ifdef ENABLE_WALLET
2888-
if (!opt_dsq.has_value()) {
2888+
if (m_cj_ctx->queueman && !opt_dsq.has_value()) {
28892889
opt_dsq = m_cj_ctx->queueman->GetQueueFromHash(inv.hash);
28902890
}
28912891
#endif
@@ -5273,7 +5273,9 @@ void PeerManagerImpl::ProcessMessage(
52735273
{
52745274
//probably one the extensions
52755275
#ifdef ENABLE_WALLET
5276-
PostProcessMessage(m_cj_ctx->queueman->ProcessMessage(pfrom.GetId(), m_connman, *this, msg_type, vRecv), pfrom.GetId());
5276+
if (m_cj_ctx->queueman) {
5277+
PostProcessMessage(m_cj_ctx->queueman->ProcessMessage(pfrom.GetId(), m_connman, *this, msg_type, vRecv), pfrom.GetId());
5278+
}
52775279
m_cj_ctx->walletman->ForEachCJClientMan([this, &pfrom, &msg_type, &vRecv](std::unique_ptr<CCoinJoinClientManager>& clientman) {
52785280
clientman->ProcessMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv);
52795281
});

src/rpc/coinjoin.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,9 @@ static RPCHelpMan getcoinjoininfo()
474474
#ifdef ENABLE_WALLET
475475
CCoinJoinClientOptions::GetJsonInfo(obj);
476476

477-
obj.pushKV("queue_size", node.cj_ctx->queueman->GetQueueSize());
477+
if (node.cj_ctx->queueman) {
478+
obj.pushKV("queue_size", node.cj_ctx->queueman->GetQueueSize());
479+
}
478480

479481
const std::shared_ptr<const CWallet> wallet = GetWalletForJSONRPCRequest(request);
480482
if (!wallet) {

0 commit comments

Comments
 (0)