Skip to content

Commit

Permalink
refactor: 로컬 로직이 실패하면 분산트랜잭션도 ack되지 않도록 수정 (#22)
Browse files Browse the repository at this point in the history
* fix: interval 과정중 에러 방출시 무시하고 계속 interval 되도록 수정한다

* refactor: 로컬 로직이 실패하면 분산트랜잭션도 ack되지 않도록 수정한다

* test: TransactionHandlerAssertions을 사용하고 항상 초기화하도록 수정한다
  • Loading branch information
devxb authored Feb 14, 2024
1 parent 12d5ac2 commit 1d79943
Show file tree
Hide file tree
Showing 30 changed files with 503 additions and 257 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<br>


![version 0.1.4](https://img.shields.io/badge/version-0.1.4-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/jdk-17-orange?labelColor=black&style=flat-square)
![version 0.1.5](https://img.shields.io/badge/version-0.1.5-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square)

<img src = "https://github.com/rooftop-MSA/Netx/assets/62425964/5082ef20-10ad-4b6b-bff8-7e78a0f9e01f" width="500" align="right"/>

Expand Down Expand Up @@ -102,29 +102,30 @@ fun exists(param: Any): Mono<Any> {

#### Scenario4. Handle transaction event

다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 호출한 메소드에 맞는 트랜잭션 이벤트를
발행합니다.
이 이벤트들을 핸들링 함으로써, 다른서버에서 발생한 에러등을 수신하고 롤백할 수 있습니다.
_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._

다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 트랜잭션 상태에 맞는 핸들러를 호출합니다.
이 핸들러를 구현함으로써, 트랜잭션별 상태를 처리할 수 있습니다. (롤백등)
_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._
> [!WARNING]
> 트랜잭션 핸들러는 반드시 핸들러에 맞는 `TransactionEvent` **하나**만을 파라미터로 받아야 합니다.
```kotlin

@EventListener(TransactionStartEvent::class)
@TransactionStartHandler
fun handleTransactionStartEvent(event: TransactionStartEvent) {
// ...
}

@EventListener(TransactionJoinEvent::class)
@TransactionJoinHandler
fun handleTransactionJoinEvent(event: TransactionJoinEvent) {
// ...
}

@EventListener(TransactionCommitEvent::class)
@TransactionCommitHandler
fun handleTransactionCommitEvent(event: TransactionCommitEvent) {
// ...
}

@EventListener(TransactionRollbackEvent::class)
@TransactionRollbackHandler
fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) {
// ...
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.rooftop.netx.api

data class TransactionCommitEvent(
val transactionId: String,
val nodeName: String,
)
class TransactionCommitEvent(
transactionId: String,
nodeName: String,
group: String,
): TransactionEvent(transactionId, nodeName, group)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rooftop.netx.api

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionCommitHandler
7 changes: 7 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.rooftop.netx.api

abstract class TransactionEvent(
val transactionId: String,
val nodeName: String,
val group: String,
)
9 changes: 5 additions & 4 deletions src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.rooftop.netx.api

data class TransactionJoinEvent(
val transactionId: String,
val nodeName: String,
)
class TransactionJoinEvent(
transactionId: String,
nodeName: String,
group: String,
): TransactionEvent(transactionId, nodeName, group)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rooftop.netx.api

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionJoinHandler
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.rooftop.netx.api

data class TransactionRollbackEvent(
val transactionId: String,
val nodeName: String,
class TransactionRollbackEvent(
transactionId: String,
nodeName: String,
group: String,
val cause: String?,
val undo: String,
)
): TransactionEvent(transactionId, nodeName, group)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rooftop.netx.api

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionRollbackHandler
9 changes: 5 additions & 4 deletions src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.rooftop.netx.api

data class TransactionStartEvent(
val transactionId: String,
val nodeName: String,
)
class TransactionStartEvent(
transactionId: String,
nodeName: String,
group: String,
) : TransactionEvent(transactionId, nodeName, group)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.rooftop.netx.api

@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionStartHandler
Original file line number Diff line number Diff line change
@@ -1,84 +1,99 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.TransactionCommitEvent
import org.rooftop.netx.api.TransactionJoinEvent
import org.rooftop.netx.api.TransactionRollbackEvent
import org.rooftop.netx.api.TransactionStartEvent
import org.rooftop.netx.api.*
import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
import org.springframework.context.ApplicationEventPublisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KFunction

abstract class AbstractTransactionDispatcher(
private val eventPublisher: ApplicationEventPublisher,
) {
abstract class AbstractTransactionDispatcher {

fun subscribeStream(transactionId: String): Flux<Pair<Transaction, String>> {
return receive(transactionId)
.flatMap { dispatchAndAck(it.first, it.second) }
}
protected val transactionHandlerFunctions =
mutableMapOf<TransactionState, MutableList<Pair<KFunction<Mono<Any>>, Any>>>()

protected abstract fun receive(transactionId: String): Flux<Pair<Transaction, String>>
protected abstract fun initHandlers()

fun dispatchAndAck(transaction: Transaction, messageId: String): Flux<Pair<Transaction, String>> {
return Flux.just(transaction to messageId)
.dispatch()
.ack()
fun dispatch(transaction: Transaction, messageId: String): Flux<Any> {
return Mono.just(transaction.state)
.filter { state -> transactionHandlerFunctions.containsKey(state) }
.flatMapMany { state ->
Flux.fromIterable(
transactionHandlerFunctions[state]
?: throw cannotFindMatchedHandlerFunctionException
)
}
.flatMap { (function, instance) ->
mapToTransactionEvent(transaction)
.doOnNext { beforeInvokeHook(transaction, messageId) }
.flatMap { function.call(instance, it) }
}
.doOnComplete {
ack(transaction, messageId)
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
}
}

private fun Flux<Pair<Transaction, String>>.dispatch(): Flux<Pair<Transaction, String>> {
return this.flatMap { (transaction, messageId) ->
when (transaction.state) {
TransactionState.TRANSACTION_STATE_JOIN -> publishJoin(transaction)
TransactionState.TRANSACTION_STATE_COMMIT -> publishCommit(transaction)
TransactionState.TRANSACTION_STATE_ROLLBACK -> publishRollback(transaction)
TransactionState.TRANSACTION_STATE_START -> publishStart(transaction)
else -> error("Cannot find matched transaction state \"${transaction.state}\"")
}.map { transaction to messageId }
}
}
private fun mapToTransactionEvent(transaction: Transaction): Mono<TransactionEvent> {
return when (transaction.state) {
TransactionState.TRANSACTION_STATE_START -> Mono.just(
TransactionStartEvent(
transaction.id,
transaction.serverId,
transaction.group
)
)

private fun publishJoin(it: Transaction): Mono<Transaction> {
return Mono.just(it)
.doOnNext {
eventPublisher.publishEvent(
TransactionJoinEvent(
it.id,
it.serverId
)
TransactionState.TRANSACTION_STATE_COMMIT -> Mono.just(
TransactionCommitEvent(
transaction.id,
transaction.serverId,
transaction.group,
)
}
}
)

private fun publishCommit(it: Transaction): Mono<Transaction> {
return Mono.just(it)
.doOnNext { eventPublisher.publishEvent(TransactionCommitEvent(it.id, it.serverId)) }
}
TransactionState.TRANSACTION_STATE_JOIN -> Mono.just(
TransactionJoinEvent(
transaction.id,
transaction.serverId,
transaction.group,
)
)

private fun publishRollback(transaction: Transaction): Mono<Transaction> {
return findOwnTransaction(transaction)
.doOnNext {
eventPublisher.publishEvent(
TransactionState.TRANSACTION_STATE_ROLLBACK -> findOwnTransaction(transaction)
.map {
TransactionRollbackEvent(
transaction.id,
transaction.serverId,
transaction.group,
transaction.cause,
it.undo
transaction.undo,
)
)
}
.map { transaction }
}

else -> throw cannotFindMatchedTransactionEventException
}
}

protected abstract fun findOwnTransaction(transaction: Transaction): Mono<Transaction>

private fun publishStart(it: Transaction): Mono<Transaction> {
return Mono.just(it)
.doOnNext {
eventPublisher.publishEvent(TransactionStartEvent(it.id, it.serverId))
}
}
protected abstract fun ack(
transaction: Transaction,
messageId: String
): Mono<Pair<Transaction, String>>

protected abstract fun beforeInvokeHook(
transaction: Transaction,
messageId: String
)

protected abstract fun Flux<Pair<Transaction, String>>.ack(): Flux<Pair<Transaction, String>>
private companion object {
private val cannotFindMatchedTransactionEventException =
java.lang.IllegalStateException("Cannot find matched transaction event")

private val cannotFindMatchedHandlerFunctionException =
IllegalStateException("Cannot find matched handler function")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.rooftop.netx.engine

import org.rooftop.netx.idl.Transaction
import reactor.core.publisher.Flux

abstract class AbstractTransactionListener(
private val transactionDispatcher: AbstractTransactionDispatcher,
) {

fun subscribeStream(transactionId: String): Flux<Pair<Transaction, String>> {
return receive(transactionId)
.flatMap { (transaction, messageId) ->
transactionDispatcher.dispatch(transaction, messageId)
.map { transaction to messageId }
}
}

protected abstract fun receive(transactionId: String): Flux<Pair<Transaction, String>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ abstract class AbstractTransactionManager(
private val nodeGroup: String,
private val nodeName: String,
private val transactionIdGenerator: TransactionIdGenerator = TransactionIdGenerator(nodeId),
private val transactionDispatcher: AbstractTransactionDispatcher,
private val transactionListener: AbstractTransactionListener,
private val transactionRetrySupporter: AbstractTransactionRetrySupporter,
) : TransactionManager {

Expand Down Expand Up @@ -58,7 +58,7 @@ abstract class AbstractTransactionManager(

private fun Mono<String>.subscribeTransaction(): Mono<String> {
return this.doOnSuccess {
transactionDispatcher.subscribeStream(it)
transactionListener.subscribeStream(it)
.subscribeOn(Schedulers.parallel())
.subscribe()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ abstract class AbstractTransactionRetrySupporter(

init {
Flux.interval(recoveryMilli.milliseconds.toJavaDuration())
.publishOn(Schedulers.parallel())
.flatMap { handleOrphanTransaction() }
.flatMap {
handleOrphanTransaction()
.onErrorResume { Mono.empty() }
}
.subscribeOn(Schedulers.parallel())
.subscribe()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.rooftop.netx.autoconfig
package org.rooftop.netx.meta

import org.rooftop.netx.redis.RedisTransactionConfigurer
import org.springframework.context.annotation.Import
Expand Down
8 changes: 8 additions & 0 deletions src/main/kotlin/org/rooftop/netx/meta/TransactionHandler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.rooftop.netx.meta

import org.springframework.stereotype.Component

@Component
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionHandler
Loading

0 comments on commit 1d79943

Please sign in to comment.