Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Merge pull request #2055 from hyperledger/hotfix/rc2
Browse files Browse the repository at this point in the history
Hotfix for segfault and atomic batches
  • Loading branch information
Igor Egorov authored Jan 25, 2019
2 parents 108b2af + 3c41103 commit 46c30c6
Show file tree
Hide file tree
Showing 27 changed files with 550 additions and 186 deletions.
35 changes: 28 additions & 7 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "validators/default_validator.hpp"
#include "validators/field_validator.hpp"
#include "validators/protobuf/proto_block_validator.hpp"
#include "validators/protobuf/proto_proposal_validator.hpp"
#include "validators/protobuf/proto_query_validator.hpp"
#include "validators/protobuf/proto_transaction_validator.hpp"

Expand Down Expand Up @@ -189,6 +190,26 @@ void Irohad::initNetworkClient() {
}

void Irohad::initFactories() {
// proposal factory
std::shared_ptr<
shared_model::validation::AbstractValidator<iroha::protocol::Transaction>>
proto_transaction_validator = std::make_shared<
shared_model::validation::ProtoTransactionValidator>();
std::unique_ptr<shared_model::validation::AbstractValidator<
shared_model::interface::Proposal>>
proposal_validator = std::make_unique<
shared_model::validation::DefaultProposalValidator>();
std::unique_ptr<
shared_model::validation::AbstractValidator<iroha::protocol::Proposal>>
proto_proposal_validator =
std::make_unique<shared_model::validation::ProtoProposalValidator>(
proto_transaction_validator);
proposal_factory =
std::make_shared<shared_model::proto::ProtoTransportFactory<
shared_model::interface::Proposal,
shared_model::proto::Proposal>>(std::move(proposal_validator),
std::move(proto_proposal_validator));

// transaction factories
transaction_batch_factory_ =
std::make_shared<shared_model::interface::TransactionBatchFactoryImpl>();
Expand All @@ -198,10 +219,6 @@ void Irohad::initFactories() {
transaction_validator =
std::make_unique<shared_model::validation::
DefaultOptionalSignedTransactionValidator>();
std::unique_ptr<
shared_model::validation::AbstractValidator<iroha::protocol::Transaction>>
proto_transaction_validator = std::make_unique<
shared_model::validation::ProtoTransactionValidator>();
transaction_factory =
std::make_shared<shared_model::proto::ProtoTransportFactory<
shared_model::interface::Transaction,
Expand Down Expand Up @@ -301,6 +318,7 @@ void Irohad::initOrderingGate() {
transaction_batch_factory_,
async_call_,
std::move(factory),
proposal_factory,
persistent_cache,
{blocks.back()->height(), 1},
delay);
Expand Down Expand Up @@ -363,7 +381,8 @@ void Irohad::initConsensusGate() {
vote_delay_,
async_call_,
common_objects_factory_);

consensus_gate->onOutcome().subscribe(
consensus_gate_objects.get_subscriber());
log_->info("[Init] => consensus gate");
}

Expand Down Expand Up @@ -466,8 +485,10 @@ void Irohad::initTransactionCommandService() {
transaction_factory,
batch_parser,
transaction_batch_factory_,
consensus_gate,
2);
consensus_gate_objects.get_observable().map([](const auto &) {
return ::torii::CommandServiceTransportGrpc::ConsensusGateEvent{};
}),
2); // TODO 18.01.2019 igor-egorov, make it configurable IR-230

log_->info("[Init] => command service");
}
Expand Down
7 changes: 7 additions & 0 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ class Irohad {
// persistent cache
std::shared_ptr<iroha::ametsuchi::TxPresenceCache> persistent_cache;

// proposal factory
std::shared_ptr<shared_model::interface::AbstractTransportFactory<
shared_model::interface::Proposal,
iroha::protocol::Proposal>>
proposal_factory;

// ordering gate
std::shared_ptr<iroha::network::OrderingGate> ordering_gate;

Expand All @@ -210,6 +216,7 @@ class Irohad {

// consensus gate
std::shared_ptr<iroha::network::ConsensusGate> consensus_gate;
rxcpp::subjects::subject<iroha::consensus::GateObject> consensus_gate_objects;

// synchronizer
std::shared_ptr<iroha::synchronizer::Synchronizer> synchronizer;
Expand Down
32 changes: 20 additions & 12 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ namespace iroha {
auto OnDemandOrderingInit::createNotificationFactory(
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::chrono::milliseconds delay) {
return std::make_shared<ordering::transport::OnDemandOsClientGrpcFactory>(
std::move(async_call),
std::move(proposal_transport_factory),
[] { return std::chrono::system_clock::now(); },
delay);
}
Expand All @@ -55,6 +57,7 @@ namespace iroha {
std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::chrono::milliseconds delay,
std::vector<shared_model::interface::types::HashType> initial_hashes) {
// since top block will be the first in notifier observable, hashes of
Expand Down Expand Up @@ -174,7 +177,10 @@ namespace iroha {
.map(map_peers);

return std::make_shared<ordering::OnDemandConnectionManager>(
createNotificationFactory(std::move(async_call), delay), peers);
createNotificationFactory(std::move(async_call),
std::move(proposal_transport_factory),
delay),
peers);
}

auto OnDemandOrderingInit::createGate(
Expand All @@ -187,7 +193,6 @@ namespace iroha {
consensus::Round initial_round,
std::function<std::chrono::seconds(
const synchronizer::SynchronizationEvent &)> delay_func) {

auto map = [](auto commit) {
return matchEvent(
commit,
Expand Down Expand Up @@ -263,6 +268,7 @@ namespace iroha {
async_call,
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round,
std::function<std::chrono::seconds(
Expand All @@ -274,16 +280,18 @@ namespace iroha {
std::move(transaction_factory),
std::move(batch_parser),
std::move(transaction_batch_factory));
return createGate(ordering_service,
createConnectionManager(std::move(peer_query_factory),
std::move(async_call),
delay,
std::move(initial_hashes)),
std::make_shared<ordering::cache::OnDemandCache>(),
std::move(proposal_factory),
std::move(tx_cache),
initial_round,
std::move(delay_func));
return createGate(
ordering_service,
createConnectionManager(std::move(peer_query_factory),
std::move(async_call),
std::move(proposal_transport_factory),
delay,
std::move(initial_hashes)),
std::make_shared<ordering::cache::OnDemandCache>(),
std::move(proposal_factory),
std::move(tx_cache),
initial_round,
std::move(delay_func));
}

} // namespace network
Expand Down
9 changes: 9 additions & 0 deletions irohad/main/impl/on_demand_ordering_init.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ namespace iroha {
* Encapsulates initialization logic for on-demand ordering gate and service
*/
class OnDemandOrderingInit {
public:
using TransportFactoryType =
shared_model::interface::AbstractTransportFactory<
shared_model::interface::Proposal,
iroha::protocol::Proposal>;

private:
/**
* Creates notification factory for individual connections to peers with
Expand All @@ -36,6 +42,7 @@ namespace iroha {
auto createNotificationFactory(
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::chrono::milliseconds delay);

/**
Expand All @@ -47,6 +54,7 @@ namespace iroha {
std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::chrono::milliseconds delay,
std::vector<shared_model::interface::types::HashType> initial_hashes);

Expand Down Expand Up @@ -117,6 +125,7 @@ namespace iroha {
async_call,
std::shared_ptr<shared_model::interface::UnsafeProposalFactory>
proposal_factory,
std::shared_ptr<TransportFactoryType> proposal_transport_factory,
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
consensus::Round initial_round,
std::function<std::chrono::seconds(
Expand Down
26 changes: 25 additions & 1 deletion irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#include "ordering/impl/on_demand_ordering_gate.hpp"

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/indexed.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/empty.hpp>
#include "ametsuchi/tx_presence_cache.hpp"
#include "common/visitor.hpp"
#include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
#include "ordering/impl/on_demand_common.hpp"

using namespace iroha;
Expand Down Expand Up @@ -108,6 +111,7 @@ OnDemandOrderingGate::processProposalRequest(
boost::optional<std::shared_ptr<shared_model::interface::Proposal>>
OnDemandOrderingGate::removeReplays(
shared_model::interface::Proposal &&proposal) const {
std::vector<bool> proposal_txs_validation_results;
auto tx_is_not_processed = [this](const auto &tx) {
auto tx_result = tx_cache_->check(tx.hash());
if (not tx_result) {
Expand All @@ -125,8 +129,28 @@ OnDemandOrderingGate::removeReplays(
return false;
});
};

shared_model::interface::TransactionBatchParserImpl batch_parser;

auto batches = batch_parser.parseBatches(proposal.transactions());
for (auto &batch : batches) {
bool batch_validation_result =
std::all_of(batch.begin(), batch.end(), tx_is_not_processed);
proposal_txs_validation_results.insert(
proposal_txs_validation_results.end(),
batch.size(),
batch_validation_result);
}

auto unprocessed_txs =
boost::adaptors::filter(proposal.transactions(), tx_is_not_processed);
proposal.transactions() | boost::adaptors::indexed()
| boost::adaptors::filtered(
[proposal_txs_validation_results =
std::move(proposal_txs_validation_results)](const auto &el) {
return proposal_txs_validation_results.at(el.index());
})
| boost::adaptors::transformed(
[](const auto &el) -> decltype(auto) { return el.value(); });

auto result = proposal_factory_->unsafeCreateProposal(
proposal.height(), proposal.createdTime(), unprocessed_txs);
Expand Down
19 changes: 17 additions & 2 deletions irohad/ordering/impl/on_demand_os_client_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ OnDemandOsClientGrpc::OnDemandOsClientGrpc(
std::unique_ptr<proto::OnDemandOrdering::StubInterface> stub,
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_factory,
std::function<TimepointType()> time_provider,
std::chrono::milliseconds proposal_request_timeout,
logger::Logger log)
: log_(std::move(log)),
stub_(std::move(stub)),
async_call_(std::move(async_call)),
proposal_factory_(std::move(proposal_factory)),
time_provider_(std::move(time_provider)),
proposal_request_timeout_(proposal_request_timeout) {}

Expand Down Expand Up @@ -64,16 +66,28 @@ OnDemandOsClientGrpc::onRequestProposal(consensus::Round round) {
if (not response.has_proposal()) {
return boost::none;
}
return ProposalType{std::make_unique<shared_model::proto::Proposal>(
std::move(response.proposal()))};
return proposal_factory_->build(response.proposal())
.match(
[&](iroha::expected::Value<
std::unique_ptr<shared_model::interface::Proposal>> &v)
-> boost::optional<OdOsNotification::ProposalType> {
return ProposalType{std::move(v).value};
},
[this](iroha::expected::Error<TransportFactoryType::Error> &error)
-> boost::optional<OdOsNotification::ProposalType> {
log_->info(error.error.error); // error
return {};
});
}

OnDemandOsClientGrpcFactory::OnDemandOsClientGrpcFactory(
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_factory,
std::function<OnDemandOsClientGrpc::TimepointType()> time_provider,
OnDemandOsClientGrpc::TimeoutType proposal_request_timeout)
: async_call_(std::move(async_call)),
proposal_factory_(std::move(proposal_factory)),
time_provider_(time_provider),
proposal_request_timeout_(proposal_request_timeout) {}

Expand All @@ -82,6 +96,7 @@ std::unique_ptr<OdOsNotification> OnDemandOsClientGrpcFactory::create(
return std::make_unique<OnDemandOsClientGrpc>(
network::createClient<proto::OnDemandOrdering>(to.address()),
async_call_,
proposal_factory_,
time_provider_,
proposal_request_timeout_,
logger::log("OnDemandOsClientGrpc"));
Expand Down
10 changes: 10 additions & 0 deletions irohad/ordering/impl/on_demand_os_client_grpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "ordering/on_demand_os_transport.hpp"

#include "interfaces/iroha_internal/abstract_transport_factory.hpp"
#include "network/impl/async_grpc_client.hpp"
#include "ordering.grpc.pb.h"

Expand All @@ -20,6 +21,10 @@ namespace iroha {
*/
class OnDemandOsClientGrpc : public OdOsNotification {
public:
using TransportFactoryType =
shared_model::interface::AbstractTransportFactory<
shared_model::interface::Proposal,
iroha::protocol::Proposal>;
using TimepointType = std::chrono::system_clock::time_point;
using TimeoutType = std::chrono::milliseconds;

Expand All @@ -31,6 +36,7 @@ namespace iroha {
std::unique_ptr<proto::OnDemandOrdering::StubInterface> stub,
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_factory,
std::function<TimepointType()> time_provider,
std::chrono::milliseconds proposal_request_timeout,
logger::Logger log = logger::log("OnDemandOsClientGrpc"));
Expand All @@ -45,15 +51,18 @@ namespace iroha {
std::unique_ptr<proto::OnDemandOrdering::StubInterface> stub_;
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call_;
std::shared_ptr<TransportFactoryType> proposal_factory_;
std::function<TimepointType()> time_provider_;
std::chrono::milliseconds proposal_request_timeout_;
};

class OnDemandOsClientGrpcFactory : public OdOsNotificationFactory {
public:
using TransportFactoryType = OnDemandOsClientGrpc::TransportFactoryType;
OnDemandOsClientGrpcFactory(
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call,
std::shared_ptr<TransportFactoryType> proposal_factory,
std::function<OnDemandOsClientGrpc::TimepointType()> time_provider,
OnDemandOsClientGrpc::TimeoutType proposal_request_timeout);

Expand All @@ -69,6 +78,7 @@ namespace iroha {
private:
std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
async_call_;
std::shared_ptr<TransportFactoryType> proposal_factory_;
std::function<OnDemandOsClientGrpc::TimepointType()> time_provider_;
std::chrono::milliseconds proposal_request_timeout_;
};
Expand Down
5 changes: 3 additions & 2 deletions irohad/torii/impl/command_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ namespace torii {
// prepend initial status
.start_with(initial_status)
// select statuses with requested hash
.filter(
[&](auto response) { return response->transactionHash() == hash; })
.filter([hash](auto response) {
return response->transactionHash() == hash;
})
// successfully complete the observable if final status is received.
// final status is included in the observable
.template lift<ResponsePtrType>([](rxcpp::subscriber<ResponsePtrType>
Expand Down
Loading

0 comments on commit 46c30c6

Please sign in to comment.