Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/dashpay/dash into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
PastaPastaPasta committed Aug 31, 2024
2 parents f188180 + 7a9e475 commit 395154c
Show file tree
Hide file tree
Showing 29 changed files with 727 additions and 163 deletions.
21 changes: 12 additions & 9 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
Expand Down Expand Up @@ -58,18 +59,14 @@ def __init__(self):
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"

if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)

if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(body.hex())
Expand Down Expand Up @@ -118,6 +115,12 @@ async def handle(self) :
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(body.hex())
elif topic == b"sequence":
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

Expand Down
56 changes: 48 additions & 8 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Currently, the following notifications are supported:
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubrawrecoveredsig=address
-zmqpubsequence=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand All @@ -103,23 +104,52 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n
-zmqpubsequencehwm=address

The high water mark value must be an integer greater than or equal to 0.

For instance:

$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubhashblock="tcp://[::1]:28333" \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000

Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
terminator). These options can also be provided in dash.conf.

These options can also be provided in dash.conf.
The topics are:

`sequence`: the body is structured as the following based on the type of message:

<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool

Where the 8-byte uints correspond to the mempool sequence number.

`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages).

| rawtx | <serialized transaction> | <uint32 sequence number in Little Endian>

`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashtx | <32-byte transaction hash in Little Endian> | <uint32 sequence number in Little Endian>


`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages).

| rawblock | <serialized block> | <uint32 sequence number in Little Endian>

`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashblock | <32-byte block hash in Little Endian> | <uint32 sequence number in Little Endian>

**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes.

ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org/4-0:_start).
Expand All @@ -143,6 +173,9 @@ Setting the keepalive values appropriately for your operating environment may
improve connectivity in situations where long-lived connections are silently
dropped by network middle boxes.

Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6
hosts as well. If needed, this option has to be set on the client side too.

## Remarks

From the perspective of dashd, the ZeroMQ socket is write-only; PUB
Expand All @@ -154,13 +187,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.
6 changes: 4 additions & 2 deletions src/dsnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
}
}

void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime)
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime,
uint64_t mempool_sequence)
{
assert(m_cj_ctx && m_llmq_ctx);

Expand All @@ -111,7 +112,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}

void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason)
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence)
{
assert(m_llmq_ctx);

Expand Down
5 changes: 3 additions & 2 deletions src/dsnotificationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CDSNotificationInterface : public CValidationInterface
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override;
void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override;
Expand Down
15 changes: 8 additions & 7 deletions src/evo/deterministicmns.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,25 +239,26 @@ class CDeterministicMNList

[[nodiscard]] size_t GetValidMNsCount() const
{
return ranges::count_if(mnMap, [this](const auto& p){ return IsMNValid(*p.second); });
return ranges::count_if(mnMap, [](const auto& p) { return IsMNValid(*p.second); });
}

[[nodiscard]] size_t GetAllEvoCount() const
{
return ranges::count_if(mnMap, [this](const auto& p) { return p.second->nType == MnType::Evo; });
return ranges::count_if(mnMap, [](const auto& p) { return p.second->nType == MnType::Evo; });
}

[[nodiscard]] size_t GetValidEvoCount() const
{
return ranges::count_if(mnMap, [this](const auto& p) { return p.second->nType == MnType::Evo && IsMNValid(*p.second); });
return ranges::count_if(mnMap,
[](const auto& p) { return p.second->nType == MnType::Evo && IsMNValid(*p.second); });
}

[[nodiscard]] size_t GetValidWeightedMNsCount() const
{
return std::accumulate(mnMap.begin(), mnMap.end(), 0, [this](auto res, const auto& p) {
if (!IsMNValid(*p.second)) return res;
return res + GetMnType(p.second->nType).voting_weight;
});
return std::accumulate(mnMap.begin(), mnMap.end(), 0, [](auto res, const auto& p) {
if (!IsMNValid(*p.second)) return res;
return res + GetMnType(p.second->nType).voting_weight;
});
}

/**
Expand Down
5 changes: 1 addition & 4 deletions src/evo/dmn_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ constexpr auto Invalid = mntype_struct{
}
}

[[nodiscard]] constexpr const bool IsValidMnType(MnType type)
{
return type < MnType::COUNT;
}
[[nodiscard]] constexpr bool IsValidMnType(MnType type) { return type < MnType::COUNT; }

#endif // BITCOIN_EVO_DMN_TYPES_H
11 changes: 7 additions & 4 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node)

#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface);
delete g_zmq_notification_interface;
g_zmq_notification_interface = nullptr;
UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif

Expand Down Expand Up @@ -646,6 +645,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
Expand All @@ -664,6 +664,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
Expand All @@ -683,6 +684,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
Expand All @@ -701,6 +703,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif

argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
Expand Down Expand Up @@ -1736,7 +1739,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
g_zmq_notification_interface = CZMQNotificationInterface::Create();

if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface);
RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class Chain
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}
Expand Down
1 change: 1 addition & 0 deletions src/netmessagemaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class CNetMsgMaker
msg.m_type = std::move(msg_type);
msg.data.reserve(4 * 1024);
CVectorWriter{ SER_NETWORK, nFlags | nVersion, msg.data, 0, std::forward<Args>(args)... };
msg.data.shrink_to_fit();
return msg;
}

Expand Down
10 changes: 5 additions & 5 deletions src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,13 @@ class NotificationsProxy : public CValidationInterface
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx, nAcceptTime);
m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
Expand Down Expand Up @@ -997,7 +997,7 @@ class ChainImpl : public Chain
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0);
notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0);
}
}
NodeContext& m_node;
Expand Down
Loading

0 comments on commit 395154c

Please sign in to comment.