From 952c8bab5352f0e564138f2f14b36c4ec4c12603 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 18 Mar 2024 13:37:40 -0500 Subject: [PATCH] Close created client in PulsarConsumerTestUtil (#618) --- .../test/support/PulsarConsumerTestUtil.java | 34 +++++++++++-- .../support/PulsarConsumerTestUtilTests.java | 48 ++++++++++++------- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java index 52dd21ff2..87552cab3 100644 --- a/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java +++ b/spring-pulsar-test/src/main/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtil.java @@ -29,6 +29,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.springframework.core.log.LogAccessor; +import org.springframework.lang.Nullable; import org.springframework.pulsar.PulsarException; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.PulsarConsumerFactory; @@ -47,6 +49,10 @@ */ public final class PulsarConsumerTestUtil implements TopicSpec, SchemaSpec, ConditionsSpec { + private static final LogAccessor LOG = new LogAccessor(PulsarConsumerTestUtil.class); + + private final PulsarClient locallyCreatedPulsarClient; + private final PulsarConsumerFactory consumerFactory; private ConsumedMessagesCondition condition; @@ -82,7 +88,9 @@ public static TopicSpec consumeMessages() { public static TopicSpec consumeMessages(String url) { Assert.notNull(url, "url must not be null"); try { - return PulsarConsumerTestUtil.consumeMessages(PulsarClient.builder().serviceUrl(url).build()); + var pulsarClient = PulsarClient.builder().serviceUrl(url).build(); + return PulsarConsumerTestUtil.consumeMessagesInternal(pulsarClient, + new DefaultPulsarConsumerFactory<>(pulsarClient, List.of())); } catch (PulsarClientException ex) { throw new PulsarException(ex); @@ -97,7 +105,8 @@ public static TopicSpec consumeMessages(String url) { */ public static TopicSpec consumeMessages(PulsarClient pulsarClient) { Assert.notNull(pulsarClient, "pulsarClient must not be null"); - return PulsarConsumerTestUtil.consumeMessages(new DefaultPulsarConsumerFactory<>(pulsarClient, List.of())); + return PulsarConsumerTestUtil.consumeMessagesInternal(null, + new DefaultPulsarConsumerFactory<>(pulsarClient, List.of())); } /** @@ -107,12 +116,19 @@ public static TopicSpec consumeMessages(PulsarClient pulsarClient) { * @return the {@link TopicSpec topic step} of the builder */ public static TopicSpec consumeMessages(PulsarConsumerFactory pulsarConsumerFactory) { - return new PulsarConsumerTestUtil<>(pulsarConsumerFactory); + return PulsarConsumerTestUtil.consumeMessagesInternal(null, pulsarConsumerFactory); + } + + private static TopicSpec consumeMessagesInternal(PulsarClient locallyCreatedPulsarClient, + PulsarConsumerFactory pulsarConsumerFactory) { + return new PulsarConsumerTestUtil<>(locallyCreatedPulsarClient, pulsarConsumerFactory); } - private PulsarConsumerTestUtil(PulsarConsumerFactory consumerFactory) { + private PulsarConsumerTestUtil(@Nullable PulsarClient locallyCreatedPulsarClient, + PulsarConsumerFactory consumerFactory) { Assert.notNull(consumerFactory, "PulsarConsumerFactory must not be null"); this.consumerFactory = consumerFactory; + this.locallyCreatedPulsarClient = locallyCreatedPulsarClient; } @Override @@ -173,6 +189,16 @@ public List> get() { catch (PulsarClientException ex) { throw new PulsarException(ex); } + finally { + if (this.locallyCreatedPulsarClient != null && !this.locallyCreatedPulsarClient.isClosed()) { + try { + this.locallyCreatedPulsarClient.close(); + } + catch (PulsarClientException e) { + LOG.error(e, () -> "Failed to close locally created Pulsar client due to: " + e.getMessage()); + } + } + } if (this.condition != null && !this.condition.meets(messages)) { throw new ConditionTimeoutException( "Condition was not met within %d seconds".formatted(timeout.toSeconds())); diff --git a/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java index c79caae80..a71ff21ea 100644 --- a/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java +++ b/spring-pulsar-test/src/test/java/org/springframework/pulsar/test/support/PulsarConsumerTestUtilTests.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -78,13 +79,13 @@ void cleanupFromTest() throws PulsarClientException { void whenConditionIsSpecifiedThenMessagesConsumedUntilConditionMet() { var topic = testTopic("cond"); IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i)); - var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) + var consumerTestUtil = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory) .fromTopic(topic) .withSchema(Schema.STRING) .awaitAtMost(Duration.ofSeconds(5)) - .until(desiredMessageCount(3)) - .get(); - assertThat(msgs).hasSize(3); + .until(desiredMessageCount(3)); + assertThat(consumerTestUtil.get()).hasSize(3); + assertThatLocallyCreatedClientIsNull(consumerTestUtil); } @Test @@ -132,17 +133,18 @@ void whenConditionNotMetWithinAwaitDurationThenExceptionIsThrown() { void consumeMessagesWithNoArgsUsesPulsarContainerIfAvailable() { var topic = testTopic("no-arg"); IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i)); - var msgs = PulsarConsumerTestUtil.consumeMessages() + var consumerTestUtil = PulsarConsumerTestUtil.consumeMessages() .fromTopic(topic) .withSchema(Schema.STRING) .awaitAtMost(Duration.ofSeconds(5)) - .until(desiredMessageCount(2)) - .get(); - assertThat(msgs).hasSize(2); + .until(desiredMessageCount(2)); + assertThat(consumerTestUtil.get()).hasSize(2); + assertThatLocallyCreatedClientIsClosed(consumerTestUtil); } @Test void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() { + // @formatter::off try (MockedStatic containerSupport = Mockito .mockStatic(PulsarTestContainerSupport.class)) { containerSupport.when(PulsarTestContainerSupport::isContainerStarted).thenReturn(false); @@ -155,32 +157,46 @@ void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() { .get()) .withStackTraceContaining("Connection refused: localhost"); } + // @formatter:on } @Test void consumeMessagesWithBrokerUrl() { var topic = testTopic("url-arg"); IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i)); - var msgs = PulsarConsumerTestUtil.consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl()) + var consumerTestUtil = PulsarConsumerTestUtil + .consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl()) .fromTopic(topic) .withSchema(Schema.STRING) .awaitAtMost(Duration.ofSeconds(5)) - .until(desiredMessageCount(2)) - .get(); - assertThat(msgs).hasSize(2); + .until(desiredMessageCount(2)); + assertThat(consumerTestUtil.get()).hasSize(2); + assertThatLocallyCreatedClientIsClosed(consumerTestUtil); } @Test void consumeMessagesWithPulsarClient() { var topic = testTopic("client-arg"); IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i)); - var msgs = PulsarConsumerTestUtil.consumeMessages(this.pulsarClient) + var consumerTestUtil = PulsarConsumerTestUtil.consumeMessages(this.pulsarClient) .fromTopic(topic) .withSchema(Schema.STRING) .awaitAtMost(Duration.ofSeconds(5)) - .until(desiredMessageCount(2)) - .get(); - assertThat(msgs).hasSize(2); + .until(desiredMessageCount(2)); + assertThat(consumerTestUtil.get()).hasSize(2); + assertThatLocallyCreatedClientIsNull(consumerTestUtil); + } + + private void assertThatLocallyCreatedClientIsNull(ConditionsSpec consumerTestUtil) { + assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient").isNull(); + } + + private void assertThatLocallyCreatedClientIsClosed(ConditionsSpec consumerTestUtil) { + assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient") + .isNotNull() + .asInstanceOf(InstanceOfAssertFactories.type(PulsarClient.class)) + .extracting(PulsarClient::isClosed) + .isEqualTo(Boolean.TRUE); } @Test