Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closes #349 DelegatingScheduler: Bill Pugh Singleton Implementation #350

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 126 additions & 75 deletions core/src/main/java/dev/failsafe/internal/util/DelegatingScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@

import dev.failsafe.spi.Scheduler;

import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.ForkJoinPool.commonPool;

/**
* A {@link Scheduler} implementation that schedules delays on an internal, common ScheduledExecutorService and executes
Expand All @@ -33,52 +45,83 @@
* @author Ben Manes
*/
public final class DelegatingScheduler implements Scheduler {
public static final DelegatingScheduler INSTANCE = new DelegatingScheduler();
private static volatile ForkJoinPool FORK_JOIN_POOL;
private static volatile ScheduledThreadPoolExecutor DELAYER;
public static final DelegatingScheduler INSTANCE = new DelegatingScheduler(null,null);

private final ExecutorService executorService;
private final ScheduledExecutorService scheduler;
private final int executorType;

private static final int EX_FORK_JOIN = 1;
private static final int EX_COMMON = 4;
private static final int EX_INTERNAL = 8;

private DelegatingScheduler() {
this.executorService = null;
}

public DelegatingScheduler(ExecutorService executor) {
this.executorService = executor;
this(executor, null);
}

public DelegatingScheduler(ExecutorService executor, ScheduledExecutorService scheduler) {
if (executor == null || executor == commonPool()) {
if (ForkJoinPool.getCommonPoolParallelism() > 1) {// @see CompletableFuture#useCommonPool
executorService = commonPool();
executorType = EX_COMMON | EX_FORK_JOIN;

} else {// don't use commonPool(): cannot support parallelism
executorService = null;
executorType = EX_INTERNAL | EX_FORK_JOIN;
}
} else {
executorService = executor;
executorType = executor instanceof ForkJoinPool ? EX_FORK_JOIN
: 0;
}
this.scheduler = scheduler;
}

private static final class DelayerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
DelegatingScheduler (byte flags) {
executorService = null; executorType = flags; scheduler = null;
}//new for tests

private static final class LazyDelayerHolder extends ScheduledThreadPoolExecutor implements ThreadFactory {
private static final ScheduledThreadPoolExecutor DELAYER = new LazyDelayerHolder();

public LazyDelayerHolder(){
super(1);
setThreadFactory(this);
setRemoveOnCancelPolicy(true);
}

@Override public Thread newThread(Runnable r) {
Thread t = new Thread(r, "FailsafeDelayScheduler");
t.setDaemon(true);
t.setName("FailsafeDelayScheduler");
return t;
}
}

private static final class LazyForkJoinPoolHolder {
private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(
Math.max(Runtime.getRuntime().availableProcessors(), 2),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true/*asyncMode*/);
}

static final class ScheduledCompletableFuture<V> extends CompletableFuture<V> implements ScheduledFuture<V> {
// Guarded by this
volatile Future<V> delegate;
// Guarded by this
Thread forkJoinPoolThread;
private final long time;

ScheduledCompletableFuture(long delay, TimeUnit unit) {
this.time = System.nanoTime() + unit.toNanos(delay);
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
public long getDelay(TimeUnit unit){
Future<V> f = delegate;
return f instanceof Delayed ? ((Delayed) f).getDelay(unit)
: 0; // we are executing now
}

@Override
public int compareTo(Delayed other) {
if (other == this) {
if (other == this)// ScheduledFuture<?> gives no extra info
return 0;
} else if (other instanceof ScheduledCompletableFuture) {
return Long.compare(time, ((ScheduledCompletableFuture<?>) other).time);
}
return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
}

Expand All @@ -93,75 +136,83 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
return result;
}
}
}//ScheduledCompletableFuture

private static ScheduledExecutorService delayer() {
if (DELAYER == null) {
synchronized (DelegatingScheduler.class) {
if (DELAYER == null) {
ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new DelayerThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
DELAYER = delayer;
}
}
}
return DELAYER;
private ScheduledExecutorService delayer() {
return scheduler != null ? scheduler
: LazyDelayerHolder.DELAYER;
}

private ExecutorService executorService() {
if (executorService != null)
return executorService;
if (FORK_JOIN_POOL == null) {
synchronized (DelegatingScheduler.class) {
if (FORK_JOIN_POOL == null) {
if (ForkJoinPool.getCommonPoolParallelism() > 1)
FORK_JOIN_POOL = ForkJoinPool.commonPool();
else
FORK_JOIN_POOL = new ForkJoinPool(2);
}
}
}
return FORK_JOIN_POOL;
return executorService != null ? executorService
: LazyForkJoinPoolHolder.FORK_JOIN_POOL;
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override @SuppressWarnings({ "unchecked", "rawtypes" })
public ScheduledFuture<?> schedule(Callable<?> callable, long delay, TimeUnit unit) {
ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>(delay, unit);
ExecutorService es = executorService();
boolean isForkJoinPool = es instanceof ForkJoinPool;
Callable<?> completingCallable = () -> {
try {
if (isForkJoinPool) {
// Guard against race with promise.cancel
ScheduledCompletableFuture promise = new ScheduledCompletableFuture<>();
final Callable<?> completingCallable = (executorType & EX_FORK_JOIN) == EX_FORK_JOIN
? () -> {
try {
// Guard against race with promise.cancel
synchronized (promise) {
promise.forkJoinPoolThread = Thread.currentThread();
}
}
promise.complete(callable.call());
} catch (Throwable t) {
promise.completeExceptionally(t);
} finally {
if (isForkJoinPool) {
promise.complete(callable.call());
} catch (Throwable t) {
promise.completeExceptionally(t);
} finally {
synchronized (promise) {
promise.forkJoinPoolThread = null;
}
}
}
return null;
};

if (delay == 0)
promise.delegate = es.submit(completingCallable);
else
promise.delegate = delayer().schedule(() -> {
// Guard against race with promise.cancel
synchronized (promise) {
return null;
}// else not ForkJoin BTW: but why? Other ExecutorServices also support cancellation
: () ->{
try {
promise.complete(callable.call());
} catch (Throwable t) {
promise.completeExceptionally(t);
}
return null;
};

if (delay <= 0) {
promise.delegate = executorService().submit(completingCallable);
return promise;
}

final Callable<Void> r;// use less memory: don't capture variable with commonPool

if ((executorType & EX_COMMON) == EX_COMMON)
r = ()->{ // Guard against race with promise.cancel
synchronized(promise) {
if (!promise.isCancelled())
promise.delegate = es.submit(completingCallable);
promise.delegate = commonPool().submit(completingCallable);
}
return null;
};

else if ((executorType & EX_INTERNAL) == EX_INTERNAL)
r = ()->{// Guard against race with promise.cancel
synchronized(promise) {
if (!promise.isCancelled())
promise.delegate = LazyForkJoinPoolHolder.FORK_JOIN_POOL.submit(completingCallable);
}
}, delay, unit);
return null;
};

else {
final ExecutorService es = executorService();
r = ()->{// Guard against race with promise.cancel
synchronized(promise){
if (!promise.isCancelled())
promise.delegate = es.submit(completingCallable);
}
return null;
};
}
promise.delegate = delayer().schedule(r, delay, unit);
return promise;
}
}
Loading