Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.apache.rocketmq.proxy.processor;

import io.netty.channel.local.LocalChannel;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.proxy.common.ContextVariable;
Expand All @@ -29,6 +32,7 @@
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -90,12 +94,33 @@ public void before() throws Throwable {

@Test
public void testStart() throws Exception {
AckResult ackResult = new AckResult();
ackResult.setStatus(AckStatus.OK);
ackResult.setExtraInfo(messageReceiptHandle.getReceiptHandleStr());
Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel())))
.thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class),
Mockito.any(ReceiptHandle.class), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong(), Mockito.nullable(String.class)))
.thenReturn(CompletableFuture.completedFuture(ackResult));

receiptHandleProcessor.start();
receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, PROXY_CONTEXT.getChannel(), CONSUMER_GROUP, MSG_ID, messageReceiptHandle);
Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))).thenReturn(Mockito.mock(ClientChannelInfo.class));
Mockito.verify(messagingProcessor, Mockito.timeout(10000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
Mockito.eq(CONSUMER_GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()), Mockito.eq(null));
}

@After
@Override
public void after() {
try {
receiptHandleProcessor.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
super.after();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void beforeClass() {
@Test
public void testTruncateCQ() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
Expand Down Expand Up @@ -122,7 +122,7 @@ public void testTruncateCQ() throws Exception {
@Test
public void testRecover() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();
{
Expand Down Expand Up @@ -162,7 +162,7 @@ public void testRecover() throws Exception {
@Test
public void testDLedgerAbnormallyRecover() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
String topic = UUID.randomUUID().toString();

Expand Down Expand Up @@ -199,7 +199,7 @@ public void testDLedgerAbnormallyRecover() throws Exception {
@Test
public void testPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
Expand Down Expand Up @@ -242,7 +242,7 @@ public void testPutAndGetMessage() throws Exception {
@Test
public void testBatchPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
Expand Down Expand Up @@ -289,7 +289,7 @@ public void testBatchPutAndGetMessage() throws Exception {
public void testAsyncPutAndGetMessage() throws Exception {
Assume.assumeFalse(MixAll.isWindows());
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
Expand Down Expand Up @@ -333,7 +333,7 @@ public void testAsyncPutAndGetMessage() throws Exception {
@Test
public void testAsyncBatchPutAndGetMessage() throws Exception {
String base = createBaseDir();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
DefaultMessageStore messageStore = createDledgerMessageStore(base, group, "n0", peers, null, false, 0);
DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
Expand Down Expand Up @@ -380,7 +380,7 @@ public void testAsyncBatchPutAndGetMessage() throws Exception {

@Test
public void testCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);

Expand Down Expand Up @@ -409,7 +409,7 @@ public void testCommittedPos() throws Exception {

@Test
public void testIPv6HostMsgCommittedPos() throws Exception {
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextAvailablePort(), nextAvailablePort());
String group = UUID.randomUUID().toString();
DefaultMessageStore leaderStore = createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void multiDirsStorageTest() throws Exception {
Assume.assumeFalse(MixAll.isWindows());
String base = createBaseDir();
String topic = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d", nextPort());
String peers = "n0-localhost:0";
String group = UUID.randomUUID().toString();
String multiStorePath =
base + "/multi/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerServer;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,6 +43,12 @@

public class MessageStoreTestBase extends StoreTestBase {

protected static int nextAvailablePort() throws IOException {
try (ServerSocket serverSocket = new ServerSocket(0)) {
return serverSocket.getLocalPort();
}
}

protected DefaultMessageStore createDledgerMessageStore(String base, String group, String selfId, String peers, String leaderId, boolean createAbort, int deleteFileNum) throws Exception {
System.setProperty("dledger.disk.ratio.check", "0.95");
System.setProperty("dledger.disk.ratio.clean", "0.95");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,19 @@ public void init(int mappedFileSize) throws Exception {
storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache");
storeConfig1.setTotalReplicas(3);
storeConfig1.setInSyncReplicas(2);
storeConfig1.setHaListenPort(0);
buildMessageStoreConfig(storeConfig1, mappedFileSize);
this.store1HaAddress = "127.0.0.1:10912";

storeConfig2 = new MessageStoreConfig();
storeConfig2.setBrokerRole(BrokerRole.SLAVE);
storeConfig2.setHaSendHeartbeatInterval(1000);
storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2");
storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog");
storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache");
storeConfig2.setHaListenPort(10943);
storeConfig2.setHaListenPort(0);
storeConfig2.setTotalReplicas(3);
storeConfig2.setInSyncReplicas(2);
buildMessageStoreConfig(storeConfig2, mappedFileSize);
this.store2HaAddress = "127.0.0.1:10943";

messageStore1 = buildMessageStore(storeConfig1, 1L);
messageStore2 = buildMessageStore(storeConfig2, 2L);
Expand All @@ -124,7 +123,7 @@ public void init(int mappedFileSize) throws Exception {
storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#3");
storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "commitlog");
storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#3" + File.separator + "EpochFileCache");
storeConfig3.setHaListenPort(10980);
storeConfig3.setHaListenPort(0);
storeConfig3.setTotalReplicas(3);
storeConfig3.setInSyncReplicas(2);
buildMessageStoreConfig(storeConfig3, mappedFileSize);
Expand All @@ -136,6 +135,8 @@ public void init(int mappedFileSize) throws Exception {
messageStore1.start();
messageStore2.start();
messageStore3.start();
this.store1HaAddress = haAddress(storeConfig1);
this.store2HaAddress = haAddress(storeConfig2);

// ((AutoSwitchHAService) this.messageStore1.getHaService()).("127.0.0.1:8000");
// ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
Expand All @@ -154,18 +155,17 @@ public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Except
storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "commitlog");
storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#1" + File.separator + "EpochFileCache");
storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet);
storeConfig1.setHaListenPort(0);
buildMessageStoreConfig(storeConfig1, mappedFileSize);
this.store1HaAddress = "127.0.0.1:10912";

storeConfig2 = new MessageStoreConfig();
storeConfig2.setBrokerRole(BrokerRole.SLAVE);
storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + brokerName + "#2");
storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "commitlog");
storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + brokerName + "#2" + File.separator + "EpochFileCache");
storeConfig2.setHaListenPort(10943);
storeConfig2.setHaListenPort(0);
storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet);
buildMessageStoreConfig(storeConfig2, mappedFileSize);
this.store2HaAddress = "127.0.0.1:10943";

messageStore1 = buildMessageStore(storeConfig1, 1L);
messageStore2 = buildMessageStore(storeConfig2, 2L);
Expand All @@ -174,6 +174,8 @@ public void init(int mappedFileSize, boolean allAckInSyncStateSet) throws Except
assertTrue(messageStore2.load());
messageStore1.start();
messageStore2.start();
this.store1HaAddress = haAddress(storeConfig1);
this.store2HaAddress = haAddress(storeConfig2);

// ((AutoSwitchHAService) this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
// ((AutoSwitchHAService) this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
Expand All @@ -190,6 +192,8 @@ private boolean changeMasterAndPutMessage(DefaultMessageStore master, MessageSto
flag &= slave.getHaService().changeToSlave("", epoch, slaveId);
slave.getHaService().updateHaMasterAddress(masterHaAddress);
flag &= master.getHaService().changeToMaster(epoch);
AutoSwitchHAService masterHaService = (AutoSwitchHAService) master.getHaService();
await().atMost(10, TimeUnit.SECONDS).until(() -> masterHaService.getConnectionCount().get() > 0);
// Put message on master
for (int i = 0; i < totalPutMessageNums; i++) {
PutMessageResult result = master.putMessage(buildMessage());
Expand Down Expand Up @@ -219,7 +223,7 @@ public void testConfirmOffset() throws Exception {
final long confirmOffset = this.messageStore1.getConfirmOffset();

// Step2, shutdown store2
this.messageStore2.shutdown();
this.messageStore2 = shutdownStore(this.messageStore2);

// Put message, which should succeed because slave is removed from syncStateSet, only master remains
final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
Expand All @@ -229,7 +233,7 @@ public void testConfirmOffset() throws Exception {
assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset);

// Step3, shutdown store1, start store2, change store2 to master, epoch = 2
this.messageStore1.shutdown();
this.messageStore1 = shutdownStore(this.messageStore1);

storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
messageStore2 = buildMessageStore(storeConfig2, 2L);
Expand Down Expand Up @@ -293,8 +297,7 @@ public void testOptionAllAckInSyncStateSet() throws Exception {
assertTrue(result.contains(2L));

// Now, shutdown store2
this.messageStore2.shutdown();
this.messageStore2.destroy();
this.messageStore2 = destroyStore(this.messageStore2);

// Wait for connection to be removed and syncStateSet to be updated by removeConnection
await().atMost(10, TimeUnit.SECONDS).until(() -> {
Expand Down Expand Up @@ -447,8 +450,7 @@ public void testAddBrokerAndSyncFromLastFile() throws Exception {
checkMessage(this.messageStore2, 20, 0);

// Step2: restart broker3
messageStore3.shutdown();
messageStore3.destroy();
messageStore3 = destroyStore(messageStore3);

storeConfig3.setSyncFromLastFile(true);
messageStore3 = buildMessageStore(storeConfig3, 3L);
Expand All @@ -457,7 +459,7 @@ public void testAddBrokerAndSyncFromLastFile() throws Exception {

// Step2: add new broker3, link to broker1. because broker3 request sync from lastFile, so it only synced 10 msg from offset 10;
messageStore3.getHaService().changeToSlave("", 2, 3L);
messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912");
messageStore3.getHaService().updateHaMasterAddress(store1HaAddress);

checkMessage(messageStore3, 10, 10);
}
Expand Down Expand Up @@ -498,7 +500,7 @@ public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception {

long tmpConfirmOffset = this.messageStore2.getConfirmOffset();
long setConfirmOffset = this.messageStore2.getConfirmOffset() - this.messageStore2.getConfirmOffset() / 2;
messageStore2.shutdown();
messageStore2 = shutdownStore(messageStore2);
StoreCheckpoint storeCheckpoint = new StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + "checkpoint");
assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset());
storeCheckpoint.setConfirmPhyOffset(setConfirmOffset);
Expand All @@ -514,22 +516,28 @@ public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception {

@After
public void destroy() throws Exception {
if (this.messageStore2 != null) {
messageStore2.shutdown();
messageStore2.destroy();
}
if (this.messageStore1 != null) {
messageStore1.shutdown();
messageStore1.destroy();
}
if (this.messageStore3 != null) {
messageStore3.shutdown();
messageStore3.destroy();
}
this.messageStore2 = destroyStore(this.messageStore2);
this.messageStore1 = destroyStore(this.messageStore1);
this.messageStore3 = destroyStore(this.messageStore3);
File file = new File(storePathRootParentDir);
UtilAll.deleteFile(file);
}

private DefaultMessageStore shutdownStore(DefaultMessageStore messageStore) {
if (messageStore != null) {
messageStore.shutdown();
}
return null;
}

private DefaultMessageStore destroyStore(DefaultMessageStore messageStore) {
if (messageStore != null) {
messageStore.shutdown();
messageStore.destroy();
}
return null;
}

private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreConfig,
long brokerId) throws Exception {
BrokerConfig brokerConfig = new BrokerConfig();
Expand All @@ -538,6 +546,10 @@ private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreCon
return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig, new ConcurrentHashMap<>());
}

private String haAddress(MessageStoreConfig messageStoreConfig) {
return "127.0.0.1:" + messageStoreConfig.getHaListenPort();
}

private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) {
messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
Expand Down
Loading