From d10726c71f39fb52436f345ae4786864befb1a71 Mon Sep 17 00:00:00 2001 From: Konstantin Munichev Date: Mon, 1 Apr 2019 22:24:06 +0300 Subject: [PATCH 1/4] Implement strategy for batch resending from cache Signed-off-by: Konstantin Munichev --- irohad/main/application.cpp | 5 + irohad/main/impl/on_demand_ordering_init.cpp | 10 + irohad/main/impl/on_demand_ordering_init.hpp | 11 +- irohad/ordering/CMakeLists.txt | 1 + .../impl/on_demand_connection_manager.cpp | 48 +++- .../impl/on_demand_connection_manager.hpp | 4 + .../ordering/impl/on_demand_ordering_gate.cpp | 13 +- .../ordering/impl/on_demand_ordering_gate.hpp | 3 + .../on_demand_resend_strategy.cpp | 106 +++++++++ .../on_demand_resend_strategy.hpp | 46 ++++ .../ordering_gate_resend_strategy.hpp | 67 ++++++ test/module/irohad/ordering/CMakeLists.txt | 6 + .../on_demand_connection_manager_test.cpp | 3 + .../ordering/on_demand_ordering_gate_test.cpp | 3 + .../on_demand_resend_strategy_test.cpp | 205 ++++++++++++++++++ .../module/irohad/ordering/ordering_mocks.hpp | 18 ++ 16 files changed, 541 insertions(+), 8 deletions(-) create mode 100644 irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp create mode 100644 irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp create mode 100644 irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp create mode 100644 test/module/irohad/ordering/on_demand_resend_strategy_test.cpp diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index bb6c7e3287..5d4236cb14 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -37,6 +37,7 @@ #include "network/impl/peer_communication_service_impl.hpp" #include "ordering/impl/on_demand_common.hpp" #include "ordering/impl/on_demand_ordering_gate.hpp" +#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp" #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp" #include "simulator/impl/simulator.hpp" #include 
"synchronizer/impl/synchronizer_impl.hpp" @@ -347,6 +348,9 @@ void Irohad::initOrderingGate() { auto factory = std::make_unique>(); + auto batch_resend_strategy = + std::make_shared(); + const uint64_t kCounter = 0, kMaxLocalCounter = 2; // reject_delay and local_counter are local mutable variables of lambda const auto kMaxDelay(max_rounds_delay_); @@ -386,6 +390,7 @@ void Irohad::initOrderingGate() { std::move(factory), proposal_factory, persistent_cache, + std::move(batch_resend_strategy), delay, log_manager_->getChild("Ordering")); log_->info("[Init] => init ordering gate - [{}]", diff --git a/irohad/main/impl/on_demand_ordering_init.cpp b/irohad/main/impl/on_demand_ordering_init.cpp index 00214a0dcd..9dd31ad78e 100644 --- a/irohad/main/impl/on_demand_ordering_init.cpp +++ b/irohad/main/impl/on_demand_ordering_init.cpp @@ -65,6 +65,8 @@ namespace iroha { std::shared_ptr> async_call, std::shared_ptr proposal_transport_factory, + std::shared_ptr + batch_resend_strategy, std::chrono::milliseconds delay, std::vector initial_hashes, const logger::LoggerManagerTreePtr &ordering_log_manager) { @@ -196,6 +198,7 @@ namespace iroha { delay, ordering_log_manager), peers, + std::move(batch_resend_strategy), ordering_log_manager->getChild("ConnectionManager")->getLogger()); } @@ -206,6 +209,8 @@ namespace iroha { std::shared_ptr proposal_factory, std::shared_ptr tx_cache, + std::shared_ptr + batch_resend_strategy, std::function delay_func, size_t max_number_of_transactions, @@ -252,6 +257,7 @@ namespace iroha { std::move(cache), std::move(proposal_factory), std::move(tx_cache), + std::move(batch_resend_strategy), max_number_of_transactions, ordering_log_manager->getChild("Gate")->getLogger()); } @@ -292,6 +298,8 @@ namespace iroha { proposal_factory, std::shared_ptr proposal_transport_factory, std::shared_ptr tx_cache, + std::shared_ptr + batch_resend_strategy, std::function delay_func, logger::LoggerManagerTreePtr ordering_log_manager) { @@ -310,12 +318,14 @@ namespace 
iroha { createConnectionManager(std::move(peer_query_factory), std::move(async_call), std::move(proposal_transport_factory), + batch_resend_strategy, delay, std::move(initial_hashes), ordering_log_manager), std::make_shared(), std::move(proposal_factory), std::move(tx_cache), + batch_resend_strategy, std::move(delay_func), max_number_of_transactions, ordering_log_manager); diff --git a/irohad/main/impl/on_demand_ordering_init.hpp b/irohad/main/impl/on_demand_ordering_init.hpp index 554ae4d7ef..340873fd03 100644 --- a/irohad/main/impl/on_demand_ordering_init.hpp +++ b/irohad/main/impl/on_demand_ordering_init.hpp @@ -19,6 +19,7 @@ #include "ordering.grpc.pb.h" #include "ordering/impl/on_demand_os_server_grpc.hpp" #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" #include "ordering/on_demand_ordering_service.hpp" #include "ordering/on_demand_os_transport.hpp" @@ -57,6 +58,8 @@ namespace iroha { std::shared_ptr> async_call, std::shared_ptr proposal_transport_factory, + std::shared_ptr + batch_resend_strategy, std::chrono::milliseconds delay, std::vector initial_hashes, const logger::LoggerManagerTreePtr &ordering_log_manager); @@ -72,6 +75,8 @@ namespace iroha { std::shared_ptr proposal_factory, std::shared_ptr tx_cache, + std::shared_ptr + batch_resend_strategy, std::function delay_func, size_t max_number_of_transactions, @@ -94,11 +99,11 @@ namespace iroha { OnDemandOrderingInit(logger::LoggerPtr log); ~OnDemandOrderingInit(); - /** * Initializes on-demand ordering gate and ordering sevice components * - * @param max_number_of_transactions maximum number of transaction in a proposal + * @param max_number_of_transactions maximum number of transaction in a + * proposal * @param delay timeout for ordering service response on proposal request * @param initial_hashes seeds for peer list permutations for first k * rounds they are required since hash of block i defines round i 
+ k @@ -134,6 +139,8 @@ namespace iroha { proposal_factory, std::shared_ptr proposal_transport_factory, std::shared_ptr tx_cache, + std::shared_ptr + batch_resend_strategy, std::function delay_func, logger::LoggerManagerTreePtr ordering_log_manager); diff --git a/irohad/ordering/CMakeLists.txt b/irohad/ordering/CMakeLists.txt index 2efd5c8fab..f1aa110bb3 100644 --- a/irohad/ordering/CMakeLists.txt +++ b/irohad/ordering/CMakeLists.txt @@ -54,6 +54,7 @@ add_library(on_demand_ordering_gate impl/on_demand_ordering_gate.cpp impl/ordering_gate_cache/ordering_gate_cache.cpp impl/ordering_gate_cache/on_demand_cache.cpp + impl/ordering_gate_cache/on_demand_resend_strategy.cpp ) target_link_libraries(on_demand_ordering_gate on_demand_common diff --git a/irohad/ordering/impl/on_demand_connection_manager.cpp b/irohad/ordering/impl/on_demand_connection_manager.cpp index 6a4e98e77d..101e57203d 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.cpp +++ b/irohad/ordering/impl/on_demand_connection_manager.cpp @@ -7,6 +7,7 @@ #include #include "interfaces/iroha_internal/proposal.hpp" +#include "interfaces/iroha_internal/transaction_batch_impl.hpp" #include "logger/logger.hpp" #include "ordering/impl/on_demand_common.hpp" @@ -16,9 +17,11 @@ using namespace iroha::ordering; OnDemandConnectionManager::OnDemandConnectionManager( std::shared_ptr factory, rxcpp::observable peers, + std::shared_ptr batch_resend_strategy, logger::LoggerPtr log) : log_(std::move(log)), factory_(std::move(factory)), + batch_resend_strategy_(std::move(batch_resend_strategy)), subscription_(peers.subscribe([this](const auto &peers) { // exclusive lock std::lock_guard lock(mutex_); @@ -30,8 +33,12 @@ OnDemandConnectionManager::OnDemandConnectionManager( std::shared_ptr factory, rxcpp::observable peers, CurrentPeers initial_peers, + std::shared_ptr batch_resend_strategy, logger::LoggerPtr log) - : OnDemandConnectionManager(std::move(factory), peers, std::move(log)) { + : 
OnDemandConnectionManager(std::move(factory), + peers, + std::move(batch_resend_strategy), + std::move(log)) { // using start_with(initial_peers) results in deadlock initializeConnections(initial_peers); } @@ -55,10 +62,41 @@ void OnDemandConnectionManager::onBatches(CollectionType batches) { * RejectReject CommitReject RejectCommit CommitCommit */ - connections_.peers[kRejectRejectConsumer]->onBatches(batches); - connections_.peers[kRejectCommitConsumer]->onBatches(batches); - connections_.peers[kCommitRejectConsumer]->onBatches(batches); - connections_.peers[kCommitCommitConsumer]->onBatches(batches); + CollectionType reject_reject_batches{}; + CollectionType reject_commit_batches{}; + CollectionType commit_reject_batches{}; + CollectionType commit_commit_batches{}; + + for (const auto &batch : batches) { + auto rounds = batch_resend_strategy_->extract(batch); + auto current_round = batch_resend_strategy_->getCurrentRound(); + + if (rounds.find(nextRejectRound(nextRejectRound(current_round))) + != rounds.end()) { + reject_reject_batches.push_back(batch); + } + if (rounds.find(nextCommitRound(nextRejectRound(current_round))) + != rounds.end()) { + reject_commit_batches.push_back(batch); + } + if (rounds.find(nextRejectRound(nextCommitRound(current_round))) + != rounds.end()) { + commit_reject_batches.push_back(batch); + } + if (rounds.find(nextCommitRound(nextCommitRound(current_round))) + != rounds.end()) { + commit_commit_batches.push_back(batch); + } + } + + connections_.peers[kRejectRejectConsumer]->onBatches( + std::move(reject_reject_batches)); + connections_.peers[kRejectCommitConsumer]->onBatches( + std::move(reject_commit_batches)); + connections_.peers[kCommitRejectConsumer]->onBatches( + std::move(commit_reject_batches)); + connections_.peers[kCommitCommitConsumer]->onBatches( + std::move(commit_commit_batches)); } boost::optional> diff --git a/irohad/ordering/impl/on_demand_connection_manager.hpp b/irohad/ordering/impl/on_demand_connection_manager.hpp 
index 2d8206ac2c..d874dcda1b 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.hpp +++ b/irohad/ordering/impl/on_demand_connection_manager.hpp @@ -12,6 +12,7 @@ #include #include "logger/logger_fwd.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" namespace iroha { namespace ordering { @@ -53,12 +54,14 @@ namespace iroha { OnDemandConnectionManager( std::shared_ptr factory, rxcpp::observable peers, + std::shared_ptr batch_resend_strategy, logger::LoggerPtr log); OnDemandConnectionManager( std::shared_ptr factory, rxcpp::observable peers, CurrentPeers initial_peers, + std::shared_ptr batch_resend_strategy, logger::LoggerPtr log); ~OnDemandConnectionManager() override; @@ -85,6 +88,7 @@ namespace iroha { logger::LoggerPtr log_; std::shared_ptr factory_; + std::shared_ptr batch_resend_strategy_; rxcpp::composite_subscription subscription_; CurrentConnections connections_; diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp index feeb64d8e7..ed13aac930 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.cpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp @@ -42,6 +42,7 @@ OnDemandOrderingGate::OnDemandOrderingGate( std::shared_ptr cache, std::shared_ptr factory, std::shared_ptr tx_cache, + std::shared_ptr batch_resend_strategy, size_t transaction_limit, logger::LoggerPtr log) : log_(std::move(log)), @@ -60,6 +61,8 @@ OnDemandOrderingGate::OnDemandOrderingGate( this->sendCachedTransactions(event); + batch_resend_strategy_->setCurrentRound(current_round); + // request proposal for the current round auto proposal = this->processProposalRequest( network_client_->onRequestProposal(current_round)); @@ -69,7 +72,8 @@ OnDemandOrderingGate::OnDemandOrderingGate( })), cache_(std::move(cache)), proposal_factory_(std::move(factory)), - tx_cache_(std::move(tx_cache)) {} + tx_cache_(std::move(tx_cache)), + batch_resend_strategy_(std::move(batch_resend_strategy)) 
{} OnDemandOrderingGate::~OnDemandOrderingGate() { events_subscription_.unsubscribe(); @@ -79,6 +83,8 @@ void OnDemandOrderingGate::propagateBatch( std::shared_ptr batch) { cache_->addToBack({batch}); + batch_resend_strategy_->feed(batch); + network_client_->onBatches( transport::OdOsNotification::CollectionType{batch}); } @@ -109,6 +115,7 @@ void OnDemandOrderingGate::sendCachedTransactions( [this](const BlockEvent &block_event) { // block committed, remove transactions from cache cache_->remove(block_event.hashes); + batch_resend_strategy_->remove(block_event.hashes); }, [](const EmptyEvent &) { // no blocks committed, no transactions to remove @@ -117,6 +124,10 @@ void OnDemandOrderingGate::sendCachedTransactions( auto batches = cache_->pop(); cache_->addToBack(batches); + for (const auto &batch : batches) { + batch_resend_strategy_->readyToUse(batch); + } + // get only transactions which fit to next proposal auto end_iterator = batches.begin(); auto current_number_of_transactions = 0u; diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp index d2a5592636..d133a24cf2 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.hpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp @@ -17,6 +17,7 @@ #include "interfaces/iroha_internal/unsafe_proposal_factory.hpp" #include "logger/logger_fwd.hpp" #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" #include "ordering/on_demand_ordering_service.hpp" namespace iroha { @@ -66,6 +67,7 @@ namespace iroha { std::shared_ptr factory, std::shared_ptr tx_cache, + std::shared_ptr batch_resend_strategy, size_t transaction_limit, logger::LoggerPtr log); @@ -109,6 +111,7 @@ namespace iroha { std::shared_ptr tx_cache_; rxcpp::subjects::subject proposal_notifier_; + std::shared_ptr batch_resend_strategy_; }; } // namespace ordering diff --git 
a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp new file mode 100644 index 0000000000..f4b9a4bc04 --- /dev/null +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp @@ -0,0 +1,106 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp" + +#include +#include +#include "interfaces/iroha_internal/transaction_batch.hpp" +#include "interfaces/transaction.hpp" +#include "ordering/impl/on_demand_common.hpp" + +using namespace iroha::ordering; + +bool OnDemandResendStrategy::feed( + std::shared_ptr batch) { + std::shared_lock lock(access_mutex_); + auto res = + sent_batches_.emplace(batch, std::make_pair(current_round_, false)); + return res.second; +} + +bool OnDemandResendStrategy::readyToUse( + std::shared_ptr batch) { + std::shared_lock lock(access_mutex_); + auto batch_found = std::find_if( + sent_batches_.begin(), sent_batches_.end(), [&batch](auto saved_batch) { + return *(saved_batch.first) == *batch; + }); + if (batch_found != sent_batches_.end()) { + batch_found->second.second = true; + return true; + } + return false; +} + +OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( + std::shared_ptr batch) { + std::shared_lock lock(access_mutex_); + std::set valid_rounds{ + nextCommitRound(nextCommitRound(current_round_)), + nextCommitRound(nextRejectRound(current_round_)), + nextRejectRound(nextCommitRound(current_round_)), + nextRejectRound(nextRejectRound(current_round_))}; + + auto saved_round_iterator = std::find_if( + sent_batches_.begin(), sent_batches_.end(), [&batch](auto saved_batch) { + return *(saved_batch.first) == *batch; + }); + if (saved_round_iterator == sent_batches_.end()) { + return valid_rounds; + } + if (not saved_round_iterator->second.second) { + return valid_rounds; + } + auto 
saved_round = saved_round_iterator->second.first; + + if (nextCommitRound(nextCommitRound(saved_round)) == current_round_) { + // do nothing + } else if (nextRejectRound(nextCommitRound(saved_round)) == current_round_) { + valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); + } else if (nextCommitRound(nextRejectRound(saved_round)) == current_round_) { + valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); + } else if (nextRejectRound(nextRejectRound(saved_round)) == current_round_) { + valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); + valid_rounds.erase(nextRejectRound(nextCommitRound(current_round_))); + valid_rounds.erase(nextCommitRound(nextCommitRound(current_round_))); + } else { + BOOST_ASSERT_MSG( + false, + "Current round can't be reached after 2 rounds from the saved one"); + } + + saved_round_iterator->second.first = current_round_; + + return valid_rounds; +} + +void OnDemandResendStrategy::remove( + const cache::OrderingGateCache::HashesSetType &hashes) { + std::shared_lock lock(access_mutex_); + for (auto it = sent_batches_.begin(); it != sent_batches_.end();) { + cache::OrderingGateCache::HashesSetType restored_hashes{}; + for (auto tx : it->first->transactions()) { + restored_hashes.insert(tx->hash()); + } + if (hashes == restored_hashes) { + sent_batches_.erase(it); + return; + } else { + ++it; + } + } +} + +void OnDemandResendStrategy::setCurrentRound( + const iroha::consensus::Round ¤t_round) { + std::shared_lock lock(access_mutex_); + current_round_ = current_round; +} + +iroha::consensus::Round OnDemandResendStrategy::getCurrentRound() const { + std::shared_lock lock(access_mutex_); + return current_round_; +} diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp new file mode 100644 index 0000000000..f0bcb63d48 --- /dev/null +++ 
b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp @@ -0,0 +1,46 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_ON_DEMAND_RESEND_STRATEGY_HPP +#define IROHA_ON_DEMAND_RESEND_STRATEGY_HPP + +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" + +#include +#include + +namespace iroha { + namespace ordering { + class OnDemandResendStrategy : public OrderingGateResendStrategy { + public: + bool feed(std::shared_ptr + batch) override; + + bool readyToUse(std::shared_ptr + batch) override; + + RoundSetType extract( + std::shared_ptr batch) + override; + + void remove( + const cache::OrderingGateCache::HashesSetType &hashes) override; + + void setCurrentRound(const consensus::Round ¤t_round) override; + + consensus::Round getCurrentRound() const override; + + private: + std::unordered_map< + std::shared_ptr, + std::pair> + sent_batches_; + consensus::Round current_round_; + mutable std::shared_timed_mutex access_mutex_; + }; + } // namespace ordering +} // namespace iroha + +#endif // IROHA_ON_DEMAND_RESEND_STRATEGY_HPP diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp new file mode 100644 index 0000000000..04f5344277 --- /dev/null +++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp @@ -0,0 +1,67 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP +#define IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP + +#include +#include "consensus/round.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp" + +namespace iroha { + namespace ordering { + /** + * Strategy for batch resend interface + */ + class OrderingGateResendStrategy { + public: + /** + * Inserts new batch into strategy + * @param batch - batch to insert + * @return true if operation success + */ + virtual bool feed( + std::shared_ptr batch) = 0; + + /** + * Get inserted before batch ready to extract + * @param batch - batch to get ready + * @return false if batch doesn't exist + */ + virtual bool readyToUse( + std::shared_ptr batch) = 0; + + using RoundSetType = std::set; + + /** + * Returns collection of interested future rounds for inserted and ready + * batch + * @param batch - batch to examine + */ + virtual RoundSetType extract( + std::shared_ptr batch) = 0; + + /** + * Removes batch with given hashes + * @param hashes to remove batch with + */ + virtual void remove( + const cache::OrderingGateCache::HashesSetType &hashes) = 0; + + /** + * Sets current round to a given one + */ + virtual void setCurrentRound(const consensus::Round ¤t_round) = 0; + + /** + * Returns saved round + */ + virtual consensus::Round getCurrentRound() const = 0; + }; + + } // namespace ordering +} // namespace iroha + +#endif // IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP diff --git a/test/module/irohad/ordering/CMakeLists.txt b/test/module/irohad/ordering/CMakeLists.txt index 9ab4475f02..28192a4eaf 100644 --- a/test/module/irohad/ordering/CMakeLists.txt +++ b/test/module/irohad/ordering/CMakeLists.txt @@ -40,3 +40,9 @@ target_link_libraries(on_demand_cache_test on_demand_ordering_gate shared_model_interfaces_factories ) + +addtest(on_demand_resend_strategy_test on_demand_resend_strategy_test.cpp) +target_link_libraries(on_demand_resend_strategy_test + 
on_demand_ordering_gate + shared_model_interfaces_factories + ) diff --git a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp index 3fd78165b3..27f24e110d 100644 --- a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp +++ b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp @@ -33,6 +33,7 @@ ACTION_P(CreateAndSave, var) { struct OnDemandConnectionManagerTest : public ::testing::Test { void SetUp() override { factory = std::make_shared(); + strategy = std::make_shared(); auto set = [this](auto &field, auto &ptr) { field = std::make_shared(); @@ -49,6 +50,7 @@ struct OnDemandConnectionManagerTest : public ::testing::Test { factory, peers.get_observable(), cpeers, + strategy, getTestLogger("OsConnectionManager")); } @@ -58,6 +60,7 @@ struct OnDemandConnectionManagerTest : public ::testing::Test { rxcpp::subjects::subject peers; std::shared_ptr factory; + std::shared_ptr strategy; std::shared_ptr manager; }; diff --git a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp index e73a179559..6c2d522b97 100644 --- a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp +++ b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp @@ -41,6 +41,7 @@ class OnDemandOrderingGateTest : public ::testing::Test { auto ufactory = std::make_unique>(); factory = ufactory.get(); tx_cache = std::make_shared(); + strategy = std::make_shared(); ON_CALL(*tx_cache, check(testing::Matcher(_))) .WillByDefault( @@ -53,6 +54,7 @@ class OnDemandOrderingGateTest : public ::testing::Test { cache, std::move(ufactory), tx_cache, + strategy, 1000, getTestLogger("OrderingGate")); } @@ -62,6 +64,7 @@ class OnDemandOrderingGateTest : public ::testing::Test { std::shared_ptr notification; NiceMock *factory; std::shared_ptr tx_cache; + std::shared_ptr strategy; std::shared_ptr ordering_gate; std::shared_ptr 
cache; diff --git a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp new file mode 100644 index 0000000000..e7dc0621f7 --- /dev/null +++ b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp @@ -0,0 +1,205 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp" + +#include +#include "module/shared_model/interface_mocks.hpp" +#include "ordering/impl/on_demand_common.hpp" + +using namespace iroha::ordering; +using namespace iroha::consensus; + +using ::testing::_; +using ::testing::Return; + +/** + * @given OnDemandResendStrategy instance + * @when same batch is fed to the instance twice + * @then first feeding succeeds, second one fails + */ +TEST(OnDemandResendStrategyTest, Feed) { + OnDemandResendStrategy strategy; + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + ASSERT_TRUE(strategy.feed(batch)); + ASSERT_FALSE(strategy.feed(batch)); +} + +/** + * @given OnDemandResendStrategy instance + * @when readyToUse is called before and after batch is fed + * @then first call fails, second succeeds + */ +TEST(OnDemandResendStrategyTest, ReadyToUse) { + OnDemandResendStrategy strategy; + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + ASSERT_FALSE(strategy.readyToUse(batch)); + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).WillOnce(Return(true)); + ASSERT_TRUE(strategy.readyToUse(batch)); +} + +/** + * @given OnDemandResendStrategy instance + * @when extract is called without any other prior calls + * @then all possible future rounds are returned + */ +TEST(OnDemandResendStrategyTest, ExtractNonExisting) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType 
hash("hash"); + auto batch = createMockBatchWithHash(hash); + OnDemandResendStrategy::RoundSetType rounds{ + nextCommitRound(nextCommitRound(round)), + nextCommitRound(nextRejectRound(round)), + nextRejectRound(nextCommitRound(round)), + nextRejectRound(nextRejectRound(round))}; + ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed and then extract are called for the same batch + * @then all possible future rounds are returned + */ +TEST(OnDemandResendStrategyTest, ExtractNonReady) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + OnDemandResendStrategy::RoundSetType rounds{ + nextCommitRound(nextCommitRound(round)), + nextCommitRound(nextRejectRound(round)), + nextRejectRound(nextCommitRound(round)), + nextRejectRound(nextRejectRound(round))}; + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).WillOnce(Return(true)); + ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed, readyToUse are called, current round is set for 2 commits to + * future and then extract is called for the same batch + * @then all possible future rounds are returned + */ +TEST(OnDemandResendStrategyTest, ExtractCommitCommit) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); + strategy.readyToUse(batch); + round = Round(3, 0); + strategy.setCurrentRound(round); + OnDemandResendStrategy::RoundSetType rounds{ + nextCommitRound(nextCommitRound(round)), + nextCommitRound(nextRejectRound(round)), + nextRejectRound(nextCommitRound(round)), + nextRejectRound(nextRejectRound(round))}; + 
ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed, readyToUse are called, current round is set for (commit, reject) + * to future and then extract is called for the same batch + * @then all rounds except (reject, commit) are returned + */ +TEST(OnDemandResendStrategyTest, ExtractCommitReject) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); + strategy.readyToUse(batch); + round = Round(2, 1); + strategy.setCurrentRound(round); + OnDemandResendStrategy::RoundSetType rounds{ + nextCommitRound(nextCommitRound(round)), + nextRejectRound(nextCommitRound(round)), + nextRejectRound(nextRejectRound(round))}; + ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed, readyToUse are called, current round is set for (reject, commit) + * to future and then extract is called for the same batch + * @then all rounds except (reject, commit) are returned + */ +TEST(OnDemandResendStrategyTest, ExtractRejectCommit) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); + strategy.readyToUse(batch); + round = Round(2, 0); + strategy.setCurrentRound(round); + OnDemandResendStrategy::RoundSetType rounds{ + nextCommitRound(nextCommitRound(round)), + nextRejectRound(nextCommitRound(round)), + nextRejectRound(nextRejectRound(round))}; + ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed, readyToUse are called, current round is set for (reject, 
reject) + * to future and then extract is called for the same batch + * @then (reject, reject) are returned + */ +TEST(OnDemandResendStrategyTest, ExtractRejectReject) { + Round round(1, 1); + OnDemandResendStrategy strategy; + strategy.setCurrentRound(round); + shared_model::interface::types::HashType hash("hash"); + auto batch = createMockBatchWithHash(hash); + strategy.feed(batch); + EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); + strategy.readyToUse(batch); + round = Round(1, 3); + strategy.setCurrentRound(round); + OnDemandResendStrategy::RoundSetType rounds{ + nextRejectRound(nextRejectRound(round))}; + ASSERT_EQ(strategy.extract(batch), rounds); +} + +/** + * @given OnDemandResendStrategy instance + * @when feed, remove and feed are called for the same batch + * @then both feed calls succeed + */ +TEST(OnDemandResendStrategyTest, Remove) { + shared_model::interface::types::HashType hash("hash"); + auto tx = createMockTransactionWithHash(hash); + auto batch = createMockBatchWithTransactions({tx}, ""); + OnDemandResendStrategy strategy; + ASSERT_TRUE(strategy.feed(batch)); + strategy.remove({hash}); + ASSERT_TRUE(strategy.feed(batch)); +} + +/** + * @given OnDemandResendStrategy instance + * @when setCurrentRound for (1, 1) is called + * @then getCurrentRound returns (1, 1) + */ +TEST(OnDemandResendStrategyTest, SetGetRound) { + OnDemandResendStrategy strategy; + strategy.setCurrentRound(Round(1, 1)); + ASSERT_EQ(strategy.getCurrentRound(), Round(1, 1)); +} diff --git a/test/module/irohad/ordering/ordering_mocks.hpp b/test/module/irohad/ordering/ordering_mocks.hpp index c867a72768..8acb64f1a9 100644 --- a/test/module/irohad/ordering/ordering_mocks.hpp +++ b/test/module/irohad/ordering/ordering_mocks.hpp @@ -10,6 +10,7 @@ #include "module/irohad/ordering/mock_on_demand_os_notification.hpp" #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" 
#include "ordering/on_demand_ordering_service.hpp" #include "ordering/on_demand_os_transport.hpp" @@ -45,6 +46,23 @@ namespace iroha { MOCK_METHOD1(onCollaborationOutcome, void(consensus::Round)); }; + struct MockOrderingGateResendStrategy : public OrderingGateResendStrategy { + MOCK_METHOD1( + feed, + bool(std::shared_ptr)); + MOCK_METHOD1( + readyToUse, + bool(std::shared_ptr)); + MOCK_METHOD1( + extract, + std::set( + std::shared_ptr)); + MOCK_METHOD1(remove, + void(const cache::OrderingGateCache::HashesSetType &hashes)); + MOCK_METHOD1(setCurrentRound, void(const consensus::Round &)); + MOCK_CONST_METHOD0(getCurrentRound, consensus::Round()); + }; + } // namespace ordering } // namespace iroha From 328e888b586f757a14723a8b02fcba944f628e51 Mon Sep 17 00:00:00 2001 From: Konstantin Munichev Date: Thu, 4 Apr 2019 18:01:27 +0300 Subject: [PATCH 2/4] Fix review issues Signed-off-by: Konstantin Munichev --- .../on_demand_resend_strategy.cpp | 14 +++----------- .../on_demand_resend_strategy.hpp | 4 +++- .../ordering_gate_cache/ordering_gate_cache.cpp | 7 +++++++ .../ordering_gate_cache/ordering_gate_cache.hpp | 14 ++++++++++++-- .../ordering/on_demand_resend_strategy_test.cpp | 1 + 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp index f4b9a4bc04..24abeed9b8 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp @@ -24,10 +24,7 @@ bool OnDemandResendStrategy::feed( bool OnDemandResendStrategy::readyToUse( std::shared_ptr batch) { std::shared_lock lock(access_mutex_); - auto batch_found = std::find_if( - sent_batches_.begin(), sent_batches_.end(), [&batch](auto saved_batch) { - return *(saved_batch.first) == *batch; - }); + auto batch_found = sent_batches_.find(batch); if (batch_found != sent_batches_.end()) 
{ batch_found->second.second = true; return true; @@ -44,10 +41,7 @@ OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( nextRejectRound(nextCommitRound(current_round_)), nextRejectRound(nextRejectRound(current_round_))}; - auto saved_round_iterator = std::find_if( - sent_batches_.begin(), sent_batches_.end(), [&batch](auto saved_batch) { - return *(saved_batch.first) == *batch; - }); + auto saved_round_iterator = sent_batches_.find(batch); if (saved_round_iterator == sent_batches_.end()) { return valid_rounds; } @@ -67,9 +61,7 @@ OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( valid_rounds.erase(nextRejectRound(nextCommitRound(current_round_))); valid_rounds.erase(nextCommitRound(nextCommitRound(current_round_))); } else { - BOOST_ASSERT_MSG( - false, - "Current round can't be reached after 2 rounds from the saved one"); + // do nothing } saved_round_iterator->second.first = current_round_; diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp index f0bcb63d48..d0c4e629b6 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp @@ -35,7 +35,9 @@ namespace iroha { private: std::unordered_map< std::shared_ptr, - std::pair> + std::pair, + cache::OrderingGateCache::BatchPointerHasher, + cache::OrderingGateCache::BatchPointerComparator> sent_batches_; consensus::Round current_round_; mutable std::shared_timed_mutex access_mutex_; diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp index f884eb8767..2d7e7d599d 100644 --- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp +++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp @@ -17,6 +17,13 @@ namespace iroha { return hasher_(a->reducedHash()); } + bool 
OrderingGateCache::BatchPointerComparator::operator()( + const std::shared_ptr &l, + const std::shared_ptr &r) + const { + return *l == *r; + } + } // namespace cache } // namespace ordering } // namespace iroha diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp index e949a1a502..3fa9dced1b 100644 --- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp @@ -24,7 +24,7 @@ namespace iroha { * Cache for transactions sent to ordering gate */ class OrderingGateCache { - private: + public: /** * Hasher for the shared pointer on the batch. Uses batch's reduced hash */ @@ -36,7 +36,17 @@ namespace iroha { &a) const; }; - public: + /** + * Comparator for shared_ptr + */ + struct BatchPointerComparator { + bool operator()( + const std::shared_ptr + &l, + const std::shared_ptr + &r) const; + }; + /// type of the element in cache container. 
Set is used as it allows to /// remove batch from BatchSet with O(1) complexity, which is the case /// in remove method diff --git a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp index e7dc0621f7..1fd20bab2c 100644 --- a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp +++ b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp @@ -24,6 +24,7 @@ TEST(OnDemandResendStrategyTest, Feed) { OnDemandResendStrategy strategy; shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); + EXPECT_CALL(*batch, Equals(_)).Times(1).WillRepeatedly(Return(true)); ASSERT_TRUE(strategy.feed(batch)); ASSERT_FALSE(strategy.feed(batch)); } From 978421da925f4d179646f3ddda5f6d68c71719c9 Mon Sep 17 00:00:00 2001 From: Konstantin Munichev Date: Thu, 11 Apr 2019 22:56:48 +0300 Subject: [PATCH 3/4] Use set_difference in extract function Signed-off-by: Konstantin Munichev --- .../on_demand_resend_strategy.cpp | 46 +++++++++---------- .../on_demand_resend_strategy.hpp | 2 + 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp index 24abeed9b8..f881132ad9 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp @@ -35,38 +35,27 @@ bool OnDemandResendStrategy::readyToUse( OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( std::shared_ptr batch) { std::shared_lock lock(access_mutex_); - std::set valid_rounds{ - nextCommitRound(nextCommitRound(current_round_)), - nextCommitRound(nextRejectRound(current_round_)), - nextRejectRound(nextCommitRound(current_round_)), - nextRejectRound(nextRejectRound(current_round_))}; + auto reachable_from_current = reachableInTwoRounds(current_round_); 
auto saved_round_iterator = sent_batches_.find(batch); if (saved_round_iterator == sent_batches_.end()) { - return valid_rounds; + return reachable_from_current; } if (not saved_round_iterator->second.second) { - return valid_rounds; - } - auto saved_round = saved_round_iterator->second.first; - - if (nextCommitRound(nextCommitRound(saved_round)) == current_round_) { - // do nothing - } else if (nextRejectRound(nextCommitRound(saved_round)) == current_round_) { - valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); - } else if (nextCommitRound(nextRejectRound(saved_round)) == current_round_) { - valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); - } else if (nextRejectRound(nextRejectRound(saved_round)) == current_round_) { - valid_rounds.erase(nextCommitRound(nextRejectRound(current_round_))); - valid_rounds.erase(nextRejectRound(nextCommitRound(current_round_))); - valid_rounds.erase(nextCommitRound(nextCommitRound(current_round_))); - } else { - // do nothing + return reachable_from_current; } + auto saved_round = saved_round_iterator->second.first; saved_round_iterator->second.first = current_round_; - return valid_rounds; + auto reachable_from_saved = reachableInTwoRounds(saved_round); + RoundSetType target_rounds{}; + std::set_difference(reachable_from_current.begin(), + reachable_from_current.end(), + reachable_from_saved.begin(), + reachable_from_saved.end(), + std::inserter(target_rounds, target_rounds.begin())); + return target_rounds; } void OnDemandResendStrategy::remove( @@ -96,3 +85,14 @@ iroha::consensus::Round OnDemandResendStrategy::getCurrentRound() const { std::shared_lock lock(access_mutex_); return current_round_; } + +OnDemandResendStrategy::RoundSetType +OnDemandResendStrategy::reachableInTwoRounds( + const consensus::Round &round) const { + RoundSetType reachable_rounds{}; + reachable_rounds.insert(nextCommitRound(nextCommitRound(round))); + reachable_rounds.insert(nextCommitRound(nextRejectRound(round))); 
+ reachable_rounds.insert(nextRejectRound(nextCommitRound(round))); + reachable_rounds.insert(nextRejectRound(nextRejectRound(round))); + return reachable_rounds; +} diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp index d0c4e629b6..027ea112a1 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp @@ -33,6 +33,8 @@ namespace iroha { consensus::Round getCurrentRound() const override; private: + RoundSetType reachableInTwoRounds(const consensus::Round &round) const; + std::unordered_map< std::shared_ptr, std::pair, From f1e13e68f4c56fd3b4742f102d364695de166084 Mon Sep 17 00:00:00 2001 From: Konstantin Munichev Date: Sat, 13 Apr 2019 19:16:49 +0300 Subject: [PATCH 4/4] Fix more review issues Signed-off-by: Konstantin Munichev --- .../impl/on_demand_connection_manager.cpp | 49 +--- .../impl/on_demand_connection_manager.hpp | 5 +- .../on_demand_resend_strategy.cpp | 109 +++++-- .../on_demand_resend_strategy.hpp | 18 +- .../ordering_gate_resend_strategy.hpp | 17 +- .../on_demand_connection_manager_test.cpp | 11 +- .../on_demand_resend_strategy_test.cpp | 269 +++++++++++------- .../module/irohad/ordering/ordering_mocks.hpp | 8 +- 8 files changed, 285 insertions(+), 201 deletions(-) diff --git a/irohad/ordering/impl/on_demand_connection_manager.cpp b/irohad/ordering/impl/on_demand_connection_manager.cpp index 101e57203d..b80dcbb998 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.cpp +++ b/irohad/ordering/impl/on_demand_connection_manager.cpp @@ -10,6 +10,7 @@ #include "interfaces/iroha_internal/transaction_batch_impl.hpp" #include "logger/logger.hpp" #include "ordering/impl/on_demand_common.hpp" +#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" using namespace iroha; using namespace iroha::ordering; @@ -49,54 +50,8 @@ 
OnDemandConnectionManager::~OnDemandConnectionManager() { void OnDemandConnectionManager::onBatches(CollectionType batches) { std::shared_lock lock(mutex_); - /* - * Transactions are always sent to the round after the next round (+2) - * There are 4 possibilities - all combinations of commits and rejects in the - * following two rounds. This can be visualised as a diagram, where: o - - * current round, x - next round, v - target round - * - * 0 1 2 0 1 2 0 1 2 0 1 2 - * 0 o x v 0 o . . 0 o x . 0 o . . - * 1 . . . 1 x v . 1 v . . 1 x . . - * 2 . . . 2 . . . 2 . . . 2 v . . - * RejectReject CommitReject RejectCommit CommitCommit - */ - CollectionType reject_reject_batches{}; - CollectionType reject_commit_batches{}; - CollectionType commit_reject_batches{}; - CollectionType commit_commit_batches{}; - - for (const auto &batch : batches) { - auto rounds = batch_resend_strategy_->extract(batch); - auto current_round = batch_resend_strategy_->getCurrentRound(); - - if (rounds.find(nextRejectRound(nextRejectRound(current_round))) - != rounds.end()) { - reject_reject_batches.push_back(batch); - } - if (rounds.find(nextCommitRound(nextRejectRound(current_round))) - != rounds.end()) { - reject_commit_batches.push_back(batch); - } - if (rounds.find(nextRejectRound(nextCommitRound(current_round))) - != rounds.end()) { - commit_reject_batches.push_back(batch); - } - if (rounds.find(nextCommitRound(nextCommitRound(current_round))) - != rounds.end()) { - commit_commit_batches.push_back(batch); - } - } - - connections_.peers[kRejectRejectConsumer]->onBatches( - std::move(reject_reject_batches)); - connections_.peers[kRejectCommitConsumer]->onBatches( - std::move(reject_commit_batches)); - connections_.peers[kCommitRejectConsumer]->onBatches( - std::move(commit_reject_batches)); - connections_.peers[kCommitCommitConsumer]->onBatches( - std::move(commit_commit_batches)); + batch_resend_strategy_->sendBatches(batches, connections_); } boost::optional> diff --git 
a/irohad/ordering/impl/on_demand_connection_manager.hpp b/irohad/ordering/impl/on_demand_connection_manager.hpp index d874dcda1b..d4ce9d5ed8 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.hpp +++ b/irohad/ordering/impl/on_demand_connection_manager.hpp @@ -12,11 +12,12 @@ #include #include "logger/logger_fwd.hpp" -#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp" namespace iroha { namespace ordering { + class OrderingGateResendStrategy; + /** * Proxy class which redirects requests to appropriate peers */ @@ -71,7 +72,6 @@ namespace iroha { boost::optional> onRequestProposal( consensus::Round round) override; - private: /** * Corresponding connections created by OdOsNotificationFactory * @see PeerType for individual descriptions @@ -80,6 +80,7 @@ namespace iroha { PeerCollectionType> peers; }; + private: /** * Initialize corresponding peers in connections_ using factory_ * @param peers to initialize connections with diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp index f881132ad9..08715c15fa 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp @@ -32,6 +32,92 @@ bool OnDemandResendStrategy::readyToUse( return false; } +void OnDemandResendStrategy::remove( + const cache::OrderingGateCache::HashesSetType &hashes) { + std::shared_lock lock(access_mutex_); + for (auto it = sent_batches_.begin(); it != sent_batches_.end();) { + cache::OrderingGateCache::HashesSetType restored_hashes{}; + for (auto tx : it->first->transactions()) { + restored_hashes.insert(tx->hash()); + } + if (hashes == restored_hashes) { + sent_batches_.erase(it); + return; + } else { + ++it; + } + } +} + +void OnDemandResendStrategy::setCurrentRound( + const iroha::consensus::Round ¤t_round) { + std::shared_lock lock(access_mutex_); + 
current_round_ = current_round; +} + +void OnDemandResendStrategy::sendBatches( + OnDemandConnectionManager::CollectionType batches, + const iroha::ordering::OnDemandConnectionManager::CurrentConnections + &connections) { + /* + * Transactions are always sent to the round after the next round (+2) + * There are 4 possibilities - all combinations of commits and rejects in + the + * following two rounds. This can be visualised as a diagram, where: o - + * current round, x - next round, v - target round + * + * 0 1 2 0 1 2 0 1 2 0 1 2 + * 0 o x v 0 o . . 0 o x . 0 o . . + * 1 . . . 1 x v . 1 v . . 1 x . . + * 2 . . . 2 . . . 2 . . . 2 v . . + * RejectReject CommitReject RejectCommit CommitCommit + */ + + for (const auto &batch : batches) { + OnDemandConnectionManager::CollectionType reject_reject_batches{}; + OnDemandConnectionManager::CollectionType reject_commit_batches{}; + OnDemandConnectionManager::CollectionType commit_reject_batches{}; + OnDemandConnectionManager::CollectionType commit_commit_batches{}; + + auto rounds = extract(batch); + auto current_round = getCurrentRound(); + + if (rounds.find(nextRejectRound(nextRejectRound(current_round))) + != rounds.end()) { + reject_reject_batches.push_back(batch); + } + if (rounds.find(nextCommitRound(nextRejectRound(current_round))) + != rounds.end()) { + reject_commit_batches.push_back(batch); + } + if (rounds.find(nextRejectRound(nextCommitRound(current_round))) + != rounds.end()) { + commit_reject_batches.push_back(batch); + } + if (rounds.find(nextCommitRound(nextCommitRound(current_round))) + != rounds.end()) { + commit_commit_batches.push_back(batch); + } + + if (not reject_reject_batches.empty()) { + connections.peers[OnDemandConnectionManager::kRejectRejectConsumer] + ->onBatches(std::move(reject_reject_batches)); + } + if (not reject_commit_batches.empty()) { + connections.peers[OnDemandConnectionManager::kRejectCommitConsumer] + ->onBatches(std::move(reject_commit_batches)); + } + if (not 
commit_reject_batches.empty()) { + connections.peers[OnDemandConnectionManager::kCommitRejectConsumer] + ->onBatches(std::move(commit_reject_batches)); + } + if (not commit_commit_batches.empty()) { + connections.peers[OnDemandConnectionManager::kCommitCommitConsumer] + ->onBatches(std::move(commit_commit_batches)); + } + } +} + OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( std::shared_ptr batch) { std::shared_lock lock(access_mutex_); @@ -58,29 +144,6 @@ OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract( return target_rounds; } -void OnDemandResendStrategy::remove( - const cache::OrderingGateCache::HashesSetType &hashes) { - std::shared_lock lock(access_mutex_); - for (auto it = sent_batches_.begin(); it != sent_batches_.end();) { - cache::OrderingGateCache::HashesSetType restored_hashes{}; - for (auto tx : it->first->transactions()) { - restored_hashes.insert(tx->hash()); - } - if (hashes == restored_hashes) { - sent_batches_.erase(it); - return; - } else { - ++it; - } - } -} - -void OnDemandResendStrategy::setCurrentRound( - const iroha::consensus::Round ¤t_round) { - std::shared_lock lock(access_mutex_); - current_round_ = current_round; -} - iroha::consensus::Round OnDemandResendStrategy::getCurrentRound() const { std::shared_lock lock(access_mutex_); return current_round_; diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp index 027ea112a1..72253b27b5 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp @@ -21,18 +21,26 @@ namespace iroha { bool readyToUse(std::shared_ptr batch) override; - RoundSetType extract( - std::shared_ptr batch) - override; - void remove( const cache::OrderingGateCache::HashesSetType &hashes) override; void setCurrentRound(const consensus::Round ¤t_round) override; - 
consensus::Round getCurrentRound() const override; + void sendBatches(OnDemandConnectionManager::CollectionType batches, + const OnDemandConnectionManager::CurrentConnections + &connections) override; private: + /** + * Returns collection of interested future rounds for inserted and ready + * batch + * @param batch - batch to examine + */ + RoundSetType extract( + std::shared_ptr batch); + + consensus::Round getCurrentRound() const; + RoundSetType reachableInTwoRounds(const consensus::Round &round) const; std::unordered_map< diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp index 04f5344277..517899d985 100644 --- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp @@ -8,6 +8,7 @@ #include #include "consensus/round.hpp" +#include "ordering/impl/on_demand_connection_manager.hpp" #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp" namespace iroha { @@ -35,14 +36,6 @@ namespace iroha { using RoundSetType = std::set; - /** - * Returns collection of interested future rounds for inserted and ready - * batch - * @param batch - batch to examine - */ - virtual RoundSetType extract( - std::shared_ptr batch) = 0; - /** * Removes batch with given hashes * @param hashes to remove batch with @@ -56,9 +49,13 @@ namespace iroha { virtual void setCurrentRound(const consensus::Round &current_round) = 0; /** - * Returns saved round + * Sends batches in accordance with the strategy + * @param batches - batches to send + * @param connections - connections to send the batch through */ - virtual consensus::Round getCurrentRound() const = 0; + virtual void sendBatches( + OnDemandConnectionManager::CollectionType batches, + const OnDemandConnectionManager::CurrentConnections &connections) = 0; }; } // namespace ordering diff --git 
a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp index 27f24e110d..710660b26f 100644 --- a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp +++ b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp @@ -82,16 +82,7 @@ TEST_F(OnDemandConnectionManagerTest, FactoryUsed) { */ TEST_F(OnDemandConnectionManagerTest, onBatches) { OdOsNotification::CollectionType collection; - - auto set_expect = [&](OnDemandConnectionManager::PeerType type) { - EXPECT_CALL(*connections[type], onBatches(collection)).Times(1); - }; - - set_expect(OnDemandConnectionManager::kRejectRejectConsumer); - set_expect(OnDemandConnectionManager::kRejectCommitConsumer); - set_expect(OnDemandConnectionManager::kCommitRejectConsumer); - set_expect(OnDemandConnectionManager::kCommitCommitConsumer); - + EXPECT_CALL(*strategy, sendBatches(collection, testing::_)).Times(1); manager->onBatches(collection); } diff --git a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp index 1fd20bab2c..52faafc4f8 100644 --- a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp +++ b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp @@ -6,27 +6,67 @@ #include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp" #include +#include +#include "module/irohad/ordering/mock_on_demand_os_notification.hpp" +#include "module/irohad/ordering/ordering_mocks.hpp" #include "module/shared_model/interface_mocks.hpp" #include "ordering/impl/on_demand_common.hpp" +using namespace iroha; using namespace iroha::ordering; +using namespace iroha::ordering::transport; using namespace iroha::consensus; using ::testing::_; +using ::testing::ByMove; +using ::testing::Ref; using ::testing::Return; +ACTION_P(CreateAndSave, var) { + auto result = std::make_unique(); + *var = result.get(); + return 
std::unique_ptr(std::move(result)); +} + +struct OnDemandResendStrategyTest : public ::testing::Test { + void SetUp() override { + factory = std::make_shared(); + strategy = std::make_shared(); + + auto set = [this](auto &field, auto &ptr, auto &conn) { + field = std::make_shared(); + + EXPECT_CALL(*factory, create(Ref(*field))) + .WillRepeatedly(CreateAndSave(&ptr)); + + conn = factory->create(*field); + }; + + for (auto &&triple : + boost::combine(cpeers.peers, connections, connections_obj.peers)) { + set(boost::get<0>(triple), boost::get<1>(triple), boost::get<2>(triple)); + } + } + + OnDemandConnectionManager::CurrentConnections connections_obj; + OnDemandConnectionManager::CurrentPeers cpeers; + OnDemandConnectionManager::PeerCollectionType + connections; + std::shared_ptr factory; + std::shared_ptr strategy; +}; + /** * @given OnDemandResendStrategy instance * @when same batch is fed to the instance twice * @then first feeding succeeds, second one fails */ -TEST(OnDemandResendStrategyTest, Feed) { - OnDemandResendStrategy strategy; +TEST_F(OnDemandResendStrategyTest, Feed) { shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); EXPECT_CALL(*batch, Equals(_)).Times(1).WillRepeatedly(Return(true)); - ASSERT_TRUE(strategy.feed(batch)); - ASSERT_FALSE(strategy.feed(batch)); + ASSERT_TRUE(strategy->feed(batch)); + ASSERT_FALSE(strategy->feed(batch)); } /** @@ -34,149 +74,192 @@ TEST(OnDemandResendStrategyTest, Feed) { * @when readyToUse is called before and after batch is fed * @then first call fails, second succeeds */ -TEST(OnDemandResendStrategyTest, ReadyToUse) { - OnDemandResendStrategy strategy; +TEST_F(OnDemandResendStrategyTest, ReadyToUse) { shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - ASSERT_FALSE(strategy.readyToUse(batch)); - strategy.feed(batch); + ASSERT_FALSE(strategy->readyToUse(batch)); + strategy->feed(batch); EXPECT_CALL(*batch, 
Equals(_)).WillOnce(Return(true)); - ASSERT_TRUE(strategy.readyToUse(batch)); + ASSERT_TRUE(strategy->readyToUse(batch)); } /** * @given OnDemandResendStrategy instance - * @when extract is called without any other prior calls - * @then all possible future rounds are returned + * @when sendBatches is called without any other prior calls + * @then onBatches is called for all consumers */ -TEST(OnDemandResendStrategyTest, ExtractNonExisting) { +TEST_F(OnDemandResendStrategyTest, ExtractNonExisting) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - OnDemandResendStrategy::RoundSetType rounds{ - nextCommitRound(nextCommitRound(round)), - nextCommitRound(nextRejectRound(round)), - nextRejectRound(nextCommitRound(round)), - nextRejectRound(nextRejectRound(round))}; - ASSERT_EQ(strategy.extract(batch), rounds); + + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** * @given OnDemandResendStrategy instance - * @when feed and then extract are called for the same batch - * @then all possible future rounds are returned + * @when feed and then sendBatches are called for the same batch + * @then onBatches is called for all consumers */ 
-TEST(OnDemandResendStrategyTest, ExtractNonReady) { +TEST_F(OnDemandResendStrategyTest, ExtractNonReady) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - OnDemandResendStrategy::RoundSetType rounds{ - nextCommitRound(nextCommitRound(round)), - nextCommitRound(nextRejectRound(round)), - nextRejectRound(nextCommitRound(round)), - nextRejectRound(nextRejectRound(round))}; - strategy.feed(batch); + strategy->feed(batch); + EXPECT_CALL(*batch, Equals(_)).WillOnce(Return(true)); - ASSERT_EQ(strategy.extract(batch), rounds); + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** * @given OnDemandResendStrategy instance * @when feed, readyToUse are called, current round is set for 2 commits to - * future and then extract is called for the same batch - * @then all possible future rounds are returned + * future and then sendBatches is called for the same batch + * @then onBatches is called for all consumers */ -TEST(OnDemandResendStrategyTest, ExtractCommitCommit) { +TEST_F(OnDemandResendStrategyTest, ExtractCommitCommit) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); 
shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - strategy.feed(batch); + strategy->feed(batch); EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); - strategy.readyToUse(batch); + strategy->readyToUse(batch); round = Round(3, 0); - strategy.setCurrentRound(round); - OnDemandResendStrategy::RoundSetType rounds{ - nextCommitRound(nextCommitRound(round)), - nextCommitRound(nextRejectRound(round)), - nextRejectRound(nextCommitRound(round)), - nextRejectRound(nextRejectRound(round))}; - ASSERT_EQ(strategy.extract(batch), rounds); + strategy->setCurrentRound(round); + + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** * @given OnDemandResendStrategy instance * @when feed, readyToUse are called, current round is set for (commit, reject) - * to future and then extract is called for the same batch - * @then all rounds except (reject, commit) are returned + * to future and then sendBatches is called for the same batch + * @then onBatches is called for all consumers except (reject, commit) */ -TEST(OnDemandResendStrategyTest, ExtractCommitReject) { +TEST_F(OnDemandResendStrategyTest, ExtractCommitReject) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); 
shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - strategy.feed(batch); + strategy->feed(batch); EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); - strategy.readyToUse(batch); + strategy->readyToUse(batch); round = Round(2, 1); - strategy.setCurrentRound(round); - OnDemandResendStrategy::RoundSetType rounds{ - nextCommitRound(nextCommitRound(round)), - nextRejectRound(nextCommitRound(round)), - nextRejectRound(nextRejectRound(round))}; - ASSERT_EQ(strategy.extract(batch), rounds); + strategy->setCurrentRound(round); + + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** * @given OnDemandResendStrategy instance * @when feed, readyToUse are called, current round is set for (reject, commit) - * to future and then extract is called for the same batch - * @then all rounds except (reject, commit) are returned + * to future and then sendBatches is called for the same batch + * @then onBatches is called for all consumers except (reject, commit) */ -TEST(OnDemandResendStrategyTest, ExtractRejectCommit) { +TEST_F(OnDemandResendStrategyTest, ExtractRejectCommit) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - strategy.feed(batch); + strategy->feed(batch); EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); - 
strategy.readyToUse(batch); + strategy->readyToUse(batch); round = Round(2, 0); - strategy.setCurrentRound(round); - OnDemandResendStrategy::RoundSetType rounds{ - nextCommitRound(nextCommitRound(round)), - nextRejectRound(nextCommitRound(round)), - nextRejectRound(nextRejectRound(round))}; - ASSERT_EQ(strategy.extract(batch), rounds); + strategy->setCurrentRound(round); + + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** * @given OnDemandResendStrategy instance * @when feed, readyToUse are called, current round is set for (reject, reject) - * to future and then extract is called for the same batch - * @then (reject, reject) are returned + * to future and then sendBatches is called for the same batch + * @then onBatches is called for (reject, reject) consumer */ -TEST(OnDemandResendStrategyTest, ExtractRejectReject) { +TEST_F(OnDemandResendStrategyTest, ExtractRejectReject) { Round round(1, 1); - OnDemandResendStrategy strategy; - strategy.setCurrentRound(round); + strategy->setCurrentRound(round); shared_model::interface::types::HashType hash("hash"); auto batch = createMockBatchWithHash(hash); - strategy.feed(batch); + strategy->feed(batch); EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true)); - strategy.readyToUse(batch); + strategy->readyToUse(batch); round = Round(1, 3); - strategy.setCurrentRound(round); - OnDemandResendStrategy::RoundSetType rounds{ - nextRejectRound(nextRejectRound(round))}; - ASSERT_EQ(strategy.extract(batch), rounds); 
+ strategy->setCurrentRound(round); + + EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer], + onBatches(OnDemandConnectionManager::CollectionType{batch})) + .Times(1); + + strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch}, + connections_obj); } /** @@ -184,23 +267,11 @@ TEST(OnDemandResendStrategyTest, ExtractRejectReject) { * @when feed, remove and feed are called for the same batch * @then both feed calls succeed */ -TEST(OnDemandResendStrategyTest, Remove) { +TEST_F(OnDemandResendStrategyTest, Remove) { shared_model::interface::types::HashType hash("hash"); auto tx = createMockTransactionWithHash(hash); auto batch = createMockBatchWithTransactions({tx}, ""); - OnDemandResendStrategy strategy; - ASSERT_TRUE(strategy.feed(batch)); - strategy.remove({hash}); - ASSERT_TRUE(strategy.feed(batch)); -} - -/** - * @given OnDemandResendStrategy instance - * @when setCurrentRound for (1, 1) is called - * @then getCurrentRound returns (1, 1) - */ -TEST(OnDemandResendStrategyTest, SetGetRound) { - OnDemandResendStrategy strategy; - strategy.setCurrentRound(Round(1, 1)); - ASSERT_EQ(strategy.getCurrentRound(), Round(1, 1)); + ASSERT_TRUE(strategy->feed(batch)); + strategy->remove({hash}); + ASSERT_TRUE(strategy->feed(batch)); } diff --git a/test/module/irohad/ordering/ordering_mocks.hpp b/test/module/irohad/ordering/ordering_mocks.hpp index 8acb64f1a9..3e582e2e56 100644 --- a/test/module/irohad/ordering/ordering_mocks.hpp +++ b/test/module/irohad/ordering/ordering_mocks.hpp @@ -53,14 +53,12 @@ namespace iroha { MOCK_METHOD1( readyToUse, bool(std::shared_ptr)); - MOCK_METHOD1( - extract, - std::set( - std::shared_ptr)); MOCK_METHOD1(remove, void(const cache::OrderingGateCache::HashesSetType &hashes)); MOCK_METHOD1(setCurrentRound, void(const consensus::Round &)); - MOCK_CONST_METHOD0(getCurrentRound, consensus::Round()); + MOCK_METHOD2(sendBatches, + void(OnDemandConnectionManager::CollectionType, + const 
OnDemandConnectionManager::CurrentConnections &)); }; } // namespace ordering