Skip to content

Commit 35690cf

Browse files
HDFS-17650. [ARR] The router server-side rpc protocol PB supports asynchrony. (#7139). Contributed by hfutatzhanghb.
Co-authored-by: Jian Zhang <[email protected]> Signed-off-by: He Xiaoqiao <[email protected]>
1 parent d7779c2 commit 35690cf

20 files changed

+2554
-124
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/AsyncRpcProtocolPBUtil.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@
2020

2121
import org.apache.hadoop.hdfs.server.federation.router.ThreadLocalContext;
2222
import org.apache.hadoop.hdfs.server.federation.router.async.utils.ApplyFunction;
23+
import org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil;
2324
import org.apache.hadoop.io.Writable;
25+
import org.apache.hadoop.ipc.CallerContext;
2426
import org.apache.hadoop.ipc.Client;
2527
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
28+
import org.apache.hadoop.ipc.ProtobufRpcEngineCallback2;
2629
import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
30+
import org.apache.hadoop.thirdparty.protobuf.Message;
2731
import org.apache.hadoop.util.concurrent.AsyncGet;
2832
import org.slf4j.Logger;
2933
import org.slf4j.LoggerFactory;
3034

3135
import java.io.IOException;
3236
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.CompletionException;
3338
import java.util.concurrent.Executor;
3439

3540
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
@@ -96,6 +101,45 @@ public static <T, R> R asyncIpcClient(
96101
return asyncReturn(clazz);
97102
}
98103

104+
/**
105+
* Asynchronously invokes an RPC call and applies a response transformation function
106+
* to the result on server-side.
107+
* @param req The IPC call encapsulating the RPC request on server-side.
108+
* @param res The function to apply to the response of the RPC call on server-side.
109+
* @param <T> Type of the call's result.
110+
*/
111+
public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
112+
final ProtobufRpcEngineCallback2 callback =
113+
ProtobufRpcEngine2.Server.registerForDeferredResponse2();
114+
115+
CompletableFuture<Object> completableFuture =
116+
CompletableFuture.completedFuture(null);
117+
completableFuture.thenCompose(o -> {
118+
try {
119+
req.req();
120+
return (CompletableFuture<T>) AsyncUtil.getAsyncUtilCompletableFuture();
121+
} catch (Exception e) {
122+
throw new CompletionException(e);
123+
}
124+
}).handle((result, e) -> {
125+
LOG.debug("Async response, callback: {}, CallerContext: {}, result: [{}], exception: [{}]",
126+
callback, CallerContext.getCurrent(), result, e);
127+
if (e == null) {
128+
Message value = null;
129+
try {
130+
value = res.res(result);
131+
} catch (Exception re) {
132+
callback.error(re);
133+
return null;
134+
}
135+
callback.setResponse(value);
136+
} else {
137+
callback.error(e.getCause());
138+
}
139+
return null;
140+
});
141+
}
142+
99143
/**
100144
* Sets the executor used for handling responses asynchronously within
101145
* the utility class.
@@ -105,4 +149,14 @@ public static <T, R> R asyncIpcClient(
105149
public static void setWorker(Executor worker) {
106150
AsyncRpcProtocolPBUtil.worker = worker;
107151
}
152+
153+
@FunctionalInterface
154+
interface ServerReq<T> {
155+
T req() throws Exception;
156+
}
157+
158+
@FunctionalInterface
159+
interface ServerRes<T> {
160+
Message res(T result) throws Exception;
161+
}
108162
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java

Lines changed: 1769 additions & 0 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.protocolPB;
19+
20+
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
21+
import org.apache.hadoop.thirdparty.protobuf.RpcController;
22+
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
23+
import org.apache.hadoop.tools.GetUserMappingsProtocol;
24+
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserRequestProto;
25+
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
26+
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
27+
28+
import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer;
29+
30+
public class RouterGetUserMappingsProtocolServerSideTranslatorPB
31+
extends GetUserMappingsProtocolServerSideTranslatorPB {
32+
private final RouterRpcServer server;
33+
private final boolean isAsyncRpc;
34+
35+
public RouterGetUserMappingsProtocolServerSideTranslatorPB(GetUserMappingsProtocol impl) {
36+
super(impl);
37+
this.server = (RouterRpcServer) impl;
38+
this.isAsyncRpc = server.isAsync();
39+
}
40+
41+
@Override
42+
public GetGroupsForUserResponseProto getGroupsForUser(
43+
RpcController controller,
44+
GetGroupsForUserRequestProto request) throws ServiceException {
45+
if (!isAsyncRpc) {
46+
return super.getGroupsForUser(controller, request);
47+
}
48+
asyncRouterServer(() -> server.getGroupsForUser(request.getUser()), groups -> {
49+
GetGroupsForUserResponseProto.Builder builder =
50+
GetGroupsForUserResponseProto
51+
.newBuilder();
52+
for (String g : groups) {
53+
builder.addGroups(g);
54+
}
55+
return builder.build();
56+
});
57+
return null;
58+
}
59+
}

0 commit comments

Comments
 (0)