Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -2015,6 +2016,11 @@ private int iterQueue(final int flushLimit,
QueueIterateAction messageAction) throws Exception {
int count = 0;
int txCount = 0;

if (filter1 != null) {
messageAction.addFilter(filter1);
}

// This is to avoid scheduling depaging while iterQueue is happening
// this should minimize the use of the paged executor.
depagePending = true;
Expand All @@ -2033,10 +2039,12 @@ private int iterQueue(final int flushLimit,
while (iter.hasNext() && !messageAction.expectedHitsReached(count)) {
MessageReference ref = iter.next();

if (filter1 == null || filter1.match(ref.getMessage())) {
if (messageAction.match(ref)) {
if (messageAction.actMessage(tx, ref)) {
iter.remove();
refRemoved(ref);
if (!isLastValue()) {
refRemoved(ref);
}
}
txCount++;
count++;
Expand All @@ -2055,7 +2063,7 @@ private int iterQueue(final int flushLimit,
return count;
}

List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1 == null ? true : filter1.match(ref.getMessage()));
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(messageAction::match);
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
Expand All @@ -2078,12 +2086,12 @@ private int iterQueue(final int flushLimit,
PagedReference reference = pageIterator.next();
pageIterator.remove();

if (filter1 == null || filter1.match(reference.getMessage())) {
count++;
txCount++;
if (messageAction.match(reference)) {
if (!messageAction.actMessage(tx, reference)) {
addTail(reference, false);
}
txCount++;
count++;
} else {
addTail(reference, false);
}
Expand Down Expand Up @@ -2401,71 +2409,48 @@ public void run() {
}

@Override
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
iter.remove();
refRemoved(ref);
return true;
}
}
if (pageIterator != null && !queueDestroyed) {
while (pageIterator.hasNext()) {
PagedReference ref = pageIterator.next();
if (ref.getMessage().getMessageID() == messageID) {
incDelivering(ref);
sendToDeadLetterAddress(null, ref);
pageIterator.remove();
refRemoved(ref);
return true;
}
}
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception {

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering(ref);
sendToDeadLetterAddress(tx, ref);
return true;
}
return false;
}
}) == 1;
}

@Override
public synchronized int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {

public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {

incDelivering(ref);
return sendToDeadLetterAddress(tx, ref);
sendToDeadLetterAddress(tx, ref);
return true;
}
});
}

@Override
public synchronized boolean moveReference(final long messageID,
public boolean moveReference(final long messageID,
final SimpleString toAddress,
final Binding binding,
final boolean rejectDuplicate) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
iter.remove();
refRemoved(ref);
incDelivering(ref);
try {
move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
} catch (Exception e) {
decDelivering(ref);
throw e;
}
return true;
}

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
incDelivering(ref);
move(tx, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null, null, true);
return true;
}
return false;
}

}) == 1;
}

@Override
Expand Down Expand Up @@ -2511,7 +2496,7 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
}

if (!ignored) {
move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
move(tx, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null, null, true);
}

return true;
Expand All @@ -2529,26 +2514,22 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
}

@Override
public synchronized boolean copyReference(final long messageID,
public boolean copyReference(final long messageID,
final SimpleString toQueue,
final Binding binding) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
try {
copy(null, toQueue, binding, ref);
} catch (Exception e) {
throw e;
}
return true;
}

return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
copy(tx, toQueue, binding, ref);
addTail(ref, false);
return true;
}
return false;
}
}) == 1;
}

public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
Expand Down Expand Up @@ -2617,40 +2598,35 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
}

@Override
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) {

while (iter.hasNext()) {
MessageReference ref = iter.next();
if (ref.getMessage().getMessageID() == messageID) {
iter.remove();
@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
ref.getMessage().setPriority(newPriority);
if (isLastValue()) {
refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
return true;
}
addTail(ref, false);
return true;
}

return false;
}
}) == 1;
}

@Override
public synchronized int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
try (LinkedListIterator<MessageReference> iter = iterator()) {
int count = 0;
while (iter.hasNext()) {
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage())) {
count++;
iter.remove();
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception {
return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {

@Override
public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
ref.getMessage().setPriority(newPriority);
if (isLastValue()) {
refRemoved(ref);
ref.getMessage().setPriority(newPriority);
addTail(ref, false);
}
addTail(ref, false);
return true;
}
return count;
}
});
}

@Override
Expand Down Expand Up @@ -4186,13 +4162,23 @@ public void run() {
abstract class QueueIterateAction {

protected Integer expectedHits;
protected Long messageID;
protected Filter filter1 = null;
protected Predicate<MessageReference> match;

QueueIterateAction(Integer expectedHits) {
this.expectedHits = expectedHits;
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
}

QueueIterateAction(Long messageID) {
this.expectedHits = 1;
this.match = ref -> ref.getMessage().getMessageID() == messageID;
}

QueueIterateAction() {
this.expectedHits = null;
this.match = ref -> filter1 == null ? true : filter1.match(ref.getMessage());
}

/**
Expand All @@ -4207,6 +4193,15 @@ abstract class QueueIterateAction {
public boolean expectedHitsReached(int currentHits) {
return expectedHits != null && currentHits >= expectedHits.intValue();
}

public void addFilter(Filter filter1) {
this.filter1 = filter1;
}

public boolean match(MessageReference ref) {
return match.test(ref);
}

}

// For external use we need to use a synchronized version since the list is not thread safe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.integration.management;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -230,15 +229,15 @@ public void testCopyMessageWhilstPaging() throws Exception {

long messageID = (Long) messages[99].get("messageID");

assertFalse(queueControl.copyMessage(messageID, otherQueue.toString()));
assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));

messageID = (Long) messages[0].get("messageID");

assertTrue(queueControl.copyMessage(messageID, otherQueue.toString()));

Map<String, Object>[] copiedMessages = otherQueueControl.listMessages(null);

assertEquals(1, copiedMessages.length);
assertEquals(2, copiedMessages.length);
}

@Test
Expand Down Expand Up @@ -281,8 +280,8 @@ public void testCopyMessageWhilstPagingSameAddress() throws Exception {

messageID = (Long) otherMessages[100].get("messageID");

//this should fail as the message was paged successfully
assertFalse(otherQueueControl.copyMessage(messageID, queue.toString()));
//this verifies copying of a paged message
assertTrue(otherQueueControl.copyMessage(messageID, queue.toString()));
}

@Test
Expand Down
Loading