Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Fix Status Stream Performance Degradation (#2073)
Browse files Browse the repository at this point in the history
* Add Rejected to a list of tx final statuses
* Finish tx status stream when as soon as the last status appeared
* Simplify thread affinity management. Remove redundant lock_guard

No more forced awaiting two consensus rounds

Signed-off-by: Igor Egorov <[email protected]>
  • Loading branch information
Igor Egorov authored Feb 4, 2019
1 parent 46c30c6 commit 7fc63c0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
3 changes: 2 additions & 1 deletion irohad/torii/impl/command_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ namespace torii {
shared_model::interface::StatelessFailedTxResponse,
shared_model::interface::StatefulFailedTxResponse,
shared_model::interface::CommittedTxResponse,
shared_model::interface::MstExpiredResponse>::value;
shared_model::interface::MstExpiredResponse,
shared_model::interface::RejectTxResponse>::value;

rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
Expand Down
37 changes: 24 additions & 13 deletions irohad/torii/impl/command_service_transport_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ namespace torii {
rxcpp::schedulers::run_loop rl;

auto current_thread =
rxcpp::observe_on_one_worker(rxcpp::schedulers::make_run_loop(rl));
rxcpp::synchronize_in_one_worker(rxcpp::schedulers::make_run_loop(rl));

rxcpp::composite_subscription subscription;

Expand All @@ -184,35 +184,41 @@ namespace torii {
auto client_id_format = boost::format("Peer: '%s', %s");
std::string client_id =
(client_id_format % context->peer() % hash.toString()).str();

bool last_tx_status_received = false;
auto status_bus = command_service_->getStatusStream(hash)
.finally([&last_tx_status_received] {
last_tx_status_received = true;
})
.publish()
.ref_count();
auto consensus_gate_observable =
consensus_gate_objects_
// a dummy start_with lets us don't wait for the consensus event
// on further combine_latest
.start_with(ConsensusGateEvent{});
.start_with(ConsensusGateEvent{})
.publish()
.ref_count();

boost::optional<iroha::protocol::TxStatus> last_tx_status;
auto rounds_counter{0};
std::mutex stream_write_mutex;
command_service_
->getStatusStream(hash)
status_bus
// convert to transport objects
.map([&](auto response) {
log_->info("mapped {}, {}", *response, client_id);
return std::static_pointer_cast<
shared_model::proto::TransactionResponse>(response)
->getTransport();
})
.combine_latest(consensus_gate_observable)
.combine_latest(current_thread, consensus_gate_observable)
.map([](const auto &tuple) { return std::get<0>(tuple); })
// complete the observable if client is disconnected or too many
// rounds have passed without tx status change
.take_while([=, &rounds_counter, &last_tx_status, &stream_write_mutex](
const auto &response) {
// TODO [IR-249] akvinikym 23.01.19: remove the mutex after
// ensuring only one thread can be here
std::lock_guard<std::mutex> lg{stream_write_mutex};

.take_while([=,
&rounds_counter,
&last_tx_status,
// last_tx_status_received has to be passed by reference to
// prevent accessing its outdated state
&last_tx_status_received](const auto &response) {
if (context->IsCancelled()) {
log_->debug("client unsubscribed, {}", client_id);
return false;
Expand Down Expand Up @@ -240,6 +246,11 @@ namespace torii {
}

log_->debug("status written, {}", client_id);
if (last_tx_status_received) {
// force stream to end because no more tx statuses will arrive.
// it is thread safe because of synchronization on current_thread
return false;
}
return true;
})
.subscribe(subscription,
Expand Down

0 comments on commit 7fc63c0

Please sign in to comment.