Skip to content

Commit

Permalink
[ISSUE openmessaging#176] Realize the ability to read from the replic…
Browse files Browse the repository at this point in the history
…a node,Optimize read performance
  • Loading branch information
mxsm committed Jul 30, 2022
1 parent a34df8d commit 8bd75a9
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
Expand Down Expand Up @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
this(dLedgerServer, null, null, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.dLedgerServer = dLedgerServer;
this.memberState = dLedgerServer.getMemberState();
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
Expand Down Expand Up @@ -109,6 +113,7 @@ public boolean rejectRequest() {
this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.PULL_READ_INDEX.getCode(), protocolProcessor, null);

//start the remoting client
if (nettyClientConfig == null) {
Expand Down Expand Up @@ -252,9 +257,29 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw
return future;
}

@Override
public CompletableFuture<PullReadIndexResponse> pullReadIndex(PullReadIndexRequest request) throws Exception {
CompletableFuture<PullReadIndexResponse> future = new CompletableFuture<>();
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL_READ_INDEX.getCode(), null);
wrapperRequest.setBody(JSON.toJSONBytes(request));
remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
PullReadIndexResponse response;
if (null != responseCommand) {
response = JSON.parseObject(responseCommand.getBody(), PullReadIndexResponse.class);
} else {
response = new PullReadIndexResponse();
response.copyBaseInfo(request);
response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
}
future.complete(response);
});
return future;
}

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
try {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
Expand Down Expand Up @@ -283,7 +308,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
}

private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
ChannelHandlerContext ctx) {
ChannelHandlerContext ctx) {
RemotingCommand response = null;
try {
if (t != null) {
Expand Down Expand Up @@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
case METADATA: {
MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class);
CompletableFuture<MetadataResponse> future = handleMetadata(metadataRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case APPEND: {
AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
CompletableFuture<AppendEntryResponse> future = handleAppend(appendEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case GET: {
GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class);
CompletableFuture<GetEntriesResponse> future = handleGet(getEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PULL: {
PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class);
CompletableFuture<PullEntriesResponse> future = handlePull(pullEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PUSH: {
PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class);
CompletableFuture<PushEntryResponse> future = handlePush(pushEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case VOTE: {
VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class);
CompletableFuture<VoteResponse> future = handleVote(voteRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case HEART_BEAT: {
HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class);
CompletableFuture<HeartBeatResponse> future = handleHeartBeat(heartBeatRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case LEADERSHIP_TRANSFER: {
Expand All @@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
request, x, DLedgerUtils.elapsed(start));
request, x, DLedgerUtils.elapsed(start));
}, futureExecutor);
break;
}
case PULL_READ_INDEX: {
PullReadIndexRequest pullReadIndexRequest = JSON.parseObject(request.getBody(), PullReadIndexRequest.class);
CompletableFuture<PullReadIndexResponse> future = handlePullReadIndex(pullReadIndexRequest);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
default:
logger.error("Unknown request code {} from {}", request.getCode(), request);
break;
Expand All @@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
}

Expand Down Expand Up @@ -432,6 +449,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
return dLedgerServer.handlePush(request);
}

@Override
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
return dLedgerServer.handlePullReadIndex(request);
}

public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
remotingCommand.setBody(JSON.toJSONBytes(response));
Expand Down
111 changes: 100 additions & 11 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
Expand All @@ -44,19 +46,17 @@
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand Down Expand Up @@ -211,7 +211,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
DLedgerEntry resEntry = null;
// split bodys to append
// split bodies to append
int index = 0;
Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
while (iterator.hasNext()) {
Expand All @@ -226,8 +226,8 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
batchAppendFuture.setPositions(positions);
return batchAppendFuture;
}
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODIES, "BatchAppendEntryRequest" +
" with empty bodies");
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
Expand All @@ -246,16 +246,58 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
}

@Override
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex());
PreConditions.check(!memberState.isCandidate(), DLedgerResponseCode.IS_CANDIDATE);
GetEntriesResponse response = new GetEntriesResponse();
response.setGroup(memberState.getGroup());
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
Long requestIndex = request.getBeginIndex();
if (memberState.isFollower()) {
//Get from follower
if (requestIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

// when requestIndex greater than ledgerEndIndex then send pull readIndex(ledgerEndIndex) request to leader
PullReadIndexRequest indexRequest = new PullReadIndexRequest();
indexRequest.setGroup(request.getGroup());
indexRequest.setRemoteId(memberState.getLeaderId());
CompletableFuture<PullReadIndexResponse> future = dLedgerRpcService.pullReadIndex(indexRequest);
PullReadIndexResponse pullReadIndexResponse = future.get();
if (pullReadIndexResponse.getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(pullReadIndexResponse.getCode());
return CompletableFuture.completedFuture(response);
}

long readIndex = pullReadIndexResponse.getReadIndex();
if (requestIndex > readIndex) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(DLedgerResponseCode.INDEX_OUT_OF_RANGE.getCode());
return CompletableFuture.completedFuture(response);
}

if (readIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

//wait for follower ledgerEndIndex to update
if (!waitFollowerEndIndex2Update(2, TimeUnit.SECONDS, requestIndex)) {
logger.warn("update follower[{}] ledgerEndIndex time out", memberState.getSelfId());
response.setCode(DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode());
return CompletableFuture.completedFuture(response);
}
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
} else {
//get from leader
getEntry(response, requestIndex);
}
return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
Expand All @@ -268,6 +310,13 @@ public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request
}
}

private void getEntry(GetEntriesResponse response, Long requestIndex) {
DLedgerEntry entry = dLedgerStore.get(requestIndex);
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
}
}

@Override
public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
try {
Expand Down Expand Up @@ -311,6 +360,30 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)

}

@Override
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

PullReadIndexResponse response = new PullReadIndexResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getLeaderId());
response.setEndIndex(memberState.getLedgerEndIndex());
response.setReadIndex(memberState.getLedgerEndIndex());

return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
logger.error("[{}][HandlePullReadIndex] failed", memberState.getSelfId(), e);
PullReadIndexResponse response = new PullReadIndexResponse();
response.copyBaseInfo(request);
response.setCode(e.getCode().getCode());
response.setLeaderId(memberState.getLeaderId());
return CompletableFuture.completedFuture(response);
}
}

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
Expand Down Expand Up @@ -486,4 +559,20 @@ public NettyRemotingClient getRemotingClient() {
return null;
}

private boolean waitFollowerEndIndex2Update(long maxWaitTime, TimeUnit unit, long requestIndex) {
long maxWaitMs = unit.toMillis(maxWaitTime);
long start = System.currentTimeMillis();
while (DLedgerUtils.elapsed(start) < maxWaitMs) {
try {
if (requestIndex <= memberState.getLedgerEndIndex()) {
return true;
}
DLedgerUtils.sleep(1);
} catch (Exception e) {
logger.warn("Wait [{}]Follower update endIndex error",memberState.getSelfId(),e);
break;
}
}
return false;
}
}
Loading

0 comments on commit 8bd75a9

Please sign in to comment.