Skip to content

Commit c347def

Browse files
committed
Move things
* rate limiting api to hadoop-aws * semaphored delegating excutor Itest to hadoop common unit test
1 parent d7bcb5f commit c347def

File tree

5 files changed

+81
-65
lines changed

5 files changed

+81
-65
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
128128

129129
@Override
130130
public <T> Future<T> submit(Callable<T> task) {
131+
rejectWhenShutdown();
131132
try (DurationTracker ignored =
132133
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
133134
queueingPermits.acquire();
@@ -140,6 +141,7 @@ public <T> Future<T> submit(Callable<T> task) {
140141

141142
@Override
142143
public <T> Future<T> submit(Runnable task, T result) {
144+
rejectWhenShutdown();
143145
try (DurationTracker ignored =
144146
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
145147
queueingPermits.acquire();
@@ -211,7 +213,11 @@ public String toString() {
211213
return sb.toString();
212214
}
213215

214-
private void rejectWhenShutdown() {
216+
/**
217+
* Raise an exception if invoked when the executor is shut down.
218+
* @throws RejectedExecutionException if the executor is shut down.
219+
*/
220+
private void rejectWhenShutdown() throws RejectedExecutionException{
215221
if (isShutdown()) {
216222
throw new RejectedExecutionException("ExecutorService is shutdown");
217223
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java renamed to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestBlockingThreadPoolExecutorService.java

Lines changed: 63 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,77 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.fs.s3a;
20-
21-
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
22-
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
23-
import org.apache.hadoop.util.StopWatch;
24-
25-
import org.junit.AfterClass;
26-
import org.junit.Rule;
27-
import org.junit.Test;
28-
import org.junit.rules.Timeout;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
19+
package org.apache.hadoop.util;
3120

3221
import java.util.concurrent.Callable;
3322
import java.util.concurrent.CountDownLatch;
3423
import java.util.concurrent.ExecutorService;
3524
import java.util.concurrent.Future;
25+
import java.util.concurrent.RejectedExecutionException;
3626
import java.util.concurrent.TimeUnit;
3727

38-
import static org.junit.Assert.assertEquals;
28+
import org.assertj.core.api.Assertions;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
35+
import org.apache.hadoop.test.AbstractHadoopTestBase;
36+
37+
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3938

4039
/**
41-
* Basic test for S3A's blocking executor service.
40+
* Test for the blocking executor service.
4241
*/
43-
public class ITestBlockingThreadPoolExecutorService {
42+
public class TestBlockingThreadPoolExecutorService extends AbstractHadoopTestBase {
4443

4544
private static final Logger LOG = LoggerFactory.getLogger(
46-
ITestBlockingThreadPoolExecutorService.class);
45+
TestBlockingThreadPoolExecutorService.class);
4746

4847
private static final int NUM_ACTIVE_TASKS = 4;
48+
4949
private static final int NUM_WAITING_TASKS = 2;
50+
5051
private static final int TASK_SLEEP_MSEC = 100;
52+
5153
private static final int SHUTDOWN_WAIT_MSEC = 200;
54+
5255
private static final int SHUTDOWN_WAIT_TRIES = 5;
56+
5357
private static final int BLOCKING_THRESHOLD_MSEC = 50;
5458

5559
private static final Integer SOME_VALUE = 1337;
5660

57-
private static BlockingThreadPoolExecutorService tpe;
61+
private BlockingThreadPoolExecutorService tpe;
5862

59-
@Rule
60-
public Timeout testTimeout = new Timeout(60, TimeUnit.SECONDS);
6163

62-
@AfterClass
63-
public static void afterClass() throws Exception {
64+
@Before
65+
public void setup() throws Exception {
66+
ensureCreated();
67+
}
68+
69+
@After
70+
public void teardown() throws Exception {
6471
ensureDestroyed();
6572
}
6673

74+
6775
/**
6876
* Basic test of running one trivial task.
6977
*/
7078
@Test
7179
public void testSubmitCallable() throws Exception {
72-
ensureCreated();
7380
Future<Integer> f = tpe.submit(callableSleeper);
7481
Integer v = f.get();
75-
assertEquals(SOME_VALUE, v);
82+
Assertions.assertThat(v).isEqualTo(SOME_VALUE);
7683
}
7784

7885
/**
7986
* More involved test, including detecting blocking when at capacity.
8087
*/
8188
@Test
8289
public void testSubmitRunnable() throws Exception {
83-
ensureCreated();
8490
verifyQueueSize(tpe, NUM_ACTIVE_TASKS + NUM_WAITING_TASKS);
8591
}
8692

@@ -102,27 +108,30 @@ protected void verifyQueueSize(ExecutorService executorService,
102108
assertDidBlock(stopWatch);
103109
}
104110

105-
@Test
106-
public void testShutdown() throws Exception {
107-
// Cover create / destroy, regardless of when this test case runs
108-
ensureCreated();
109-
ensureDestroyed();
110-
111-
// Cover create, execute, destroy, regardless of when test case runs
112-
ensureCreated();
113-
testSubmitRunnable();
114-
ensureDestroyed();
115-
}
116-
117111
@Test
118112
public void testChainedQueue() throws Throwable {
119-
ensureCreated();
120113
int size = 2;
121114
ExecutorService wrapper = new SemaphoredDelegatingExecutor(tpe,
122115
size, true);
123116
verifyQueueSize(wrapper, size);
124117
}
125118

119+
@Test
120+
public void testShutdownQueueRejectsOperations() throws Throwable {
121+
tpe.shutdown();
122+
Assertions.assertThat(tpe.isShutdown())
123+
.describedAs("%s should be shutdown", tpe)
124+
.isTrue();
125+
// runnable
126+
intercept(RejectedExecutionException.class, () ->
127+
tpe.submit(failToRun));
128+
// callable
129+
intercept(RejectedExecutionException.class, () ->
130+
tpe.submit(() -> 0));
131+
intercept(RejectedExecutionException.class, () ->
132+
tpe.execute(failToRun));
133+
}
134+
126135
// Helper functions, etc.
127136

128137
private void assertDidBlock(StopWatch sw) {
@@ -135,28 +144,27 @@ private void assertDidBlock(StopWatch sw) {
135144
}
136145
}
137146

138-
private Runnable sleeper = new Runnable() {
139-
@Override
140-
public void run() {
141-
String name = Thread.currentThread().getName();
142-
try {
143-
Thread.sleep(TASK_SLEEP_MSEC);
144-
} catch (InterruptedException e) {
145-
LOG.info("Thread {} interrupted.", name);
146-
Thread.currentThread().interrupt();
147-
}
148-
}
147+
private Runnable failToRun = () -> {
148+
throw new RuntimeException("This runnable raises and exception");
149149
};
150150

151-
private Callable<Integer> callableSleeper = new Callable<Integer>() {
152-
@Override
153-
public Integer call() throws Exception {
154-
sleeper.run();
155-
return SOME_VALUE;
151+
private Runnable sleeper = () -> {
152+
String name = Thread.currentThread().getName();
153+
try {
154+
Thread.sleep(TASK_SLEEP_MSEC);
155+
} catch (InterruptedException e) {
156+
LOG.info("Thread {} interrupted.", name);
157+
Thread.currentThread().interrupt();
156158
}
157159
};
158160

161+
private Callable<Integer> callableSleeper = () -> {
162+
sleeper.run();
163+
return SOME_VALUE;
164+
};
165+
159166
private class LatchedSleeper implements Runnable {
167+
160168
private final CountDownLatch latch;
161169

162170
LatchedSleeper(CountDownLatch latch) {
@@ -178,7 +186,7 @@ public void run() {
178186
/**
179187
* Helper function to create thread pool under test.
180188
*/
181-
private static void ensureCreated() throws Exception {
189+
private void ensureCreated() throws Exception {
182190
if (tpe == null) {
183191
LOG.debug("Creating thread pool");
184192
tpe = BlockingThreadPoolExecutorService.newInstance(
@@ -191,7 +199,7 @@ private static void ensureCreated() throws Exception {
191199
* Helper function to terminate thread pool under test, asserting that
192200
* shutdown -> terminate works as expected.
193201
*/
194-
private static void ensureDestroyed() throws Exception {
202+
private void ensureDestroyed() throws Exception {
195203
if (tpe == null) {
196204
return;
197205
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
5858
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
5959
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
60-
import org.apache.hadoop.io.IORateLimiting;
60+
import org.apache.hadoop.fs.s3a.api.IORateLimiting;
6161
import org.apache.hadoop.service.Service;
6262

6363
/**
@@ -117,6 +117,14 @@ public interface S3AStore extends
117117

118118
ClientManager clientManager();
119119

120+
/**
121+
* Get the Multipart IO operations.
122+
* @return an instance of multipart IO.
123+
*/
124+
default MultipartIOService getMultipartIO() {
125+
return lookupService(MultipartIO.MULTIPART_IO, MultipartIOService.class);
126+
}
127+
120128
/**
121129
* Look up a service by name, validate its classtype and then return the cast value.
122130
* This allows for the lookup of any registered service within the store, if the name
@@ -439,13 +447,6 @@ default boolean hasCapability(String capability) {
439447
*/
440448
WriteOperationHelper.WriteOperationHelperCallbacks createWriteOperationHelperCallbacks();
441449

442-
/**
443-
* Get the Multipart IO operations.
444-
* @return an instance of multipart IO.
445-
*/
446-
default MultipartIOService getMultipartIO() {
447-
return lookupService(MultipartIO.MULTIPART_IO, MultipartIOService.class);
448-
}
449450

450451
/*
451452
=============== END WriteOperationHelperCallbacks ===============

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IORateLimiting.java renamed to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/IORateLimiting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.hadoop.io;
19+
package org.apache.hadoop.fs.s3a.api;
2020

2121
import java.time.Duration;
2222

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestStoreClose.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,5 @@ public void testMultipartUploadClosed() throws Throwable {
7979
fs.close();
8080
intercept(RejectedExecutionException.class, out::close);
8181
}
82+
8283
}

0 commit comments

Comments
 (0)