Skip to content

Commit

Permalink
refactor(wow-core): WaitStrategy (#1123)
Browse files Browse the repository at this point in the history
- Remove the static methods in the WaitingFor class and implement them in the companion object instead
- Add multiple waiting strategy implementation classes to handle different command stages
- Optimize the registration and handling logic of waiting strategies
- Update relevant test cases
  • Loading branch information
Ahoo-Wang authored Jan 13, 2025
1 parent a844575 commit eaba32e
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 181 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
spring-boot = "3.4.1"
cosid = "2.10.2"
simba = "2.6.2"
coapi="1.9.2"
coapi = "1.9.2"
testcontainers = "1.20.4"
opentelemetry = "1.46.0"
opentelemetry-instrumentation = "2.11.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ interface CommandGateway : CommandBus {
fun <C : Any> sendAndWaitForProcessed(
command: CommandMessage<C>
): Mono<CommandResult> =
sendAndWait(command, WaitingFor.processed(command.contextName))
sendAndWait(command, WaitingFor.processed())

fun <C : Any> sendAndWaitForSnapshot(
command: CommandMessage<C>
): Mono<CommandResult> =
sendAndWait(command, WaitingFor.snapshot(command.contextName))
sendAndWait(command, WaitingFor.snapshot())
}
92 changes: 0 additions & 92 deletions wow-core/src/main/kotlin/me/ahoo/wow/command/wait/WaitStrategy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
package me.ahoo.wow.command.wait

import me.ahoo.wow.api.messaging.processor.ProcessorInfo
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.util.*

/**
* Command Wait Strategy
Expand All @@ -33,92 +30,3 @@ interface WaitStrategy : ProcessorInfo {
*/
fun next(signal: WaitSignal)
}

class WaitingFor(
val stage: CommandStage,
override val contextName: String,
override val processorName: String = ""
) : WaitStrategy {

companion object {
private val log = LoggerFactory.getLogger(WaitingFor::class.java)
fun processed(contextName: String): WaitingFor =
stage(stage = CommandStage.PROCESSED, contextName = contextName)

fun snapshot(contextName: String): WaitingFor =
stage(stage = CommandStage.SNAPSHOT, contextName = contextName)

fun projected(contextName: String, processorName: String = ""): WaitingFor =
stage(stage = CommandStage.PROJECTED, contextName = contextName, processorName = processorName)

fun eventHandled(contextName: String, processorName: String = ""): WaitingFor =
stage(stage = CommandStage.EVENT_HANDLED, contextName = contextName, processorName = processorName)

fun sagaHandled(contextName: String, processorName: String = ""): WaitingFor =
stage(stage = CommandStage.SAGA_HANDLED, contextName = contextName, processorName = processorName)

fun stage(stage: CommandStage, contextName: String, processorName: String = ""): WaitingFor =
WaitingFor(stage = stage, contextName = contextName, processorName = processorName)

fun stage(stage: String, contextName: String, processorName: String = ""): WaitingFor =
stage(
stage = CommandStage.valueOf(stage.uppercase(Locale.getDefault())),
contextName = contextName,
processorName = processorName
)
}

private val sink: Sinks.One<WaitSignal> = Sinks.one()
private val result: MutableMap<String, Any> = mutableMapOf()
override fun waiting(): Mono<WaitSignal> {
return sink.asMono()
}

override fun error(throwable: Throwable) {
sink.tryEmitError(throwable)
}

private fun nextSignal(signal: WaitSignal) {
val mergedSignal = signal.copyResult(result)
sink.tryEmitValue(mergedSignal)
}

override fun next(signal: WaitSignal) {
if (log.isDebugEnabled) {
log.debug("Next $signal.")
}
result.putAll(signal.result)
if (!signal.succeeded && stage.isAfter(signal.stage)) {
// fail fast
nextSignal(signal)
return
}
if (stage != signal.stage) {
return
}

if (stage == CommandStage.SENT || stage == CommandStage.PROCESSED || stage == CommandStage.SNAPSHOT) {
nextSignal(signal)
return
}

if (!isSameBoundedContext(signal.function)) {
return
}
if (processorName.isBlank()) {
if (signal.isLastProjection) {
nextSignal(signal)
}
return
}

if (processorName == signal.function.processorName) {
nextSignal(signal)
return
}
}

override fun toString(): String {
return "WaitingFor(stage=$stage, contextName='$contextName', processorName='$processorName')"
}
}
78 changes: 78 additions & 0 deletions wow-core/src/main/kotlin/me/ahoo/wow/command/wait/WaitingFor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import java.util.*

interface WaitingFor : WaitStrategy {
val stage: CommandStage

companion object {

fun processed(): WaitingFor = WaitingForProcessed()

fun snapshot(): WaitingFor = WaitingForSnapshot()

fun projected(contextName: String, processorName: String = ""): WaitingFor =
WaitingForProjected(
contextName = contextName,
processorName = processorName
)

fun eventHandled(contextName: String, processorName: String = ""): WaitingFor =
WaitingForEventHandled(
contextName = contextName,
processorName = processorName
)

fun sagaHandled(contextName: String, processorName: String = ""): WaitingFor =
WaitingForSagaHandled(
contextName = contextName,
processorName = processorName
)

fun stage(stage: CommandStage, contextName: String, processorName: String = ""): WaitingFor {
return when (stage) {
CommandStage.PROCESSED -> processed()
CommandStage.SNAPSHOT -> snapshot()
CommandStage.PROJECTED -> projected(contextName, processorName)
CommandStage.EVENT_HANDLED -> eventHandled(contextName, processorName)
CommandStage.SAGA_HANDLED -> sagaHandled(contextName, processorName)
CommandStage.SENT -> throw IllegalArgumentException("Unsupported stage: $stage")
}
}

fun stage(stage: String, contextName: String, processorName: String = ""): WaitingFor =
stage(
stage = CommandStage.valueOf(stage.uppercase(Locale.getDefault())),
contextName = contextName,
processorName = processorName
)
}
}

abstract class AbstractWaitingFor : WaitingFor {

protected val sink: Sinks.One<WaitSignal> = Sinks.one()

override fun waiting(): Mono<WaitSignal> {
return sink.asMono()
}

override fun error(throwable: Throwable) {
sink.tryEmitError(throwable)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

abstract class WaitingForAfterProcessed : AbstractWaitingFor() {
@Volatile
private var processedSignal: WaitSignal? = null

@Volatile
private var waitingForSignal: WaitSignal? = null
private val result: MutableMap<String, Any> = mutableMapOf()
protected fun nextSignal() {
val waitingForSignal = waitingForSignal
if (processedSignal == null || waitingForSignal == null) {
return
}
val mergedSignal = waitingForSignal.copyResult(result)
sink.tryEmitValue(mergedSignal)
}

open fun isWaitingForSignal(signal: WaitSignal): Boolean {
if (signal.stage != stage || !isSameBoundedContext(signal.function)) {
return false
}
if (processorName.isBlank()) {
return true
}
return signal.function.processorName == processorName
}

override fun next(signal: WaitSignal) {
result.putAll(signal.result)
if (signal.stage == CommandStage.PROCESSED) {
processedSignal = signal
if (!signal.succeeded) {
sink.tryEmitValue(signal)
return
}
}
if (isWaitingForSignal(signal)) {
waitingForSignal = signal
}
nextSignal()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

class WaitingForEventHandled(
override val contextName: String,
override val processorName: String = ""
) : WaitingForAfterProcessed() {
override val stage: CommandStage
get() = CommandStage.EVENT_HANDLED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

class WaitingForProcessed : AbstractWaitingFor() {
override val stage: CommandStage
get() = CommandStage.PROCESSED
override val contextName: String = ""
override val processorName: String = ""
override fun next(signal: WaitSignal) {
if (signal.stage != CommandStage.PROCESSED) {
return
}
sink.tryEmitValue(signal)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

class WaitingForProjected(
override val contextName: String,
override val processorName: String = ""
) : WaitingForAfterProcessed() {
override val stage: CommandStage
get() = CommandStage.PROJECTED

override fun isWaitingForSignal(signal: WaitSignal): Boolean {
return super.isWaitingForSignal(signal) && signal.isLastProjection
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

class WaitingForSagaHandled(
override val contextName: String,
override val processorName: String = ""
) : WaitingForAfterProcessed() {
override val stage: CommandStage
get() = CommandStage.SAGA_HANDLED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.command.wait

class WaitingForSnapshot : WaitingForAfterProcessed() {
override val stage: CommandStage
get() = CommandStage.SNAPSHOT
override val contextName: String = ""
override val processorName: String = ""

override fun isWaitingForSignal(signal: WaitSignal): Boolean {
return signal.stage == stage
}
}
Loading

0 comments on commit eaba32e

Please sign in to comment.