diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java index dc1fe78f2fa..104c00a23a7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java @@ -28,6 +28,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; @@ -35,6 +36,7 @@ import java.util.Enumeration; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,18 @@ public class NettyUtils { private static final int DEFAULT_INET_ADDRESS_COUNT = 1; + /** + * Returns a ThreadFactory which generates daemon threads, and uses + * the passed class's name to generate the thread names. + * + * @param clazz Class to use for generating thread names + * @return Netty DefaultThreadFactory configured to create daemon threads + */ + private static ThreadFactory createThreadFactory(Class clazz) { + String poolName = "zkNetty" + clazz.getSimpleName(); + return new DefaultThreadFactory(poolName, true); + } + /** * If {@link Epoll#isAvailable()} == true, returns a new * {@link EpollEventLoopGroup}, otherwise returns a new @@ -56,9 +70,9 @@ public class NettyUtils { */ public static EventLoopGroup newNioOrEpollEventLoopGroup() { if (Epoll.isAvailable()) { - return new EpollEventLoopGroup(); + return new EpollEventLoopGroup(createThreadFactory(EpollEventLoopGroup.class)); } else { - return new NioEventLoopGroup(); + return new NioEventLoopGroup(createThreadFactory(NioEventLoopGroup.class)); } } @@ -72,9 +86,9 @@ public static EventLoopGroup newNioOrEpollEventLoopGroup() { */ public static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) { if (Epoll.isAvailable()) { - return new EpollEventLoopGroup(nThreads); + return new EpollEventLoopGroup(nThreads, createThreadFactory(EpollEventLoopGroup.class)); } else { - return new NioEventLoopGroup(nThreads); + return new NioEventLoopGroup(nThreads, createThreadFactory(NioEventLoopGroup.class)); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java index e27361c3790..e495d34e9e1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java @@ -41,16 +41,19 @@ import java.net.InetSocketAddress; import java.net.ProtocolException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.ClientCnxnSocketNetty; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.common.ClientX509Util; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.quorum.BufferStats; @@ -58,6 +61,7 @@ import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.SSLAuthTest; import org.apache.zookeeper.test.TestByteBufAllocator; +import org.apache.zookeeper.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -323,6 +327,39 @@ public void testEnableDisableThrottling_nonSecure_sequentially() throws Exceptio runEnableDisableThrottling(false, false); } + @Test + public void testNettyUsesDaemonThreads() throws Exception { + assertTrue(serverFactory instanceof NettyServerCnxnFactory, + "Didn't instantiate ServerCnxnFactory with NettyServerCnxnFactory!"); + + // Use Netty in the client to check the threads on both the client and server side + System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, ClientCnxnSocketNetty.class.getName()); + try { + final ZooKeeper zk = createClient(); + final ZooKeeperServer zkServer = serverFactory.getZooKeeperServer(); + final String path = "/a"; + try { + // make sure connection is established + zk.create(path, "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + List threads = TestUtils.getAllThreads(); + boolean foundThread = false; + for (Thread t : threads) { + if (t.getName().startsWith("zkNetty")) { + foundThread = true; + assertTrue(t.isDaemon(), "All Netty threads started by ZK must deamon threads"); + } + } + assertTrue(foundThread, "Did not find any Netty ZK Threads"); + } finally { + zk.close(); + zkServer.shutdown(); + } + } finally { + System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET); + } + } + private void runEnableDisableThrottling(boolean secure, boolean randomDisableEnable) throws Exception { ClientX509Util x509Util = null; if (secure) { @@ -433,4 +470,5 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta } } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java index e3306c1fe76..8941c05de0e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java @@ -21,6 +21,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.zookeeper.WatchedEvent; /** @@ -71,4 +75,27 @@ public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent assertEquals(expected.getPath(), actual.getPath()); assertEquals(expected.getZxid(), actual.getZxid()); } + + /** + * Return all threads + * + * Code based on commons-lang3 ThreadUtils + * + * @return all active threads + */ + public static List getAllThreads() { + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + while (threadGroup != null && threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + + int count = threadGroup.activeCount(); + Thread[] threads; + do { + threads = new Thread[count + count / 2 + 1]; //slightly grow the array size + count = threadGroup.enumerate(threads, true); + //return value of enumerate() must be strictly less than the array size according to javadoc + } while (count >= threads.length); + return Collections.unmodifiableList(Stream.of(threads).limit(count).collect(Collectors.toList())); + } }