diff --git a/artemis-core-client-osgi/pom.xml b/artemis-core-client-osgi/pom.xml index 518816bf060..1709121c962 100644 --- a/artemis-core-client-osgi/pom.xml +++ b/artemis-core-client-osgi/pom.xml @@ -70,7 +70,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", * <_exportcontents>org.apache.activemq.artemis.*;-noimport:=true diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml index 209242c26fb..322d6abd174 100644 --- a/artemis-core-client/pom.xml +++ b/artemis-core-client/pom.xml @@ -89,6 +89,15 @@ io.netty netty-transport-classes-kqueue + + io.netty + netty-transport-native-io_uring + ${netty-transport-native-io_uring-classifier} + + + io.netty + netty-transport-classes-io_uring + io.netty netty-codec-http @@ -109,10 +118,6 @@ io.netty netty-handler-proxy - - io.netty - netty-codec - io.netty netty-codec-socks diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index e7c1c7108df..a47f408ec3d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -349,4 +349,10 @@ public interface ActiveMQClientLogger { @LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO) void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type); + + @LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailability(Throwable e); + + @LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN) + void unableToCheckIoUringAvailabilitynoClass(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java index 4a90401dcca..e56c2ab764d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java @@ -19,6 +19,7 @@ import io.netty.channel.epoll.Epoll; import io.netty.channel.kqueue.KQueue; +import io.netty.channel.uring.IoUring; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.utils.Env; @@ -51,4 +52,17 @@ public static final boolean isKQueueAvailable() { return false; } } + + public static final boolean isIoUringAvailable() { + try { + return Env.isLinuxOs() && IoUring.isAvailable(); + } catch (NoClassDefFoundError noClassDefFoundError) { + ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass(); + return false; + } catch (Throwable e) { + ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e); + return false; + } + } + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index cb7ef817a42..3d1a3bfc0c4 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -16,13 +16,12 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; -import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; - import javax.net.ssl.SNIHostName; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.net.ConnectException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -63,16 +62,19 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueIoHandler; import io.netty.channel.kqueue.KQueueSocketChannel; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringSocketChannel; import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; @@ -92,11 +94,11 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.Cookie; -import io.netty.handler.ssl.SslContext; import io.netty.handler.codec.socksx.SocksVersion; import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.resolver.NoopAddressResolverGroup; import io.netty.util.AttributeKey; @@ -128,8 +130,8 @@ import org.apache.activemq.artemis.utils.PasswordMaskingUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; +import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; import static org.apache.activemq.artemis.utils.Base64.encodeBytes; public class NettyConnector extends AbstractConnector { @@ -137,6 +139,7 @@ public class NettyConnector extends AbstractConnector { public static String NIO_CONNECTOR_TYPE = "NIO"; public static String EPOLL_CONNECTOR_TYPE = "EPOLL"; public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE"; + public static String IOURING_CONNECTOR_TYPE = "IO_URING"; private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector { private boolean useKQueue; + private boolean useIoUring; + private int remotingThreads; private boolean useGlobalWorkerPool; @@ -404,6 +409,7 @@ public NettyConnector(final Map configuration, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); @@ -528,27 +534,43 @@ public synchronized void start() { return; } - if (remotingThreads == -1) { + boolean defaultRemotingThreads = remotingThreads == -1; + + if (defaultRemotingThreads) { // Default to number of cores * 3 remotingThreads = Runtime.getRuntime().availableProcessors() * 3; } String connectorType; - if (useEpoll && CheckDependencies.isEpollAvailable()) { + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + if (useGlobalWorkerPool) { + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory()))); + } else { + group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory()); + } + + connectorType = IOURING_CONNECTOR_TYPE; + channelClazz = IoUringSocketChannel.class; + + logger.debug("Connector {} using native io_uring", this); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { if (useGlobalWorkerPool) { - group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()))); } else { - group = new EpollEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory()); } connectorType = EPOLL_CONNECTOR_TYPE; channelClazz = EpollSocketChannel.class; logger.debug("Connector {} using native epoll", this); } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { if (useGlobalWorkerPool) { - group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()))); } else { - group = new KQueueEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory()); } connectorType = KQUEUE_CONNECTOR_TYPE; channelClazz = KQueueSocketChannel.class; @@ -556,10 +578,10 @@ public synchronized void start() { } else { if (useGlobalWorkerPool) { channelClazz = NioSocketChannel.class; - group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); + group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()))); } else { channelClazz = NioSocketChannel.class; - group = new NioEventLoopGroup(remotingThreads); + group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory()); } connectorType = NIO_CONNECTOR_TYPE; channelClazz = NioSocketChannel.class; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index e7464975dfe..e20cea8ab4f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -68,6 +68,8 @@ public class TransportConstants { public static final String USE_KQUEUE_PROP_NAME = "useKQueue"; + public static final String USE_IOURING_PROP_NAME = "useIoUring"; + /** * @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME */ @@ -214,6 +216,8 @@ public class TransportConstants { public static final boolean DEFAULT_USE_KQUEUE = true; + public static final boolean DEFAULT_USE_IOURING = false; + public static final boolean DEFAULT_USE_INVM = false; public static final boolean DEFAULT_USE_SERVLET = false; @@ -422,6 +426,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME); //noinspection deprecation allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME); @@ -497,6 +502,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) { allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME); allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME); allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME); diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml index d26ee3cc16c..8140f6ea9dc 100644 --- a/artemis-features/src/main/resources/features.xml +++ b/artemis-features/src/main/resources/features.xml @@ -33,7 +33,8 @@ mvn:io.netty/netty-resolver/${netty.version} mvn:io.netty/netty-transport/${netty.version} mvn:io.netty/netty-buffer/${netty.version} - mvn:io.netty/netty-codec/${netty.version} + mvn:io.netty/netty-codec-base/${netty.version} + mvn:io.netty/netty-codec-compression/${netty.version} mvn:io.netty/netty-codec-socks/${netty.version} mvn:io.netty/netty-codec-http/${netty.version} mvn:io.netty/netty-handler/${netty.version} @@ -43,6 +44,8 @@ mvn:io.netty/netty-transport-native-epoll/${netty.version} mvn:io.netty/netty-transport-classes-kqueue/${netty.version} mvn:io.netty/netty-transport-native-kqueue/${netty.version} + mvn:io.netty/netty-transport-classes-io_uring/${netty.version} + mvn:io.netty/netty-transport-native-io_uring/${netty.version} mvn:io.netty/netty-transport-native-unix-common/${netty.version} diff --git a/artemis-jms-client-osgi/pom.xml b/artemis-jms-client-osgi/pom.xml index d44ec81a819..1d42f0cbb11 100644 --- a/artemis-jms-client-osgi/pom.xml +++ b/artemis-jms-client-osgi/pom.xml @@ -78,7 +78,7 @@ org.glassfish.json*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", * <_exportcontents>org.apache.activemq.artemis.*;-noimport:=true diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml index a096d76f109..72d7e8883a9 100644 --- a/artemis-pom/pom.xml +++ b/artemis-pom/pom.xml @@ -389,12 +389,6 @@ ${netty.version} - - io.netty - netty-codec - ${netty.version} - - io.netty netty-codec-http @@ -457,6 +451,19 @@ ${netty-transport-native-kqueue-classifier} + + io.netty + netty-transport-classes-io_uring + ${netty.version} + + + + io.netty + netty-transport-native-io_uring + ${netty.version} + ${netty-transport-native-io_uring-classifier} + + org.apache.qpid proton-j diff --git a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml index 7309a6f423b..a7ad845c01d 100644 --- a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml +++ b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml @@ -90,10 +90,6 @@ io.netty netty-transport - - io.netty - netty-codec - org.osgi osgi.cmpn diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml index fb5d89c516e..4c8685ec673 100644 --- a/artemis-protocols/artemis-mqtt-protocol/pom.xml +++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml @@ -69,10 +69,6 @@ io.netty netty-transport - - io.netty - netty-codec - io.netty netty-common diff --git a/artemis-protocols/artemis-openwire-protocol/pom.xml b/artemis-protocols/artemis-openwire-protocol/pom.xml index 43f14bd6fbf..2f18b2ef55d 100644 --- a/artemis-protocols/artemis-openwire-protocol/pom.xml +++ b/artemis-protocols/artemis-openwire-protocol/pom.xml @@ -96,10 +96,6 @@ io.netty netty-transport - - io.netty - netty-codec - org.osgi osgi.cmpn diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml index 456a359e0b7..7fbf291004e 100644 --- a/artemis-server-osgi/pom.xml +++ b/artemis-server-osgi/pom.xml @@ -128,7 +128,7 @@ org.glassfish.json*;resolution:=optional, org.postgresql*;resolution:=optional, de.dentrassi.crypto.pem;resolution:=optional, - io.netty.buffer;io.netty.*;version="[4.1,5)", + io.netty.*;version="[4.2,5)", java.net.http*;resolution:=optional, com.sun.net.httpserver*;resolution:=optional, * diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml index 9a0fe8982b8..fe4f7de1c3d 100644 --- a/artemis-server/pom.xml +++ b/artemis-server/pom.xml @@ -132,10 +132,6 @@ io.netty netty-transport-classes-kqueue - - io.netty - netty-codec - commons-beanutils commons-beanutils diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 6a089813ec8..f8cd3d1f3e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -49,19 +49,22 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.WriteBufferWaterMark; -import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollServerSocketChannel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueIoHandler; import io.netty.channel.kqueue.KQueueServerSocketChannel; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalServerChannel; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.uring.IoUringIoHandler; +import io.netty.channel.uring.IoUringServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.util.ResourceLeakDetector; @@ -112,6 +115,7 @@ public class NettyAcceptor extends AbstractAcceptor { public static final String NIO_ACCEPTOR_TYPE = "NIO"; public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL"; public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE"; + public static final String IOURING_ACCEPTOR_TYPE = "IO_URING"; static { // Disable default Netty leak detection if the Netty leak detection level system properties are not in use @@ -148,6 +152,8 @@ public class NettyAcceptor extends AbstractAcceptor { private final boolean useKQueue; + private final boolean useIoUring; + private final ProtocolHandler protocolHandler; private final String host; @@ -280,6 +286,7 @@ public NettyAcceptor(final String name, useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); + useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); @@ -429,26 +436,37 @@ public synchronized void start() throws Exception { eventLoopGroup = new DefaultEventLoopGroup(); } else { - if (remotingThreads == -1) { + boolean defaultRemotingThreads = remotingThreads == -1; + + if (defaultRemotingThreads) { // Default to number of cores * 3 remotingThreads = Runtime.getRuntime().availableProcessors() * 3; } - if (useEpoll && CheckDependencies.isEpollAvailable()) { + if (useIoUring && CheckDependencies.isIoUringAvailable()) { + //IO_URING should default to 1 remotingThread unless specified in config + remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; + + channelClazz = IoUringServerSocketChannel.class; + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), IoUringIoHandler.newFactory()); + acceptorType = IOURING_ACCEPTOR_TYPE; + + logger.debug("Acceptor using native io_uring"); + } else if (useEpoll && CheckDependencies.isEpollAvailable()) { channelClazz = EpollServerSocketChannel.class; - eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), EpollIoHandler.newFactory()); acceptorType = EPOLL_ACCEPTOR_TYPE; logger.debug("Acceptor using native epoll"); } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { channelClazz = KQueueServerSocketChannel.class; - eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), KQueueIoHandler.newFactory()); acceptorType = KQUEUE_ACCEPTOR_TYPE; logger.debug("Acceptor using native kqueue"); } else { channelClazz = NioServerSocketChannel.class; - eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader()))); + eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), NioIoHandler.newFactory()); acceptorType = NIO_ACCEPTOR_TYPE; logger.debug("Acceptor using nio"); } diff --git a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java index 4b4897eaf54..2e7d935784b 100644 --- a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java +++ b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java @@ -55,8 +55,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; @@ -1005,7 +1006,7 @@ private void createRandomJettyFiles(File dir, int num, List collector) thr } private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException { - EventLoopGroup group = new NioEventLoopGroup(); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override @@ -1018,7 +1019,7 @@ protected void initChannel(Channel ch) throws Exception { } private Channel getSslChannel(int port, SslHandler sslHandler, ClientHandler clientHandler) throws InterruptedException { - EventLoopGroup group = new NioEventLoopGroup(); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override diff --git a/docs/user-manual/configuring-transports.adoc b/docs/user-manual/configuring-transports.adoc index a9d8bea7c7e..265d4e080aa 100644 --- a/docs/user-manual/configuring-transports.adoc +++ b/docs/user-manual/configuring-transports.adoc @@ -244,14 +244,14 @@ These Native transports add features specific to a particular platform, generate Both Clients and Server can benefit from this. -Current Supported Platforms. +Currently supported platforms: * Linux running 64bit JVM * MacOS running 64bit JVM -Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected. +Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected. -If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO. +If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO. ==== Linux Native Transport @@ -264,6 +264,22 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is Setting this to `false` will force the use of Java NIO instead of epoll. Default is `true` +Additionally, Apache ActiveMQ Artemis offers support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring. + +The following properties are specific to this native transport: + +useIoUring:: +enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected. +Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO. +Default is `false` + +[WARNING] +==== +[#io_uring-warning] +IO_URING support is a recent addition to the broker and should be considered `experimental` at this stage. +Using it _could_ introduce unwanted side effects. As such, thourough testing and verification is advised before use in any production or otherwise critical environment. +==== + ==== MacOS Native Transport On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue. diff --git a/pom.xml b/pom.xml index 2183ac21235..623f2990c03 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ 10.25.0 5.18.0 4.0.5 - 4.1.121.Final + 4.2.2.Final 2.2.2 5.8.0 3.9.3 @@ -269,6 +269,7 @@ linux-x86_64 osx-x86_64 + linux-x86_64 false diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java index 65c7b9ae2d2..002506c4adb 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java @@ -36,7 +36,8 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; @@ -144,8 +145,8 @@ public void run() { logger.info("Proxying {} to {}", localPort, remotePort); // Configure the bootstrap. - EventLoopGroup bossGroup = new NioEventLoopGroup(1); - EventLoopGroup workerGroup = new NioEventLoopGroup(); + EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); + EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java index 234a46fa76d..9280b8b8434 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java @@ -17,20 +17,13 @@ package org.apache.activemq.transport.netty; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.net.URI; import java.security.Principal; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import static java.util.function.Function.identity; - -import io.netty.channel.ChannelPromise; -import io.netty.util.ReferenceCounted; -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; @@ -42,15 +35,23 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslHandler; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.function.Function.identity; /** * TCP based transport that uses Netty as the underlying IO layer. @@ -125,7 +126,7 @@ public void connect() throws IOException { sslHandler = null; } - group = new NioEventLoopGroup(1); + group = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory()); bootstrap = new Bootstrap(); bootstrap.group(group); diff --git a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java index 9b470c8ae6d..444c6fb0d78 100644 --- a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java +++ b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.isolated.web; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -37,8 +32,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; @@ -62,6 +58,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** * This test leaks a thread named org.eclipse.jetty.util.RolloverFileOutputStream which is why it is isolated now. In * the future Jetty might fix this. @@ -211,7 +212,7 @@ public void testLargeResponseHeaderConfiguration() throws Exception { } private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException { - EventLoopGroup group = new NioEventLoopGroup(); + EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java index 925ede3a77c..638e5529b06 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java @@ -26,8 +26,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -51,8 +52,8 @@ public void testHttpAuthority() throws Exception { int port = 61616; CountDownLatch requestTested = new CountDownLatch(1); AtomicBoolean failed = new AtomicBoolean(false); - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workerGroup = new NioEventLoopGroup(); + EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java index ba17a06c8b6..228e247b3c0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java @@ -27,8 +27,10 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -107,8 +109,8 @@ public NettyConnectorWithHTTPUpgradeTest(Boolean useSSL) { private ServerLocator locator; private String acceptorName; - private NioEventLoopGroup bossGroup; - private NioEventLoopGroup workerGroup; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; private String SERVER_SIDE_KEYSTORE = "server-keystore.jks"; private String CLIENT_SIDE_TRUSTSTORE = "server-ca-truststore.jks"; @@ -214,8 +216,8 @@ public void HTTPUpgradeConnectorUsingNormalAcceptor() throws Exception { } private void startWebServer(int port) throws Exception { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); + bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); ServerBootstrap b = new ServerBootstrap(); final SSLContext context; if (useSSL) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java index f084f383510..1e6366fcb4b 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java @@ -16,13 +16,6 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; - import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.NetworkInterface; @@ -38,7 +31,9 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.proxy.Socks5ProxyHandler; @@ -58,6 +53,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + public class SocksProxyTest extends ActiveMQTestBase { private static final int SOCKS_PORT = 1080; @@ -66,8 +68,8 @@ public class SocksProxyTest extends ActiveMQTestBase { private ExecutorService threadPool; private ScheduledExecutorService scheduledThreadPool; - private NioEventLoopGroup bossGroup; - private NioEventLoopGroup workerGroup; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; @Override @BeforeEach @@ -264,8 +266,8 @@ public void connectionReadyForWrites(Object connectionID, boolean ready) { } private void startSocksProxy() throws Exception { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); + bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup);