diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 3ca465da70d..040605e411c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -390,7 +392,9 @@ public boolean sendMessageBack(final MessageExt msg) { MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + if (!trySendToTopicBroker(newMsg, msg)) { + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + } return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); @@ -399,6 +403,38 @@ public boolean sendMessageBack(final MessageExt msg) { return false; } + private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { + java.util.concurrent.ConcurrentMap> table = + this.defaultMQPushConsumerImpl.getRebalanceImpl().getTopicSubscribeInfoTable(); + if (table == null) { + return false; + } + Set mqs = table.get(originalMsg.getTopic()); + if (mqs == null || mqs.isEmpty()) { + return false; + } + + Set brokerNames = new LinkedHashSet<>(); + String originalBroker = originalMsg.getBrokerName(); + if (originalBroker != null) { + brokerNames.add(originalBroker); + } + for (MessageQueue mq : mqs) { + brokerNames.add(mq.getBrokerName()); + } + + for (String brokerName : brokerNames) { + try { + MessageQueue retryMQ = new MessageQueue(retryMsg.getTopic(), brokerName, 0); + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(retryMsg, retryMQ); + return true; + } catch (Exception e) { + log.warn("Failed to send retry message to broker {}, trying next", brokerName, e); + } + } + return false; + } + public void resetNamespace(final List msgs) { for (MessageExt msg : msgs) { if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java index 4eab1ccf664..88cca72ac51 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java @@ -18,7 +18,9 @@ import io.netty.util.internal.ConcurrentSet; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -304,7 +306,9 @@ public boolean sendMessageBack(final MessageExt msg) { MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + if (!trySendToTopicBroker(newMsg, msg)) { + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + } return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); @@ -313,6 +317,38 @@ public boolean sendMessageBack(final MessageExt msg) { return false; } + private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { + java.util.concurrent.ConcurrentMap> table = + this.defaultMQPushConsumerImpl.getRebalanceImpl().getTopicSubscribeInfoTable(); + if (table == null) { + return false; + } + Set mqs = table.get(originalMsg.getTopic()); + if (mqs == null || mqs.isEmpty()) { + return false; + } + + Set brokerNames = new LinkedHashSet<>(); + String originalBroker = originalMsg.getBrokerName(); + if (originalBroker != null) { + brokerNames.add(originalBroker); + } + for (MessageQueue mq : mqs) { + brokerNames.add(mq.getBrokerName()); + } + + for (String brokerName : brokerNames) { + try { + MessageQueue retryMQ = new MessageQueue(retryMsg.getTopic(), brokerName, 0); + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(retryMsg, retryMQ); + return true; + } catch (Exception e) { + log.warn("Failed to send retry message to broker {}, trying next", brokerName, e); + } + } + return false; + } + public void resetNamespace(final List msgs) { for (MessageExt msg : msgs) { if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {