diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp
index 9d06253e04..bc052acbea 100644
--- a/irohad/main/application.cpp
+++ b/irohad/main/application.cpp
@@ -38,6 +38,7 @@
 #include "network/impl/peer_communication_service_impl.hpp"
 #include "ordering/impl/on_demand_common.hpp"
 #include "ordering/impl/on_demand_ordering_gate.hpp"
+#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp"
 #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
 #include "simulator/impl/simulator.hpp"
 #include "synchronizer/impl/synchronizer_impl.hpp"
@@ -361,6 +362,9 @@ void Irohad::initOrderingGate() {
   auto factory = std::make_unique>(validators_config_);
+  auto batch_resend_strategy =
+      std::make_shared();
+
   const uint64_t kCounter = 0, kMaxLocalCounter = 2;
   // reject_delay and local_counter are local mutable variables of lambda
   const auto kMaxDelay(max_rounds_delay_);
@@ -400,6 +404,7 @@ void Irohad::initOrderingGate() {
       std::move(factory),
       proposal_factory,
       persistent_cache,
+      std::move(batch_resend_strategy),
       delay,
       log_manager_->getChild("Ordering"));
   log_->info("[Init] => init ordering gate - [{}]",
diff --git a/irohad/main/impl/on_demand_ordering_init.cpp b/irohad/main/impl/on_demand_ordering_init.cpp
index 22fe384ffb..c39c4e7445 100644
--- a/irohad/main/impl/on_demand_ordering_init.cpp
+++ b/irohad/main/impl/on_demand_ordering_init.cpp
@@ -67,6 +67,8 @@ namespace iroha {
         std::shared_ptr> async_call,
         std::shared_ptr proposal_transport_factory,
+        std::shared_ptr
+            batch_resend_strategy,
         std::chrono::milliseconds delay,
         std::vector initial_hashes,
         const logger::LoggerManagerTreePtr &ordering_log_manager) {
@@ -195,6 +197,7 @@ namespace iroha {
               delay,
               ordering_log_manager),
           peers,
+          std::move(batch_resend_strategy),
           ordering_log_manager->getChild("ConnectionManager")->getLogger());
     }
@@ -205,6 +208,8 @@ namespace iroha {
         std::shared_ptr proposal_factory,
         std::shared_ptr tx_cache,
+        std::shared_ptr
+            batch_resend_strategy,
         std::function delay_func,
         size_t max_number_of_transactions,
@@ -264,6 +269,7 @@ namespace iroha {
           std::move(cache),
           std::move(proposal_factory),
           std::move(tx_cache),
+          std::move(batch_resend_strategy),
           max_number_of_transactions,
           ordering_log_manager->getChild("Gate")->getLogger());
     }
@@ -305,6 +311,8 @@ namespace iroha {
             proposal_factory,
         std::shared_ptr proposal_transport_factory,
         std::shared_ptr tx_cache,
+        std::shared_ptr
+            batch_resend_strategy,
         std::function delay_func,
         logger::LoggerManagerTreePtr ordering_log_manager) {
@@ -323,12 +331,14 @@ namespace iroha {
           createConnectionManager(std::move(peer_query_factory),
                                   std::move(async_call),
                                   std::move(proposal_transport_factory),
+                                  batch_resend_strategy,
                                   delay,
                                   std::move(initial_hashes),
                                   ordering_log_manager),
           std::make_shared(),
           std::move(proposal_factory),
           std::move(tx_cache),
+          batch_resend_strategy,
           std::move(delay_func),
           max_number_of_transactions,
           ordering_log_manager);
diff --git a/irohad/main/impl/on_demand_ordering_init.hpp b/irohad/main/impl/on_demand_ordering_init.hpp
index b566ac46a0..90a06a2a49 100644
--- a/irohad/main/impl/on_demand_ordering_init.hpp
+++ b/irohad/main/impl/on_demand_ordering_init.hpp
@@ -20,6 +20,7 @@
 #include "ordering.grpc.pb.h"
 #include "ordering/impl/on_demand_os_server_grpc.hpp"
 #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
+#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
 #include "ordering/on_demand_ordering_service.hpp"
 #include "ordering/on_demand_os_transport.hpp"
@@ -58,6 +59,8 @@ namespace iroha {
         std::shared_ptr> async_call,
         std::shared_ptr proposal_transport_factory,
+        std::shared_ptr
+            batch_resend_strategy,
         std::chrono::milliseconds delay,
         std::vector initial_hashes,
         const logger::LoggerManagerTreePtr &ordering_log_manager);
@@ -73,6 +76,8 @@ namespace iroha {
         std::shared_ptr proposal_factory,
         std::shared_ptr tx_cache,
+        std::shared_ptr
+            batch_resend_strategy,
         std::function delay_func,
         size_t max_number_of_transactions,
@@ -98,7 +103,6 @@ namespace iroha {
     OnDemandOrderingInit(logger::LoggerPtr log);
     ~OnDemandOrderingInit();
-
     /**
      * Initializes on-demand ordering gate and ordering sevice components
      *
@@ -140,6 +144,8 @@ namespace iroha {
             proposal_factory,
         std::shared_ptr proposal_transport_factory,
         std::shared_ptr tx_cache,
+        std::shared_ptr
+            batch_resend_strategy,
         std::function delay_func,
         logger::LoggerManagerTreePtr ordering_log_manager);
diff --git a/irohad/ordering/CMakeLists.txt b/irohad/ordering/CMakeLists.txt
index 2efd5c8fab..f1aa110bb3 100644
--- a/irohad/ordering/CMakeLists.txt
+++ b/irohad/ordering/CMakeLists.txt
@@ -54,6 +54,7 @@ add_library(on_demand_ordering_gate
     impl/on_demand_ordering_gate.cpp
     impl/ordering_gate_cache/ordering_gate_cache.cpp
     impl/ordering_gate_cache/on_demand_cache.cpp
+    impl/ordering_gate_cache/on_demand_resend_strategy.cpp
     )
 target_link_libraries(on_demand_ordering_gate
     on_demand_common
diff --git a/irohad/ordering/impl/on_demand_connection_manager.cpp b/irohad/ordering/impl/on_demand_connection_manager.cpp
index bff96d5335..6aa13ecc10 100644
--- a/irohad/ordering/impl/on_demand_connection_manager.cpp
+++ b/irohad/ordering/impl/on_demand_connection_manager.cpp
@@ -7,8 +7,10 @@
 #include
 #include "interfaces/iroha_internal/proposal.hpp"
+#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
 #include "logger/logger.hpp"
 #include "ordering/impl/on_demand_common.hpp"
+#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
 using namespace iroha;
 using namespace iroha::ordering;
@@ -16,9 +18,11 @@
 OnDemandConnectionManager::OnDemandConnectionManager(
     std::shared_ptr factory,
     rxcpp::observable peers,
+    std::shared_ptr batch_resend_strategy,
     logger::LoggerPtr log)
     : log_(std::move(log)),
       factory_(std::move(factory)),
+      batch_resend_strategy_(std::move(batch_resend_strategy)),
       subscription_(peers.subscribe(
           [this](const auto &peers) { this->initializeConnections(peers); })) {}
@@ -26,8 +30,12 @@ OnDemandConnectionManager::OnDemandConnectionManager(
     std::shared_ptr factory,
     rxcpp::observable peers,
     CurrentPeers initial_peers,
+    std::shared_ptr batch_resend_strategy,
     logger::LoggerPtr log)
-    : OnDemandConnectionManager(std::move(factory), peers, std::move(log)) {
+    : OnDemandConnectionManager(std::move(factory),
+                                peers,
+                                std::move(batch_resend_strategy),
+                                std::move(log)) {
   // using start_with(initial_peers) results in deadlock
   initializeConnections(initial_peers);
 }
@@ -50,15 +58,7 @@ void OnDemandConnectionManager::onBatches(CollectionType batches) {
  *    RejectReject  CommitReject  RejectCommit  CommitCommit
  */
-  auto propagate = [&](auto consumer) {
-    std::shared_lock lock(mutex_);
-    connections_.peers[consumer]->onBatches(batches);
-  };
-
-  propagate(kRejectRejectConsumer);
-  propagate(kRejectCommitConsumer);
-  propagate(kCommitRejectConsumer);
-  propagate(kCommitCommitConsumer);
+  batch_resend_strategy_->sendBatches(batches, connections_);
 }

 boost::optional>
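Editorial sketch (not part of the patch): the behaviour removed above — unconditionally forwarding every batch collection to all four consumers — can be expressed as a trivial implementation of the new OrderingGateResendStrategy interface. The fully-qualified template arguments below are assumptions (the diff elides them), and locking is omitted for brevity.

// Hypothetical always-broadcast strategy; reproduces the removed propagate()
// behaviour behind the new interface. Sketch only, under assumed signatures.
#include <memory>

#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "ordering/impl/on_demand_connection_manager.hpp"
#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"

namespace iroha {
  namespace ordering {
    class BroadcastResendStrategy : public OrderingGateResendStrategy {
     public:
      bool feed(std::shared_ptr<shared_model::interface::TransactionBatch>)
          override {
        return true;  // nothing to remember
      }
      bool readyToUse(std::shared_ptr<shared_model::interface::TransactionBatch>)
          override {
        return true;
      }
      void remove(const cache::OrderingGateCache::HashesSetType &) override {}
      void setCurrentRound(const consensus::Round &) override {}

      void sendBatches(OnDemandConnectionManager::CollectionType batches,
                       const OnDemandConnectionManager::CurrentConnections
                           &connections) override {
        // Same effect as the removed lambda: every consumer receives every
        // batch collection.
        for (auto consumer : {OnDemandConnectionManager::kRejectRejectConsumer,
                              OnDemandConnectionManager::kRejectCommitConsumer,
                              OnDemandConnectionManager::kCommitRejectConsumer,
                              OnDemandConnectionManager::kCommitCommitConsumer}) {
          connections.peers[consumer]->onBatches(batches);
        }
      }
    };
  }  // namespace ordering
}  // namespace iroha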
diff --git a/irohad/ordering/impl/on_demand_connection_manager.hpp b/irohad/ordering/impl/on_demand_connection_manager.hpp
index 2d8206ac2c..d4ce9d5ed8 100644
--- a/irohad/ordering/impl/on_demand_connection_manager.hpp
+++ b/irohad/ordering/impl/on_demand_connection_manager.hpp
@@ -16,6 +16,8 @@ namespace iroha {
   namespace ordering {
+    class OrderingGateResendStrategy;
+
     /**
      * Proxy class which redirects requests to appropriate peers
      */
@@ -53,12 +55,14 @@ namespace iroha {
       OnDemandConnectionManager(
           std::shared_ptr factory,
           rxcpp::observable peers,
+          std::shared_ptr batch_resend_strategy,
           logger::LoggerPtr log);

       OnDemandConnectionManager(
           std::shared_ptr factory,
           rxcpp::observable peers,
           CurrentPeers initial_peers,
+          std::shared_ptr batch_resend_strategy,
           logger::LoggerPtr log);

       ~OnDemandConnectionManager() override;
@@ -68,7 +72,6 @@ namespace iroha {
       boost::optional> onRequestProposal(
           consensus::Round round) override;
-     private:
       /**
        * Corresponding connections created by OdOsNotificationFactory
        * @see PeerType for individual descriptions
        */
       PeerCollectionType> peers;
       };
+     private:
       /**
        * Initialize corresponding peers in connections_ using factory_
        * @param peers to initialize connections with
        */
       logger::LoggerPtr log_;
       std::shared_ptr factory_;
+      std::shared_ptr batch_resend_strategy_;
       rxcpp::composite_subscription subscription_;
       CurrentConnections connections_;
diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp
index 715bad47be..ab82c21de9 100644
--- a/irohad/ordering/impl/on_demand_ordering_gate.cpp
+++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp
@@ -31,6 +31,7 @@ OnDemandOrderingGate::OnDemandOrderingGate(
     std::shared_ptr cache,
     std::shared_ptr factory,
     std::shared_ptr tx_cache,
+    std::shared_ptr batch_resend_strategy,
     size_t transaction_limit,
     logger::LoggerPtr log)
     : log_(std::move(log)),
@@ -43,6 +44,7 @@ OnDemandOrderingGate::OnDemandOrderingGate(
           log_->debug("Asking to remove {} transactions from cache.",
                       hashes->size());
           cache_->remove(*hashes);
+          batch_resend_strategy_->remove(*hashes);
         })),
       round_switch_subscription_(
           round_switch_events.subscribe([this](auto event) {
@@ -53,6 +55,8 @@ OnDemandOrderingGate::OnDemandOrderingGate(
             this->sendCachedTransactions();
+            batch_resend_strategy_->setCurrentRound(event.next_round);
+
             // request proposal for the current round
             auto proposal = this->processProposalRequest(
                 network_client_->onRequestProposal(event.next_round));
@@ -65,7 +69,8 @@ OnDemandOrderingGate::OnDemandOrderingGate(
       cache_(std::move(cache)),
       proposal_factory_(std::move(factory)),
       tx_cache_(std::move(tx_cache)),
-      proposal_notifier_(proposal_notifier_lifetime_) {}
+      proposal_notifier_(proposal_notifier_lifetime_),
+      batch_resend_strategy_(std::move(batch_resend_strategy)) {}

 OnDemandOrderingGate::~OnDemandOrderingGate() {
   proposal_notifier_lifetime_.unsubscribe();
@@ -77,6 +82,8 @@ void OnDemandOrderingGate::propagateBatch(
     std::shared_ptr batch) {
   cache_->addToBack({batch});
+  batch_resend_strategy_->feed(batch);
+
   network_client_->onBatches(
       transport::OdOsNotification::CollectionType{batch});
 }
@@ -104,9 +111,14 @@ OnDemandOrderingGate::processProposalRequest(
 void OnDemandOrderingGate::sendCachedTransactions() {
   // TODO mboldyrev 22.03.2019 IR-425
   // make cache_->getBatchesForRound(current_round) that respects sync
+
   auto batches = cache_->pop();
   cache_->addToBack(batches);
+  for (const auto &batch : batches) {
+    batch_resend_strategy_->readyToUse(batch);
+  }
+
   // get only transactions which fit to next proposal
   auto end_iterator = batches.begin();
   auto current_number_of_transactions = 0u;
diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp
index 3b10343240..8aa75b1fb7 100644
--- a/irohad/ordering/impl/on_demand_ordering_gate.hpp
+++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp
@@ -17,6 +17,7 @@
 #include "interfaces/iroha_internal/unsafe_proposal_factory.hpp"
 #include "logger/logger_fwd.hpp"
 #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
+#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
 #include "ordering/on_demand_ordering_service.hpp"
 namespace iroha {
@@ -55,6 +56,7 @@
           std::shared_ptr factory,
           std::shared_ptr tx_cache,
+          std::shared_ptr batch_resend_strategy,
           size_t transaction_limit,
           logger::LoggerPtr log);
@@ -100,6 +102,7 @@
       rxcpp::composite_subscription proposal_notifier_lifetime_;
       rxcpp::subjects::subject proposal_notifier_;
+      std::shared_ptr batch_resend_strategy_;
     };
   }  // namespace ordering
diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp
new file mode 100644
index 0000000000..08715c15fa
--- /dev/null
+++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.cpp
@@ -0,0 +1,161 @@
+/**
+ * Copyright Soramitsu Co., Ltd. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp"
+
+#include
+#include
+#include "interfaces/iroha_internal/transaction_batch.hpp"
+#include "interfaces/transaction.hpp"
+#include "ordering/impl/on_demand_common.hpp"
+
+using namespace iroha::ordering;
+
+bool OnDemandResendStrategy::feed(
+    std::shared_ptr batch) {
+  std::shared_lock lock(access_mutex_);
+  auto res =
+      sent_batches_.emplace(batch, std::make_pair(current_round_, false));
+  return res.second;
+}
+
+bool OnDemandResendStrategy::readyToUse(
+    std::shared_ptr batch) {
+  std::shared_lock lock(access_mutex_);
+  auto batch_found = sent_batches_.find(batch);
+  if (batch_found != sent_batches_.end()) {
+    batch_found->second.second = true;
+    return true;
+  }
+  return false;
+}
+
+void OnDemandResendStrategy::remove(
+    const cache::OrderingGateCache::HashesSetType &hashes) {
+  std::shared_lock lock(access_mutex_);
+  for (auto it = sent_batches_.begin(); it != sent_batches_.end();) {
+    cache::OrderingGateCache::HashesSetType restored_hashes{};
+    for (auto tx : it->first->transactions()) {
+      restored_hashes.insert(tx->hash());
+    }
+    if (hashes == restored_hashes) {
+      sent_batches_.erase(it);
+      return;
+    } else {
+      ++it;
+    }
+  }
+}
+
+void OnDemandResendStrategy::setCurrentRound(
+    const iroha::consensus::Round &current_round) {
+  std::shared_lock lock(access_mutex_);
+  current_round_ = current_round;
+}
+
+void OnDemandResendStrategy::sendBatches(
+    OnDemandConnectionManager::CollectionType batches,
+    const iroha::ordering::OnDemandConnectionManager::CurrentConnections
+        &connections) {
+  /*
+   * Transactions are always sent to the round after the next round (+2)
+   * There are 4 possibilities - all combinations of commits and rejects in the
+   * following two rounds. This can be visualised as a diagram, where: o -
+   * current round, x - next round, v - target round
+   *
+   *   0 1 2            0 1 2            0 1 2            0 1 2
+   * 0 o x v          0 o . .          0 o x .          0 o . .
+   * 1 . . .          1 x v .          1 v . .          1 x . .
+   * 2 . . .          2 . . .          2 . . .          2 v . .
+   *  RejectReject     CommitReject     RejectCommit     CommitCommit
+   */
+
+  for (const auto &batch : batches) {
+    OnDemandConnectionManager::CollectionType reject_reject_batches{};
+    OnDemandConnectionManager::CollectionType reject_commit_batches{};
+    OnDemandConnectionManager::CollectionType commit_reject_batches{};
+    OnDemandConnectionManager::CollectionType commit_commit_batches{};
+
+    auto rounds = extract(batch);
+    auto current_round = getCurrentRound();
+
+    if (rounds.find(nextRejectRound(nextRejectRound(current_round)))
+        != rounds.end()) {
+      reject_reject_batches.push_back(batch);
+    }
+    if (rounds.find(nextCommitRound(nextRejectRound(current_round)))
+        != rounds.end()) {
+      reject_commit_batches.push_back(batch);
+    }
+    if (rounds.find(nextRejectRound(nextCommitRound(current_round)))
+        != rounds.end()) {
+      commit_reject_batches.push_back(batch);
+    }
+    if (rounds.find(nextCommitRound(nextCommitRound(current_round)))
+        != rounds.end()) {
+      commit_commit_batches.push_back(batch);
+    }
+
+    if (not reject_reject_batches.empty()) {
+      connections.peers[OnDemandConnectionManager::kRejectRejectConsumer]
+          ->onBatches(std::move(reject_reject_batches));
+    }
+    if (not reject_commit_batches.empty()) {
+      connections.peers[OnDemandConnectionManager::kRejectCommitConsumer]
+          ->onBatches(std::move(reject_commit_batches));
+    }
+    if (not commit_reject_batches.empty()) {
+      connections.peers[OnDemandConnectionManager::kCommitRejectConsumer]
+          ->onBatches(std::move(commit_reject_batches));
+    }
+    if (not commit_commit_batches.empty()) {
+      connections.peers[OnDemandConnectionManager::kCommitCommitConsumer]
+          ->onBatches(std::move(commit_commit_batches));
+    }
+  }
+}
+
+OnDemandResendStrategy::RoundSetType OnDemandResendStrategy::extract(
+    std::shared_ptr batch) {
+  std::shared_lock lock(access_mutex_);
+  auto reachable_from_current = reachableInTwoRounds(current_round_);
+
+  auto saved_round_iterator = sent_batches_.find(batch);
+  if (saved_round_iterator == sent_batches_.end()) {
+    return reachable_from_current;
+  }
+  if (not saved_round_iterator->second.second) {
+    return reachable_from_current;
+  }
+
+  auto saved_round = saved_round_iterator->second.first;
+  saved_round_iterator->second.first = current_round_;
+
+  auto reachable_from_saved = reachableInTwoRounds(saved_round);
+  RoundSetType target_rounds{};
+  std::set_difference(reachable_from_current.begin(),
+                      reachable_from_current.end(),
+                      reachable_from_saved.begin(),
+                      reachable_from_saved.end(),
+                      std::inserter(target_rounds, target_rounds.begin()));
+  return target_rounds;
+}
+
+iroha::consensus::Round OnDemandResendStrategy::getCurrentRound() const {
+  std::shared_lock lock(access_mutex_);
+  return current_round_;
+}
+
+OnDemandResendStrategy::RoundSetType
+OnDemandResendStrategy::reachableInTwoRounds(
+    const consensus::Round &round) const {
+  RoundSetType reachable_rounds{};
+  reachable_rounds.insert(nextCommitRound(nextCommitRound(round)));
+  reachable_rounds.insert(nextCommitRound(nextRejectRound(round)));
+  reachable_rounds.insert(nextRejectRound(nextCommitRound(round)));
+  reachable_rounds.insert(nextRejectRound(nextRejectRound(round)));
+  return reachable_rounds;
+}
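Editorial sketch (not part of the patch): the round arithmetic behind extract() and reachableInTwoRounds() is easier to follow with concrete numbers. The snippet below uses stand-in types and assumes the helpers from ordering/impl/on_demand_common.hpp behave as the tests later in this patch expect — a commit advances (block, reject) to (block + 1, 0), a reject advances it to (block, reject + 1).

// Stand-alone illustration of the target-round computation (sketch only).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <set>
#include <utility>

using Round = std::pair<std::uint64_t, std::uint32_t>;  // {block_round, reject_round}

// Assumed semantics of the on-demand round helpers (consistent with
// on_demand_resend_strategy_test.cpp further down in this patch).
Round nextCommitRound(Round r) { return {r.first + 1, 0}; }
Round nextRejectRound(Round r) { return {r.first, r.second + 1}; }

std::set<Round> reachableInTwoRounds(Round r) {
  return {nextCommitRound(nextCommitRound(r)),
          nextCommitRound(nextRejectRound(r)),
          nextRejectRound(nextCommitRound(r)),
          nextRejectRound(nextRejectRound(r))};
}

int main() {
  Round saved{1, 1};    // round at which the batch was fed
  Round current{2, 1};  // one commit, then one reject (cf. ExtractCommitReject)

  // From {1,1}: {1,3} {2,0} {2,1} {3,0}.  From {2,1}: {2,3} {3,0} {3,1} {4,0}.
  auto from_saved = reachableInTwoRounds(saved);
  auto from_current = reachableInTwoRounds(current);

  // extract(): keep only rounds that were not already reachable when the batch
  // was first sent. {3,0} is dropped, so the (reject, commit) consumer is
  // skipped while the other three consumers still receive the batch.
  std::set<Round> targets;
  std::set_difference(from_current.begin(), from_current.end(),
                      from_saved.begin(), from_saved.end(),
                      std::inserter(targets, targets.begin()));

  for (const auto &r : targets) {
    std::cout << "resend to round {" << r.first << ", " << r.second << "}\n";
  }
}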
diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp
new file mode 100644
index 0000000000..72253b27b5
--- /dev/null
+++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp
@@ -0,0 +1,58 @@
+/**
+ * Copyright Soramitsu Co., Ltd. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef IROHA_ON_DEMAND_RESEND_STRATEGY_HPP
+#define IROHA_ON_DEMAND_RESEND_STRATEGY_HPP
+
+#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
+
+#include
+#include
+
+namespace iroha {
+  namespace ordering {
+    class OnDemandResendStrategy : public OrderingGateResendStrategy {
+     public:
+      bool feed(std::shared_ptr
+                    batch) override;
+
+      bool readyToUse(std::shared_ptr
+                          batch) override;
+
+      void remove(
+          const cache::OrderingGateCache::HashesSetType &hashes) override;
+
+      void setCurrentRound(const consensus::Round &current_round) override;
+
+      void sendBatches(OnDemandConnectionManager::CollectionType batches,
+                       const OnDemandConnectionManager::CurrentConnections
+                           &connections) override;
+
+     private:
+      /**
+       * Returns collection of interested future rounds for inserted and ready
+       * batch
+       * @param batch - batch to examine
+       */
+      RoundSetType extract(
+          std::shared_ptr batch);
+
+      consensus::Round getCurrentRound() const;
+
+      RoundSetType reachableInTwoRounds(const consensus::Round &round) const;
+
+      std::unordered_map<
+          std::shared_ptr,
+          std::pair,
+          cache::OrderingGateCache::BatchPointerHasher,
+          cache::OrderingGateCache::BatchPointerComparator>
+          sent_batches_;
+      consensus::Round current_round_;
+      mutable std::shared_timed_mutex access_mutex_;
+    };
+  }  // namespace ordering
+}  // namespace iroha
+
+#endif  // IROHA_ON_DEMAND_RESEND_STRATEGY_HPP
diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp
index f884eb8767..2d7e7d599d 100644
--- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp
+++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.cpp
@@ -17,6 +17,13 @@ namespace iroha {
         return hasher_(a->reducedHash());
       }
+      bool OrderingGateCache::BatchPointerComparator::operator()(
+          const std::shared_ptr &l,
+          const std::shared_ptr &r)
+          const {
+        return *l == *r;
+      }
+
     }  // namespace cache
   }  // namespace ordering
 }  // namespace iroha
diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp
index e949a1a502..3fa9dced1b 100644
--- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp
+++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp
@@ -24,7 +24,7 @@ namespace iroha {
        * Cache for transactions sent to ordering gate
        */
       class OrderingGateCache {
-       private:
+       public:
        /**
         * Hasher for the shared pointer on the batch. Uses batch's reduced hash
         */
              &a) const;
       };
-      public:
+      /**
+       * Comparator for shared_ptr
+       */
+      struct BatchPointerComparator {
+        bool operator()(
+            const std::shared_ptr
+                &l,
+            const std::shared_ptr
+                &r) const;
+      };
+
       /// type of the element in cache container. Set is used as it allows to
       /// remove batch from BatchSet with O(1) complexity, which is the case
       /// in remove method
diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp
new file mode 100644
index 0000000000..517899d985
--- /dev/null
+++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp
@@ -0,0 +1,64 @@
+/**
+ * Copyright Soramitsu Co., Ltd. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#ifndef IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP
+#define IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP
+
+#include
+#include "consensus/round.hpp"
+#include "ordering/impl/on_demand_connection_manager.hpp"
+#include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
+
+namespace iroha {
+  namespace ordering {
+    /**
+     * Strategy for batch resend interface
+     */
+    class OrderingGateResendStrategy {
+     public:
+      /**
+       * Inserts new batch into strategy
+       * @param batch - batch to insert
+       * @return true if operation success
+       */
+      virtual bool feed(
+          std::shared_ptr batch) = 0;
+
+      /**
+       * Get inserted before batch ready to extract
+       * @param batch - batch to get ready
+       * @return false if batch doesn't exist
+       */
+      virtual bool readyToUse(
+          std::shared_ptr batch) = 0;
+
+      using RoundSetType = std::set;
+
+      /**
+       * Removes batch with given hashes
+       * @param hashes to remove batch with
+       */
+      virtual void remove(
+          const cache::OrderingGateCache::HashesSetType &hashes) = 0;
+
+      /**
+       * Sets current round to a given one
+       */
+      virtual void setCurrentRound(const consensus::Round &current_round) = 0;
+
+      /**
+       * Sends batches in respect the strategy
+       * @param batches - batches to send
+       * @param connections - connections to send the batch through
+       */
+      virtual void sendBatches(
+          OnDemandConnectionManager::CollectionType batches,
+          const OnDemandConnectionManager::CurrentConnections &connections) = 0;
+    };
+
+  }  // namespace ordering
+}  // namespace iroha
+
+#endif  // IROHA_ORDERING_GATE_RESEND_STRATEGY_HPP
diff --git a/test/module/irohad/ordering/CMakeLists.txt b/test/module/irohad/ordering/CMakeLists.txt
index 9ab4475f02..28192a4eaf 100644
--- a/test/module/irohad/ordering/CMakeLists.txt
+++ b/test/module/irohad/ordering/CMakeLists.txt
@@ -40,3 +40,9 @@ target_link_libraries(on_demand_cache_test
     on_demand_ordering_gate
     shared_model_interfaces_factories
     )
+
+addtest(on_demand_resend_strategy_test on_demand_resend_strategy_test.cpp)
+target_link_libraries(on_demand_resend_strategy_test
+    on_demand_ordering_gate
+    shared_model_interfaces_factories
+    )
diff --git a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp
index 3fd78165b3..710660b26f 100644
--- a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp
+++ b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp
@@ -33,6 +33,7 @@ ACTION_P(CreateAndSave, var) {
 struct OnDemandConnectionManagerTest : public ::testing::Test {
   void SetUp() override {
     factory = std::make_shared();
+    strategy = std::make_shared();

     auto set = [this](auto &field, auto &ptr) {
       field = std::make_shared();
@@ -49,6 +50,7 @@
         factory,
         peers.get_observable(),
         cpeers,
+        strategy,
         getTestLogger("OsConnectionManager"));
   }
@@ -58,6 +60,7 @@
   rxcpp::subjects::subject peers;

   std::shared_ptr factory;
+  std::shared_ptr strategy;
   std::shared_ptr manager;
 };
@@ -79,16 +82,7 @@ TEST_F(OnDemandConnectionManagerTest, FactoryUsed) {
  */
 TEST_F(OnDemandConnectionManagerTest, onBatches) {
   OdOsNotification::CollectionType collection;
-
-  auto set_expect = [&](OnDemandConnectionManager::PeerType type) {
-    EXPECT_CALL(*connections[type], onBatches(collection)).Times(1);
-  };
-
-  set_expect(OnDemandConnectionManager::kRejectRejectConsumer);
-  set_expect(OnDemandConnectionManager::kRejectCommitConsumer);
-  set_expect(OnDemandConnectionManager::kCommitRejectConsumer);
-  set_expect(OnDemandConnectionManager::kCommitCommitConsumer);
-
+  EXPECT_CALL(*strategy, sendBatches(collection, testing::_)).Times(1);
   manager->onBatches(collection);
 }
diff --git a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp
index acec450493..73f65b69c2 100644
--- a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp
+++ b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp
@@ -41,6 +41,7 @@ class OnDemandOrderingGateTest : public ::testing::Test {
     auto ufactory = std::make_unique>();
     factory = ufactory.get();
     tx_cache = std::make_shared();
+    strategy = std::make_shared();
     ON_CALL(*tx_cache, check(testing::Matcher(_)))
         .WillByDefault(
@@ -54,9 +55,9 @@
         cache,
         std::move(ufactory),
         tx_cache,
+        strategy,
         1000,
         getTestLogger("OrderingGate"));
-
     auto peer = makePeer("127.0.0.1", shared_model::crypto::PublicKey("111"));
     auto ledger_peers = std::make_shared(PeerList{peer});
     ledger_state =
@@ -71,6 +72,7 @@
   std::shared_ptr notification;
   NiceMock *factory;
   std::shared_ptr tx_cache;
+  std::shared_ptr strategy;
   std::shared_ptr ordering_gate;

   std::shared_ptr cache;
diff --git a/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp
new file mode 100644
index 0000000000..52faafc4f8
--- /dev/null
+++ b/test/module/irohad/ordering/on_demand_resend_strategy_test.cpp
@@ -0,0 +1,277 @@
+/**
+ * Copyright Soramitsu Co., Ltd. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "ordering/impl/ordering_gate_cache/on_demand_resend_strategy.hpp"
+
+#include
+#include
+#include "module/irohad/ordering/mock_on_demand_os_notification.hpp"
+#include "module/irohad/ordering/ordering_mocks.hpp"
+#include "module/shared_model/interface_mocks.hpp"
+#include "ordering/impl/on_demand_common.hpp"
+
+using namespace iroha;
+using namespace iroha::ordering;
+using namespace iroha::ordering::transport;
+using namespace iroha::consensus;
+
+using ::testing::_;
+using ::testing::ByMove;
+using ::testing::Ref;
+using ::testing::Return;
+
+ACTION_P(CreateAndSave, var) {
+  auto result = std::make_unique();
+  *var = result.get();
+  return std::unique_ptr(std::move(result));
+}
+
+struct OnDemandResendStrategyTest : public ::testing::Test {
+  void SetUp() override {
+    factory = std::make_shared();
+    strategy = std::make_shared();
+
+    auto set = [this](auto &field, auto &ptr, auto &conn) {
+      field = std::make_shared();
+
+      EXPECT_CALL(*factory, create(Ref(*field)))
+          .WillRepeatedly(CreateAndSave(&ptr));
+
+      conn = factory->create(*field);
+    };
+
+    for (auto &&triple :
+         boost::combine(cpeers.peers, connections, connections_obj.peers)) {
+      set(boost::get<0>(triple), boost::get<1>(triple), boost::get<2>(triple));
+    }
+  }
+
+  OnDemandConnectionManager::CurrentConnections connections_obj;
+  OnDemandConnectionManager::CurrentPeers cpeers;
+  OnDemandConnectionManager::PeerCollectionType connections;
+  std::shared_ptr factory;
+  std::shared_ptr strategy;
+};
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when same batch is fed to the instance twice
+ * @then first feeding succeeds, second one fails
+ */
+TEST_F(OnDemandResendStrategyTest, Feed) {
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  EXPECT_CALL(*batch, Equals(_)).Times(1).WillRepeatedly(Return(true));
+  ASSERT_TRUE(strategy->feed(batch));
+  ASSERT_FALSE(strategy->feed(batch));
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when readyToUse is called before and after batch is fed
+ * @then first call fails, second succeeds
+ */
+TEST_F(OnDemandResendStrategyTest, ReadyToUse) {
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  ASSERT_FALSE(strategy->readyToUse(batch));
+  strategy->feed(batch);
+  EXPECT_CALL(*batch, Equals(_)).WillOnce(Return(true));
+  ASSERT_TRUE(strategy->readyToUse(batch));
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when sendBatches is called without any other prior calls
+ * @then onBatches is called for all consumers
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractNonExisting) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed and then sendBatches are called for the same batch
+ * @then onBatches is called for all consumers
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractNonReady) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  strategy->feed(batch);
+
+  EXPECT_CALL(*batch, Equals(_)).WillOnce(Return(true));
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed, readyToUse are called, current round is set for 2 commits to
+ * future and then sendBatches is called for the same batch
+ * @then onBatches is called for all consumers
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractCommitCommit) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  strategy->feed(batch);
+  EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true));
+  strategy->readyToUse(batch);
+  round = Round(3, 0);
+  strategy->setCurrentRound(round);
+
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed, readyToUse are called, current round is set for (commit, reject)
+ * to future and then sendBatches is called for the same batch
+ * @then onBatches is called for all consumers except (reject, commit)
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractCommitReject) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  strategy->feed(batch);
+  EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true));
+  strategy->readyToUse(batch);
+  round = Round(2, 1);
+  strategy->setCurrentRound(round);
+
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed, readyToUse are called, current round is set for (reject, commit)
+ * to future and then sendBatches is called for the same batch
+ * @then onBatches is called for all consumers except (reject, commit)
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractRejectCommit) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  strategy->feed(batch);
+  EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true));
+  strategy->readyToUse(batch);
+  round = Round(2, 0);
+  strategy->setCurrentRound(round);
+
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kCommitCommitConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed, readyToUse are called, current round is set for (reject, reject)
+ * to future and then sendBatches is called for the same batch
+ * @then onBatches is called for (reject, commit) consumer
+ */
+TEST_F(OnDemandResendStrategyTest, ExtractRejectReject) {
+  Round round(1, 1);
+  strategy->setCurrentRound(round);
+  shared_model::interface::types::HashType hash("hash");
+  auto batch = createMockBatchWithHash(hash);
+  strategy->feed(batch);
+  EXPECT_CALL(*batch, Equals(_)).Times(2).WillRepeatedly(Return(true));
+  strategy->readyToUse(batch);
+  round = Round(1, 3);
+  strategy->setCurrentRound(round);
+
+  EXPECT_CALL(*connections[OnDemandConnectionManager::kRejectRejectConsumer],
+              onBatches(OnDemandConnectionManager::CollectionType{batch}))
+      .Times(1);
+
+  strategy->sendBatches(OnDemandConnectionManager::CollectionType{batch},
+                        connections_obj);
+}
+
+/**
+ * @given OnDemandResendStrategy instance
+ * @when feed, remove and feed are called for the same batch
+ * @then both feed calls succeed
+ */
+TEST_F(OnDemandResendStrategyTest, Remove) {
+  shared_model::interface::types::HashType hash("hash");
+  auto tx = createMockTransactionWithHash(hash);
+  auto batch = createMockBatchWithTransactions({tx}, "");
+  ASSERT_TRUE(strategy->feed(batch));
+  strategy->remove({hash});
+  ASSERT_TRUE(strategy->feed(batch));
+}
diff --git a/test/module/irohad/ordering/ordering_mocks.hpp b/test/module/irohad/ordering/ordering_mocks.hpp
index c867a72768..3e582e2e56 100644
--- a/test/module/irohad/ordering/ordering_mocks.hpp
+++ b/test/module/irohad/ordering/ordering_mocks.hpp
@@ -10,6 +10,7 @@
 #include "module/irohad/ordering/mock_on_demand_os_notification.hpp"
 #include "ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp"
+#include "ordering/impl/ordering_gate_cache/ordering_gate_resend_strategy.hpp"
 #include "ordering/on_demand_ordering_service.hpp"
 #include "ordering/on_demand_os_transport.hpp"
@@ -45,6 +46,21 @@
       MOCK_METHOD1(onCollaborationOutcome, void(consensus::Round));
     };
+    struct MockOrderingGateResendStrategy : public OrderingGateResendStrategy {
+      MOCK_METHOD1(
+          feed,
+          bool(std::shared_ptr));
+      MOCK_METHOD1(
+          readyToUse,
+          bool(std::shared_ptr));
+      MOCK_METHOD1(remove,
+                   void(const cache::OrderingGateCache::HashesSetType &hashes));
+      MOCK_METHOD1(setCurrentRound, void(const consensus::Round &));
+      MOCK_METHOD2(sendBatches,
+                   void(OnDemandConnectionManager::CollectionType,
+                        const OnDemandConnectionManager::CurrentConnections &));
+    };
+
   }  // namespace ordering
 }  // namespace iroha
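Editorial sketch (not part of the patch): a hypothetical test fragment showing how the new MockOrderingGateResendStrategy can be used on its own; it mirrors the expectation style already used in on_demand_connection_manager_test.cpp above. The test and fixture names are illustrative only.

// Sketch only: exercises the mock directly; a real test would inject
// `strategy` into the component under test and drive that component instead.
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <memory>

#include "module/irohad/ordering/ordering_mocks.hpp"

using ::testing::_;
using ::testing::Return;

TEST(MockOrderingGateResendStrategyExample, SetsExpectations) {
  auto strategy =
      std::make_shared<iroha::ordering::MockOrderingGateResendStrategy>();

  EXPECT_CALL(*strategy, feed(_)).WillOnce(Return(true));
  EXPECT_CALL(*strategy, setCurrentRound(_)).Times(1);

  ASSERT_TRUE(strategy->feed(nullptr));
  strategy->setCurrentRound(iroha::consensus::Round{1, 0});
}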