Skip to content

Commit

Permalink
refactor(core): introduce createAsMono method in StateAggregateFactory (
Browse files Browse the repository at this point in the history
#1092)

- Add createAsMono method to StateAggregateFactory interface
- Update implementations to use createAsMono instead of create
- Remove unnecessary toMono conversions
- Simplify code in various classes by using the new method
  • Loading branch information
Ahoo-Wang authored Dec 31, 2024
1 parent 8788408 commit b5597a7
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ internal class DefaultWhenStage<C : Any, S : Any>(
)
serverCommandExchange.setServiceProvider(serviceProvider)
val commandAggregateId = commandMessage.aggregateId
val expectedResultMono = stateAggregateFactory.create(
val expectedResultMono = stateAggregateFactory.createAsMono(
metadata.state,
commandAggregateId,
).toMono().map {
).map {
try {
commandMessage.body.validate()
} catch (throwable: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,22 @@ class StateEventCompensator(
): Mono<Long> {
val aggregateMetadata = aggregateId.requiredAggregateType<Any>()
.aggregateMetadata<Any, Any>()
val stateAggregate = stateAggregateFactory.create(aggregateMetadata.state, aggregateId)
return eventStore
.load(
aggregateId = aggregateId,
tailVersion = tailVersion,
)
.map {
stateAggregate.onSourcing(it)
it.toStateEvent(stateAggregate)
}
.filter {
it.version in headVersion..tailVersion
}
.concatMap {
compensate(it, target).thenReturn(it.aggregateId)
}.count()
return stateAggregateFactory.createAsMono(aggregateMetadata.state, aggregateId).flatMap { stateAggregate ->
eventStore
.load(
aggregateId = aggregateId,
tailVersion = tailVersion,
)
.map {
stateAggregate.onSourcing(it)
it.toStateEvent(stateAggregate)
}
.filter {
it.version in headVersion..tailVersion
}
.concatMap {
compensate(it, target).thenReturn(it.aggregateId)
}.count()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import me.ahoo.wow.modeling.state.StateAggregateFactory
import me.ahoo.wow.modeling.state.StateAggregateRepository
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

/**
* Event Sourcing State Aggregate Repository .
Expand Down Expand Up @@ -52,7 +51,7 @@ class EventSourcingStateAggregateRepository(
}
.defaultIfEmpty(stateAggregateFactory.create(metadata, aggregateId))
} else {
stateAggregateFactory.create(metadata, aggregateId).toMono()
stateAggregateFactory.createAsMono(metadata, aggregateId)
}

return loadStateAggregate
Expand All @@ -75,16 +74,17 @@ class EventSourcingStateAggregateRepository(
metadata: StateAggregateMetadata<S>,
tailEventTime: Long
): Mono<StateAggregate<S>> {
val stateAggregate = stateAggregateFactory.create(metadata, aggregateId)
return eventStore
.load(
aggregateId = aggregateId,
headEventTime = stateAggregate.eventTime + 1,
tailEventTime = tailEventTime
)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
return stateAggregateFactory.createAsMono(metadata, aggregateId).flatMap { stateAggregate ->
eventStore
.load(
aggregateId = aggregateId,
headEventTime = stateAggregate.eventTime + 1,
tailEventTime = tailEventTime
)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ class EventStoreStateAggregateRepository(
metadata: StateAggregateMetadata<S>,
loadEventStream: (StateAggregate<S>) -> Flux<DomainEventStream>
): Mono<StateAggregate<S>> {
val stateAggregate = stateAggregateFactory.create(metadata, aggregateId)
return loadEventStream(stateAggregate)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
return stateAggregateFactory.createAsMono(metadata, aggregateId).flatMap { stateAggregate ->
loadEventStream(stateAggregate)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
}
}

override fun <S : Any> load(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ class RetryableAggregateProcessor<C : Any, S : Any>(

override fun process(exchange: ServerCommandExchange<*>): Mono<DomainEventStream> {
val stateAggregateMono = if (exchange.message.isCreate) {
Mono.fromCallable {
aggregateFactory.create(aggregateMetadata.state, exchange.message.aggregateId)
}
aggregateFactory.createAsMono(aggregateMetadata.state, exchange.message.aggregateId)
} else {
stateAggregateRepository.load(aggregateId, aggregateMetadata.state)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import me.ahoo.wow.modeling.matedata.AggregateMetadata
import me.ahoo.wow.modeling.matedata.StateAggregateMetadata
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono

/**
* Aggregate Factory .
Expand All @@ -31,6 +32,11 @@ import org.slf4j.LoggerFactory
*/
interface StateAggregateFactory {
fun <S : Any> create(metadata: StateAggregateMetadata<S>, aggregateId: AggregateId): StateAggregate<S>
fun <S : Any> createAsMono(metadata: StateAggregateMetadata<S>, aggregateId: AggregateId): Mono<StateAggregate<S>> {
return Mono.fromCallable {
create(metadata, aggregateId)
}
}
}

object ConstructorStateAggregateFactory : StateAggregateFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ class RegenerateSnapshotHandler(
) {

fun handle(aggregateId: AggregateId): Mono<Snapshot<*>> {
val stateAggregate = stateAggregateFactory.create(aggregateMetadata.state, aggregateId)
return eventStore
.load(
aggregateId = aggregateId,
headVersion = stateAggregate.expectedNextVersion,
)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
.filter {
it.initialized
}.flatMap {
val snapshot = SimpleSnapshot(it)
snapshotRepository.save(snapshot).thenReturn(snapshot)
}
return stateAggregateFactory.createAsMono(aggregateMetadata.state, aggregateId).flatMap { stateAggregate ->
eventStore
.load(
aggregateId = aggregateId,
headVersion = stateAggregate.expectedNextVersion,
)
.map {
stateAggregate.onSourcing(it)
}
.then(Mono.just(stateAggregate))
}.filter {
it.initialized
}.flatMap {
val snapshot = SimpleSnapshot(it)
snapshotRepository.save(snapshot).thenReturn(snapshot)
}
}
}

0 comments on commit b5597a7

Please sign in to comment.