Skip to content

Commit

Permalink
Support custom resolver (#361)
Browse files Browse the repository at this point in the history
* The custom resolver can be invoked before the default resolver in
order to resolve the endpoint in its own way.
  • Loading branch information
greensky00 authored Jun 17, 2022
1 parent 031827b commit 3c44584
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 52 deletions.
91 changes: 72 additions & 19 deletions include/libnuraft/asio_service_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.

#include <functional>
#include <string>
#include <system_error>

namespace nuraft {

Expand Down Expand Up @@ -59,6 +60,12 @@ struct asio_service_meta_cb_params {
uint64_t commit_idx_;
};

/**
* Response callback function for customer resolvers.
*/
using asio_service_custom_resolver_response =
std::function< void(const std::string&, const std::string&, std::error_code) >;

/**
* Options used for initialization of Asio service.
*/
Expand All @@ -76,56 +83,102 @@ struct asio_service_options {
, read_resp_meta_(nullptr)
, invoke_resp_cb_on_empty_meta_(true)
, verify_sn_(nullptr)
, custom_resolver_(nullptr)
{}

// Number of ASIO worker threads.
// If zero, it will be automatically set to number of cores.
/**
* Number of ASIO worker threads.
* If zero, it will be automatically set to number of cores.
*/
size_t thread_pool_size_;

// Lifecycle callback function on worker thread start.
/**
* Lifecycle callback function on worker thread start.
*/
std::function< void(uint32_t) > worker_start_;

// Lifecycle callback function on worker thread stop.
/**
* Lifecycle callback function on worker thread stop.
*/
std::function< void(uint32_t) > worker_stop_;

// If `true`, enable SSL/TLS secure connection.
/**
* If `true`, enable SSL/TLS secure connection.
*/
bool enable_ssl_;

// If `true`, skip certificate verification.
/**
* If `true`, skip certificate verification.
*/
bool skip_verification_;

// Path to certification & key files.
/**
* Path to server certificate file.
*/
std::string server_cert_file_;

/**
* Path to server key file.
*/
std::string server_key_file_;

/**
* Path to root certificate file.
*/
std::string root_cert_file_;

// Callback function for writing Raft RPC request metadata.
/**
* Callback function for writing Raft RPC request metadata.
*/
std::function< std::string(const asio_service_meta_cb_params&) > write_req_meta_;

// Callback function for reading and verifying Raft RPC request metadata.
// If it returns false, the request will be discarded.
/**
* Callback function for reading and verifying Raft RPC request metadata.
* If it returns `false`, the request will be discarded.
*/
std::function< bool( const asio_service_meta_cb_params&,
const std::string& ) > read_req_meta_;

// If `true`, it will invoke `read_req_meta_` even though
// the received meta is empty.
/**
* If `true`, it will invoke `read_req_meta_` even though
* the received meta is empty.
*/
bool invoke_req_cb_on_empty_meta_;

// Callback function for writing Raft RPC response metadata.
/**
* Callback function for writing Raft RPC response metadata.
*/
std::function< std::string(const asio_service_meta_cb_params&) > write_resp_meta_;

// Callback function for reading and verifying Raft RPC response metadata.
// If it returns false, the response will be ignored.
/**
* Callback function for reading and verifying Raft RPC response metadata.
* If it returns false, the response will be ignored.
*/
std::function< bool( const asio_service_meta_cb_params&,
const std::string& ) > read_resp_meta_;

// If `true`, it will invoke `read_resp_meta_` even though
// the received meta is empty.
/**
* If `true`, it will invoke `read_resp_meta_` even though
* the received meta is empty.
*/
bool invoke_resp_cb_on_empty_meta_;

// Callback function for verifying certificate subject name.
// If not given, subject name will not be verified.
/**
* Callback function for verifying certificate subject name.
* If not given, subject name will not be verified.
*/
std::function< bool(const std::string&) > verify_sn_;

/**
* Custom IP address resolver. If given, it will be invoked
* before the connection is established.
*
* If you want to selectively bypass some hosts, just pass the given
* host and port to the response function as they are.
*/
std::function< void( const std::string&,
const std::string&,
asio_service_custom_resolver_response ) > custom_resolver_;
};

}
Expand Down
105 changes: 72 additions & 33 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1017,39 +1017,37 @@ class asio_rpc_client
break;
}

asio::ip::tcp::resolver::query q
( host_, port_, asio::ip::tcp::resolver::query::all_matching );

resolver_.async_resolve
( q,
[self, this, req, when_done, send_timeout_ms]
( std::error_code err,
asio::ip::tcp::resolver::iterator itor ) -> void
{
if (!err) {
asio::async_connect
( socket(),
itor,
std::bind( &asio_rpc_client::connected,
self,
req,
when_done,
send_timeout_ms,
std::placeholders::_1,
std::placeholders::_2 ) );
} else {
ptr<resp_msg> rsp;
ptr<rpc_exception> except
( cs_new<rpc_exception>
( lstrfmt("failed to resolve host %s "
"due to error %d, %s")
.fmt( host_.c_str(),
err.value(),
err.message().c_str() ),
req ) );
when_done(rsp, except);
}
} );
if (impl_->get_options().custom_resolver_) {
impl_->get_options().custom_resolver_(
host_,
port_,
[this, self, req, when_done, send_timeout_ms]
( const std::string& resolved_host,
const std::string& resolved_port,
std::error_code err ) {
if (!err) {
p_in( "custom resolver: %s:%s to %s:%s",
host_.c_str(), port_.c_str(),
resolved_host.c_str(), resolved_port.c_str() );
execute_resolver(self, req, resolved_host, resolved_port,
when_done, send_timeout_ms);
} else {
ptr<resp_msg> rsp;
ptr<rpc_exception> except
( cs_new<rpc_exception>
( lstrfmt("failed to resolve host %s by given "
"custom resolver "
"due to error %d, %s")
.fmt( host_.c_str(),
err.value(),
err.message().c_str() ),
req ) );
when_done(rsp, except);
}
} );
} else {
execute_resolver(self, req, host_, port_, when_done, send_timeout_ms);
}
return;
}

Expand Down Expand Up @@ -1174,6 +1172,47 @@ class asio_rpc_client
std::placeholders::_2 ) );
}
private:
void execute_resolver(ptr<asio_rpc_client> self,
ptr<req_msg> req,
const std::string& host,
const std::string& port,
rpc_handler when_done,
uint64_t send_timeout_ms) {
asio::ip::tcp::resolver::query q
( host, port, asio::ip::tcp::resolver::query::all_matching );

resolver_.async_resolve
( q,
[self, this, req, when_done, host, port, send_timeout_ms]
( std::error_code err,
asio::ip::tcp::resolver::iterator itor ) -> void
{
if (!err) {
asio::async_connect
( socket(),
itor,
std::bind( &asio_rpc_client::connected,
self,
req,
when_done,
send_timeout_ms,
std::placeholders::_1,
std::placeholders::_2 ) );
} else {
ptr<resp_msg> rsp;
ptr<rpc_exception> except
( cs_new<rpc_exception>
( lstrfmt("failed to resolve host %s "
"due to error %d, %s")
.fmt( host.c_str(),
err.value(),
err.message().c_str() ),
req ) );
when_done(rsp, except);
}
} );
}

void set_busy_flag(bool to) {
if (to == true) {
bool exp = false;
Expand Down
83 changes: 83 additions & 0 deletions tests/unit/asio_service_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -2463,6 +2463,86 @@ int parallel_log_append_test() {
return 0;
}

int custom_resolver_test() {
reset_log_files();

std::string s1_addr = "S1:1234";
std::string s2_addr = "S2:1234";
std::string s3_addr = "S3:1234";

RaftAsioPkg s1(1, s1_addr);
RaftAsioPkg s2(2, s2_addr);
RaftAsioPkg s3(3, s3_addr);
std::vector<RaftAsioPkg*> pkgs = {&s1, &s2, &s3};

// Enable custom resolver.
s1.useCustomResolver = s2.useCustomResolver = s3.useCustomResolver = true;

_msg("launching asio-raft servers\n");
CHK_Z( launch_servers(pkgs, false) );

_msg("organizing raft group\n");
CHK_Z( make_group(pkgs) );

// Set async mode.
for (auto& entry: pkgs) {
RaftAsioPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
param.parallel_log_appending_ = true;
pp->raftServer->update_params(param);
}

// Append messages asynchronously.
const size_t NUM = 10;
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
std::list<ulong> idx_list;
std::mutex idx_list_lock;
auto do_async_append = [&]() {
handlers.clear();
idx_list.clear();
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

cmd_result< ptr<buffer> >::handler_type my_handler =
std::bind( async_handler,
&idx_list,
&idx_list_lock,
std::placeholders::_1,
std::placeholders::_2 );
ret->when_ready( my_handler );

handlers.push_back(ret);
}
};
do_async_append();

TestSuite::sleep_sec(1, "wait for replication");

// Now all async handlers should have result.
{
std::lock_guard<std::mutex> l(idx_list_lock);
CHK_EQ(NUM, idx_list.size());
}

// State machine should be identical.
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

s1.raftServer->shutdown();
s2.raftServer->shutdown();
s3.raftServer->shutdown();
TestSuite::sleep_sec(1, "shutting down");

SimpleLogger::shutdown();
return 0;
}


} // namespace asio_service_test;
using namespace asio_service_test;

Expand Down Expand Up @@ -2565,6 +2645,9 @@ int main(int argc, char** argv) {
ts.doTest( "parallel log append test",
parallel_log_append_test );

ts.doTest( "custom resolver test",
custom_resolver_test );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
#else
Expand Down
Loading

0 comments on commit 3c44584

Please sign in to comment.