diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 62e5e64eb42..a21bd7edaf5 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java @@ -17,8 +17,11 @@ package org.apache.rocketmq.proxy.processor; import io.netty.channel.local.LocalChannel; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.proxy.common.ContextVariable; @@ -29,6 +32,7 @@ import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -90,12 +94,33 @@ public void before() throws Throwable { @Test public void testStart() throws Exception { + AckResult ackResult = new AckResult(); + ackResult.setStatus(AckStatus.OK); + ackResult.setExtraInfo(messageReceiptHandle.getReceiptHandleStr()); + Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))) + .thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), + Mockito.any(ReceiptHandle.class), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.anyLong(), Mockito.nullable(String.class))) + .thenReturn(CompletableFuture.completedFuture(ackResult)); + receiptHandleProcessor.start(); receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, PROXY_CONTEXT.getChannel(), CONSUMER_GROUP, MSG_ID, messageReceiptHandle); - Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.verify(messagingProcessor, Mockito.timeout(10000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), Mockito.eq(CONSUMER_GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()), Mockito.eq(null)); } + @After + @Override + public void after() { + try { + receiptHandleProcessor.shutdown(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + super.after(); + } + } + } diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java index 7b09a6aa2fd..90536a71c3a 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java @@ -63,7 +63,7 @@ public static void beforeClass() { @Test public void testTruncateCQ() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); { @@ -122,7 +122,7 @@ public void testTruncateCQ() throws Exception { @Test public void testRecover() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); { @@ -162,7 +162,7 @@ public void testRecover() throws Exception { @Test public void testDLedgerAbnormallyRecover() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String topic = UUID.randomUUID().toString(); @@ -199,7 +199,7 @@ public void testDLedgerAbnormallyRecover() throws Exception { @Test public void testPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -242,7 +242,7 @@ public void testPutAndGetMessage() throws Exception { @Test public void testBatchPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -289,7 +289,7 @@ public void testBatchPutAndGetMessage() throws Exception { public void testAsyncPutAndGetMessage() throws Exception { Assume.assumeFalse(MixAll.isWindows()); String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -333,7 +333,7 @@ public void testAsyncPutAndGetMessage() throws Exception { @Test public void testAsyncBatchPutAndGetMessage() throws Exception { String base = createBaseDir(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0); DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog(); @@ -380,7 +380,7 @@ public void testAsyncBatchPutAndGetMessage() throws Exception { @Test public void testCommittedPos() throws Exception { - String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort()); String group = UUID.randomUUID().toString(); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); @@ -409,7 +409,7 @@ public void testCommittedPos() throws Exception { @Test public void testIPv6HostMsgCommittedPos() throws Exception { - String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort()); + String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort()); String group = UUID.randomUUID().toString(); DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0); diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java index 9de4e4820ed..315921d3dd0 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java @@ -43,7 +43,7 @@ public void multiDirsStorageTest() throws Exception { Assume.assumeFalse(MixAll.isWindows()); String base = createBaseDir(); String topic = UUID.randomUUID().toString(); - String peers = String.format("n0-localhost:%d", nextPort()); + String peers = "n0-localhost:0"; String group = UUID.randomUUID().toString(); String multiStorePath = base + "/multi/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER + diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java index c4d9f0727b9..c8370b9ddb6 100644 --- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java +++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java @@ -20,6 +20,8 @@ import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; @@ -41,6 +43,12 @@ public class MessageStoreTestBase extends StoreTestBase { + protected static int nextAvailablePort() throws IOException { + try (ServerSocket serverSocket = new ServerSocket(0)) { + return serverSocket.getLocalPort(); + } + } + protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception { System.setProperty("dledger.disk.ratio.check", "0.95"); System.setProperty("dledger.disk.ratio.clean", "0.95"); diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java index 519af441591..2d759486226 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java @@ -100,8 +100,8 @@ public void init(int mappedFileSize) throws Exception { storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache"); storeConfig1.setTotalReplicas(3); storeConfig1.setInSyncReplicas(2); + storeConfig1.setHaListenPort(0); buildMessageStoreConfig(storeConfig1, mappedFileSize); - this.store1HaAddress = "127.0.0.1:10912"; storeConfig2 = new MessageStoreConfig(); storeConfig2.setBrokerRole(BrokerRole.SLAVE); @@ -109,11 +109,10 @@ public void init(int mappedFileSize) throws Exception { storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2"); storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog"); storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache"); - storeConfig2.setHaListenPort(10943); + storeConfig2.setHaListenPort(0); storeConfig2.setTotalReplicas(3); storeConfig2.setInSyncReplicas(2); buildMessageStoreConfig(storeConfig2, mappedFileSize); - this.store2HaAddress = "127.0.0.1:10943"; messageStore1 = buildMessageStore(storeConfig1, 1L); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -124,7 +123,7 @@ public void init(int mappedFileSize) throws Exception { storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#3"); storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "commitlog"); storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "EpochFileCache"); - storeConfig3.setHaListenPort(10980); + storeConfig3.setHaListenPort(0); storeConfig3.setTotalReplicas(3); storeConfig3.setInSyncReplicas(2); buildMessageStoreConfig(storeConfig3, mappedFileSize); @@ -136,6 +135,8 @@ public void init(int mappedFileSize) throws Exception { messageStore1.start(); messageStore2.start(); messageStore3.start(); + this.store1HaAddress = haAddress(storeConfig1); + this.store2HaAddress = haAddress(storeConfig2); // ((AutoSwitchHAService) this.messageStore1.getHaService()).("127.0.0.1:8000"); // ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); @@ -154,18 +155,17 @@ public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Except storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "commitlog"); storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache"); storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet); + storeConfig1.setHaListenPort(0); buildMessageStoreConfig(storeConfig1, mappedFileSize); - this.store1HaAddress = "127.0.0.1:10912"; storeConfig2 = new MessageStoreConfig(); storeConfig2.setBrokerRole(BrokerRole.SLAVE); storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2"); storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog"); storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache"); - storeConfig2.setHaListenPort(10943); + storeConfig2.setHaListenPort(0); storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet); buildMessageStoreConfig(storeConfig2, mappedFileSize); - this.store2HaAddress = "127.0.0.1:10943"; messageStore1 = buildMessageStore(storeConfig1, 1L); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -174,6 +174,8 @@ public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Except assertTrue(messageStore2.load()); messageStore1.start(); messageStore2.start(); + this.store1HaAddress = haAddress(storeConfig1); + this.store2HaAddress = haAddress(storeConfig2); // ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000"); // ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001"); @@ -190,6 +192,8 @@ private boolean changeMasterAndPutMessage(DefaultMessageStore master, MessageSto flag &= slave.getHaService().changeToSlave("", epoch, slaveId); slave.getHaService().updateHaMasterAddress(masterHaAddress); flag &= master.getHaService().changeToMaster(epoch); + AutoSwitchHAService masterHaService = (AutoSwitchHAService) master.getHaService(); + await().atMost(10, TimeUnit.SECONDS).until(() -> masterHaService.getConnectionCount().get() > 0); // Put message on master for (int i = 0; i < totalPutMessageNums; i++) { PutMessageResult result = master.putMessage(buildMessage()); @@ -219,7 +223,7 @@ public void testConfirmOffset() throws Exception { final long confirmOffset = this.messageStore1.getConfirmOffset(); // Step2, shutdown store2 - this.messageStore2.shutdown(); + this.messageStore2 = shutdownStore(this.messageStore2); // Put message, which should succeed because slave is removed from syncStateSet, only master remains final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage()); @@ -229,7 +233,7 @@ public void testConfirmOffset() throws Exception { assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset); // Step3, shutdown store1, start store2, change store2 to master, epoch = 2 - this.messageStore1.shutdown(); + this.messageStore1 = shutdownStore(this.messageStore1); storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER); messageStore2 = buildMessageStore(storeConfig2, 2L); @@ -293,8 +297,7 @@ public void testOptionAllAckInSyncStateSet() throws Exception { assertTrue(result.contains(2L)); // Now, shutdown store2 - this.messageStore2.shutdown(); - this.messageStore2.destroy(); + this.messageStore2 = destroyStore(this.messageStore2); // Wait for connection to be removed and syncStateSet to be updated by removeConnection await().atMost(10, TimeUnit.SECONDS).until(() -> { @@ -447,8 +450,7 @@ public void testAddBrokerAndSyncFromLastFile() throws Exception { checkMessage(this.messageStore2, 20, 0); // Step2: restart broker3 - messageStore3.shutdown(); - messageStore3.destroy(); + messageStore3 = destroyStore(messageStore3); storeConfig3.setSyncFromLastFile(true); messageStore3 = buildMessageStore(storeConfig3, 3L); @@ -457,7 +459,7 @@ public void testAddBrokerAndSyncFromLastFile() throws Exception { // Step2: add new broker3, link to broker1. because broker3 request sync from lastFile, so it only synced 10 msg from offset 10; messageStore3.getHaService().changeToSlave("", 2, 3L); - messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912"); + messageStore3.getHaService().updateHaMasterAddress(store1HaAddress); checkMessage(messageStore3, 10, 10); } @@ -498,7 +500,7 @@ public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception { long tmpConfirmOffset = this.messageStore2.getConfirmOffset(); long setConfirmOffset = this.messageStore2.getConfirmOffset() - this.messageStore2.getConfirmOffset() / 2; - messageStore2.shutdown(); + messageStore2 = shutdownStore(messageStore2); StoreCheckpoint storeCheckpoint = new StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + "checkpoint"); assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset()); storeCheckpoint.setConfirmPhyOffset(setConfirmOffset); @@ -514,22 +516,28 @@ public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception { @After public void destroy() throws Exception { - if (this.messageStore2 != null) { - messageStore2.shutdown(); - messageStore2.destroy(); - } - if (this.messageStore1 != null) { - messageStore1.shutdown(); - messageStore1.destroy(); - } - if (this.messageStore3 != null) { - messageStore3.shutdown(); - messageStore3.destroy(); - } + this.messageStore2 = destroyStore(this.messageStore2); + this.messageStore1 = destroyStore(this.messageStore1); + this.messageStore3 = destroyStore(this.messageStore3); File file = new File(storePathRootParentDir); UtilAll.deleteFile(file); } + private DefaultMessageStore shutdownStore(DefaultMessageStore messageStore) { + if (messageStore != null) { + messageStore.shutdown(); + } + return null; + } + + private DefaultMessageStore destroyStore(DefaultMessageStore messageStore) { + if (messageStore != null) { + messageStore.shutdown(); + messageStore.destroy(); + } + return null; + } + private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreConfig, long brokerId) throws Exception { BrokerConfig brokerConfig = new BrokerConfig(); @@ -538,6 +546,10 @@ private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreCon return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>()); } + private String haAddress(MessageStoreConfig messageStoreConfig) { + return "127.0.0.1:" + messageStoreConfig.getHaListenPort(); + } + private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) { messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize); messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);