From 18ccef3037064d7374c5dc6307e903540a1524be Mon Sep 17 00:00:00 2001 From: somiljain2006 Date: Wed, 27 May 2026 17:08:13 +0530 Subject: [PATCH] Support separate retry queues for priority topics --- .../broker/processor/PopReviveService.java | 26 +++- .../processor/PopReviveServiceTest.java | 146 +++++++++++++++++- .../apache/rocketmq/common/BrokerConfig.java | 10 ++ .../rocketmq/common/BrokerConfigTest.java | 9 ++ 4 files changed, 183 insertions(+), 8 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 07f16e98965..616a35b7389 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -134,7 +134,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); - msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt)); + msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), popCheckPoint.getCId(), messageExt)); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { @@ -166,12 +166,13 @@ private void initPopRetryOffset(String retryTopic, String consumerGroup, int ret public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) { if (brokerController != null) { TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic); - if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + boolean useSeparate = checkUseSeparateRetryQueue(retryTopic, consumerGroup); + if (topicConfig != null && !useSeparate) { return; } int retryQueueNum = PopAckConstants.retryQueueNum; - if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + if (useSeparate) { String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup); TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists retryQueueNum = normalConfig.getWriteQueueNums(); @@ -193,8 +194,8 @@ public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) { } } - private int getRetryQueueId(String retryTopic, MessageExt messageExt) { - if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + private int getRetryQueueId(String retryTopic, String consumerGroup, MessageExt messageExt) { + if (!checkUseSeparateRetryQueue(retryTopic, consumerGroup)) { return 0; } int oriQueueId = messageExt.getQueueId(); // original qid of normal or retry topic @@ -729,4 +730,19 @@ ArrayList genSortList() { return sortList; } } + + private boolean checkUseSeparateRetryQueue(String retryTopic, String consumerGroup) { + if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) { + return true; + } + if (brokerController.getBrokerConfig().isUseSeparateRetryQueueForPriorityTopic()) { + String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup); + TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); + if (topicConfig != null && topicConfig.getAttributes() != null) { + String priorityStr = topicConfig.getAttributes().get("priority"); + return Boolean.parseBoolean(priorityStr); + } + } + return false; + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index fa7e9982e1f..c25bd2a6e7a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -53,11 +53,14 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Method; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -104,7 +107,6 @@ public class PopReviveServiceTest { private BrokerMetricsManager brokerMetricsManager; @Mock private PopMetricsManager popMetricsManager; - private PopMessageProcessor popMessageProcessor; private BrokerConfig brokerConfig; private PopReviveService popReviveService; @@ -129,8 +131,7 @@ public void before() { // Initialize BrokerMetricsManager for tests when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager); when(brokerMetricsManager.getPopMetricsManager()).thenReturn(popMetricsManager); - - popMessageProcessor = new PopMessageProcessor(brokerController); // a real one, not mock + PopMessageProcessor popMessageProcessor = new PopMessageProcessor(brokerController); // a real one, not mock when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor); popReviveService = spy(new PopReviveService(brokerController, REVIVE_TOPIC, REVIVE_QUEUE_ID)); @@ -555,4 +556,143 @@ public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, Ack return msgInner; } + + @Test + public void testRetryQueueRouting_GlobalSwitchOverridesAll() throws Exception { + brokerConfig.setUseSeparateRetryQueue(true); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(false); + + Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class); + method.setAccessible(true); + + boolean result = (boolean) method.invoke(popReviveService, "%RETRY%groupA", "groupA"); + Assert.assertTrue("Global switch should force separate retry queue", result); + } + + @Test + public void testRetryQueueRouting_PriorityTopicWithNewFlag() throws Exception { + brokerConfig.setUseSeparateRetryQueue(false); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true); + + TopicConfig priorityTopicConfig = new TopicConfig("NormalPriorityTopic"); + Map attributes = new HashMap<>(); + attributes.put("priority", "true"); + priorityTopicConfig.setAttributes(attributes); + when(topicConfigManager.selectTopicConfig("NormalPriorityTopic")).thenReturn(priorityTopicConfig); + + Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class); + method.setAccessible(true); + + String retryTopic = KeyBuilder.buildPopRetryTopic("NormalPriorityTopic", "groupA", false); + boolean result = (boolean) method.invoke(popReviveService, retryTopic, "groupA"); + + Assert.assertTrue("Priority topic with new flag should use separate retry queue", result); + } + + @Test + public void testRetryQueueRouting_NormalTopicWithNewFlag() throws Exception { + brokerConfig.setUseSeparateRetryQueue(false); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true); + + TopicConfig normalTopicConfig = new TopicConfig("JustANormalTopic"); + when(topicConfigManager.selectTopicConfig("JustANormalTopic")).thenReturn(normalTopicConfig); + + java.lang.reflect.Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class); + method.setAccessible(true); + + String retryTopic = KeyBuilder.buildPopRetryTopic("JustANormalTopic", "groupA", false); + boolean result = (boolean) method.invoke(popReviveService, retryTopic, "groupA"); + + Assert.assertFalse("Normal topic should NOT use separate retry queue (will fallback to queue 0)", result); + } + + @Test + public void testRetryQueueRouting_BothFlagsDisabled() throws Exception { + brokerConfig.setUseSeparateRetryQueue(false); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(false); + + Method method = PopReviveService.class.getDeclaredMethod("checkUseSeparateRetryQueue", String.class, String.class); + method.setAccessible(true); + + boolean result = (boolean) method.invoke(popReviveService, "%RETRY%groupA", "groupA"); + Assert.assertFalse("Should use shared retry queue 0 when all flags are false", result); + } + + @Test + public void testPriorityInversionBugIsFixed_QueueIsolation() throws Throwable { + brokerConfig.setUseSeparateRetryQueue(false); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true); + + String consumerGroup = "PriorityGroup"; + String normalTopic = "NormalTopic"; + String priorityTopic = "PriorityTopic"; + + TopicConfig normalTopicConfig = new TopicConfig(normalTopic); + when(topicConfigManager.selectTopicConfig(normalTopic)).thenReturn(normalTopicConfig); + + TopicConfig priorityTopicConfig = new TopicConfig(priorityTopic); + java.util.Map attributes = new java.util.HashMap<>(); + attributes.put("priority", "true"); + priorityTopicConfig.setAttributes(attributes); + when(topicConfigManager.selectTopicConfig(priorityTopic)).thenReturn(priorityTopicConfig); + + long pastPopTime = System.currentTimeMillis() - 5000; + + PopCheckPoint normalCk = buildPopCheckPoint(100, pastPopTime, 1); + normalCk.setTopic(normalTopic); + normalCk.setCId(consumerGroup); + normalCk.setQueueId(0); + + PopCheckPoint priorityCk = buildPopCheckPoint(200, pastPopTime, 2); + priorityCk.setTopic(priorityTopic); + priorityCk.setCId(consumerGroup); + priorityCk.setQueueId(1); + + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("normal", normalCk); + reviveObj.map.put("priority", priorityCk); + reviveObj.endTime = System.currentTimeMillis(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenAnswer(invocation -> { + long offset = invocation.getArgument(1); + MessageExt msgExt = new MessageExt(); + + if (offset == 100) { + msgExt.setTopic(normalTopic); + msgExt.setQueueId(0); + msgExt.putUserProperty("TEST_REAL_TOPIC", normalTopic); + } else { + msgExt.setTopic(priorityTopic); + msgExt.setQueueId(1); + msgExt.putUserProperty("TEST_REAL_TOPIC", priorityTopic); + } + return CompletableFuture.completedFuture(Triple.of(msgExt, "", false)); + }); + + List savedRetryMessages = new ArrayList<>(); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + savedRetryMessages.add(msg); + return new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)); + }); + + popReviveService.mergeAndRevive(reviveObj); + + Assert.assertEquals("Both messages should be revived", 2, savedRetryMessages.size()); + + MessageExtBrokerInner savedNormalMsg = savedRetryMessages.stream() + .filter(m -> normalTopic.equals(m.getProperty("TEST_REAL_TOPIC"))) + .findFirst().orElseThrow(() -> new AssertionError("Normal message not found")); + + MessageExtBrokerInner savedPriorityMsg = savedRetryMessages.stream() + .filter(m -> priorityTopic.equals(m.getProperty("TEST_REAL_TOPIC"))) + .findFirst().orElseThrow(() -> new AssertionError("Priority message not found")); + + Assert.assertEquals("Normal retry messages must be blocked in shared Queue 0", + 0, savedNormalMsg.getQueueId()); + + Assert.assertEquals("High-priority retry messages MUST be isolated to their own queue to prevent priority inversion!", + 1, savedPriorityMsg.getQueueId()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 08e27a20ee3..379a96d26e3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -572,6 +572,8 @@ public class BrokerConfig extends BrokerIdentity { private int liteLagLatencyTopK = 50; + private boolean useSeparateRetryQueueForPriorityTopic = false; + public String getConfigBlackList() { return configBlackList; } @@ -2504,4 +2506,12 @@ public int getMaxMessageFilterNumForNotification() { public void setMaxMessageFilterNumForNotification(int maxMessageFilterNumForNotification) { this.maxMessageFilterNumForNotification = maxMessageFilterNumForNotification; } + + public boolean isUseSeparateRetryQueueForPriorityTopic() { + return useSeparateRetryQueueForPriorityTopic; + } + + public void setUseSeparateRetryQueueForPriorityTopic(boolean useSeparateRetryQueueForPriorityTopic) { + this.useSeparateRetryQueueForPriorityTopic = useSeparateRetryQueueForPriorityTopic; + } } diff --git a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java index 07e132ff815..0cf6229203e 100644 --- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java @@ -38,6 +38,8 @@ public void testBrokerConfigAttribute() { brokerConfig.setBrokerClusterName("DefaultCluster"); brokerConfig.setMsgTraceTopicName("RMQ_SYS_TRACE_TOPIC4"); brokerConfig.setAutoDeleteUnusedStats(true); + brokerConfig.setUseSeparateRetryQueueForPriorityTopic(true); + assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster"); assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876"); assertThat(brokerConfig.getMsgTraceTopicName()).isEqualTo("RMQ_SYS_TRACE_TOPIC4"); @@ -45,5 +47,12 @@ public void testBrokerConfigAttribute() { assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a"); assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false); assertThat(brokerConfig.isAutoDeleteUnusedStats()).isEqualTo(true); + assertThat(brokerConfig.isUseSeparateRetryQueueForPriorityTopic()).isEqualTo(true); + } + + @Test + public void testPriorityTopicRetryQueueConfigDefaultValue() { + BrokerConfig brokerConfig = new BrokerConfig(); + assertThat(brokerConfig.isUseSeparateRetryQueueForPriorityTopic()).isFalse(); } } \ No newline at end of file