Skip to content

Commit 0bc0c5b

Browse files
authored
feat: Prevent retry of specific exceptions (#65)
* feat: noRetryFor에 등록된 예외는 무시하도록 한다 * perf: blokcing call을 제거하고 non-blocking 하게 수정한다 * docs: netx.logging.level 의 오타를 수정한다 * docs: README.md 에 noRetryFor을 설명한다
1 parent e4f267a commit 0bc0c5b

16 files changed

+297
-151
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class Application {
5353
| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
5454
| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
5555
| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
56-
| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warning", "off" | "off" |
56+
| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
5757

5858
### Usage example
5959

@@ -160,8 +160,12 @@ class TransactionHandler {
160160
// ...
161161
}
162162

163-
@TransactionCommitHandler
163+
@TransactionCommitHandler(
164+
event = Foo::class,
165+
noRetryFor = [IllegalArgumentException::class]
166+
) // Dont retry when throw IllegalArgumentException. *Retry if throw Throwable or IllegalArgumentException's super type*
164167
fun handleTransactionCommitEvent(event: TransactionCommitEvent): Mono<String> { // In Webflux framework, publisher must be returned.
168+
throw IllegalArgumentException("Ignore this exception")
165169
// ...
166170
}
167171

src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ import kotlin.reflect.KClass
55
@Target(AnnotationTarget.FUNCTION)
66
@Retention(AnnotationRetention.RUNTIME)
77
annotation class TransactionCommitListener(
8-
val event: KClass<*> = Any::class
8+
val event: KClass<*> = Any::class,
9+
val noRetryFor: Array<KClass<out Throwable>> = [],
910
)

src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ import kotlin.reflect.KClass
55
@Target(AnnotationTarget.FUNCTION)
66
@Retention(AnnotationRetention.RUNTIME)
77
annotation class TransactionJoinListener(
8-
val event: KClass<*> = Any::class
8+
val event: KClass<*> = Any::class,
9+
val noRetryFor: Array<KClass<out Throwable>> = [],
910
)

src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ import kotlin.reflect.KClass
55
@Target(AnnotationTarget.FUNCTION)
66
@Retention(AnnotationRetention.RUNTIME)
77
annotation class TransactionRollbackListener(
8-
val event: KClass<*> = Any::class
8+
val event: KClass<*> = Any::class,
9+
val noRetryFor: Array<KClass<out Throwable>> = [],
910
)

src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,6 @@ import kotlin.reflect.KClass
55
@Target(AnnotationTarget.FUNCTION)
66
@Retention(AnnotationRetention.RUNTIME)
77
annotation class TransactionStartListener(
8-
val event: KClass<*> = Any::class
8+
val event: KClass<*> = Any::class,
9+
val noRetryFor: Array<KClass<out Throwable>> = [],
910
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.rooftop.netx.engine
2+
3+
import org.rooftop.netx.api.TransactionEvent
4+
import kotlin.reflect.KClass
5+
import kotlin.reflect.KFunction
6+
7+
internal sealed class AbstractDispatchFunction<T>(
8+
protected val eventType: KClass<*>,
9+
protected val function: KFunction<T>,
10+
protected val handler: Any,
11+
protected val noRetryFor: Array<KClass<out Throwable>>,
12+
) {
13+
fun name(): String = function.name
14+
15+
abstract fun call(transactionEvent: TransactionEvent): T
16+
}

src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt

Lines changed: 55 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,29 @@ abstract class AbstractTransactionDispatcher(
1818
private val codec: Codec,
1919
) {
2020

21-
private val monoTransactionHandleFunctions =
22-
mutableMapOf<TransactionState, MutableList<MonoFunction>>()
23-
24-
private val notPublisherTransactionHandlerFunctions =
25-
mutableMapOf<TransactionState, MutableList<NotPublisherFunction>>()
21+
private val functions =
22+
mutableMapOf<TransactionState, MutableList<AbstractDispatchFunction<*>>>()
23+
24+
fun dispatch(transaction: Transaction, messageId: String): Mono<String> {
25+
return Flux.fromIterable(functions[transaction.state] ?: listOf())
26+
.flatMap { function ->
27+
when (function) {
28+
is MonoDispatchFunction -> {
29+
mapToTransactionEvent(transaction)
30+
.callMono(function)
31+
.warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
32+
}
2633

27-
fun dispatch(transaction: Transaction, messageId: String): Boolean {
28-
var isSuccess = true
29-
if (notPublisherTransactionHandlerFunctions.isEmpty()) {
30-
dispatchToMonoHandler(transaction)
31-
.subscribeOn(Schedulers.boundedElastic())
32-
.ackWhenComplete(transaction, messageId)
33-
.subscribe({ isSuccess = true }, { isSuccess = false })
34-
return isSuccess
35-
}
36-
if (monoTransactionHandleFunctions.isEmpty()) {
37-
dispatchToNotPublisherHandler(transaction)
38-
.subscribeOn(Schedulers.boundedElastic())
39-
.ackWhenComplete(transaction, messageId)
40-
.subscribe({ isSuccess = true }, { isSuccess = false })
41-
return isSuccess
42-
}
43-
dispatchToMonoHandler(transaction)
44-
.flatMap { dispatchToNotPublisherHandler(transaction) }
34+
is NotPublishDispatchFunction -> {
35+
mapToTransactionEvent(transaction)
36+
.callNotPublish(function)
37+
.warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
38+
}
39+
}
40+
}
4541
.subscribeOn(Schedulers.boundedElastic())
4642
.ackWhenComplete(transaction, messageId)
47-
.subscribe({ isSuccess = true }, { isSuccess = false })
48-
return isSuccess
43+
.then(Mono.just(DISPATHCED))
4944
}
5045

5146
private fun Flux<*>.ackWhenComplete(
@@ -62,32 +57,6 @@ abstract class AbstractTransactionDispatcher(
6257
.subscribe()
6358
}
6459

65-
private fun dispatchToMonoHandler(transaction: Transaction): Flux<Any> {
66-
return Mono.just(transaction.state)
67-
.flatMapMany { state ->
68-
Flux.fromIterable(monoTransactionHandleFunctions[state] ?: listOf())
69-
}
70-
.publishOn(Schedulers.boundedElastic())
71-
.flatMap { monoFunction ->
72-
mapToTransactionEvent(transaction)
73-
.flatMap { monoFunction.call(it) }
74-
.warningOnError("Error occurred in TransactionHandler function \"${monoFunction.name()}\" with transaction \n{\n$transaction}")
75-
}
76-
}
77-
78-
private fun dispatchToNotPublisherHandler(transaction: Transaction): Flux<*> {
79-
return Mono.just(transaction.state)
80-
.flatMapMany { state ->
81-
Flux.fromIterable(notPublisherTransactionHandlerFunctions[state] ?: listOf())
82-
}
83-
.publishOn(Schedulers.boundedElastic())
84-
.flatMap { notPublisherFunction ->
85-
mapToTransactionEvent(transaction)
86-
.map { notPublisherFunction.call(it) }
87-
.warningOnError("Error occurred in TransactionHandler function \"${notPublisherFunction.name()}\" with transaction \n{\n$transaction}")
88-
}
89-
}
90-
9160
private fun mapToTransactionEvent(transaction: Transaction): Mono<TransactionEvent> {
9261
return when (transaction.state) {
9362
TransactionState.TRANSACTION_STATE_START -> Mono.just(
@@ -153,19 +122,18 @@ abstract class AbstractTransactionDispatcher(
153122
@PostConstruct
154123
fun initHandler() {
155124
val transactionHandler = findHandlers(TransactionHandler::class)
156-
val monoFunctions = getMonoFunctions(transactionHandler)
157-
monoTransactionHandleFunctions.putAll(monoFunctions)
158-
val notPublisherFunctions = getNotPublisherFunctions(transactionHandler)
159-
notPublisherTransactionHandlerFunctions.putAll(notPublisherFunctions)
125+
initMonoFunctions(transactionHandler)
126+
initNotPublisherFunctions(transactionHandler)
127+
functions.forEach { (_, notPublisherFunction) ->
128+
val notPublisherFunctionNames = notPublisherFunction.map { it.name() }.toList()
129+
info("Register functions names : \"${notPublisherFunctionNames}\"")
130+
}
160131
}
161132

162133
@Suppress("UNCHECKED_CAST")
163-
private fun getMonoFunctions(
134+
private fun initMonoFunctions(
164135
foundHandlers: List<Any>,
165-
): MutableMap<TransactionState, MutableList<MonoFunction>> {
166-
val handlers =
167-
mutableMapOf<TransactionState, MutableList<MonoFunction>>()
168-
136+
) {
169137
for (handler in foundHandlers) {
170138
val returnTypeMatchedHandlers = handler::class.declaredMemberFunctions
171139
.filter { it.returnType.classifier == Mono::class }
@@ -176,25 +144,27 @@ abstract class AbstractTransactionDispatcher(
176144
runCatching {
177145
val transactionState = getMatchedTransactionState(annotation)
178146
val eventType = getEventType(annotation)
179-
handlers.putIfAbsent(transactionState, mutableListOf())
180-
handlers[transactionState]?.add(
181-
MonoFunction(eventType, function as KFunction<Mono<*>>, handler)
147+
val noRetryFor = getNoRetryFor(annotation)
148+
functions.putIfAbsent(transactionState, mutableListOf())
149+
functions[transactionState]?.add(
150+
MonoDispatchFunction(
151+
eventType,
152+
function as KFunction<Mono<*>>,
153+
handler,
154+
noRetryFor,
155+
)
182156
)
183157
}.onFailure {
184158
throw IllegalStateException("Cannot add Mono TransactionHandler", it)
185159
}
186160
}
187161
}
188162
}
189-
190-
return handlers
191163
}
192164

193-
private fun getNotPublisherFunctions(
165+
private fun initNotPublisherFunctions(
194166
foundHandlers: List<Any>
195-
): MutableMap<TransactionState, MutableList<NotPublisherFunction>> {
196-
val handlers =
197-
mutableMapOf<TransactionState, MutableList<NotPublisherFunction>>()
167+
) {
198168

199169
for (handler in foundHandlers) {
200170
val returnTypeMatchedHandlers = handler::class.declaredMemberFunctions
@@ -206,18 +176,17 @@ abstract class AbstractTransactionDispatcher(
206176
runCatching {
207177
val transactionState = getMatchedTransactionState(annotation)
208178
val eventType = getEventType(annotation)
209-
handlers.putIfAbsent(transactionState, mutableListOf())
210-
handlers[transactionState]?.add(
211-
NotPublisherFunction(eventType, function, handler)
179+
val noRetryFor = getNoRetryFor(annotation)
180+
functions.putIfAbsent(transactionState, mutableListOf())
181+
functions[transactionState]?.add(
182+
NotPublishDispatchFunction(eventType, function, handler, noRetryFor)
212183
)
213184
}.onFailure {
214185
throw IllegalStateException("Cannot add TransactionHandler", it)
215186
}
216187
}
217188
}
218189
}
219-
220-
return handlers
221190
}
222191

223192
protected abstract fun <T : Annotation> findHandlers(type: KClass<T>): List<Any>
@@ -232,6 +201,16 @@ abstract class AbstractTransactionDispatcher(
232201
}
233202
}
234203

204+
private fun getNoRetryFor(annotation: Annotation): Array<KClass<out Throwable>> {
205+
return when (annotation) {
206+
is TransactionStartListener -> annotation.noRetryFor
207+
is TransactionCommitListener -> annotation.noRetryFor
208+
is TransactionJoinListener -> annotation.noRetryFor
209+
is TransactionRollbackListener -> annotation.noRetryFor
210+
else -> throw notMatchedTransactionHandlerException
211+
}
212+
}
213+
235214
private fun getMatchedTransactionState(annotation: Annotation): TransactionState {
236215
return when (annotation) {
237216
is TransactionStartListener -> TransactionState.TRANSACTION_STATE_START
@@ -242,65 +221,14 @@ abstract class AbstractTransactionDispatcher(
242221
}
243222
}
244223

245-
private class MonoFunction(
246-
private val eventType: KClass<*>,
247-
private val function: KFunction<Mono<*>>,
248-
private val handler: Any,
249-
) {
250-
251-
fun name(): String = function.name
252-
fun call(transactionEvent: TransactionEvent): Mono<*> {
253-
runCatching { transactionEvent.decodeEvent(eventType) }
254-
.fold(
255-
onSuccess = {
256-
return function.call(handler, transactionEvent)
257-
.info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
258-
},
259-
onFailure = {
260-
if (it is NullPointerException && eventType == Any::class) {
261-
return function.call(handler, transactionEvent)
262-
.info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
263-
}
264-
}
265-
)
266-
return Mono.empty<String>()
267-
}
268-
}
269-
270-
private class NotPublisherFunction(
271-
private val eventType: KClass<*>,
272-
private val function: KFunction<*>,
273-
private val handler: Any,
274-
) {
275-
fun name(): String = function.name
276-
277-
fun call(transactionEvent: TransactionEvent): Any? {
278-
runCatching {
279-
transactionEvent.decodeEvent(eventType)
280-
}.fold(
281-
onSuccess = {
282-
val result = function.call(handler, transactionEvent)
283-
info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
284-
return result
285-
},
286-
onFailure = {
287-
if (it is NullPointerException && eventType == Any::class) {
288-
val result = function.call(handler, transactionEvent)
289-
info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
290-
return result
291-
}
292-
}
293-
)
294-
return Unit
295-
}
296-
}
297-
298224
protected abstract fun ack(
299225
transaction: Transaction,
300226
messageId: String
301227
): Mono<Pair<Transaction, String>>
302228

303229
private companion object {
230+
private const val DISPATHCED = "dispatched"
231+
304232
private val cannotFindMatchedTransactionEventException =
305233
java.lang.IllegalStateException("Cannot find matched transaction event")
306234

src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,9 @@ abstract class AbstractTransactionListener(
2020
.doOnNext {
2121
info("Listen transaction \n{\n${it.first}}\nmessageId \"${it.second}\"")
2222
}
23-
.map { (transaction, messageId) ->
24-
val isSuccess = transactionDispatcher.dispatch(transaction, messageId)
25-
if (!isSuccess) {
26-
warningOnError("Error occurred when listen transaction \n{\n$transaction}\nmessageId \"$messageId\"")
27-
}
28-
isSuccess
23+
.flatMap { (transaction, messageId) ->
24+
transactionDispatcher.dispatch(transaction, messageId)
25+
.warningOnError("Error occurred when listen transaction \n{\n$transaction}\nmessageId \"$messageId\"")
2926
}
3027
.onErrorResume { Mono.empty() }
3128
.restartWhenTerminated()

src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,9 @@ abstract class AbstractTransactionRetrySupporter(
3838
.doOnNext {
3939
info("Retry orphan transaction \n{\n${it.first}}\nmessageId \"${it.second}\"")
4040
}
41-
.map { (transaction, messageId) ->
42-
val isSuccess = transactionDispatcher.dispatch(transaction, messageId)
43-
if (!isSuccess) {
44-
warningOnError("Error occurred when retry orphan transaction \n{\n$transaction}\nmessageId \"${messageId}\"")
45-
}
46-
isSuccess
41+
.flatMap { (transaction, messageId) ->
42+
transactionDispatcher.dispatch(transaction, messageId)
43+
.warningOnError("Error occurred when retry orphan transaction \n{\n$transaction}\nmessageId \"${messageId}\"")
4744
}
4845
.onErrorResume { Mono.empty() }
4946
.subscribeOn(Schedulers.immediate())

0 commit comments

Comments
 (0)