Skip to content

Commit 6509d20

Browse files
committed
Dispatch Mutiny scheduled operations using Vert.x timers when possible
Some Mutiny operations such as delaying an item, performing a retry, or generating streams from periodic ticks all require a ScheduledExecutorService. This introduces a wrapper ScheduledExecutorService that dispatches some of the operations to Vert.x timers when called from a Vert.x event-loop, offering a more natural threading mental model as well as avoiding thread hops between worker threads and event-loops. The implementation only tunes Mutiny with that Vert.x-aware wrapper ScheduledExecutorService from the quarkus-vertx extension because non-server applications might not have Vert.x available. We also introduce an artificial build item in the Mutiny extension to ensure that the Vert.x build items always execute after those from Mutiny, or else we could have cases where Mutiny re-installs a non-Vert.x aware scheduler in its own configuration. Fixes: #50918
1 parent 5623ed5 commit 6509d20

File tree

6 files changed

+386
-3
lines changed

6 files changed

+386
-3
lines changed

extensions/mutiny/deployment/src/main/java/io/quarkus/mutiny/deployment/MutinyProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ public class MutinyProcessor {
1616

1717
@BuildStep
1818
@Record(ExecutionTime.RUNTIME_INIT)
19-
public void runtimeInit(ExecutorBuildItem executorBuildItem,
19+
public MutinyRuntimeInitBuildItem runtimeInit(ExecutorBuildItem executorBuildItem,
2020
MutinyInfrastructure recorder,
2121
ShutdownContextBuildItem shutdownContext,
2222
Optional<ContextHandlerBuildItem> contextHandler) {
2323
ContextHandler<Object> handler = contextHandler.map(ContextHandlerBuildItem::contextHandler).orElse(null);
2424
recorder.configureMutinyInfrastructure(executorBuildItem.getExecutorProxy(), shutdownContext, handler);
25+
return new MutinyRuntimeInitBuildItem();
2526
}
2627

2728
@BuildStep
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.quarkus.mutiny.deployment;
2+
3+
import io.quarkus.builder.item.SimpleBuildItem;
4+
5+
/**
6+
* Marker build item to detect when Mutiny has been initialized at runtime.
7+
*/
8+
public final class MutinyRuntimeInitBuildItem extends SimpleBuildItem {
9+
10+
}

extensions/vertx/deployment/src/main/java/io/quarkus/vertx/core/deployment/VertxCoreProcessor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
5353
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
5454
import io.quarkus.gizmo.Gizmo;
55+
import io.quarkus.mutiny.deployment.MutinyRuntimeInitBuildItem;
5556
import io.quarkus.netty.deployment.EventLoopSupplierBuildItem;
5657
import io.quarkus.vertx.VertxOptionsCustomizer;
5758
import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
@@ -234,7 +235,11 @@ CoreVertxBuildItem build(
234235
List<VertxOptionsConsumerBuildItem> vertxOptionsConsumers,
235236
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
236237
BuildProducer<EventLoopSupplierBuildItem> eventLoops,
237-
ExecutorBuildItem executorBuildItem) {
238+
ExecutorBuildItem executorBuildItem,
239+
MutinyRuntimeInitBuildItem mutinyRuntimeInitBuildItem) {
240+
241+
// Override the Mutiny infrastructure ScheduledExecutorService to dispatch scheduled operations to a Vert.x timer
242+
recorder.wrapMainExecutorForMutiny(executorBuildItem.getExecutorProxy());
238243

239244
Collections.sort(vertxOptionsConsumers);
240245
List<Consumer<VertxOptions>> consumers = new ArrayList<>(vertxOptionsConsumers.size());
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package io.quarkus.vertx;
2+
3+
import static org.awaitility.Awaitility.await;
4+
import static org.hamcrest.CoreMatchers.containsString;
5+
import static org.hamcrest.MatcherAssert.assertThat;
6+
import static org.hamcrest.Matchers.allOf;
7+
import static org.hamcrest.Matchers.everyItem;
8+
import static org.hamcrest.Matchers.greaterThan;
9+
import static org.hamcrest.Matchers.hasSize;
10+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
11+
import static org.hamcrest.Matchers.not;
12+
13+
import java.io.IOException;
14+
import java.time.Duration;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.concurrent.atomic.LongAdder;
20+
21+
import jakarta.enterprise.context.ApplicationScoped;
22+
import jakarta.inject.Inject;
23+
24+
import org.jboss.shrinkwrap.api.ShrinkWrap;
25+
import org.jboss.shrinkwrap.api.spec.JavaArchive;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.RegisterExtension;
28+
29+
import io.quarkus.test.QuarkusUnitTest;
30+
import io.smallrye.mutiny.Multi;
31+
import io.smallrye.mutiny.Uni;
32+
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
33+
import io.smallrye.mutiny.subscription.Cancellable;
34+
import io.vertx.core.Vertx;
35+
36+
public class ScheduledTasksTest {
37+
38+
@RegisterExtension
39+
static final QuarkusUnitTest config = new QuarkusUnitTest()
40+
.setArchiveProducer(() -> ShrinkWrap
41+
.create(JavaArchive.class).addClasses(MyBean.class));
42+
43+
@Inject
44+
Vertx vertx;
45+
46+
@Inject
47+
MyBean myBean;
48+
49+
@Test
50+
void uniFromRegularThread() {
51+
String hello = myBean.delayedHello().await().atMost(Duration.ofSeconds(5));
52+
assertThat(hello, containsString("Hello!"));
53+
assertThat(hello, containsString("executor-thread-"));
54+
}
55+
56+
@Test
57+
void uniFromEventLoop() {
58+
AtomicReference<String> result = new AtomicReference<>();
59+
vertx.runOnContext(v -> {
60+
myBean.delayedHello().subscribe().with(
61+
result::set,
62+
err -> result.set(err.getMessage()));
63+
});
64+
await().atMost(Duration.ofSeconds(5)).until(() -> result.get() != null);
65+
assertThat(result.get(), containsString("Hello!"));
66+
assertThat(result.get(), containsString("vert.x-eventloop-thread-"));
67+
}
68+
69+
@Test
70+
void multiFromRegularThread() {
71+
LongAdder counter = new LongAdder();
72+
AtomicBoolean cancelled = new AtomicBoolean();
73+
ArrayList<String> items = new ArrayList<>();
74+
Cancellable cancellable = myBean.ticks()
75+
.onCancellation().invoke(() -> cancelled.set(true))
76+
.subscribe().with(tick -> {
77+
items.add(tick);
78+
counter.increment();
79+
});
80+
long start = System.currentTimeMillis();
81+
await().atMost(Duration.ofSeconds(5)).untilAdder(counter, greaterThan(3L));
82+
cancellable.cancel();
83+
await().atMost(Duration.ofSeconds(1)).untilTrue(cancelled);
84+
long duration = System.currentTimeMillis() - start;
85+
assertThat((long) items.size(), lessThanOrEqualTo(duration / 100L + 1L));
86+
assertThat(items.get(0), containsString("Hello executor-thread-"));
87+
}
88+
89+
@Test
90+
void multiFromEventLoop() {
91+
LongAdder counter = new LongAdder();
92+
AtomicBoolean cancelled = new AtomicBoolean();
93+
ArrayList<String> items = new ArrayList<>();
94+
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
95+
vertx.runOnContext(v -> {
96+
cancellable.set(myBean.ticks()
97+
.onCancellation().invoke(() -> cancelled.set(true))
98+
.subscribe().with(tick -> {
99+
items.add(tick);
100+
counter.increment();
101+
}));
102+
});
103+
long start = System.currentTimeMillis();
104+
await().atMost(Duration.ofSeconds(5)).untilAdder(counter, greaterThan(3L));
105+
cancellable.get().cancel();
106+
await().atMost(Duration.ofSeconds(1)).untilTrue(cancelled);
107+
long duration = System.currentTimeMillis() - start;
108+
assertThat((long) items.size(), lessThanOrEqualTo(duration / 100L + 1L));
109+
assertThat(items.get(0), containsString("Hello vert.x-eventloop-thread-"));
110+
}
111+
112+
@Test
113+
void retryFromRegularThread() {
114+
List<String> threadTraces = new ArrayList<>();
115+
UniAssertSubscriber<String> sub = UniAssertSubscriber.create();
116+
myBean.backoff(threadTraces).subscribe().withSubscriber(sub);
117+
sub.awaitFailure().assertFailedWith(IOException.class, "boom");
118+
assertThat(threadTraces, hasSize(6));
119+
assertThat(threadTraces, allOf((everyItem(not(containsString("vert.x-eventloop-thread-"))))));
120+
}
121+
122+
@Test
123+
void retryFromEventLoop() {
124+
List<String> threadTraces = new ArrayList<>();
125+
UniAssertSubscriber<String> sub = UniAssertSubscriber.create();
126+
vertx.runOnContext(v -> myBean.backoff(threadTraces).subscribe().withSubscriber(sub));
127+
sub.awaitFailure().assertFailedWith(IOException.class, "boom");
128+
assertThat(threadTraces, hasSize(6));
129+
assertThat(threadTraces, allOf(everyItem(containsString("vert.x-eventloop-thread-"))));
130+
}
131+
132+
@ApplicationScoped
133+
static class MyBean {
134+
135+
public Uni<String> delayedHello() {
136+
return Uni.createFrom().item("Hello!")
137+
.onItem().delayIt().by(Duration.ofSeconds(1L))
138+
.onItem().transform(s -> s + " :: " + Thread.currentThread().getName());
139+
}
140+
141+
public Multi<String> ticks() {
142+
return Multi.createFrom().ticks()
143+
.every(Duration.ofMillis(100))
144+
.onItem().transform(tick -> "Hello " + Thread.currentThread().getName());
145+
}
146+
147+
public Uni<String> backoff(List<String> threadTraces) {
148+
return Uni.createFrom().<String> failure(new IOException("boom"))
149+
.onSubscription().invoke(() -> threadTraces.add(Thread.currentThread().getName()))
150+
.onFailure(IOException.class).retry().withBackOff(Duration.ofMillis(10)).atMost(5L);
151+
}
152+
}
153+
}

extensions/vertx/runtime/src/main/java/io/quarkus/vertx/core/runtime/VertxCoreRecorder.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,20 @@
88
import static io.vertx.core.file.impl.FileResolverImpl.CACHE_DIR_BASE_PROP_NAME;
99

1010
import java.io.File;
11-
import java.util.*;
11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Locale;
16+
import java.util.Map;
17+
import java.util.Optional;
18+
import java.util.Random;
19+
import java.util.Set;
1220
import java.util.concurrent.CompletableFuture;
1321
import java.util.concurrent.ConcurrentMap;
1422
import java.util.concurrent.CountDownLatch;
1523
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.ScheduledExecutorService;
1625
import java.util.concurrent.ThreadFactory;
1726
import java.util.concurrent.TimeUnit;
1827
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,6 +48,7 @@
3948
import io.quarkus.vertx.runtime.VertxCurrentContextFactory;
4049
import io.quarkus.vertx.runtime.jackson.QuarkusJacksonFactory;
4150
import io.smallrye.common.cpu.ProcessorInfo;
51+
import io.smallrye.mutiny.infrastructure.Infrastructure;
4252
import io.vertx.core.AsyncResult;
4353
import io.vertx.core.Context;
4454
import io.vertx.core.Handler;
@@ -755,4 +765,9 @@ private static void deleteDirectory(File directory) {
755765
}
756766
directory.delete();
757767
}
768+
769+
public void wrapMainExecutorForMutiny(ScheduledExecutorService service) {
770+
VertxTimerAwareScheduledExecutorService wrapper = new VertxTimerAwareScheduledExecutorService(service);
771+
Infrastructure.setDefaultExecutor(wrapper, false);
772+
}
758773
}

0 commit comments

Comments
 (0)