Skip to content

Commit 7e8b624

Browse files
authored
Merge pull request #54 from ozangunalp/turn_off_graceful_shutdown_for_tests
Turn off graceful shutdown for connector tests by default
2 parents c34bfcd + 0b1cc5d commit 7e8b624

File tree

14 files changed

+58
-50
lines changed

14 files changed

+58
-50
lines changed

quarkus-solace-messaging-connector/runtime/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,6 @@
1111
<artifactId>quarkus-solace-messaging-connector</artifactId>
1212
<name>Quarkus Solace Messaging Connector - Runtime</name>
1313
<dependencies>
14-
<dependency>
15-
<groupId>io.smallrye.config</groupId>
16-
<artifactId>smallrye-config</artifactId>
17-
</dependency>
1814
<dependency>
1915
<groupId>io.quarkus</groupId>
2016
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>

quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,19 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
119119
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
120120
: m)
121121
.onItem().invoke(() -> alive.set(true))
122-
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> {
123-
failures.add(t);
124-
alive.set(false);
125-
});
122+
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);
126123
if (!lazyStart) {
127124
receiver.start();
128125
}
129126
}
130127

131-
private void reportFailure(Throwable throwable) {
132-
failures.add(throwable);
128+
private synchronized void reportFailure(Throwable throwable) {
133129
alive.set(false);
130+
// Don't keep all the failures, there are only there for reporting.
131+
if (failures.size() == 10) {
132+
failures.remove(0);
133+
}
134+
failures.add(throwable);
134135
}
135136

136137
private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
@@ -208,12 +209,16 @@ public void close() {
208209
}
209210
closed.compareAndSet(false, true);
210211
if (this.pollerThread != null) {
211-
this.pollerThread.shutdown();
212-
try {
213-
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
214-
} catch (InterruptedException e) {
215-
SolaceLogging.log.shutdownException(e.getMessage());
216-
throw new RuntimeException(e);
212+
if (this.gracefulShutdown) {
213+
this.pollerThread.shutdown();
214+
try {
215+
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
216+
} catch (InterruptedException e) {
217+
SolaceLogging.log.shutdownException(e.getMessage());
218+
throw new RuntimeException(e);
219+
}
220+
} else {
221+
this.pollerThread.shutdownNow();
217222
}
218223
}
219224
receiver.terminate(3000);
@@ -233,10 +238,8 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
233238
synchronized (this) {
234239
reportedFailures = new ArrayList<>(failures);
235240
}
236-
237241
builder.add(channel, solace.isConnected() && alive.get(),
238242
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
239-
failures.removeAll(reportedFailures);
240243
} else {
241244
builder.add(channel, solace.isConnected() && alive.get());
242245
}

quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
7676
boolean lazyStart = oc.getClientLazyStart();
7777
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
7878
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
79-
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> {
80-
failures.add(t);
81-
alive.set(false);
82-
}));
79+
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure));
8380
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
8481
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
8582
if (!lazyStart) {
@@ -106,11 +103,18 @@ private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean wai
106103
return Uni.createFrom().completionStage(m.getAck());
107104
})
108105
.onFailure().recoverWithUni(t -> {
109-
failures.add(t);
110-
alive.set(false);
106+
reportFailure(t);
111107
return Uni.createFrom().completionStage(m.nack(t));
112108
});
109+
}
113110

111+
private synchronized void reportFailure(Throwable throwable) {
112+
alive.set(false);
113+
// Don't keep all the failures, there are only there for reporting.
114+
if (failures.size() == 10) {
115+
failures.remove(0);
116+
}
117+
failures.add(throwable);
114118
}
115119

116120
private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher, Message<?> m,
@@ -249,9 +253,9 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
249253
synchronized (this) {
250254
reportedFailures = new ArrayList<>(failures);
251255
}
256+
System.out.println(reportedFailures);
252257
builder.add(channel, solace.isConnected() && alive.get(),
253258
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
254-
failures.removeAll(reportedFailures);
255259
} else {
256260
builder.add(channel, solace.isConnected() && alive.get());
257261
}

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ private SolaceConsumerTest() {
4646
@Test
4747
@Order(1)
4848
void consumer() {
49-
MapBasedConfig config = new MapBasedConfig()
49+
MapBasedConfig config = commonConfig()
5050
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
5151
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
5252
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
@@ -74,7 +74,7 @@ void consumer() {
7474
@Test
7575
@Order(2)
7676
void consumerReplay() {
77-
MapBasedConfig config = new MapBasedConfig()
77+
MapBasedConfig config = commonConfig()
7878
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
7979
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
8080
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
@@ -94,7 +94,7 @@ void consumerReplay() {
9494
@Test
9595
@Order(3)
9696
void consumerWithSelectorQuery() {
97-
MapBasedConfig config = new MapBasedConfig()
97+
MapBasedConfig config = commonConfig()
9898
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
9999
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
100100
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
@@ -124,7 +124,7 @@ void consumerWithSelectorQuery() {
124124
@Test
125125
@Order(4)
126126
void consumerFailedProcessingPublishToErrorTopic() {
127-
MapBasedConfig config = new MapBasedConfig()
127+
MapBasedConfig config = commonConfig()
128128
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
129129
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
130130
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
@@ -158,7 +158,7 @@ void consumerFailedProcessingPublishToErrorTopic() {
158158
@Test
159159
@Order(5)
160160
void consumerFailedProcessingMoveToDMQ() {
161-
MapBasedConfig config = new MapBasedConfig()
161+
MapBasedConfig config = commonConfig()
162162
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
163163
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
164164
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
@@ -193,7 +193,7 @@ void consumerFailedProcessingMoveToDMQ() {
193193
@Test
194194
@Order(6)
195195
void partitionedQueue() {
196-
MapBasedConfig config = new MapBasedConfig()
196+
MapBasedConfig config = commonConfig()
197197
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
198198
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
199199
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
@@ -262,7 +262,7 @@ void partitionedQueue() {
262262
@Test
263263
@Order(7)
264264
void consumerPublishToErrorTopicPermissionException() {
265-
MapBasedConfig config = new MapBasedConfig()
265+
MapBasedConfig config = commonConfig()
266266
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
267267
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
268268
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
@@ -340,7 +340,7 @@ void consumerGracefulCloseTest() {
340340
@Test
341341
@Order(9)
342342
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
343-
MapBasedConfig config = new MapBasedConfig()
343+
MapBasedConfig config = commonConfig()
344344
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
345345
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
346346
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class SolaceProcessorTest extends WeldTestBase {
2828
@Test
2929
void consumer() {
3030
String processedTopic = topic + "/processed";
31-
MapBasedConfig config = new MapBasedConfig()
31+
MapBasedConfig config = commonConfig()
3232
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
3333
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
3434
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolacePublisherTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class SolacePublisherTest extends WeldTestBase {
3030

3131
@Test
3232
void publisher() {
33-
MapBasedConfig config = new MapBasedConfig()
33+
MapBasedConfig config = commonConfig()
3434
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
3535
.with("mp.messaging.outgoing.out.producer.topic", topic);
3636

@@ -53,7 +53,7 @@ void publisher() {
5353

5454
@Test
5555
void publisherWithDynamicDestination() {
56-
MapBasedConfig config = new MapBasedConfig()
56+
MapBasedConfig config = commonConfig()
5757
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
5858
.with("mp.messaging.outgoing.out.producer.topic", topic);
5959

@@ -80,7 +80,7 @@ void publisherWithDynamicDestination() {
8080

8181
@Test
8282
void publisherWithBackPressureReject() {
83-
MapBasedConfig config = new MapBasedConfig()
83+
MapBasedConfig config = commonConfig()
8484
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
8585
.with("mp.messaging.outgoing.out.producer.topic", topic)
8686
.with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1);

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static SolaceContainer createSolaceContainer() {
4444
public void startSolaceBroker() {
4545
solace = createSolaceContainer()
4646
.withCredentials("user", "pass")
47-
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
47+
.withExposedPorts(SolaceContainer.Service.SMF.getPort(), 8080)
4848
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
4949
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
5050
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/WeldTestBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,9 @@ public boolean isAlive() {
142142
return getHealth().getLiveness().isOk();
143143
}
144144

145+
public MapBasedConfig commonConfig() {
146+
return new MapBasedConfig()
147+
.with("mp.messaging.connector.quarkus-solace.client.graceful-shutdown", false);
148+
}
149+
145150
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.smallrye.reactive.messaging.health.HealthReport;
2323
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
2424

25-
public class SolacePublisherHealthCheck extends WeldTestBase {
25+
public class SolacePublisherHealthTest extends WeldTestBase {
2626
@Test
2727
void publisherHealthCheck() {
2828
MapBasedConfig config = new MapBasedConfig()
@@ -73,7 +73,9 @@ void publisherLivenessCheck() {
7373
// Run app that publish messages
7474
MyApp app = runApplication(config, MyApp.class);
7575

76-
await().until(() -> isStarted() && isReady());
76+
await().until(() -> isStarted() && isReady() && !isAlive());
77+
78+
await().until(() -> !isAlive());
7779

7880
HealthReport startup = getHealth().getStartup();
7981
HealthReport liveness = getHealth().getLiveness();
@@ -94,7 +96,6 @@ static class MyApp {
9496

9597
@Outgoing("out")
9698
Multi<Message<String>> out() {
97-
9899
return Multi.createFrom().items("1", "2", "3", "4", "5")
99100
.map(payload -> Message.of(payload).withAck(() -> {
100101
acked.add(payload);

quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
public class LocalPropagationAckTest extends WeldTestBase {
2929

3030
private MapBasedConfig dataconfig() {
31-
return new MapBasedConfig()
31+
return commonConfig()
3232
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
3333
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
3434
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")

0 commit comments

Comments
 (0)