Skip to content

Commit c190fe7

Browse files
committed
Remove synchronized in Waiter
1 parent fba56e4 commit c190fe7

File tree

2 files changed

+74
-40
lines changed

2 files changed

+74
-40
lines changed

db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Waiter.java

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@
1515

1616
import java.time.Duration;
1717
import java.time.Instant;
18+
import java.util.concurrent.locks.Condition;
19+
import java.util.concurrent.locks.ReentrantLock;
20+
import java.util.function.Supplier;
1821
import org.slf4j.Logger;
1922
import org.slf4j.LoggerFactory;
2023

2124
public class Waiter {
25+
2226
private static final Logger LOG = LoggerFactory.getLogger(Waiter.class);
2327

24-
private final Object lock;
28+
private final ReentrantLock lock;
29+
private final Condition condition;
2530
private boolean woken = false;
2631
private final Duration duration;
27-
private Clock clock;
32+
private final Clock clock;
2833
private boolean isWaiting = false;
2934
private boolean skipNextWait = false;
3035

@@ -33,66 +38,77 @@ public Waiter(Duration duration) {
3338
}
3439

3540
public Waiter(Duration duration, Clock clock) {
36-
this(duration, clock, new Object());
41+
this.duration = duration;
42+
this.clock = clock;
43+
this.lock = new ReentrantLock();
44+
this.condition = this.lock.newCondition();
3745
}
3846

39-
Waiter(Duration duration, Clock clock, Object lock) {
47+
Waiter(Duration duration, Clock clock, ReentrantLock lock, Condition condition) {
4048
this.duration = duration;
4149
this.clock = clock;
4250
this.lock = lock;
51+
this.condition = condition;
4352
}
4453

4554
public void doWait() throws InterruptedException {
4655
long millis = duration.toMillis();
4756

48-
if (millis > 0) {
49-
Instant waitUntil = clock.now().plusMillis(millis);
50-
51-
while (clock.now().isBefore(waitUntil)) {
52-
synchronized (lock) {
53-
if (skipNextWait) {
54-
LOG.debug("Waiter has been notified to skip next wait-period. Skipping wait.");
55-
skipNextWait = false;
56-
return;
57-
}
58-
59-
woken = false;
60-
LOG.debug("Waiter start wait.");
61-
this.isWaiting = true;
62-
lock.wait(millis);
63-
this.isWaiting = false;
64-
if (woken) {
65-
LOG.debug(
66-
"Waiter woken, it had {}ms left to wait.",
67-
(waitUntil.toEpochMilli() - clock.now().toEpochMilli()));
68-
return;
69-
}
57+
if (millis <= 0) {
58+
return;
59+
}
60+
61+
Instant waitUntil = clock.now().plusMillis(millis);
62+
63+
while (clock.now().isBefore(waitUntil)) {
64+
lock.lock();
65+
try {
66+
if (skipNextWait) {
67+
LOG.debug("Waiter has been notified to skip next wait-period. Skipping wait.");
68+
skipNextWait = false;
69+
return;
70+
}
71+
72+
woken = false;
73+
LOG.debug("Waiter start wait.");
74+
this.isWaiting = true;
75+
condition.awaitNanos(millis * 1_000_000);
76+
this.isWaiting = false;
77+
if (woken) {
78+
LOG.debug(
79+
"Waiter woken, it had {}ms left to wait.",
80+
(waitUntil.toEpochMilli() - clock.now().toEpochMilli()));
81+
return;
7082
}
83+
84+
} finally {
85+
lock.unlock();
7186
}
7287
}
7388
}
7489

75-
public boolean wake() {
76-
synchronized (lock) {
90+
boolean wake() {
91+
return withLock(lock, () -> {
7792
if (!isWaiting) {
7893
return false;
7994
} else {
8095
woken = true;
81-
lock.notify();
96+
condition.signal();
8297
return true;
8398
}
84-
}
99+
});
85100
}
86101

87102
public void wakeOrSkipNextWait() {
88103
// Take early lock to avoid race-conditions. Lock is also taken in wake() (lock is re-entrant)
89-
synchronized (lock) {
104+
withLock(lock, () -> {
90105
final boolean awoken = wake();
91106
if (!awoken) {
92107
LOG.debug("Waiter not waiting, instructing to skip next wait.");
93108
this.skipNextWait = true;
94109
}
95-
}
110+
return null;
111+
});
96112
}
97113

98114
public Duration getWaitDuration() {
@@ -102,4 +118,20 @@ public Duration getWaitDuration() {
102118
public boolean isWaiting() {
103119
return isWaiting;
104120
}
121+
122+
public static <T> T withLock(ReentrantLock lock, Supplier<T> action) {
123+
lock.lock();
124+
try {
125+
return action.get();
126+
} finally {
127+
lock.unlock();
128+
}
129+
}
130+
131+
public static <T> T withLock(ReentrantLock lock, Runnable action) {
132+
return withLock(lock, () -> {
133+
action.run();
134+
return null;
135+
});
136+
}
105137
}

db-scheduler/src/test/java/com/github/kagkarlsson/scheduler/WaiterTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44

55
import java.time.Duration;
66
import java.util.concurrent.*;
7+
import java.util.concurrent.locks.Condition;
8+
import java.util.concurrent.locks.ReentrantLock;
79
import org.junit.jupiter.api.AfterEach;
810
import org.junit.jupiter.api.BeforeEach;
911
import org.junit.jupiter.api.Test;
1012

1113
public class WaiterTest {
14+
1215
private ExecutorService executor;
1316

1417
@BeforeEach
@@ -45,23 +48,21 @@ public void should_wait_until_woken() throws ExecutionException, InterruptedExce
4548

4649
@Test
4750
public void should_wait_for_duration_even_if_prematurely_notified()
48-
throws ExecutionException, InterruptedException {
49-
Object lock = new Object();
51+
throws ExecutionException, InterruptedException {
52+
ReentrantLock lock = new ReentrantLock();
53+
Condition condition = lock.newCondition();
5054

51-
Waiter waiter = new Waiter(Duration.ofMillis(200), new SystemClock(), lock);
55+
Waiter waiter = new Waiter(Duration.ofMillis(200), new SystemClock(), lock, condition);
5256
Future<Long> waitTime = executor.submit(new WaitForWaiter(waiter));
5357
sleep(20); // give executor time to get to wait(..)
5458

55-
synchronized (lock) {
56-
lock.notify();
57-
}
58-
59+
Waiter.withLock(lock, condition::signal);
5960
assertTrue(waitTime.get() >= 200L, "Waited: " + waitTime.get());
6061
}
6162

6263
@Test
6364
public void should_not_wait_if_instructed_to_skip_next()
64-
throws ExecutionException, InterruptedException {
65+
throws ExecutionException, InterruptedException {
6566
Waiter waiter = new Waiter(Duration.ofMillis(1000));
6667
waiter.wakeOrSkipNextWait(); // set skip
6768
Future<Long> waitTime = executor.submit(new WaitForWaiter(waiter));
@@ -77,6 +78,7 @@ private void sleep(int millis) {
7778
}
7879

7980
private static class WaitForWaiter implements Callable<Long> {
81+
8082
private Waiter waiter;
8183

8284
WaitForWaiter(Waiter waiter) {

0 commit comments

Comments
 (0)