From 87ff5c17b1bb03a4096cb816a0d24acc5c727a31 Mon Sep 17 00:00:00 2001 From: xb205 <62425964+devxb@users.noreply.github.com> Date: Sat, 23 Mar 2024 22:27:34 +0900 Subject: [PATCH] feat: Bind values to context in Orchestrator (#106) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: Orchestrate와 Rollback 인터페이스에 Function을 제거한다 * feat: Context binding in orchestrator * docs: Netx version 0.3.3 to 0.3.4 --- README.md | 14 +- src/main/kotlin/org/rooftop/netx/api/Codec.kt | 2 + .../kotlin/org/rooftop/netx/api/Context.kt | 19 + .../rooftop/netx/api/ContextOrchestrate.kt | 6 + .../org/rooftop/netx/api/ContextRollback.kt | 6 + ...{OrchestrateFunction.kt => Orchestrate.kt} | 2 +- .../org/rooftop/netx/api/OrchestrateChain.kt | 53 ++- .../api/{RollbackFunction.kt => Rollback.kt} | 2 +- .../org/rooftop/netx/api/TypeReference.kt | 14 + .../engine/AbstractOrchestrateListener.kt | 47 +- .../netx/engine/DefaultOrchestrateChain.kt | 407 +++++++++++++----- .../org/rooftop/netx/engine/JsonCodec.kt | 10 + .../rooftop/netx/engine/OrchestrateEvent.kt | 1 + .../netx/engine/OrchestratorManager.kt | 1 + .../rooftop/netx/engine/listen/CommandType.kt | 6 + .../listen/CommitOrchestrateListener.kt | 22 +- .../engine/listen/JoinOrchestrateListener.kt | 21 +- .../listen/MonoCommitOrchestrateListener.kt | 22 +- .../listen/MonoJoinOrchestrateListener.kt | 21 +- .../engine/listen/MonoOrchestrateCommand.kt | 32 ++ .../netx/engine/listen/MonoRollbackCommand.kt | 31 ++ .../listen/MonoRollbackOrchestrateListener.kt | 29 +- .../listen/MonoStartOrchestrateListener.kt | 21 +- .../netx/engine/listen/OrchestrateCommand.kt | 29 ++ .../netx/engine/listen/RollbackCommand.kt | 29 ++ .../listen/RollbackOrchestrateListener.kt | 35 +- .../engine/listen/StartOrchestrateListener.kt | 21 +- .../netx/client/OrchestratorConfigurer.kt | 6 +- .../netx/engine/OrchestratorConfigurer.kt | 88 +++- .../netx/engine/OrchestratorFactoryTest.kt | 2 +- .../rooftop/netx/engine/OrchestratorTest.kt | 42 ++ 31 files changed, 789 insertions(+), 252 deletions(-) create mode 100644 src/main/kotlin/org/rooftop/netx/api/Context.kt create mode 100644 src/main/kotlin/org/rooftop/netx/api/ContextOrchestrate.kt create mode 100644 src/main/kotlin/org/rooftop/netx/api/ContextRollback.kt rename src/main/kotlin/org/rooftop/netx/api/{OrchestrateFunction.kt => Orchestrate.kt} (55%) rename src/main/kotlin/org/rooftop/netx/api/{RollbackFunction.kt => Rollback.kt} (55%) create mode 100644 src/main/kotlin/org/rooftop/netx/api/TypeReference.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/listen/CommandType.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/listen/MonoOrchestrateCommand.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackCommand.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/listen/OrchestrateCommand.kt create mode 100644 src/main/kotlin/org/rooftop/netx/engine/listen/RollbackCommand.kt diff --git a/README.md b/README.md index 40d9654..17c7712 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@
-![version 0.3.3](https://img.shields.io/badge/version-0.3.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) +![version 0.3.4](https://img.shields.io/badge/version-0.3.4-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) Redis-Stream을 지원하는 Saga frame work 입니다. @@ -88,7 +88,7 @@ class OrchestratorConfigurer( fun orderOrchestartor(): Orchestrator { // return orchestratorFactory.create("orderOrchestrator") .start( - function = { order -> // its order type + orchestrate = { order -> // its order type // Start Transaction with your bussiness logic // something like ... "Check valid seller" return@start user @@ -98,13 +98,19 @@ class OrchestratorConfigurer( } ) .joinReactive( - function = { user -> // Before operations response type "User" flow here + orchestrate = { user -> // Before operations response type "User" flow here // Webflux supports, should return Mono type. }, // Can skip rollback operation, if you want ) + .joinWithContext( + contextOrchestrate = { context, request -> + context.set("key", request) // save data on context + context.decode("foo", Foo::class) // The context set in the upstream chain can be retrieved. + }, + ) .commit( - function = { request -> + orchestrate = { request -> // When an error occurs, all rollbacks are called from the bottom up, // starting from the location where the error occurred. throw IllegalArgumentException("Oops! Something went wrong..") diff --git a/src/main/kotlin/org/rooftop/netx/api/Codec.kt b/src/main/kotlin/org/rooftop/netx/api/Codec.kt index 728cc0c..84eb70c 100644 --- a/src/main/kotlin/org/rooftop/netx/api/Codec.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Codec.kt @@ -7,4 +7,6 @@ interface Codec { fun encode(data: T): String fun decode(data: String, type: KClass): T + + fun decode(data: String, type: TypeReference): T } diff --git a/src/main/kotlin/org/rooftop/netx/api/Context.kt b/src/main/kotlin/org/rooftop/netx/api/Context.kt new file mode 100644 index 0000000..0dcd3c9 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/Context.kt @@ -0,0 +1,19 @@ +package org.rooftop.netx.api + +import kotlin.reflect.KClass + +data class Context internal constructor( + private val codec: Codec, + internal val contexts: MutableMap, +) { + + fun set(key: String, value: T) { + contexts[key] = codec.encode(value) + } + + fun decodeContext(key: String, type: Class): T = decodeContext(key, type.kotlin) + + fun decodeContext(key: String, type: KClass): T = contexts[key]?.let { + codec.decode(it, type) + } ?: throw NullPointerException("Cannot find context by key \"$key\"") +} diff --git a/src/main/kotlin/org/rooftop/netx/api/ContextOrchestrate.kt b/src/main/kotlin/org/rooftop/netx/api/ContextOrchestrate.kt new file mode 100644 index 0000000..ca0e909 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/ContextOrchestrate.kt @@ -0,0 +1,6 @@ +package org.rooftop.netx.api + +fun interface ContextOrchestrate { + + fun orchestrate(context: Context, request: T): V +} diff --git a/src/main/kotlin/org/rooftop/netx/api/ContextRollback.kt b/src/main/kotlin/org/rooftop/netx/api/ContextRollback.kt new file mode 100644 index 0000000..07e8d47 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/ContextRollback.kt @@ -0,0 +1,6 @@ +package org.rooftop.netx.api + +fun interface ContextRollback { + + fun rollback(context: Context, request: T): V +} diff --git a/src/main/kotlin/org/rooftop/netx/api/OrchestrateFunction.kt b/src/main/kotlin/org/rooftop/netx/api/Orchestrate.kt similarity index 55% rename from src/main/kotlin/org/rooftop/netx/api/OrchestrateFunction.kt rename to src/main/kotlin/org/rooftop/netx/api/Orchestrate.kt index c8472d9..d14d841 100644 --- a/src/main/kotlin/org/rooftop/netx/api/OrchestrateFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Orchestrate.kt @@ -1,6 +1,6 @@ package org.rooftop.netx.api -fun interface OrchestrateFunction { +fun interface Orchestrate { fun orchestrate(request: T): V } diff --git a/src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt index 4d90d8f..40c5782 100644 --- a/src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt +++ b/src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt @@ -6,35 +6,64 @@ import reactor.core.publisher.Mono interface OrchestrateChain { fun join( - function: OrchestrateFunction, - rollback: RollbackFunction? = null, + orchestrate: Orchestrate, + rollback: Rollback? = null, ): DefaultOrchestrateChain fun joinReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>? = null, + orchestrate: Orchestrate>, + rollback: Rollback>? = null, + ): DefaultOrchestrateChain + + fun joinWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? = null, + ): DefaultOrchestrateChain + + fun joinReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? = null, ): DefaultOrchestrateChain fun commit( - function: OrchestrateFunction, - rollback: RollbackFunction? = null, + orchestrate: Orchestrate, + rollback: Rollback? = null, ): Orchestrator fun commitReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>? = null, + orchestrate: Orchestrate>, + rollback: Rollback>? = null, + ): Orchestrator + + fun commitWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? = null, + ): Orchestrator + + fun commitReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? = null, ): Orchestrator interface Pre { fun start( - function: OrchestrateFunction, - rollback: RollbackFunction? = null, + orchestrate: Orchestrate, + rollback: Rollback? = null, ): DefaultOrchestrateChain fun startReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>? = null, + orchestrate: Orchestrate>, + rollback: Rollback>? = null, ): DefaultOrchestrateChain + fun startWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? = null, + ): DefaultOrchestrateChain + + fun startReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? = null, + ): DefaultOrchestrateChain } } diff --git a/src/main/kotlin/org/rooftop/netx/api/RollbackFunction.kt b/src/main/kotlin/org/rooftop/netx/api/Rollback.kt similarity index 55% rename from src/main/kotlin/org/rooftop/netx/api/RollbackFunction.kt rename to src/main/kotlin/org/rooftop/netx/api/Rollback.kt index 6b8831c..088a204 100644 --- a/src/main/kotlin/org/rooftop/netx/api/RollbackFunction.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Rollback.kt @@ -1,6 +1,6 @@ package org.rooftop.netx.api -fun interface RollbackFunction { +fun interface Rollback { fun rollback(request: T): V } diff --git a/src/main/kotlin/org/rooftop/netx/api/TypeReference.kt b/src/main/kotlin/org/rooftop/netx/api/TypeReference.kt new file mode 100644 index 0000000..aecd8ed --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/api/TypeReference.kt @@ -0,0 +1,14 @@ +package org.rooftop.netx.api + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type + + +abstract class TypeReference() { + val type: Type + + init { + val superClass: Type = this.javaClass.genericSuperclass + type = (superClass as ParameterizedType).actualTypeArguments[0] + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt index f2739a2..26b70cd 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt @@ -1,6 +1,7 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.Codec +import org.rooftop.netx.api.Context import org.rooftop.netx.api.TransactionEvent import org.rooftop.netx.api.TransactionManager import reactor.core.publisher.Mono @@ -42,30 +43,39 @@ internal abstract class AbstractOrchestrateListener internal c castableType = type } - internal fun Mono.setNextCastableType(): Mono { - return this.doOnNext { - nextOrchestrateListener?.castableType = it::class - nextRollbackOrchestrateListener?.castableType = it::class + internal fun Mono>.setNextCastableType(): Mono> { + return this.doOnNext { (request, _) -> + nextOrchestrateListener?.castableType = request::class + nextRollbackOrchestrateListener?.castableType = request::class } } - protected fun Mono<*>.getHeldRequest(transactionEvent: TransactionEvent): Mono { - return this.flatMap { + protected fun Mono.getHeldRequest(transactionEvent: TransactionEvent): Mono> { + return this.flatMap { event -> requestHolder.getRequest( "${transactionEvent.transactionId}:$orchestrateSequence", getCastableType() - ) + ).map { it to event } } } - protected fun Mono.holdRequestIfRollbackable(transactionEvent: TransactionEvent): Mono { - return this.flatMap { request -> - if (!isRollbackable) { - Mono.just(request) - } - requestHolder.setRequest( - "${transactionEvent.transactionId}:$orchestrateSequence", - request + protected fun holdRequestIfRollbackable(request: T, transactionId: String): Mono { + if (!isRollbackable) { + Mono.just(request) + } + return requestHolder.setRequest( + "$transactionId:$orchestrateSequence", + request + ) + } + + protected fun Mono>.toOrchestrateEvent(): Mono { + return this.map { (response, context) -> + OrchestrateEvent( + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + clientEvent = codec.encode(response), + context = codec.encode(context.contexts), ) } } @@ -105,7 +115,12 @@ internal abstract class AbstractOrchestrateListener internal c orchestrateEvent: OrchestrateEvent, ) { val rollbackOrchestrateEvent = - OrchestrateEvent(orchestrateEvent.orchestratorId, rollbackSequence, "") + OrchestrateEvent( + orchestrateEvent.orchestratorId, + rollbackSequence, + "", + orchestrateEvent.context, + ) transactionManager.rollback( transactionId = transactionId, cause = throwable.message ?: throwable.localizedMessage, diff --git a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt index 6f1e118..6ea5df3 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt @@ -11,17 +11,55 @@ class DefaultOrchestrateChain private constru private val orchestrateListener: AbstractOrchestrateListener, private val rollbackOrchestrateListener: AbstractOrchestrateListener?, private val beforeDefaultOrchestrateChain: DefaultOrchestrateChain? = null, -): OrchestrateChain { +) : OrchestrateChain { private var nextDefaultOrchestrateChain: DefaultOrchestrateChain? = null override fun join( - function: OrchestrateFunction, - rollback: RollbackFunction?, + orchestrate: Orchestrate, + rollback: Rollback?, ): DefaultOrchestrateChain { - val nextJoinOrchestrateListener = getJoinOrchestrateListener(function) - val nextRollbackOrchestrateListener = getRollbackOrchestrateListener(rollback) + val nextJoinOrchestrateListener = + getJoinOrchestrateListener(CommandType.DEFAULT, orchestrate) + val nextRollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.DEFAULT, rollback) + return nextOrchestrateChain(nextJoinOrchestrateListener, nextRollbackOrchestrateListener) + } + + override fun joinWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? + ): DefaultOrchestrateChain { + val nextJoinOrchestrateListener = + getJoinOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val nextRollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) + + return nextOrchestrateChain(nextJoinOrchestrateListener, nextRollbackOrchestrateListener) + } + + private fun getJoinOrchestrateListener( + commandType: CommandType, + function: Any, + ) = JoinOrchestrateListener( + codec = chainContainer.codec, + transactionManager = chainContainer.transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + orchestrateCommand = OrchestrateCommand( + commandType, + chainContainer.codec, + function + ), + requestHolder = chainContainer.requestHolder, + resultHolder = chainContainer.resultHolder, + ) + + private fun nextOrchestrateChain( + nextJoinOrchestrateListener: JoinOrchestrateListener, + nextRollbackOrchestrateListener: RollbackOrchestrateListener? + ): DefaultOrchestrateChain { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( orchestratorId, orchestrateSequence + 1, @@ -35,24 +73,61 @@ class DefaultOrchestrateChain private constru return nextDefaultOrchestrateChain } - private fun getJoinOrchestrateListener(function: OrchestrateFunction) = - JoinOrchestrateListener( - codec = chainContainer.codec, - transactionManager = chainContainer.transactionManager, - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - orchestrateFunction = function, - requestHolder = chainContainer.requestHolder, - resultHolder = chainContainer.resultHolder, - ) - override fun joinReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>?, + orchestrate: Orchestrate>, + rollback: Rollback>?, + ): DefaultOrchestrateChain { + val nextJoinOrchestrateListener = + getMonoJoinOrchestrateListener(CommandType.DEFAULT, orchestrate) + val nextRollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.DEFAULT, rollback) + + return nextOrchestrateChain(nextJoinOrchestrateListener, nextRollbackOrchestrateListener) + } + + override fun joinReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? ): DefaultOrchestrateChain { - val nextJoinOrchestrateListener = getMonoJoinOrchestrateListener(function) - val nextRollbackOrchestrateListener = getMonoRollbackOrchestrateListener(rollback) + val nextJoinOrchestrateListener = + getMonoJoinOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val nextRollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) + + val nextDefaultOrchestrateChain = DefaultOrchestrateChain( + orchestratorId, + orchestrateSequence + 1, + chainContainer, + nextJoinOrchestrateListener, + nextRollbackOrchestrateListener, + this, + ) + this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain + return nextDefaultOrchestrateChain + } + + private fun getMonoJoinOrchestrateListener( + commandType: CommandType, + function: Any, + ) = MonoJoinOrchestrateListener( + codec = chainContainer.codec, + transactionManager = chainContainer.transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + monoOrchestrateCommand = MonoOrchestrateCommand( + commandType, + chainContainer.codec, + function + ), + requestHolder = chainContainer.requestHolder, + resultHolder = chainContainer.resultHolder, + ) + + private fun nextOrchestrateChain( + nextJoinOrchestrateListener: MonoJoinOrchestrateListener, + nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener? + ): DefaultOrchestrateChain { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( orchestratorId, orchestrateSequence + 1, @@ -66,24 +141,62 @@ class DefaultOrchestrateChain private constru return nextDefaultOrchestrateChain } - private fun getMonoJoinOrchestrateListener(function: OrchestrateFunction>) = - MonoJoinOrchestrateListener( + override fun commit( + orchestrate: Orchestrate, + rollback: Rollback?, + ): Orchestrator { + val nextCommitOrchestrateListener = + getCommitOrchestrateListener(CommandType.DEFAULT, orchestrate) + val nextRollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.DEFAULT, rollback) + + return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener) + } + + override fun commitWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? + ): Orchestrator { + val nextCommitOrchestrateListener = + getCommitOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val nextRollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) + + return createOrchestrator(nextCommitOrchestrateListener, nextRollbackOrchestrateListener) + } + + private fun getCommitOrchestrateListener( + commandType: CommandType, + function: Any, + ) = CommitOrchestrateListener( + codec = chainContainer.codec, + transactionManager = chainContainer.transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + orchestrateCommand = OrchestrateCommand(commandType, chainContainer.codec, function), + resultHolder = chainContainer.resultHolder, + requestHolder = chainContainer.requestHolder, + ) + + private fun getRollbackOrchestrateListener( + commandType: CommandType, + rollback: Any? + ) = rollback?.let { + RollbackOrchestrateListener( codec = chainContainer.codec, transactionManager = chainContainer.transactionManager, orchestratorId = orchestratorId, orchestrateSequence = orchestrateSequence + 1, - orchestrateFunction = function, + rollbackCommand = RollbackCommand(commandType, chainContainer.codec, it), requestHolder = chainContainer.requestHolder, resultHolder = chainContainer.resultHolder, ) + } - override fun commit( - function: OrchestrateFunction, - rollback: RollbackFunction?, + private fun createOrchestrator( + nextCommitOrchestrateListener: CommitOrchestrateListener, + nextRollbackOrchestrateListener: RollbackOrchestrateListener? ): Orchestrator { - val nextCommitOrchestrateListener = getCommitOrchestrateListener(function) - val nextRollbackOrchestrateListener = getRollbackOrchestrateListener(rollback) - return OrchestratorCache.cache(orchestratorId) { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( orchestratorId, @@ -107,37 +220,34 @@ class DefaultOrchestrateChain private constru } } - private fun getCommitOrchestrateListener(function: OrchestrateFunction) = - CommitOrchestrateListener( - codec = chainContainer.codec, - transactionManager = chainContainer.transactionManager, - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - orchestrateFunction = function, - resultHolder = chainContainer.resultHolder, - requestHolder = chainContainer.requestHolder, - ) + override fun commitReactive( + orchestrate: Orchestrate>, + rollback: Rollback>?, + ): Orchestrator { + val nextJoinOrchestrateListener = + getMonoCommitOrchestrateListener(CommandType.DEFAULT, orchestrate) + val nextRollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.DEFAULT, rollback) - private fun getRollbackOrchestrateListener(rollback: RollbackFunction?) = - rollback?.let { - RollbackOrchestrateListener( - codec = chainContainer.codec, - transactionManager = chainContainer.transactionManager, - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - rollbackFunction = it, - requestHolder = chainContainer.requestHolder, - resultHolder = chainContainer.resultHolder, - ) - } + return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener) + } - override fun commitReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>?, + override fun commitReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? ): Orchestrator { - val nextJoinOrchestrateListener = getMonoCommitOrchestrateListener(function) - val nextRollbackOrchestrateListener = getMonoRollbackOrchestrateListener(rollback) + val nextJoinOrchestrateListener = + getMonoCommitOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val nextRollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) + + return createOrchestrator(nextJoinOrchestrateListener, nextRollbackOrchestrateListener) + } + private fun createOrchestrator( + nextJoinOrchestrateListener: MonoCommitOrchestrateListener, + nextRollbackOrchestrateListener: MonoRollbackOrchestrateListener? + ): Orchestrator { return OrchestratorCache.cache(orchestratorId) { val nextDefaultOrchestrateChain = DefaultOrchestrateChain( orchestratorId, @@ -178,7 +288,8 @@ class DefaultOrchestrateChain private constru val orchestrateListeners = mutableListOf< Pair, AbstractOrchestrateListener?>>() - var defaultOrchestrateChainCursor: DefaultOrchestrateChain? = this + var defaultOrchestrateChainCursor: DefaultOrchestrateChain? = + this while (defaultOrchestrateChainCursor != null) { orchestrateListeners.add( defaultOrchestrateChainCursor.orchestrateListener @@ -187,7 +298,8 @@ class DefaultOrchestrateChain private constru if (defaultOrchestrateChainCursor.beforeDefaultOrchestrateChain == null) { break } - defaultOrchestrateChainCursor = defaultOrchestrateChainCursor.beforeDefaultOrchestrateChain + defaultOrchestrateChainCursor = + defaultOrchestrateChainCursor.beforeDefaultOrchestrateChain } orchestrateListeners.reverse() @@ -246,29 +358,41 @@ class DefaultOrchestrateChain private constru } } - private fun getMonoCommitOrchestrateListener(function: OrchestrateFunction>) = - MonoCommitOrchestrateListener( + private fun getMonoCommitOrchestrateListener( + commandType: CommandType, + function: Any, + ) = MonoCommitOrchestrateListener( + codec = chainContainer.codec, + transactionManager = chainContainer.transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = orchestrateSequence + 1, + monoOrchestrateCommand = MonoOrchestrateCommand( + commandType, + chainContainer.codec, + function + ), + resultHolder = chainContainer.resultHolder, + requestHolder = chainContainer.requestHolder, + ) + + private fun getMonoRollbackOrchestrateListener( + commandType: CommandType, + rollback: Any? + ) = rollback?.let { + MonoRollbackOrchestrateListener( codec = chainContainer.codec, transactionManager = chainContainer.transactionManager, orchestratorId = orchestratorId, orchestrateSequence = orchestrateSequence + 1, - orchestrateFunction = function, - resultHolder = chainContainer.resultHolder, + monoRollbackCommand = MonoRollbackCommand( + commandType, + chainContainer.codec, + it + ), requestHolder = chainContainer.requestHolder, + resultHolder = chainContainer.resultHolder, ) - - private fun getMonoRollbackOrchestrateListener(rollback: RollbackFunction>?) = - rollback?.let { - MonoRollbackOrchestrateListener( - codec = chainContainer.codec, - transactionManager = chainContainer.transactionManager, - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - rollbackFunction = it, - requestHolder = chainContainer.requestHolder, - resultHolder = chainContainer.resultHolder, - ) - } + } internal class Pre internal constructor( private val orchestratorId: String, @@ -277,14 +401,34 @@ class DefaultOrchestrateChain private constru private val codec: Codec, private val resultHolder: ResultHolder, private val requestHolder: RequestHolder, - ): OrchestrateChain.Pre { + ) : OrchestrateChain.Pre { override fun start( - function: OrchestrateFunction, - rollback: RollbackFunction?, + orchestrate: Orchestrate, + rollback: Rollback?, + ): DefaultOrchestrateChain { + val startOrchestrateListener = + getStartOrchestrateListener(CommandType.DEFAULT, orchestrate) + val rollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.DEFAULT, rollback) + + return DefaultOrchestrateChain( + orchestratorId = orchestratorId, + orchestrateSequence = 0, + chainContainer = getStreamContainer(), + orchestrateListener = startOrchestrateListener, + rollbackOrchestrateListener = rollbackOrchestrateListener, + ) + } + + override fun startWithContext( + contextOrchestrate: ContextOrchestrate, + contextRollback: ContextRollback? ): DefaultOrchestrateChain { - val startOrchestrateListener = getStartOrchestrateListener(function) - val rollbackOrchestrateListener = getRollbackOrchestrateListener(rollback) + val startOrchestrateListener = + getStartOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val rollbackOrchestrateListener = + getRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) return DefaultOrchestrateChain( orchestratorId = orchestratorId, @@ -295,36 +439,50 @@ class DefaultOrchestrateChain private constru ) } - private fun getStartOrchestrateListener(function: OrchestrateFunction) = - StartOrchestrateListener( + private fun getStartOrchestrateListener( + commandType: CommandType, + function: Any, + ) = StartOrchestrateListener( + codec = codec, + transactionManager = transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = 0, + orchestrateCommand = OrchestrateCommand( + commandType, + codec, + function, + ), + requestHolder = requestHolder, + resultHolder = resultHolder, + ) + + private fun getRollbackOrchestrateListener( + commandType: CommandType, + rollback: Any? + ) = rollback?.let { + RollbackOrchestrateListener( codec = codec, transactionManager = transactionManager, orchestratorId = orchestratorId, orchestrateSequence = 0, - orchestrateFunction = function, + rollbackCommand = RollbackCommand( + commandType, + codec, + it + ), requestHolder = requestHolder, resultHolder = resultHolder, ) - - private fun getRollbackOrchestrateListener(rollback: RollbackFunction?) = - rollback?.let { - RollbackOrchestrateListener( - codec = codec, - transactionManager = transactionManager, - orchestratorId = orchestratorId, - orchestrateSequence = 0, - rollbackFunction = it, - requestHolder = requestHolder, - resultHolder = resultHolder, - ) - } + } override fun startReactive( - function: OrchestrateFunction>, - rollback: RollbackFunction>?, + orchestrate: Orchestrate>, + rollback: Rollback>?, ): DefaultOrchestrateChain { - val startOrchestrateListener = getMonoStartOrchestrateListener(function) - val rollbackOrchestrateListener = getMonoRollbackOrchestrateListener(rollback) + val startOrchestrateListener = + getMonoStartOrchestrateListener(CommandType.DEFAULT, orchestrate) + val rollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.DEFAULT, rollback) return DefaultOrchestrateChain( orchestratorId = orchestratorId, @@ -335,25 +493,56 @@ class DefaultOrchestrateChain private constru ) } - private fun getMonoStartOrchestrateListener(function: OrchestrateFunction>) = - MonoStartOrchestrateListener( - codec = codec, - transactionManager = transactionManager, + override fun startReactiveWithContext( + contextOrchestrate: ContextOrchestrate>, + contextRollback: ContextRollback>? + ): DefaultOrchestrateChain { + val startOrchestrateListener = + getMonoStartOrchestrateListener(CommandType.CONTEXT, contextOrchestrate) + val rollbackOrchestrateListener = + getMonoRollbackOrchestrateListener(CommandType.CONTEXT, contextRollback) + + return DefaultOrchestrateChain( orchestratorId = orchestratorId, orchestrateSequence = 0, - orchestrateFunction = function, - requestHolder = requestHolder, - resultHolder = resultHolder, + chainContainer = getStreamContainer(), + orchestrateListener = startOrchestrateListener, + rollbackOrchestrateListener = rollbackOrchestrateListener, ) + } + + private fun getMonoStartOrchestrateListener( + commandType: CommandType, + function: Any, + ) = MonoStartOrchestrateListener( + codec = codec, + transactionManager = transactionManager, + orchestratorId = orchestratorId, + orchestrateSequence = 0, + monoOrchestrateCommand = MonoOrchestrateCommand( + commandType, + codec, + function, + ), + requestHolder = requestHolder, + resultHolder = resultHolder, + ) - private fun getMonoRollbackOrchestrateListener(rollback: RollbackFunction>?) = + private fun getMonoRollbackOrchestrateListener( + commandType: CommandType, + rollback: Any? + ) = rollback?.let { MonoRollbackOrchestrateListener( codec = codec, transactionManager = transactionManager, orchestratorId = orchestratorId, orchestrateSequence = 0, - rollbackFunction = it, + monoRollbackCommand = MonoRollbackCommand( + commandType, + codec, + it, + ), requestHolder = requestHolder, resultHolder = resultHolder, ) diff --git a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt index 603b3ac..41c28bb 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.rooftop.netx.api.Codec import org.rooftop.netx.api.DecodeException import org.rooftop.netx.api.EncodeException +import org.rooftop.netx.api.TypeReference import kotlin.reflect.KClass class JsonCodec( @@ -24,4 +25,13 @@ class JsonCodec( throw DecodeException("Cannot decode \"$data\" to \"${type}\"", it) } } + + override fun decode(data: String, type: TypeReference): T { + return runCatching { + val javaType = objectMapper.typeFactory.constructType(type.type) + objectMapper.readValue(data, javaType) + }.getOrElse { + throw DecodeException("Cannot decode \"$data\" to \"${type.type}\"", it) + } + } } diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateEvent.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateEvent.kt index 7c5833d..b86e492 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateEvent.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateEvent.kt @@ -4,4 +4,5 @@ internal data class OrchestrateEvent( val orchestratorId: String, val orchestrateSequence: Int = 0, val clientEvent: String, + val context: String, ) diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt index 37b4f80..e018e43 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt @@ -37,6 +37,7 @@ class OrchestratorManager internal constructor( OrchestrateEvent( orchestratorId = orchestratorId, clientEvent = codec.encode(request), + context = codec.encode(mutableMapOf()) ) } .flatMap { transactionManager.start(UNDO, it) } diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommandType.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommandType.kt new file mode 100644 index 0000000..817af10 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommandType.kt @@ -0,0 +1,6 @@ +package org.rooftop.netx.engine.listen + +enum class CommandType { + DEFAULT, + CONTEXT, +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt index ceea3da..e297662 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt @@ -1,6 +1,9 @@ package org.rooftop.netx.engine.listen -import org.rooftop.netx.api.* +import org.rooftop.netx.api.Codec +import org.rooftop.netx.api.TransactionCommitEvent +import org.rooftop.netx.api.TransactionCommitListener +import org.rooftop.netx.api.TransactionManager import org.rooftop.netx.engine.AbstractOrchestrateListener import org.rooftop.netx.engine.OrchestrateEvent import org.rooftop.netx.engine.RequestHolder @@ -12,7 +15,7 @@ internal class CommitOrchestrateListener internal constructor( transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction, + private val orchestrateCommand: OrchestrateCommand, private val resultHolder: ResultHolder, requestHolder: RequestHolder, ) : AbstractOrchestrateListener( @@ -30,13 +33,18 @@ internal class CommitOrchestrateListener internal constructor( .map { it.decodeEvent(OrchestrateEvent::class) } .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionCommitEvent) - .map { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionCommitEvent.transactionId) + .map{ it to event } + } + .map { (request, event) -> + orchestrateCommand.command(request, event.context) + } + .flatMap { (response, _) -> + resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response) } - .flatMap { resultHolder.setSuccessResult(transactionCommitEvent.transactionId, it) } .onErrorRollback( transactionCommitEvent.transactionId, transactionCommitEvent.decodeEvent(OrchestrateEvent::class) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt index c56916d..1e25660 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt @@ -12,7 +12,7 @@ internal class JoinOrchestrateListener( private val transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction, + private val orchestrateCommand: OrchestrateCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -32,24 +32,21 @@ internal class JoinOrchestrateListener( && it.orchestratorId == orchestratorId } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionJoinEvent) - .map { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionJoinEvent.transactionId) + .map{ it to event } + } + .map { (request, event) -> + orchestrateCommand.command(request, event.context) } .setNextCastableType() .onErrorRollback( transactionJoinEvent.transactionId, transactionJoinEvent.decodeEvent(OrchestrateEvent::class) ) - .map { response -> - OrchestrateEvent( - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - clientEvent = codec.encode(response), - ) - } + .toOrchestrateEvent() .flatMap { if (isLast) { return@flatMap transactionManager.commit( diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt index be0522a..6884162 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt @@ -1,6 +1,9 @@ package org.rooftop.netx.engine.listen -import org.rooftop.netx.api.* +import org.rooftop.netx.api.Codec +import org.rooftop.netx.api.TransactionCommitEvent +import org.rooftop.netx.api.TransactionCommitListener +import org.rooftop.netx.api.TransactionManager import org.rooftop.netx.engine.AbstractOrchestrateListener import org.rooftop.netx.engine.OrchestrateEvent import org.rooftop.netx.engine.RequestHolder @@ -12,7 +15,7 @@ internal class MonoCommitOrchestrateListener internal construc transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction>, + private val monoOrchestrateCommand: MonoOrchestrateCommand, requestHolder: RequestHolder, private val resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -29,13 +32,18 @@ internal class MonoCommitOrchestrateListener internal construc .map { it.decodeEvent(OrchestrateEvent::class) } .filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionCommitEvent) - .flatMap { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionCommitEvent.transactionId) + .map{ it to event } + } + .flatMap { (request, event) -> + monoOrchestrateCommand.command(request, event.context) + } + .flatMap { (response, _) -> + resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response) } - .flatMap { resultHolder.setSuccessResult(transactionCommitEvent.transactionId, it) } .onErrorRollback( transactionCommitEvent.transactionId, transactionCommitEvent.decodeEvent(OrchestrateEvent::class) diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt index 11671ed..140e193 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt @@ -12,7 +12,7 @@ internal class MonoJoinOrchestrateListener( private val transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction>, + private val monoOrchestrateCommand: MonoOrchestrateCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -32,24 +32,21 @@ internal class MonoJoinOrchestrateListener( && it.orchestratorId == orchestratorId } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionJoinEvent) - .flatMap { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionJoinEvent.transactionId) + .map{ it to event } + } + .flatMap { (request, event) -> + monoOrchestrateCommand.command(request, event.context) } .setNextCastableType() .onErrorRollback( transactionJoinEvent.transactionId, transactionJoinEvent.decodeEvent(OrchestrateEvent::class) ) - .map { response -> - OrchestrateEvent( - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - clientEvent = codec.encode(response), - ) - } + .toOrchestrateEvent() .flatMap { if (isLast) { return@flatMap transactionManager.commit( diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoOrchestrateCommand.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoOrchestrateCommand.kt new file mode 100644 index 0000000..ef457af --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoOrchestrateCommand.kt @@ -0,0 +1,32 @@ +package org.rooftop.netx.engine.listen + +import org.rooftop.netx.api.* +import reactor.core.publisher.Mono + +class MonoOrchestrateCommand( + private val commandType: CommandType = CommandType.DEFAULT, + private val codec: Codec, + private val command: Any +) { + + @Suppress("UNCHECKED_CAST") + fun command( + request: T, + contextData: String, + ): Mono> { + val context = Context( + codec, + codec.decode(contextData, object : TypeReference>() {}) + ) + return when (commandType) { + CommandType.DEFAULT -> (command as Orchestrate>).orchestrate(request) + + CommandType.CONTEXT -> (command as ContextOrchestrate>).orchestrate( + context, + request, + ) + }.map { + it to context + } + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackCommand.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackCommand.kt new file mode 100644 index 0000000..c37985f --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackCommand.kt @@ -0,0 +1,31 @@ +package org.rooftop.netx.engine.listen + +import org.rooftop.netx.api.* +import reactor.core.publisher.Mono + +class MonoRollbackCommand( + private val commandType: CommandType = CommandType.DEFAULT, + private val codec: Codec, + private val command: Any +) { + + @Suppress("UNCHECKED_CAST") + fun command( + request: T, + contextData: String, + ): Mono> { + val context = Context( + codec, + codec.decode(contextData, object : TypeReference>() {}) + ) + return when (commandType) { + CommandType.DEFAULT -> (command as Rollback>).rollback(request) + + CommandType.CONTEXT -> (command as ContextRollback>).rollback( + context, + request, + ) + }.map { it to context } + .switchIfEmpty(Mono.just("ROLLBACK SUCCESS" to context)) + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt index 80875b5..768761c 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt @@ -6,14 +6,13 @@ import org.rooftop.netx.engine.OrchestrateEvent import org.rooftop.netx.engine.RequestHolder import org.rooftop.netx.engine.ResultHolder import reactor.core.publisher.Mono -import reactor.core.scheduler.Schedulers internal class MonoRollbackOrchestrateListener( - codec: Codec, + private val codec: Codec, private val orchestratorId: String, orchestrateSequence: Int, private val transactionManager: TransactionManager, - private val rollbackFunction: RollbackFunction>, + private val monoRollbackCommand: MonoRollbackCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -31,31 +30,27 @@ internal class MonoRollbackOrchestrateListener( .map { it.decodeEvent(OrchestrateEvent::class) } .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } .getHeldRequest(transactionRollbackEvent) - .flatMap { request -> - rollbackFunction.rollback(request) + .flatMap { (request, event) -> + monoRollbackCommand.command(request, event.context) } - .switchIfEmpty(Mono.just("SUCCESS ROLLBACK")) - .map { } .cascadeRollback(transactionRollbackEvent) } - private fun Mono.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono { - return this.doOnSuccess { - val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class) - if (!isFirst && orchestrateEvent.orchestratorId == orchestratorId - && orchestrateEvent.orchestrateSequence == orchestrateSequence - ) { + private fun Mono>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono { + return this.filter { !isFirst } + .flatMap { (_, context) -> + val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class) val nextOrchestrateEvent = OrchestrateEvent( orchestrateEvent.orchestratorId, beforeRollbackOrchestrateSequence, - orchestrateEvent.clientEvent + orchestrateEvent.clientEvent, + codec.encode(context.contexts), ) transactionManager.rollback( transactionRollbackEvent.transactionId, transactionRollbackEvent.cause, nextOrchestrateEvent - ).subscribeOn(Schedulers.parallel()).subscribe() - } - } + ) + }.map { } } } diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt index fc55746..a550774 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt @@ -12,7 +12,7 @@ internal class MonoStartOrchestrateListener( private val transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction>, + private val monoOrchestrateCommand: MonoOrchestrateCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -29,24 +29,21 @@ internal class MonoStartOrchestrateListener( return transactionStartEvent.toOrchestrateEvent() .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionStartEvent) - .flatMap { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionStartEvent.transactionId) + .map{ it to event } + } + .flatMap { (request, event) -> + monoOrchestrateCommand.command(request, event.context) } .setNextCastableType() .onErrorRollback( transactionStartEvent.transactionId, transactionStartEvent.decodeEvent(OrchestrateEvent::class) ) - .map { response -> - OrchestrateEvent( - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - clientEvent = codec.encode(response), - ) - } + .toOrchestrateEvent() .flatMap { if (isLast) { return@flatMap transactionManager.commit( diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/OrchestrateCommand.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/OrchestrateCommand.kt new file mode 100644 index 0000000..f9f4909 --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/OrchestrateCommand.kt @@ -0,0 +1,29 @@ +package org.rooftop.netx.engine.listen + +import org.rooftop.netx.api.* + +internal class OrchestrateCommand( + private val commandType: CommandType = CommandType.DEFAULT, + private val codec: Codec, + private val command: Any +) { + + @Suppress("UNCHECKED_CAST") + fun command( + request: T, + contextData: String, + ): Pair { + val context = Context( + codec, + codec.decode(contextData, object : TypeReference>() {}) + ) + return when (commandType) { + CommandType.DEFAULT -> (command as Orchestrate).orchestrate(request) + + CommandType.CONTEXT -> (command as ContextOrchestrate).orchestrate( + context, + request, + ) + } to context + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackCommand.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackCommand.kt new file mode 100644 index 0000000..a919a7b --- /dev/null +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackCommand.kt @@ -0,0 +1,29 @@ +package org.rooftop.netx.engine.listen + +import org.rooftop.netx.api.* + +class RollbackCommand( + private val commandType: CommandType = CommandType.DEFAULT, + private val codec: Codec, + private val command: Any +) { + + @Suppress("UNCHECKED_CAST") + fun command( + request: T, + contextData: String, + ): Pair { + val context = Context( + codec, + codec.decode(contextData, object : TypeReference>() {}) + ) + return when (commandType) { + CommandType.DEFAULT -> (command as Rollback).rollback(request) + + CommandType.CONTEXT -> (command as ContextRollback).rollback( + context, + request, + ) + } to context + } +} diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt index 4f178bb..d9b11fa 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt @@ -6,14 +6,13 @@ import org.rooftop.netx.engine.OrchestrateEvent import org.rooftop.netx.engine.RequestHolder import org.rooftop.netx.engine.ResultHolder import reactor.core.publisher.Mono -import reactor.core.scheduler.Schedulers internal class RollbackOrchestrateListener( - codec: Codec, + private val codec: Codec, private val orchestratorId: String, orchestrateSequence: Int, private val transactionManager: TransactionManager, - private val rollbackFunction: RollbackFunction, + private val rollbackCommand: RollbackCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -30,31 +29,33 @@ internal class RollbackOrchestrateListener( return transactionRollbackEvent.toOrchestrateEvent() .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } .getHeldRequest(transactionRollbackEvent) - .map { request -> - rollbackFunction.rollback(request) + .map { (request, event) -> + rollbackCommand.command(request, event.context) + } + .map { + if (it.first == null) { + return@map "ROLLBACK SUCCESS" to it.second + } + it } - .switchIfEmpty(Mono.just("SUCCESS ROLLBACK")) - .map { } .cascadeRollback(transactionRollbackEvent) } - private fun Mono.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono { - return this.doOnSuccess { - val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class) - if (!isFirst && orchestrateEvent.orchestratorId == orchestratorId - && orchestrateEvent.orchestrateSequence == orchestrateSequence - ) { + private fun Mono>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono { + return this.filter { !isFirst } + .flatMap { (_, context) -> + val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class) val nextOrchestrateEvent = OrchestrateEvent( orchestrateEvent.orchestratorId, beforeRollbackOrchestrateSequence, - orchestrateEvent.clientEvent + orchestrateEvent.clientEvent, + codec.encode(context.contexts), ) transactionManager.rollback( transactionRollbackEvent.transactionId, transactionRollbackEvent.cause, nextOrchestrateEvent - ).subscribeOn(Schedulers.parallel()).subscribe() - } - } + ) + }.map { } } } diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt index 6d91a63..8a16e1b 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt @@ -12,7 +12,7 @@ internal class StartOrchestrateListener( private val transactionManager: TransactionManager, private val orchestratorId: String, orchestrateSequence: Int, - private val orchestrateFunction: OrchestrateFunction, + private val orchestrateCommand: OrchestrateCommand, requestHolder: RequestHolder, resultHolder: ResultHolder, ) : AbstractOrchestrateListener( @@ -29,24 +29,21 @@ internal class StartOrchestrateListener( return transactionStartEvent.toOrchestrateEvent() .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence } .map { event -> - codec.decode(event.clientEvent, getCastableType()) + codec.decode(event.clientEvent, getCastableType()) to event } - .holdRequestIfRollbackable(transactionStartEvent) - .map { request -> - orchestrateFunction.orchestrate(request) + .flatMap { (request, event) -> + holdRequestIfRollbackable(request, transactionStartEvent.transactionId) + .map { it to event } + } + .map { (request, event) -> + orchestrateCommand.command(request, event.context) } .setNextCastableType() .onErrorRollback( transactionStartEvent.transactionId, transactionStartEvent.decodeEvent(OrchestrateEvent::class) ) - .map { response -> - OrchestrateEvent( - orchestratorId = orchestratorId, - orchestrateSequence = orchestrateSequence + 1, - clientEvent = codec.encode(response), - ) - } + .toOrchestrateEvent() .flatMap { if (isLast) { return@flatMap transactionManager.commit( diff --git a/src/test/kotlin/org/rooftop/netx/client/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/client/OrchestratorConfigurer.kt index 0d47be7..53cac1b 100644 --- a/src/test/kotlin/org/rooftop/netx/client/OrchestratorConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/client/OrchestratorConfigurer.kt @@ -1,6 +1,6 @@ package org.rooftop.netx.client -import org.rooftop.netx.api.OrchestrateFunction +import org.rooftop.netx.api.Orchestrate import org.rooftop.netx.api.Orchestrator import org.rooftop.netx.engine.OrchestratorFactory import org.springframework.context.annotation.Bean @@ -18,12 +18,12 @@ class OrchestratorConfigurer( .commit(IntOrchestrator) } - object IntOrchestrator : OrchestrateFunction { + object IntOrchestrator : Orchestrate { override fun orchestrate(request: Int): Int = request + 1 } - object MonoIntOrchestrator : OrchestrateFunction> { + object MonoIntOrchestrator : Orchestrate> { override fun orchestrate(request: Int): Mono = Mono.fromCallable { request + 1 } } diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt index 8afff3e..2652354 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt @@ -1,6 +1,8 @@ package org.rooftop.netx.engine import org.rooftop.netx.api.Orchestrator +import org.rooftop.netx.engine.OrchestratorTest.Companion.contextResult +import org.rooftop.netx.engine.OrchestratorTest.Companion.monoRollbackResult import org.rooftop.netx.engine.OrchestratorTest.Companion.rollbackOrchestratorResult import org.rooftop.netx.engine.OrchestratorTest.Companion.upChainResult import org.springframework.context.annotation.Bean @@ -16,13 +18,13 @@ class OrchestratorConfigurer( @Bean(name = ["numberOrchestrator"]) fun numberOrchestrator(): Orchestrator { return orchestratorFactory.create("numberOrchestrator") - .start(function = { it + 1 }) - .join(function = { it + 1 }) - .joinReactive(function = { Mono.just(it + 1) }) - .commit(function = { it + 1 }) + .start(orchestrate = { it + 1 }) + .join(orchestrate = { it + 1 }) + .joinReactive(orchestrate = { Mono.just(it + 1) }) + .commit(orchestrate = { it + 1 }) } - @Bean(name = ["homeOrchestartor"]) + @Bean(name = ["homeOrchestrator"]) fun homeOrchestrator(): Orchestrator { return orchestratorFactory.create("homeOrchestrator") .startReactive({ home -> @@ -64,7 +66,7 @@ class OrchestratorConfigurer( fun rollbackOrchestrator(): Orchestrator { return orchestratorFactory.create("rollbackOrchestrator") .start( - function = { + orchestrate = { rollbackOrchestratorResult.add("1") }, rollback = { @@ -72,18 +74,18 @@ class OrchestratorConfigurer( } ) .join( - function = { + orchestrate = { rollbackOrchestratorResult.add("2") } ) .join( - function = { + orchestrate = { rollbackOrchestratorResult.add("3") }, rollback = { rollbackOrchestratorResult.add("-3") } ) .commit( - function = { + orchestrate = { rollbackOrchestratorResult.add("4") throw IllegalArgumentException("Rollback") }, @@ -104,4 +106,72 @@ class OrchestratorConfigurer( throw IllegalArgumentException("Rollback for test") }) } + + @Bean(name = ["monoRollbackOrchestrator"]) + fun monoRollbackOrchestrator(): Orchestrator { + return orchestratorFactory.create("monoRollbackOrchestrator") + .startReactive( + { Mono.fromCallable { monoRollbackResult.add("1") } }, + { Mono.fromCallable { monoRollbackResult.add("-1") } } + ) + .joinReactive({ Mono.fromCallable { monoRollbackResult.add("2") } }) + .joinReactive( + { Mono.fromCallable { monoRollbackResult.add("3") } }, + { Mono.fromCallable { monoRollbackResult.add("-3") } } + ) + .commitReactive({ + Mono.fromCallable { + monoRollbackResult.add("4") + throw IllegalArgumentException("Rollback for test") + } + }) + } + + @Bean(name = ["contextOrchestrator"]) + fun contextOrchestrator(): Orchestrator { + return orchestratorFactory.create("contextOrchestrator") + .startWithContext( + contextOrchestrate = { context, request -> + context.set("start-1", request) + "1" + }, + contextRollback = { context, request -> + val start1 = context.decodeContext("start-1", String::class) + val join2 = context.decodeContext("join-2", String::class) + val join3 = context.decodeContext("join-3", String::class) + val rCommit4 = context.decodeContext("r-commit-4", String::class) + val rJoin3 = context.decodeContext("r-join-3", String::class) + + contextResult.addAll(listOf(start1, join2, join3, rCommit4, rJoin3)) + } + ) + .joinWithContext( + contextOrchestrate = { context, request -> + context.set("join-2", request) + "2" + } + ) + .joinReactiveWithContext( + contextOrchestrate = { context, request -> + Mono.fromCallable { + context.set("join-3", request) + "3" + } + }, + contextRollback = { context, request -> + Mono.fromCallable { + context.set("r-join-3", "r$request") + } + } + ) + .commitWithContext( + contextOrchestrate = { context, request -> + context.set("commit-4", request) + throw IllegalArgumentException("Rollback") + }, + contextRollback = { context, request -> + context.set("r-commit-4", "r$request") + } + ) + } } diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorFactoryTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorFactoryTest.kt index cb60139..3655d2d 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorFactoryTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorFactoryTest.kt @@ -65,7 +65,7 @@ internal class OrchestratorFactoryTest( }) { companion object { - + private fun OrchestratorFactory.createIntOrchestrator(orchestratorId: String): Orchestrator { return this.create(orchestratorId) .start({ it + 1 }) diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt index 67ba4eb..09f8912 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt @@ -6,6 +6,7 @@ import io.kotest.core.annotation.DisplayName import io.kotest.core.spec.style.DescribeSpec import io.kotest.matchers.equality.shouldBeEqualToComparingFields import io.kotest.matchers.equals.shouldBeEqual +import org.rooftop.netx.api.Orchestrate import org.rooftop.netx.api.Orchestrator import org.rooftop.netx.meta.EnableDistributedTransaction import org.rooftop.netx.redis.RedisContainer @@ -31,6 +32,8 @@ class OrchestratorTest( private val manyTypeOrchestrator: Orchestrator, @Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator, @Qualifier("upChainRollbackOrchestrator") private val upChainRollbackOrchestrator: Orchestrator, + @Qualifier("monoRollbackOrchestrator") private val monoRollbackOrchestrator: Orchestrator, + @Qualifier("contextOrchestrator") private val contextOrchestrator: Orchestrator, ) : DescribeSpec({ describe("numberOrchestrator 구현채는") { @@ -121,6 +124,43 @@ class OrchestratorTest( } } } + + describe("monoRollbackOrchestrator 구현채는") { + context("transaction 메소드가 호출되면,") { + val expected = listOf("1", "2", "3", "4", "-3", "-1") + + it("실패한 부분부터 위로 거슬러 올라가며 롤백한다.") { + val result = monoRollbackOrchestrator.transactionSync("") + + result.isSuccess shouldBeEqual false + shouldThrowWithMessage("Rollback for test") { + result.throwError() + } + eventually(5.seconds) { + monoRollbackResult shouldBeEqual expected + } + } + } + } + + describe("contextOrchestrator 구현채는") { + context("transaction 메소드가 호출되면,") { + val expected = listOf("0", "1", "2", "r3", "r2") + + it("context 에서 아이템을 교환하며 Saga를 진행한다.") { + val result = contextOrchestrator.transactionSync("0") + + result.isSuccess shouldBeEqual false + shouldThrowWithMessage("Rollback") { + result.throwError() + } + eventually(5.seconds) { + contextResult shouldBeEqual expected + } + } + } + } + }) { data class Home( val address: String, @@ -140,5 +180,7 @@ class OrchestratorTest( companion object { val rollbackOrchestratorResult = mutableListOf() val upChainResult = mutableListOf() + val monoRollbackResult = mutableListOf() + val contextResult = mutableListOf() } }