Skip to content

Commit

Permalink
tonNode.getOutMsgQueueProof query in public shard overlays
Browse files Browse the repository at this point in the history
  • Loading branch information
SpyCheese committed Dec 3, 2024
1 parent 9ae88d8 commit 7ffd866
Show file tree
Hide file tree
Showing 26 changed files with 657 additions and 5 deletions.
4 changes: 4 additions & 0 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ class HardforkCreator : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
}
Expand Down
6 changes: 6 additions & 0 deletions crypto/block/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,12 @@ bool EnqueuedMsgDescr::check_key(td::ConstBitPtr key) const {
hash_ == key + 96;
}

bool ImportedMsgQueueLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xd3 // imported_msg_queue_limits#d3
&& cs.fetch_uint_to(32, max_bytes) // max_bytes:#
&& cs.fetch_uint_to(32, max_msgs); // max_msgs:#
}

bool ParamLimits::deserialize(vm::CellSlice& cs) {
return cs.fetch_ulong(8) == 0xc3 // param_limits#c3
&& cs.fetch_uint_to(32, limits_[0]) // underload:uint32
Expand Down
10 changes: 10 additions & 0 deletions crypto/block/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ static inline std::ostream& operator<<(std::ostream& os, const MsgProcessedUptoC
return proc_coll.print(os);
}

struct ImportedMsgQueueLimits {
// Default values
td::uint32 max_bytes = 1 << 16;
td::uint32 max_msgs = 30;
bool deserialize(vm::CellSlice& cs);
ImportedMsgQueueLimits operator*(td::uint32 x) const {
return {max_bytes * x, max_msgs * x};
}
};

struct ParamLimits {
enum { limits_cnt = 4 };
enum { cl_underload = 0, cl_normal = 1, cl_soft = 2, cl_medium = 3, cl_hard = 4 };
Expand Down
15 changes: 15 additions & 0 deletions tdutils/td/utils/StringBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,19 @@ std::enable_if_t<std::is_arithmetic<T>::value, string> to_string(const T &x) {
return sb.as_cslice().str();
}

template <class SB>
struct LambdaPrintHelper {
SB& sb;
};
template <class SB, class F>
SB& operator<<(const LambdaPrintHelper<SB>& helper, F&& f) {
f(helper.sb);
return helper.sb;
}
struct LambdaPrint {};

inline LambdaPrintHelper<td::StringBuilder> operator<<(td::StringBuilder& sb, const LambdaPrint&) {
return LambdaPrintHelper<td::StringBuilder>{sb};
}

} // namespace td
10 changes: 7 additions & 3 deletions tdutils/td/utils/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

#define LOG(level) LOG_IMPL(level, level, true, ::td::Slice())
#define LOG_IF(level, condition) LOG_IMPL(level, level, condition, #condition)
#define FLOG(level) LOG_IMPL(level, level, true, ::td::Slice()) << td::LambdaPrint{} << [&](auto &sb)

#define VLOG(level) LOG_IMPL(DEBUG, level, true, TD_DEFINE_STR(level))
#define VLOG_IF(level, condition) LOG_IMPL(DEBUG, level, condition, TD_DEFINE_STR(level) " " #condition)
Expand All @@ -95,13 +96,13 @@ inline bool no_return_func() {
#define DUMMY_LOG_CHECK(condition) LOG_IF(NEVER, !(condition))

#ifdef TD_DEBUG
#if TD_MSVC
#if TD_MSVC
#define LOG_CHECK(condition) \
__analysis_assume(!!(condition)); \
LOG_IMPL(FATAL, FATAL, !(condition), #condition)
#else
#else
#define LOG_CHECK(condition) LOG_IMPL(FATAL, FATAL, !(condition) && no_return_func(), #condition)
#endif
#endif
#else
#define LOG_CHECK DUMMY_LOG_CHECK
#endif
Expand Down Expand Up @@ -263,6 +264,9 @@ class Logger {
sb_ << other;
return *this;
}
LambdaPrintHelper<td::Logger> operator<<(const LambdaPrint &) {
return LambdaPrintHelper<td::Logger>{*this};
}

MutableCSlice as_cslice() {
return sb_.as_cslice();
Expand Down
4 changes: 4 additions & 0 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ class TestNode : public td::actor::Actor {
void download_archive(ton::BlockSeqno masterchain_seqno, ton::ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override {
}
void download_out_msg_queue_proof(
ton::ShardIdFull dst_shard, std::vector<ton::BlockIdExt> blocks, block::ImportedMsgQueueLimits limits,
td::Timestamp timeout, td::Promise<std::vector<td::Ref<ton::validator::OutMsgQueueProof>>> promise) override {
}

void new_key_block(ton::validator::BlockHandle handle) override {
}
Expand Down
6 changes: 6 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ tonNode.success = tonNode.Success;
tonNode.archiveNotFound = tonNode.ArchiveInfo;
tonNode.archiveInfo id:long = tonNode.ArchiveInfo;

tonNode.importedMsgQueueLimits max_bytes:int max_msgs:int = ImportedMsgQueueLimits;
tonNode.outMsgQueueProof queue_proofs:bytes block_state_proofs:bytes msg_counts:(vector int) = tonNode.OutMsgQueueProof;
tonNode.outMsgQueueProofEmpty = tonNode.OutMsgQueueProof;

tonNode.forgetPeer = tonNode.ForgetPeer;

---functions---
Expand Down Expand Up @@ -483,6 +487,8 @@ tonNode.downloadKeyBlockProofLink block:tonNode.blockIdExt = tonNode.Data;
tonNode.getArchiveInfo masterchain_seqno:int = tonNode.ArchiveInfo;
tonNode.getShardArchiveInfo masterchain_seqno:int shard_prefix:tonNode.shardId = tonNode.ArchiveInfo;
tonNode.getArchiveSlice archive_id:long offset:long max_size:int = tonNode.Data;
tonNode.getOutMsgQueueProof dst_shard:tonNode.shardId blocks:(vector tonNode.blockIdExt)
limits:tonNode.importedMsgQueueLimits = tonNode.OutMsgQueueProof;

tonNode.getCapabilities = tonNode.Capabilities;

Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
1 change: 1 addition & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(VALIDATOR_HEADERS
interfaces/db.h
interfaces/external-message.h
interfaces/liteserver.h
interfaces/out-msg-queue-proof.h
interfaces/proof.h
interfaces/shard.h
interfaces/signature-set.h
Expand Down
87 changes: 87 additions & 0 deletions validator/full-node-shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "net/download-proof.hpp"
#include "net/get-next-key-blocks.hpp"
#include "net/download-archive-slice.hpp"
#include "impl/out-msg-queue-proof.hpp"

#include "td/utils/Random.h"

Expand Down Expand Up @@ -669,6 +670,51 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod
query.offset_, query.max_size_, std::move(promise));
}

void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise) {
std::vector<BlockIdExt> blocks;
for (const auto& x : query.blocks_) {
BlockIdExt id = create_block_id(x);
if (!id.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid block_id"));
return;
}
if (!shard_is_ancestor(shard_, id.shard_full())) {
promise.set_error(td::Status::Error("query in wrong overlay"));
return;
}
blocks.push_back(create_block_id(x));
}
ShardIdFull dst_shard = create_shard_id(query.dst_shard_);
if (!dst_shard.is_valid_ext()) {
promise.set_error(td::Status::Error("invalid shard"));
return;
}
block::ImportedMsgQueueLimits limits{(td::uint32)query.limits_->max_bytes_, (td::uint32)query.limits_->max_msgs_};
if (limits.max_bytes > (1 << 24)) {
promise.set_error(td::Status::Error("max_bytes is too big"));
return;
}
auto P = td::PromiseCreator::lambda(
[promise = std::move(promise)](td::Result<tl_object_ptr<ton_api::tonNode_outMsgQueueProof>> R) mutable {
if (R.is_error()) {
promise.set_result(create_serialize_tl_object<ton_api::tonNode_outMsgQueueProofEmpty>());
} else {
promise.set_result(serialize_tl_object(R.move_as_ok(), true));
}
});
FLOG(DEBUG) {
sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks";
for (const BlockIdExt &id : blocks) {
sb << " " << id.id.to_str();
}
sb << " from " << src;
};
td::actor::create_actor<BuildOutMsgQueueProof>("buildqueueproof", dst_shard, std::move(blocks), limits,
validator_manager_, std::move(P))
.release();
}

void FullNodeShardImpl::receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query,
td::Promise<td::BufferSlice> promise) {
if (!active_) {
Expand Down Expand Up @@ -944,6 +990,47 @@ void FullNodeShardImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFu
.release();
}

void FullNodeShardImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
// TODO: maybe more complex download (like other requests here)
auto &b = choose_neighbour();
if (b.adnl_id == adnl::AdnlNodeIdShort::zero()) {
promise.set_error(td::Status::Error(ErrorCode::notready, "no nodes"));
return;
}
std::vector<tl_object_ptr<ton_api::tonNode_blockIdExt>> blocks_tl;
for (const BlockIdExt &id : blocks) {
blocks_tl.push_back(create_tl_block_id(id));
}
td::BufferSlice query = create_serialize_tl_object<ton_api::tonNode_getOutMsgQueueProof>(
create_tl_shard_id(dst_shard), std::move(blocks_tl),
create_tl_object<ton_api::tonNode_importedMsgQueueLimits>(limits.max_bytes, limits.max_msgs));

auto P = td::PromiseCreator::lambda(
[=, promise = std::move(promise), blocks = std::move(blocks)](td::Result<td::BufferSlice> R) mutable {
if (R.is_error()) {
promise.set_result(R.move_as_error());
return;
}
TRY_RESULT_PROMISE(promise, f, fetch_tl_object<ton_api::tonNode_OutMsgQueueProof>(R.move_as_ok(), true));
ton_api::downcast_call(
*f, td::overloaded(
[&](ton_api::tonNode_outMsgQueueProofEmpty &x) {
promise.set_error(td::Status::Error("node doesn't have this block"));
},
[&](ton_api::tonNode_outMsgQueueProof &x) {
delay_action(
[=, promise = std::move(promise), blocks = std::move(blocks), x = std::move(x)]() mutable {
promise.set_result(OutMsgQueueProof::fetch(dst_shard, blocks, limits, x));
},
td::Timestamp::now());
}));
});
td::actor::send_closure(overlays_, &overlay::Overlays::send_query_via, b.adnl_id, adnl_id_, overlay_id_,
"get_msg_queue", std::move(P), timeout, std::move(query), 1 << 22, rldp_);
}

void FullNodeShardImpl::set_handle(BlockHandle handle, td::Promise<td::Unit> promise) {
CHECK(!handle_);
handle_ = std::move(handle);
Expand Down
3 changes: 3 additions & 0 deletions validator/full-node-shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class FullNodeShard : public td::actor::Actor {
td::Promise<std::vector<BlockIdExt>> promise) = 0;
virtual void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) = 0;
virtual void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) = 0;

virtual void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) = 0;

Expand Down
7 changes: 5 additions & 2 deletions validator/full-node-shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getArchiveSlice &query,
td::Promise<td::BufferSlice> promise);
// void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_prepareNextKeyBlockProof &query,
// td::Promise<td::BufferSlice> promise);
void process_query(adnl::AdnlNodeIdShort src, ton_api::tonNode_getOutMsgQueueProof &query,
td::Promise<td::BufferSlice> promise);
void receive_query(adnl::AdnlNodeIdShort src, td::BufferSlice query, td::Promise<td::BufferSlice> promise);
void receive_message(adnl::AdnlNodeIdShort src, td::BufferSlice data);

Expand Down Expand Up @@ -183,6 +183,9 @@ class FullNodeShardImpl : public FullNodeShard {
td::Promise<std::vector<BlockIdExt>> promise) override;
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise) override;
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override;

void set_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;

Expand Down
24 changes: 24 additions & 0 deletions validator/full-node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,24 @@ void FullNodeImpl::download_archive(BlockSeqno masterchain_seqno, ShardIdFull sh
timeout, std::move(promise));
}

void FullNodeImpl::download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) {
if (blocks.empty()) {
promise.set_value({});
return;
}
// All blocks are expected to have the same minsplit shard prefix
auto shard = get_shard(blocks[0].shard_full());
if (shard.empty()) {
VLOG(FULL_NODE_WARNING) << "dropping download msg queue query to unknown shard";
promise.set_error(td::Status::Error(ErrorCode::notready, "shard not ready"));
return;
}
td::actor::send_closure(shard, &FullNodeShard::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

td::actor::ActorId<FullNodeShard> FullNodeImpl::get_shard(ShardIdFull shard) {
if (shard.is_masterchain()) {
return shards_[ShardIdFull{masterchainId}].actor.get();
Expand Down Expand Up @@ -645,6 +663,12 @@ void FullNodeImpl::start_up() {
td::actor::send_closure(id_, &FullNodeImpl::download_archive, masterchain_seqno, shard_prefix, std::move(tmp_dir),
timeout, std::move(promise));
}
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise) override {
td::actor::send_closure(id_, &FullNodeImpl::download_out_msg_queue_proof, dst_shard, std::move(blocks), limits,
timeout, std::move(promise));
}

void new_key_block(BlockHandle handle) override {
td::actor::send_closure(id_, &FullNodeImpl::new_key_block, std::move(handle));
Expand Down
3 changes: 3 additions & 0 deletions validator/full-node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class FullNodeImpl : public FullNode {
void get_next_key_blocks(BlockIdExt block_id, td::Timestamp timeout, td::Promise<std::vector<BlockIdExt>> promise);
void download_archive(BlockSeqno masterchain_seqno, ShardIdFull shard_prefix, std::string tmp_dir,
td::Timestamp timeout, td::Promise<std::string> promise);
void download_out_msg_queue_proof(ShardIdFull dst_shard, std::vector<BlockIdExt> blocks,
block::ImportedMsgQueueLimits limits, td::Timestamp timeout,
td::Promise<std::vector<td::Ref<OutMsgQueueProof>>> promise);

void got_key_block_config(td::Ref<ConfigHolder> config);
void new_key_block(BlockHandle handle);
Expand Down
2 changes: 2 additions & 0 deletions validator/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ set(TON_VALIDATOR_SOURCE
ihr-message.cpp
liteserver.cpp
message-queue.cpp
out-msg-queue-proof.cpp
proof.cpp
shard.cpp
signature-set.cpp
Expand All @@ -35,6 +36,7 @@ set(TON_VALIDATOR_SOURCE
liteserver.hpp
liteserver-cache.hpp
message-queue.hpp
out-msg-queue-proof.hpp
proof.hpp
shard.hpp
signature-set.hpp
Expand Down
Loading

0 comments on commit 7ffd866

Please sign in to comment.