From c792765fd5622d8774e41eaade2824a2ddfb489d Mon Sep 17 00:00:00 2001 From: Alexej Timonin Date: Sun, 18 May 2025 21:47:19 +0200 Subject: [PATCH] ARTEMIS-5483: jmsMaxTextMessageSize Adds a new configuration option for Jakarta client consumers. When this configuration is set, the consumer will throw a javax.jms.JMSException on receiving large text(type 3) message that would exceed the specified size (in bytes) when creating javax.jms.TextMessage jmsMaxTextMessageSize is intended to prevent consumers of javax.jms.TextMessage from crashing due to out-of-memory when receiving messages that are larger than consumers memory. The configuration is provided as a URL property when establishing a connection to the broker. Example: Throw on large text messages exceeding 10MB in size: tcp://localhost:61616?jmsMaxTextMessageSize=10000000 Note: This option relies on how core client consumer works. When message is above certain size threshold (default 100KB) it will be considered as a large message and be delivered in parts with headers first. With the help of headers we're able to determine the size of the incoming body and reject if it is above jmsMaxTextMessageSize. If the message is below large message threshold then this option has no real defensive effect since the message will be read into memory anyway. --- .../artemis/api/jms/ActiveMQJMSConstants.java | 3 + .../jms/client/ActiveMQConnection.java | 18 +++- .../jms/client/ActiveMQMessageConsumer.java | 17 +++- .../jms/tests/MessageConsumerTest.java | 87 +++++++++++++++++++ .../jms/tests/util/ProxyAssertSupport.java | 19 ++++ 5 files changed, 141 insertions(+), 3 deletions(-) diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java index 966a5c4a4b9..69fa39a7eb6 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java @@ -34,4 +34,7 @@ public class ActiveMQJMSConstants { public static final int INDIVIDUAL_ACKNOWLEDGE = 101; public static final String JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME = "amq.jms.support-bytes-id"; + + public static final String JMS_MAX_TEXT_MESSAGE_SIZE = "jmsMaxTextMessageSize"; + } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index d9f8cc55aad..1757aba899d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -20,6 +20,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,6 +55,7 @@ import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; @@ -134,6 +136,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private final ConnectionFactoryOptions options; + private Integer jmsMaxTextMessageSize; public ActiveMQConnection(final ConnectionFactoryOptions options, final String username, @@ -170,6 +173,16 @@ public ActiveMQConnection(final ConnectionFactoryOptions options, this.enable1xPrefixes = enable1xPrefixes; creationStack = new Exception(); + + if (sessionFactory != null && sessionFactory.getConnectorConfiguration() != null) { + int maxSize = ConfigurationHelper.getIntProperty( + ActiveMQJMSConstants.JMS_MAX_TEXT_MESSAGE_SIZE, + 0, + sessionFactory.getConnectorConfiguration().getExtraParams()); + if (maxSize > 0) { + this.jmsMaxTextMessageSize = maxSize; + } + } } /** @@ -647,6 +660,10 @@ public void authorize(boolean validateClientId) throws JMSException { } } + public Optional getJmsMaxTextMessageSize() { + return Optional.ofNullable(jmsMaxTextMessageSize); + } + private void addSessionMetaData(ClientSession session) throws ActiveMQException { session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, ""); if (clientID != null) { @@ -680,7 +697,6 @@ public String getDeserializationAllowList() { return this.factoryReference.getDeserializationAllowList(); } - private static class JMSFailureListener implements SessionFailureListener { private final WeakReference connectionRef; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 4f6a3bd5d23..75927f715a8 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -39,6 +39,10 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage; +import java.util.Optional; + +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; + /** * ActiveMQ Artemis implementation of a JMS MessageConsumer. */ @@ -66,8 +70,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr private final SimpleString autoDeleteQueueName; - - protected ActiveMQMessageConsumer(final ConnectionFactoryOptions options, final ActiveMQConnection connection, final ActiveMQSession session, @@ -216,6 +218,17 @@ private ActiveMQMessage getMessage(final long timeout, final boolean noWait) thr ActiveMQMessage jmsMsg = null; if (coreMessage != null) { + + Optional jmsMaxTextMessageSize = connection.getJmsMaxTextMessageSize(); + if (jmsMaxTextMessageSize.isPresent()) { + if (coreMessage.getType() == TEXT_TYPE && coreMessage.getBodySize() > jmsMaxTextMessageSize.get()) { + String errorMsg = "The text message exceeds maximum set size of %d bytes.".formatted(jmsMaxTextMessageSize.get()); + ActiveMQException amqe = new ActiveMQException(errorMsg); + ActiveMQClientLogger.LOGGER.unableToGetMessage(amqe); + throw amqe; + } + } + ClientSession coreSession = session.getCoreSession(); boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE || diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java index 4011ba1c2a8..f3b6e8dd30c 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.Wait; @@ -3670,6 +3671,92 @@ public void testExceptionMessageListenerStopSession() throws Exception { } } + @Test + public void testReceiveThrowsTextMessageAboveSetMaxSize() throws Exception { + Connection producerConnection = null; + + Connection consumerConnection = null; + + try { + String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"; + + producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL)); + + var cf = new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=20"); + cf.setMinLargeMessageSize(10); + consumerConnection = createConnection( + cf + ); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer queueProducer = producerSession.createProducer(queue1); + + MessageConsumer queueConsumer = consumerSession.createConsumer(queue1); + + consumerConnection.start(); + + TextMessage tm = producerSession.createTextMessage("Hello, world!"); + + queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + queueProducer.send(tm); + + JMSException exception = ProxyAssertSupport.assertThrows(JMSException.class, queueConsumer::receive); + ProxyAssertSupport.assertEquals("The text message exceeds maximum set size of 20 bytes.", exception.getMessage()); + } finally { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + } + } + + @Test + public void testReceiveDoesNotThrowWhenMessageIsBelowSetMaximuSize() throws Exception { + Connection producerConnection = null; + + Connection consumerConnection = null; + + try { + String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"; + + producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL)); + + consumerConnection = createConnection( + new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=100") + ); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer queueProducer = producerSession.createProducer(queue1); + + MessageConsumer queueConsumer = consumerSession.createConsumer(queue1); + + consumerConnection.start(); + + TextMessage tm = producerSession.createTextMessage("Hello, world!"); + + queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + queueProducer.send(tm); + + ProxyAssertSupport.assertDoesNotThrow(queueConsumer::receive); + } finally { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + } + } private class ConnectionCloseMessageListener implements MessageListener { diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java index cf1bdb6d351..4af0b39ebc7 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jms.tests.util; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -341,4 +342,22 @@ public static void assertNotSame(final java.lang.Object object, final java.lang. throw e; } } + + public static T assertThrows(Class expectedType, Executable executable) { + try { + return Assertions.assertThrows(expectedType, executable); + } catch (AssertionError e) { + logger.warn("AssertionFailure::", e); + throw e; + } + } + + public static void assertDoesNotThrow(Executable executable) { + try { + Assertions.assertDoesNotThrow(executable); + } catch (AssertionError e) { + logger.warn("AssertionFailure::", e); + throw e; + } + } }