Skip to content

Commit 653c61f

Browse files
committed
Feature/syncing node (#1648)
* Syncing node Signed-off-by: iceseer <[email protected]> Signed-off-by: Alexander Lednev <[email protected]> * YAC removed Signed-off-by: iceseer <[email protected]> Signed-off-by: Alexander Lednev <[email protected]> * RocksDB store sync peers Signed-off-by: iceseer <[email protected]> Signed-off-by: Alexander Lednev <[email protected]> * Peers refactoring Signed-off-by: iceseer <[email protected]> Signed-off-by: Alexander Lednev <[email protected]> * Ledger state Signed-off-by: iceseer <[email protected]> Signed-off-by: Alexander Lednev <[email protected]>
1 parent 0fc3c5d commit 653c61f

File tree

71 files changed

+732
-271
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+732
-271
lines changed

irohad/ametsuchi/impl/mutable_storage_impl.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,22 @@ namespace iroha::ametsuchi {
7878
block_storage_->insert(block);
7979
block_index_->index(*block);
8080

81-
auto opt_ledger_peers = peer_query_->getLedgerPeers();
82-
if (not opt_ledger_peers) {
83-
log_->error("Failed to get ledger peers!");
84-
return false;
85-
}
81+
boost::optional<
82+
std::vector<std::shared_ptr<shared_model::interface::Peer>>>
83+
opt_ledger_peers[] = {peer_query_->getLedgerPeers(false),
84+
peer_query_->getLedgerPeers(true)};
85+
86+
for (auto const &peer_list : opt_ledger_peers)
87+
if (!peer_list) {
88+
log_->error("Failed to get ledger peers!");
89+
return false;
90+
}
8691

8792
ledger_state_ = std::make_shared<const LedgerState>(
88-
std::move(*opt_ledger_peers), block->height(), block->hash());
93+
std::move(*(opt_ledger_peers[0])), // peers
94+
std::move(*(opt_ledger_peers[1])), // syncing peers
95+
block->height(),
96+
block->hash());
8997
}
9098

9199
return block_applied;
@@ -161,7 +169,8 @@ namespace iroha::ametsuchi {
161169
try {
162170
db_tx_.rollback();
163171
} catch (std::exception &e) {
164-
log_->warn("~MutableStorageImpl(): rollback failed. Reason: {}", e.what());
172+
log_->warn("~MutableStorageImpl(): rollback failed. Reason: {}",
173+
e.what());
165174
}
166175
}
167176
}

irohad/ametsuchi/impl/peer_query_wsv.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ namespace iroha {
1515
PeerQueryWsv::PeerQueryWsv(std::shared_ptr<WsvQuery> wsv)
1616
: wsv_(std::move(wsv)) {}
1717

18-
boost::optional<std::vector<PeerQuery::wPeer>>
19-
PeerQueryWsv::getLedgerPeers() {
20-
return wsv_->getPeers();
18+
boost::optional<std::vector<PeerQuery::wPeer>> PeerQueryWsv::getLedgerPeers(
19+
bool syncing_peers) {
20+
return wsv_->getPeers(syncing_peers);
2121
}
2222

2323
boost::optional<PeerQuery::wPeer> PeerQueryWsv::getLedgerPeerByPublicKey(

irohad/ametsuchi/impl/peer_query_wsv.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ namespace iroha {
2929
* Fetch peers stored in ledger
3030
* @return list of peers in insertion to ledger order
3131
*/
32-
boost::optional<std::vector<wPeer>> getLedgerPeers() override;
32+
boost::optional<std::vector<wPeer>> getLedgerPeers(
33+
bool syncing_peers) override;
3334

3435
/**
3536
* Fetch peer with given public key from ledger

irohad/ametsuchi/impl/postgres_command_executor.cpp

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,26 @@ namespace iroha {
697697
AND NOT (SELECT * FROM has_root_perm) THEN 2
698698
WHEN NOT (SELECT * FROM has_perm) THEN 2)"});
699699

700+
add_sync_peer_statements_ = makeCommandStatements(
701+
sql_,
702+
R"(
703+
WITH %s
704+
inserted AS (
705+
INSERT INTO sync_peer(public_key, address, tls_certificate)
706+
(
707+
SELECT lower(:pubkey), :address, :tls_certificate
708+
%s
709+
) RETURNING (1)
710+
)
711+
SELECT CASE WHEN EXISTS (SELECT * FROM inserted) THEN 0
712+
%s
713+
ELSE 1 END AS result)",
714+
{(boost::format(R"(has_perm AS (%s),)")
715+
% checkAccountRolePermission(Role::kAddPeer, ":creator"))
716+
.str(),
717+
"WHERE (SELECT * FROM has_perm)",
718+
"WHEN NOT (SELECT * from has_perm) THEN 2"});
719+
700720
compare_and_set_account_detail_statements_ = makeCommandStatements(
701721
sql_,
702722
R"(
@@ -1176,6 +1196,40 @@ namespace iroha {
11761196
R"( AND (SELECT * FROM has_perm))",
11771197
R"( WHEN NOT (SELECT * FROM has_perm) THEN 2 )"});
11781198

1199+
remove_sync_peer_statements_ = makeCommandStatements(
1200+
sql_,
1201+
R"(
1202+
WITH %s
1203+
removed AS (
1204+
DELETE FROM sync_peer WHERE public_key = lower(:pubkey)
1205+
%s
1206+
RETURNING (1)
1207+
)
1208+
SELECT CASE
1209+
WHEN EXISTS (SELECT * FROM removed) THEN 0
1210+
%s
1211+
ELSE 1
1212+
END AS result)",
1213+
{(boost::format(R"(
1214+
has_perm AS (%s),
1215+
get_peer AS (
1216+
SELECT * from sync_peer WHERE public_key = lower(:pubkey) LIMIT 1
1217+
),
1218+
check_peers AS (
1219+
SELECT 1 WHERE (SELECT COUNT(*) FROM sync_peer) > 0
1220+
),)")
1221+
% checkAccountRolePermission(
1222+
Role::kAddPeer, Role::kRemovePeer, ":creator"))
1223+
.str(),
1224+
R"(
1225+
AND (SELECT * FROM has_perm)
1226+
AND EXISTS (SELECT * FROM get_peer)
1227+
AND EXISTS (SELECT * FROM check_peers))",
1228+
R"(
1229+
WHEN NOT EXISTS (SELECT * from get_peer) THEN 3
1230+
WHEN NOT EXISTS (SELECT * from check_peers) THEN 4
1231+
WHEN NOT (SELECT * from has_perm) THEN 2)"});
1232+
11791233
set_quorum_statements_ = makeCommandStatements(
11801234
sql_,
11811235
R"(
@@ -1499,8 +1553,12 @@ namespace iroha {
14991553
bool do_validation) {
15001554
auto &peer = command.peer();
15011555

1502-
StatementExecutor executor(
1503-
add_peer_statements_, do_validation, "AddPeer", perm_converter_);
1556+
StatementExecutor executor(peer.isSyncingPeer()
1557+
? add_sync_peer_statements_
1558+
: add_peer_statements_,
1559+
do_validation,
1560+
"AddPeer",
1561+
perm_converter_);
15041562
executor.use("creator", creator_account_id);
15051563
executor.use("address", peer.address());
15061564
executor.use("pubkey", peer.pubkey());
@@ -1795,6 +1853,16 @@ namespace iroha {
17951853
bool do_validation) {
17961854
auto pubkey = command.pubkey();
17971855

1856+
{
1857+
StatementExecutor executor(remove_sync_peer_statements_,
1858+
do_validation,
1859+
"RemovePeer",
1860+
perm_converter_);
1861+
executor.use("creator", creator_account_id);
1862+
executor.use("pubkey", pubkey);
1863+
executor.execute();
1864+
}
1865+
17981866
StatementExecutor executor(remove_peer_statements_,
17991867
do_validation,
18001868
"RemovePeer",

irohad/ametsuchi/impl/postgres_command_executor.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ namespace iroha {
255255

256256
std::unique_ptr<CommandStatements> add_asset_quantity_statements_;
257257
std::unique_ptr<CommandStatements> add_peer_statements_;
258+
std::unique_ptr<CommandStatements> add_sync_peer_statements_;
258259
std::unique_ptr<CommandStatements> add_signatory_statements_;
259260
std::unique_ptr<CommandStatements> append_role_statements_;
260261
std::unique_ptr<CommandStatements>
@@ -266,6 +267,7 @@ namespace iroha {
266267
std::unique_ptr<CommandStatements> detach_role_statements_;
267268
std::unique_ptr<CommandStatements> grant_permission_statements_;
268269
std::unique_ptr<CommandStatements> remove_peer_statements_;
270+
std::unique_ptr<CommandStatements> remove_sync_peer_statements_;
269271
std::unique_ptr<CommandStatements> remove_signatory_statements_;
270272
std::unique_ptr<CommandStatements> revoke_permission_statements_;
271273
std::unique_ptr<CommandStatements> set_account_detail_statements_;

irohad/ametsuchi/impl/postgres_specific_query_executor.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,9 @@ namespace iroha {
14811481
R"(WITH has_perms AS ({})
14821482
SELECT public_key, address, tls_certificate, perm FROM peer
14831483
RIGHT OUTER JOIN has_perms ON TRUE
1484+
UNION
1485+
SELECT public_key, address, tls_certificate, perm FROM sync_peer
1486+
RIGHT OUTER JOIN has_perms ON TRUE
14841487
)",
14851488
getAccountRolePermissionCheckSql(Role::kGetPeers));
14861489

@@ -1500,12 +1503,10 @@ namespace iroha {
15001503
if (peer_key and address) {
15011504
peers.push_back(
15021505
std::make_shared<shared_model::plain::Peer>(
1503-
*address, *std::move(peer_key), tls_certificate));
1504-
} else {
1505-
log_->error(
1506-
"Address or public key not set for some peer!");
1507-
assert(peer_key);
1508-
assert(address);
1506+
*address,
1507+
*std::move(peer_key),
1508+
tls_certificate,
1509+
false));
15091510
}
15101511
});
15111512
}

irohad/ametsuchi/impl/postgres_wsv_command.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,10 @@ namespace iroha {
314314
WsvCommandResult PostgresWsvCommand::insertPeer(
315315
const shared_model::interface::Peer &peer) {
316316
soci::statement st = sql_.prepare
317-
<< "INSERT INTO peer(public_key, address, tls_certificate)"
318-
" VALUES (lower(:pk), :address, :tls_certificate)";
317+
<< fmt::format("INSERT INTO {}(public_key, address, "
318+
"tls_certificate) VALUES (lower(:pk), :address, "
319+
":tls_certificate)",
320+
peer.isSyncingPeer() ? "sync_peer" : "peer");
319321
st.exchange(soci::use(peer.pubkey()));
320322
st.exchange(soci::use(peer.address()));
321323
st.exchange(soci::use(peer.tlsCertificate()));
@@ -329,8 +331,12 @@ namespace iroha {
329331
WsvCommandResult PostgresWsvCommand::deletePeer(
330332
const shared_model::interface::Peer &peer) {
331333
soci::statement st = sql_.prepare
332-
<< "DELETE FROM peer WHERE public_key = lower(:pk) AND address = "
333-
":address";
334+
<< fmt::format("DELETE FROM {} WHERE public_key = "
335+
"lower(:pk) AND address = :address",
336+
peer.isSyncingPeer() ? "sync_peer" : "peer"
337+
338+
);
339+
334340
st.exchange(soci::use(peer.pubkey()));
335341
st.exchange(soci::use(peer.address()));
336342

irohad/ametsuchi/impl/postgres_wsv_query.cpp

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717
namespace {
1818
template <typename T>
1919
boost::optional<std::vector<std::shared_ptr<shared_model::interface::Peer>>>
20-
getPeersFromSociRowSet(T &&rowset) {
20+
getPeersFromSociRowSet(T &&rowset, bool syncing_peer) {
2121
return iroha::ametsuchi::flatMapValues<
2222
std::vector<std::shared_ptr<shared_model::interface::Peer>>>(
2323
std::forward<T>(rowset),
2424
[&](auto &public_key, auto &address, auto &tls_certificate) {
2525
return boost::make_optional(
26-
std::make_shared<shared_model::plain::Peer>(
27-
address, std::move(public_key), tls_certificate));
26+
std::make_shared<shared_model::plain::Peer>(address,
27+
std::move(public_key),
28+
tls_certificate,
29+
syncing_peer));
2830
});
2931
}
3032
} // namespace
@@ -69,15 +71,19 @@ namespace iroha {
6971
}
7072

7173
boost::optional<std::vector<std::shared_ptr<shared_model::interface::Peer>>>
72-
PostgresWsvQuery::getPeers() {
74+
PostgresWsvQuery::getPeers(bool syncing_peers) {
7375
using T = boost::
7476
tuple<std::string, AddressType, std::optional<TLSCertificateType>>;
7577
auto result = execute<T>([&] {
76-
return (sql_.prepare
77-
<< "SELECT public_key, address, tls_certificate FROM peer");
78+
return (
79+
sql_.prepare
80+
<< (syncing_peers
81+
? "SELECT public_key, address, tls_certificate FROM "
82+
"sync_peer"
83+
: "SELECT public_key, address, tls_certificate FROM peer"));
7884
});
7985

80-
return getPeersFromSociRowSet(result);
86+
return getPeersFromSociRowSet(result, syncing_peers);
8187
}
8288

8389
iroha::expected::Result<size_t, std::string> PostgresWsvQuery::count(
@@ -92,9 +98,9 @@ namespace iroha {
9298
return iroha::expected::makeError(msg);
9399
}
94100

95-
iroha::expected::Result<size_t, std::string>
96-
PostgresWsvQuery::countPeers() {
97-
return count("peer");
101+
iroha::expected::Result<size_t, std::string> PostgresWsvQuery::countPeers(
102+
bool syncing_peers) {
103+
return count(syncing_peers ? "sync_peer" : "peer");
98104
}
99105

100106
iroha::expected::Result<size_t, std::string>
@@ -117,13 +123,13 @@ namespace iroha {
117123
std::string target_public_key{public_key};
118124
auto result = execute<T>([&] {
119125
return (sql_.prepare << R"(
120-
SELECT public_key, address, tls_certificate
121-
FROM peer
122-
WHERE public_key = :public_key)",
126+
SELECT public_key, address, tls_certificate FROM peer WHERE public_key = :public_key
127+
UNION
128+
SELECT public_key, address, tls_certificate FROM sync_peer WHERE public_key = :public_key)",
123129
soci::use(target_public_key, "public_key"));
124130
});
125131

126-
return getPeersFromSociRowSet(result) | [](auto &&peers)
132+
return getPeersFromSociRowSet(result, false) | [](auto &&peers)
127133
-> boost::optional<
128134
std::shared_ptr<shared_model::interface::Peer>> {
129135
if (!peers.empty()) {

irohad/ametsuchi/impl/postgres_wsv_query.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ namespace iroha {
2626

2727
boost::optional<
2828
std::vector<std::shared_ptr<shared_model::interface::Peer>>>
29-
getPeers() override;
29+
getPeers(bool syncing_peers) override;
3030

31-
iroha::expected::Result<size_t, std::string> countPeers() override;
31+
iroha::expected::Result<size_t, std::string> countPeers(
32+
bool syncing_peers) override;
3233
iroha::expected::Result<size_t, std::string> countDomains() override;
3334
iroha::expected::Result<size_t, std::string> countTransactions() override;
3435

0 commit comments

Comments
 (0)