diff --git a/irohad/torii/impl/command_service_impl.cpp b/irohad/torii/impl/command_service_impl.cpp index 2b8728fdd8..7e466654cf 100644 --- a/irohad/torii/impl/command_service_impl.cpp +++ b/irohad/torii/impl/command_service_impl.cpp @@ -96,7 +96,8 @@ namespace torii { shared_model::interface::StatelessFailedTxResponse, shared_model::interface::StatefulFailedTxResponse, shared_model::interface::CommittedTxResponse, - shared_model::interface::MstExpiredResponse>::value; + shared_model::interface::MstExpiredResponse, + shared_model::interface::RejectTxResponse>::value; rxcpp::observable< std::shared_ptr> diff --git a/irohad/torii/impl/command_service_transport_grpc.cpp b/irohad/torii/impl/command_service_transport_grpc.cpp index 6dc585ecbb..62dca24060 100644 --- a/irohad/torii/impl/command_service_transport_grpc.cpp +++ b/irohad/torii/impl/command_service_transport_grpc.cpp @@ -175,7 +175,7 @@ namespace torii { rxcpp::schedulers::run_loop rl; auto current_thread = - rxcpp::observe_on_one_worker(rxcpp::schedulers::make_run_loop(rl)); + rxcpp::synchronize_in_one_worker(rxcpp::schedulers::make_run_loop(rl)); rxcpp::composite_subscription subscription; @@ -184,18 +184,24 @@ namespace torii { auto client_id_format = boost::format("Peer: '%s', %s"); std::string client_id = (client_id_format % context->peer() % hash.toString()).str(); - + bool last_tx_status_received = false; + auto status_bus = command_service_->getStatusStream(hash) + .finally([&last_tx_status_received] { + last_tx_status_received = true; + }) + .publish() + .ref_count(); auto consensus_gate_observable = consensus_gate_objects_ // a dummy start_with lets us don't wait for the consensus event // on further combine_latest - .start_with(ConsensusGateEvent{}); + .start_with(ConsensusGateEvent{}) + .publish() + .ref_count(); boost::optional last_tx_status; auto rounds_counter{0}; - std::mutex stream_write_mutex; - command_service_ - ->getStatusStream(hash) + status_bus // convert to transport objects .map([&](auto response) { log_->info("mapped {}, {}", *response, client_id); @@ -203,16 +209,16 @@ namespace torii { shared_model::proto::TransactionResponse>(response) ->getTransport(); }) - .combine_latest(consensus_gate_observable) + .combine_latest(current_thread, consensus_gate_observable) .map([](const auto &tuple) { return std::get<0>(tuple); }) // complete the observable if client is disconnected or too many // rounds have passed without tx status change - .take_while([=, &rounds_counter, &last_tx_status, &stream_write_mutex]( - const auto &response) { - // TODO [IR-249] akvinikym 23.01.19: remove the mutex after - // ensuring only one thread can be here - std::lock_guard lg{stream_write_mutex}; - + .take_while([=, + &rounds_counter, + &last_tx_status, + // last_tx_status_received has to be passed by reference to + // prevent accessing its outdated state + &last_tx_status_received](const auto &response) { if (context->IsCancelled()) { log_->debug("client unsubscribed, {}", client_id); return false; @@ -240,6 +246,11 @@ namespace torii { } log_->debug("status written, {}", client_id); + if (last_tx_status_received) { + // force stream to end because no more tx statuses will arrive. + // it is thread safe because of synchronization on current_thread + return false; + } return true; }) .subscribe(subscription,