77
88#include " config.h"
99
10+ #include < Common/getMultipleKeysFromConfig.h>
1011#include < Common/setThreadName.h>
1112
1213#include < Poco/Util/Application.h>
@@ -20,11 +21,11 @@ namespace DB
2021
2122namespace ErrorCodes
2223{
23- extern const int ACCESS_DENIED;
24- extern const int RAFT_ERROR;
25- extern const int NO_ELEMENTS_IN_CONFIG;
26- extern const int SUPPORT_IS_DISABLED;
27- extern const int LOGICAL_ERROR;
24+ extern const int ACCESS_DENIED;
25+ extern const int RAFT_ERROR;
26+ extern const int NO_ELEMENTS_IN_CONFIG;
27+ extern const int SUPPORT_IS_DISABLED;
28+ extern const int LOGICAL_ERROR;
2829}
2930
3031namespace
@@ -125,16 +126,19 @@ MetaStoreServer::MetaStoreServer(
125126 : server_id(server_id_)
126127 , coordination_settings(coordination_settings_)
127128 , state_machine(nuraft::cs_new<MetaStateMachine>(
128- snapshots_queue,
129- getSnapshotsPathFromConfig (config, standalone_metastore),
130- getStoragePathFromConfig(config, standalone_metastore),
131- coordination_settings))
129+ snapshots_queue,
130+ getSnapshotsPathFromConfig (config, standalone_metastore),
131+ getStoragePathFromConfig(config, standalone_metastore),
132+ coordination_settings))
132133 , state_manager(nuraft::cs_new<MetaStateManager>(server_id, " metastore_server" , config, coordination_settings, standalone_metastore))
133134 , log(&Poco::Logger::get (" MetaStoreServer" ))
134135 , namespace_whitelist(getNamespaceWhitelistFromConfig(config))
135136{
136137 if (coordination_settings->quorum_reads )
137138 LOG_WARNING (log, " Quorum reads enabled, MetaStoreServer will work slower." );
139+
140+ enable_ipv6 = config.getBool (" metastore_server.enable_ipv6" , true );
141+ listen_hosts = DB::getMultipleValuesFromConfig (config, " " , " listen_host" );
138142}
139143
140144void MetaStoreServer::startup ()
@@ -158,8 +162,10 @@ void MetaStoreServer::startup()
158162 else
159163 {
160164 params.heart_beat_interval_ = static_cast <nuraft::int32>(coordination_settings->heart_beat_interval_ms .totalMilliseconds ());
161- params.election_timeout_lower_bound_ = static_cast <nuraft::int32>(coordination_settings->election_timeout_lower_bound_ms .totalMilliseconds ());
162- params.election_timeout_upper_bound_ = static_cast <nuraft::int32>(coordination_settings->election_timeout_upper_bound_ms .totalMilliseconds ());
165+ params.election_timeout_lower_bound_
166+ = static_cast <nuraft::int32>(coordination_settings->election_timeout_lower_bound_ms .totalMilliseconds ());
167+ params.election_timeout_upper_bound_
168+ = static_cast <nuraft::int32>(coordination_settings->election_timeout_upper_bound_ms .totalMilliseconds ());
163169 }
164170
165171 params.reserved_log_items_ = static_cast <nuraft::int32>(coordination_settings->reserved_log_items );
@@ -179,8 +185,8 @@ void MetaStoreServer::startup()
179185#if USE_SSL
180186 setSSLParams (asio_opts);
181187#else
182- throw Exception{" SSL support for NuRaft is disabled because proton was built without SSL support. " ,
183- ErrorCodes::SUPPORT_IS_DISABLED};
188+ throw Exception{
189+ " SSL support for NuRaft is disabled because proton was built without SSL support. " , ErrorCodes::SUPPORT_IS_DISABLED};
184190#endif
185191 }
186192
@@ -190,27 +196,34 @@ void MetaStoreServer::startup()
190196 throw Exception (ErrorCodes::RAFT_ERROR, " Cannot allocate RAFT instance" );
191197}
192198
193- void MetaStoreServer::launchRaftServer (
194- const nuraft::raft_params & params,
195- const nuraft::asio_service::options & asio_opts)
199+ void MetaStoreServer::launchRaftServer (const nuraft::raft_params & params, const nuraft::asio_service::options & asio_opts)
196200{
197201 nuraft::raft_server::init_options init_options;
198202
199203 init_options.skip_initial_election_timeout_ = state_manager->shouldStartAsFollower ();
200204 init_options.start_server_in_constructor_ = false ;
201- init_options.raft_callback_ = [this ] (nuraft::cb_func::Type type, nuraft::cb_func::Param * param)
202- {
203- return callbackFunc (type, param);
204- };
205+ init_options.raft_callback_ = [this ](nuraft::cb_func::Type type, nuraft::cb_func::Param * param) { return callbackFunc (type, param); };
205206
206207 nuraft::ptr<nuraft::logger> logger = nuraft::cs_new<LoggerWrapper>(" RaftInstance" , coordination_settings->raft_logs_level );
207208 asio_service = nuraft::cs_new<nuraft::asio_service>(asio_opts, logger);
208- asio_listener = asio_service->create_rpc_listener (state_manager->getPort (), logger);
209209
210- if (!asio_listener)
211- return ;
210+ if (listen_hosts.empty ())
211+ {
212+ auto asio_listener = asio_service->create_rpc_listener (state_manager->getPort (), logger, enable_ipv6);
213+ if (!asio_listener)
214+ throw Exception (ErrorCodes::RAFT_ERROR, " Cannot create MetaStore Raft listener on port {}" , state_manager->getPort ());
212215
213- std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners{asio_listener};
216+ asio_listeners.emplace_back (std::move (asio_listener));
217+ }
218+ else
219+ {
220+ for (const auto & listen_host : listen_hosts)
221+ {
222+ auto asio_listener = asio_service->create_rpc_listener (listen_host, state_manager->getPort (), logger);
223+ if (asio_listener)
224+ asio_listeners.emplace_back (std::move (asio_listener));
225+ }
226+ }
214227
215228 nuraft::ptr<nuraft::delayed_task_scheduler> scheduler = asio_service;
216229 nuraft::ptr<nuraft::rpc_client_factory> rpc_cli_factory = asio_service;
@@ -219,14 +232,15 @@ void MetaStoreServer::launchRaftServer(
219232 nuraft::ptr<nuraft::state_machine> casted_state_machine = state_machine;
220233
221234 // / raft_server creates unique_ptr from it
222- nuraft::context * ctx = new nuraft::context (
223- casted_state_manager, casted_state_machine,
224- asio_listeners, logger, rpc_cli_factory, scheduler, params);
235+ nuraft::context * ctx
236+ = new nuraft::context (casted_state_manager, casted_state_machine, asio_listeners, logger, rpc_cli_factory, scheduler, params);
225237
226238 raft_instance = nuraft::cs_new<nuraft::raft_server>(ctx, init_options);
227239
228240 raft_instance->start_server (init_options.skip_initial_election_timeout_ );
229- asio_listener->listen (raft_instance);
241+
242+ for (const auto & asio_listener : asio_listeners)
243+ asio_listener->listen (raft_instance);
230244}
231245
232246void MetaStoreServer::shutdownRaftServer ()
@@ -242,10 +256,13 @@ void MetaStoreServer::shutdownRaftServer()
242256 raft_instance->shutdown ();
243257 raft_instance.reset ();
244258
245- if ( asio_listener)
259+ for ( const auto & asio_listener : asio_listeners )
246260 {
247- asio_listener->stop ();
248- asio_listener->shutdown ();
261+ if (asio_listener)
262+ {
263+ asio_listener->stop ();
264+ asio_listener->shutdown ();
265+ }
249266 }
250267
251268 if (asio_service)
@@ -317,7 +334,7 @@ std::vector<String> MetaStoreServer::localMultiGetByKeys(const std::vector<Strin
317334 return values;
318335}
319336
320- std::vector<std::pair<String, String> > MetaStoreServer::localRangeGetByNamespace (const String & prefix_, const String & namespace_) const
337+ std::vector<std::pair<String, String>> MetaStoreServer::localRangeGetByNamespace (const String & prefix_, const String & namespace_) const
321338{
322339 Coordination::KVListResponse resp;
323340 Coordination::KVNamespaceAndPrefixHelper helper (checkNamespace (namespace_), prefix_);
@@ -331,9 +348,7 @@ Coordination::KVResponsePtr MetaStoreServer::putRequest(const Coordination::KVRe
331348 Coordination::KVNamespaceAndPrefixHelper helper (checkNamespace (namespace_));
332349 const auto & entry = getBufferFromKVRequest (helper.handle (request));
333350 auto ret = raft_instance->append_entries ({entry});
334- if (ret->get_accepted () &&
335- ret->get_result_code () == nuraft::cmd_result_code::OK &&
336- ret->get ())
351+ if (ret->get_accepted () && ret->get_result_code () == nuraft::cmd_result_code::OK && ret->get ())
337352 return helper.handle (parseKVResponse (ret->get ()));
338353
339354 // / error response
@@ -378,25 +393,22 @@ nuraft::cb_func::ReturnCode MetaStoreServer::callbackFunc(nuraft::cb_func::Type
378393 if (next_index < last_commited || next_index - last_commited <= 1 )
379394 commited_store = true ;
380395
381- auto set_initialized = [this ] ()
382- {
396+ auto set_initialized = [this ]() {
383397 std::unique_lock lock (initialized_mutex);
384398 initialized_flag = true ;
385399 initialized_cv.notify_all ();
386400 };
387401
388402 switch (type)
389403 {
390- case nuraft::cb_func::BecomeLeader:
391- {
404+ case nuraft::cb_func::BecomeLeader: {
392405 // / We become leader and store is empty or we already committed it
393406 if (commited_store || initial_batch_committed)
394407 set_initialized ();
395408 return nuraft::cb_func::ReturnCode::Ok;
396409 }
397410 case nuraft::cb_func::BecomeFollower:
398- case nuraft::cb_func::GotAppendEntryReqFromLeader:
399- {
411+ case nuraft::cb_func::GotAppendEntryReqFromLeader: {
400412 if (param->leaderId != -1 )
401413 {
402414 auto leader_index = raft_instance->get_leader_committed_log_idx ();
@@ -410,13 +422,11 @@ nuraft::cb_func::ReturnCode MetaStoreServer::callbackFunc(nuraft::cb_func::Type
410422 }
411423 return nuraft::cb_func::ReturnCode::Ok;
412424 }
413- case nuraft::cb_func::BecomeFresh:
414- {
425+ case nuraft::cb_func::BecomeFresh: {
415426 set_initialized (); // / We are fresh follower, ready to serve requests.
416427 return nuraft::cb_func::ReturnCode::Ok;
417428 }
418- case nuraft::cb_func::InitialBatchCommited:
419- {
429+ case nuraft::cb_func::InitialBatchCommited: {
420430 if (param->myId == param->leaderId ) // / We have committed our log store and we are leader, ready to serve requests.
421431 set_initialized ();
422432 initial_batch_committed = true ;
0 commit comments