3838#include " InvocationContext.h"
3939#include " LoggerImpl.h"
4040#include " MessageAccessor.h"
41+ #include " NamingScheme.h"
4142#include " Signature.h"
4243#include " rocketmq/MQMessageExt.h"
4344#include " rocketmq/MessageListener.h"
@@ -160,7 +161,24 @@ void ClientImpl::getRouteFor(const std::string& topic,
160161void ClientImpl::setAccessPoint (rmq::Endpoints* endpoints) {
161162 std::vector<std::pair<std::string, std::uint16_t >> pairs;
162163 {
163- std::vector<std::string> name_server_list = name_server_resolver_->resolve ();
164+ std::string naming_address = name_server_resolver_->resolve ();
165+ absl::string_view host_port_csv;
166+
167+ if (absl::StartsWith (naming_address, NamingScheme::DnsPrefix)) {
168+ endpoints->set_scheme (rmq::AddressScheme::DOMAIN_NAME);
169+ host_port_csv = absl::StripPrefix (naming_address, NamingScheme::DnsPrefix);
170+ } else if (absl::StartsWith (naming_address, NamingScheme::IPv4Prefix)) {
171+ endpoints->set_scheme (rmq::AddressScheme::IPv4);
172+ host_port_csv = absl::StripPrefix (naming_address, NamingScheme::IPv4Prefix);
173+ } else if (absl::StartsWith (naming_address, NamingScheme::IPv6Prefix)) {
174+ endpoints->set_scheme (rmq::AddressScheme::IPv6);
175+ host_port_csv = absl::StripPrefix (naming_address, NamingScheme::IPv6Prefix);
176+ } else {
177+ SPDLOG_WARN (" Unsupported naming scheme" );
178+ }
179+
180+ std::vector<std::string> name_server_list = absl::StrSplit (host_port_csv, ' ,' );
181+
164182 for (const auto & name_server_item : name_server_list) {
165183 std::string::size_type pos = name_server_item.rfind (' :' );
166184 if (std::string::npos == pos) {
@@ -179,20 +197,12 @@ void ClientImpl::setAccessPoint(rmq::Endpoints* endpoints) {
179197 address->set_host (host_port.first );
180198 endpoints->mutable_addresses ()->AddAllocated (address);
181199 }
182-
183- if (MixAll::isIPv4 (pairs.begin ()->first )) {
184- endpoints->set_scheme (rmq::AddressScheme::IPv4);
185- } else if (absl::StrContains (pairs.begin ()->first , ' :' )) {
186- endpoints->set_scheme (rmq::AddressScheme::IPv6);
187- } else {
188- endpoints->set_scheme (rmq::AddressScheme::DOMAIN_NAME);
189- }
190200 }
191201}
192202
193203void ClientImpl::fetchRouteFor (const std::string& topic,
194204 const std::function<void (const std::error_code&, const TopicRouteDataPtr&)>& cb) {
195- std::string name_server = name_server_resolver_->current ();
205+ std::string name_server = name_server_resolver_->resolve ();
196206 if (name_server.empty ()) {
197207 SPDLOG_WARN (" No name server available" );
198208 return ;
@@ -201,7 +211,7 @@ void ClientImpl::fetchRouteFor(const std::string& topic,
201211 auto callback = [this , topic, name_server, cb](const std::error_code& ec, const TopicRouteDataPtr& route) {
202212 if (ec) {
203213 SPDLOG_WARN (" Failed to resolve route for topic={} from {}" , topic, name_server);
204- std::string name_server_changed = name_server_resolver_->next ();
214+ std::string name_server_changed = name_server_resolver_->resolve ();
205215 if (!name_server_changed.empty ()) {
206216 SPDLOG_INFO (" Change current name server from {} to {}" , name_server, name_server_changed);
207217 }
0 commit comments