diff --git a/artemis-core-client-osgi/pom.xml b/artemis-core-client-osgi/pom.xml
index 518816bf060..1709121c962 100644
--- a/artemis-core-client-osgi/pom.xml
+++ b/artemis-core-client-osgi/pom.xml
@@ -70,7 +70,7 @@
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
*
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true
diff --git a/artemis-core-client/pom.xml b/artemis-core-client/pom.xml
index 209242c26fb..322d6abd174 100644
--- a/artemis-core-client/pom.xml
+++ b/artemis-core-client/pom.xml
@@ -89,6 +89,15 @@
io.netty
netty-transport-classes-kqueue
+
+ io.netty
+ netty-transport-native-io_uring
+ ${netty-transport-native-io_uring-classifier}
+
+
+ io.netty
+ netty-transport-classes-io_uring
+
io.netty
netty-codec-http
@@ -109,10 +118,6 @@
io.netty
netty-handler-proxy
-
- io.netty
- netty-codec
-
io.netty
netty-codec-socks
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
index e7c1c7108df..a47f408ec3d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java
@@ -349,4 +349,10 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);
+
+ @LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
+ void unableToCheckIoUringAvailability(Throwable e);
+
+ @LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
+ void unableToCheckIoUringAvailabilitynoClass();
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java
index 4a90401dcca..e56c2ab764d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java
@@ -19,6 +19,7 @@
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.uring.IoUring;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;
@@ -51,4 +52,17 @@ public static final boolean isKQueueAvailable() {
return false;
}
}
+
+ public static final boolean isIoUringAvailable() {
+ try {
+ return Env.isLinuxOs() && IoUring.isAvailable();
+ } catch (NoClassDefFoundError noClassDefFoundError) {
+ ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
+ return false;
+ } catch (Throwable e) {
+ ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
+ return false;
+ }
+ }
+
}
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index cb7ef817a42..3d1a3bfc0c4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;
-import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;
-
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -63,16 +62,19 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.uring.IoUringIoHandler;
+import io.netty.channel.uring.IoUringSocketChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
@@ -92,11 +94,11 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
-import io.netty.handler.ssl.SslContext;
import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
+import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
@@ -128,8 +130,8 @@
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
+import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;
import static org.apache.activemq.artemis.utils.Base64.encodeBytes;
public class NettyConnector extends AbstractConnector {
@@ -137,6 +139,7 @@ public class NettyConnector extends AbstractConnector {
public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
+ public static String IOURING_CONNECTOR_TYPE = "IO_URING";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector {
private boolean useKQueue;
+ private boolean useIoUring;
+
private int remotingThreads;
private boolean useGlobalWorkerPool;
@@ -404,6 +409,7 @@ public NettyConnector(final Map configuration,
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
+ useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@@ -528,27 +534,43 @@ public synchronized void start() {
return;
}
- if (remotingThreads == -1) {
+ boolean defaultRemotingThreads = remotingThreads == -1;
+
+ if (defaultRemotingThreads) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}
String connectorType;
- if (useEpoll && CheckDependencies.isEpollAvailable()) {
+ if (useIoUring && CheckDependencies.isIoUringAvailable()) {
+ //IO_URING should default to 1 remotingThread unless specified in config
+ remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
+
+ if (useGlobalWorkerPool) {
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory())));
+ } else {
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory());
+ }
+
+ connectorType = IOURING_CONNECTOR_TYPE;
+ channelClazz = IoUringSocketChannel.class;
+
+ logger.debug("Connector {} using native io_uring", this);
+ } else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
- group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory())));
} else {
- group = new EpollEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory());
}
connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;
logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
- group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory())));
} else {
- group = new KQueueEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory());
}
connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;
@@ -556,10 +578,10 @@ public synchronized void start() {
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
- group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
+ group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory())));
} else {
channelClazz = NioSocketChannel.class;
- group = new NioEventLoopGroup(remotingThreads);
+ group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory());
}
connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index e7464975dfe..e20cea8ab4f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -68,6 +68,8 @@ public class TransportConstants {
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
+ public static final String USE_IOURING_PROP_NAME = "useIoUring";
+
/**
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
*/
@@ -214,6 +216,8 @@ public class TransportConstants {
public static final boolean DEFAULT_USE_KQUEUE = true;
+ public static final boolean DEFAULT_USE_IOURING = false;
+
public static final boolean DEFAULT_USE_INVM = false;
public static final boolean DEFAULT_USE_SERVLET = false;
@@ -422,6 +426,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@@ -497,6 +502,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
+ allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
diff --git a/artemis-features/src/main/resources/features.xml b/artemis-features/src/main/resources/features.xml
index d26ee3cc16c..8140f6ea9dc 100644
--- a/artemis-features/src/main/resources/features.xml
+++ b/artemis-features/src/main/resources/features.xml
@@ -33,7 +33,8 @@
mvn:io.netty/netty-resolver/${netty.version}
mvn:io.netty/netty-transport/${netty.version}
mvn:io.netty/netty-buffer/${netty.version}
- mvn:io.netty/netty-codec/${netty.version}
+ mvn:io.netty/netty-codec-base/${netty.version}
+ mvn:io.netty/netty-codec-compression/${netty.version}
mvn:io.netty/netty-codec-socks/${netty.version}
mvn:io.netty/netty-codec-http/${netty.version}
mvn:io.netty/netty-handler/${netty.version}
@@ -43,6 +44,8 @@
mvn:io.netty/netty-transport-native-epoll/${netty.version}
mvn:io.netty/netty-transport-classes-kqueue/${netty.version}
mvn:io.netty/netty-transport-native-kqueue/${netty.version}
+ mvn:io.netty/netty-transport-classes-io_uring/${netty.version}
+ mvn:io.netty/netty-transport-native-io_uring/${netty.version}
mvn:io.netty/netty-transport-native-unix-common/${netty.version}
diff --git a/artemis-jms-client-osgi/pom.xml b/artemis-jms-client-osgi/pom.xml
index d44ec81a819..1d42f0cbb11 100644
--- a/artemis-jms-client-osgi/pom.xml
+++ b/artemis-jms-client-osgi/pom.xml
@@ -78,7 +78,7 @@
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
*
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true
diff --git a/artemis-pom/pom.xml b/artemis-pom/pom.xml
index a096d76f109..72d7e8883a9 100644
--- a/artemis-pom/pom.xml
+++ b/artemis-pom/pom.xml
@@ -389,12 +389,6 @@
${netty.version}
-
- io.netty
- netty-codec
- ${netty.version}
-
-
io.netty
netty-codec-http
@@ -457,6 +451,19 @@
${netty-transport-native-kqueue-classifier}
+
+ io.netty
+ netty-transport-classes-io_uring
+ ${netty.version}
+
+
+
+ io.netty
+ netty-transport-native-io_uring
+ ${netty.version}
+ ${netty-transport-native-io_uring-classifier}
+
+
org.apache.qpid
proton-j
diff --git a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
index 7309a6f423b..a7ad845c01d 100644
--- a/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
+++ b/artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
@@ -90,10 +90,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
org.osgi
osgi.cmpn
diff --git a/artemis-protocols/artemis-mqtt-protocol/pom.xml b/artemis-protocols/artemis-mqtt-protocol/pom.xml
index fb5d89c516e..4c8685ec673 100644
--- a/artemis-protocols/artemis-mqtt-protocol/pom.xml
+++ b/artemis-protocols/artemis-mqtt-protocol/pom.xml
@@ -69,10 +69,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
io.netty
netty-common
diff --git a/artemis-protocols/artemis-openwire-protocol/pom.xml b/artemis-protocols/artemis-openwire-protocol/pom.xml
index 43f14bd6fbf..2f18b2ef55d 100644
--- a/artemis-protocols/artemis-openwire-protocol/pom.xml
+++ b/artemis-protocols/artemis-openwire-protocol/pom.xml
@@ -96,10 +96,6 @@
io.netty
netty-transport
-
- io.netty
- netty-codec
-
org.osgi
osgi.cmpn
diff --git a/artemis-server-osgi/pom.xml b/artemis-server-osgi/pom.xml
index 456a359e0b7..7fbf291004e 100644
--- a/artemis-server-osgi/pom.xml
+++ b/artemis-server-osgi/pom.xml
@@ -128,7 +128,7 @@
org.glassfish.json*;resolution:=optional,
org.postgresql*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
- io.netty.buffer;io.netty.*;version="[4.1,5)",
+ io.netty.*;version="[4.2,5)",
java.net.http*;resolution:=optional,
com.sun.net.httpserver*;resolution:=optional,
*
diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 9a0fe8982b8..fe4f7de1c3d 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -132,10 +132,6 @@
io.netty
netty-transport-classes-kqueue
-
- io.netty
- netty-codec
-
commons-beanutils
commons-beanutils
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 6a089813ec8..f8cd3d1f3e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -49,19 +49,22 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.uring.IoUringIoHandler;
+import io.netty.channel.uring.IoUringServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.ResourceLeakDetector;
@@ -112,6 +115,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
+ public static final String IOURING_ACCEPTOR_TYPE = "IO_URING";
static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@@ -148,6 +152,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean useKQueue;
+ private final boolean useIoUring;
+
private final ProtocolHandler protocolHandler;
private final String host;
@@ -280,6 +286,7 @@ public NettyAcceptor(final String name,
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
+ useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@@ -429,26 +436,37 @@ public synchronized void start() throws Exception {
eventLoopGroup = new DefaultEventLoopGroup();
} else {
- if (remotingThreads == -1) {
+ boolean defaultRemotingThreads = remotingThreads == -1;
+
+ if (defaultRemotingThreads) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}
- if (useEpoll && CheckDependencies.isEpollAvailable()) {
+ if (useIoUring && CheckDependencies.isIoUringAvailable()) {
+ //IO_URING should default to 1 remotingThread unless specified in config
+ remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
+
+ channelClazz = IoUringServerSocketChannel.class;
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), IoUringIoHandler.newFactory());
+ acceptorType = IOURING_ACCEPTOR_TYPE;
+
+ logger.debug("Acceptor using native io_uring");
+ } else if (useEpoll && CheckDependencies.isEpollAvailable()) {
channelClazz = EpollServerSocketChannel.class;
- eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), EpollIoHandler.newFactory());
acceptorType = EPOLL_ACCEPTOR_TYPE;
logger.debug("Acceptor using native epoll");
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
channelClazz = KQueueServerSocketChannel.class;
- eventLoopGroup = new KQueueEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), KQueueIoHandler.newFactory());
acceptorType = KQUEUE_ACCEPTOR_TYPE;
logger.debug("Acceptor using native kqueue");
} else {
channelClazz = NioServerSocketChannel.class;
- eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())), NioIoHandler.newFactory());
acceptorType = NIO_ACCEPTOR_TYPE;
logger.debug("Acceptor using nio");
}
diff --git a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
index 4b4897eaf54..2e7d935784b 100644
--- a/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
+++ b/artemis-web/src/test/java/org/apache/activemq/cli/test/WebServerComponentTest.java
@@ -55,8 +55,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
@@ -1005,7 +1006,7 @@ private void createRandomJettyFiles(File dir, int num, List collector) thr
}
private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
+ EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
@@ -1018,7 +1019,7 @@ protected void initChannel(Channel ch) throws Exception {
}
private Channel getSslChannel(int port, SslHandler sslHandler, ClientHandler clientHandler) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
+ EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
diff --git a/docs/user-manual/configuring-transports.adoc b/docs/user-manual/configuring-transports.adoc
index a9d8bea7c7e..265d4e080aa 100644
--- a/docs/user-manual/configuring-transports.adoc
+++ b/docs/user-manual/configuring-transports.adoc
@@ -244,14 +244,14 @@ These Native transports add features specific to a particular platform, generate
Both Clients and Server can benefit from this.
-Current Supported Platforms.
+Currently supported platforms:
* Linux running 64bit JVM
* MacOS running 64bit JVM
-Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected.
+Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected.
-If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
+If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
==== Linux Native Transport
@@ -264,6 +264,22 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is
Setting this to `false` will force the use of Java NIO instead of epoll.
Default is `true`
+Additionally, Apache ActiveMQ Artemis offers support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring.
+
+The following properties are specific to this native transport:
+
+useIoUring::
+enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected.
+Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO.
+Default is `false`
+
+[WARNING]
+====
+[#io_uring-warning]
+IO_URING support is a recent addition to the broker and should be considered `experimental` at this stage.
+Using it _could_ introduce unwanted side effects. As such, thourough testing and verification is advised before use in any production or otherwise critical environment.
+====
+
==== MacOS Native Transport
On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue.
diff --git a/pom.xml b/pom.xml
index 2183ac21235..623f2990c03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
10.25.0
5.18.0
4.0.5
- 4.1.121.Final
+ 4.2.2.Final
2.2.2
5.8.0
3.9.3
@@ -269,6 +269,7 @@
linux-x86_64
osx-x86_64
+ linux-x86_64
false
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java
index 65c7b9ae2d2..002506c4adb 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/util/TcpProxy.java
@@ -36,7 +36,8 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
@@ -144,8 +145,8 @@ public void run() {
logger.info("Proxying {} to {}", localPort, remotePort);
// Configure the bootstrap.
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
+ EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
+ EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
index 234a46fa76d..9280b8b8434 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
@@ -17,20 +17,13 @@
package org.apache.activemq.transport.netty;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
-import static java.util.function.Function.identity;
-
-import io.netty.channel.ChannelPromise;
-import io.netty.util.ReferenceCounted;
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
@@ -42,15 +35,23 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.function.Function.identity;
/**
* TCP based transport that uses Netty as the underlying IO layer.
@@ -125,7 +126,7 @@ public void connect() throws IOException {
sslHandler = null;
}
- group = new NioEventLoopGroup(1);
+ group = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
bootstrap = new Bootstrap();
bootstrap.group(group);
diff --git a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java
index 9b470c8ae6d..444c6fb0d78 100644
--- a/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java
+++ b/tests/integration-tests-isolated/src/test/java/org/apache/activemq/artemis/tests/integration/isolated/web/WebServerComponentTest.java
@@ -16,11 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.isolated.web;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -37,8 +32,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
@@ -62,6 +58,11 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* This test leaks a thread named org.eclipse.jetty.util.RolloverFileOutputStream which is why it is isolated now. In
* the future Jetty might fix this.
@@ -211,7 +212,7 @@ public void testLargeResponseHeaderConfiguration() throws Exception {
}
private Channel getChannel(int port, ClientHandler clientHandler) throws InterruptedException {
- EventLoopGroup group = new NioEventLoopGroup();
+ EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java
index 925ede3a77c..638e5529b06 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/http/HttpAuthorityTest.java
@@ -26,8 +26,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -51,8 +52,8 @@ public void testHttpAuthority() throws Exception {
int port = 61616;
CountDownLatch requestTested = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
+ EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+ EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
index ba17a06c8b6..228e247b3c0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/NettyConnectorWithHTTPUpgradeTest.java
@@ -27,8 +27,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -107,8 +109,8 @@ public NettyConnectorWithHTTPUpgradeTest(Boolean useSSL) {
private ServerLocator locator;
private String acceptorName;
- private NioEventLoopGroup bossGroup;
- private NioEventLoopGroup workerGroup;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
private String SERVER_SIDE_KEYSTORE = "server-keystore.jks";
private String CLIENT_SIDE_TRUSTSTORE = "server-ca-truststore.jks";
@@ -214,8 +216,8 @@ public void HTTPUpgradeConnectorUsingNormalAcceptor() throws Exception {
}
private void startWebServer(int port) throws Exception {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
+ bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+ workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
ServerBootstrap b = new ServerBootstrap();
final SSLContext context;
if (useSSL) {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
index f084f383510..1e6366fcb4b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java
@@ -16,13 +16,6 @@
*/
package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assumptions.assumeTrue;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
@@ -38,7 +31,9 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.proxy.Socks5ProxyHandler;
@@ -58,6 +53,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
public class SocksProxyTest extends ActiveMQTestBase {
private static final int SOCKS_PORT = 1080;
@@ -66,8 +68,8 @@ public class SocksProxyTest extends ActiveMQTestBase {
private ExecutorService threadPool;
private ScheduledExecutorService scheduledThreadPool;
- private NioEventLoopGroup bossGroup;
- private NioEventLoopGroup workerGroup;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
@Override
@BeforeEach
@@ -264,8 +266,8 @@ public void connectionReadyForWrites(Object connectionID, boolean ready) {
}
private void startSocksProxy() throws Exception {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
+ bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+ workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);