diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2f077a0ac08..fada754d009 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -2015,6 +2016,11 @@ private int iterQueue(final int flushLimit, QueueIterateAction messageAction) throws Exception { int count = 0; int txCount = 0; + + if (filter1 != null) { + messageAction.addFilter(filter1); + } + // This is to avoid scheduling depaging while iterQueue is happening // this should minimize the use of the paged executor. depagePending = true; @@ -2033,10 +2039,12 @@ private int iterQueue(final int flushLimit, while (iter.hasNext() && !messageAction.expectedHitsReached(count)) { MessageReference ref = iter.next(); - if (filter1 == null || filter1.match(ref.getMessage())) { + if (messageAction.match(ref)) { if (messageAction.actMessage(tx, ref)) { iter.remove(); - refRemoved(ref); + if (!isLastValue()) { + refRemoved(ref); + } } txCount++; count++; @@ -2055,7 +2063,7 @@ private int iterQueue(final int flushLimit, return count; } - List cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage())); + List cancelled = scheduledDeliveryHandler.cancel(messageAction::match); for (MessageReference messageReference : cancelled) { messageAction.actMessage(tx, messageReference); count++; @@ -2078,12 +2086,12 @@ private int iterQueue(final int flushLimit, PagedReference reference = pageIterator.next(); pageIterator.remove(); - if (filter1 == null || filter1.match(reference.getMessage())) { - count++; - txCount++; + if (messageAction.match(reference)) { if (!messageAction.actMessage(tx, reference)) { addTail(reference, false); } + txCount++; + count++; } else { addTail(reference, false); } @@ -2401,71 +2409,48 @@ public void run() { } @Override - public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { - try (LinkedListIterator iter = iterator()) { - while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (ref.getMessage().getMessageID() == messageID) { - incDelivering(ref); - sendToDeadLetterAddress(null, ref); - iter.remove(); - refRemoved(ref); - return true; - } - } - if (pageIterator != null && !queueDestroyed) { - while (pageIterator.hasNext()) { - PagedReference ref = pageIterator.next(); - if (ref.getMessage().getMessageID() == messageID) { - incDelivering(ref); - sendToDeadLetterAddress(null, ref); - pageIterator.remove(); - refRemoved(ref); - return true; - } - } + public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { + + return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + + @Override + public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { + incDelivering(ref); + sendToDeadLetterAddress(tx, ref); + return true; } - return false; - } + }) == 1; } @Override - public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception { - + public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception { return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { - incDelivering(ref); - return sendToDeadLetterAddress(tx, ref); + sendToDeadLetterAddress(tx, ref); + return true; } }); } @Override - public synchronized boolean moveReference(final long messageID, + public boolean moveReference(final long messageID, final SimpleString toAddress, final Binding binding, final boolean rejectDuplicate) throws Exception { - try (LinkedListIterator iter = iterator()) { - while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (ref.getMessage().getMessageID() == messageID) { - iter.remove(); - refRemoved(ref); - incDelivering(ref); - try { - move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true); - } catch (Exception e) { - decDelivering(ref); - throw e; - } - return true; - } + + return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + + @Override + public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { + incDelivering(ref); + move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true); + return true; } - return false; - } + + }) == 1; } @Override @@ -2511,7 +2496,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception } if (!ignored) { - move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true); + move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true); } return true; @@ -2529,26 +2514,22 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception } @Override - public synchronized boolean copyReference(final long messageID, + public boolean copyReference(final long messageID, final SimpleString toQueue, final Binding binding) throws Exception { - try (LinkedListIterator iter = iterator()) { - while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (ref.getMessage().getMessageID() == messageID) { - try { - copy(null, toQueue, binding, ref); - } catch (Exception e) { - throw e; - } - return true; - } + + return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + + @Override + public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { + copy(tx, toQueue, binding, ref); + addTail(ref, false); + return true; } - return false; - } + }) == 1; } - public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception { + public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception { return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2617,40 +2598,35 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception } @Override - public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { - try (LinkedListIterator iter = iterator()) { + public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { + return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { - while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (ref.getMessage().getMessageID() == messageID) { - iter.remove(); + @Override + public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { + ref.getMessage().setPriority(newPriority); + if (isLastValue()) { refRemoved(ref); - ref.getMessage().setPriority(newPriority); - addTail(ref, false); - return true; } + addTail(ref, false); + return true; } - - return false; - } + }) == 1; } @Override - public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception { - try (LinkedListIterator iter = iterator()) { - int count = 0; - while (iter.hasNext()) { - MessageReference ref = iter.next(); - if (filter == null || filter.match(ref.getMessage())) { - count++; - iter.remove(); + public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception { + return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { + + @Override + public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { + ref.getMessage().setPriority(newPriority); + if (isLastValue()) { refRemoved(ref); - ref.getMessage().setPriority(newPriority); - addTail(ref, false); } + addTail(ref, false); + return true; } - return count; - } + }); } @Override @@ -4186,13 +4162,23 @@ public void run() { abstract class QueueIterateAction { protected Integer expectedHits; + protected Long messageID; + protected Filter filter1 = null; + protected Predicate match; QueueIterateAction(Integer expectedHits) { this.expectedHits = expectedHits; + this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage()); + } + + QueueIterateAction(Long messageID) { + this.expectedHits = 1; + this.match = ref -> ref.getMessage().getMessageID() == messageID; } QueueIterateAction() { this.expectedHits = null; + this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage()); } /** @@ -4207,6 +4193,15 @@ abstract class QueueIterateAction { public boolean expectedHitsReached(int currentHits) { return expectedHits != null && currentHits >= expectedHits.intValue(); } + + public void addFilter(Filter filter1) { + this.filter1 = filter1; + } + + public boolean match(MessageReference ref) { + return match.test(ref); + } + } // For external use we need to use a synchronized version since the list is not thread safe diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java index cb564b9e473..4593b2892dc 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithPagingServerTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.tests.integration.management; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -230,7 +229,7 @@ public void testCopyMessageWhilstPaging() throws Exception { long messageID = (Long) messages[99].get("messageID"); - assertFalse(queueControl.copyMessage(messageID, otherQueue.toString())); + assertTrue(queueControl.copyMessage(messageID, otherQueue.toString())); messageID = (Long) messages[0].get("messageID"); @@ -238,7 +237,7 @@ public void testCopyMessageWhilstPaging() throws Exception { Map[] copiedMessages = otherQueueControl.listMessages(null); - assertEquals(1, copiedMessages.length); + assertEquals(2, copiedMessages.length); } @Test @@ -281,8 +280,8 @@ public void testCopyMessageWhilstPagingSameAddress() throws Exception { messageID = (Long) otherMessages[100].get("messageID"); - //this should fail as the message was paged successfully - assertFalse(otherQueueControl.copyMessage(messageID, queue.toString())); + //this verifies copying of a paged message + assertTrue(otherQueueControl.copyMessage(messageID, queue.toString())); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 012b169bee3..f0fa7680b71 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -4947,7 +4947,157 @@ public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception { clientConsumer.close(); } + @TestTemplate + public void testChangeMessagesPriorityIncludesPagedMessage() throws Exception { + final SimpleString queueName = SimpleString.of("queue"); + final String sampleText = "Message Content"; + final int messageCount = 10; + + AddressSettings addressSettings = new AddressSettings().setMaxSizeBytes(200L); + server.getAddressSettingsRepository().addMatch(queueName.toString(), addressSettings); + session.createQueue(QueueConfiguration.of(queueName).setDurable(durable)); + + // Send message to queue, make sure it enters paging. + ClientProducer producer = session.createProducer(queueName); + for (int i = 0; i < messageCount; i++) { + producer.send(createTextMessage(session, sampleText).setPriority((byte) 2)); + } + + Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging); + + //Send identifiable message to paging queue + producer.send(createTextMessage(session, sampleText).setPriority((byte) 2)); + + QueueControl queueControl = createManagementControl(queueName, queueName); + + for (Map messageMap : queueControl.listMessages(null)) { + assertEquals(2, getPriorityStraight(messageMap.get("priority"))); + } + + queueControl.changeMessagesPriority(null, 5); + + //Make sure priority is changed on all messages + for (Map messageMap : queueControl.listMessages(null)) { + assertEquals(5, getPriorityStraight(messageMap.get("priority"))); + } + + queueControl.removeAllMessages(); + + } + + @TestTemplate + public void testChangeMessagePriorityIncludesPagedMessage() throws Exception { + final SimpleString queueName = SimpleString.of("queue"); + final String sampleText = "Message Content"; + final int messageCount = 10; + + AddressSettings addressSettings = new AddressSettings().setMaxSizeBytes(200L); + server.getAddressSettingsRepository().addMatch(queueName.toString(), addressSettings); + session.createQueue(QueueConfiguration.of(queueName).setDurable(durable)); + + // Send message to queue, make sure it enters paging. + ClientProducer producer = session.createProducer(queueName); + for (int i = 0; i < messageCount; i++) { + producer.send(createTextMessage(session, sampleText)); + } + + Wait.assertTrue(server.locateQueue(queueName).getPagingStore()::isPaging); + + //Send identifiable message to paging queue + producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique").setPriority((byte) 2)); + + QueueControl queueControl = createManagementControl(queueName, queueName); + Map[] messages = queueControl.listMessages(null); + long messageID = (Long) messages[messageCount].get("messageID"); + + assertEquals(2, getPriorityStraight(messages[messageCount].get("priority"))); + + queueControl.changeMessagePriority(messageID, 5); + + //Make sure priority is changed on the message + for (Map messageMap : queueControl.listMessages(null)) { + if (messageMap.get("messageID").equals(messageID)) { + assertEquals(5, getPriorityStraight(messageMap.get("priority"))); + } + } + + queueControl.removeAllMessages(); + + } + @TestTemplate + public void testMoveMessageIncludesPagedMessage() throws Exception { + final String queueNameMatch = "queue.#"; + final SimpleString queueName1 = SimpleString.of("queue.1"); + final SimpleString queueName2 = SimpleString.of("queue.2"); + final String sampleText = "Message Content"; + final int messageCount = 10; + + AddressSettings addressSettings = new AddressSettings().setMaxSizeBytes(200L); + server.getAddressSettingsRepository().addMatch(queueNameMatch, addressSettings); + session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable)); + session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable)); + + // Send message to queue, make sure it enters paging. + ClientProducer producer = session.createProducer(queueName1); + for (int i = 0; i < messageCount; i++) { + producer.send(createTextMessage(session, sampleText)); + } + + Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging); + + //Send identifiable message to paging queue + producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique").setPriority((byte) 2)); + + QueueControl queueControl = createManagementControl(queueName1, queueName1); + Map[] messages = queueControl.listMessages(null); + long messageID = (Long) messages[messageCount].get("messageID"); + + assertTrue(server.locateQueue(queueName1).getMessageCount() == messageCount + 1); + + queueControl.moveMessage(messageID, queueName2.toString()); + + assertTrue(server.locateQueue(queueName1).getMessageCount() == messageCount); + assertTrue(server.locateQueue(queueName2).getMessageCount() == 1); + + } + + @TestTemplate + public void testCopyMessageIncludesPagedMessage() throws Exception { + final String queueNameMatch = "queue.#"; + final SimpleString queueName1 = SimpleString.of("queue.1"); + final SimpleString queueName2 = SimpleString.of("queue.2"); + final String sampleText = "Message Content"; + final int messageCount = 10; + + AddressSettings addressSettings = new AddressSettings().setMaxSizeBytes(200L); + server.getAddressSettingsRepository().addMatch(queueNameMatch, addressSettings); + session.createQueue(QueueConfiguration.of(queueName1).setDurable(durable)); + session.createQueue(QueueConfiguration.of(queueName2).setDurable(durable)); + + // Send message to queue, make sure it enters paging. + ClientProducer producer = session.createProducer(queueName1); + for (int i = 0; i < messageCount; i++) { + producer.send(createTextMessage(session, sampleText)); + } + + Wait.assertTrue(server.locateQueue(queueName1).getPagingStore()::isPaging); + + //Send identifiable message to paging queue + producer.send(createTextMessage(session, sampleText).putStringProperty("myID", "unique").setPriority((byte) 2)); + + QueueControl queueControl = createManagementControl(queueName1, queueName1); + Map[] messages = queueControl.listMessages(null); + long messageID = (Long) messages[messageCount].get("messageID"); + + assertTrue(server.locateQueue(queueName1).getMessageCount() == messageCount + 1); + + queueControl.copyMessage(messageID, queueName2.toString()); + + assertTrue(server.locateQueue(queueName1).getMessageCount() == messageCount + 1); + assertTrue(server.locateQueue(queueName2).getMessageCount() == 1); + + } @Override @BeforeEach @@ -4964,6 +5114,17 @@ public void setUp() throws Exception { session.start(); } + protected int getPriorityStraight(Object priority) { + if (priority instanceof Byte) { + return ((Byte) priority).intValue(); + + } else if (priority instanceof Long) { + return ((Long) priority).intValue(); + } + + return (Integer) priority; + } + protected long getFirstMessageId(final QueueControl queueControl) throws Exception { JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON()); JsonObject object = (JsonObject) array.get(0); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index 5cea1efd54e..8417ac6cbae 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -150,7 +150,7 @@ public boolean changeMessagePriority(final long messageID, final int newPriority @Override public int changeMessagesPriority(final String filter, final int newPriority) throws Exception { - return (Integer) proxy.invokeOperation("changeMessagesPriority", filter, newPriority); + return (Integer) proxy.invokeOperation(Integer.class, "changeMessagesPriority", filter, newPriority); } @Override