Skip to content

Commit

Permalink
feat: Bind values to context in Orchestrator (#106)
Browse files Browse the repository at this point in the history
* refactor: Orchestrate와 Rollback 인터페이스에 Function을 제거한다

* feat: Context binding in orchestrator

* docs: Netx version 0.3.3 to 0.3.4
devxb authored Mar 23, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 0eccf9d commit 87ff5c1
Showing 31 changed files with 789 additions and 252 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

<br>

![version 0.3.3](https://img.shields.io/badge/version-0.3.3-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.3.4](https://img.shields.io/badge/version-0.3.4-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Redis-Stream을 지원하는 Saga frame work 입니다.
@@ -88,7 +88,7 @@ class OrchestratorConfigurer(
fun orderOrchestartor(): Orchestrator<Order, OrderResponse> { // <First Request, Last Response>
return orchestratorFactory.create("orderOrchestrator")
.start(
function = { order -> // its order type
orchestrate = { order -> // its order type
// Start Transaction with your bussiness logic
// something like ... "Check valid seller"
return@start user
@@ -98,13 +98,19 @@ class OrchestratorConfigurer(
}
)
.joinReactive(
function = { user -> // Before operations response type "User" flow here
orchestrate = { user -> // Before operations response type "User" flow here
// Webflux supports, should return Mono type.
},
// Can skip rollback operation, if you want
)
.joinWithContext(
contextOrchestrate = { context, request ->
context.set("key", request) // save data on context
context.decode("foo", Foo::class) // The context set in the upstream chain can be retrieved.
},
)
.commit(
function = { request ->
orchestrate = { request ->
// When an error occurs, all rollbacks are called from the bottom up,
// starting from the location where the error occurred.
throw IllegalArgumentException("Oops! Something went wrong..")
2 changes: 2 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/Codec.kt
Original file line number Diff line number Diff line change
@@ -7,4 +7,6 @@ interface Codec {
fun <T : Any> encode(data: T): String

fun <T : Any> decode(data: String, type: KClass<T>): T

fun <T : Any> decode(data: String, type: TypeReference<T>): T
}
19 changes: 19 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/Context.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

data class Context internal constructor(
private val codec: Codec,
internal val contexts: MutableMap<String, String>,
) {

fun <T : Any> set(key: String, value: T) {
contexts[key] = codec.encode(value)
}

fun <T : Any> decodeContext(key: String, type: Class<T>): T = decodeContext(key, type.kotlin)

fun <T : Any> decodeContext(key: String, type: KClass<T>): T = contexts[key]?.let {
codec.decode(it, type)
} ?: throw NullPointerException("Cannot find context by key \"$key\"")
}
6 changes: 6 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/ContextOrchestrate.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.rooftop.netx.api

fun interface ContextOrchestrate<T : Any, V : Any> {

fun orchestrate(context: Context, request: T): V
}
6 changes: 6 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/ContextRollback.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.rooftop.netx.api

fun interface ContextRollback<T : Any, V : Any?> {

fun rollback(context: Context, request: T): V
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.rooftop.netx.api

fun interface OrchestrateFunction<T : Any, V : Any> {
fun interface Orchestrate<T : Any, V : Any> {

fun orchestrate(request: T): V
}
53 changes: 41 additions & 12 deletions src/main/kotlin/org/rooftop/netx/api/OrchestrateChain.kt
Original file line number Diff line number Diff line change
@@ -6,35 +6,64 @@ import reactor.core.publisher.Mono
interface OrchestrateChain<OriginReq : Any, T : Any, V : Any> {

fun <S : Any> join(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> joinReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> joinWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> joinReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>? = null,
): DefaultOrchestrateChain<OriginReq, V, S>

fun <S : Any> commit(
function: OrchestrateFunction<V, S>,
rollback: RollbackFunction<V, *>? = null,
orchestrate: Orchestrate<V, S>,
rollback: Rollback<V, *>? = null,
): Orchestrator<OriginReq, S>

fun <S : Any> commitReactive(
function: OrchestrateFunction<V, Mono<S>>,
rollback: RollbackFunction<V, Mono<*>>? = null,
orchestrate: Orchestrate<V, Mono<S>>,
rollback: Rollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

fun <S : Any> commitWithContext(
contextOrchestrate: ContextOrchestrate<V, S>,
contextRollback: ContextRollback<V, *>? = null,
): Orchestrator<OriginReq, S>

fun <S : Any> commitReactiveWithContext(
contextOrchestrate: ContextOrchestrate<V, Mono<S>>,
contextRollback: ContextRollback<V, Mono<*>>? = null,
): Orchestrator<OriginReq, S>

interface Pre<T : Any> {
fun <V : Any> start(
function: OrchestrateFunction<T, V>,
rollback: RollbackFunction<T, *>? = null,
orchestrate: Orchestrate<T, V>,
rollback: Rollback<T, *>? = null,
): DefaultOrchestrateChain<T, T, V>

fun <V : Any> startReactive(
function: OrchestrateFunction<T, Mono<V>>,
rollback: RollbackFunction<T, Mono<*>>? = null,
orchestrate: Orchestrate<T, Mono<V>>,
rollback: Rollback<T, Mono<*>>? = null,
): DefaultOrchestrateChain<T, T, V>

fun <V : Any> startWithContext(
contextOrchestrate: ContextOrchestrate<T, V>,
contextRollback: ContextRollback<T, *>? = null,
): DefaultOrchestrateChain<T, T, V>

fun <V : Any> startReactiveWithContext(
contextOrchestrate: ContextOrchestrate<T, Mono<V>>,
contextRollback: ContextRollback<T, Mono<*>>? = null,
): DefaultOrchestrateChain<T, T, V>
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.rooftop.netx.api

fun interface RollbackFunction<T : Any, V : Any?> {
fun interface Rollback<T : Any, V : Any?> {

fun rollback(request: T): V
}
14 changes: 14 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/TypeReference.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.rooftop.netx.api

import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type


abstract class TypeReference<T : Any>() {
val type: Type

init {
val superClass: Type = this.javaClass.genericSuperclass
type = (superClass as ParameterizedType).actualTypeArguments[0]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.Context
import org.rooftop.netx.api.TransactionEvent
import org.rooftop.netx.api.TransactionManager
import reactor.core.publisher.Mono
@@ -42,30 +43,39 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
castableType = type
}

internal fun Mono<V>.setNextCastableType(): Mono<V> {
return this.doOnNext {
nextOrchestrateListener?.castableType = it::class
nextRollbackOrchestrateListener?.castableType = it::class
internal fun Mono<Pair<V, Context>>.setNextCastableType(): Mono<Pair<V, Context>> {
return this.doOnNext { (request, _) ->
nextOrchestrateListener?.castableType = request::class
nextRollbackOrchestrateListener?.castableType = request::class
}
}

protected fun Mono<*>.getHeldRequest(transactionEvent: TransactionEvent): Mono<T> {
return this.flatMap {
protected fun Mono<OrchestrateEvent>.getHeldRequest(transactionEvent: TransactionEvent): Mono<Pair<T, OrchestrateEvent>> {
return this.flatMap { event ->
requestHolder.getRequest(
"${transactionEvent.transactionId}:$orchestrateSequence",
getCastableType()
)
).map { it to event }
}
}

protected fun Mono<T>.holdRequestIfRollbackable(transactionEvent: TransactionEvent): Mono<T> {
return this.flatMap { request ->
if (!isRollbackable) {
Mono.just(request)
}
requestHolder.setRequest(
"${transactionEvent.transactionId}:$orchestrateSequence",
request
protected fun holdRequestIfRollbackable(request: T, transactionId: String): Mono<T> {
if (!isRollbackable) {
Mono.just(request)
}
return requestHolder.setRequest(
"$transactionId:$orchestrateSequence",
request
)
}

protected fun Mono<Pair<V, Context>>.toOrchestrateEvent(): Mono<OrchestrateEvent> {
return this.map { (response, context) ->
OrchestrateEvent(
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
clientEvent = codec.encode(response),
context = codec.encode(context.contexts),
)
}
}
@@ -105,7 +115,12 @@ internal abstract class AbstractOrchestrateListener<T : Any, V : Any> internal c
orchestrateEvent: OrchestrateEvent,
) {
val rollbackOrchestrateEvent =
OrchestrateEvent(orchestrateEvent.orchestratorId, rollbackSequence, "")
OrchestrateEvent(
orchestrateEvent.orchestratorId,
rollbackSequence,
"",
orchestrateEvent.context,
)
transactionManager.rollback(
transactionId = transactionId,
cause = throwable.message ?: throwable.localizedMessage,
407 changes: 298 additions & 109 deletions src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.DecodeException
import org.rooftop.netx.api.EncodeException
import org.rooftop.netx.api.TypeReference
import kotlin.reflect.KClass

class JsonCodec(
@@ -24,4 +25,13 @@ class JsonCodec(
throw DecodeException("Cannot decode \"$data\" to \"${type}\"", it)
}
}

override fun <T : Any> decode(data: String, type: TypeReference<T>): T {
return runCatching {
val javaType = objectMapper.typeFactory.constructType(type.type)
objectMapper.readValue<T>(data, javaType)
}.getOrElse {
throw DecodeException("Cannot decode \"$data\" to \"${type.type}\"", it)
}
}
}
Original file line number Diff line number Diff line change
@@ -4,4 +4,5 @@ internal data class OrchestrateEvent(
val orchestratorId: String,
val orchestrateSequence: Int = 0,
val clientEvent: String,
val context: String,
)
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ class OrchestratorManager<T : Any, V : Any> internal constructor(
OrchestrateEvent(
orchestratorId = orchestratorId,
clientEvent = codec.encode(request),
context = codec.encode(mutableMapOf<String, String>())
)
}
.flatMap { transactionManager.start(UNDO, it) }
6 changes: 6 additions & 0 deletions src/main/kotlin/org/rooftop/netx/engine/listen/CommandType.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.rooftop.netx.engine.listen

enum class CommandType {
DEFAULT,
CONTEXT,
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.TransactionCommitEvent
import org.rooftop.netx.api.TransactionCommitListener
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.AbstractOrchestrateListener
import org.rooftop.netx.engine.OrchestrateEvent
import org.rooftop.netx.engine.RequestHolder
@@ -12,7 +15,7 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, V>,
private val orchestrateCommand: OrchestrateCommand<T, V>,
private val resultHolder: ResultHolder,
requestHolder: RequestHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -30,13 +33,18 @@ internal class CommitOrchestrateListener<T : Any, V : Any> internal constructor(
.map { it.decodeEvent(OrchestrateEvent::class) }
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionCommitEvent)
.map { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionCommitEvent.transactionId)
.map{ it to event }
}
.map { (request, event) ->
orchestrateCommand.command(request, event.context)
}
.flatMap { (response, _) ->
resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response)
}
.flatMap { resultHolder.setSuccessResult(transactionCommitEvent.transactionId, it) }
.onErrorRollback(
transactionCommitEvent.transactionId,
transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, V>,
private val orchestrateCommand: OrchestrateCommand<T, V>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -32,24 +32,21 @@ internal class JoinOrchestrateListener<T : Any, V : Any>(
&& it.orchestratorId == orchestratorId
}
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionJoinEvent)
.map { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionJoinEvent.transactionId)
.map{ it to event }
}
.map { (request, event) ->
orchestrateCommand.command(request, event.context)
}
.setNextCastableType()
.onErrorRollback(
transactionJoinEvent.transactionId,
transactionJoinEvent.decodeEvent(OrchestrateEvent::class)
)
.map { response ->
OrchestrateEvent(
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
clientEvent = codec.encode(response),
)
}
.toOrchestrateEvent()
.flatMap {
if (isLast) {
return@flatMap transactionManager.commit(
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.TransactionCommitEvent
import org.rooftop.netx.api.TransactionCommitListener
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.AbstractOrchestrateListener
import org.rooftop.netx.engine.OrchestrateEvent
import org.rooftop.netx.engine.RequestHolder
@@ -12,7 +15,7 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, Mono<V>>,
private val monoOrchestrateCommand: MonoOrchestrateCommand<T, V>,
requestHolder: RequestHolder,
private val resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -29,13 +32,18 @@ internal class MonoCommitOrchestrateListener<T : Any, V : Any> internal construc
.map { it.decodeEvent(OrchestrateEvent::class) }
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionCommitEvent)
.flatMap { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionCommitEvent.transactionId)
.map{ it to event }
}
.flatMap { (request, event) ->
monoOrchestrateCommand.command(request, event.context)
}
.flatMap { (response, _) ->
resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response)
}
.flatMap { resultHolder.setSuccessResult(transactionCommitEvent.transactionId, it) }
.onErrorRollback(
transactionCommitEvent.transactionId,
transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, Mono<V>>,
private val monoOrchestrateCommand: MonoOrchestrateCommand<T, V>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -32,24 +32,21 @@ internal class MonoJoinOrchestrateListener<T : Any, V : Any>(
&& it.orchestratorId == orchestratorId
}
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionJoinEvent)
.flatMap { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionJoinEvent.transactionId)
.map{ it to event }
}
.flatMap { (request, event) ->
monoOrchestrateCommand.command(request, event.context)
}
.setNextCastableType()
.onErrorRollback(
transactionJoinEvent.transactionId,
transactionJoinEvent.decodeEvent(OrchestrateEvent::class)
)
.map { response ->
OrchestrateEvent(
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
clientEvent = codec.encode(response),
)
}
.toOrchestrateEvent()
.flatMap {
if (isLast) {
return@flatMap transactionManager.commit(
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*
import reactor.core.publisher.Mono

class MonoOrchestrateCommand<T : Any, V : Any>(
private val commandType: CommandType = CommandType.DEFAULT,
private val codec: Codec,
private val command: Any
) {

@Suppress("UNCHECKED_CAST")
fun command(
request: T,
contextData: String,
): Mono<Pair<V, Context>> {
val context = Context(
codec,
codec.decode(contextData, object : TypeReference<MutableMap<String, String>>() {})
)
return when (commandType) {
CommandType.DEFAULT -> (command as Orchestrate<T, Mono<V>>).orchestrate(request)

CommandType.CONTEXT -> (command as ContextOrchestrate<T, Mono<V>>).orchestrate(
context,
request,
)
}.map {
it to context
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*
import reactor.core.publisher.Mono

class MonoRollbackCommand<T : Any>(
private val commandType: CommandType = CommandType.DEFAULT,
private val codec: Codec,
private val command: Any
) {

@Suppress("UNCHECKED_CAST")
fun command(
request: T,
contextData: String,
): Mono<Pair<Any?, Context>> {
val context = Context(
codec,
codec.decode(contextData, object : TypeReference<MutableMap<String, String>>() {})
)
return when (commandType) {
CommandType.DEFAULT -> (command as Rollback<T, Mono<Any?>>).rollback(request)

CommandType.CONTEXT -> (command as ContextRollback<T, Mono<Any?>>).rollback(
context,
request,
)
}.map { it to context }
.switchIfEmpty(Mono.just("ROLLBACK SUCCESS" to context))
}
}
Original file line number Diff line number Diff line change
@@ -6,14 +6,13 @@ import org.rooftop.netx.engine.OrchestrateEvent
import org.rooftop.netx.engine.RequestHolder
import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

internal class MonoRollbackOrchestrateListener<T : Any, V : Any>(
codec: Codec,
private val codec: Codec,
private val orchestratorId: String,
orchestrateSequence: Int,
private val transactionManager: TransactionManager,
private val rollbackFunction: RollbackFunction<T, Mono<*>>,
private val monoRollbackCommand: MonoRollbackCommand<T>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -31,31 +30,27 @@ internal class MonoRollbackOrchestrateListener<T : Any, V : Any>(
.map { it.decodeEvent(OrchestrateEvent::class) }
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.getHeldRequest(transactionRollbackEvent)
.flatMap { request ->
rollbackFunction.rollback(request)
.flatMap { (request, event) ->
monoRollbackCommand.command(request, event.context)
}
.switchIfEmpty(Mono.just("SUCCESS ROLLBACK"))
.map { }
.cascadeRollback(transactionRollbackEvent)
}

private fun Mono<Unit>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono<Unit> {
return this.doOnSuccess {
val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class)
if (!isFirst && orchestrateEvent.orchestratorId == orchestratorId
&& orchestrateEvent.orchestrateSequence == orchestrateSequence
) {
private fun Mono<Pair<Any?, Context>>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono<Unit> {
return this.filter { !isFirst }
.flatMap { (_, context) ->
val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class)
val nextOrchestrateEvent = OrchestrateEvent(
orchestrateEvent.orchestratorId,
beforeRollbackOrchestrateSequence,
orchestrateEvent.clientEvent
orchestrateEvent.clientEvent,
codec.encode(context.contexts),
)
transactionManager.rollback(
transactionRollbackEvent.transactionId,
transactionRollbackEvent.cause,
nextOrchestrateEvent
).subscribeOn(Schedulers.parallel()).subscribe()
}
}
)
}.map { }
}
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ internal class MonoStartOrchestrateListener<T : Any, V : Any>(
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, Mono<V>>,
private val monoOrchestrateCommand: MonoOrchestrateCommand<T, V>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -29,24 +29,21 @@ internal class MonoStartOrchestrateListener<T : Any, V : Any>(
return transactionStartEvent.toOrchestrateEvent()
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionStartEvent)
.flatMap { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionStartEvent.transactionId)
.map{ it to event }
}
.flatMap { (request, event) ->
monoOrchestrateCommand.command(request, event.context)
}
.setNextCastableType()
.onErrorRollback(
transactionStartEvent.transactionId,
transactionStartEvent.decodeEvent(OrchestrateEvent::class)
)
.map { response ->
OrchestrateEvent(
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
clientEvent = codec.encode(response),
)
}
.toOrchestrateEvent()
.flatMap {
if (isLast) {
return@flatMap transactionManager.commit(
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*

internal class OrchestrateCommand<T : Any, V : Any>(
private val commandType: CommandType = CommandType.DEFAULT,
private val codec: Codec,
private val command: Any
) {

@Suppress("UNCHECKED_CAST")
fun command(
request: T,
contextData: String,
): Pair<V, Context> {
val context = Context(
codec,
codec.decode(contextData, object : TypeReference<MutableMap<String, String>>() {})
)
return when (commandType) {
CommandType.DEFAULT -> (command as Orchestrate<T, V>).orchestrate(request)

CommandType.CONTEXT -> (command as ContextOrchestrate<T, V>).orchestrate(
context,
request,
)
} to context
}
}
29 changes: 29 additions & 0 deletions src/main/kotlin/org/rooftop/netx/engine/listen/RollbackCommand.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.rooftop.netx.engine.listen

import org.rooftop.netx.api.*

class RollbackCommand<T : Any>(
private val commandType: CommandType = CommandType.DEFAULT,
private val codec: Codec,
private val command: Any
) {

@Suppress("UNCHECKED_CAST")
fun command(
request: T,
contextData: String,
): Pair<Any?, Context> {
val context = Context(
codec,
codec.decode(contextData, object : TypeReference<MutableMap<String, String>>() {})
)
return when (commandType) {
CommandType.DEFAULT -> (command as Rollback<T, *>).rollback(request)

CommandType.CONTEXT -> (command as ContextRollback<T, *>).rollback(
context,
request,
)
} to context
}
}
Original file line number Diff line number Diff line change
@@ -6,14 +6,13 @@ import org.rooftop.netx.engine.OrchestrateEvent
import org.rooftop.netx.engine.RequestHolder
import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

internal class RollbackOrchestrateListener<T : Any, V : Any>(
codec: Codec,
private val codec: Codec,
private val orchestratorId: String,
orchestrateSequence: Int,
private val transactionManager: TransactionManager,
private val rollbackFunction: RollbackFunction<T, *>,
private val rollbackCommand: RollbackCommand<T>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -30,31 +29,33 @@ internal class RollbackOrchestrateListener<T : Any, V : Any>(
return transactionRollbackEvent.toOrchestrateEvent()
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.getHeldRequest(transactionRollbackEvent)
.map { request ->
rollbackFunction.rollback(request)
.map { (request, event) ->
rollbackCommand.command(request, event.context)
}
.map {
if (it.first == null) {
return@map "ROLLBACK SUCCESS" to it.second
}
it
}
.switchIfEmpty(Mono.just("SUCCESS ROLLBACK"))
.map { }
.cascadeRollback(transactionRollbackEvent)
}

private fun Mono<Unit>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono<Unit> {
return this.doOnSuccess {
val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class)
if (!isFirst && orchestrateEvent.orchestratorId == orchestratorId
&& orchestrateEvent.orchestrateSequence == orchestrateSequence
) {
private fun Mono<Pair<Any?, Context>>.cascadeRollback(transactionRollbackEvent: TransactionRollbackEvent): Mono<Unit> {
return this.filter { !isFirst }
.flatMap { (_, context) ->
val orchestrateEvent = transactionRollbackEvent.decodeEvent(OrchestrateEvent::class)
val nextOrchestrateEvent = OrchestrateEvent(
orchestrateEvent.orchestratorId,
beforeRollbackOrchestrateSequence,
orchestrateEvent.clientEvent
orchestrateEvent.clientEvent,
codec.encode(context.contexts),
)
transactionManager.rollback(
transactionRollbackEvent.transactionId,
transactionRollbackEvent.cause,
nextOrchestrateEvent
).subscribeOn(Schedulers.parallel()).subscribe()
}
}
)
}.map { }
}
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ internal class StartOrchestrateListener<T : Any, V : Any>(
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateFunction: OrchestrateFunction<T, V>,
private val orchestrateCommand: OrchestrateCommand<T, V>,
requestHolder: RequestHolder,
resultHolder: ResultHolder,
) : AbstractOrchestrateListener<T, V>(
@@ -29,24 +29,21 @@ internal class StartOrchestrateListener<T : Any, V : Any>(
return transactionStartEvent.toOrchestrateEvent()
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.map { event ->
codec.decode(event.clientEvent, getCastableType())
codec.decode(event.clientEvent, getCastableType()) to event
}
.holdRequestIfRollbackable(transactionStartEvent)
.map { request ->
orchestrateFunction.orchestrate(request)
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionStartEvent.transactionId)
.map { it to event }
}
.map { (request, event) ->
orchestrateCommand.command(request, event.context)
}
.setNextCastableType()
.onErrorRollback(
transactionStartEvent.transactionId,
transactionStartEvent.decodeEvent(OrchestrateEvent::class)
)
.map { response ->
OrchestrateEvent(
orchestratorId = orchestratorId,
orchestrateSequence = orchestrateSequence + 1,
clientEvent = codec.encode(response),
)
}
.toOrchestrateEvent()
.flatMap {
if (isLast) {
return@flatMap transactionManager.commit(
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.rooftop.netx.client

import org.rooftop.netx.api.OrchestrateFunction
import org.rooftop.netx.api.Orchestrate
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.engine.OrchestratorFactory
import org.springframework.context.annotation.Bean
@@ -18,12 +18,12 @@ class OrchestratorConfigurer(
.commit(IntOrchestrator)
}

object IntOrchestrator : OrchestrateFunction<Int, Int> {
object IntOrchestrator : Orchestrate<Int, Int> {

override fun orchestrate(request: Int): Int = request + 1
}

object MonoIntOrchestrator : OrchestrateFunction<Int, Mono<Int>> {
object MonoIntOrchestrator : Orchestrate<Int, Mono<Int>> {

override fun orchestrate(request: Int): Mono<Int> = Mono.fromCallable { request + 1 }
}
88 changes: 79 additions & 9 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.engine.OrchestratorTest.Companion.contextResult
import org.rooftop.netx.engine.OrchestratorTest.Companion.monoRollbackResult
import org.rooftop.netx.engine.OrchestratorTest.Companion.rollbackOrchestratorResult
import org.rooftop.netx.engine.OrchestratorTest.Companion.upChainResult
import org.springframework.context.annotation.Bean
@@ -16,13 +18,13 @@ class OrchestratorConfigurer(
@Bean(name = ["numberOrchestrator"])
fun numberOrchestrator(): Orchestrator<Int, Int> {
return orchestratorFactory.create<Int>("numberOrchestrator")
.start(function = { it + 1 })
.join(function = { it + 1 })
.joinReactive(function = { Mono.just(it + 1) })
.commit(function = { it + 1 })
.start(orchestrate = { it + 1 })
.join(orchestrate = { it + 1 })
.joinReactive(orchestrate = { Mono.just(it + 1) })
.commit(orchestrate = { it + 1 })
}

@Bean(name = ["homeOrchestartor"])
@Bean(name = ["homeOrchestrator"])
fun homeOrchestrator(): Orchestrator<OrchestratorTest.Home, OrchestratorTest.Home> {
return orchestratorFactory.create<OrchestratorTest.Home>("homeOrchestrator")
.startReactive({ home ->
@@ -64,26 +66,26 @@ class OrchestratorConfigurer(
fun rollbackOrchestrator(): Orchestrator<String, String> {
return orchestratorFactory.create<String>("rollbackOrchestrator")
.start(
function = {
orchestrate = {
rollbackOrchestratorResult.add("1")
},
rollback = {
rollbackOrchestratorResult.add("-1")
}
)
.join(
function = {
orchestrate = {
rollbackOrchestratorResult.add("2")
}
)
.join(
function = {
orchestrate = {
rollbackOrchestratorResult.add("3")
},
rollback = { rollbackOrchestratorResult.add("-3") }
)
.commit(
function = {
orchestrate = {
rollbackOrchestratorResult.add("4")
throw IllegalArgumentException("Rollback")
},
@@ -104,4 +106,72 @@ class OrchestratorConfigurer(
throw IllegalArgumentException("Rollback for test")
})
}

@Bean(name = ["monoRollbackOrchestrator"])
fun monoRollbackOrchestrator(): Orchestrator<String, String> {
return orchestratorFactory.create<String>("monoRollbackOrchestrator")
.startReactive(
{ Mono.fromCallable { monoRollbackResult.add("1") } },
{ Mono.fromCallable { monoRollbackResult.add("-1") } }
)
.joinReactive({ Mono.fromCallable { monoRollbackResult.add("2") } })
.joinReactive(
{ Mono.fromCallable { monoRollbackResult.add("3") } },
{ Mono.fromCallable { monoRollbackResult.add("-3") } }
)
.commitReactive({
Mono.fromCallable {
monoRollbackResult.add("4")
throw IllegalArgumentException("Rollback for test")
}
})
}

@Bean(name = ["contextOrchestrator"])
fun contextOrchestrator(): Orchestrator<String, String> {
return orchestratorFactory.create<String>("contextOrchestrator")
.startWithContext(
contextOrchestrate = { context, request ->
context.set("start-1", request)
"1"
},
contextRollback = { context, request ->
val start1 = context.decodeContext("start-1", String::class)
val join2 = context.decodeContext("join-2", String::class)
val join3 = context.decodeContext("join-3", String::class)
val rCommit4 = context.decodeContext("r-commit-4", String::class)
val rJoin3 = context.decodeContext("r-join-3", String::class)

contextResult.addAll(listOf(start1, join2, join3, rCommit4, rJoin3))
}
)
.joinWithContext(
contextOrchestrate = { context, request ->
context.set("join-2", request)
"2"
}
)
.joinReactiveWithContext(
contextOrchestrate = { context, request ->
Mono.fromCallable {
context.set("join-3", request)
"3"
}
},
contextRollback = { context, request ->
Mono.fromCallable {
context.set("r-join-3", "r$request")
}
}
)
.commitWithContext(
contextOrchestrate = { context, request ->
context.set("commit-4", request)
throw IllegalArgumentException("Rollback")
},
contextRollback = { context, request ->
context.set("r-commit-4", "r$request")
}
)
}
}
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ internal class OrchestratorFactoryTest(
}) {

companion object {

private fun OrchestratorFactory.createIntOrchestrator(orchestratorId: String): Orchestrator<Int, Int> {
return this.create<Int>(orchestratorId)
.start({ it + 1 })
42 changes: 42 additions & 0 deletions src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.equality.shouldBeEqualToComparingFields
import io.kotest.matchers.equals.shouldBeEqual
import org.rooftop.netx.api.Orchestrate
import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
@@ -31,6 +32,8 @@ class OrchestratorTest(
private val manyTypeOrchestrator: Orchestrator<Int, Home>,
@Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator<String, String>,
@Qualifier("upChainRollbackOrchestrator") private val upChainRollbackOrchestrator: Orchestrator<String, String>,
@Qualifier("monoRollbackOrchestrator") private val monoRollbackOrchestrator: Orchestrator<String, String>,
@Qualifier("contextOrchestrator") private val contextOrchestrator: Orchestrator<String, String>,
) : DescribeSpec({

describe("numberOrchestrator 구현채는") {
@@ -121,6 +124,43 @@ class OrchestratorTest(
}
}
}

describe("monoRollbackOrchestrator 구현채는") {
context("transaction 메소드가 호출되면,") {
val expected = listOf("1", "2", "3", "4", "-3", "-1")

it("실패한 부분부터 위로 거슬러 올라가며 롤백한다.") {
val result = monoRollbackOrchestrator.transactionSync("")

result.isSuccess shouldBeEqual false
shouldThrowWithMessage<IllegalArgumentException>("Rollback for test") {
result.throwError()
}
eventually(5.seconds) {
monoRollbackResult shouldBeEqual expected
}
}
}
}

describe("contextOrchestrator 구현채는") {
context("transaction 메소드가 호출되면,") {
val expected = listOf("0", "1", "2", "r3", "r2")

it("context 에서 아이템을 교환하며 Saga를 진행한다.") {
val result = contextOrchestrator.transactionSync("0")

result.isSuccess shouldBeEqual false
shouldThrowWithMessage<IllegalArgumentException>("Rollback") {
result.throwError()
}
eventually(5.seconds) {
contextResult shouldBeEqual expected
}
}
}
}

}) {
data class Home(
val address: String,
@@ -140,5 +180,7 @@ class OrchestratorTest(
companion object {
val rollbackOrchestratorResult = mutableListOf<String>()
val upChainResult = mutableListOf<String>()
val monoRollbackResult = mutableListOf<String>()
val contextResult = mutableListOf<String>()
}
}

0 comments on commit 87ff5c1

Please sign in to comment.