Skip to content

Commit 4441a87

Browse files
refactor(proxy): proxy delay message and transaction message to the node assigned to its queue (#815)
Signed-off-by: SSpirits <[email protected]>
1 parent 7736c5d commit 4441a87

File tree

10 files changed

+123
-36
lines changed

10 files changed

+123
-36
lines changed

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,12 @@ private <T> String getResponseStatus(T response) {
8080
@Override
8181
protected <V, T> void writeResponse(ProxyContext context, V request, T response, StreamObserver<T> responseObserver,
8282
Throwable t, Function<Status, T> errorResponseCreator) {
83+
ProxyContextExt contextExt = (ProxyContextExt) context;
8384
if (t != null) {
8485
Optional<Status> status = ExceptionHandler.convertToGrpcStatus(t);
8586
if (status.isPresent()) {
8687
ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(),
87-
status.get().getCode().name().toLowerCase(), ((ProxyContextExt) context).getElapsedTimeNanos());
88+
status.get().getCode().name().toLowerCase(), contextExt.getElapsedTimeNanos(), contextExt.suspended(), contextExt.relayed());
8889
ResponseWriter.getInstance().write(
8990
responseObserver,
9091
errorResponseCreator.apply(status.get())
@@ -94,7 +95,7 @@ protected <V, T> void writeResponse(ProxyContext context, V request, T response,
9495
}
9596

9697
ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(),
97-
getResponseStatus(response), ((ProxyContextExt) context).getElapsedTimeNanos());
98+
getResponseStatus(response), contextExt.getElapsedTimeNanos(), contextExt.suspended(), contextExt.relayed());
9899

99100
super.writeResponse(context, request, response, responseObserver, t, errorResponseCreator);
100101
}

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public ExtendReceiveMessageResponseStreamWriter(ProxyContextExt ctx, MessagingPr
5555

5656
private void recordRpcLatency(ProxyContext ctx, Code code) {
5757
ProxyContextExt context = (ProxyContextExt) ctx;
58-
ProxyMetricsManager.recordRpcLatency(ctx.getProtocolType(), ctx.getAction(), code.name().toLowerCase(), context.getElapsedTimeNanos(), context.suspended());
58+
ProxyMetricsManager.recordRpcLatency(ctx.getProtocolType(), ctx.getAction(), code.name().toLowerCase(), context.getElapsedTimeNanos(), context.suspended(), context.relayed());
5959
}
6060

6161
@Override

proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class ProxyMetricsManager implements MetricsManager {
7070
public static final String LABEL_ACTION = "action";
7171
public static final String LABEL_RESULT = "result";
7272
public static final String LABEL_SUSPENDED = "suspended";
73+
public static final String LABEL_RELAYED = "relayed";
7374
public static final String PROTOCOL_TYPE_GRPC = "grpc";
7475

7576
private static LongHistogram rpcLatency = new NopLongHistogram();
@@ -271,17 +272,14 @@ public static List<Pair<InstrumentSelector, View>> getMetricsView() {
271272
return metricsViewList;
272273
}
273274

274-
public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos) {
275-
recordRpcLatency(protocolType, action, result, costTimeNanos, false);
276-
}
277-
278275
public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos,
279-
boolean suspended) {
276+
boolean suspended, boolean relayed) {
280277
AttributesBuilder attributesBuilder = newAttributesBuilder()
281278
.put(LABEL_PROTOCOL_TYPE, protocolType)
282279
.put(LABEL_ACTION, action)
283280
.put(LABEL_RESULT, result)
284-
.put(LABEL_SUSPENDED, suspended);
281+
.put(LABEL_SUSPENDED, suspended)
282+
.put(LABEL_RELAYED, relayed);
285283
rpcLatency.record(costTimeNanos, attributesBuilder.build());
286284
}
287285

proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class ProxyContextExt extends ProxyContext implements TraceContext {
3232
private final Stopwatch stopwatch = Stopwatch.createStarted();
3333
public static final long DEFAULT_TIMEOUT_MILLIS = 3000;
3434
private boolean suspended;
35+
private boolean relayed;
3536

3637
private final Tracer tracer;
3738

@@ -60,6 +61,14 @@ public void setSuspended(boolean suspended) {
6061
this.suspended = suspended;
6162
}
6263

64+
public boolean relayed() {
65+
return relayed;
66+
}
67+
68+
public void setRelayed(boolean relayed) {
69+
this.relayed = relayed;
70+
}
71+
6372
public long getElapsedTimeNanos() {
6473
return stopwatch.elapsed().toNanos();
6574
}

proxy/src/main/java/com/automq/rocketmq/proxy/remoting/activity/CommonRemotingBehavior.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ public interface CommonRemotingBehavior {
3636
String BROKER_NAME_FIELD_FOR_SEND_MESSAGE_V2 = "n";
3737

3838
default void recordRpcLatency(ProxyContext context, RemotingCommand response) {
39+
ProxyContextExt contextExt = (ProxyContextExt) context;
3940
ProxyMetricsManager.recordRpcLatency(context.getProtocolType(), context.getAction(),
40-
RemotingHelper.getResponseCodeDesc(response.getCode()), ((ProxyContextExt) context).getElapsedTimeNanos());
41+
RemotingHelper.getResponseCodeDesc(response.getCode()), contextExt.getElapsedTimeNanos(), contextExt.suspended(), contextExt.relayed());
4142
}
4243

4344
default ProxyContext createExtendContext(ProxyContext context) {

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,17 @@ public MessageServiceImpl(BrokerConfig config, MessageStore store, ProxyMetadata
145145
this.suspendRequestService = SuspendRequestService.getInstance();
146146
this.producerManager = producerManager;
147147
this.relayClient = relayClient;
148+
149+
store.registerTimerMessageHandler(timerTag -> executorService.execute(() -> {
150+
try {
151+
ByteBuffer payload = timerTag.payloadAsByteBuffer();
152+
FlatMessage message = FlatMessage.getRootAsFlatMessage(payload);
153+
putMessage(message).join();
154+
} catch (Throwable t) {
155+
LOGGER.error("Error while check transaction status", t);
156+
}
157+
}));
158+
148159
store.registerTransactionCheckHandler(timerTag -> executorService.execute(() -> {
149160
try {
150161
checkTransactionStatus(timerTag);
@@ -171,6 +182,40 @@ public TopicMessageType getMessageType(SendMessageRequestHeader requestHeader) {
171182
return topicMessageType;
172183
}
173184

185+
private CompletableFuture<PutResult> putMessage(FlatMessage message) {
186+
return putMessage(null, message);
187+
}
188+
189+
private CompletableFuture<PutResult> putMessage(ProxyContext ctx, FlatMessage message) {
190+
return topicOf(message.topicId())
191+
.thenCompose(topic -> {
192+
Optional<MessageQueueAssignment> assignment = topic.getAssignmentsList().stream().filter(item -> item.getQueue().getQueueId() == message.queueId()).findFirst();
193+
if (assignment.isEmpty()) {
194+
LOGGER.error("Message: {} is dropped because the topic: {} doesn't have queue: {}",
195+
message.systemProperties().messageId(), topic.getName(), message.queueId());
196+
return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Queue " + message.queueId() + " is not assigned to any node."));
197+
}
198+
return putMessage(ctx, topic, assignment.get(), message);
199+
});
200+
}
201+
202+
private CompletableFuture<PutResult> putMessage(ProxyContext ctx, Topic topic, MessageQueueAssignment assignment,
203+
FlatMessage message) {
204+
if (assignment.getNodeId() != brokerConfig.nodeId()) {
205+
if (ctx instanceof ProxyContextExt contextExt) {
206+
contextExt.setRelayed(true);
207+
}
208+
return metadataService.addressOf(assignment.getNodeId())
209+
.thenCompose(address -> relayClient.relayMessage(address, message))
210+
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
211+
}
212+
StoreContext storeContext = StoreContext.EMPTY;
213+
if (ctx != null) {
214+
storeContext = ContextUtil.buildStoreContext(ctx, topic.getName(), "");
215+
}
216+
return store.put(storeContext, message);
217+
}
218+
174219
@Override
175220
@WithSpan(kind = SpanKind.SERVER)
176221
public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
@@ -221,21 +266,10 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx,
221266
span.setAttribute("reconsumeTimes", requestHeader.getReconsumeTimes());
222267
span.setAttribute("deliveryTimestamp", flatMessage.systemProperties().deliveryTimestamp());
223268
});
224-
if (assignment.getNodeId() != brokerConfig.nodeId()) {
225-
return metadataService.addressOf(assignment.getNodeId())
226-
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
227-
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
228-
}
229-
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), groupName), flatMessage);
269+
return putMessage(ctx, topic, assignment, flatMessage);
230270
}
231271
}
232-
233-
if (assignment.getNodeId() != brokerConfig.nodeId()) {
234-
return metadataService.addressOf(assignment.getNodeId())
235-
.thenCompose(address -> relayClient.relayMessage(address, flatMessage))
236-
.thenApply(status -> new PutResult(PutResult.Status.PUT_OK, 0));
237-
}
238-
return store.put(ContextUtil.buildStoreContext(ctx, topic.getName(), ""), flatMessage);
272+
return putMessage(ctx, topic, assignment, flatMessage);
239273
});
240274

241275
return putFuture.thenApply(putResult -> {
@@ -363,7 +397,16 @@ public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String bro
363397
}
364398
}
365399

366-
return store.endTransaction(requestHeader.getTransactionId(), resolution);
400+
return store.endTransaction(requestHeader.getTransactionId(), resolution)
401+
.thenCompose(optional -> {
402+
if (optional.isEmpty()) {
403+
return CompletableFuture.completedFuture(null);
404+
}
405+
406+
return putMessage(ctx, optional.get())
407+
.thenAccept(ignore -> {
408+
});
409+
});
367410
}
368411

369412
private void checkTransactionStatus(TimerTag timerTag) {
@@ -939,6 +982,21 @@ private CompletableFuture<Topic> topicOf(String topicName) {
939982
});
940983
}
941984

985+
private CompletableFuture<Topic> topicOf(long topicId) {
986+
CompletableFuture<Topic> topicFuture = metadataService.topicOf(topicId);
987+
988+
return topicFuture.exceptionally(throwable -> {
989+
Throwable t = ExceptionUtils.getRealException(throwable);
990+
if (t instanceof ControllerException controllerException) {
991+
if (controllerException.getErrorCode() == Code.NOT_FOUND.ordinal()) {
992+
throw new ProxyException(apache.rocketmq.v2.Code.TOPIC_NOT_FOUND, "Topic resource does not exist.");
993+
}
994+
}
995+
// Rethrow other exceptions.
996+
throw new CompletionException(t);
997+
});
998+
}
999+
9421000
private CompletableFuture<ConsumerGroup> consumerGroupOf(String groupName) {
9431001
CompletableFuture<ConsumerGroup> groupFuture = metadataService.consumerGroupOf(groupName);
9441002

proxy/src/test/java/com/automq/rocketmq/proxy/grpc/GrpcServerRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public MutableHandlerRegistry getServiceRegistry() {
6262
}
6363

6464
@Override
65-
public void afterEach(ExtensionContext context) throws Exception {
65+
public void afterEach(ExtensionContext context) {
6666
this.serverName = null;
6767
this.serviceRegistry = null;
6868
this.channel.shutdown();

proxy/src/test/java/com/automq/rocketmq/proxy/mock/MockMessageStore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.HashSet;
4242
import java.util.List;
4343
import java.util.Map;
44+
import java.util.Optional;
4445
import java.util.Set;
4546
import java.util.concurrent.CompletableFuture;
4647
import java.util.concurrent.atomic.AtomicLong;
@@ -200,7 +201,8 @@ public CompletableFuture<Boolean> cancelDelayMessage(String messageId) {
200201
}
201202

202203
@Override
203-
public CompletableFuture<Void> endTransaction(String transactionId, TransactionResolution resolution) {
204+
public CompletableFuture<Optional<FlatMessage>> endTransaction(String transactionId,
205+
TransactionResolution resolution) {
204206
return CompletableFuture.completedFuture(null);
205207
}
206208

@@ -212,6 +214,10 @@ public void scheduleCheckTransaction(FlatMessage message) throws StoreException
212214
public void registerMessageArriveListener(MessageArrivalListener listener) {
213215
}
214216

217+
@Override
218+
public void registerTimerMessageHandler(Consumer<TimerTag> handler) throws StoreException {
219+
}
220+
215221
@Override
216222
public void registerTransactionCheckHandler(Consumer<TimerTag> handler) throws StoreException {
217223
}

store/src/main/java/com/automq/rocketmq/store/MessageStoreImpl.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,6 @@ public void start() throws Exception {
121121
}
122122
clearStateMachineData();
123123
streamStore.start();
124-
timerService.registerHandler(TimerHandlerType.TIMER_MESSAGE, timerTag -> put(StoreContext.EMPTY, FlatMessage.getRootAsFlatMessage(timerTag.payloadAsByteBuffer())));
125124
timerService.start();
126125
snapshotService.start();
127126
logicQueueManager.start();
@@ -316,18 +315,20 @@ public CompletableFuture<Boolean> cancelDelayMessage(String messageId) {
316315
}
317316

318317
@Override
319-
public CompletableFuture<Void> endTransaction(String transactionId, TransactionResolution resolution) {
318+
public CompletableFuture<Optional<FlatMessage>> endTransaction(String transactionId,
319+
TransactionResolution resolution) {
320320
try {
321321
if (resolution == TransactionResolution.COMMIT) {
322-
transactionService.commit(transactionId)
323-
.ifPresent(message -> {
324-
message.systemProperties().mutatePreparedTransactionMark(false);
325-
put(StoreContext.EMPTY, message);
326-
});
322+
Optional<FlatMessage> optional = transactionService.commit(transactionId);
323+
if (optional.isEmpty()) {
324+
return CompletableFuture.completedFuture(Optional.empty());
325+
}
326+
optional.get().systemProperties().mutatePreparedTransactionMark(false);
327+
return CompletableFuture.completedFuture(optional);
327328
} else {
328329
transactionService.rollback(transactionId);
329330
}
330-
return CompletableFuture.completedFuture(null);
331+
return CompletableFuture.completedFuture(Optional.empty());
331332
} catch (Exception e) {
332333
return CompletableFuture.failedFuture(e);
333334
}
@@ -343,6 +344,11 @@ public void registerMessageArriveListener(MessageArrivalListener listener) {
343344
messageArrivalNotificationService.registerMessageArriveListener(listener);
344345
}
345346

347+
@Override
348+
public void registerTimerMessageHandler(Consumer<TimerTag> handler) throws StoreException {
349+
timerService.registerHandler(TimerHandlerType.TIMER_MESSAGE, handler);
350+
}
351+
346352
@Override
347353
public void registerTransactionCheckHandler(Consumer<TimerTag> handler) throws StoreException {
348354
timerService.registerHandler(TimerHandlerType.TRANSACTION_MESSAGE, handler);

store/src/main/java/com/automq/rocketmq/store/api/MessageStore.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@
3030
import com.automq.rocketmq.store.model.message.PullResult;
3131
import com.automq.rocketmq.store.model.message.PutResult;
3232
import com.automq.rocketmq.store.model.message.ResetConsumeOffsetResult;
33-
import java.util.List;
3433
import com.automq.rocketmq.store.model.transaction.TransactionResolution;
34+
import java.util.List;
35+
import java.util.Optional;
3536
import java.util.concurrent.CompletableFuture;
3637
import java.util.function.Consumer;
3738

@@ -174,7 +175,7 @@ CompletableFuture<ResetConsumeOffsetResult> resetConsumeOffset(long consumerGrou
174175
* @param resolution transaction resolution
175176
* @return end transaction result
176177
*/
177-
CompletableFuture<Void> endTransaction(String transactionId, TransactionResolution resolution);
178+
CompletableFuture<Optional<FlatMessage>> endTransaction(String transactionId, TransactionResolution resolution);
178179

179180
/**
180181
* Schedule a check for transaction status.
@@ -190,6 +191,13 @@ CompletableFuture<ResetConsumeOffsetResult> resetConsumeOffset(long consumerGrou
190191
*/
191192
void registerMessageArriveListener(MessageArrivalListener listener);
192193

194+
/**
195+
* Register a hanler for denqueuing timer messages.
196+
*
197+
* @param handler timer message handler
198+
*/
199+
void registerTimerMessageHandler(Consumer<TimerTag> handler) throws StoreException;
200+
193201
/**
194202
* Register a hanler for checking transaction status.
195203
*

0 commit comments

Comments
 (0)