Skip to content

Commit 2058cb9

Browse files
committed
fix TestShuffleChannelHandler
1 parent fdb7efd commit 2058cb9

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import io.netty.handler.ssl.SslContextBuilder;
4545
import io.netty.handler.ssl.SslHandler;
4646
import io.netty.handler.stream.ChunkedWriteHandler;
47+
import io.netty.util.ReferenceCounted;
4748
import io.netty.util.concurrent.GlobalEventExecutor;
4849

4950
import java.io.ByteArrayOutputStream;
@@ -56,11 +57,13 @@
5657
import java.security.cert.X509Certificate;
5758
import java.util.ArrayList;
5859
import java.util.Arrays;
60+
import java.util.Collection;
5961
import java.util.Collections;
6062
import java.util.LinkedList;
6163
import java.util.List;
6264
import java.util.Objects;
6365
import java.util.concurrent.ConcurrentHashMap;
66+
import java.util.concurrent.ExecutionException;
6467
import java.util.concurrent.TimeUnit;
6568

6669
import javax.crypto.SecretKey;
@@ -115,6 +118,7 @@ public void testGetMapsChunkedFileSSl() throws Exception {
115118
final LinkedList<Object> unencryptedMessages = new LinkedList<>();
116119
final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
117120
t.testGetAllAttemptsForReduce0NoKeepAlive(unencryptedMessages, shuffle);
121+
drainChannel(shuffle);
118122
}
119123

120124
@Test
@@ -194,6 +198,8 @@ public void testIncompatibleShuffleVersion() {
194198
actual.toString());
195199

196200
assertFalse(shuffle.isActive(), "closed"); // known-issue
201+
tryRelease(actual);
202+
drainChannel(decoder);
197203
}
198204

199205
@Test
@@ -217,6 +223,9 @@ public void testInvalidMapNoIndexFile() {
217223
actual.toString());
218224

219225
assertFalse(shuffle.isActive(), "closed");
226+
227+
tryRelease(actual);
228+
drainChannel(decoder);
220229
}
221230

222231
@Test
@@ -243,9 +252,31 @@ public void testInvalidMapNoDataFile() {
243252
assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
244253
actual.toString());
245254

255+
tryRelease(actual);
256+
drainChannel(decoder);
257+
246258
assertFalse(shuffle.isActive(), "closed");
247259
}
248260

261+
private void drainChannel(EmbeddedChannel ch) {
262+
Object o;
263+
while((o = ch.readInbound())!=null) {
264+
tryRelease(o);
265+
}
266+
while((o = ch.readOutbound())!=null) {
267+
tryRelease(o);
268+
}
269+
}
270+
271+
private void tryRelease(Object obj) {
272+
if (obj instanceof ReferenceCounted) {
273+
ReferenceCounted bb = (ReferenceCounted) obj;
274+
if (bb.refCnt() > 0) {
275+
bb.release(bb.refCnt());
276+
}
277+
}
278+
}
279+
249280
private DefaultHttpResponse getExpectedHttpResponse(HttpResponseStatus status) {
250281
DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
251282
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
@@ -366,7 +397,7 @@ private void testGetAllAttemptsForReduce0NoKeepAlive(
366397
}
367398

368399
private void testKeepAlive(java.util.Queue<Object> messages,
369-
EmbeddedChannel shuffle) throws IOException {
400+
EmbeddedChannel shuffle) throws IOException, InterruptedException, ExecutionException {
370401
final FullHttpRequest req1 = createRequest(
371402
getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true));
372403
shuffle.writeInbound(req1);
@@ -375,6 +406,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
375406
getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
376407
);
377408
assertTrue(shuffle.isActive(), "keep-alive");
409+
drainChannel(shuffle);
378410
messages.clear();
379411

380412
final FullHttpRequest req2 = createRequest(
@@ -385,6 +417,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
385417
getAttemptData(new Attempt(TEST_ATTEMPT_2, TEST_DATA_B))
386418
);
387419
assertTrue(shuffle.isActive(), "keep-alive");
420+
drainChannel(shuffle);
388421
messages.clear();
389422

390423
final FullHttpRequest req3 = createRequest(
@@ -395,6 +428,7 @@ private void testKeepAlive(java.util.Queue<Object> messages,
395428
getAttemptData(new Attempt(TEST_ATTEMPT_3, TEST_DATA_C))
396429
);
397430
assertFalse(shuffle.isActive(), "no keep-alive");
431+
drainChannel(shuffle);
398432
}
399433

400434
private ArrayList<ByteBuf> getAllAttemptsForReduce0() throws IOException {
@@ -442,6 +476,8 @@ private void assertResponse(java.util.Queue<Object> outboundMessages,
442476
}
443477

444478
i++;
479+
tryRelease(obj);
480+
drainChannel(decodeChannel);
445481
}
446482

447483
// This check is done after to have better debug logs on failure.

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public void setup() throws IOException {
8484

8585
@AfterEach
8686
public void teardown() {
87+
//Trigger GC so that we get the leak warnings early
88+
System.gc();
89+
try {
90+
// Wait for logger to flush
91+
Thread.sleep(1000);
92+
} catch (InterruptedException e) {
93+
}
8794
System.setOut(standardOut);
8895
System.out.print(outputStreamCaptor);
8996
// For this to work ch.qos.logback.classic is needed for some reason

0 commit comments

Comments
 (0)