Skip to content

Commit

Permalink
Close created client in PulsarConsumerTestUtil (#618)
Browse files Browse the repository at this point in the history
  • Loading branch information
onobc authored Mar 18, 2024
1 parent 9bc6bda commit 952c8ba
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.PulsarException;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarConsumerFactory;
Expand All @@ -47,6 +49,10 @@
*/
public final class PulsarConsumerTestUtil<T> implements TopicSpec<T>, SchemaSpec<T>, ConditionsSpec<T> {

private static final LogAccessor LOG = new LogAccessor(PulsarConsumerTestUtil.class);

private final PulsarClient locallyCreatedPulsarClient;

private final PulsarConsumerFactory<T> consumerFactory;

private ConsumedMessagesCondition<T> condition;
Expand Down Expand Up @@ -82,7 +88,9 @@ public static <T> TopicSpec<T> consumeMessages() {
public static <T> TopicSpec<T> consumeMessages(String url) {
Assert.notNull(url, "url must not be null");
try {
return PulsarConsumerTestUtil.consumeMessages(PulsarClient.builder().serviceUrl(url).build());
var pulsarClient = PulsarClient.builder().serviceUrl(url).build();
return PulsarConsumerTestUtil.consumeMessagesInternal(pulsarClient,
new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
}
catch (PulsarClientException ex) {
throw new PulsarException(ex);
Expand All @@ -97,7 +105,8 @@ public static <T> TopicSpec<T> consumeMessages(String url) {
*/
public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
Assert.notNull(pulsarClient, "pulsarClient must not be null");
return PulsarConsumerTestUtil.consumeMessages(new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
return PulsarConsumerTestUtil.consumeMessagesInternal(null,
new DefaultPulsarConsumerFactory<>(pulsarClient, List.of()));
}

/**
Expand All @@ -107,12 +116,19 @@ public static <T> TopicSpec<T> consumeMessages(PulsarClient pulsarClient) {
* @return the {@link TopicSpec topic step} of the builder
*/
public static <T> TopicSpec<T> consumeMessages(PulsarConsumerFactory<T> pulsarConsumerFactory) {
return new PulsarConsumerTestUtil<>(pulsarConsumerFactory);
return PulsarConsumerTestUtil.consumeMessagesInternal(null, pulsarConsumerFactory);
}

private static <T> TopicSpec<T> consumeMessagesInternal(PulsarClient locallyCreatedPulsarClient,
PulsarConsumerFactory<T> pulsarConsumerFactory) {
return new PulsarConsumerTestUtil<>(locallyCreatedPulsarClient, pulsarConsumerFactory);
}

private PulsarConsumerTestUtil(PulsarConsumerFactory<T> consumerFactory) {
private PulsarConsumerTestUtil(@Nullable PulsarClient locallyCreatedPulsarClient,
PulsarConsumerFactory<T> consumerFactory) {
Assert.notNull(consumerFactory, "PulsarConsumerFactory must not be null");
this.consumerFactory = consumerFactory;
this.locallyCreatedPulsarClient = locallyCreatedPulsarClient;
}

@Override
Expand Down Expand Up @@ -173,6 +189,16 @@ public List<Message<T>> get() {
catch (PulsarClientException ex) {
throw new PulsarException(ex);
}
finally {
if (this.locallyCreatedPulsarClient != null && !this.locallyCreatedPulsarClient.isClosed()) {
try {
this.locallyCreatedPulsarClient.close();
}
catch (PulsarClientException e) {
LOG.error(e, () -> "Failed to close locally created Pulsar client due to: " + e.getMessage());
}
}
}
if (this.condition != null && !this.condition.meets(messages)) {
throw new ConditionTimeoutException(
"Condition was not met within %d seconds".formatted(timeout.toSeconds()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -78,13 +79,13 @@ void cleanupFromTest() throws PulsarClientException {
void whenConditionIsSpecifiedThenMessagesConsumedUntilConditionMet() {
var topic = testTopic("cond");
IntStream.range(0, 5).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
var msgs = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
var consumerTestUtil = PulsarConsumerTestUtil.consumeMessages(pulsarConsumerFactory)
.fromTopic(topic)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(desiredMessageCount(3))
.get();
assertThat(msgs).hasSize(3);
.until(desiredMessageCount(3));
assertThat(consumerTestUtil.get()).hasSize(3);
assertThatLocallyCreatedClientIsNull(consumerTestUtil);
}

@Test
Expand Down Expand Up @@ -132,17 +133,18 @@ void whenConditionNotMetWithinAwaitDurationThenExceptionIsThrown() {
void consumeMessagesWithNoArgsUsesPulsarContainerIfAvailable() {
var topic = testTopic("no-arg");
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
var msgs = PulsarConsumerTestUtil.<String>consumeMessages()
var consumerTestUtil = PulsarConsumerTestUtil.<String>consumeMessages()
.fromTopic(topic)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(desiredMessageCount(2))
.get();
assertThat(msgs).hasSize(2);
.until(desiredMessageCount(2));
assertThat(consumerTestUtil.get()).hasSize(2);
assertThatLocallyCreatedClientIsClosed(consumerTestUtil);
}

@Test
void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() {
// @formatter::off
try (MockedStatic<PulsarTestContainerSupport> containerSupport = Mockito
.mockStatic(PulsarTestContainerSupport.class)) {
containerSupport.when(PulsarTestContainerSupport::isContainerStarted).thenReturn(false);
Expand All @@ -155,32 +157,46 @@ void consumeMessagesWithNoArgsUsesDefaultUrlWhenPulsarContainerNotAvailable() {
.get())
.withStackTraceContaining("Connection refused: localhost");
}
// @formatter:on
}

@Test
void consumeMessagesWithBrokerUrl() {
var topic = testTopic("url-arg");
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl())
var consumerTestUtil = PulsarConsumerTestUtil
.<String>consumeMessages(PulsarTestContainerSupport.getPulsarBrokerUrl())
.fromTopic(topic)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(desiredMessageCount(2))
.get();
assertThat(msgs).hasSize(2);
.until(desiredMessageCount(2));
assertThat(consumerTestUtil.get()).hasSize(2);
assertThatLocallyCreatedClientIsClosed(consumerTestUtil);
}

@Test
void consumeMessagesWithPulsarClient() {
var topic = testTopic("client-arg");
IntStream.range(0, 2).forEach(i -> pulsarTemplate.send(topic, "message-" + i));
var msgs = PulsarConsumerTestUtil.<String>consumeMessages(this.pulsarClient)
var consumerTestUtil = PulsarConsumerTestUtil.<String>consumeMessages(this.pulsarClient)
.fromTopic(topic)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.until(desiredMessageCount(2))
.get();
assertThat(msgs).hasSize(2);
.until(desiredMessageCount(2));
assertThat(consumerTestUtil.get()).hasSize(2);
assertThatLocallyCreatedClientIsNull(consumerTestUtil);
}

private void assertThatLocallyCreatedClientIsNull(ConditionsSpec<?> consumerTestUtil) {
assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient").isNull();
}

private void assertThatLocallyCreatedClientIsClosed(ConditionsSpec<?> consumerTestUtil) {
assertThat(consumerTestUtil).extracting("locallyCreatedPulsarClient")
.isNotNull()
.asInstanceOf(InstanceOfAssertFactories.type(PulsarClient.class))
.extracting(PulsarClient::isClosed)
.isEqualTo(Boolean.TRUE);
}

@Test
Expand Down

0 comments on commit 952c8ba

Please sign in to comment.