diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index bd4d89f5b70..ce919998c22 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -39,6 +39,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.proxy.Socks5ProxyHandler; @@ -59,11 +60,11 @@ */ public class NettyClient extends AbstractClient { - private static final String SOCKS_PROXY_HOST = "socksProxyHost"; + public static final String SOCKS_PROXY_HOST = "socksProxyHost"; - private static final String SOCKS_PROXY_PORT = "socksProxyPort"; + public static final String SOCKS_PROXY_PORT = "socksProxyPort"; - private static final String DEFAULT_SOCKS_PROXY_PORT = "1080"; + public static final String DEFAULT_SOCKS_PROXY_PORT = "1080"; /** * netty client bootstrap @@ -123,33 +124,38 @@ protected void initBootstrap(NettyClientHandler nettyClientHandler) { @Override protected void initChannel(SocketChannel ch) throws Exception { - int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); + URL url = getUrl(); + ChannelPipeline pipeline = ch.pipeline(); + int heartbeatInterval = UrlUtils.getHeartbeat(url); if (sslContext != null) { - ch.pipeline().addLast("negotiation", new SslClientTlsHandler(sslContext)); + pipeline.addLast("negotiation", new SslClientTlsHandler(sslContext)); } - NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); - ch.pipeline() // .addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug + NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), url, NettyClient.this); + pipeline // .addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); - String socksProxyHost = - ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST); - if (socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) { - int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty( - getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); - Socks5ProxyHandler socks5ProxyHandler = - new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); - ch.pipeline().addFirst(socks5ProxyHandler); - } + configureSocks5Proxy(url, pipeline); } }); } - private boolean isFilteredAddress(String host) { + static void configureSocks5Proxy(URL url, ChannelPipeline pipeline) { + String socksProxyHost = ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SOCKS_PROXY_HOST); + if (StringUtils.isNotBlank(socksProxyHost) && !isFilteredAddress(url.getHost())) { + int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty( + url.getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); + Socks5ProxyHandler socks5ProxyHandler = + new Socks5ProxyHandler(InetSocketAddress.createUnresolved(socksProxyHost, socksProxyPort)); + pipeline.addFirst(socks5ProxyHandler); + } + } + + static boolean isFilteredAddress(String host) { // filter local address return StringUtils.isEquals(NetUtils.getLocalHost(), host) || NetUtils.isLocalHost(host); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java index 8fcec6ddc1d..40540c1f8df 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java @@ -48,6 +48,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT; +import static org.apache.dubbo.remoting.transport.netty4.NettyClient.configureSocks5Proxy; import static org.apache.dubbo.remoting.transport.netty4.NettyEventLoopFactory.socketChannelClass; public final class NettyConnectionClient extends AbstractNettyConnectionClient { @@ -70,6 +71,7 @@ protected void initConnectionClient() { super.initConnectionClient(); } + @Override protected void initBootstrap() { channelInitializedPromiseRef = new AtomicReference<>(); Bootstrap bootstrap = new Bootstrap(); @@ -87,8 +89,9 @@ protected void initBootstrap() { bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { - NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch, getUrl(), getChannelHandler()); + URL url = getUrl(); ChannelPipeline pipeline = ch.pipeline(); + NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch, url, getChannelHandler()); NettySslContextOperator nettySslContextOperator = new NettySslContextOperator(); if (sslContext != null) { @@ -97,13 +100,13 @@ protected void initChannel(SocketChannel ch) { // pipeline.addLast("logging", new LoggingHandler(LogLevel.INFO)); //for debug - int heartbeat = UrlUtils.getHeartbeat(getUrl()); + int heartbeat = UrlUtils.getHeartbeat(url); pipeline.addLast("client-idle-handler", new IdleStateHandler(heartbeat, 0, 0, MILLISECONDS)); pipeline.addLast(Constants.CONNECTION_HANDLER_NAME, connectionHandler); NettyConfigOperator operator = new NettyConfigOperator(nettyChannel, getChannelHandler()); - protocol.configClientPipeline(getUrl(), operator, nettySslContextOperator); + protocol.configClientPipeline(url, operator, nettySslContextOperator); ChannelHandlerContext http2FrameCodecHandlerCtx = pipeline.context(Http2FrameCodec.class); if (http2FrameCodecHandlerCtx == null) { @@ -124,7 +127,8 @@ protected void initChannel(SocketChannel ch) { // set null but do not close this client, it will be reconnecting in the future ch.closeFuture().addListener(channelFuture -> clearNettyChannel()); - // TODO support Socks5 + // support Socks5 + configureSocks5Proxy(url, pipeline); // set channel initialized promise to success if necessary. Promise channelInitializedPromise = channelInitializedPromiseRef.get(); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClientTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClientTest.java new file mode 100644 index 00000000000..8f49300f0a4 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClientTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.transport.netty4; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.url.component.ServiceConfigURL; +import org.apache.dubbo.config.ApplicationConfig; +import org.apache.dubbo.config.context.ConfigManager; +import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.rpc.model.ModuleModel; + +import java.net.InetSocketAddress; +import java.util.Map; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.proxy.Socks5ProxyHandler; +import org.junit.jupiter.api.Test; + +import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT; +import static org.apache.dubbo.remoting.transport.netty4.NettyClient.SOCKS_PROXY_HOST; +import static org.apache.dubbo.remoting.transport.netty4.NettyClient.SOCKS_PROXY_PORT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; + +class NettyConnectionClientTest { + + @Test + void shouldInsertSocks5ProxyHandlerWhenSocksProxyConfigured() throws Exception { + URL url = createUrl("192.168.1.10", 20880, "proxy.example.com", "2081"); + EmbeddedChannel channel = new EmbeddedChannel(); + + NettyClient.configureSocks5Proxy(url, channel.pipeline()); + + Socks5ProxyHandler handler = channel.pipeline().get(Socks5ProxyHandler.class); + assertInstanceOf(Socks5ProxyHandler.class, handler); + assertEquals("proxy.example.com", ((InetSocketAddress) handler.proxyAddress()).getHostString()); + assertEquals(2081, ((InetSocketAddress) handler.proxyAddress()).getPort()); + } + + @Test + void shouldNotInsertSocks5ProxyHandlerWhenSocksProxyHostBlank() throws Exception { + URL url = createUrl("192.168.1.10", 20880, " ", "2081"); + EmbeddedChannel channel = new EmbeddedChannel(); + + NettyClient.configureSocks5Proxy(url, channel.pipeline()); + + assertNull(channel.pipeline().get(Socks5ProxyHandler.class)); + } + + @Test + void shouldNotInsertSocks5ProxyHandlerWhenTargetAddressIsLocal() throws Exception { + URL url = createUrl("127.0.0.1", 20880, "proxy.example.com", "2081"); + EmbeddedChannel channel = new EmbeddedChannel(); + + NettyClient.configureSocks5Proxy(url, channel.pipeline()); + + assertNull(channel.pipeline().get(Socks5ProxyHandler.class)); + } + + private URL createUrl(String host, int port, String socksProxyHost, String socksProxyPort) { + URL url = new ServiceConfigURL("tri", host, port); + + ApplicationModel applicationModel = ApplicationModel.defaultModel(); + ApplicationConfig applicationConfig = new ApplicationConfig("provider-app"); + applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT); + applicationModel.getApplicationConfigManager().setApplication(applicationConfig); + ConfigManager configManager = new ConfigManager(applicationModel); + configManager.setApplication(applicationConfig); + configManager.getApplication(); + applicationModel.setConfigManager(configManager); + + Map properties = + applicationModel.modelEnvironment().getSystemConfiguration().getProperties(); + if (socksProxyHost != null) { + properties.put(SOCKS_PROXY_HOST, socksProxyHost); + } + if (socksProxyPort != null) { + properties.put(SOCKS_PROXY_PORT, socksProxyPort); + } + + url = url.setScopeModel(applicationModel); + ModuleModel moduleModel = applicationModel.getDefaultModule(); + return url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel); + } +}