Skip to content

Commit

Permalink
refactor: orchestrator's rollback start previous sequence (#138)
Browse files Browse the repository at this point in the history
* test: Orchestrator Result test 를 추가한다

* refactor: Orchestrator도중에 예외가 발생하면, 이전 chain부터 rollback이 수행되도록 한다

* docs: Readme.md에 rollback 발생 순서를 수정한다

* build: version을 0.4.2에서 0.4.3으로 올린다
  • Loading branch information
devxb authored Apr 27, 2024
1 parent 5fd082f commit 31b0393
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 52 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<br>

![version 0.4.2](https://img.shields.io/badge/version-0.4.2-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.4.3](https://img.shields.io/badge/version-0.4.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

**TPS(6,000)** on my Macbook air m2(default options). _[link](#Test1-TPS)_
Expand Down Expand Up @@ -120,12 +120,8 @@ class OrchestratorConfigurer(
)
.commit(
orchestrate = { request ->
// When an error occurs, all rollbacks are called from the bottom up,
// starting from the location where the error occurred.
// If a rollback occurs here, all the above rollback functions will be executed sequentially.
throw IllegalArgumentException("Oops! Something went wrong..")
},
rollback = { request ->
// ...
}
)
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftopmsa
version=0.4.2
version=0.4.3
compatibility=17

### Sonarcloud ###
Expand Down
8 changes: 0 additions & 8 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,43 +63,35 @@ interface OrchestrateChain<OriginReq : Any, T : Any, V : Any> {
* Commits the saga with the operation.
*
* @param orchestrate Operation to be executed along with the commit.
* @param rollback Rollback function to be executed if an exception is thrown in the current orchestrate.
* @param S The final return value of Orchestrator.
* @return Orchestrator
* @see Orchestrate
* @see Rollback*
*/
fun <S : Any> commit(
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>? = null,
): Orchestrator<OriginReq, S>

/**
* @see commit
*/
fun <S : Any> commitReactive(
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

/**
* @param contextOrchestrate Allows using Context maintained in each Saga.
* @param contextRollback Allows using Context maintained in each Saga.
* @see commit
* @see contextOrchestrate
* @see contextRollback
*/
fun <S : Any> commitWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>? = null,
): Orchestrator<OriginReq, S>

/**
* @see commitWithContext
*/
fun <S : Any> commitReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

interface Pre<T : Any> {
Expand Down
30 changes: 10 additions & 20 deletions src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
Original file line number Diff line number Diff line change
Expand Up @@ -146,26 +146,20 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

override fun <S : Any> commit(
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>?,
): Orchestrator<OriginReq, S> {
val nextCommitOrchestrateListener =
getCommitOrchestrateListener<V, S>(CommandType.DEFAULT, orchestrate)
val nextRollbackOrchestrateListener =
getRollbackOrchestrateListener<V, S>(CommandType.DEFAULT, rollback)

return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextCommitOrchestrateListener)
}

override fun <S : Any> commitWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>?
): Orchestrator<OriginReq, S> {
val nextCommitOrchestrateListener =
getCommitOrchestrateListener<V, S>(CommandType.CONTEXT, contextOrchestrate)
val nextRollbackOrchestrateListener =
getRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)

return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextCommitOrchestrateListener)
}

private fun <T : Any, V : Any> getCommitOrchestrateListener(
Expand Down Expand Up @@ -200,15 +194,14 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

private fun <S : Any> createOrchestrator(
nextCommitOrchestrateListener: CommitOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: RollbackOrchestrateListener<V, S>?
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextCommitOrchestrateListener,
nextRollbackOrchestrateListener,
null,
this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
Expand All @@ -227,39 +220,32 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

override fun <S : Any> commitReactive(
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>?,
): Orchestrator<OriginReq, S> {
val nextJoinOrchestrateListener =
getMonoCommitOrchestrateListener<V, S>(CommandType.DEFAULT, orchestrate)
val nextRollbackOrchestrateListener =
getMonoRollbackOrchestrateListener<V, S>(CommandType.DEFAULT, rollback)

return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextJoinOrchestrateListener)
}

override fun <S : Any> commitReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>?
): Orchestrator<OriginReq, S> {
val nextJoinOrchestrateListener =
getMonoCommitOrchestrateListener<V, S>(CommandType.CONTEXT, contextOrchestrate)
val nextRollbackOrchestrateListener =
getMonoRollbackOrchestrateListener<V, S>(CommandType.CONTEXT, contextRollback)

return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener)
return createOrchestrator(nextJoinOrchestrateListener)
}

private fun <S : Any> createOrchestrator(
nextJoinOrchestrateListener: MonoCommitOrchestrateListener<V, S>,
nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener<V, S>?
): Orchestrator<OriginReq, S> {
return chainContainer.orchestratorCache.cache(orchestratorId) {
val nextDefaultOrchestrateChain = DefaultOrchestrateChain(
orchestratorId,
orchestrateSequence + 1,
chainContainer,
nextJoinOrchestrateListener,
nextRollbackOrchestrateListener,
null,
this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
Expand Down Expand Up @@ -330,6 +316,7 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat

private fun chainOrchestrateListeners(orchestrateListeners: List<Pair<AbstractOrchestrateListener<out Any, out Any>, AbstractOrchestrateListener<out Any, out Any>?>>) {
var rollbackSequence = 0
var beforeRollbackSequence = -1
for (listenerWithIdx in orchestrateListeners.withIndex()) {
val isFirst = listenerWithIdx.index == 0
val isLast =
Expand All @@ -341,6 +328,9 @@ internal class DefaultOrchestrateChain<OriginReq : Any, T : Any, V : Any> privat
rollbackSequence = it.orchestrateSequence
}
listener.rollbackSequence = rollbackSequence
listener.beforeRollbackOrchestrateSequence = beforeRollbackSequence

beforeRollbackSequence = rollbackSequence

listener.isFirst = isFirst
listener.isLast = isLast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
val rollbackOrchestrateEvent =
OrchestrateEvent(
orchestrateEvent.orchestratorId,
rollbackSequence,
beforeRollbackOrchestrateSequence,
"",
orchestrateEvent.context,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public Orchestrator<Integer, Integer> intOrchestrator() {
request -> request - 1
)
.commit(
request -> request + 1,
request -> request - 1
request -> request + 1
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class OrchestratorConfigurer {
Mono.fromCallable {
request + 1
}
}, contextRollback = { _, request -> Mono.fromCallable { request - 1 } })
})
}

object IntOrchestrator : Orchestrate<Int, Int> {
Expand Down
38 changes: 30 additions & 8 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ internal class OrchestratorConfigurer(
rollbackOrchestratorResult.add("4")
throw IllegalArgumentException("Rollback")
},
rollback = {
rollbackOrchestratorResult.add("-4")
}
)
}

Expand Down Expand Up @@ -141,10 +138,9 @@ internal class OrchestratorConfigurer(
val start1 = context.decodeContext("start-1", String::class)
val join2 = context.decodeContext("join-2", String::class)
val join3 = context.decodeContext("join-3", String::class)
val rCommit4 = context.decodeContext("r-commit-4", String::class)
val rJoin3 = context.decodeContext("r-join-3", String::class)

contextResult.addAll(listOf(start1, join2, join3, rCommit4, rJoin3))
contextResult.addAll(listOf(start1, join2, join3, rJoin3))
}
)
.joinWithContext(
Expand All @@ -171,9 +167,6 @@ internal class OrchestratorConfigurer(
context.set("commit-4", request)
throw IllegalArgumentException("Rollback")
},
contextRollback = { context, request ->
context.set("r-commit-4", "r$request")
}
)
}

Expand Down Expand Up @@ -237,6 +230,35 @@ internal class OrchestratorConfigurer(
.commit({ it })
}

@Bean(name = ["throwOnStartOrchestrator"])
fun throwOnStartOrchestrator(): Orchestrator<String, String> {
return OrchestratorFactory.instance()
.create<String>("throwOnStartOrchestrator")
.start(
orchestrate = {
throw IllegalArgumentException("Throw error for test.")
}
)
.commit({
"Never reach this line."
})
}

@Bean(name = ["throwOnJoinOrchestrator"])
fun throwOnJoinOrchestrator(): Orchestrator<String, String> {
return OrchestratorFactory.instance()
.create<String>("throwOnJoinOrchestrator")
.start({
"start success"
})
.join({
throw IllegalArgumentException("Throw error for test.")
})
.commit({
"Never reach this line."
})
}

object PairOrchestrate :
Orchestrate<Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>, Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>> {
override fun orchestrate(request: Pair<OrchestratorTest.Foo, OrchestratorTest.Foo>): Pair<OrchestratorTest.Foo, OrchestratorTest.Foo> {
Expand Down
33 changes: 28 additions & 5 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.equality.shouldBeEqualToComparingFields
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.api.TypeReference
import org.rooftop.netx.meta.EnableSaga
Expand Down Expand Up @@ -39,6 +38,8 @@ internal class OrchestratorTest(
@Qualifier("startWithContextOrchestrator") private val startWithContextOrchestrator: Orchestrator<String, String>,
@Qualifier("fooContextOrchestrator") private val fooContextOrchestrator: Orchestrator<String, List<Foo>>,
private val privateOrchestrator: Orchestrator<Private, Private>,
@Qualifier("throwOnStartOrchestrator") private val throwOnStartOrchestrator: Orchestrator<String, String>,
@Qualifier("throwOnJoinOrchestrator") private val throwOnJoinOrchestrator: Orchestrator<String, String>,
) : DescribeSpec({

describe("numberOrchestrator 구현채는") {
Expand Down Expand Up @@ -97,10 +98,10 @@ internal class OrchestratorTest(
}

describe("rollbackOrchestrator 구현채는") {
val expected = listOf("1", "2", "3", "4", "-4", "-3", "-1")
val expected = listOf("1", "2", "3", "4", "-3", "-1")

context("saga 메소드가 호출되면,") {
it("실패한 부분부터 위로 거슬러 올라가며 롤백한다") {
it("실패한 부분 위부터 위로 거슬러 올라가며 롤백한다") {
val result = rollbackOrchestrator.sagaSync("")

result.isSuccess shouldBeEqual false
Expand Down Expand Up @@ -134,7 +135,7 @@ internal class OrchestratorTest(
context("saga 메소드가 호출되면,") {
val expected = listOf("1", "2", "3", "4", "-3", "-1")

it("실패한 부분부터 위로 거슬러 올라가며 롤백한다.") {
it("실패한 부분위부터 위로 거슬러 올라가며 롤백한다.") {
val result = monoRollbackOrchestrator.sagaSync("")

result.isSuccess shouldBeEqual false
Expand All @@ -150,7 +151,7 @@ internal class OrchestratorTest(

describe("contextOrchestrator 구현채는") {
context("saga 메소드가 호출되면,") {
val expected = listOf("0", "1", "2", "r3", "r2")
val expected = listOf("0", "1", "2", "r2")

it("context 에서 아이템을 교환하며 Saga를 진행한다.") {
val result = contextOrchestrator.sagaSync("0")
Expand Down Expand Up @@ -225,6 +226,28 @@ internal class OrchestratorTest(
}
}
}

describe("throwOnStartOrchestrator 구현채는") {
context("start에서 예외가 던져지면,") {
it("해당 예외를 Result에서 throw한다.") {
shouldThrowWithMessage<IllegalArgumentException>("Throw error for test.") {
throwOnStartOrchestrator.sagaSync("throw error in start.")
.decodeResultOrThrow(String::class)
}
}
}
}

describe("throwOnJoinOrchestrator 구현채는") {
context("join에서 예외가 던져지면,") {
it("해당 예외를 Result에서 throw한다.") {
shouldThrowWithMessage<IllegalArgumentException>("Throw error for test.") {
throwOnJoinOrchestrator.sagaSync("throw error in join.")
.decodeResultOrThrow(String::class)
}
}
}
}
}) {
data class Home(
val address: String,
Expand Down

0 comments on commit 31b0393

Please sign in to comment.