From 285ebfae48fce2b8807e84a065d4f3c146eb6f7a Mon Sep 17 00:00:00 2001 From: qianye Date: Fri, 29 May 2026 16:37:47 +0800 Subject: [PATCH 1/2] [ISSUE #10405] Fix orderly consumer retry message routed to wrong broker via TBW102 fallback Co-Authored-By: Claude Opus 4.6 --- .../ConsumeMessageOrderlyService.java | 34 ++++++++++++++++++- .../ConsumeMessagePopOrderlyService.java | 34 ++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 3ca465da70d..eb411d42269 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -390,7 +392,9 @@ public boolean sendMessageBack(final MessageExt msg) { MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + if (!trySendToTopicBroker(newMsg, msg)) { + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + } return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); @@ -399,6 +403,34 @@ public boolean sendMessageBack(final MessageExt msg) { return false; } + private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { + Set mqs = this.defaultMQPushConsumerImpl.getRebalanceImpl() + .getTopicSubscribeInfoTable().get(originalMsg.getTopic()); + if (mqs == null || mqs.isEmpty()) { + return false; + } + + Set brokerNames = new LinkedHashSet<>(); + String originalBroker = originalMsg.getBrokerName(); + if (originalBroker != null) { + brokerNames.add(originalBroker); + } + for (MessageQueue mq : mqs) { + brokerNames.add(mq.getBrokerName()); + } + + for (String brokerName : brokerNames) { + try { + MessageQueue retryMQ = new MessageQueue(retryMsg.getTopic(), brokerName, 0); + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(retryMsg, retryMQ); + return true; + } catch (Exception e) { + log.warn("Failed to send retry message to broker {}, trying next", brokerName, e); + } + } + return false; + } + public void resetNamespace(final List msgs) { for (MessageExt msg : msgs) { if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java index 4eab1ccf664..5df7fbd9e02 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java @@ -18,7 +18,9 @@ import io.netty.util.internal.ConcurrentSet; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -304,7 +306,9 @@ public boolean sendMessageBack(final MessageExt msg) { MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); - this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + if (!trySendToTopicBroker(newMsg, msg)) { + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(newMsg); + } return true; } catch (Exception e) { log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e); @@ -313,6 +317,34 @@ public boolean sendMessageBack(final MessageExt msg) { return false; } + private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { + Set mqs = this.defaultMQPushConsumerImpl.getRebalanceImpl() + .getTopicSubscribeInfoTable().get(originalMsg.getTopic()); + if (mqs == null || mqs.isEmpty()) { + return false; + } + + Set brokerNames = new LinkedHashSet<>(); + String originalBroker = originalMsg.getBrokerName(); + if (originalBroker != null) { + brokerNames.add(originalBroker); + } + for (MessageQueue mq : mqs) { + brokerNames.add(mq.getBrokerName()); + } + + for (String brokerName : brokerNames) { + try { + MessageQueue retryMQ = new MessageQueue(retryMsg.getTopic(), brokerName, 0); + this.defaultMQPushConsumerImpl.getmQClientFactory().getDefaultMQProducer().send(retryMsg, retryMQ); + return true; + } catch (Exception e) { + log.warn("Failed to send retry message to broker {}, trying next", brokerName, e); + } + } + return false; + } + public void resetNamespace(final List msgs) { for (MessageExt msg : msgs) { if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { From 3692d1b1d9fbafe7bbfe32605c1103f238504269 Mon Sep 17 00:00:00 2001 From: qianye Date: Fri, 29 May 2026 16:51:15 +0800 Subject: [PATCH 2/2] fix: add null check for topicSubscribeInfoTable in trySendToTopicBroker Co-Authored-By: Claude Opus 4.6 --- .../impl/consumer/ConsumeMessageOrderlyService.java | 8 ++++++-- .../impl/consumer/ConsumeMessagePopOrderlyService.java | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index eb411d42269..040605e411c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -404,8 +404,12 @@ public boolean sendMessageBack(final MessageExt msg) { } private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { - Set mqs = this.defaultMQPushConsumerImpl.getRebalanceImpl() - .getTopicSubscribeInfoTable().get(originalMsg.getTopic()); + java.util.concurrent.ConcurrentMap> table = + this.defaultMQPushConsumerImpl.getRebalanceImpl().getTopicSubscribeInfoTable(); + if (table == null) { + return false; + } + Set mqs = table.get(originalMsg.getTopic()); if (mqs == null || mqs.isEmpty()) { return false; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java index 5df7fbd9e02..88cca72ac51 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopOrderlyService.java @@ -318,8 +318,12 @@ public boolean sendMessageBack(final MessageExt msg) { } private boolean trySendToTopicBroker(Message retryMsg, MessageExt originalMsg) { - Set mqs = this.defaultMQPushConsumerImpl.getRebalanceImpl() - .getTopicSubscribeInfoTable().get(originalMsg.getTopic()); + java.util.concurrent.ConcurrentMap> table = + this.defaultMQPushConsumerImpl.getRebalanceImpl().getTopicSubscribeInfoTable(); + if (table == null) { + return false; + } + Set mqs = table.get(originalMsg.getTopic()); if (mqs == null || mqs.isEmpty()) { return false; }