Skip to content

Commit 2f26707

Browse files
committed
feat(groupStatus): Add "groupStatus" command to the CLI tools #764
Signed-off-by: zhouyou.ltx <[email protected]>
1 parent 4441a87 commit 2f26707

File tree

3 files changed

+104
-1
lines changed

3 files changed

+104
-1
lines changed

proto/src/main/proto/proxy/proxy.proto

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ message ConsumerClientConnection {
136136

137137
message ConsumerClientConnectionReply {
138138
Status status = 1;
139-
// Producer client connection
139+
// consumer client connection
140140
repeated ConsumerClientConnection connection = 2;
141141
}
142142

@@ -165,11 +165,44 @@ message RelayReply {
165165
Status status = 1;
166166
}
167167

168+
169+
170+
message ConsumerConnectionRequest {
171+
ProxyRequestContext context = 1;
172+
// Consumer group name
173+
string group = 2;
174+
}
175+
176+
177+
message ConsumerSubInfo {
178+
// topic
179+
string topic = 1;
180+
// subExpression
181+
string sub_expression = 2;
182+
}
183+
184+
message ConsumerGroupCliInfo {
185+
string consume_type = 1;
186+
string message_model = 2;
187+
string consume_from_where = 3;
188+
// consumer client connection
189+
repeated ConsumerClientConnection connection = 4;
190+
// consumer subscription info
191+
repeated ConsumerSubInfo consumer_sub_info = 5;
192+
}
193+
message ConsumerConnectionReply {
194+
Status status = 1;
195+
// consumer group cli info
196+
ConsumerGroupCliInfo consumer_group_cli_info = 2;
197+
}
198+
199+
168200
service ProxyService {
169201
rpc resetConsumeOffset(ResetConsumeOffsetRequest) returns (ResetConsumeOffsetReply) {}
170202
rpc resetConsumeOffsetByTimestamp(ResetConsumeOffsetByTimestampRequest) returns (ResetConsumeOffsetReply) {}
171203
rpc topicStats(TopicStatsRequest) returns (TopicStatsReply) {}
172204
rpc producerClientConnection(ProducerClientConnectionRequest) returns (ProducerClientConnectionReply) {}
173205
rpc consumerClientConnection(ConsumerClientConnectionRequest) returns (ConsumerClientConnectionReply) {}
174206
rpc relay(RelayRequest) returns (RelayReply) {}
207+
rpc consumerConnection(ConsumerConnectionRequest) returns (ConsumerConnectionReply) {}
175208
}

proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImpl.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import apache.rocketmq.proxy.v1.ConsumerClientConnection;
2222
import apache.rocketmq.proxy.v1.ConsumerClientConnectionReply;
2323
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
24+
import apache.rocketmq.proxy.v1.ConsumerConnectionReply;
25+
import apache.rocketmq.proxy.v1.ConsumerConnectionRequest;
26+
import apache.rocketmq.proxy.v1.ConsumerGroupCliInfo;
27+
import apache.rocketmq.proxy.v1.ConsumerSubInfo;
2428
import apache.rocketmq.proxy.v1.ProducerClientConnection;
2529
import apache.rocketmq.proxy.v1.ProducerClientConnectionReply;
2630
import apache.rocketmq.proxy.v1.ProducerClientConnectionRequest;
@@ -52,6 +56,7 @@
5256
import org.apache.rocketmq.common.utils.NetworkUtil;
5357
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
5458
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
59+
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
5560
import org.slf4j.Logger;
5661

5762
public class ProxyServiceImpl extends ProxyServiceGrpc.ProxyServiceImplBase {
@@ -241,4 +246,56 @@ public void relay(RelayRequest request, StreamObserver<RelayReply> responseObser
241246
}
242247
}
243248
}
249+
250+
@Override
251+
public void consumerConnection(ConsumerConnectionRequest request,
252+
StreamObserver<ConsumerConnectionReply> responseObserver) {
253+
ConsumerGroupInfo groupInfo = consumerManager.getConsumerGroupInfo(request.getGroup(), true);
254+
if (groupInfo == null) {
255+
responseObserver.onNext(ConsumerConnectionReply.newBuilder()
256+
.setStatus(Status
257+
.newBuilder()
258+
.setCode(Code.BAD_REQUEST)
259+
.setMessage("Consumer group not found: " + request.getGroup())
260+
.build())
261+
.build());
262+
responseObserver.onCompleted();
263+
return;
264+
}
265+
266+
ConsumerGroupCliInfo.Builder consumerBuilder = ConsumerGroupCliInfo.newBuilder();
267+
268+
consumerBuilder
269+
.setConsumeType(groupInfo.getConsumeType().getTypeCN())
270+
.setMessageModel(groupInfo.getMessageModel().getModeCN())
271+
.setConsumeFromWhere(groupInfo.getConsumeFromWhere().name());
272+
273+
for (ClientChannelInfo info : groupInfo.getChannelInfoTable().values()) {
274+
String protocolType = ChannelProtocolType.REMOTING.name();
275+
if (info.getChannel() instanceof GrpcClientChannel) {
276+
protocolType = ChannelProtocolType.GRPC_V2.name();
277+
}
278+
consumerBuilder.addConnection(ConsumerClientConnection.newBuilder()
279+
.setClientId(info.getClientId())
280+
.setProtocol(protocolType)
281+
.setAddress(NetworkUtil.socketAddress2String(info.getChannel().remoteAddress()))
282+
.setLanguage(info.getLanguage().name())
283+
.setVersion(MQVersion.getVersionDesc(info.getVersion()))
284+
.setLastUpdateTime(info.getLastUpdateTimestamp())
285+
.build());
286+
}
287+
288+
for (SubscriptionData data : groupInfo.getSubscriptionTable().values()) {
289+
consumerBuilder.addConsumerSubInfo(ConsumerSubInfo.newBuilder()
290+
.setTopic(data.getTopic())
291+
.setSubExpression(data.getSubString())
292+
.build());
293+
}
294+
ConsumerConnectionReply.Builder builder = ConsumerConnectionReply.newBuilder()
295+
.setConsumerGroupCliInfo(consumerBuilder.build());
296+
297+
responseObserver.onNext(builder.build());
298+
responseObserver.onCompleted();
299+
}
300+
244301
}

proxy/src/test/java/com/automq/rocketmq/proxy/grpc/ProxyServiceImplTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package com.automq.rocketmq.proxy.grpc;
1919

2020
import apache.rocketmq.common.v1.Code;
21+
import apache.rocketmq.proxy.v1.ConsumerClientConnection;
22+
import apache.rocketmq.proxy.v1.ConsumerClientConnectionRequest;
2123
import apache.rocketmq.proxy.v1.ProxyServiceGrpc;
2224
import apache.rocketmq.proxy.v1.Status;
2325
import com.automq.rocketmq.common.config.BrokerConfig;
@@ -28,6 +30,7 @@
2830
import com.automq.rocketmq.store.api.MessageStore;
2931
import com.automq.rocketmq.store.model.message.PutResult;
3032
import java.lang.reflect.Field;
33+
import java.util.List;
3134
import java.util.concurrent.CompletableFuture;
3235
import java.util.concurrent.ConcurrentMap;
3336
import org.apache.rocketmq.broker.client.ConsumerManager;
@@ -75,4 +78,14 @@ void relay() {
7578
Status status = proxyClient.relayMessage(TARGET, messageExt.message()).join();
7679
assertEquals(Code.OK, status.getCode());
7780
}
81+
82+
83+
@Test
84+
void ConsumerClientConnection() {
85+
final String groupName = "";
86+
87+
ConsumerClientConnectionRequest request = ConsumerClientConnectionRequest.newBuilder().setGroup(groupName).build();
88+
List<ConsumerClientConnection> list = proxyClient.consumerClientConnection(TARGET, request).join();
89+
90+
}
7891
}

0 commit comments

Comments
 (0)