Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion artemis-core-client-osgi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
io.netty.*;version="[4.2,5)",
*
</Import-Package>
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true</_exportcontents>
Expand Down
13 changes: 9 additions & 4 deletions artemis-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-kqueue</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-io_uring</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
Expand All @@ -109,10 +118,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-socks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,10 @@ public interface ActiveMQClientLogger {

@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);

@LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailability(Throwable e);

@LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailabilitynoClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.uring.IoUring;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;

Expand Down Expand Up @@ -51,4 +52,17 @@ public static final boolean isKQueueAvailable() {
return false;
}
}

public static final boolean isIoUringAvailable() {
try {
return Env.isLinuxOs() && IoUring.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.core.remoting.impl.netty;

import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;

import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -63,16 +62,19 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueIoHandler;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.uring.IoUringIoHandler;
import io.netty.channel.uring.IoUringSocketChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
Expand All @@ -92,11 +94,11 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.codec.socksx.SocksVersion;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
Expand Down Expand Up @@ -128,15 +130,16 @@
import org.apache.activemq.artemis.utils.PasswordMaskingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX;
import static org.apache.activemq.artemis.utils.Base64.encodeBytes;

public class NettyConnector extends AbstractConnector {

public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
public static String IOURING_CONNECTOR_TYPE = "IO_URING";

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

Expand Down Expand Up @@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector {

private boolean useKQueue;

private boolean useIoUring;

private int remotingThreads;

private boolean useGlobalWorkerPool;
Expand Down Expand Up @@ -404,6 +409,7 @@ public NettyConnector(final Map<String, Object> configuration,

useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);

useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
Expand Down Expand Up @@ -528,38 +534,54 @@ public synchronized void start() {
return;
}

if (remotingThreads == -1) {
boolean defaultRemotingThreads = remotingThreads == -1;

if (defaultRemotingThreads) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}

String connectorType;

if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;

if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory())));
} else {
group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory());
}

connectorType = IOURING_CONNECTOR_TYPE;
channelClazz = IoUringSocketChannel.class;

logger.debug("Connector {} using native io_uring", this);
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory())));
} else {
group = new EpollEventLoopGroup(remotingThreads);
group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory());
}
connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;
logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory)));
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory())));
} else {
group = new KQueueEventLoopGroup(remotingThreads);
group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory());
}
connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;
logger.debug("Connector {} using native kqueue", this);
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory())));
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(remotingThreads);
group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory());
}
connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class TransportConstants {

public static final String USE_KQUEUE_PROP_NAME = "useKQueue";

public static final String USE_IOURING_PROP_NAME = "useIoUring";

/**
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
*/
Expand Down Expand Up @@ -214,6 +216,8 @@ public class TransportConstants {

public static final boolean DEFAULT_USE_KQUEUE = true;

public static final boolean DEFAULT_USE_IOURING = false;

public static final boolean DEFAULT_USE_INVM = false;

public static final boolean DEFAULT_USE_SERVLET = false;
Expand Down Expand Up @@ -422,6 +426,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
Expand Down Expand Up @@ -497,6 +502,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);
Expand Down
5 changes: 4 additions & 1 deletion artemis-features/src/main/resources/features.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
<bundle>mvn:io.netty/netty-resolver/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-buffer/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec-base/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec-compression/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec-socks/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-codec-http/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-handler/${netty.version}</bundle>
Expand All @@ -43,6 +44,8 @@
<bundle>mvn:io.netty/netty-transport-native-epoll/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-classes-kqueue/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-kqueue/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-classes-io_uring/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-io_uring/${netty.version}</bundle>
<bundle>mvn:io.netty/netty-transport-native-unix-common/${netty.version}</bundle>
</feature>

Expand Down
2 changes: 1 addition & 1 deletion artemis-jms-client-osgi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
io.netty.*;version="[4.2,5)",
*
</Import-Package>
<_exportcontents>org.apache.activemq.artemis.*;-noimport:=true</_exportcontents>
Expand Down
19 changes: 13 additions & 6 deletions artemis-pom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,6 @@
<version>${netty.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
Expand Down Expand Up @@ -457,6 +451,19 @@
<classifier>${netty-transport-native-kqueue-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-io_uring</artifactId>
<version>${netty.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-io_uring</artifactId>
<version>${netty.version}</version>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions artemis-protocols/artemis-jakarta-openwire-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions artemis-protocols/artemis-mqtt-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions artemis-protocols/artemis-openwire-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion artemis-server-osgi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
org.glassfish.json*;resolution:=optional,
org.postgresql*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
io.netty.*;version="[4.2,5)",
java.net.http*;resolution:=optional,
com.sun.net.httpserver*;resolution:=optional,
*
Expand Down
4 changes: 0 additions & 4 deletions artemis-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-kqueue</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
Expand Down
Loading
Loading