Skip to content

Commit f77cb65

Browse files
author
bigbigxu
committed
shuttle: Allow closing flow control requests
1 parent a6ceec0 commit f77cb65

File tree

6 files changed

+37
-8
lines changed

6 files changed

+37
-8
lines changed

src/main/java/com/oppo/shuttle/rss/clients/NettyClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class NettyClient {
5555
private final long netWorkTimeout;
5656
private final int ioMaxRetry;
5757
private final long retryBaseWaitTime;
58+
private final boolean flowControlEnable;
5859

5960
private final ResponseCallback callback;
6061

@@ -64,6 +65,7 @@ public NettyClient(List<Ors2ServerGroup> groupList, SparkConf conf, AppTaskInfo
6465
netWorkTimeout = clientFactory.getNetWorkTimeout();
6566
ioMaxRetry = Math.max((int) conf.get(Ors2Config.sendDataMaxRetries()), 1);
6667
retryBaseWaitTime = (long) conf.get(Ors2Config.retryBaseWaitTime());
68+
flowControlEnable = (boolean) conf.get(Ors2Config.flowControlEnable());
6769
long networkSlowTime = (long) conf.get(Ors2Config.networkSlowTime());
6870
mapWriteDispersion = (boolean) conf.get(Ors2Config.mapWriteDispersion());
6971
int workerRetryNumber = (int) conf.get(Ors2Config.workerRetryNumber());
@@ -146,7 +148,7 @@ public void send(int workerId, ShufflePacket packet) {
146148
}
147149

148150
Tuple2<ShuffleClient, ShuffleClient> tuple2 = getClient(workerId, 0, Optional.empty());
149-
Request request = new Request(packet, workerId, tuple2._1, tuple2._2, callback);
151+
Request request = new Request(flowControlEnable, packet, workerId, tuple2._1, tuple2._2, callback);
150152

151153
readySend.incrementAndGet();
152154
request.writeBuild();
@@ -203,7 +205,8 @@ public Request createRetryRequest(Request request, boolean channelError) {
203205
}
204206

205207
Tuple2<ShuffleClient, ShuffleClient> tuple2 = getClient(request.getWorkerId(), retry, errorServer);
206-
return new Request(request.getPacket(), request.getWorkerId(), tuple2._1, tuple2._2, request.getCallback());
208+
return new Request(flowControlEnable, request.getPacket(), request.getWorkerId(),
209+
tuple2._1, tuple2._2, request.getCallback());
207210
}
208211

209212
public Ors2ClientFactory getClientFactory() {

src/main/java/com/oppo/shuttle/rss/clients/handler/Request.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package com.oppo.shuttle.rss.clients.handler;
1818

1919
import com.oppo.shuttle.rss.clients.ShuffleClient;
20+
import com.oppo.shuttle.rss.common.Constants;
2021
import com.oppo.shuttle.rss.common.Ors2WorkerDetail;
2122
import com.oppo.shuttle.rss.messages.ShufflePacket;
2223
import org.apache.spark.util.Utils;
2324

2425
public class Request {
26+
private final boolean flowControlEnable;
2527
private final ShufflePacket packet;
2628
private final ShuffleClient buildClient;
2729
private final ShuffleClient dataClient;
@@ -33,11 +35,13 @@ public class Request {
3335
private final int workerId;
3436

3537
public Request(
38+
boolean flowControlEnable,
3639
ShufflePacket packet,
3740
int workerId,
3841
ShuffleClient buildClient,
3942
ShuffleClient dataClient,
4043
ResponseCallback callback) {
44+
this.flowControlEnable = flowControlEnable;
4145
this.packet = packet;
4246
this.buildClient = buildClient;
4347
this.dataClient = dataClient;
@@ -66,7 +70,11 @@ public void onError(Throwable e) {
6670
}
6771

6872
public void writeBuild() {
69-
buildClient.writeAndFlush(this, packet.buildBuffer());
73+
if (flowControlEnable) {
74+
buildClient.writeAndFlush(this, packet.buildBuffer());
75+
} else {
76+
writeData(Constants.SKIP_CHECK_BUILD_ID, Constants.SKIP_CHECK_BUILD_ID);
77+
}
7078
}
7179

7280
public void writeData(int id, long value) {

src/main/java/com/oppo/shuttle/rss/common/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,5 @@ public class Constants {
7474

7575
public static final String SHUTTLE_HOST_IP_ENV_NAME = "SHUTTLE_HOST_IP";
7676

77+
public static final int SKIP_CHECK_BUILD_ID = -100;
7778
}

src/main/java/com/oppo/shuttle/rss/handlers/ShuffleDataIncomingHandler.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.oppo.shuttle.rss.handlers;
1818

19+
import com.oppo.shuttle.rss.common.Constants;
1920
import com.oppo.shuttle.rss.common.StageShuffleId;
2021
import com.oppo.shuttle.rss.exceptions.Ors2InvalidDataException;
2122
import com.oppo.shuttle.rss.execution.BuildConnectionExecutor;
@@ -58,11 +59,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
5859
ShuffleData shuffleData = (ShuffleData) msg;
5960
ShuffleMessage.UploadPackageRequest up = shuffleData.getUploadPackage();
6061

61-
if (!buildConnExecutor.checkConnectionIdValue(up.getBuildId(), up.getBuildValue())) {
62-
logger.warn("invalid or timeout connectionId&Value: {}, {}, for {}, address {}",
63-
up.getBuildId(), up.getBuildValue(), up.getAppId(), shuffleConnInfo);
64-
} else {
65-
buildConnExecutor.releaseConnection(up.getBuildId());
62+
if (up.getBuildId() != Constants.SKIP_CHECK_BUILD_ID) {
63+
if(!buildConnExecutor.checkConnectionIdValue(up.getBuildId(), up.getBuildValue())) {
64+
logger.warn("invalid or timeout connectionId&Value: {}, {}, for {}, address {}",
65+
up.getBuildId(), up.getBuildValue(), up.getAppId(), shuffleConnInfo);
66+
} else {
67+
buildConnExecutor.releaseConnection(up.getBuildId());
68+
}
6669
}
6770

6871
if (appId == null) {

src/main/scala/org/apache/spark/shuffle/Ors2Config.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,12 @@ object Ors2Config {
327327
.booleanConf
328328
.createWithDefault(true)
329329

330+
val flowControlEnable: ConfigEntry[Boolean] =
331+
ConfigBuilder("spark.shuffle.rss.flowControlEnable")
332+
.doc("Whether to enable flow control. Defaults to true")
333+
.booleanConf
334+
.createWithDefault(true)
335+
330336
val SHUFFLE_TYPE_ASYNC = "async"
331337
val SHUFFLE_TYPE_SYNC = "sync"
332338

src/test/scala/org/apache/spark/shuffle/Ors2ShuffleManagerTest.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,4 +161,12 @@ class Ors2ShuffleManagerTest {
161161
conf.set(Ors2Config.serviceRegistryZKServers, zk)
162162
runWithConf(conf)
163163
}
164+
165+
@Test
166+
def skipFlowControl(): Unit = {
167+
val conf = new SparkConf()
168+
.set(Ors2Config.shuffleWriterType, Ors2Config.SHUFFLE_WRITER_UNSAFE)
169+
.set(Ors2Config.flowControlEnable, false)
170+
runWithConf(conf)
171+
}
164172
}

0 commit comments

Comments
 (0)