diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java index 966a5c4a4b9..69fa39a7eb6 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/ActiveMQJMSConstants.java @@ -34,4 +34,7 @@ public class ActiveMQJMSConstants { public static final int INDIVIDUAL_ACKNOWLEDGE = 101; public static final String JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME = "amq.jms.support-bytes-id"; + + public static final String JMS_MAX_TEXT_MESSAGE_SIZE = "jmsMaxTextMessageSize"; + } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java index d9f8cc55aad..1757aba899d 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java @@ -20,6 +20,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -54,6 +55,7 @@ import org.apache.activemq.artemis.core.version.Version; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; @@ -134,6 +136,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme private final ConnectionFactoryOptions options; + private Integer jmsMaxTextMessageSize; public ActiveMQConnection(final ConnectionFactoryOptions options, final String username, @@ -170,6 +173,16 @@ public ActiveMQConnection(final ConnectionFactoryOptions options, this.enable1xPrefixes = enable1xPrefixes; creationStack = new Exception(); + + if (sessionFactory != null && sessionFactory.getConnectorConfiguration() != null) { + int maxSize = ConfigurationHelper.getIntProperty( + ActiveMQJMSConstants.JMS_MAX_TEXT_MESSAGE_SIZE, + 0, + sessionFactory.getConnectorConfiguration().getExtraParams()); + if (maxSize > 0) { + this.jmsMaxTextMessageSize = maxSize; + } + } } /** @@ -647,6 +660,10 @@ public void authorize(boolean validateClientId) throws JMSException { } } + public Optional getJmsMaxTextMessageSize() { + return Optional.ofNullable(jmsMaxTextMessageSize); + } + private void addSessionMetaData(ClientSession session) throws ActiveMQException { session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, ""); if (clientID != null) { @@ -680,7 +697,6 @@ public String getDeserializationAllowList() { return this.factoryReference.getDeserializationAllowList(); } - private static class JMSFailureListener implements SessionFailureListener { private final WeakReference connectionRef; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java index 4f6a3bd5d23..75927f715a8 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java @@ -39,6 +39,10 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage; +import java.util.Optional; + +import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE; + /** * ActiveMQ Artemis implementation of a JMS MessageConsumer. */ @@ -66,8 +70,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr private final SimpleString autoDeleteQueueName; - - protected ActiveMQMessageConsumer(final ConnectionFactoryOptions options, final ActiveMQConnection connection, final ActiveMQSession session, @@ -216,6 +218,17 @@ private ActiveMQMessage getMessage(final long timeout, final boolean noWait) thr ActiveMQMessage jmsMsg = null; if (coreMessage != null) { + + Optional jmsMaxTextMessageSize = connection.getJmsMaxTextMessageSize(); + if (jmsMaxTextMessageSize.isPresent()) { + if (coreMessage.getType() == TEXT_TYPE && coreMessage.getBodySize() > jmsMaxTextMessageSize.get()) { + String errorMsg = "The text message exceeds maximum set size of %d bytes.".formatted(jmsMaxTextMessageSize.get()); + ActiveMQException amqe = new ActiveMQException(errorMsg); + ActiveMQClientLogger.LOGGER.unableToGetMessage(amqe); + throw amqe; + } + } + ClientSession coreSession = session.getCoreSession(); boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE || diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java index 4011ba1c2a8..f3b6e8dd30c 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageConsumerTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.Wait; @@ -3670,6 +3671,92 @@ public void testExceptionMessageListenerStopSession() throws Exception { } } + @Test + public void testReceiveThrowsTextMessageAboveSetMaxSize() throws Exception { + Connection producerConnection = null; + + Connection consumerConnection = null; + + try { + String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"; + + producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL)); + + var cf = new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=20"); + cf.setMinLargeMessageSize(10); + consumerConnection = createConnection( + cf + ); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer queueProducer = producerSession.createProducer(queue1); + + MessageConsumer queueConsumer = consumerSession.createConsumer(queue1); + + consumerConnection.start(); + + TextMessage tm = producerSession.createTextMessage("Hello, world!"); + + queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + queueProducer.send(tm); + + JMSException exception = ProxyAssertSupport.assertThrows(JMSException.class, queueConsumer::receive); + ProxyAssertSupport.assertEquals("The text message exceeds maximum set size of 20 bytes.", exception.getMessage()); + } finally { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + } + } + + @Test + public void testReceiveDoesNotThrowWhenMessageIsBelowSetMaximuSize() throws Exception { + Connection producerConnection = null; + + Connection consumerConnection = null; + + try { + String brokerURL = "tcp://127.0.0.1:61616?blockOnAcknowledge=true&blockOnDurableSend=true&blockOnNonDurableSend=true"; + + producerConnection = createConnection(new ActiveMQConnectionFactory(brokerURL)); + + consumerConnection = createConnection( + new ActiveMQConnectionFactory(brokerURL + "&jmsMaxTextMessageSize=100") + ); + + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer queueProducer = producerSession.createProducer(queue1); + + MessageConsumer queueConsumer = consumerSession.createConsumer(queue1); + + consumerConnection.start(); + + TextMessage tm = producerSession.createTextMessage("Hello, world!"); + + queueProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + queueProducer.send(tm); + + ProxyAssertSupport.assertDoesNotThrow(queueConsumer::receive); + } finally { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + } + } private class ConnectionCloseMessageListener implements MessageListener { diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java index cf1bdb6d351..4af0b39ebc7 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/util/ProxyAssertSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.jms.tests.util; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; @@ -341,4 +342,22 @@ public static void assertNotSame(final java.lang.Object object, final java.lang. throw e; } } + + public static T assertThrows(Class expectedType, Executable executable) { + try { + return Assertions.assertThrows(expectedType, executable); + } catch (AssertionError e) { + logger.warn("AssertionFailure::", e); + throw e; + } + } + + public static void assertDoesNotThrow(Executable executable) { + try { + Assertions.assertDoesNotThrow(executable); + } catch (AssertionError e) { + logger.warn("AssertionFailure::", e); + throw e; + } + } }