From 1d79943cb3861828483013c3ccccc06682a312e6 Mon Sep 17 00:00:00 2001
From: xb205 <62425964+devxb@users.noreply.github.com>
Date: Thu, 15 Feb 2024 01:47:07 +0900
Subject: [PATCH] =?UTF-8?q?refactor:=20=EB=A1=9C=EC=BB=AC=20=EB=A1=9C?=
=?UTF-8?q?=EC=A7=81=EC=9D=B4=20=EC=8B=A4=ED=8C=A8=ED=95=98=EB=A9=B4=20?=
=?UTF-8?q?=EB=B6=84=EC=82=B0=ED=8A=B8=EB=9E=9C=EC=9E=AD=EC=85=98=EB=8F=84?=
=?UTF-8?q?=20ack=EB=90=98=EC=A7=80=20=EC=95=8A=EB=8F=84=EB=A1=9D=20?=
=?UTF-8?q?=EC=88=98=EC=A0=95=20(#22)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* fix: interval 과정중 에러 방출시 무시하고 계속 interval 되도록 수정한다
* refactor: 로컬 로직이 실패하면 분산트랜잭션도 ack되지 않도록 수정한다
* test: TransactionHandlerAssertions을 사용하고 항상 초기화하도록 수정한다
---
README.md | 21 +--
.../netx/api/TransactionCommitEvent.kt | 9 +-
.../netx/api/TransactionCommitHandler.kt | 5 +
.../org/rooftop/netx/api/TransactionEvent.kt | 7 +
.../rooftop/netx/api/TransactionJoinEvent.kt | 9 +-
.../netx/api/TransactionJoinHandler.kt | 5 +
.../netx/api/TransactionRollbackEvent.kt | 9 +-
.../netx/api/TransactionRollbackHandler.kt | 5 +
.../rooftop/netx/api/TransactionStartEvent.kt | 9 +-
.../netx/api/TransactionStartHandler.kt | 5 +
.../engine/AbstractTransactionDispatcher.kt | 129 ++++++++++--------
.../engine/AbstractTransactionListener.kt | 19 +++
.../netx/engine/AbstractTransactionManager.kt | 4 +-
.../AbstractTransactionRetrySupporter.kt | 7 +-
.../EnableDistributedTransaction.kt | 2 +-
.../rooftop/netx/meta/TransactionHandler.kt | 8 ++
.../redis/RedisStreamTransactionDispatcher.kt | 105 +++++++++-----
.../redis/RedisStreamTransactionListener.kt | 47 +++++++
.../redis/RedisStreamTransactionManager.kt | 6 +-
.../redis/RedisStreamTransactionRemover.kt | 52 ++++---
.../netx/redis/RedisTransactionConfigurer.kt | 24 +++-
.../redis/RedisTransactionRetrySupporter.kt | 5 +-
.../META-INF/spring/spring.factories | 2 +-
.../org/rooftop/netx/redis/EventCapture.kt | 24 ----
.../NoAckRedisStreamTransactionDispatcher.kt | 100 ++++++++------
.../redis/NoAckRedisTransactionConfigurer.kt | 34 +++--
.../RedisStreamTransactionManagerTest.kt | 18 +--
.../RedisStreamTransactionRemoverTest.kt | 23 ++--
.../RedisTransactionRetrySupporterTest.kt | 7 +
.../redis/TransactionHandlerAssertions.kt | 60 ++++++++
30 files changed, 503 insertions(+), 257 deletions(-)
create mode 100644 src/main/kotlin/org/rooftop/netx/api/TransactionCommitHandler.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/api/TransactionJoinHandler.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/api/TransactionRollbackHandler.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/api/TransactionStartHandler.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
rename src/main/kotlin/org/rooftop/netx/{autoconfig => meta}/EnableDistributedTransaction.kt (88%)
create mode 100644 src/main/kotlin/org/rooftop/netx/meta/TransactionHandler.kt
create mode 100644 src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
delete mode 100644 src/test/kotlin/org/rooftop/netx/redis/EventCapture.kt
create mode 100644 src/test/kotlin/org/rooftop/netx/redis/TransactionHandlerAssertions.kt
diff --git a/README.md b/README.md
index 5008b78..d427bc5 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-![version 0.1.4](https://img.shields.io/badge/version-0.1.4-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/jdk-17-orange?labelColor=black&style=flat-square)
+![version 0.1.5](https://img.shields.io/badge/version-0.1.5-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)
@@ -102,29 +102,30 @@ fun exists(param: Any): Mono {
#### Scenario4. Handle transaction event
-다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 호출한 메소드에 맞는 트랜잭션 이벤트를
-발행합니다.
-이 이벤트들을 핸들링 함으로써, 다른서버에서 발생한 에러등을 수신하고 롤백할 수 있습니다.
-_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._
-
+다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 트랜잭션 상태에 맞는 핸들러를 호출합니다.
+이 핸들러를 구현함으로써, 트랜잭션별 상태를 처리할 수 있습니다. (롤백등)
+_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._
+> [!WARNING]
+> 트랜잭션 핸들러는 반드시 핸들러에 맞는 `TransactionEvent` **하나**만을 파라미터로 받아야 합니다.
+
```kotlin
-@EventListener(TransactionStartEvent::class)
+@TransactionStartHandler
fun handleTransactionStartEvent(event: TransactionStartEvent) {
// ...
}
-@EventListener(TransactionJoinEvent::class)
+@TransactionJoinHandler
fun handleTransactionJoinEvent(event: TransactionJoinEvent) {
// ...
}
-@EventListener(TransactionCommitEvent::class)
+@TransactionCommitHandler
fun handleTransactionCommitEvent(event: TransactionCommitEvent) {
// ...
}
-@EventListener(TransactionRollbackEvent::class)
+@TransactionRollbackHandler
fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) {
// ...
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
index ef5e477..5f21b8c 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
@@ -1,6 +1,7 @@
package org.rooftop.netx.api
-data class TransactionCommitEvent(
- val transactionId: String,
- val nodeName: String,
-)
+class TransactionCommitEvent(
+ transactionId: String,
+ nodeName: String,
+ group: String,
+): TransactionEvent(transactionId, nodeName, group)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitHandler.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitHandler.kt
new file mode 100644
index 0000000..65fbd58
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitHandler.kt
@@ -0,0 +1,5 @@
+package org.rooftop.netx.api
+
+@Target(AnnotationTarget.FUNCTION)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TransactionCommitHandler
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
new file mode 100644
index 0000000..fa0121f
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
@@ -0,0 +1,7 @@
+package org.rooftop.netx.api
+
+abstract class TransactionEvent(
+ val transactionId: String,
+ val nodeName: String,
+ val group: String,
+)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
index 9e46e09..6932dc0 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
@@ -1,6 +1,7 @@
package org.rooftop.netx.api
-data class TransactionJoinEvent(
- val transactionId: String,
- val nodeName: String,
-)
+class TransactionJoinEvent(
+ transactionId: String,
+ nodeName: String,
+ group: String,
+): TransactionEvent(transactionId, nodeName, group)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinHandler.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinHandler.kt
new file mode 100644
index 0000000..dd39bd9
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinHandler.kt
@@ -0,0 +1,5 @@
+package org.rooftop.netx.api
+
+@Target(AnnotationTarget.FUNCTION)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TransactionJoinHandler
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
index 63de5c6..d0870ff 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
@@ -1,8 +1,9 @@
package org.rooftop.netx.api
-data class TransactionRollbackEvent(
- val transactionId: String,
- val nodeName: String,
+class TransactionRollbackEvent(
+ transactionId: String,
+ nodeName: String,
+ group: String,
val cause: String?,
val undo: String,
-)
+): TransactionEvent(transactionId, nodeName, group)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackHandler.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackHandler.kt
new file mode 100644
index 0000000..f37683c
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackHandler.kt
@@ -0,0 +1,5 @@
+package org.rooftop.netx.api
+
+@Target(AnnotationTarget.FUNCTION)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TransactionRollbackHandler
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
index 738514a..0236b32 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
@@ -1,6 +1,7 @@
package org.rooftop.netx.api
-data class TransactionStartEvent(
- val transactionId: String,
- val nodeName: String,
-)
+class TransactionStartEvent(
+ transactionId: String,
+ nodeName: String,
+ group: String,
+) : TransactionEvent(transactionId, nodeName, group)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartHandler.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionStartHandler.kt
new file mode 100644
index 0000000..6747331
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionStartHandler.kt
@@ -0,0 +1,5 @@
+package org.rooftop.netx.api
+
+@Target(AnnotationTarget.FUNCTION)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TransactionStartHandler
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
index 92e40f5..4f125b1 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
@@ -1,84 +1,99 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.api.TransactionCommitEvent
-import org.rooftop.netx.api.TransactionJoinEvent
-import org.rooftop.netx.api.TransactionRollbackEvent
-import org.rooftop.netx.api.TransactionStartEvent
+import org.rooftop.netx.api.*
import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
-import org.springframework.context.ApplicationEventPublisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.scheduler.Schedulers
+import kotlin.reflect.KFunction
-abstract class AbstractTransactionDispatcher(
- private val eventPublisher: ApplicationEventPublisher,
-) {
+abstract class AbstractTransactionDispatcher {
- fun subscribeStream(transactionId: String): Flux> {
- return receive(transactionId)
- .flatMap { dispatchAndAck(it.first, it.second) }
- }
+ protected val transactionHandlerFunctions =
+ mutableMapOf>, Any>>>()
- protected abstract fun receive(transactionId: String): Flux>
+ protected abstract fun initHandlers()
- fun dispatchAndAck(transaction: Transaction, messageId: String): Flux> {
- return Flux.just(transaction to messageId)
- .dispatch()
- .ack()
+ fun dispatch(transaction: Transaction, messageId: String): Flux {
+ return Mono.just(transaction.state)
+ .filter { state -> transactionHandlerFunctions.containsKey(state) }
+ .flatMapMany { state ->
+ Flux.fromIterable(
+ transactionHandlerFunctions[state]
+ ?: throw cannotFindMatchedHandlerFunctionException
+ )
+ }
+ .flatMap { (function, instance) ->
+ mapToTransactionEvent(transaction)
+ .doOnNext { beforeInvokeHook(transaction, messageId) }
+ .flatMap { function.call(instance, it) }
+ }
+ .doOnComplete {
+ ack(transaction, messageId)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+ }
}
- private fun Flux>.dispatch(): Flux> {
- return this.flatMap { (transaction, messageId) ->
- when (transaction.state) {
- TransactionState.TRANSACTION_STATE_JOIN -> publishJoin(transaction)
- TransactionState.TRANSACTION_STATE_COMMIT -> publishCommit(transaction)
- TransactionState.TRANSACTION_STATE_ROLLBACK -> publishRollback(transaction)
- TransactionState.TRANSACTION_STATE_START -> publishStart(transaction)
- else -> error("Cannot find matched transaction state \"${transaction.state}\"")
- }.map { transaction to messageId }
- }
- }
+ private fun mapToTransactionEvent(transaction: Transaction): Mono {
+ return when (transaction.state) {
+ TransactionState.TRANSACTION_STATE_START -> Mono.just(
+ TransactionStartEvent(
+ transaction.id,
+ transaction.serverId,
+ transaction.group
+ )
+ )
- private fun publishJoin(it: Transaction): Mono {
- return Mono.just(it)
- .doOnNext {
- eventPublisher.publishEvent(
- TransactionJoinEvent(
- it.id,
- it.serverId
- )
+ TransactionState.TRANSACTION_STATE_COMMIT -> Mono.just(
+ TransactionCommitEvent(
+ transaction.id,
+ transaction.serverId,
+ transaction.group,
)
- }
- }
+ )
- private fun publishCommit(it: Transaction): Mono {
- return Mono.just(it)
- .doOnNext { eventPublisher.publishEvent(TransactionCommitEvent(it.id, it.serverId)) }
- }
+ TransactionState.TRANSACTION_STATE_JOIN -> Mono.just(
+ TransactionJoinEvent(
+ transaction.id,
+ transaction.serverId,
+ transaction.group,
+ )
+ )
- private fun publishRollback(transaction: Transaction): Mono {
- return findOwnTransaction(transaction)
- .doOnNext {
- eventPublisher.publishEvent(
+ TransactionState.TRANSACTION_STATE_ROLLBACK -> findOwnTransaction(transaction)
+ .map {
TransactionRollbackEvent(
transaction.id,
transaction.serverId,
+ transaction.group,
transaction.cause,
- it.undo
+ transaction.undo,
)
- )
- }
- .map { transaction }
+ }
+
+ else -> throw cannotFindMatchedTransactionEventException
+ }
}
protected abstract fun findOwnTransaction(transaction: Transaction): Mono
- private fun publishStart(it: Transaction): Mono {
- return Mono.just(it)
- .doOnNext {
- eventPublisher.publishEvent(TransactionStartEvent(it.id, it.serverId))
- }
- }
+ protected abstract fun ack(
+ transaction: Transaction,
+ messageId: String
+ ): Mono>
+
+ protected abstract fun beforeInvokeHook(
+ transaction: Transaction,
+ messageId: String
+ )
- protected abstract fun Flux>.ack(): Flux>
+ private companion object {
+ private val cannotFindMatchedTransactionEventException =
+ java.lang.IllegalStateException("Cannot find matched transaction event")
+
+ private val cannotFindMatchedHandlerFunctionException =
+ IllegalStateException("Cannot find matched handler function")
+ }
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
new file mode 100644
index 0000000..05e31e8
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
@@ -0,0 +1,19 @@
+package org.rooftop.netx.engine
+
+import org.rooftop.netx.idl.Transaction
+import reactor.core.publisher.Flux
+
+abstract class AbstractTransactionListener(
+ private val transactionDispatcher: AbstractTransactionDispatcher,
+) {
+
+ fun subscribeStream(transactionId: String): Flux> {
+ return receive(transactionId)
+ .flatMap { (transaction, messageId) ->
+ transactionDispatcher.dispatch(transaction, messageId)
+ .map { transaction to messageId }
+ }
+ }
+
+ protected abstract fun receive(transactionId: String): Flux>
+}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
index 12a1f10..38ea9ad 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
@@ -12,7 +12,7 @@ abstract class AbstractTransactionManager(
private val nodeGroup: String,
private val nodeName: String,
private val transactionIdGenerator: TransactionIdGenerator = TransactionIdGenerator(nodeId),
- private val transactionDispatcher: AbstractTransactionDispatcher,
+ private val transactionListener: AbstractTransactionListener,
private val transactionRetrySupporter: AbstractTransactionRetrySupporter,
) : TransactionManager {
@@ -58,7 +58,7 @@ abstract class AbstractTransactionManager(
private fun Mono.subscribeTransaction(): Mono {
return this.doOnSuccess {
- transactionDispatcher.subscribeStream(it)
+ transactionListener.subscribeStream(it)
.subscribeOn(Schedulers.parallel())
.subscribe()
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
index 3f063c9..7ad62a6 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
@@ -13,8 +13,11 @@ abstract class AbstractTransactionRetrySupporter(
init {
Flux.interval(recoveryMilli.milliseconds.toJavaDuration())
- .publishOn(Schedulers.parallel())
- .flatMap { handleOrphanTransaction() }
+ .flatMap {
+ handleOrphanTransaction()
+ .onErrorResume { Mono.empty() }
+ }
+ .subscribeOn(Schedulers.parallel())
.subscribe()
}
diff --git a/src/main/kotlin/org/rooftop/netx/autoconfig/EnableDistributedTransaction.kt b/src/main/kotlin/org/rooftop/netx/meta/EnableDistributedTransaction.kt
similarity index 88%
rename from src/main/kotlin/org/rooftop/netx/autoconfig/EnableDistributedTransaction.kt
rename to src/main/kotlin/org/rooftop/netx/meta/EnableDistributedTransaction.kt
index aa5a32e..b36845c 100644
--- a/src/main/kotlin/org/rooftop/netx/autoconfig/EnableDistributedTransaction.kt
+++ b/src/main/kotlin/org/rooftop/netx/meta/EnableDistributedTransaction.kt
@@ -1,4 +1,4 @@
-package org.rooftop.netx.autoconfig
+package org.rooftop.netx.meta
import org.rooftop.netx.redis.RedisTransactionConfigurer
import org.springframework.context.annotation.Import
diff --git a/src/main/kotlin/org/rooftop/netx/meta/TransactionHandler.kt b/src/main/kotlin/org/rooftop/netx/meta/TransactionHandler.kt
new file mode 100644
index 0000000..fc0f6b3
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/meta/TransactionHandler.kt
@@ -0,0 +1,8 @@
+package org.rooftop.netx.meta
+
+import org.springframework.stereotype.Component
+
+@Component
+@Target(AnnotationTarget.CLASS)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class TransactionHandler
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
index 588587a..7ecbce1 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
@@ -1,50 +1,68 @@
package org.rooftop.netx.redis
+import jakarta.annotation.PostConstruct
+import org.rooftop.netx.api.TransactionCommitHandler
+import org.rooftop.netx.api.TransactionJoinHandler
+import org.rooftop.netx.api.TransactionRollbackHandler
+import org.rooftop.netx.api.TransactionStartHandler
import org.rooftop.netx.engine.AbstractTransactionDispatcher
import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
-import org.springframework.context.ApplicationEventPublisher
-import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
-import org.springframework.data.redis.connection.stream.Consumer
+import org.rooftop.netx.meta.TransactionHandler
+import org.springframework.context.ApplicationContext
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.core.ReactiveRedisTemplate
-import org.springframework.data.redis.stream.StreamReceiver
-import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.scheduler.Schedulers
-import kotlin.time.Duration.Companion.hours
-import kotlin.time.toJavaDuration
+import kotlin.reflect.KClass
+import kotlin.reflect.KFunction
+import kotlin.reflect.full.declaredMemberFunctions
class RedisStreamTransactionDispatcher(
- eventPublisher: ApplicationEventPublisher,
- connectionFactory: ReactiveRedisConnectionFactory,
- private val nodeGroup: String,
- private val nodeName: String,
+ private val applicationContext: ApplicationContext,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
-) : AbstractTransactionDispatcher(eventPublisher) {
-
- private val options = StreamReceiver.StreamReceiverOptions.builder()
- .pollTimeout(1.hours.toJavaDuration())
- .build()
+ private val redisStreamTransactionRemover: RedisStreamTransactionRemover,
+ private val nodeGroup: String,
+) : AbstractTransactionDispatcher() {
- private val receiver = StreamReceiver.create(connectionFactory, options)
+ @PostConstruct
+ @Suppress("Unchecked_cast")
+ override fun initHandlers() {
+ val transactionHandler = findHandlers(TransactionHandler::class)
+ transactionHandler.forEach { handler ->
+ handler::class.declaredMemberFunctions
+ .filter { it.returnType.classifier == Mono::class }
+ .forEach { function ->
+ function.annotations
+ .forEach { annotation ->
+ runCatching {
+ val transactionState = matchedTransactionState(annotation)
+ transactionHandlerFunctions.putIfAbsent(
+ transactionState,
+ mutableListOf()
+ )
+ transactionHandlerFunctions[transactionState]?.add(function as KFunction> to handler)
+ }
+ }
+ }
+ }
+ }
- override fun receive(transactionId: String): Flux> {
- return createGroupIfNotExists(transactionId)
- .flatMap {
- receiver.receive(
- Consumer.from(nodeGroup, nodeName),
- StreamOffset.create(transactionId, ReadOffset.from(">"))
- ).publishOn(Schedulers.parallel())
- .map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
- }
+ private fun findHandlers(type: KClass): List {
+ return applicationContext.getBeansWithAnnotation(type.java)
+ .entries.asSequence()
+ .map { it.value }
+ .toList()
}
- private fun createGroupIfNotExists(transactionId: String): Flux {
- return reactiveRedisTemplate.opsForStream()
- .createGroup(transactionId, ReadOffset.from("0"), nodeGroup)
- .flatMapMany { Flux.just(it) }
+ private fun matchedTransactionState(annotation: Annotation): TransactionState {
+ return when (annotation) {
+ is TransactionStartHandler -> TransactionState.TRANSACTION_STATE_START
+ is TransactionCommitHandler -> TransactionState.TRANSACTION_STATE_COMMIT
+ is TransactionJoinHandler -> TransactionState.TRANSACTION_STATE_JOIN
+ is TransactionRollbackHandler -> TransactionState.TRANSACTION_STATE_ROLLBACK
+ else -> throw notMatchedTransactionHandlerException
+ }
}
override fun findOwnTransaction(transaction: Transaction): Mono {
@@ -60,11 +78,24 @@ class RedisStreamTransactionDispatcher(
transaction.state == TransactionState.TRANSACTION_STATE_JOIN
|| transaction.state == TransactionState.TRANSACTION_STATE_START
- override fun Flux>.ack(): Flux> {
- return this.flatMap { (transaction, messageId) ->
- reactiveRedisTemplate.opsForStream()
- .acknowledge(transaction.id, nodeGroup, messageId)
- .flatMapMany { Flux.just(transaction to messageId) }
- }
+ override fun ack(transaction: Transaction, messageId: String): Mono> {
+ return reactiveRedisTemplate.opsForStream()
+ .acknowledge(transaction.id, nodeGroup, messageId)
+ .map { transaction to messageId }
+ .switchIfEmpty(
+ Mono.error {
+ error("Fail to ack transaction transactionId \"${transaction.id}\" messageId \"$messageId\"")
+ }
+ )
+ }
+
+ override fun beforeInvokeHook(
+ transaction: Transaction,
+ messageId: String
+ ) = redisStreamTransactionRemover.deleteElastic(transaction)
+
+ private companion object {
+ private val notMatchedTransactionHandlerException =
+ IllegalStateException("Cannot find matched Transaction handler")
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
new file mode 100644
index 0000000..7252c94
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
@@ -0,0 +1,47 @@
+package org.rooftop.netx.redis
+
+import org.rooftop.netx.engine.AbstractTransactionDispatcher
+import org.rooftop.netx.engine.AbstractTransactionListener
+import org.rooftop.netx.idl.Transaction
+import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
+import org.springframework.data.redis.connection.stream.Consumer
+import org.springframework.data.redis.connection.stream.ReadOffset
+import org.springframework.data.redis.connection.stream.StreamOffset
+import org.springframework.data.redis.core.ReactiveRedisTemplate
+import org.springframework.data.redis.stream.StreamReceiver
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
+import kotlin.time.Duration.Companion.hours
+import kotlin.time.toJavaDuration
+
+class RedisStreamTransactionListener(
+ transactionDispatcher: AbstractTransactionDispatcher,
+ connectionFactory: ReactiveRedisConnectionFactory,
+ private val nodeGroup: String,
+ private val nodeName: String,
+ private val reactiveRedisTemplate: ReactiveRedisTemplate,
+) : AbstractTransactionListener(transactionDispatcher) {
+
+ private val options = StreamReceiver.StreamReceiverOptions.builder()
+ .pollTimeout(1.hours.toJavaDuration())
+ .build()
+
+ private val receiver = StreamReceiver.create(connectionFactory, options)
+
+ override fun receive(transactionId: String): Flux> {
+ return createGroupIfNotExists(transactionId)
+ .flatMap {
+ receiver.receive(
+ Consumer.from(nodeGroup, nodeName),
+ StreamOffset.create(transactionId, ReadOffset.from(">"))
+ ).publishOn(Schedulers.parallel())
+ .map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
+ }
+ }
+
+ private fun createGroupIfNotExists(transactionId: String): Flux {
+ return reactiveRedisTemplate.opsForStream()
+ .createGroup(transactionId, ReadOffset.from("0"), nodeGroup)
+ .flatMapMany { Flux.just(it) }
+ }
+}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
index 2a46bf8..531f5ee 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
@@ -1,6 +1,6 @@
package org.rooftop.netx.redis
-import org.rooftop.netx.engine.AbstractTransactionDispatcher
+import org.rooftop.netx.engine.AbstractTransactionListener
import org.rooftop.netx.engine.AbstractTransactionManager
import org.rooftop.netx.engine.AbstractTransactionRetrySupporter
import org.rooftop.netx.idl.Transaction
@@ -13,14 +13,14 @@ class RedisStreamTransactionManager(
nodeId: Int,
nodeName: String,
nodeGroup: String,
- transactionDispatcher: AbstractTransactionDispatcher,
+ transactionListener: AbstractTransactionListener,
transactionRetrySupporter: AbstractTransactionRetrySupporter,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
) : AbstractTransactionManager(
nodeId = nodeId,
nodeName = nodeName,
nodeGroup = nodeGroup,
- transactionDispatcher = transactionDispatcher,
+ transactionListener = transactionListener,
transactionRetrySupporter = transactionRetrySupporter,
) {
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemover.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemover.kt
index 8316bf1..b7cb544 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemover.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemover.kt
@@ -1,10 +1,10 @@
package org.rooftop.netx.redis
-import org.rooftop.netx.api.TransactionCommitEvent
-import org.rooftop.netx.api.TransactionRollbackEvent
-import org.springframework.context.event.EventListener
+import org.rooftop.netx.idl.Transaction
+import org.rooftop.netx.idl.TransactionState
import org.springframework.data.domain.Range
import org.springframework.data.redis.core.ReactiveRedisTemplate
+import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.retry.RetrySpec
import java.time.Duration
@@ -14,34 +14,32 @@ class RedisStreamTransactionRemover(
private val reactiveRedisTemplate: ReactiveRedisTemplate,
) {
- @EventListener(TransactionCommitEvent::class)
- fun handleTransactionCommitEvent(event: TransactionCommitEvent) {
- deleteElastic(event.transactionId)
- }
-
- @EventListener(TransactionRollbackEvent::class)
- fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) {
- deleteElastic(event.transactionId)
- }
-
- private fun deleteElastic(transactionId: String) {
- reactiveRedisTemplate.opsForStream()
- .pending(transactionId, nodeGroup, Range.closed("-", "+"), Long.MAX_VALUE)
- .filter {
- when (it.get().toList().isEmpty()) {
- true -> true
- false -> error(TRANSACTION_IS_PENDING_STATUS)
- }
- }
- .retryWhen(retryIfTransactionPending)
+ fun deleteElastic(transaction: Transaction) {
+ Mono.just(transaction)
+ .filter { isTransactionEndState(transaction) }
.flatMap {
- reactiveRedisTemplate.opsForSet()
- .remove(nodeGroup, transactionId.toByteArray())
- }
- .subscribeOn(Schedulers.parallel())
+ reactiveRedisTemplate.opsForStream()
+ .pending(transaction.id, nodeGroup, Range.closed("-", "+"), Long.MAX_VALUE)
+ .filter {
+ when (it.get().toList().isEmpty()) {
+ true -> true
+ false -> error(TRANSACTION_IS_PENDING_STATUS)
+ }
+ }
+ .retryWhen(retryIfTransactionPending)
+ .flatMap {
+ reactiveRedisTemplate.opsForSet()
+ .remove(nodeGroup, transaction.id.toByteArray())
+ }
+ }.subscribeOn(Schedulers.parallel())
.subscribe()
}
+ private fun isTransactionEndState(transaction: Transaction): Boolean {
+ return transaction.state == TransactionState.TRANSACTION_STATE_ROLLBACK
+ || transaction.state == TransactionState.TRANSACTION_STATE_COMMIT
+ }
+
companion object {
private const val TRANSACTION_IS_PENDING_STATUS =
"Transaction message remains in pending status."
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
index 9a15925..8497c99 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
@@ -6,7 +6,7 @@ import org.redisson.config.Config
import org.rooftop.netx.api.TransactionManager
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
-import org.springframework.context.ApplicationEventPublisher
+import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
@@ -24,7 +24,7 @@ class RedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
- private val applicationEventPublisher: ApplicationEventPublisher,
+ private val applicationContext: ApplicationContext,
) {
@Bean
@@ -34,16 +34,16 @@ class RedisTransactionConfigurer(
nodeId = nodeId,
nodeName = nodeName,
nodeGroup = nodeGroup,
- transactionDispatcher = redisStreamTransactionDispatcher(),
+ transactionListener = redisStreamTransactionListener(),
transactionRetrySupporter = redisTransactionRetrySupporter(),
reactiveRedisTemplate = reactiveRedisTemplate(),
)
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
- RedisStreamTransactionDispatcher(
- eventPublisher = applicationEventPublisher,
+ fun redisStreamTransactionListener(): RedisStreamTransactionListener =
+ RedisStreamTransactionListener(
+ transactionDispatcher = redisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
nodeName = nodeName,
@@ -65,7 +65,17 @@ class RedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun redisStreamTransactionDeleter(): RedisStreamTransactionRemover =
+ fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
+ RedisStreamTransactionDispatcher(
+ applicationContext = applicationContext,
+ reactiveRedisTemplate = reactiveRedisTemplate(),
+ redisStreamTransactionRemover = redisStreamTransactionRemover(),
+ nodeGroup = nodeGroup,
+ )
+
+ @Bean
+ @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
+ fun redisStreamTransactionRemover(): RedisStreamTransactionRemover =
RedisStreamTransactionRemover(
nodeGroup = nodeGroup,
reactiveRedisTemplate = reactiveRedisTemplate(),
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
index 065c433..99a66f5 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
@@ -34,7 +34,10 @@ class RedisTransactionRetrySupporter(
.members(nodeGroup)
.flatMap { claimTransactions(String(it)) }
.publishOn(Schedulers.parallel())
- .flatMap { transactionDispatcher.dispatchAndAck(it.first, it.second) }
+ .flatMap { (transaction, messageId) ->
+ transactionDispatcher.dispatch(transaction, messageId)
+ .map { transaction to messageId }
+ }
}
private fun claimTransactions(transactionId: String): Flux> {
diff --git a/src/main/resources/META-INF/spring/spring.factories b/src/main/resources/META-INF/spring/spring.factories
index 60c397d..007d7c1 100644
--- a/src/main/resources/META-INF/spring/spring.factories
+++ b/src/main/resources/META-INF/spring/spring.factories
@@ -1,2 +1,2 @@
-org.rooftop.netx.autoconfig.EnableDistributedTransaction=\
+org.rooftop.netx.meta.EnableDistributedTransaction=\
org.rooftop.netx.redis.RedisTransactionConfigurer
diff --git a/src/test/kotlin/org/rooftop/netx/redis/EventCapture.kt b/src/test/kotlin/org/rooftop/netx/redis/EventCapture.kt
deleted file mode 100644
index 54d23d9..0000000
--- a/src/test/kotlin/org/rooftop/netx/redis/EventCapture.kt
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.rooftop.netx.redis
-
-import org.springframework.boot.test.context.TestComponent
-import org.springframework.context.event.EventListener
-import kotlin.reflect.KClass
-
-@TestComponent
-class EventCapture {
-
- private val eventCapture: MutableMap, Long> = mutableMapOf()
-
- fun clear() {
- eventCapture.clear()
- }
-
- fun capturedCount(type: KClass<*>): Long {
- return eventCapture[type] ?: 0
- }
-
- @EventListener(Any::class)
- fun captureEvent(type: Any) {
- eventCapture[type::class] = eventCapture.getOrDefault(type::class, 0) + 1
- }
-}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisStreamTransactionDispatcher.kt b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisStreamTransactionDispatcher.kt
index 6f79e54..a5b504d 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisStreamTransactionDispatcher.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisStreamTransactionDispatcher.kt
@@ -1,58 +1,64 @@
package org.rooftop.netx.redis
+import org.rooftop.netx.api.TransactionCommitHandler
+import org.rooftop.netx.api.TransactionJoinHandler
+import org.rooftop.netx.api.TransactionRollbackHandler
+import org.rooftop.netx.api.TransactionStartHandler
import org.rooftop.netx.engine.AbstractTransactionDispatcher
import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
-import org.springframework.context.ApplicationEventPublisher
-import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
-import org.springframework.data.redis.connection.stream.Consumer
+import org.rooftop.netx.meta.TransactionHandler
+import org.springframework.context.ApplicationContext
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.core.ReactiveRedisTemplate
-import org.springframework.data.redis.stream.StreamReceiver
-import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.scheduler.Schedulers
-import kotlin.time.Duration.Companion.hours
-import kotlin.time.toJavaDuration
+import kotlin.reflect.KClass
+import kotlin.reflect.KFunction
+import kotlin.reflect.full.declaredMemberFunctions
class NoAckRedisStreamTransactionDispatcher(
- eventPublisher: ApplicationEventPublisher,
- connectionFactory: ReactiveRedisConnectionFactory,
- private val nodeGroup: String,
- private val nodeName: String,
+ private val applicationContext: ApplicationContext,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
-) : AbstractTransactionDispatcher(eventPublisher) {
-
- private val options = StreamReceiver.StreamReceiverOptions.builder()
- .pollTimeout(1.hours.toJavaDuration())
- .build()
-
- private val receiver = StreamReceiver.create(connectionFactory, options)
-
- override fun receive(transactionId: String): Flux> {
- return createGroupIfNotExists(transactionId)
- .flatMap {
- receiver.receive(
- Consumer.from(nodeGroup, nodeName),
- StreamOffset.create(transactionId, ReadOffset.from(">"))
- ).publishOn(Schedulers.parallel())
- .map { Transaction.parseFrom(it.value["data"]?.toByteArray()) to it.id.value }
- .flatMap { (transaction, messageId) ->
- when (transaction.state) {
- TransactionState.TRANSACTION_STATE_ROLLBACK ->
- findOwnTransaction(transaction).map { it to messageId }
-
- else -> Mono.just(transaction to messageId)
+ private val nodeGroup: String,
+) : AbstractTransactionDispatcher() {
+ @Suppress("Unchecked_cast")
+ override fun initHandlers() {
+ val transactionHandler = findHandlers(TransactionHandler::class)
+ transactionHandler.forEach { handler ->
+ handler::class.declaredMemberFunctions
+ .filter { it.returnType is Mono<*> }
+ .forEach { function ->
+ function.annotations
+ .forEach { annotation ->
+ runCatching {
+ val transactionState = matchedTransactionState(annotation)
+ val handlerFunctions = transactionHandlerFunctions.getOrDefault(
+ transactionState,
+ mutableListOf()
+ )
+ handlerFunctions.add(function as KFunction> to handler)
+ }
}
- }
- }
+ }
+ }
}
- private fun createGroupIfNotExists(transactionId: String): Flux {
- return reactiveRedisTemplate.opsForStream()
- .createGroup(transactionId, ReadOffset.from("0"), nodeGroup)
- .flatMapMany { Flux.just(it) }
+ private fun findHandlers(type: KClass): List {
+ return applicationContext.getBeansWithAnnotation(type.java)
+ .entries.asSequence()
+ .map { it.value }
+ .toList()
+ }
+
+ private fun matchedTransactionState(annotation: Annotation): TransactionState {
+ return when (annotation) {
+ is TransactionStartHandler -> TransactionState.TRANSACTION_STATE_START
+ is TransactionCommitHandler -> TransactionState.TRANSACTION_STATE_COMMIT
+ is TransactionJoinHandler -> TransactionState.TRANSACTION_STATE_JOIN
+ is TransactionRollbackHandler -> TransactionState.TRANSACTION_STATE_ROLLBACK
+ else -> throw notMatchedTransactionHandlerException
+ }
}
override fun findOwnTransaction(transaction: Transaction): Mono {
@@ -68,6 +74,18 @@ class NoAckRedisStreamTransactionDispatcher(
transaction.state == TransactionState.TRANSACTION_STATE_JOIN
|| transaction.state == TransactionState.TRANSACTION_STATE_START
- override fun Flux>.ack(): Flux> = this
+ override fun ack(transaction: Transaction, messageId: String): Mono> =
+ Mono.just(transaction to messageId)
+
+ override fun beforeInvokeHook(
+ transaction: Transaction,
+ messageId: String
+ ) {
+ }
+
+ private companion object {
+ private val notMatchedTransactionHandlerException =
+ IllegalStateException("Cannot find matched Transaction handler")
+ }
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
index 7894e60..696ba6d 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
@@ -7,7 +7,7 @@ import org.rooftop.netx.api.TransactionManager
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.test.context.TestConfiguration
-import org.springframework.context.ApplicationEventPublisher
+import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
@@ -24,7 +24,7 @@ class NoAckRedisTransactionConfigurer(
@Value("\${netx.node-name}") private val nodeName: String,
@Value("\${netx.recovery-milli:60000}") private val recoveryMilli: Long,
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
- private val applicationEventPublisher: ApplicationEventPublisher,
+ private val applicationContext: ApplicationContext,
) {
@Bean
@@ -34,16 +34,16 @@ class NoAckRedisTransactionConfigurer(
nodeId = nodeId,
nodeName = nodeName,
nodeGroup = nodeGroup,
- transactionDispatcher = noAckRedisStreamTransactionDispatcher(),
+ transactionListener = redisStreamTransactionListener(),
transactionRetrySupporter = redisTransactionRetrySupporter(),
reactiveRedisTemplate = reactiveRedisTemplate(),
)
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
- RedisStreamTransactionDispatcher(
- eventPublisher = applicationEventPublisher,
+ fun redisStreamTransactionListener(): RedisStreamTransactionListener =
+ RedisStreamTransactionListener(
+ transactionDispatcher = noAckRedisStreamTransactionDispatcher(),
connectionFactory = reactiveRedisConnectionFactory(),
nodeGroup = nodeGroup,
nodeName = nodeName,
@@ -54,10 +54,8 @@ class NoAckRedisTransactionConfigurer(
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun noAckRedisStreamTransactionDispatcher(): NoAckRedisStreamTransactionDispatcher =
NoAckRedisStreamTransactionDispatcher(
- eventPublisher = applicationEventPublisher,
- connectionFactory = reactiveRedisConnectionFactory(),
+ applicationContext = applicationContext,
nodeGroup = nodeGroup,
- nodeName = nodeName,
reactiveRedisTemplate = reactiveRedisTemplate()
)
@@ -74,6 +72,24 @@ class NoAckRedisTransactionConfigurer(
recoveryMilli = recoveryMilli,
)
+ @Bean
+ @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
+ fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
+ RedisStreamTransactionDispatcher(
+ applicationContext = applicationContext,
+ reactiveRedisTemplate = reactiveRedisTemplate(),
+ redisStreamTransactionRemover = redisStreamTransactionRemover(),
+ nodeGroup = nodeGroup,
+ )
+
+ @Bean
+ @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
+ fun redisStreamTransactionRemover(): RedisStreamTransactionRemover =
+ RedisStreamTransactionRemover(
+ nodeGroup = nodeGroup,
+ reactiveRedisTemplate = reactiveRedisTemplate(),
+ )
+
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun reactiveRedisTemplate(): ReactiveRedisTemplate {
diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt
index 7884dc5..0d9892c 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManagerTest.kt
@@ -5,7 +5,7 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.shouldBe
import org.rooftop.netx.api.*
-import org.rooftop.netx.autoconfig.EnableDistributedTransaction
+import org.rooftop.netx.meta.EnableDistributedTransaction
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import reactor.test.StepVerifier
@@ -14,19 +14,19 @@ import kotlin.time.Duration.Companion.minutes
@EnableDistributedTransaction
@ContextConfiguration(
classes = [
- EventCapture::class,
RedisContainer::class,
+ TransactionHandlerAssertions::class,
]
)
@DisplayName("RedisStreamTransactionManager 클래스의")
@TestPropertySource("classpath:application.properties")
internal class RedisStreamTransactionManagerTest(
- private val eventCapture: EventCapture,
private val transactionManager: TransactionManager,
+ private val transactionHandlerAssertions: TransactionHandlerAssertions,
) : DescribeSpec({
beforeEach {
- eventCapture.clear()
+ transactionHandlerAssertions.clear()
}
describe("start 메소드는") {
@@ -35,7 +35,7 @@ internal class RedisStreamTransactionManagerTest(
transactionManager.start(REPLAY).subscribe()
eventually(5.minutes) {
- eventCapture.capturedCount(TransactionStartEvent::class) shouldBe 1
+ transactionHandlerAssertions.startCountShouldBe(1)
}
}
}
@@ -46,7 +46,7 @@ internal class RedisStreamTransactionManagerTest(
transactionManager.start(REPLAY).block()
eventually(5.minutes) {
- eventCapture.capturedCount(TransactionStartEvent::class) shouldBe 2
+ transactionHandlerAssertions.startCountShouldBe(2)
}
}
}
@@ -60,7 +60,7 @@ internal class RedisStreamTransactionManagerTest(
transactionManager.join(transactionId, REPLAY).subscribe()
eventually(5.minutes) {
- eventCapture.capturedCount(TransactionJoinEvent::class) shouldBe 1
+ transactionHandlerAssertions.joinCountShouldBe(1)
}
}
}
@@ -106,7 +106,7 @@ internal class RedisStreamTransactionManagerTest(
transactionManager.commit(transactionId).block()
eventually(5.minutes) {
- eventCapture.capturedCount(TransactionCommitEvent::class)
+ transactionHandlerAssertions.commitCountShouldBe(1)
}
}
}
@@ -129,7 +129,7 @@ internal class RedisStreamTransactionManagerTest(
transactionManager.rollback(transactionId, "rollback occured for test").block()
eventually(5.minutes) {
- eventCapture.capturedCount(TransactionRollbackEvent::class) shouldBe 1
+ transactionHandlerAssertions.rollbackCountShouldBe(1)
}
}
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemoverTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemoverTest.kt
index e863714..de4f86c 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemoverTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/RedisStreamTransactionRemoverTest.kt
@@ -3,14 +3,12 @@ package org.rooftop.netx.redis
import io.kotest.assertions.nondeterministic.eventually
import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
-import io.kotest.matchers.shouldBe
-import org.rooftop.netx.api.TransactionCommitEvent
import org.rooftop.netx.api.TransactionManager
-import org.rooftop.netx.api.TransactionRollbackEvent
-import org.rooftop.netx.autoconfig.EnableDistributedTransaction
+import org.rooftop.netx.meta.EnableDistributedTransaction
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import reactor.core.scheduler.Schedulers
+import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
@EnableDistributedTransaction
@@ -18,7 +16,7 @@ import kotlin.time.Duration.Companion.seconds
classes = [
RedisContainer::class,
RedisAssertions::class,
- EventCapture::class,
+ TransactionHandlerAssertions::class,
]
)
@TestPropertySource("classpath:application.properties")
@@ -26,8 +24,13 @@ import kotlin.time.Duration.Companion.seconds
internal class RedisStreamTransactionRemoverTest(
private val redisAssertions: RedisAssertions,
private val transactionManager: TransactionManager,
- private val eventCapture: EventCapture,
+ private val transactionHandlerAssertions: TransactionHandlerAssertions,
) : DescribeSpec({
+
+ beforeEach {
+ transactionHandlerAssertions.clear()
+ }
+
describe("handleTransactionCommitEvent 메소드는") {
context("TransactionCommitEvent 가 발행되면,") {
val transactionId = transactionManager.start("RedisStreamTransactionRemoverTest")
@@ -38,8 +41,8 @@ internal class RedisStreamTransactionRemoverTest(
.subscribeOn(Schedulers.parallel())
.subscribe()
- eventually(10.seconds) {
- eventCapture.capturedCount(TransactionCommitEvent::class) shouldBe 1
+ eventually(5.minutes) {
+ transactionHandlerAssertions.commitCountShouldBe(1)
redisAssertions.retryTransactionShouldBeNotExists(transactionId)
}
}
@@ -52,8 +55,8 @@ internal class RedisStreamTransactionRemoverTest(
it("Transaction 을 retry watch 대기열에서 삭제한다.") {
transactionManager.rollback(transactionId, "rollback occured for test").block()
- eventually(10.seconds) {
- eventCapture.capturedCount(TransactionRollbackEvent::class) shouldBe 1
+ eventually(5.minutes) {
+ transactionHandlerAssertions.rollbackCountShouldBe(1)
redisAssertions.retryTransactionShouldBeNotExists(transactionId)
}
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt b/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
index 147575a..ff7b3e7 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporterTest.kt
@@ -13,6 +13,7 @@ import kotlin.time.Duration.Companion.minutes
RedisContainer::class,
RedisAssertions::class,
NoAckRedisTransactionConfigurer::class,
+ TransactionHandlerAssertions::class,
]
)
@TestPropertySource("classpath:application.properties")
@@ -20,8 +21,13 @@ import kotlin.time.Duration.Companion.minutes
internal class RedisTransactionRetrySupporterTest(
private val redisAssertions: RedisAssertions,
private val transactionManager: TransactionManager,
+ private val transactionHandlerAssertions: TransactionHandlerAssertions,
) : DescribeSpec({
+ beforeEach {
+ transactionHandlerAssertions.clear()
+ }
+
describe("handleOrphanTransaction 메소드는") {
context("pending되었지만, ack되지 않은 트랜잭션이 있다면,") {
it("해당 트랜잭션을 찾아서 처리하고, ack 상태로 변경한다.") {
@@ -30,6 +36,7 @@ internal class RedisTransactionRetrySupporterTest(
Thread.sleep(3_000)
eventually(10.minutes) {
+ transactionHandlerAssertions.startCountShouldBe(1)
redisAssertions.pendingMessageCountShouldBe(transactionId, 0)
}
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/TransactionHandlerAssertions.kt b/src/test/kotlin/org/rooftop/netx/redis/TransactionHandlerAssertions.kt
new file mode 100644
index 0000000..9cfa3a9
--- /dev/null
+++ b/src/test/kotlin/org/rooftop/netx/redis/TransactionHandlerAssertions.kt
@@ -0,0 +1,60 @@
+package org.rooftop.netx.redis
+
+import io.kotest.matchers.shouldBe
+import org.rooftop.netx.api.*
+import org.rooftop.netx.meta.TransactionHandler
+import reactor.core.publisher.Mono
+
+@TransactionHandler
+class TransactionHandlerAssertions {
+
+ private val methodInvocationCounts = mutableMapOf()
+
+ fun clear() {
+ methodInvocationCounts.clear()
+ }
+
+ fun joinCountShouldBe(count: Int) {
+ (methodInvocationCounts["JOIN"] ?: 0) shouldBe count
+ }
+
+ fun startCountShouldBe(count: Int) {
+ (methodInvocationCounts["START"] ?: 0) shouldBe count
+ }
+
+ fun commitCountShouldBe(count: Int) {
+ (methodInvocationCounts["COMMIT"] ?: 0) shouldBe count
+ }
+
+ fun rollbackCountShouldBe(count: Int) {
+ (methodInvocationCounts["ROLLBACK"] ?: 0) shouldBe count
+ }
+
+ @TransactionRollbackHandler
+ fun handleRollback(event: TransactionRollbackEvent): Mono {
+ put("ROLLBACK")
+ return Mono.just(Unit)
+ }
+
+ @TransactionCommitHandler
+ fun handleCommit(event: TransactionCommitEvent): Mono {
+ put("COMMIT")
+ return Mono.just(Unit)
+ }
+
+ @TransactionStartHandler
+ fun handleStart(event: TransactionStartEvent): Mono {
+ put("START")
+ return Mono.just(Unit)
+ }
+
+ @TransactionJoinHandler
+ fun handleJoin(event: TransactionJoinEvent): Mono {
+ put("JOIN")
+ return Mono.just(Unit)
+ }
+
+ private fun put(key: String) {
+ methodInvocationCounts[key] = methodInvocationCounts.getOrDefault(key, 0) + 1
+ }
+}