Netty 4.2 upgrade + IO_URING support #6021
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,9 @@ if [ -f "$ARTEMIS_OOME_DUMP" ] ; then | |
| mv $ARTEMIS_OOME_DUMP $ARTEMIS_OOME_DUMP.bkp | ||
| fi | ||
|
|
||
| # Netty needs access to unsafe, but this is turned off in Java 25 by default | ||
|
Member
This comment could be confusing or misleading later, since Unsafe itself is not turned off in Java 25 by default. It warns once on first use on 24+, but what's left of Unsafe (some has been removed already) is still usable. Setting Netty 4.2 doesn't use Unsafe by default in Java 24+ so as to avoid the warning (whereas 4.1 still does use Unsafe by default [except 4.1.120-4.1.121], hence the warning mentioned on ARTEMIS-5711 when it uses Unsafe currently). Instead it uses the 'fallback MemorySegment' based cleaner on 25+ (it's broken on 24, so that seems to default to heap buffers rather than direct buffers) rather than use Unsafe. It's possible to have it use a more performant alternative on 24+ by explicitly enabling native access (which is actually still enabled by default on 24+, for now, but again produces a warning, so Netty 4.2 doesn't do this by default to avoid those warnings, requiring the https://github.com/netty/netty/blob/netty-4.2.7.Final/common/src/main/java/io/netty/util/internal/PlatformDependent0.java#L547-L588 plus the monster starting at https://github.com/netty/netty/blob/4.2/common/src/main/java/io/netty/util/internal/PlatformDependent0.java#L85 |
||
| $JAVACMD --sun-misc-unsafe-memory-access=allow --version > /dev/null 2>&1 && ALLOW_UNSAFE="--sun-misc-unsafe-memory-access=allow" | ||
|
Member
If Netty is the only thing we need or decide to enable Unsafe for (I don't know) on 25+, then I wonder if going with the io.netty.noUnsafe system property might be preferable, as it at least currently seems like it won't require a second JVM startup, and there would be no harm in always specifying it on either older JDKs or future JDKs after it's irrelevant. Though that would leave the warning on 25. I still wonder about just leaving it not using Unsafe on 25. Seems like that should be possible, and that is the future beyond 25. If that's still not viable, perhaps we stick with 4.1 using Unsafe and its warnings for a bit longer until it is, and just try to ensure compatibility if an end user wants to use 4.2 (the milestone/RC builds of Spring using Artemis with 4.2 suggest it mostly already is; that's the only 4.2 usage I'm personally aware of). Anyone not satisfied with no-Unsafe can always add the config themselves. |
||
|
|
||
| exec "$JAVACMD" \ | ||
| $LOGGING_ARGS \ | ||
| $JAVA_ARGS \ | ||
|
|
@@ -132,6 +135,7 @@ exec "$JAVACMD" \ | |
| -Djava.io.tmpdir="$ARTEMIS_INSTANCE/tmp" \ | ||
| -Ddata.dir="$ARTEMIS_DATA_DIR" \ | ||
| -Dartemis.instance.etc="$ARTEMIS_INSTANCE_ETC" \ | ||
| $ALLOW_UNSAFE \ | ||
| $DEBUG_ARGS \ | ||
| $JAVA_ARGS_APPEND \ | ||
| org.apache.activemq.artemis.boot.Artemis "$@" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,7 @@ if [ -z "$LOGGING_ARGS" ]; then | |
| fi | ||
|
|
||
| if [ -z "$JAVA_ARGS" ]; then | ||
| JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED ${java-utility-opts}" | ||
| JAVA_ARGS="-Dlog4j2.disableJmx=true --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --enable-native-access=ALL-UNNAMED ${java-utility-opts}" | ||
|
Member
At least it's the case that --enable-native-access was already in JDK 17 (as the initial FFM in 17 was an incubator addition there), so hopefully that means it works on all versions of JDK 17+ as opposed to just post-GA backport releases, saving us from doing any checking. |
||
| fi | ||
|
|
||
| # Uncomment to enable remote debugging | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -204,6 +204,12 @@ public static Process spawnVM(String classPath, | |
| commandList.add(jacocoAgent); | ||
| } | ||
|
|
||
| String javaVersion = System.getProperty("java.version"); | ||
| if (javaVersion.startsWith("24") || javaVersion.startsWith("25")) { | ||
| commandList.add("--enable-native-access=ALL-UNNAMED"); | ||
|
Comment on lines +207 to +209
Member
Perhaps Restricting to just 24 and 25 for --enable-native-access probably doesn't make sense; it's going to be useful or needed for all 26+. |
||
| commandList.add("--sun-misc-unsafe-memory-access=allow"); | ||
| } | ||
|
|
||
| commandList.add(className); | ||
| for (String arg : args) { | ||
| commandList.add(arg); | ||
|
|
||
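A minimal sketch of the review point above, assuming the flags are chosen from the JVM feature version rather than by string-prefix matching on `java.version`. The class `JvmFlagSketch` and the helper `extraJvmArgs` are hypothetical and not part of the PR. The open-ended lower bound for `--enable-native-access` follows the reviewer's remark about 26+; the 24/25 bound for the Unsafe flag simply mirrors the diff and is not a recommendation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of the PR: picks extra JVM flags from the feature version.
final class JvmFlagSketch {

   private JvmFlagSketch() {
   }

   static List<String> extraJvmArgs(int featureVersion) {
      List<String> args = new ArrayList<>();
      // Open-ended lower bound, so 26 and later are covered as well.
      if (featureVersion >= 24) {
         args.add("--enable-native-access=ALL-UNNAMED");
      }
      // Assumption: same 24/25 window as in the diff; whether to pass this flag at all is still under review.
      if (featureVersion == 24 || featureVersion == 25) {
         args.add("--sun-misc-unsafe-memory-access=allow");
      }
      return args;
   }

   public static void main(String[] args) {
      // Runtime.version().feature() returns the major version as an int (Java 10+).
      int feature = Runtime.version().feature();
      System.out.println("Java " + feature + " -> " + extraJvmArgs(feature));
   }
}
```

In `spawnVM` the returned list could be appended with `commandList.addAll(...)`, which avoids a version check that has to be revisited with every new JDK release.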
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -223,6 +223,7 @@ public void shouldZeroesDirectByteBuffer() { | |
|
|
||
| @Test | ||
| public void shouldZeroesLimitedDirectByteBuffer() { | ||
| assumeTrue(PlatformDependent.hasUnsafe()); | ||
|
Member
I think this is possibly covering up a bug in either the test or, more likely, in the ByteUtil that was exposed by Unsafe not being available, so this doesn't seem like the solution. I don't think this test should need Unsafe; the behaviour just seems questionable. The very next test calls the same utility method, without Unsafe. It only behaves differently because the buffer passed isn't limited. Real code could call the ByteUtil method later without Unsafe and get unexpected behaviour, and it won't have the benefit of covering itself with an assume. EDIT: after typing that, I reminded myself of the time that this essentially already happened and I fixed it in the calling code: If Franz doesn't know why he did it, and it doesn't work without Unsafe anyway, I think it might be time we stopped it doing what it's doing, or figured out why it did and how to adapt it to not having Unsafe. |
||
| final byte one = (byte) 1; | ||
| final int capacity = 64; | ||
| final int bytes = 32; | ||
|
|
||
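To make the concern in the comment above concrete, here is a minimal sketch of zeroing a limited direct buffer using only the public `ByteBuffer` API, with no Unsafe involved. It assumes the intended behaviour is to clear the bytes between position and limit and leave the rest untouched; that is an assumption, not a statement of what `ByteUtil` does. `ZeroBufferSketch`, `zeroRemaining` and `filledDirect` are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch, not the ByteUtil implementation.
final class ZeroBufferSketch {

   private ZeroBufferSketch() {
   }

   // Zeroes the bytes between position and limit without moving the caller's position or limit.
   static void zeroRemaining(ByteBuffer buffer) {
      // duplicate() shares the content but has its own position and limit.
      ByteBuffer view = buffer.duplicate();
      while (view.remaining() >= Long.BYTES) {
         view.putLong(0L);
      }
      while (view.hasRemaining()) {
         view.put((byte) 0);
      }
   }

   // Allocates a direct buffer and fills every byte with the given value.
   static ByteBuffer filledDirect(int capacity, byte value) {
      ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
      for (int i = 0; i < capacity; i++) {
         buffer.put(i, value);
      }
      return buffer;
   }

   public static void main(String[] args) {
      ByteBuffer buffer = filledDirect(64, (byte) 1);
      buffer.limit(32);
      zeroRemaining(buffer);
      buffer.clear(); // resets position and limit only, the content is kept
      System.out.println("byte 0 = " + buffer.get(0) + ", byte 63 = " + buffer.get(63));
   }
}
```

Because this path behaves the same with or without Unsafe, a test written against it would not need an `assumeTrue(PlatformDependent.hasUnsafe())` guard.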
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -89,6 +89,15 @@ | |
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-classes-kqueue</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-native-io_uring</artifactId> | ||
| <classifier>${netty-transport-native-io_uring-classifier}</classifier> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-transport-classes-io_uring</artifactId> | ||
| </dependency> | ||
|
Comment on lines +92 to +100
Member
As I've commented on all the other io_uring PRs, I'd personally prefer folks had to add the deps if they want them, especially given it's off by default and entirely untested. Or at least for the client, if not both. The way its availability and use is done would need to change slightly to make it work without the classes dep, though (elsewhere, I use a class per native type, and that's the only one to directly reference the types). |
||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec-http</artifactId> | ||
|
|
@@ -109,10 +118,6 @@ | |
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-handler-proxy</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec</artifactId> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>io.netty</groupId> | ||
| <artifactId>netty-codec-socks</artifactId> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -349,4 +349,10 @@ public interface ActiveMQClientLogger { | |
|
|
||
| @LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO) | ||
| void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type); | ||
|
|
||
| @LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN) | ||
| void unableToCheckIoUringAvailability(Throwable e); | ||
|
|
||
| @LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN) | ||
| void unableToCheckIoUringAvailabilitynoClass(); | ||
|
Comment on lines +352 to +357
Member
Could these go in commons? It's annoying when methods like these log and suggest they are client issues, but are in fact being triggered by the broker while it's acting as a server to clients. E.g. the method right above these ones is especially confusing when logged by the broker. |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import io.netty.channel.epoll.Epoll; | ||
| import io.netty.channel.kqueue.KQueue; | ||
| import io.netty.channel.uring.IoUring; | ||
| import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; | ||
| import org.apache.activemq.artemis.utils.Env; | ||
|
|
||
|
|
@@ -51,4 +52,17 @@ public static final boolean isKQueueAvailable() { | |
| return false; | ||
| } | ||
| } | ||
|
|
||
| public static final boolean isIoUringAvailable() { | ||
|
Contributor
I've added via netty/netty#15785 a way to verify the status of the different optimizations made by io_uring on the OS - you can use it for reporting/debugging 👍
Contributor
Author
Nice!
Contributor
Author
I was about to use it, but I see now that it's so recent that it's not in a release yet. 😆 |
||
| try { | ||
| return Env.isLinuxOs() && IoUring.isAvailable(); | ||
| } catch (NoClassDefFoundError noClassDefFoundError) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass(); | ||
| return false; | ||
| } catch (Throwable e) { | ||
| ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e); | ||
| return false; | ||
| } | ||
| } | ||
|
Comment on lines +57 to +66
Member
This also feels like it would make more sense in commons (I know, the previous checks are in here; same comment to them) |
||
|
|
||
| } | ||
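If the io_uring artifacts became optional, as suggested in the pom review comment, the availability check would have to work without linking against the Netty classes. One possible approach, sketched here under that assumption, is a name-based probe via `Class.forName`. This is a different technique from the class-per-native-type pattern the reviewer mentions, and `OptionalClassSketch` and `isClassPresent` are hypothetical names.

```java
// Hypothetical sketch: probes for an optional class by name, so the caller
// has no compile-time or link-time reference to the optional dependency.
final class OptionalClassSketch {

   private OptionalClassSketch() {
   }

   static boolean isClassPresent(String className) {
      try {
         // initialize=false: only locate and load the class, do not run its static initializers.
         Class.forName(className, false, OptionalClassSketch.class.getClassLoader());
         return true;
      } catch (ClassNotFoundException | LinkageError e) {
         return false;
      }
   }

   public static void main(String[] args) {
      // The class name is taken from the import in the diff above.
      String ioUring = "io.netty.channel.uring.IoUring";
      System.out.println(ioUring + " present: " + isClassPresent(ioUring));
   }
}
```

A probe like this only answers whether the classes are on the classpath. The `IoUring.isAvailable()` call in the diff is still needed to learn whether the native transport can actually be used on the host.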
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,12 @@ | |
| */ | ||
| package org.apache.activemq.artemis.core.remoting.impl.netty; | ||
|
|
||
| import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; | ||
|
|
||
| import javax.net.ssl.SNIHostName; | ||
| import javax.net.ssl.SSLContext; | ||
| import javax.net.ssl.SSLEngine; | ||
| import javax.net.ssl.SSLParameters; | ||
| import java.io.IOException; | ||
| import java.lang.invoke.MethodHandles; | ||
| import java.net.ConnectException; | ||
| import java.net.InetAddress; | ||
| import java.net.InetSocketAddress; | ||
|
|
@@ -63,16 +62,19 @@ | |
| import io.netty.channel.ChannelPipeline; | ||
| import io.netty.channel.ChannelPromise; | ||
| import io.netty.channel.EventLoopGroup; | ||
| import io.netty.channel.MultiThreadIoEventLoopGroup; | ||
| import io.netty.channel.SimpleChannelInboundHandler; | ||
| import io.netty.channel.WriteBufferWaterMark; | ||
| import io.netty.channel.epoll.EpollEventLoopGroup; | ||
| import io.netty.channel.epoll.EpollIoHandler; | ||
| import io.netty.channel.epoll.EpollSocketChannel; | ||
| import io.netty.channel.group.ChannelGroup; | ||
| import io.netty.channel.group.DefaultChannelGroup; | ||
| import io.netty.channel.kqueue.KQueueEventLoopGroup; | ||
| import io.netty.channel.kqueue.KQueueIoHandler; | ||
| import io.netty.channel.kqueue.KQueueSocketChannel; | ||
| import io.netty.channel.nio.NioEventLoopGroup; | ||
| import io.netty.channel.nio.NioIoHandler; | ||
| import io.netty.channel.socket.nio.NioSocketChannel; | ||
| import io.netty.channel.uring.IoUringIoHandler; | ||
| import io.netty.channel.uring.IoUringSocketChannel; | ||
| import io.netty.handler.codec.base64.Base64; | ||
| import io.netty.handler.codec.http.DefaultFullHttpRequest; | ||
| import io.netty.handler.codec.http.DefaultHttpRequest; | ||
|
|
@@ -92,11 +94,11 @@ | |
| import io.netty.handler.codec.http.LastHttpContent; | ||
| import io.netty.handler.codec.http.cookie.ClientCookieDecoder; | ||
| import io.netty.handler.codec.http.cookie.Cookie; | ||
| import io.netty.handler.ssl.SslContext; | ||
| import io.netty.handler.codec.socksx.SocksVersion; | ||
| import io.netty.handler.proxy.ProxyHandler; | ||
| import io.netty.handler.proxy.Socks4ProxyHandler; | ||
| import io.netty.handler.proxy.Socks5ProxyHandler; | ||
| import io.netty.handler.ssl.SslContext; | ||
| import io.netty.handler.ssl.SslHandler; | ||
| import io.netty.resolver.NoopAddressResolverGroup; | ||
| import io.netty.util.AttributeKey; | ||
|
|
@@ -128,15 +130,16 @@ | |
| import org.apache.activemq.artemis.utils.PasswordMaskingUtil; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import java.lang.invoke.MethodHandles; | ||
|
|
||
| import static org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.NETTY_HTTP_HEADER_PREFIX; | ||
| import static org.apache.activemq.artemis.utils.Base64.encodeBytes; | ||
|
|
||
| public class NettyConnector extends AbstractConnector { | ||
|
|
||
| public static String NIO_CONNECTOR_TYPE = "NIO"; | ||
| public static String EPOLL_CONNECTOR_TYPE = "EPOLL"; | ||
| public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE"; | ||
| public static String IOURING_CONNECTOR_TYPE = "IO_URING"; | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
|
|
||
|
|
@@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector { | |
|
|
||
| private boolean useKQueue; | ||
|
|
||
| private boolean useIoUring; | ||
|
|
||
| private int remotingThreads; | ||
|
|
||
| private boolean useGlobalWorkerPool; | ||
|
|
@@ -404,6 +409,7 @@ public NettyConnector(final Map<String, Object> configuration, | |
|
|
||
| useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration); | ||
| useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration); | ||
| useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration); | ||
|
|
||
| useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); | ||
| host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); | ||
|
|
@@ -528,38 +534,54 @@ public synchronized void start() { | |
| return; | ||
| } | ||
|
|
||
| if (remotingThreads == -1) { | ||
| boolean defaultRemotingThreads = remotingThreads == -1; | ||
|
|
||
| if (defaultRemotingThreads) { | ||
| // Default to number of cores * 3 | ||
| remotingThreads = Runtime.getRuntime().availableProcessors() * 3; | ||
| } | ||
|
|
||
| String connectorType; | ||
|
|
||
| if (useEpoll && CheckDependencies.isEpollAvailable()) { | ||
| if (useIoUring && CheckDependencies.isIoUringAvailable()) { | ||
| //IO_URING should default to 1 remotingThread unless specified in config | ||
|
Contributor
It should use fewer threads than we use with epoll, but 1 could be quite low. Also check whether netty/netty#15524 can help scale up the number of event loops if required.
Contributor
IIRC, setting it to 1 was a recommendation I found back from when netty io_uring was still an incubator project. I did some testing on this when I submitted the previous PR and saw that using 1 thread performed best up to something on the order of a few thousand connections and messages per second. After that it had to be increased but as I recall it, I saw no measurable improvement using more than just a few dedicated threads. Again, this was some time ago but I would guess up to ~5 threads or so on a decently sized server, say 16 cores. Perhaps it should be set to something like |
||
| remotingThreads = defaultRemotingThreads ? 1 : remotingThreads; | ||
|
|
||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, IoUringIoHandler.newFactory()))); | ||
| } else { | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, IoUringIoHandler.newFactory()); | ||
| } | ||
|
|
||
| connectorType = IOURING_CONNECTOR_TYPE; | ||
| channelClazz = IoUringSocketChannel.class; | ||
|
|
||
| logger.debug("Connector {} using native io_uring", this); | ||
| } else if (useEpoll && CheckDependencies.isEpollAvailable()) { | ||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, EpollIoHandler.newFactory()))); | ||
| } else { | ||
| group = new EpollEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, EpollIoHandler.newFactory()); | ||
| } | ||
| connectorType = EPOLL_CONNECTOR_TYPE; | ||
| channelClazz = EpollSocketChannel.class; | ||
| logger.debug("Connector {} using native epoll", this); | ||
| } else if (useKQueue && CheckDependencies.isKQueueAvailable()) { | ||
| if (useGlobalWorkerPool) { | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new KQueueEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, KQueueIoHandler.newFactory()))); | ||
| } else { | ||
| group = new KQueueEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, KQueueIoHandler.newFactory()); | ||
| } | ||
| connectorType = KQUEUE_CONNECTOR_TYPE; | ||
| channelClazz = KQueueSocketChannel.class; | ||
| logger.debug("Connector {} using native kqueue", this); | ||
| } else { | ||
| if (useGlobalWorkerPool) { | ||
| channelClazz = NioSocketChannel.class; | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory))); | ||
| group = SharedEventLoopGroup.getInstance((threadFactory -> new MultiThreadIoEventLoopGroup(remotingThreads, threadFactory, NioIoHandler.newFactory()))); | ||
| } else { | ||
| channelClazz = NioSocketChannel.class; | ||
| group = new NioEventLoopGroup(remotingThreads); | ||
| group = new MultiThreadIoEventLoopGroup(remotingThreads, NioIoHandler.newFactory()); | ||
| } | ||
| connectorType = NIO_CONNECTOR_TYPE; | ||
| channelClazz = NioSocketChannel.class; | ||
|
|
||
Is this not already the case from the parent?
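For readers following the `start()` changes, the selection order and thread defaulting in the hunk above can be summarised in a dependency-free sketch. It mirrors the diff as written: io_uring is checked before epoll, kqueue and NIO, and io_uring defaults to 1 thread while the others default to cores * 3. The class `ConnectorSelectionSketch`, the `Transport` enum and both helpers are hypothetical. The real code builds a `MultiThreadIoEventLoopGroup` with the matching handler factory instead of returning an enum, and the review thread is still discussing whether 1 is too low.

```java
// Hypothetical sketch of the decision logic only; no Netty types involved.
final class ConnectorSelectionSketch {

   enum Transport { IO_URING, EPOLL, KQUEUE, NIO }

   private ConnectorSelectionSketch() {
   }

   // Same precedence as the if/else chain in the diff; NIO is the fallback.
   static Transport select(boolean useIoUring, boolean ioUringAvailable,
                           boolean useEpoll, boolean epollAvailable,
                           boolean useKQueue, boolean kqueueAvailable) {
      if (useIoUring && ioUringAvailable) {
         return Transport.IO_URING;
      } else if (useEpoll && epollAvailable) {
         return Transport.EPOLL;
      } else if (useKQueue && kqueueAvailable) {
         return Transport.KQUEUE;
      }
      return Transport.NIO;
   }

   // -1 means "not configured": 1 thread for io_uring, cores * 3 otherwise.
   static int resolveThreads(int configured, Transport transport, int cores) {
      if (configured != -1) {
         return configured;
      }
      return transport == Transport.IO_URING ? 1 : cores * 3;
   }

   public static void main(String[] args) {
      int cores = Runtime.getRuntime().availableProcessors();
      Transport transport = select(true, false, true, true, false, false);
      System.out.println(transport + " with " + resolveThreads(-1, transport, cores) + " threads");
   }
}
```

Keeping the default as a small pure function like this would also make it easy to swap in a different io_uring default once the thread settles on one.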