Skip to content

Commit 7b09b71

Browse files
committed
Add tests for container initiated mixed txn
* Adds tests for using a transactional PulsarTemplate from within a `@Transactional` `@PulsarListener` method. The test sends a message to Pulsar and inserts a row in DB and does some form of rollback (or not) to make sure things are as expected. Resolves #661
1 parent 3a2db30 commit 7b09b71

File tree

2 files changed

+200
-14
lines changed

2 files changed

+200
-14
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
* Copyright 2023-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.transaction;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.apache.pulsar.client.api.PulsarClient;
25+
import org.apache.pulsar.common.util.ObjectMapperFactory;
26+
import org.junit.jupiter.api.Nested;
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.jdbc.core.JdbcTemplate;
32+
import org.springframework.pulsar.annotation.PulsarListener;
33+
import org.springframework.pulsar.core.PulsarTemplate;
34+
import org.springframework.pulsar.listener.AckMode;
35+
import org.springframework.pulsar.transaction.PulsarListenerWithDbTransactionTests.WithDbAndPulsarTransactionCommit.WithDbAndPulsarTransactionCommitConfig;
36+
import org.springframework.pulsar.transaction.PulsarListenerWithDbTransactionTests.WithDbTransactionRollback.WithDbTransactionRollbackConfig;
37+
import org.springframework.pulsar.transaction.PulsarListenerWithDbTransactionTests.WithPulsarTransactionRollback.WithPulsarTransactionRollbackConfig;
38+
import org.springframework.test.context.ContextConfiguration;
39+
import org.springframework.transaction.annotation.EnableTransactionManagement;
40+
import org.springframework.transaction.annotation.Transactional;
41+
import org.springframework.transaction.interceptor.TransactionAspectSupport;
42+
43+
/**
44+
* Tests transaction support of {@link PulsarListener} when mixed with database
45+
* transactions.
46+
*
47+
* @author Chris Bono
48+
*/
49+
class PulsarListenerWithDbTransactionTests extends PulsarTxnWithDbTxnTestsBase {
50+
51+
@Nested
52+
@ContextConfiguration(classes = WithDbAndPulsarTransactionCommitConfig.class)
53+
class WithDbAndPulsarTransactionCommit {
54+
55+
static final CountDownLatch latch = new CountDownLatch(1);
56+
static final String topicIn = "plwdbtxn-happy-in";
57+
static final String topicOut = "plwdbtxn-happy-out";
58+
59+
@Test
60+
void whenDbTxnIsCommittedThenMessagesAreCommitted() throws Exception {
61+
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
62+
var thing = new Thing(1L, "msg1");
63+
var thingJson = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(thing);
64+
nonTransactionalTemplate.send(topicIn, thingJson);
65+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
66+
assertThatMessagesAreInTopic(topicOut, thing.name());
67+
assertThatMessagesAreInDb(thing);
68+
}
69+
70+
@EnableTransactionManagement
71+
@Configuration(proxyBeanMethods = false)
72+
static class WithDbAndPulsarTransactionCommitConfig {
73+
74+
@Autowired
75+
private JdbcTemplate jdbcTemplate;
76+
77+
@Autowired
78+
private PulsarTemplate<String> transactionalPulsarTemplate;
79+
80+
@Transactional("dataSourceTransactionManager")
81+
@PulsarListener(topics = topicIn, ackMode = AckMode.RECORD)
82+
void listen(String msgJson) throws Exception {
83+
var thing = ObjectMapperFactory.getMapper().getObjectMapper().readValue(msgJson, Thing.class);
84+
this.transactionalPulsarTemplate.send(topicOut, thing.name());
85+
PulsarTxnWithDbTxnTestsBase.insertThingIntoDb(jdbcTemplate, thing);
86+
latch.countDown();
87+
}
88+
89+
}
90+
91+
}
92+
93+
@Nested
94+
@ContextConfiguration(classes = WithDbTransactionRollbackConfig.class)
95+
class WithDbTransactionRollback {
96+
97+
static final CountDownLatch latch = new CountDownLatch(1);
98+
static final String topicIn = "plwdbtxn-dbr-in";
99+
static final String topicOut = "plwdbtxn-dbr-out";
100+
101+
@Test
102+
void whenDbTxnIsSetRollbackOnlyThenMessageCommittedInPulsarButNotInDb() throws Exception {
103+
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
104+
var thing = new Thing(2L, "msg2");
105+
var thingJson = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(thing);
106+
nonTransactionalTemplate.send(topicIn, thingJson);
107+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
108+
assertThatMessagesAreNotInDb(thing);
109+
assertThatMessagesAreInTopic(topicOut, thing.name());
110+
}
111+
112+
@EnableTransactionManagement
113+
@Configuration(proxyBeanMethods = false)
114+
static class WithDbTransactionRollbackConfig {
115+
116+
@Autowired
117+
private JdbcTemplate jdbcTemplate;
118+
119+
@Autowired
120+
private PulsarTemplate<String> transactionalPulsarTemplate;
121+
122+
@Transactional("dataSourceTransactionManager")
123+
@PulsarListener(topics = topicIn, ackMode = AckMode.RECORD)
124+
void listen(String msgJson) throws Exception {
125+
var thing = ObjectMapperFactory.getMapper().getObjectMapper().readValue(msgJson, Thing.class);
126+
this.transactionalPulsarTemplate.send(topicOut, thing.name());
127+
PulsarTxnWithDbTxnTestsBase.insertThingIntoDb(jdbcTemplate, thing);
128+
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
129+
latch.countDown();
130+
}
131+
132+
}
133+
134+
}
135+
136+
@Nested
137+
@ContextConfiguration(classes = WithPulsarTransactionRollbackConfig.class)
138+
class WithPulsarTransactionRollback {
139+
140+
static final CountDownLatch latch = new CountDownLatch(1);
141+
static final String topicIn = "plwdbtxn-pr-in";
142+
static final String topicOut = "plwdbtxn-pr-out";
143+
144+
@Test
145+
void whenPulsarTxnIsSetRollbackOnlyThenMessageCommittedInDbButNotInPulsar() throws Exception {
146+
var nonTransactionalTemplate = newNonTransactionalTemplate(false, 1);
147+
var thing = new Thing(3L, "msg3");
148+
var thingJson = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(thing);
149+
nonTransactionalTemplate.send(topicIn, thingJson);
150+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
151+
assertThatMessagesAreInDb(thing);
152+
assertThatMessagesAreNotInTopic(topicOut, thing.name());
153+
}
154+
155+
@EnableTransactionManagement
156+
@Configuration(proxyBeanMethods = false)
157+
static class WithPulsarTransactionRollbackConfig {
158+
159+
@Autowired
160+
private JdbcTemplate jdbcTemplate;
161+
162+
@Autowired
163+
private PulsarTemplate<String> transactionalPulsarTemplate;
164+
165+
@Autowired
166+
private PulsarClient pulsarClient;
167+
168+
@Transactional("dataSourceTransactionManager")
169+
@PulsarListener(topics = topicIn, ackMode = AckMode.RECORD)
170+
void listen(String msgJson) throws Exception {
171+
if (latch.getCount() == 0) {
172+
return;
173+
}
174+
var thing = ObjectMapperFactory.getMapper().getObjectMapper().readValue(msgJson, Thing.class);
175+
this.transactionalPulsarTemplate.send(topicOut, thing.name());
176+
PulsarTxnWithDbTxnTestsBase.insertThingIntoDb(jdbcTemplate, thing);
177+
PulsarTransactionUtils.getResourceHolder(this.pulsarClient).setRollbackOnly();
178+
latch.countDown();
179+
}
180+
181+
}
182+
183+
}
184+
185+
}
Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,59 +26,60 @@
2626
import org.springframework.jdbc.core.JdbcTemplate;
2727
import org.springframework.pulsar.PulsarException;
2828
import org.springframework.pulsar.core.PulsarTemplate;
29-
import org.springframework.pulsar.transaction.PulsarMixedTransactionTests.PulsarProducerWithDbTransaction.PulsarProducerWithDbTransactionConfig;
30-
import org.springframework.pulsar.transaction.PulsarMixedTransactionTests.PulsarProducerWithDbTransaction.PulsarProducerWithDbTransactionConfig.ProducerOnlyService;
29+
import org.springframework.pulsar.transaction.PulsarTemplateWithDbTransactionTests.PulsarTemplateSynchronizedWithDbTransaction.PulsarTemplateSynchronizedWithDbTransactionConfig;
30+
import org.springframework.pulsar.transaction.PulsarTemplateWithDbTransactionTests.PulsarTemplateSynchronizedWithDbTransaction.PulsarTemplateSynchronizedWithDbTransactionConfig.TestService;
3131
import org.springframework.stereotype.Service;
3232
import org.springframework.test.context.ContextConfiguration;
3333
import org.springframework.transaction.annotation.EnableTransactionManagement;
3434
import org.springframework.transaction.annotation.Transactional;
3535
import org.springframework.transaction.interceptor.TransactionAspectSupport;
3636

3737
/**
38-
* Tests for Pulsar transaction support with other resource transactions.
38+
* Tests transaction support of {@link PulsarTemplate} when mixed with database
39+
* transactions.
3940
*
4041
* @author Chris Bono
4142
*/
42-
class PulsarMixedTransactionTests extends PulsarTxnWithDbTxnTestsBase {
43+
class PulsarTemplateWithDbTransactionTests extends PulsarTxnWithDbTxnTestsBase {
4344

4445
@Nested
45-
@ContextConfiguration(classes = PulsarProducerWithDbTransactionConfig.class)
46-
class PulsarProducerWithDbTransaction {
46+
@ContextConfiguration(classes = PulsarTemplateSynchronizedWithDbTransactionConfig.class)
47+
class PulsarTemplateSynchronizedWithDbTransaction {
4748

4849
static final String topic = "ppwdbt-topic";
4950

5051
@Test
51-
void whenDbTxnIsCommittedThenMessagesAreCommitted(@Autowired ProducerOnlyService producerService) {
52+
void whenDbTxnIsCommittedThenMessagesAreCommitted(@Autowired TestService transactionalService) {
5253
var thing1 = new Thing(1L, "msg1");
53-
producerService.handleRequest(thing1, false, false);
54+
transactionalService.handleRequest(thing1, false, false);
5455
assertThatMessagesAreInTopic(topic, thing1.name());
5556
assertThatMessagesAreInDb(thing1);
5657
}
5758

5859
@Test
59-
void whenDbTxnIsSetRollbackOnlyThenMessagesAreNotCommitted(@Autowired ProducerOnlyService producerService) {
60+
void whenDbTxnIsSetRollbackOnlyThenMessagesAreNotCommitted(@Autowired TestService transactionalService) {
6061
var thing2 = new Thing(2L, "msg2");
61-
producerService.handleRequest(thing2, true, false);
62+
transactionalService.handleRequest(thing2, true, false);
6263
assertThatMessagesAreNotInTopic(topic, thing2.name());
6364
assertThatMessagesAreNotInDb(thing2);
6465
}
6566

6667
@Test
67-
void whenServiceThrowsExceptionThenMessagesAreNotCommitted(@Autowired ProducerOnlyService producerService) {
68+
void whenServiceThrowsExceptionThenMessagesAreNotCommitted(@Autowired TestService transactionalService) {
6869
var thing3 = new Thing(3L, "msg3");
6970
assertThatExceptionOfType(PulsarException.class)
70-
.isThrownBy(() -> producerService.handleRequest(thing3, false, true))
71+
.isThrownBy(() -> transactionalService.handleRequest(thing3, false, true))
7172
.withMessage("Failed to commit due to chaos");
7273
assertThatMessagesAreNotInTopic(topic, thing3.name());
7374
assertThatMessagesAreNotInDb(thing3);
7475
}
7576

7677
@EnableTransactionManagement
7778
@Configuration
78-
static class PulsarProducerWithDbTransactionConfig {
79+
static class PulsarTemplateSynchronizedWithDbTransactionConfig {
7980

8081
@Service
81-
class ProducerOnlyService {
82+
class TestService {
8283

8384
@Autowired
8485
private JdbcTemplate jdbcTemplate;

0 commit comments

Comments
 (0)