Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add container init mixed txn tests #673

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

Expand All @@ -37,17 +33,16 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactoryCustomizer;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.BatchListenerWithCommit.BatchListenerWithCommitConfig;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.BatchListenerWithRollback.BatchListenerWithRollbackConfig;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.ListenerWithExternalTransaction.ListenerWithExternalTransactionConfig;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.ListenerWithExternalTransactionRollback.ListenerWithExternalTransactionRollbackConfig;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.RecordListenerWithCommit.RecordListenerWithCommitConfig;
import org.springframework.pulsar.listener.PulsarListenerTxnTests.RecordListenerWithRollback.RecordListenerWithRollbackConfig;
import org.springframework.pulsar.test.support.PulsarConsumerTestUtil;
import org.springframework.pulsar.transaction.PulsarTxnTestsBase;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -58,36 +53,6 @@
*/
class PulsarListenerTxnTests extends PulsarTxnTestsBase {

private void assertNoMessagesAvailableInOutputTopic(String topicOut) {
assertThat(PulsarConsumerTestUtil.<String>consumeMessages(pulsarClient)
.fromTopic(topicOut)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(7))
.get()).isEmpty();
}

private void assertMessagesAvailableInOutputTopic(String topicOut, String... expectedMessages) {
this.assertMessagesAvailableInOutputTopic(topicOut, Arrays.stream(expectedMessages).toList());
}

private void assertMessagesAvailableInOutputTopic(String topicOut, List<String> expectedMessages) {
assertThat(PulsarConsumerTestUtil.<String>consumeMessages(pulsarClient)
.fromTopic(topicOut)
.withSchema(Schema.STRING)
.awaitAtMost(Duration.ofSeconds(5))
.get()).map(Message::getValue).containsExactlyInAnyOrderElementsOf(expectedMessages);
}

private PulsarTemplate<String> newNonTransactionalTemplate(boolean sendInBatch, int numMessages) {
List<ProducerBuilderCustomizer<String>> customizers = List.of();
if (sendInBatch) {
customizers = List.of((pb) -> pb.enableBatching(true)
.batchingMaxPublishDelay(2, TimeUnit.SECONDS)
.batchingMaxMessages(numMessages));
}
return new PulsarTemplate<>(new DefaultPulsarProducerFactory<>(pulsarClient, null, customizers));
}

@Nested
@ContextConfiguration(classes = ListenerWithExternalTransactionConfig.class)
class ListenerWithExternalTransaction {
Expand All @@ -101,7 +66,7 @@ void producedMessageIsCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
nonTransactionalTemplate.send(topicIn, "msg1");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertMessagesAvailableInOutputTopic(topicOut, "msg1-out");
assertThatMessagesAreInTopic(topicOut, "msg1-out");
}

@EnablePulsar
Expand Down Expand Up @@ -135,7 +100,7 @@ void producedMessageIsNotCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
nonTransactionalTemplate.send(topicIn, "msg1");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertNoMessagesAvailableInOutputTopic(topicOut);
assertThatMessagesAreNotInTopic(topicOut, "msg1-out");
}

@EnablePulsar
Expand Down Expand Up @@ -170,7 +135,7 @@ void producedMessageIsCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
nonTransactionalTemplate.send(topicIn, "msg1");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertMessagesAvailableInOutputTopic(topicOut, "msg1-out");
assertThatMessagesAreInTopic(topicOut, "msg1-out");
}

@EnablePulsar
Expand Down Expand Up @@ -203,7 +168,7 @@ void producedMessageIsNotCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
nonTransactionalTemplate.send(topicIn, "msg1");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertNoMessagesAvailableInOutputTopic(topicOut);
assertThatMessagesAreNotInTopic(topicOut, "msg1-out");
}

@EnablePulsar
Expand Down Expand Up @@ -238,8 +203,8 @@ void producedMessagesAreCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(true, inputMsgs.size());
inputMsgs.forEach((msg) -> nonTransactionalTemplate.sendAsync(topicIn, msg));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
var outputMsgs = inputMsgs.stream().map((m) -> m.concat("-out")).toList();
assertMessagesAvailableInOutputTopic(topicOut, outputMsgs);
var outputMsgs = inputMsgs.stream().map((m) -> m.concat("-out")).toArray(String[]::new);
assertThatMessagesAreInTopic(topicOut, outputMsgs);
}

@EnablePulsar
Expand Down Expand Up @@ -275,7 +240,8 @@ void producedMessagesAreNotCommitted() throws Exception {
var nonTransactionalTemplate = newNonTransactionalTemplate(true, inputMsgs.size());
inputMsgs.forEach((msg) -> nonTransactionalTemplate.sendAsync(topicIn, msg));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertNoMessagesAvailableInOutputTopic(topicOut);
var outputMsgs = inputMsgs.stream().map((m) -> m.concat("-out")).toArray(String[]::new);
assertThatMessagesAreNotInTopic(topicOut, outputMsgs);
}

@EnablePulsar
Expand Down Expand Up @@ -306,8 +272,9 @@ void throwsExceptionWhenTransactionsAreRequired() {
assertThatIllegalStateException().isThrownBy(() -> {
var context = new AnnotationConfigApplicationContext();
context.register(TopLevelConfig.class, TransactionsDisabledOnListenerConfig.class);
context.registerBean("containerPropsRequiredCustomizer", PulsarContainerPropertiesCustomizer.class,
() -> (c) -> c.transactions().setRequired(true));
context.registerBean("containerPropsRequiredCustomizer",
ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> (cf) -> cf.getContainerProperties().transactions().setRequired(true));
context.refresh();
}).withMessage("Listener w/ id [%s] requested no transactions but txn are required".formatted(LISTENER_ID));
}
Expand All @@ -316,8 +283,9 @@ void throwsExceptionWhenTransactionsAreRequired() {
void disablesTransactionsWhenTransactionsAreNotRequired() {
try (var context = new AnnotationConfigApplicationContext()) {
context.register(TopLevelConfig.class, TransactionsDisabledOnListenerConfig.class);
context.registerBean("containerPropsNotRequiredCustomizer", PulsarContainerPropertiesCustomizer.class,
() -> (c) -> c.transactions().setRequired(false));
context.registerBean("containerPropsNotRequiredCustomizer",
ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> (cf) -> cf.getContainerProperties().transactions().setRequired(false));
context.refresh();
var container = context.getBean(PulsarListenerEndpointRegistry.class).getListenerContainer(LISTENER_ID);
assertThat(container).isNotNull();
Expand Down Expand Up @@ -348,8 +316,9 @@ void ignoresSettingWhenNoTxnManagerAvailable() {
assertThatException().isThrownBy(() -> {
var context = new AnnotationConfigApplicationContext();
context.register(TopLevelConfig.class, TransactionsEnabledOnListenerConfig.class);
context.registerBean("removeTxnManagerCustomizer", PulsarContainerPropertiesCustomizer.class,
() -> (c) -> c.transactions().setTransactionManager(null));
context.registerBean("removeTxnManagerCustomizer",
ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> (cf) -> cf.getContainerProperties().transactions().setTransactionManager(null));
context.refresh();
})
.withCauseInstanceOf(IllegalStateException.class)
Expand All @@ -361,8 +330,9 @@ void ignoresSettingWhenNoTxnManagerAvailable() {
void enablesTransactionsWhenTxnManagerAvailable() {
try (var context = new AnnotationConfigApplicationContext()) {
context.register(TopLevelConfig.class, TransactionsEnabledOnListenerConfig.class);
context.registerBean("containerPropsNotRequiredCustomizer", PulsarContainerPropertiesCustomizer.class,
() -> (c) -> c.transactions().setEnabled(false));
context.registerBean("containerPropsNotRequiredCustomizer",
ConcurrentPulsarListenerContainerFactoryCustomizer.class,
() -> (cf) -> cf.getContainerProperties().transactions().setEnabled(false));
context.refresh();
var container = context.getBean(PulsarListenerEndpointRegistry.class).getListenerContainer(LISTENER_ID);
assertThat(container).isNotNull();
Expand Down

This file was deleted.

Loading
Loading