Skip to content

Commit

Permalink
grpc client retry policy enable
Browse files Browse the repository at this point in the history
Signed-off-by: Mikhail Boldyrev <[email protected]>
  • Loading branch information
MBoldyrev committed Jul 24, 2019
1 parent 848c29c commit 1657d3a
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "interfaces/transaction.hpp"
#include "logger/logger.hpp"
#include "network/impl/grpc_channel_builder.hpp"
#include "validators/field_validator.hpp"

using namespace iroha;
Expand All @@ -22,8 +23,7 @@ using namespace iroha::network;
using iroha::ConstRefState;
namespace {
auto default_sender_factory = [](const shared_model::interface::Peer &to) {
return transport::MstTransportGrpc::NewStub(
grpc::CreateChannel(to.address(), grpc::InsecureChannelCredentials()));
return createClient<transport::MstTransportGrpc>(to.address());
};
}
void sendStateAsyncImpl(
Expand Down
2 changes: 1 addition & 1 deletion irohad/network/impl/block_loader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ BlockLoaderImpl::findPeer(const shared_model::crypto::PublicKey &pubkey) {
return *it;
}

proto::Loader::Stub &BlockLoaderImpl::getPeerStub(
proto::Loader::StubInterface &BlockLoaderImpl::getPeerStub(
const shared_model::interface::Peer &peer) {
auto it = peer_connections_.find(peer.address());
if (it == peer_connections_.end()) {
Expand Down
4 changes: 2 additions & 2 deletions irohad/network/impl/block_loader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ namespace iroha {
* @param peer for connecting
* @return RPC stub
*/
proto::Loader::Stub &getPeerStub(
proto::Loader::StubInterface &getPeerStub(
const shared_model::interface::Peer &peer);

std::unordered_map<shared_model::interface::types::AddressType,
std::unique_ptr<proto::Loader::Stub>>
std::unique_ptr<proto::Loader::StubInterface>>
peer_connections_;
std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory_;
shared_model::proto::ProtoBlockFactory block_factory_;
Expand Down
58 changes: 49 additions & 9 deletions irohad/network/impl/grpc_channel_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,67 @@
#ifndef IROHA_GRPC_CHANNEL_BUILDER_HPP
#define IROHA_GRPC_CHANNEL_BUILDER_HPP

#include <limits>
#include <memory>

#include <grpc++/grpc++.h>
#include <boost/format.hpp>

namespace iroha {
namespace network {
namespace details {
constexpr unsigned int kMaxRequestMessageBytes =
std::numeric_limits<int>::max();
constexpr unsigned int kMaxResponseMessageBytes =
std::numeric_limits<int>::max();

template <typename T>
grpc::ChannelArguments getChannelArguments() {
grpc::ChannelArguments args;
args.SetServiceConfigJSON((boost::format(R"(
{
"methodConfig": [ {
"name": [
{ "service": "%1%" }
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "5s",
"maxBackoff": "120s",
"backoffMultiplier": 1.6,
"retryableStatusCodes": [
"UNKNOWN",
"DEADLINE_EXCEEDED",
"ABORTED",
"INTERNAL"
]
},
"maxRequestMessageBytes": %2%,
"maxResponseMessageBytes": %3%
} ]
})") % T::service_full_name()
% kMaxRequestMessageBytes
% kMaxResponseMessageBytes)
.str());
return args;
}
} // namespace details

/**
* Creates client which is capable of sending and receiving
* messages of INT_MAX bytes size
* messages of INT_MAX bytes size with retry policy (see
* details::getChannelArguments()).
* @tparam T type for gRPC stub, e.g. proto::Yac
* @param address ip address for connection, ipv4:port
* @return gRPC stub of parametrized type
*/
template <typename T>
auto createClient(const grpc::string &address) {
// in order to bypass built-in limitation of gRPC message size
grpc::ChannelArguments args;
args.SetMaxSendMessageSize(INT_MAX);
args.SetMaxReceiveMessageSize(INT_MAX);

return T::NewStub(grpc::CreateCustomChannel(
address, grpc::InsecureChannelCredentials(), args));
std::unique_ptr<typename T::StubInterface> createClient(
const grpc::string &address) {
return T::NewStub(
grpc::CreateCustomChannel(address,
grpc::InsecureChannelCredentials(),
details::getChannelArguments<T>()));
}
} // namespace network
} // namespace iroha
Expand Down
2 changes: 2 additions & 0 deletions irohad/torii/impl/query_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "torii/query_client.hpp"

#include <grpc++/channel.h>
#include <grpc++/grpc++.h>
#include "network/impl/grpc_channel_builder.hpp"

namespace torii_utils {
Expand Down
7 changes: 3 additions & 4 deletions irohad/torii/query_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#include <endpoint.grpc.pb.h>
#include <endpoint.pb.h>
#include <grpc++/channel.h>
#include <grpc++/grpc++.h>
#include <memory>
#include <thread>

Expand Down Expand Up @@ -45,12 +43,13 @@ namespace torii_utils {

std::string ip_;
size_t port_;
std::unique_ptr<iroha::protocol::QueryService_v1::Stub> stub_;
std::unique_ptr<iroha::protocol::QueryService_v1::StubInterface> stub_;
};

/**
* QueryAsyncClient
// Impelent here if we need this.
// Implement here if we need this.
class QueryAsyncClient {
};
Expand Down

0 comments on commit 1657d3a

Please sign in to comment.