diff --git a/irohad/torii/impl/command_service_transport_grpc.cpp b/irohad/torii/impl/command_service_transport_grpc.cpp index 91636ae534..f24d1e41b1 100644 --- a/irohad/torii/impl/command_service_transport_grpc.cpp +++ b/irohad/torii/impl/command_service_transport_grpc.cpp @@ -16,6 +16,7 @@ #include #include "backend/protobuf/transaction_responses/proto_tx_response.hpp" #include "common/combine_latest_until_first_completed.hpp" +#include "common/run_loop_handler.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" #include "interfaces/iroha_internal/transaction_batch_factory.hpp" #include "interfaces/iroha_internal/transaction_batch_parser.hpp" @@ -165,33 +166,6 @@ namespace iroha { return grpc::Status::OK; } - namespace { - void handleEvents(rxcpp::composite_subscription &subscription, - rxcpp::schedulers::run_loop &run_loop) { - std::condition_variable wait_cv; - - run_loop.set_notify_earlier_wakeup( - [&wait_cv](const auto &) { wait_cv.notify_one(); }); - - std::mutex wait_mutex; - std::unique_lock lock(wait_mutex); - while (subscription.is_subscribed() or not run_loop.empty()) { - while (not run_loop.empty() - and run_loop.peek().when <= run_loop.now()) { - run_loop.dispatch(); - } - - if (run_loop.empty()) { - wait_cv.wait(lock, [&run_loop, &subscription]() { - return not subscription.is_subscribed() or not run_loop.empty(); - }); - } else { - wait_cv.wait_until(lock, run_loop.peek().when); - } - } - } - } // namespace - grpc::Status CommandServiceTransportGrpc::StatusStream( grpc::ServerContext *context, const iroha::protocol::TxStatusRequest *request, @@ -271,7 +245,7 @@ namespace iroha { // run loop while subscription is active or there are pending events in // the queue - handleEvents(subscription, rl); + iroha::schedulers::handleEvents(subscription, rl); log_->debug("status stream done, {}", client_id); return grpc::Status::OK; diff --git a/irohad/torii/impl/query_service.cpp b/irohad/torii/impl/query_service.cpp index 8e6abf3d3b..1265ffb66d 100644 --- a/irohad/torii/impl/query_service.cpp +++ b/irohad/torii/impl/query_service.cpp @@ -7,6 +7,7 @@ #include "backend/protobuf/query_responses/proto_block_query_response.hpp" #include "backend/protobuf/query_responses/proto_query_response.hpp" +#include "common/run_loop_handler.hpp" #include "cryptography/default_hash_provider.hpp" #include "interfaces/iroha_internal/abstract_transport_factory.hpp" #include "logger/logger.hpp" @@ -18,7 +19,7 @@ namespace iroha { QueryService::QueryService( std::shared_ptr query_processor, std::shared_ptr query_factory, - logger::LoggerPtr log) + logger::LoggerPtr log) : query_processor_{std::move(query_processor)}, query_factory_{std::move(query_factory)}, log_{std::move(log)} {} @@ -70,59 +71,82 @@ namespace iroha { const iroha::protocol::BlocksQuery *request, grpc::ServerWriter *writer) { log_->debug("Fetching commits"); + + rxcpp::schedulers::run_loop run_loop; + auto current_thread = rxcpp::synchronize_in_one_worker( + rxcpp::schedulers::make_run_loop(run_loop)); + shared_model::proto::TransportBuilder< shared_model::proto::BlocksQuery, shared_model::validation::DefaultSignedBlocksQueryValidator>() .build(*request) .match( - [this, context, request, writer]( + [this, context, request, writer, ¤t_thread, &run_loop]( const iroha::expected::Value &query) { - rxcpp::composite_subscription sub; + rxcpp::composite_subscription subscription; + std::string client_id = + (boost::format("Peer: '%s'") % context->peer()).str(); query_processor_->blocksQueryHandle(query.value) - .as_blocking() - .subscribe( - sub, - [this, context, &sub, request, writer]( - const std::shared_ptr< - shared_model::interface::BlockQueryResponse> - response) { - if (context->IsCancelled()) { - log_->debug("Unsubscribed"); - sub.unsubscribe(); - } else { - iroha::visit_in_place( - response->get(), - [this, writer, request]( - const shared_model::interface::BlockResponse - &block_response) { - log_->debug( - "{} receives committed block", - request->meta().creator_account_id()); - auto proto_block_response = static_cast< - const shared_model::proto::BlockResponse - &>(block_response); - writer->Write( - proto_block_response.getTransport()); - }, - [this, writer, request]( - const shared_model::interface:: - BlockErrorResponse - &block_error_response) { - log_->debug( - "{} received error with message: {}", - request->meta().creator_account_id(), - block_error_response.message()); - auto proto_block_error_response = - static_cast( - block_error_response); - writer->WriteLast( - proto_block_error_response.getTransport(), - grpc::WriteOptions()); - }); - } - }); + .observe_on(current_thread) + .take_while([this, context, request, writer]( + const std::shared_ptr< + shared_model::interface:: + BlockQueryResponse> response) { + if (context->IsCancelled()) { + log_->debug("Unsubscribed from block stream"); + return false; + } else { + auto result = iroha::visit_in_place( + response->get(), + [this, writer, request]( + const shared_model::interface::BlockResponse + &block_response) { + log_->debug("{} receives committed block", + request->meta().creator_account_id()); + auto proto_block_response = static_cast< + const shared_model::proto::BlockResponse &>( + block_response); + bool written = writer->Write( + proto_block_response.getTransport()); + if (not written) { + log_->debug( + "Block stream appears to be closed"); + return false; + } + return true; + }, + [this, writer, request]( + const shared_model::interface:: + BlockErrorResponse &block_error_response) { + log_->debug("{} received error with message: {}", + request->meta().creator_account_id(), + block_error_response.message()); + auto proto_block_error_response = static_cast< + const shared_model::proto::BlockErrorResponse + &>(block_error_response); + writer->WriteLast( + proto_block_error_response.getTransport(), + grpc::WriteOptions()); + return false; + }); + return result; + } + }) + .subscribe(subscription, + [](const auto &) {}, + [&](std::exception_ptr ep) { + log_->error( + "something bad happened during block " + "streaming, client_id {}", + client_id); + }, + [&] { + log_->debug("block stream done, {}", + client_id); + }); + + iroha::schedulers::handleEvents(subscription, run_loop); }, [this, writer](const auto &error) { log_->debug("Stateless invalid: {}", error.error); diff --git a/libs/common/run_loop_handler.hpp b/libs/common/run_loop_handler.hpp new file mode 100644 index 0000000000..de971fc4c6 --- /dev/null +++ b/libs/common/run_loop_handler.hpp @@ -0,0 +1,42 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef IROHA_RUN_LOOP_HANDLER_HPP +#define IROHA_RUN_LOOP_HANDLER_HPP + +#include + +#include + +namespace iroha { + namespace schedulers { + + inline void handleEvents(rxcpp::composite_subscription &subscription, + rxcpp::schedulers::run_loop &run_loop) { + std::condition_variable wait_cv; + + run_loop.set_notify_earlier_wakeup( + [&wait_cv](const auto &) { wait_cv.notify_one(); }); + + std::mutex wait_mutex; + std::unique_lock lock(wait_mutex); + while (subscription.is_subscribed() or not run_loop.empty()) { + while (not run_loop.empty() + and run_loop.peek().when <= run_loop.now()) { + run_loop.dispatch(); + } + + if (run_loop.empty()) { + wait_cv.wait(lock, [&run_loop, &subscription]() { + return not subscription.is_subscribed() or not run_loop.empty(); + }); + } else { + wait_cv.wait_until(lock, run_loop.peek().when); + } + } + } + } // namespace schedulers +} // namespace iroha + +#endif // IROHA_RUN_LOOP_HANDLER_HPP