Skip to content

Commit 33f6084

Browse files
committed
ARTEMIS-3163 Experimental support for Netty IO_URING incubator
1 parent 57e85bb commit 33f6084

File tree

4 files changed

+48
-0
lines changed

4 files changed

+48
-0
lines changed

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
395395
format = Message.Format.MESSAGE_FORMAT)
396396
void unableToCheckEpollAvailability(@Cause Throwable e);
397397

398+
@LogMessage(level = Logger.Level.WARN)
399+
@Message(id = 212080, value = "Unable to check IoUring availability ",
400+
format = Message.Format.MESSAGE_FORMAT)
401+
void unableToCheckIoUringAvailability(@Cause Throwable e);
402+
398403
@LogMessage(level = Logger.Level.WARN)
399404
@Message(id = 212072, value = "Failed to change channel state to ReadyForWriting ",
400405
format = Message.Format.MESSAGE_FORMAT)
@@ -420,6 +425,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
420425
format = Message.Format.MESSAGE_FORMAT)
421426
void unableToCheckEpollAvailabilitynoClass();
422427

428+
@LogMessage(level = Logger.Level.WARN)
429+
@Message(id = 212079, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning",
430+
format = Message.Format.MESSAGE_FORMAT)
431+
void unableToCheckIoUringAvailabilitynoClass();
432+
423433
@LogMessage(level = Logger.Level.WARN)
424434
@Message(id = 212077, value = "Timed out waiting to receive initial broadcast from cluster. Retry {0} of {1}",
425435
format = Message.Format.MESSAGE_FORMAT)

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/CheckDependencies.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,17 @@ public static final boolean isKQueueAvailable() {
5454
return false;
5555
}
5656
}
57+
58+
public static boolean isIoUringAvaialble() {
59+
try {
60+
return Env.isLinuxOs() && (boolean) (Class.forName("io.netty.incubator.channel.uring.IOUring")
61+
.getMethod("isAvailable").invoke(null));
62+
} catch (NoClassDefFoundError noClassDefFoundError) {
63+
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
64+
return false;
65+
} catch (Throwable e) {
66+
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
67+
return false;
68+
}
69+
}
5770
}

artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class TransportConstants {
6161

6262
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
6363

64+
public static final String USE_IOURING_PROP_NAME = "useIoUring";
65+
6466
@Deprecated
6567
/**
6668
* @deprecated Use USE_GLOBAL_WORKER_POOL_PROP_NAME
@@ -202,6 +204,8 @@ public class TransportConstants {
202204

203205
public static final boolean DEFAULT_USE_KQUEUE = true;
204206

207+
public static final boolean DEFAULT_USE_IOURING = false;
208+
205209
public static final boolean DEFAULT_USE_INVM = false;
206210

207211
public static final boolean DEFAULT_USE_SERVLET = false;
@@ -443,6 +447,7 @@ private static int parseDefaultVariable(String variableName, int defaultValue) {
443447
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
444448
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
445449
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
450+
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
446451
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
447452
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
448453
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);

artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.Executor;
4141
import java.util.concurrent.ScheduledExecutorService;
4242
import java.util.concurrent.ScheduledFuture;
43+
import java.util.concurrent.ThreadFactory;
4344
import java.util.concurrent.TimeUnit;
4445
import java.util.concurrent.atomic.AtomicBoolean;
4546

@@ -112,6 +113,7 @@ public class NettyAcceptor extends AbstractAcceptor {
112113
public static final String NIO_ACCEPTOR_TYPE = "NIO";
113114
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
114115
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
116+
public static final String IOURING_ACCEPTOR_TYPE = "IO_URING";
115117

116118
static {
117119
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@@ -148,6 +150,8 @@ public class NettyAcceptor extends AbstractAcceptor {
148150

149151
private final boolean useKQueue;
150152

153+
private final boolean useIoUring;
154+
151155
private final ProtocolHandler protocolHandler;
152156

153157
private final String host;
@@ -268,6 +272,7 @@ public NettyAcceptor(final String name,
268272

269273
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
270274
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
275+
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
271276

272277
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
273278
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@@ -401,6 +406,21 @@ public ActiveMQThreadFactory run() {
401406
acceptorType = KQUEUE_ACCEPTOR_TYPE;
402407

403408
logger.debug("Acceptor using native kqueue");
409+
} else if (useIoUring && CheckDependencies.isIoUringAvaialble()) {
410+
channelClazz = (Class<? extends ServerChannel>) Class.forName("io.netty.incubator.channel.uring.IOUringServerSocketChannel",
411+
true, ClientSessionFactoryImpl.class.getClassLoader());
412+
eventLoopGroup = (EventLoopGroup) Class.forName("io.netty.incubator.channel.uring.IOUringEventLoopGroup",
413+
true, ClientSessionFactoryImpl.class.getClassLoader())
414+
.getConstructor(int.class, ThreadFactory.class)
415+
.newInstance(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {
416+
@Override
417+
public ActiveMQThreadFactory run() {
418+
return new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader());
419+
}
420+
}));
421+
acceptorType = IOURING_ACCEPTOR_TYPE;
422+
423+
logger.debug("Acceptor using native io_uring");
404424
} else {
405425
channelClazz = NioServerSocketChannel.class;
406426
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() {

0 commit comments

Comments
 (0)