Skip to content

Commit

Permalink
fix: 자신이 발행한 transaction을 찾지 못하는 버그 수정 (#32)
Browse files Browse the repository at this point in the history
* 자신이 발행한 transaction을 찾지 못하는 버그를 수정한다

* refactor: 사용하지 않는 메소드를 삭제한다
  • Loading branch information
devxb authored Feb 18, 2024
1 parent 2411e4b commit 793f6da
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,22 +61,22 @@ abstract class AbstractTransactionDispatcher {
)
)

TransactionState.TRANSACTION_STATE_ROLLBACK -> findOwnTransaction(transaction)
TransactionState.TRANSACTION_STATE_ROLLBACK -> findOwnUndo(transaction)
.map {
TransactionRollbackEvent(
transaction.id,
transaction.serverId,
transaction.group,
transaction.cause,
it.undo,
it,
)
}

else -> throw cannotFindMatchedTransactionEventException
}
}

protected abstract fun findOwnTransaction(transaction: Transaction): Mono<Transaction>
protected abstract fun findOwnUndo(transaction: Transaction): Mono<String>

protected abstract fun ack(
transaction: Transaction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
import org.rooftop.netx.meta.TransactionHandler
import org.springframework.context.ApplicationContext
import org.springframework.data.redis.connection.stream.ReadOffset
import org.springframework.data.redis.connection.stream.StreamOffset
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
Expand Down Expand Up @@ -64,19 +62,15 @@ class RedisStreamTransactionDispatcher(
}
}

override fun findOwnTransaction(transaction: Transaction): Mono<Transaction> {
return reactiveRedisTemplate.opsForStream<String, String>()
.read(StreamOffset.create(STREAM_KEY, ReadOffset.from("0")))
.map { Transaction.parseFrom(it.value["data"]?.toByteArray()) }
.filter { it.group == nodeGroup }
.filter { hasUndo(it) }
.next()
override fun findOwnUndo(transaction: Transaction): Mono<String> {
return reactiveRedisTemplate.opsForHash<String, String>()[transaction.id, nodeGroup]
.switchIfEmpty(
Mono.error {
error("Cannot find undo state in transaction hashes key \"${transaction.id}\"")
}
)
}

private fun hasUndo(transaction: Transaction): Boolean =
transaction.state == TransactionState.TRANSACTION_STATE_JOIN
|| transaction.state == TransactionState.TRANSACTION_STATE_START

override fun ack(transaction: Transaction, messageId: String): Mono<Pair<Transaction, String>> {
return reactiveRedisTemplate.opsForStream<String, ByteArray>()
.acknowledge(STREAM_KEY, nodeGroup, messageId)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package org.rooftop.netx.redis

import org.redisson.api.RedissonReactiveClient
import org.rooftop.netx.engine.AbstractTransactionManager
import org.rooftop.netx.engine.AbstractTransactionRetrySupporter
import org.rooftop.netx.idl.Transaction
import org.rooftop.netx.idl.TransactionState
import org.springframework.data.redis.connection.stream.Record
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.util.concurrent.TimeUnit

class RedisStreamTransactionManager(
nodeId: Int,
nodeName: String,
nodeGroup: String,
transactionRetrySupporter: AbstractTransactionRetrySupporter,
private val nodeGroup: String,
private val reactiveRedisTemplate: ReactiveRedisTemplate<String, ByteArray>,
private val redissonReactiveClient: RedissonReactiveClient,
) : AbstractTransactionManager(
nodeId = nodeId,
nodeName = nodeName,
Expand All @@ -26,13 +20,13 @@ class RedisStreamTransactionManager(

override fun findAnyTransaction(transactionId: String): Mono<TransactionState> {
return reactiveRedisTemplate
.opsForValue()[transactionId]
.opsForHash<String, String>()[transactionId, STATE_KEY]
.switchIfEmpty(
Mono.error {
error("Cannot find exists transaction id \"$transactionId\"")
}
)
.map { TransactionState.valueOf(String(it)) }
.map { TransactionState.valueOf(it) }
}

override fun publishTransaction(transactionId: String, transaction: Transaction): Mono<String> {
Expand All @@ -42,24 +36,32 @@ class RedisStreamTransactionManager(
.withStreamKey(STREAM_KEY)
)
.flatMap {
redissonReactiveClient.getLock("$transactionId-key")
.tryLock(10, TimeUnit.MINUTES)
}
.flatMap {
reactiveRedisTemplate.opsForValue()
.set(transactionId, transaction.state.name.toByteArray())
}
.doFinally {
redissonReactiveClient.getLock("$transactionId-key")
.forceUnlock()
.subscribeOn(Schedulers.parallel())
.subscribe()
if (hasUndo(transaction)) {
return@flatMap reactiveRedisTemplate.opsForHash<String, String>()
.putAll(
transactionId, mapOf(
STATE_KEY to transaction.state.name,
nodeGroup to transaction.undo
)
)
}
reactiveRedisTemplate.opsForHash<String, String>()
.putAll(
transactionId, mapOf(
STATE_KEY to transaction.state.name,
)
)
}
.map { transactionId }
}

private fun hasUndo(transaction: Transaction): Boolean =
transaction.state == TransactionState.TRANSACTION_STATE_JOIN
|| transaction.state == TransactionState.TRANSACTION_STATE_START

private companion object {
private const val DATA = "data"
private const val STREAM_KEY = "NETX_STREAM"
private const val STATE_KEY = "TX_STATE"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ class RedisTransactionConfigurer(
nodeId = nodeId,
nodeName = nodeName,
nodeGroup = nodeGroup,
transactionRetrySupporter = redisTransactionRetrySupporter(),
reactiveRedisTemplate = reactiveRedisTemplate(),
redissonReactiveClient = redissonReactiveClient(),
)

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,15 @@ class NoAckRedisStreamTransactionDispatcher(
}
}

override fun findOwnTransaction(transaction: Transaction): Mono<Transaction> {
return reactiveRedisTemplate.opsForStream<String, String>()
.read(StreamOffset.create(transaction.id, ReadOffset.from("0")))
.map { Transaction.parseFrom(it.value["data"]?.toByteArray()) }
.filter { it.group == nodeGroup }
.filter { hasUndo(it) }
.next()
override fun findOwnUndo(transaction: Transaction): Mono<String> {
return reactiveRedisTemplate.opsForHash<String, String>()[transaction.id, nodeGroup]
.switchIfEmpty(
Mono.error {
error("Cannot find undo state in transaction hashes key \"${transaction.id}\"")
}
)
}

private fun hasUndo(transaction: Transaction): Boolean =
transaction.state == TransactionState.TRANSACTION_STATE_JOIN
|| transaction.state == TransactionState.TRANSACTION_STATE_START

override fun ack(transaction: Transaction, messageId: String): Mono<Pair<Transaction, String>> =
Mono.just(transaction to messageId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ class NoAckRedisTransactionConfigurer(
nodeId = nodeId,
nodeName = nodeName,
nodeGroup = nodeGroup,
transactionRetrySupporter = redisTransactionRetrySupporter(),
reactiveRedisTemplate = reactiveRedisTemplate(),
redissonReactiveClient = redissonReactiveClient(),
)

@Bean
Expand Down

0 comments on commit 793f6da

Please sign in to comment.