diff --git a/README.md b/README.md
index 81ae77b..e48489a 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-![version 0.3.8](https://img.shields.io/badge/version-0.3.8-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
+![version 0.3.9](https://img.shields.io/badge/version-0.3.9-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
Redis-Stream을 지원하는 Saga frame work 입니다.
@@ -14,18 +14,18 @@ Redis-Stream을 지원하는 Saga frame work 입니다.
1. 동기 API와 비동기[Reactor](https://projectreactor.io/) API 지원
2. 함수형 Orchestrator 방식과 Event 기반 Choreograph 방식 지원
-3. 처리되지 않은 트랜잭션을 찾아 자동으로 재실행
-4. Backpressure 지원으로 노드별 처리가능한 트랜잭션 수 조절
-5. 여러 노드가 중복 트랜잭션 이벤트를 수신하는 문제 방지
+3. 설정한 주기 마다 처리되지 않은 이벤트를 찾아 자동으로 재실행
+4. Backpressure 지원으로 노드별 처리가능한 이벤트 수 조절
+5. 같은 그룹의 여러 노드가 이벤트를 중복 수신하는 문제 방지
6. `At Least Once` 방식의 메시지 전달 보장
## How to use
-Netx는 스프링 환경에서 사용할 수 있으며, 아래와 같이 `@EnableDistributedTransaciton` 어노테이션을 붙이는것으로 손쉽게 사용할 수 있습니다.
+Netx는 스프링 환경에서 사용할 수 있으며, 아래와 같이 `@EnableSaga` 어노테이션을 붙이는것으로 손쉽게 구성할 수 있습니다.
```kotlin
+@EnableSaga
@SpringBootApplication
-@EnableDistributedTransaciton
class Application {
companion object {
@@ -37,32 +37,32 @@ class Application {
}
```
-`@EnableDistributedTransaciton` 어노테이션으로 자동 구성할 경우 netx는 아래 프로퍼티를 사용해 메시지 큐와 커넥션을 맺습니다.
+`@EnableSaga` 어노테이션으로 자동 구성할 경우 netx는 아래 프로퍼티를 사용해 이벤트 스트림 서비스와 커넥션을 맺습니다.
#### Properties
-| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
-|-------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
-| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
-| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
-| **netx.password** | 0000 | 트랜잭션 관리에 사용할 메시지큐에 접속하는데 사용하는 password 입니다. 설정하지 않을시 0000이 비밀번호로 매핑됩니다. | 0000 |
-| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
-| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
-| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
-| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
-| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
-| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
-| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
-| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
-| **netx.pool-size** | 40 | 커넥션을 계속해서 맺어야할때, 최대 커넥션 수를 조절하는데 사용됩니다. | 10 |
+| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
+|-------------------------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
+| **netx.mode** | redis | Saga 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
+| **netx.host** | localhost | Saga 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
+| **netx.password** | 0000 | Saga 관리에 사용할 메시지큐에 접속하는데 사용하는 password 입니다. 설정하지 않을시 0000이 비밀번호로 매핑됩니다. | 0000 |
+| **netx.port** | 6379 | Saga 관리에 사용할 메시지 큐의 port 입니다. | |
+| **netx.group** | pay-group | 분산 노드의 그룹입니다. Saga 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
+| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
+| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
+| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 Saga를 찾아 재실행합니다. | 1000 |
+| **netx.orphan-milli** | 60000 | PENDING 상태가된 이벤트 중, orphan-milli가 지나도 ACK 상태가 되지 않은 이벤트르 찾아 재시작합니다. | 60000 |
+| **netx.backpressure** | 40 | 한번에 수신가능한 이벤트 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 이벤트 수신량과 처리에 실패한 이벤트 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 이벤트는 자동으로 재시도 대기열에 들어갑니다. | 40 |
+| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
+| **netx.pool-size** | 40 | 커넥션을 계속해서 맺어야할때, 최대 커넥션 수를 조절하는데 사용됩니다. | 10 |
### Usage example
#### Orchestrator-example.
> [!TIP]
-> Orchestrator 사용시, Transactional Message Pattern이 자동 적용됩니다.
-> 메시지 유실에대한 retry 단위는 Orchestrator의 각 연산(하나의 function) 단위이며, 모든 체인이 성공하거나 rollback이 호출됩니다.
+> Orchestrator 사용시, `Transactional messaging pattern` 이 자동 적용됩니다.
+> 이벤트 유실에대한 retry 단위는 Orchestrator의 각 연산(하나의 function) 단위이며, 모든 체인이 성공하거나 rollback이 호출됩니다.
```kotlin
// Use Orchestrator
@@ -70,11 +70,9 @@ class Application {
class OrderService(private val orderOrchestrator: Orchestrator) {
fun order(orderRequest: Order): OrderResult {
- val result = orderOrchestrator.transactionSync(orderRequest)
- if (!result.isSuccess) {
- result.throwError()
- }
- return result.decodeResult(OrderResult::class)
+ val result = orderOrchestrator.sagaSync(orderRequest)
+
+ result.decodeResultOrThrow(OrderResult::class) // If success get result or else throw exception
}
}
@@ -89,7 +87,7 @@ class OrchestratorConfigurer(
return orchestratorFactory.create("orderOrchestrator")
.start(
orchestrate = { order -> // its order type
- // Start Transaction with your bussiness logic
+ // Do your bussiness logic
// something like ... "Check valid seller"
return@start user
},
@@ -123,149 +121,153 @@ class OrchestratorConfigurer(
}
```
+#### Events-Scenario0. Handle saga event
+
+다른 분산서버가 (혹은 자기자신이) sagaManager를 통해서 saga를 시작하거나 saga의 상태를 변경했을때, 상태에 맞는 핸들러를 호출합니다.
+이 핸들러를 구현함으로써, saga 상태별 로직을 구현할 수 있습니다.
+각 핸들러에서 에러가 던져지면, 자동으로 rollback 이 호출되며, 핸들러가 종료되면, 어노테이션에 설정된 successWith 상태가 자동으로 호출니다.
+
> [!WARNING]
-> Event사용시 Transactional Message Pattern을 직접 적용해줘야합니다.
-> 아래 이벤트 사용 예시는 적용된 예시가 아니며, 적용을 위해서는 transactionManager 를 호출하는 부분과 트랜잭션 이벤트를 받는부분을 분리해야합니다.
-> 비즈니스로직을 모두 @Transaction...Listener 안으로 이동함으로써 손쉽게 적용할 수 있습니다.
+> Saga 핸들러는 반드시 핸들러에 맞는 `Saga...Event` **하나**만을 파라미터로 받아야 합니다.
+> Event사용시 `Transactional messaging pattern` 을 직접 적용해줘야합니다.
+> 아래 예시와 같이, 비즈니스로직을 모두 @Saga...Listener 안으로 이동함으로써 손쉽게 적용할 수 있습니다.
+
+```kotlin
+
+@SagaHandler
+class SagaHandler(
+ private val sagaManager: SagaManager,
+) {
+
+ fun start() {
+ val foo = Foo("...")
+ sagaManager.startSync(foo) // it will call
+ }
+
+ @SagaStartListener(event = Foo::class, successWith = SuccessWith.PUBLISH_JOIN) // Receive saga event when event can be mapped to Foo.class
+ fun handleSagaStartEvent(event: SagaStartEvent) {
+ val foo: Foo = event.decodeEvent(Foo::class) // Get event field to Foo.class
+ // ...
+ event.setNextEvent(nextFoo) // When this handler terminates and calls the next event or rollback, the event set here is published together.
+ }
+
+ @SagaJoinListener(successWith = SuccessWith.PUBLISH_COMMIT) // Receive all saga event when no type is defined. And, when terminated this function, publish commit state
+ fun handleSagaJoinEvent(event: SagaJoinEvent) {
+ // ...
+ }
+
+ @SagaCommitListener(
+ event = Foo::class,
+ noRollbackFor = [IllegalArgumentException::class] // Don't rollback when throw IllegalArgumentException. *Rollback if throw Throwable or IllegalArgumentException's super type*
+ )
+ fun handleSagaCommitEvent(event: SagaCommitEvent): Mono { // In Webflux framework, publisher must be returned.
+ throw IllegalArgumentException("Ignore this exception")
+ // ...
+ }
+
+ @SagaRollbackListener(Foo::class)
+ fun handleSagaRollbackEvent(event: SagaRollbackEvent) { // In Mvc framework, publisher must not returned.
+ val undo: Foo = event.decodeUndo(Foo::class) // Get event field to Foo.class
+ }
+}
+```
-#### Event-example. Start pay transaction
+#### Events-Scenario1. Start pay saga
```kotlin
// Sync
fun pay(param: Any): Any {
- val transactionId =
- transactionManager.syncStart(Pay(id = 1L, paid = 1000L)) // start transaction
+ val sagaId = sagaManager.syncStart(Pay(id = 1L, paid = 1000L)) // start saga
runCatching {
// Do your bussiness logic
}.fold(
- onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
+ onSuccess = { sagaManager.syncCommit(sagaId) }, // commit saga
onFailure = {
- transactionManager.syncRollback(
- transactionId,
+ sagaManager.syncRollback(
+ sagaId,
it.message
)
- } // rollback transaction
+ } // rollback saga
)
}
// Async
fun pay(param: Any): Mono {
- return transactionManager.start(
+ return sagaManager.start(
Pay(
id = 1L,
paid = 1000L
)
- ) // Start distributed transaction and publish transaction start event
- .flatMap { transactionId ->
+ ) // Start distributed saga and publish saga start event
+ .flatMap { sagaId ->
service.pay(param)
.doOnError { throwable ->
- transactionManager.rollback(
- transactionId,
+ sagaManager.rollback(
+ sagaId,
throwable.message
- ) // Publish rollback event to all transaction joined node
+ ) // Publish rollback event to all saga joined node
}
- }.doOnSuccess { transactionId ->
- transactionManager.commit(transactionId) // Publish commit event to all transaction joined node
+ }.doOnSuccess { sagaId ->
+ sagaManager.commit(sagaId) // Publish commit event to all saga joined node
}
}
```
-#### Events-Scenario2. Join order transaction
+#### Events-Scenario2. Join order saga
```kotlin
//Sync
fun order(param: Any): Any {
- val transactionId = transactionManager.syncJoin(
- param.transactionId,
+ val sagaId = sagaManager.syncJoin(
+ param.saganId,
Order(id = 1L, state = PENDING)
- ) // join transaction
+ ) // join saga
runCatching { // This is kotlin try catch, not netx library spec
// Do your bussiness logic
}.fold(
- onSuccess = { transactionManager.syncCommit(transactionId) }, // commit transaction
+ onSuccess = { sagaManager.syncCommit(sagaId) }, // commit saga
onFailure = {
- transactionManager.syncRollback(
- transactionId,
+ sagaManager.syncRollback(
+ sagaId,
it.message
)
- } // rollback transaction
+ } // rollback saga
)
}
// Async
fun order(param: Any): Mono {
- return transactionManager.join(
- param.transactionId,
+ return sagaManager.join(
+ param.sagaId,
Order(id = 1L, state = PENDING)
- ) // join exists distributed transaction and publish transaction join event
- .flatMap { transactionId ->
+ ) // join exists distributed saga and publish saga join event
+ .flatMap { sagaId ->
service.order(param)
.doOnError { throwable ->
- transactionManager.rollback(transactionId, throwable.message)
+ sagaManager.rollback(sagaId, throwable.message)
}
- }.doOnSuccess { transactionId ->
- transactionManager.commit(transactionId)
+ }.doOnSuccess { sagaId ->
+ sagaManager.commit(sagaId)
}
}
```
-#### Events-Scenario3. Check exists transaction
+#### Events-Scenario3. Check exists saga
```kotlin
// Sync
fun exists(param: Any): Any {
- return transactionManager.syncExists(param.transactionId)
+ return sagaManager.syncExists(param.sagaId)
}
// Async
fun exists(param: Any): Mono {
- return transactionManager.exists(param.transactionId) // Find any transaction has ever been started
+ return sagaManager.exists(param.sagaId) // Find any saga has ever been started
}
```
-#### Events-Scenario4. Handle transaction event
-
-다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 트랜잭션 상태에 맞는 핸들러를 호출합니다.
-이 핸들러를 구현함으로써, 트랜잭션 상태별 로직을 구현할 수 있습니다.
-각 핸들러에서 에러가 던져지면, 자동으로 rollback 이 호출됩니다.
-
-> [!WARNING]
-> 트랜잭션 핸들러는 반드시 핸들러에 맞는 `TransactionEvent` **하나**만을 파라미터로 받아야 합니다.
-
-```kotlin
-
-@TransactionHandler
-class TransactionHandler {
-
- @TransactionStartListener(event = Foo::class) // Receive transaction event when event can be mapped to Foo.class
- fun handleTransactionStartEvent(event: TransactionStartEvent) {
- val foo: Foo = event.decodeEvent(Foo::class) // Get event field to Foo.class
- // ...
- event.setNextEvent(nextFoo) // When this handler terminates and calls the next event or rollback, the event set here is published together.
- }
-
- @TransactionJoinListener(successWith = SuccessWith.PUBLISH_COMMIT) // Receive all transaction event when no type is defined. And, when terminated this function, publish commit state
- fun handleTransactionJoinEvent(event: TransactionJoinEvent) {
- // ...
- }
-
- @TransactionCommitListener(
- event = Foo::class,
- noRollbackFor = [IllegalArgumentException::class] // Dont rollback when throw IllegalArgumentException. *Rollback if throw Throwable or IllegalArgumentException's super type*
- )
- fun handleTransactionCommitEvent(event: TransactionCommitEvent): Mono { // In Webflux framework, publisher must be returned.
- throw IllegalArgumentException("Ignore this exception")
- // ...
- }
-
- @TransactionRollbackListener(Foo::class)
- fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) { // In Mvc framework, publisher must not returned.
- val undo: Foo = event.decodeUndo(Foo::class) // Get event field to Foo.class
- }
-}
-```
## Download
diff --git a/gradle.properties b/gradle.properties
index 5320a89..5cb3c9d 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,7 +2,7 @@ kotlin.code.style=official
### Project ###
group=org.rooftop.netx
-version=0.3.8
+version=0.3.9
compatibility=17
### Sonarcloud ###
diff --git a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
index 892854f..4e6b9c3 100644
--- a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
@@ -4,14 +4,14 @@ class EncodeException(message: String, throwable: Throwable) : RuntimeException(
class DecodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)
-open class TransactionException(message: String) : RuntimeException(message)
+open class SagaException(message: String) : RuntimeException(message)
-class AlreadyCommittedTransactionException(transactionId: String, state: String) :
- TransactionException("Cannot join transaction cause, transaction \"$transactionId\" already \"$state\"")
+class AlreadyCommittedSagaException(id: String, state: String) :
+ SagaException("Cannot join saga cause, saga \"$id\" already \"$state\"")
class NotFoundDispatchFunctionException(message: String) : RuntimeException(message)
-class FailedAckTransactionException(message: String) : RuntimeException(message)
+class FailedAckSagaException(message: String) : RuntimeException(message)
class ResultTimeoutException(message: String, throwable: Throwable) :
RuntimeException(message, throwable)
diff --git a/src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt b/src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt
index 045ecec..6e23352 100644
--- a/src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/Orchestrator.kt
@@ -4,19 +4,19 @@ import reactor.core.publisher.Mono
interface Orchestrator {
- fun transaction(request: T): Mono>
+ fun saga(request: T): Mono>
- fun transaction(timeoutMillis: Long, request: T): Mono>
+ fun saga(timeoutMillis: Long, request: T): Mono>
- fun transaction(request: T, context: MutableMap): Mono>
+ fun saga(request: T, context: MutableMap): Mono>
- fun transaction(timeoutMillis: Long, request: T, context: MutableMap): Mono>
+ fun saga(timeoutMillis: Long, request: T, context: MutableMap): Mono>
- fun transactionSync(request: T): Result
+ fun sagaSync(request: T): Result
- fun transactionSync(timeoutMillis: Long, request: T): Result
+ fun sagaSync(timeoutMillis: Long, request: T): Result
- fun transactionSync(request: T, context: MutableMap): Result
+ fun sagaSync(request: T, context: MutableMap): Result
- fun transactionSync(timeoutMillis: Long, request: T, context: MutableMap): Result
+ fun sagaSync(timeoutMillis: Long, request: T, context: MutableMap): Result
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaCommitEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaCommitEvent.kt
new file mode 100644
index 0000000..d17827f
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaCommitEvent.kt
@@ -0,0 +1,13 @@
+package org.rooftop.netx.api
+
+class SagaCommitEvent internal constructor(
+ id: String,
+ nodeName: String,
+ group: String,
+ event: String?,
+ codec: Codec,
+) : SagaEvent(id, nodeName, group, event, codec) {
+
+ override fun copy(): SagaCommitEvent =
+ SagaCommitEvent(id, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt b/src/main/kotlin/org/rooftop/netx/api/SagaCommitListener.kt
similarity index 84%
rename from src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt
rename to src/main/kotlin/org/rooftop/netx/api/SagaCommitListener.kt
index 239a8a9..e59fcc3 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaCommitListener.kt
@@ -4,7 +4,7 @@ import kotlin.reflect.KClass
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
-annotation class TransactionCommitListener(
+annotation class SagaCommitListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array> = [],
)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt
similarity index 84%
rename from src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
rename to src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt
index 661179d..02012d9 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaEvent.kt
@@ -2,8 +2,8 @@ package org.rooftop.netx.api
import kotlin.reflect.KClass
-sealed class TransactionEvent(
- val transactionId: String,
+sealed class SagaEvent(
+ val id: String,
val nodeName: String,
val group: String,
internal val event: String?,
@@ -24,5 +24,5 @@ sealed class TransactionEvent(
type
)
- internal abstract fun copy(): TransactionEvent
+ internal abstract fun copy(): SagaEvent
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaJoinEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaJoinEvent.kt
new file mode 100644
index 0000000..9f3f21f
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaJoinEvent.kt
@@ -0,0 +1,13 @@
+package org.rooftop.netx.api
+
+class SagaJoinEvent internal constructor(
+ id: String,
+ nodeName: String,
+ group: String,
+ event: String?,
+ codec: Codec,
+) : SagaEvent(id, nodeName, group, event, codec) {
+
+ override fun copy(): SagaJoinEvent =
+ SagaJoinEvent(id, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt b/src/main/kotlin/org/rooftop/netx/api/SagaJoinListener.kt
similarity index 87%
rename from src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt
rename to src/main/kotlin/org/rooftop/netx/api/SagaJoinListener.kt
index 243b103..d888dae 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaJoinListener.kt
@@ -4,7 +4,7 @@ import kotlin.reflect.KClass
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
-annotation class TransactionJoinListener(
+annotation class SagaJoinListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaManager.kt b/src/main/kotlin/org/rooftop/netx/api/SagaManager.kt
new file mode 100644
index 0000000..bf558ea
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaManager.kt
@@ -0,0 +1,43 @@
+package org.rooftop.netx.api
+
+import reactor.core.publisher.Mono
+
+interface SagaManager {
+
+ fun start(): Mono
+
+ fun start(event: T): Mono
+
+ fun syncStart(): String
+
+ fun syncStart(event: T): String
+
+ fun join(id: String): Mono
+
+ fun join(id: String, event: T): Mono
+
+ fun syncJoin(id: String): String
+
+ fun syncJoin(id: String, event: T): String
+
+ fun exists(id: String): Mono
+
+ fun syncExists(id: String): String
+
+ fun commit(id: String): Mono
+
+ fun commit(id: String, event: T): Mono
+
+ fun syncCommit(id: String): String
+
+ fun syncCommit(id: String, event: T): String
+
+ fun rollback(id: String, cause: String): Mono
+
+ fun rollback(id: String, cause: String, event: T): Mono
+
+ fun syncRollback(id: String, cause: String): String
+
+ fun syncRollback(id: String, cause: String, event: T): String
+
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaRollbackEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaRollbackEvent.kt
new file mode 100644
index 0000000..998739c
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaRollbackEvent.kt
@@ -0,0 +1,14 @@
+package org.rooftop.netx.api
+
+class SagaRollbackEvent internal constructor(
+ id: String,
+ nodeName: String,
+ group: String,
+ event: String?,
+ val cause: String,
+ codec: Codec,
+) : SagaEvent(id, nodeName, group, event, codec) {
+
+ override fun copy(): SagaRollbackEvent =
+ SagaRollbackEvent(id, nodeName, group, event, cause, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt b/src/main/kotlin/org/rooftop/netx/api/SagaRollbackListener.kt
similarity index 79%
rename from src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt
rename to src/main/kotlin/org/rooftop/netx/api/SagaRollbackListener.kt
index 78b89b7..3f1c2c4 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaRollbackListener.kt
@@ -4,6 +4,6 @@ import kotlin.reflect.KClass
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
-annotation class TransactionRollbackListener(
+annotation class SagaRollbackListener(
val event: KClass<*> = Any::class,
)
diff --git a/src/main/kotlin/org/rooftop/netx/api/SagaStartEvent.kt b/src/main/kotlin/org/rooftop/netx/api/SagaStartEvent.kt
new file mode 100644
index 0000000..2286c74
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaStartEvent.kt
@@ -0,0 +1,13 @@
+package org.rooftop.netx.api
+
+class SagaStartEvent internal constructor(
+ id: String,
+ nodeName: String,
+ group: String,
+ event: String?,
+ codec: Codec,
+) : SagaEvent(id, nodeName, group, event, codec) {
+
+ override fun copy(): SagaStartEvent =
+ SagaStartEvent(id, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt b/src/main/kotlin/org/rooftop/netx/api/SagaStartListener.kt
similarity index 87%
rename from src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt
rename to src/main/kotlin/org/rooftop/netx/api/SagaStartListener.kt
index 0ca54a4..92407d9 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/SagaStartListener.kt
@@ -4,7 +4,7 @@ import kotlin.reflect.KClass
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
-annotation class TransactionStartListener(
+annotation class SagaStartListener(
val event: KClass<*> = Any::class,
val noRollbackFor: Array> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
deleted file mode 100644
index ba47177..0000000
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.rooftop.netx.api
-
-class TransactionCommitEvent internal constructor(
- transactionId: String,
- nodeName: String,
- group: String,
- event: String?,
- codec: Codec,
-): TransactionEvent(transactionId, nodeName, group, event, codec) {
-
- override fun copy(): TransactionCommitEvent =
- TransactionCommitEvent(transactionId, nodeName, group, event, codec)
-}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
deleted file mode 100644
index 9a31120..0000000
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.rooftop.netx.api
-
-class TransactionJoinEvent internal constructor(
- transactionId: String,
- nodeName: String,
- group: String,
- event: String?,
- codec: Codec,
-) : TransactionEvent(transactionId, nodeName, group, event, codec) {
-
- override fun copy(): TransactionJoinEvent =
- TransactionJoinEvent(transactionId, nodeName, group, event, codec)
-}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
deleted file mode 100644
index 25679e3..0000000
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.rooftop.netx.api
-
-import reactor.core.publisher.Mono
-
-interface TransactionManager {
-
- fun start(): Mono
-
- fun start(event: T): Mono
-
- fun syncStart(): String
-
- fun syncStart(event: T): String
-
- fun join(transactionId: String): Mono
-
- fun join(transactionId: String, event: T): Mono
-
- fun syncJoin(transactionId: String): String
-
- fun syncJoin(transactionId: String, event: T): String
-
- fun exists(transactionId: String): Mono
-
- fun syncExists(transactionId: String): String
-
- fun commit(transactionId: String): Mono
-
- fun commit(transactionId: String, event: T): Mono
-
- fun syncCommit(transactionId: String): String
-
- fun syncCommit(transactionId: String, event: T): String
-
- fun rollback(transactionId: String, cause: String): Mono
-
- fun rollback(transactionId: String, cause: String, event: T): Mono
-
- fun syncRollback(transactionId: String, cause: String): String
-
- fun syncRollback(transactionId: String, cause: String, event: T): String
-
-}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
deleted file mode 100644
index ced0e88..0000000
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.rooftop.netx.api
-
-class TransactionRollbackEvent internal constructor(
- transactionId: String,
- nodeName: String,
- group: String,
- event: String?,
- val cause: String,
- codec: Codec,
-) : TransactionEvent(transactionId, nodeName, group, event, codec) {
-
- override fun copy(): TransactionRollbackEvent =
- TransactionRollbackEvent(transactionId, nodeName, group, event, cause, codec)
-}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
deleted file mode 100644
index 8e1831d..0000000
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.rooftop.netx.api
-
-class TransactionStartEvent internal constructor(
- transactionId: String,
- nodeName: String,
- group: String,
- event: String?,
- codec: Codec,
-) : TransactionEvent(transactionId, nodeName, group, event, codec) {
-
- override fun copy(): TransactionStartEvent =
- TransactionStartEvent(transactionId, nodeName, group, event, codec)
-}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
index e842ecf..43b788d 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
@@ -1,7 +1,7 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.api.TransactionEvent
-import org.rooftop.netx.api.TransactionManager
+import org.rooftop.netx.api.SagaEvent
+import org.rooftop.netx.api.SagaManager
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
@@ -11,12 +11,12 @@ internal sealed class AbstractDispatchFunction(
protected val function: KFunction,
protected val handler: Any,
private val noRollbackFor: Array>,
- private val nextState: NextTransactionState,
- private val transactionManager: TransactionManager,
+ private val nextState: NextSagaState,
+ private val sagaManager: SagaManager,
) {
fun name(): String = function.name
- abstract fun call(transactionEvent: TransactionEvent): T
+ abstract fun call(sagaEvent: SagaEvent): T
protected fun isNoRollbackFor(throwable: Throwable): Boolean {
return noRollbackFor.isNotEmpty() && throwable.cause != null && noRollbackFor.contains(
@@ -24,35 +24,35 @@ internal sealed class AbstractDispatchFunction(
)
}
- protected fun isProcessable(transactionEvent: TransactionEvent): Boolean {
+ protected fun isProcessable(sagaEvent: SagaEvent): Boolean {
return runCatching {
- transactionEvent.decodeEvent(eventType)
+ sagaEvent.decodeEvent(eventType)
}.onFailure {
return it is NullPointerException && eventType == Any::class
}.isSuccess
}
- protected fun rollback(transactionEvent: TransactionEvent, throwable: Throwable) {
- transactionEvent.nextEvent?.let {
- transactionManager.rollback(transactionEvent.transactionId, throwable.getCause(), it)
+ protected fun rollback(sagaEvent: SagaEvent, throwable: Throwable) {
+ sagaEvent.nextEvent?.let {
+ sagaManager.rollback(sagaEvent.id, throwable.getCause(), it)
.subscribeOn(Schedulers.parallel())
.subscribe()
- } ?: transactionManager.rollback(transactionEvent.transactionId, throwable.getCause())
+ } ?: sagaManager.rollback(sagaEvent.id, throwable.getCause())
.subscribeOn(Schedulers.parallel())
.subscribe()
}
- protected fun publishNextTransaction(transactionEvent: TransactionEvent) {
+ protected fun publishNextSaga(sagaEvent: SagaEvent) {
when (nextState) {
- NextTransactionState.JOIN -> transactionEvent.nextEvent?.let {
- transactionManager.join(transactionEvent.transactionId, it)
- } ?: transactionManager.join(transactionEvent.transactionId)
+ NextSagaState.JOIN -> sagaEvent.nextEvent?.let {
+ sagaManager.join(sagaEvent.id, it)
+ } ?: sagaManager.join(sagaEvent.id)
- NextTransactionState.COMMIT -> transactionEvent.nextEvent?.let {
- transactionManager.commit(transactionEvent.transactionId, it)
- } ?: transactionManager.commit(transactionEvent.transactionId)
+ NextSagaState.COMMIT -> sagaEvent.nextEvent?.let {
+ sagaManager.commit(sagaEvent.id, it)
+ } ?: sagaManager.commit(sagaEvent.id)
- NextTransactionState.END -> return
+ NextSagaState.END -> return
}.subscribeOn(Schedulers.parallel())
.subscribe()
}
@@ -61,7 +61,7 @@ internal sealed class AbstractDispatchFunction(
return this.message ?: this.cause?.message ?: this::class.java.name
}
- internal enum class NextTransactionState {
+ internal enum class NextSagaState {
JOIN,
COMMIT,
END
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt
similarity index 50%
rename from src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
rename to src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt
index d84d411..6a30025 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaDispatcher.kt
@@ -2,8 +2,8 @@ package org.rooftop.netx.engine
import jakarta.annotation.PostConstruct
import org.rooftop.netx.api.*
-import org.rooftop.netx.engine.core.Transaction
-import org.rooftop.netx.engine.core.TransactionState
+import org.rooftop.netx.engine.core.Saga
+import org.rooftop.netx.engine.core.SagaState
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.publisher.Flux
@@ -13,106 +13,106 @@ import kotlin.reflect.KClass
import kotlin.reflect.KFunction
import kotlin.reflect.full.declaredMemberFunctions
-internal abstract class AbstractTransactionDispatcher(
+internal abstract class AbstractSagaDispatcher(
private val codec: Codec,
- private val transactionManager: TransactionManager,
+ private val sagaManager: SagaManager,
) {
private val functions =
- mutableMapOf>>()
+ mutableMapOf>>()
- fun dispatch(transaction: Transaction, messageId: String): Mono {
- return Flux.fromIterable(functions[transaction.state] ?: listOf())
+ fun dispatch(saga: Saga, messageId: String): Mono {
+ return Flux.fromIterable(functions[saga.state] ?: listOf())
.flatMap { function ->
when (function) {
is MonoDispatchFunction -> {
- mapToTransactionEvent(transaction.copy())
+ mapSagaEvent(saga.copy())
.callMono(function)
- .warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
+ .warningOnError("Error occurred in SagaHandler function \"${function.name()}\" with saga id ${saga.id}")
}
is NotPublishDispatchFunction -> {
- mapToTransactionEvent(transaction.copy())
+ mapSagaEvent(saga.copy())
.callNotPublish(function)
- .warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
+ .warningOnError("Error occurred in SagaHandler function \"${function.name()}\" with saga id ${saga.id}")
}
is OrchestrateDispatchFunction -> {
- mapToTransactionEvent(transaction.copy())
+ mapSagaEvent(saga.copy())
.callOrchestrate(function)
- .warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
+ .warningOnError("Error occurred in SagaHandler function \"${function.name()}\" with saga id ${saga.id}")
}
}
}
.subscribeOn(Schedulers.boundedElastic())
- .ackWhenComplete(transaction, messageId)
+ .ackWhenComplete(saga, messageId)
.then(Mono.just(DISPATCHED))
}
private fun Flux<*>.ackWhenComplete(
- transaction: Transaction,
+ saga: Saga,
messageId: String
): Flux<*> = this.doOnComplete {
- Mono.just(transaction to messageId)
- .info("Ack transaction \"${transaction.id}\"")
+ Mono.just(saga to messageId)
+ .info("Ack saga \"${saga.id}\"")
.flatMap {
- ack(transaction, messageId)
- .warningOnError("Fail to ack transaction \"${transaction.id}\"")
+ ack(saga, messageId)
+ .warningOnError("Fail to ack saga \"${saga.id}\"")
}
.subscribeOn(Schedulers.parallel())
.subscribe()
}
- private fun mapToTransactionEvent(transaction: Transaction): Mono {
- return when (transaction.state) {
- TransactionState.START -> Mono.just(
- TransactionStartEvent(
- transactionId = transaction.id,
- nodeName = transaction.serverId,
- group = transaction.group,
- event = extractEvent(transaction),
+ private fun mapSagaEvent(saga: Saga): Mono {
+ return when (saga.state) {
+ SagaState.START -> Mono.just(
+ SagaStartEvent(
+ id = saga.id,
+ nodeName = saga.serverId,
+ group = saga.group,
+ event = extractEvent(saga),
codec = codec,
)
)
- TransactionState.COMMIT -> Mono.just(
- TransactionCommitEvent(
- transactionId = transaction.id,
- nodeName = transaction.serverId,
- group = transaction.group,
- event = extractEvent(transaction),
+ SagaState.COMMIT -> Mono.just(
+ SagaCommitEvent(
+ id = saga.id,
+ nodeName = saga.serverId,
+ group = saga.group,
+ event = extractEvent(saga),
codec = codec
)
)
- TransactionState.JOIN -> Mono.just(
- TransactionJoinEvent(
- transactionId = transaction.id,
- nodeName = transaction.serverId,
- group = transaction.group,
- event = extractEvent(transaction),
+ SagaState.JOIN -> Mono.just(
+ SagaJoinEvent(
+ id = saga.id,
+ nodeName = saga.serverId,
+ group = saga.group,
+ event = extractEvent(saga),
codec = codec,
)
)
- TransactionState.ROLLBACK ->
+ SagaState.ROLLBACK ->
Mono.just(
- TransactionRollbackEvent(
- transactionId = transaction.id,
- nodeName = transaction.serverId,
- group = transaction.group,
- event = extractEvent(transaction),
- cause = transaction.cause
- ?: throw NullPointerException("Null value on TransactionRollbackEvent's cause field"),
+ SagaRollbackEvent(
+ id = saga.id,
+ nodeName = saga.serverId,
+ group = saga.group,
+ event = extractEvent(saga),
+ cause = saga.cause
+ ?: throw NullPointerException("Null value on SagaRollbackEvent's cause field"),
codec = codec,
)
)
}
}
- private fun extractEvent(transaction: Transaction): String? {
- return when (transaction.event != null) {
- true -> transaction.event
+ private fun extractEvent(saga: Saga): String? {
+ return when (saga.event != null) {
+ true -> saga.event
false -> null
}
}
@@ -131,23 +131,23 @@ internal abstract class AbstractTransactionDispatcher(
function.annotations
.forEach { annotation ->
runCatching {
- val transactionState = getMatchedTransactionState(annotation)
+ val sagaState = getMatchedSagaState(annotation)
val eventType = getEventType(annotation)
val noRollbackFor = getNoRollbackFor(annotation)
- val nextState = getNextTransactionState(annotation)
- functions.putIfAbsent(transactionState, mutableListOf())
- functions[transactionState]?.add(
+ val nextState = getNextSagaState(annotation)
+ functions.putIfAbsent(sagaState, mutableListOf())
+ functions[sagaState]?.add(
OrchestrateDispatchFunction(
eventType,
function as KFunction>,
handler,
noRollbackFor,
nextState,
- transactionManager,
+ sagaManager,
)
)
}.onFailure {
- throw IllegalStateException("Cannot add Mono TransactionHandler", it)
+ throw IllegalStateException("Cannot add Mono SagaHandler", it)
}
}
}
@@ -155,9 +155,9 @@ internal abstract class AbstractTransactionDispatcher(
@PostConstruct
fun initHandler() {
- val transactionHandler = findHandlers()
- initMonoFunctions(transactionHandler)
- initNotPublisherFunctions(transactionHandler)
+ val sagaHandlers = findHandlers()
+ initMonoFunctions(sagaHandlers)
+ initNotPublisherFunctions(sagaHandlers)
functions.forEach { (_, function) ->
val functionName = function.map { it.name() }.toList()
info("Register functions names : \"${functionName}\"")
@@ -176,23 +176,23 @@ internal abstract class AbstractTransactionDispatcher(
function.annotations
.forEach { annotation ->
runCatching {
- val transactionState = getMatchedTransactionState(annotation)
+ val sagaState = getMatchedSagaState(annotation)
val eventType = getEventType(annotation)
val noRollbackFor = getNoRollbackFor(annotation)
- val nextState = getNextTransactionState(annotation)
- functions.putIfAbsent(transactionState, mutableListOf())
- functions[transactionState]?.add(
+ val nextState = getNextSagaState(annotation)
+ functions.putIfAbsent(sagaState, mutableListOf())
+ functions[sagaState]?.add(
MonoDispatchFunction(
eventType,
function as KFunction>,
handler,
noRollbackFor,
nextState,
- transactionManager,
+ sagaManager,
)
)
}.onFailure {
- throw IllegalStateException("Cannot add Mono TransactionHandler", it)
+ throw IllegalStateException("Cannot add Mono SagaHandler", it)
}
}
}
@@ -211,23 +211,23 @@ internal abstract class AbstractTransactionDispatcher(
function.annotations
.forEach { annotation ->
runCatching {
- val transactionState = getMatchedTransactionState(annotation)
+ val sagaState = getMatchedSagaState(annotation)
val eventType = getEventType(annotation)
val noRollbackFor = getNoRollbackFor(annotation)
- val nextState = getNextTransactionState(annotation)
- functions.putIfAbsent(transactionState, mutableListOf())
- functions[transactionState]?.add(
+ val nextState = getNextSagaState(annotation)
+ functions.putIfAbsent(sagaState, mutableListOf())
+ functions[sagaState]?.add(
NotPublishDispatchFunction(
eventType,
function,
handler,
noRollbackFor,
nextState,
- transactionManager,
+ sagaManager,
)
)
}.onFailure {
- throw IllegalStateException("Cannot add TransactionHandler", it)
+ throw IllegalStateException("Cannot add SagaHandler", it)
}
}
}
@@ -238,59 +238,59 @@ internal abstract class AbstractTransactionDispatcher(
private fun getEventType(annotation: Annotation): KClass<*> {
return when (annotation) {
- is TransactionStartListener -> annotation.event
- is TransactionCommitListener -> annotation.event
- is TransactionJoinListener -> annotation.event
- is TransactionRollbackListener -> annotation.event
- else -> throw notMatchedTransactionHandlerException
+ is SagaStartListener -> annotation.event
+ is SagaCommitListener -> annotation.event
+ is SagaJoinListener -> annotation.event
+ is SagaRollbackListener -> annotation.event
+ else -> throw notMatchedSagaHandlerException
}
}
private fun getNoRollbackFor(annotation: Annotation): Array> {
return when (annotation) {
- is TransactionStartListener -> annotation.noRollbackFor
- is TransactionCommitListener -> annotation.noRollbackFor
- is TransactionJoinListener -> annotation.noRollbackFor
- is TransactionRollbackListener -> emptyArray()
- else -> throw notMatchedTransactionHandlerException
+ is SagaStartListener -> annotation.noRollbackFor
+ is SagaCommitListener -> annotation.noRollbackFor
+ is SagaJoinListener -> annotation.noRollbackFor
+ is SagaRollbackListener -> emptyArray()
+ else -> throw notMatchedSagaHandlerException
}
}
- private fun getMatchedTransactionState(annotation: Annotation): TransactionState {
+ private fun getMatchedSagaState(annotation: Annotation): SagaState {
return when (annotation) {
- is TransactionStartListener -> TransactionState.START
- is TransactionCommitListener -> TransactionState.COMMIT
- is TransactionJoinListener -> TransactionState.JOIN
- is TransactionRollbackListener -> TransactionState.ROLLBACK
- else -> throw notMatchedTransactionHandlerException
+ is SagaStartListener -> SagaState.START
+ is SagaCommitListener -> SagaState.COMMIT
+ is SagaJoinListener -> SagaState.JOIN
+ is SagaRollbackListener -> SagaState.ROLLBACK
+ else -> throw notMatchedSagaHandlerException
}
}
- private fun getNextTransactionState(annotation: Annotation): AbstractDispatchFunction.NextTransactionState {
+ private fun getNextSagaState(annotation: Annotation): AbstractDispatchFunction.NextSagaState {
return when (annotation) {
- is TransactionStartListener -> annotation.successWith.toNextTransactionState()
- is TransactionJoinListener -> annotation.successWith.toNextTransactionState()
- else -> AbstractDispatchFunction.NextTransactionState.END
+ is SagaStartListener -> annotation.successWith.toNextSagaState()
+ is SagaJoinListener -> annotation.successWith.toNextSagaState()
+ else -> AbstractDispatchFunction.NextSagaState.END
}
}
- private fun SuccessWith.toNextTransactionState(): AbstractDispatchFunction.NextTransactionState {
+ private fun SuccessWith.toNextSagaState(): AbstractDispatchFunction.NextSagaState {
return when (this) {
- SuccessWith.PUBLISH_JOIN -> AbstractDispatchFunction.NextTransactionState.JOIN
- SuccessWith.PUBLISH_COMMIT -> AbstractDispatchFunction.NextTransactionState.COMMIT
- SuccessWith.END -> AbstractDispatchFunction.NextTransactionState.END
+ SuccessWith.PUBLISH_JOIN -> AbstractDispatchFunction.NextSagaState.JOIN
+ SuccessWith.PUBLISH_COMMIT -> AbstractDispatchFunction.NextSagaState.COMMIT
+ SuccessWith.END -> AbstractDispatchFunction.NextSagaState.END
}
}
protected abstract fun ack(
- transaction: Transaction,
+ saga: Saga,
messageId: String
- ): Mono>
+ ): Mono>
private companion object {
private const val DISPATCHED = "dispatched"
- private val notMatchedTransactionHandlerException =
- NotFoundDispatchFunctionException("Cannot find matched Transaction handler")
+ private val notMatchedSagaHandlerException =
+ NotFoundDispatchFunctionException("Cannot find matched Saga handler")
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
similarity index 65%
rename from src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
rename to src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
index a511773..b038e0a 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaListener.kt
@@ -1,6 +1,6 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.engine.core.Transaction
+import org.rooftop.netx.engine.core.Saga
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.publisher.BufferOverflowStrategy
@@ -8,9 +8,9 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
-internal abstract class AbstractTransactionListener(
+internal abstract class AbstractSagaListener(
private val backpressureSize: Int,
- private val transactionDispatcher: AbstractTransactionDispatcher,
+ private val sagaDispatcher: AbstractSagaDispatcher,
) {
fun subscribeStream() {
@@ -18,18 +18,18 @@ internal abstract class AbstractTransactionListener(
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer(backpressureSize, BufferOverflowStrategy.DROP_LATEST)
.doOnNext {
- info("Listen transaction ${it.first}\nmessageId \"${it.second}\"")
+ info("Listen saga ${it.first}\nmessageId \"${it.second}\"")
}
- .flatMap { (transaction, messageId) ->
- transactionDispatcher.dispatch(transaction, messageId)
- .warningOnError("Error occurred when listen transaction ${transaction.id}")
+ .flatMap { (saga, messageId) ->
+ sagaDispatcher.dispatch(saga, messageId)
+ .warningOnError("Error occurred when listen saga id ${saga.id}")
}
.onErrorResume { Mono.empty() }
.restartWhenTerminated()
.subscribe()
}
- protected abstract fun receive(): Flux>
+ protected abstract fun receive(): Flux>
private fun Flux.restartWhenTerminated(): Flux {
return this.doAfterTerminate {
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaManager.kt
new file mode 100644
index 0000000..7724ef2
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaManager.kt
@@ -0,0 +1,231 @@
+package org.rooftop.netx.engine
+
+import org.rooftop.netx.api.AlreadyCommittedSagaException
+import org.rooftop.netx.api.Codec
+import org.rooftop.netx.api.SagaException
+import org.rooftop.netx.api.SagaManager
+import org.rooftop.netx.engine.core.Saga
+import org.rooftop.netx.engine.core.SagaState
+import org.rooftop.netx.engine.logging.info
+import org.rooftop.netx.engine.logging.infoOnError
+import org.rooftop.netx.engine.logging.warningOnError
+import reactor.core.publisher.Mono
+
+internal abstract class AbstractSagaManager(
+ private val codec: Codec,
+ private val nodeGroup: String,
+ private val nodeName: String,
+ private val sagaIdGenerator: SagaIdGenerator,
+) : SagaManager {
+
+ override fun syncStart(): String {
+ return start().block()
+ ?: throw SagaException("Cannot start saga")
+ }
+
+ final override fun syncStart(event: T): String {
+ return start(event).block()
+ ?: throw SagaException("Cannot start saga \"$event\"")
+ }
+
+ override fun syncJoin(id: String): String {
+ return join(id).block()
+ ?: throw SagaException("Cannot join saga \"$id\"")
+ }
+
+ final override fun syncJoin(id: String, event: T): String {
+ return join(id, event).block()
+ ?: throw SagaException("Cannot join saga \"$id\", \"$event\"")
+ }
+
+ final override fun syncExists(id: String): String {
+ return exists(id).block()
+ ?: throw SagaException("Cannot exists saga \"$id\"")
+ }
+
+ final override fun syncCommit(id: String): String {
+ return commit(id).block()
+ ?: throw SagaException("Cannot commit saga \"$id\"")
+ }
+
+ override fun syncCommit(id: String, event: T): String {
+ return commit(id, event).block()
+ ?: throw SagaException("Cannot commit saga \"$id\" \"$event\"")
+ }
+
+ final override fun syncRollback(id: String, cause: String): String {
+ return rollback(id, cause).block()
+ ?: throw SagaException("Cannot rollback saga \"$id\", \"$cause\"")
+ }
+
+ override fun syncRollback(id: String, cause: String, event: T): String {
+ return rollback(id, cause, event).block()
+ ?: throw SagaException("Cannot rollback saga \"$id\", \"$cause\" \"$event\"")
+ }
+
+ override fun start(): Mono {
+ return startSaga(null)
+ .info("Start saga")
+ .contextWrite { it.put(CONTEXT_TX_KEY, sagaIdGenerator.generate()) }
+ }
+
+ final override fun start(event: T): Mono {
+ return Mono.fromCallable { codec.encode(event) }
+ .flatMap { encodedEvent ->
+ startSaga(encodedEvent)
+ .info("Start saga event \"$event\"")
+ }
+ .contextWrite { it.put(CONTEXT_TX_KEY, sagaIdGenerator.generate()) }
+ }
+
+ private fun startSaga(event: String?): Mono {
+ return Mono.deferContextual { Mono.just(it[CONTEXT_TX_KEY]) }
+ .flatMap { id ->
+ publishSaga(
+ id, Saga(
+ id = id,
+ serverId = nodeName,
+ group = nodeGroup,
+ state = SagaState.START,
+ event = event,
+ )
+ )
+ }
+ }
+
+ override fun join(id: String): Mono {
+ return getAnySaga(id)
+ .map {
+ if (it == SagaState.COMMIT) {
+ throw AlreadyCommittedSagaException(id, it.name)
+ }
+ id
+ }
+ .warningOnError("Cannot join saga")
+ .flatMap {
+ joinSaga(id, null)
+ .info("Join saga id \"$id\"")
+ }
+ }
+
+ override fun join(id: String, event: T): Mono {
+ return getAnySaga(id)
+ .map {
+ if (it == SagaState.COMMIT) {
+ throw AlreadyCommittedSagaException(id, it.name)
+ }
+ id
+ }
+ .warningOnError("Cannot join saga")
+ .map { codec.encode(event) }
+ .flatMap {
+ joinSaga(id, it)
+ .info("Join saga id \"$id\", event \"$event\"")
+ }
+ }
+
+ private fun joinSaga(id: String, event: String?): Mono {
+ return publishSaga(
+ id, Saga(
+ id = id,
+ serverId = nodeName,
+ group = nodeGroup,
+ state = SagaState.JOIN,
+ event = event,
+ )
+ )
+ }
+
+ final override fun rollback(id: String, cause: String): Mono {
+ return exists(id)
+ .infoOnError("Cannot rollback saga cause, saga \"$id\" is not exists")
+ .flatMap {
+ rollbackSaga(id, cause, null)
+ }
+ .info("Rollback saga \"$id\"")
+ .contextWrite { it.put(CONTEXT_TX_KEY, id) }
+ }
+
+ override fun rollback(id: String, cause: String, event: T): Mono {
+ return exists(id)
+ .infoOnError("Cannot rollback saga cause, saga \"$id\" is not exists")
+ .map { codec.encode(event) }
+ .flatMap { encodedEvent ->
+ rollbackSaga(id, cause, encodedEvent)
+ }
+ .info("Rollback saga \"$id\"")
+ .contextWrite { it.put(CONTEXT_TX_KEY, id) }
+ }
+
+ private fun rollbackSaga(
+ id: String,
+ cause: String,
+ event: String?
+ ): Mono {
+ return publishSaga(
+ id, Saga(
+ id = id,
+ serverId = nodeName,
+ group = nodeGroup,
+ state = SagaState.ROLLBACK,
+ cause = cause,
+ event = event
+ )
+ )
+ }
+
+ final override fun commit(id: String): Mono {
+ return exists(id)
+ .infoOnError("Cannot commit saga cause, saga \"$id\" is not exists")
+ .flatMap { commitSaga(id, null) }
+ .info("Commit saga \"$id\"")
+ .contextWrite { it.put(CONTEXT_TX_KEY, id) }
+ }
+
+ override fun commit(id: String, event: T): Mono {
+ return exists(id)
+ .infoOnError("Cannot commit saga cause, saga \"$id\" is not exists")
+ .map { codec.encode(event) }
+ .flatMap { encodedEvent ->
+ commitSaga(id, encodedEvent)
+ }
+ .info("Commit saga \"$id\"")
+ .contextWrite { it.put(CONTEXT_TX_KEY, id) }
+ }
+
+ private fun commitSaga(id: String, event: String?): Mono {
+ return publishSaga(
+ id, Saga(
+ id = id,
+ serverId = nodeName,
+ group = nodeGroup,
+ state = SagaState.COMMIT,
+ event = event,
+ )
+ )
+ }
+
+ final override fun exists(id: String): Mono {
+ return getAnySaga(id)
+ .infoOnError("There is no saga corresponding to id \"$id\"")
+ .mapSagaId()
+ .contextWrite { it.put(CONTEXT_TX_KEY, id) }
+ }
+
+ protected abstract fun getAnySaga(id: String): Mono
+
+ private fun Mono<*>.mapSagaId(): Mono {
+ return this.flatMap {
+ Mono.deferContextual { Mono.just(it[CONTEXT_TX_KEY]) }
+ }
+ }
+
+ protected abstract fun publishSaga(
+ id: String,
+ saga: Saga,
+ ): Mono
+
+ private companion object {
+ private const val CONTEXT_TX_KEY = "sagaId"
+ }
+}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaRetrySupporter.kt
similarity index 67%
rename from src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
rename to src/main/kotlin/org/rooftop/netx/engine/AbstractSagaRetrySupporter.kt
index e1876d2..c99f1af 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractSagaRetrySupporter.kt
@@ -1,7 +1,7 @@
package org.rooftop.netx.engine
import jakarta.annotation.PreDestroy
-import org.rooftop.netx.engine.core.Transaction
+import org.rooftop.netx.engine.core.Saga
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.publisher.Flux
@@ -12,35 +12,35 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
-internal abstract class AbstractTransactionRetrySupporter(
+internal abstract class AbstractSagaRetrySupporter(
private val backpressureSize: Int,
private val recoveryMilli: Long,
- private val transactionDispatcher: AbstractTransactionDispatcher,
+ private val sagaDispatcher: AbstractSagaDispatcher,
) {
private lateinit var executor: ScheduledExecutorService;
private lateinit var scheduledFuture: ScheduledFuture<*>;
- fun watchOrphanTransaction() {
+ fun watchOrphanSaga() {
this.executor = Executors.newSingleThreadScheduledExecutor()
this.scheduledFuture = executor.scheduleWithFixedDelay(
- handleOrphanTransaction(),
+ handleOrphanSaga(),
0,
recoveryMilli,
TimeUnit.MILLISECONDS,
)
}
- private fun handleOrphanTransaction(): Runnable {
+ private fun handleOrphanSaga(): Runnable {
return Runnable {
- claimOrphanTransaction(backpressureSize)
+ claimOrphanSaga(backpressureSize)
.doOnNext {
- info("Retry orphan transaction ${it.first}\nmessageId \"${it.second}\"")
+ info("Retry orphan saga ${it.first}\nmessageId \"${it.second}\"")
}
- .flatMap { (transaction, messageId) ->
- transactionDispatcher.dispatch(transaction, messageId)
- .warningOnError("Error occurred when retry orphan transaction \"${transaction.id}\"")
+ .flatMap { (saga, messageId) ->
+ sagaDispatcher.dispatch(saga, messageId)
+ .warningOnError("Error occurred when retry orphan saga \"${saga.id}\"")
}
.onErrorResume { Mono.empty() }
.subscribeOn(Schedulers.immediate())
@@ -48,7 +48,7 @@ internal abstract class AbstractTransactionRetrySupporter(
}
}
- protected abstract fun claimOrphanTransaction(backpressureSize: Int): Flux>
+ protected abstract fun claimOrphanSaga(backpressureSize: Int): Flux>
@PreDestroy
private fun shutdownGracefully() {
@@ -58,14 +58,14 @@ internal abstract class AbstractTransactionRetrySupporter(
if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
executor.shutdownNow()
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
- org.rooftop.netx.engine.logging.error("Cannot shutdown TransactionRetrySupporter thread")
+ org.rooftop.netx.engine.logging.error("Cannot shutdown SagaRetrySupporter thread")
}
}
}.onFailure {
executor.shutdownNow()
Thread.currentThread().interrupt()
}.onSuccess {
- info("Shutdown TransactionRetrySupporter gracefully")
+ info("Shutdown SagaRetrySupporter gracefully")
}
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
deleted file mode 100644
index 05b7468..0000000
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
+++ /dev/null
@@ -1,231 +0,0 @@
-package org.rooftop.netx.engine
-
-import org.rooftop.netx.api.AlreadyCommittedTransactionException
-import org.rooftop.netx.api.Codec
-import org.rooftop.netx.api.TransactionException
-import org.rooftop.netx.api.TransactionManager
-import org.rooftop.netx.engine.core.Transaction
-import org.rooftop.netx.engine.core.TransactionState
-import org.rooftop.netx.engine.logging.info
-import org.rooftop.netx.engine.logging.infoOnError
-import org.rooftop.netx.engine.logging.warningOnError
-import reactor.core.publisher.Mono
-
-internal abstract class AbstractTransactionManager(
- private val codec: Codec,
- private val nodeGroup: String,
- private val nodeName: String,
- private val transactionIdGenerator: TransactionIdGenerator,
-) : TransactionManager {
-
- override fun syncStart(): String {
- return start().block()
- ?: throw TransactionException("Cannot start transaction")
- }
-
- final override fun syncStart(event: T): String {
- return start(event).block()
- ?: throw TransactionException("Cannot start transaction \"$event\"")
- }
-
- override fun syncJoin(transactionId: String): String {
- return join(transactionId).block()
- ?: throw TransactionException("Cannot join transaction \"$transactionId\"")
- }
-
- final override fun syncJoin(transactionId: String, event: T): String {
- return join(transactionId, event).block()
- ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$event\"")
- }
-
- final override fun syncExists(transactionId: String): String {
- return exists(transactionId).block()
- ?: throw TransactionException("Cannot exists transaction \"$transactionId\"")
- }
-
- final override fun syncCommit(transactionId: String): String {
- return commit(transactionId).block()
- ?: throw TransactionException("Cannot commit transaction \"$transactionId\"")
- }
-
- override fun syncCommit(transactionId: String, event: T): String {
- return commit(transactionId, event).block()
- ?: throw TransactionException("Cannot commit transaction \"$transactionId\" \"$event\"")
- }
-
- final override fun syncRollback(transactionId: String, cause: String): String {
- return rollback(transactionId, cause).block()
- ?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\"")
- }
-
- override fun syncRollback(transactionId: String, cause: String, event: T): String {
- return rollback(transactionId, cause, event).block()
- ?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"")
- }
-
- override fun start(): Mono {
- return startTransaction(null)
- .info("Start transaction")
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
- }
-
- final override fun start(event: T): Mono {
- return Mono.fromCallable { codec.encode(event) }
- .flatMap { encodedEvent ->
- startTransaction(encodedEvent)
- .info("Start transaction event \"$event\"")
- }
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
- }
-
- private fun startTransaction(event: String?): Mono {
- return Mono.deferContextual { Mono.just(it[CONTEXT_TX_KEY]) }
- .flatMap { transactionId ->
- publishTransaction(
- transactionId, Transaction(
- id = transactionId,
- serverId = nodeName,
- group = nodeGroup,
- state = TransactionState.START,
- event = event,
- )
- )
- }
- }
-
- override fun join(transactionId: String): Mono {
- return getAnyTransaction(transactionId)
- .map {
- if (it == TransactionState.COMMIT) {
- throw AlreadyCommittedTransactionException(transactionId, it.name)
- }
- transactionId
- }
- .warningOnError("Cannot join transaction")
- .flatMap {
- joinTransaction(transactionId, null)
- .info("Join transaction transactionId \"$transactionId\"")
- }
- }
-
- override fun join(transactionId: String, event: T): Mono {
- return getAnyTransaction(transactionId)
- .map {
- if (it == TransactionState.COMMIT) {
- throw AlreadyCommittedTransactionException(transactionId, it.name)
- }
- transactionId
- }
- .warningOnError("Cannot join transaction")
- .map { codec.encode(event) }
- .flatMap {
- joinTransaction(transactionId, it)
- .info("Join transaction transactionId \"$transactionId\", event \"$event\"")
- }
- }
-
- private fun joinTransaction(transactionId: String, event: String?): Mono {
- return publishTransaction(
- transactionId, Transaction(
- id = transactionId,
- serverId = nodeName,
- group = nodeGroup,
- state = TransactionState.JOIN,
- event = event,
- )
- )
- }
-
- final override fun rollback(transactionId: String, cause: String): Mono {
- return exists(transactionId)
- .infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
- .flatMap {
- rollbackTransaction(transactionId, cause, null)
- }
- .info("Rollback transaction \"$transactionId\"")
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
- }
-
- override fun rollback(transactionId: String, cause: String, event: T): Mono {
- return exists(transactionId)
- .infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
- .map { codec.encode(event) }
- .flatMap { encodedEvent ->
- rollbackTransaction(transactionId, cause, encodedEvent)
- }
- .info("Rollback transaction \"$transactionId\"")
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
- }
-
- private fun rollbackTransaction(
- transactionId: String,
- cause: String,
- event: String?
- ): Mono {
- return publishTransaction(
- transactionId, Transaction(
- id = transactionId,
- serverId = nodeName,
- group = nodeGroup,
- state = TransactionState.ROLLBACK,
- cause = cause,
- event = event
- )
- )
- }
-
- final override fun commit(transactionId: String): Mono {
- return exists(transactionId)
- .infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
- .flatMap { commitTransaction(transactionId, null) }
- .info("Commit transaction \"$transactionId\"")
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
- }
-
- override fun commit(transactionId: String, event: T): Mono {
- return exists(transactionId)
- .infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
- .map { codec.encode(event) }
- .flatMap { encodedEvent ->
- commitTransaction(transactionId, encodedEvent)
- }
- .info("Commit transaction \"$transactionId\"")
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
- }
-
- private fun commitTransaction(transactionId: String, event: String?): Mono {
- return publishTransaction(
- transactionId, Transaction(
- id = transactionId,
- serverId = nodeName,
- group = nodeGroup,
- state = TransactionState.COMMIT,
- event = event,
- )
- )
- }
-
- final override fun exists(transactionId: String): Mono {
- return getAnyTransaction(transactionId)
- .infoOnError("There is no transaction corresponding to transactionId \"$transactionId\"")
- .mapTransactionId()
- .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
- }
-
- protected abstract fun getAnyTransaction(transactionId: String): Mono
-
- private fun Mono<*>.mapTransactionId(): Mono {
- return this.flatMap {
- Mono.deferContextual { Mono.just(it["transactionId"]) }
- }
- }
-
- protected abstract fun publishTransaction(
- transactionId: String,
- transaction: Transaction,
- ): Mono
-
- private companion object {
- private const val CONTEXT_TX_KEY = "transactionId"
- }
-}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
index 08e4ffa..bc06ca8 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
@@ -44,7 +44,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = JoinOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
orchestrateCommand = OrchestrateCommand(
@@ -113,7 +113,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = MonoJoinOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
monoOrchestrateCommand = MonoOrchestrateCommand(
@@ -172,7 +172,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = CommitOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
orchestrateCommand = OrchestrateCommand(commandType, chainContainer.codec, function),
@@ -187,7 +187,7 @@ class DefaultOrchestrateChain private constru
) = rollback?.let {
RollbackOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
rollbackCommand = RollbackCommand(commandType, chainContainer.codec, it),
@@ -214,7 +214,7 @@ class DefaultOrchestrateChain private constru
val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
return@cache OrchestratorManager(
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
codec = chainContainer.codec,
orchestratorId = orchestratorId,
resultHolder = chainContainer.resultHolder,
@@ -266,7 +266,7 @@ class DefaultOrchestrateChain private constru
val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
return@cache OrchestratorManager(
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
codec = chainContainer.codec,
orchestratorId = orchestratorId,
resultHolder = chainContainer.resultHolder,
@@ -373,8 +373,8 @@ class DefaultOrchestrateChain private constru
private fun addDispatcher(orchestrateListeners: List, AbstractOrchestrateListener?>>) {
orchestrateListeners.forEach { (listener, rollbackListener) ->
- chainContainer.transactionDispatcher.addOrchestrate(listener)
- rollbackListener?.let { chainContainer.transactionDispatcher.addOrchestrate(it) }
+ chainContainer.sagaDispatcher.addOrchestrate(listener)
+ rollbackListener?.let { chainContainer.sagaDispatcher.addOrchestrate(it) }
}
}
@@ -383,7 +383,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = MonoCommitOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
monoOrchestrateCommand = MonoOrchestrateCommand(
@@ -402,7 +402,7 @@ class DefaultOrchestrateChain private constru
) = rollback?.let {
MonoRollbackOrchestrateListener(
codec = chainContainer.codec,
- transactionManager = chainContainer.transactionManager,
+ sagaManager = chainContainer.sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
monoRollbackCommand = MonoRollbackCommand(
@@ -418,8 +418,8 @@ class DefaultOrchestrateChain private constru
internal class Pre internal constructor(
private val orchestratorId: String,
- private val transactionManager: TransactionManager,
- private val transactionDispatcher: AbstractTransactionDispatcher,
+ private val sagaManager: SagaManager,
+ private val sagaDispatcher: AbstractSagaDispatcher,
private val codec: Codec,
private val resultHolder: ResultHolder,
private val requestHolder: RequestHolder,
@@ -467,7 +467,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = StartOrchestrateListener(
codec = codec,
- transactionManager = transactionManager,
+ sagaManager = sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
orchestrateCommand = OrchestrateCommand(
@@ -486,7 +486,7 @@ class DefaultOrchestrateChain private constru
) = rollback?.let {
RollbackOrchestrateListener(
codec = codec,
- transactionManager = transactionManager,
+ sagaManager = sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
rollbackCommand = RollbackCommand(
@@ -541,7 +541,7 @@ class DefaultOrchestrateChain private constru
function: TypeReified,
) = MonoStartOrchestrateListener(
codec = codec,
- transactionManager = transactionManager,
+ sagaManager = sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
monoOrchestrateCommand = MonoOrchestrateCommand(
@@ -560,7 +560,7 @@ class DefaultOrchestrateChain private constru
) = rollback?.let {
MonoRollbackOrchestrateListener(
codec = codec,
- transactionManager = transactionManager,
+ sagaManager = sagaManager,
orchestratorId = orchestratorId,
orchestrateSequence = 0,
monoRollbackCommand = MonoRollbackCommand(
@@ -575,8 +575,8 @@ class DefaultOrchestrateChain private constru
}
private fun getStreamContainer(): ChainContainer = ChainContainer(
- transactionManager,
- transactionDispatcher,
+ sagaManager,
+ sagaDispatcher,
codec,
resultHolder,
requestHolder,
@@ -589,8 +589,8 @@ class DefaultOrchestrateChain private constru
}
private data class ChainContainer(
- val transactionManager: TransactionManager,
- val transactionDispatcher: AbstractTransactionDispatcher,
+ val sagaManager: SagaManager,
+ val sagaDispatcher: AbstractSagaDispatcher,
val codec: Codec,
val resultHolder: ResultHolder,
val requestHolder: RequestHolder,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
index 2fefc4f..6444c7b 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
@@ -1,13 +1,13 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.api.TransactionEvent
-import org.rooftop.netx.api.TransactionManager
+import org.rooftop.netx.api.SagaEvent
+import org.rooftop.netx.api.SagaManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
-internal fun Mono.callMono(function: MonoDispatchFunction): Mono<*> {
+internal fun Mono.callMono(function: MonoDispatchFunction): Mono<*> {
return this.flatMap {
function.call(it)
}
@@ -18,34 +18,34 @@ internal class MonoDispatchFunction(
function: KFunction>,
handler: Any,
noRetryFor: Array>,
- nextState: NextTransactionState,
- transactionManager: TransactionManager,
+ nextState: NextSagaState,
+ sagaManager: SagaManager,
) : AbstractDispatchFunction>(
eventType,
function,
handler,
noRetryFor,
nextState,
- transactionManager,
+ sagaManager,
) {
- override fun call(transactionEvent: TransactionEvent): Mono<*> {
- return Mono.just(transactionEvent)
- .filter { isProcessable(transactionEvent) }
- .map { transactionEvent.copy() }
- .flatMap { function.call(handler, transactionEvent) }
- .info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ override fun call(sagaEvent: SagaEvent): Mono<*> {
+ return Mono.just(sagaEvent)
+ .filter { isProcessable(sagaEvent) }
+ .map { sagaEvent.copy() }
+ .flatMap { function.call(handler, sagaEvent) }
+ .info("Call Mono SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"")
.switchIfEmpty(`continue`)
.doOnNext {
- if (isProcessable(transactionEvent)) {
- publishNextTransaction(transactionEvent)
+ if (isProcessable(sagaEvent)) {
+ publishNextSaga(sagaEvent)
}
}
.onErrorResume {
if (isNoRollbackFor(it)) {
return@onErrorResume noRollbackFor
}
- rollback(transactionEvent, it)
+ rollback(sagaEvent, it)
`continue`
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
index cd263d2..6a4163b 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
@@ -1,13 +1,13 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.api.TransactionEvent
-import org.rooftop.netx.api.TransactionManager
+import org.rooftop.netx.api.SagaEvent
+import org.rooftop.netx.api.SagaManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
-internal fun Mono.callNotPublish(function: NotPublishDispatchFunction): Mono<*> {
+internal fun Mono.callNotPublish(function: NotPublishDispatchFunction): Mono<*> {
return this.map { function.call(it) }
}
@@ -16,29 +16,29 @@ internal class NotPublishDispatchFunction(
function: KFunction<*>,
handler: Any,
noRollbackFor: Array>,
- nextState: NextTransactionState,
- transactionManager: TransactionManager,
+ nextState: NextSagaState,
+ sagaManager: SagaManager,
) : AbstractDispatchFunction(
eventType,
function,
handler,
noRollbackFor,
nextState,
- transactionManager,
+ sagaManager,
) {
- override fun call(transactionEvent: TransactionEvent): Any {
- if (isProcessable(transactionEvent)) {
+ override fun call(sagaEvent: SagaEvent): Any {
+ if (isProcessable(sagaEvent)) {
return runCatching {
- function.call(handler, transactionEvent)
- info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ function.call(handler, sagaEvent)
+ info("Call NotPublisher SagaHandler \"${name()}\" with id \"${sagaEvent.id}\"")
}.fold(
- onSuccess = { publishNextTransaction(transactionEvent) },
+ onSuccess = { publishNextSaga(sagaEvent) },
onFailure = {
if (isNoRollbackFor(it)) {
return@fold NO_ROLLBACK_FOR
}
- rollback(transactionEvent, it)
+ rollback(sagaEvent, it)
},
)
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt
index 6c88835..77ae940 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt
@@ -1,13 +1,13 @@
package org.rooftop.netx.engine
-import org.rooftop.netx.api.TransactionEvent
-import org.rooftop.netx.api.TransactionManager
+import org.rooftop.netx.api.SagaEvent
+import org.rooftop.netx.api.SagaManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
-internal fun Mono.callOrchestrate(function: OrchestrateDispatchFunction): Mono<*> {
+internal fun Mono.callOrchestrate(function: OrchestrateDispatchFunction): Mono<*> {
return this.flatMap {
function.call(it)
}
@@ -18,25 +18,25 @@ internal class OrchestrateDispatchFunction(
function: KFunction>,
handler: Any,
noRetryFor: Array>,
- nextState: NextTransactionState,
- transactionManager: TransactionManager,
+ nextState: NextSagaState,
+ sagaManager: SagaManager,
) : AbstractDispatchFunction>(
eventType,
function,
handler,
noRetryFor,
nextState,
- transactionManager,
+ sagaManager,
) {
- override fun call(transactionEvent: TransactionEvent): Mono<*> {
- return Mono.just(transactionEvent)
- .filter { isProcessable(transactionEvent) }
- .map { transactionEvent.copy() }
- .flatMap { function.call(handler, transactionEvent) }
- .info("Call OrchestrateHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ override fun call(sagaEvent: SagaEvent): Mono<*> {
+ return Mono.just(sagaEvent)
+ .filter { isProcessable(sagaEvent) }
+ .map { sagaEvent.copy() }
+ .flatMap { function.call(handler, sagaEvent) }
+ .info("Call OrchestrateHandler \"${name()}\" with id \"${sagaEvent.id}\"")
.map {
- publishNextTransaction(transactionEvent)
+ publishNextSaga(sagaEvent)
it
}
.switchIfEmpty(`continue`)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
index 4a0b43f..3c90546 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorFactory.kt
@@ -4,8 +4,8 @@ import org.rooftop.netx.api.*
import org.rooftop.netx.api.OrchestratorFactory
class OrchestratorFactory internal constructor(
- private val transactionManager: TransactionManager,
- private val transactionDispatcher: AbstractTransactionDispatcher,
+ private val sagaManager: SagaManager,
+ private val sagaDispatcher: AbstractSagaDispatcher,
private val codec: Codec,
private val resultHolder: ResultHolder,
private val requestHolder: RequestHolder,
@@ -19,8 +19,8 @@ class OrchestratorFactory internal constructor(
override fun create(orchestratorId: String): OrchestrateChain.Pre {
return DefaultOrchestrateChain.Pre(
orchestratorId = orchestratorId,
- transactionManager = transactionManager,
- transactionDispatcher = transactionDispatcher,
+ sagaManager = sagaManager,
+ sagaDispatcher = sagaDispatcher,
codec = codec,
resultHolder = resultHolder,
requestHolder = requestHolder,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
index c4f6888..002ab8c 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
@@ -1,11 +1,12 @@
package org.rooftop.netx.engine
import org.rooftop.netx.api.*
+import org.rooftop.netx.engine.listen.AbstractOrchestrateListener
import reactor.core.publisher.Mono
import kotlin.time.Duration.Companion.milliseconds
class OrchestratorManager internal constructor(
- private val transactionManager: TransactionManager,
+ private val sagaManager: SagaManager,
private val codec: Codec,
private val orchestratorId: String,
private val resultHolder: ResultHolder,
@@ -13,43 +14,43 @@ class OrchestratorManager internal constructor(
private val rollbackOrchestrateListener: AbstractOrchestrateListener?,
) : Orchestrator {
- override fun transactionSync(request: T): Result {
- return transaction(request).block()
- ?: throw TransactionException("Cannot start transaction \"$request\"")
+ override fun sagaSync(request: T): Result {
+ return saga(request).block()
+ ?: throw SagaException("Cannot start saga \"$request\"")
}
- override fun transactionSync(timeoutMillis: Long, request: T): Result {
- return transaction(timeoutMillis, request).block()
- ?: throw TransactionException("Cannot start transaction \"$request\"")
+ override fun sagaSync(timeoutMillis: Long, request: T): Result {
+ return saga(timeoutMillis, request).block()
+ ?: throw SagaException("Cannot start saga \"$request\"")
}
- override fun transactionSync(request: T, context: MutableMap): Result {
- return transaction(request, context).block()
- ?: throw TransactionException("Cannot start transaction \"$request\"")
+ override fun sagaSync(request: T, context: MutableMap): Result {
+ return saga(request, context).block()
+ ?: throw SagaException("Cannot start saga \"$request\"")
}
- override fun transactionSync(
+ override fun sagaSync(
timeoutMillis: Long,
request: T,
context: MutableMap
): Result {
- return transaction(timeoutMillis, request, context).block()
- ?: throw TransactionException("Cannot start transaction \"$request\"")
+ return saga(timeoutMillis, request, context).block()
+ ?: throw SagaException("Cannot start saga \"$request\"")
}
- override fun transaction(request: T): Mono> {
- return transaction(TEN_SECONDS_TO_TIME_OUT, request, mutableMapOf())
+ override fun saga(request: T): Mono> {
+ return saga(TEN_SECONDS_TO_TIME_OUT, request, mutableMapOf())
}
- override fun transaction(timeoutMillis: Long, request: T): Mono> {
- return transaction(timeoutMillis, request, mutableMapOf())
+ override fun saga(timeoutMillis: Long, request: T): Mono> {
+ return saga(timeoutMillis, request, mutableMapOf())
}
- override fun transaction(request: T, context: MutableMap): Mono> {
- return transaction(TEN_SECONDS_TO_TIME_OUT, request, context)
+ override fun saga(request: T, context: MutableMap): Mono> {
+ return saga(TEN_SECONDS_TO_TIME_OUT, request, context)
}
- override fun transaction(
+ override fun saga(
timeoutMillis: Long,
request: T,
context: MutableMap
@@ -66,7 +67,7 @@ class OrchestratorManager internal constructor(
context = codec.encode(context.mapValues { codec.encode(it.value) })
)
}
- .flatMap { transactionManager.start(it) }
+ .flatMap { sagaManager.start(it) }
.flatMap { resultHolder.getResult(timeoutMillis.milliseconds, it) }
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/ResultHolder.kt b/src/main/kotlin/org/rooftop/netx/engine/ResultHolder.kt
index 8efe09d..2c8037c 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/ResultHolder.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/ResultHolder.kt
@@ -6,9 +6,9 @@ import kotlin.time.Duration
internal interface ResultHolder {
- fun getResult(timeout: Duration, transactionId: String): Mono>
+ fun getResult(timeout: Duration, id: String): Mono>
- fun setSuccessResult(transactionId: String, result: T): Mono
+ fun setSuccessResult(id: String, result: T): Mono
- fun setFailResult(transactionId: String, result: T) : Mono
+ fun setFailResult(id: String, result: T) : Mono
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/TransactionIdGenerator.kt b/src/main/kotlin/org/rooftop/netx/engine/SagaIdGenerator.kt
similarity index 89%
rename from src/main/kotlin/org/rooftop/netx/engine/TransactionIdGenerator.kt
rename to src/main/kotlin/org/rooftop/netx/engine/SagaIdGenerator.kt
index 968a13a..6327639 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/TransactionIdGenerator.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/SagaIdGenerator.kt
@@ -2,7 +2,7 @@ package org.rooftop.netx.engine
import com.github.f4b6a3.tsid.TsidFactory
-class TransactionIdGenerator(
+class SagaIdGenerator(
nodeId: Int,
private val tsidFactory: TsidFactory = TsidFactory.newInstance256(nodeId),
) {
diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt
similarity index 72%
rename from src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt
rename to src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt
index dce8ef3..84c4c18 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/core/Saga.kt
@@ -1,10 +1,10 @@
package org.rooftop.netx.engine.core
-internal data class Transaction(
+internal data class Saga(
val id: String,
val serverId: String,
val group: String,
- val state: TransactionState,
+ val state: SagaState,
val cause: String? = null,
val event: String? = null,
)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt b/src/main/kotlin/org/rooftop/netx/engine/core/SagaState.kt
similarity index 70%
rename from src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt
rename to src/main/kotlin/org/rooftop/netx/engine/core/SagaState.kt
index f5697df..d5f4d81 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/core/SagaState.kt
@@ -1,6 +1,6 @@
package org.rooftop.netx.engine.core
-internal enum class TransactionState {
+internal enum class SagaState {
JOIN,
COMMIT,
ROLLBACK,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
similarity index 81%
rename from src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
rename to src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
index 1ef3baa..43decd5 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/AbstractOrchestrateListener.kt
@@ -1,6 +1,9 @@
-package org.rooftop.netx.engine
+package org.rooftop.netx.engine.listen
import org.rooftop.netx.api.*
+import org.rooftop.netx.engine.OrchestrateEvent
+import org.rooftop.netx.engine.RequestHolder
+import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
@@ -9,7 +12,7 @@ internal abstract class AbstractOrchestrateListener internal c
private val orchestratorId: String,
internal val orchestrateSequence: Int,
private val codec: Codec,
- private val transactionManager: TransactionManager,
+ private val sagaManager: SagaManager,
private val requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
private val typeReference: TypeReference?,
@@ -48,28 +51,28 @@ internal abstract class AbstractOrchestrateListener internal c
}
}
- protected fun orchestrate(transactionEvent: TransactionEvent): Mono {
- return transactionEvent.startWithOrchestrateEvent()
+ protected fun orchestrate(sagaEvent: SagaEvent): Mono {
+ return sagaEvent.startWithOrchestrateEvent()
.filter {
it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
}
.mapReifiedRequest()
.flatMap { (request, event) ->
- holdRequestIfRollbackable(request, transactionEvent.transactionId)
+ holdRequestIfRollbackable(request, sagaEvent.id)
.map { it to event }
}
.flatMap { (request, event) -> command(request, event) }
.setNextCastableType()
.doOnError {
rollback(
- transactionEvent.transactionId,
+ sagaEvent.id,
it,
- transactionEvent.decodeEvent(OrchestrateEvent::class)
+ sagaEvent.decodeEvent(OrchestrateEvent::class)
)
}
.toOrchestrateEvent()
.map {
- transactionEvent.setNextEvent(it)
+ sagaEvent.setNextEvent(it)
}
}
@@ -86,9 +89,9 @@ internal abstract class AbstractOrchestrateListener internal c
}
}
- protected fun Mono.getHeldRequest(transactionEvent: TransactionEvent): Mono> {
+ protected fun Mono.getHeldRequest(sagaEvent: SagaEvent): Mono> {
return this.flatMap { event ->
- val key = "${transactionEvent.transactionId}:$orchestrateSequence"
+ val key = "${sagaEvent.id}:$orchestrateSequence"
if (typeReference == null) {
return@flatMap requestHolder.getRequest(key, getCastableType())
.map { it to event }
@@ -97,12 +100,12 @@ internal abstract class AbstractOrchestrateListener internal c
}
}
- protected fun holdRequestIfRollbackable(request: T, transactionId: String): Mono {
+ protected fun holdRequestIfRollbackable(request: T, id: String): Mono {
if (!isRollbackable) {
Mono.just(request)
}
return requestHolder.setRequest(
- "$transactionId:$orchestrateSequence",
+ "$id:$orchestrateSequence",
request
)
}
@@ -129,7 +132,7 @@ internal abstract class AbstractOrchestrateListener internal c
} ?: throw NullPointerException("Cannot cast \"$data\" cause, castableType is null")
}
- protected fun TransactionEvent.startWithOrchestrateEvent(): Mono =
+ protected fun SagaEvent.startWithOrchestrateEvent(): Mono =
Mono.just(this.decodeEvent(OrchestrateEvent::class))
private fun Throwable.toEmptyStackTrace(): Throwable {
@@ -138,7 +141,7 @@ internal abstract class AbstractOrchestrateListener internal c
}
protected fun rollback(
- transactionId: String,
+ id: String,
throwable: Throwable,
orchestrateEvent: OrchestrateEvent,
) {
@@ -149,18 +152,18 @@ internal abstract class AbstractOrchestrateListener internal c
"",
orchestrateEvent.context,
)
- holdFailResult(transactionId, throwable)
+ holdFailResult(id, throwable)
.flatMap {
- transactionManager.rollback(
- transactionId = transactionId,
+ sagaManager.rollback(
+ id = id,
cause = throwable.message ?: throwable.localizedMessage,
event = rollbackOrchestrateEvent
)
}.subscribeOn(Schedulers.parallel()).subscribe()
}
- private fun holdFailResult(transactionId: String, throwable: Throwable): Mono {
- return resultHolder.setFailResult(transactionId, throwable.toEmptyStackTrace())
+ private fun holdFailResult(id: String, throwable: Throwable): Mono {
+ return resultHolder.setFailResult(id, throwable.toEmptyStackTrace())
}
open fun withAnnotated(): AbstractOrchestrateListener {
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
index 248e4aa..1e17cb0 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
@@ -1,7 +1,6 @@
package org.rooftop.netx.engine.listen
import org.rooftop.netx.api.*
-import org.rooftop.netx.engine.AbstractOrchestrateListener
import org.rooftop.netx.engine.OrchestrateEvent
import org.rooftop.netx.engine.RequestHolder
import org.rooftop.netx.engine.ResultHolder
@@ -9,7 +8,7 @@ import reactor.core.publisher.Mono
internal class CommitOrchestrateListener internal constructor(
codec: Codec,
- transactionManager: TransactionManager,
+ sagaManager: SagaManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateCommand: OrchestrateCommand