-
Notifications
You must be signed in to change notification settings - Fork 0
/
SimpleContinuation.kt
191 lines (166 loc) · 6.44 KB
/
SimpleContinuation.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package mycorda.app.continuations.simple
import mycorda.app.continuations.*
import mycorda.app.continuations.events.ScheduledActionCreatedFactory
import mycorda.app.registry.Registrar
import mycorda.app.registry.Registry
import mycorda.app.ses.EventStore
import mycorda.app.ses.InMemoryEventStore
import mycorda.app.sks.SKS
import mycorda.app.sks.SimpleKVStore
import java.lang.Exception
import java.lang.Long.max
import java.lang.RuntimeException
import kotlin.reflect.KClass
/**
* Something that can be scheduled to run at some point in the future
*/
data class Scheduled<out T : Any>(
val key: String,
val ctx: ContinuationContext,
val clazz: KClass<out T>, // can I get rid of this
val block: (ctx: ContinuationContext) -> T,
val scheduledTime: Long = System.currentTimeMillis() + 1000
)
interface Scheduler {
fun <T : Any> schedule(scheduled: Scheduled<T>)
fun <T : Any> waitFor(key: String): T
}
interface SchedulerFactory {
fun get(continuation: Continuation): Scheduler
}
/**
* Wires up a SimpleContinuation
*/
class SimpleContinuationRegistrar : Registrar {
override fun register(registry: Registry, strict: Boolean): Registry {
if (!registry.contains(EventStore::class.java)) {
if (strict) {
throw RuntimeException("There should be an EventStore class in the registry")
} else {
registry.store(InMemoryEventStore())
}
}
if (!registry.contains(SKS::class.java)) {
if (strict) {
throw RuntimeException("There should be a Simple Key Value Store (SKS) class in the registry")
} else {
registry.store(SimpleKVStore())
}
}
registry.store(SimpleSchedulerService(registry))
registry.store(SimpleSchedulerFactory(registry))
registry.store(SimpleContinuationFactory(registry))
registry.store(ContinuableFactory(registry))
return registry
}
}
class SimpleScheduler(
registry: Registry,
private val continuation: Continuation
) : Scheduler {
private val schedulerService = registry.get(SchedulerService::class.java)
override fun <T : Any> schedule(scheduled: Scheduled<T>) {
schedulerService.schedule(scheduled)
}
override fun <T : Any> waitFor(key: String): T {
val scheduled = schedulerService.get<T>(key)
val delayedRetry = max(scheduled.scheduledTime - System.currentTimeMillis(), 1)
Thread.sleep(delayedRetry)
schedulerService.completed(scheduled.key)
return continuation.execBlock(key, scheduled.clazz, scheduled.block, scheduled.ctx)
}
}
class SimpleSchedulerFactory(private val registry: Registry) : SchedulerFactory {
override fun get(continuation: Continuation): Scheduler {
return SimpleScheduler(registry, continuation)
}
}
interface SchedulerService {
fun <T : Any> schedule(action: Scheduled<T>)
fun <T : Any> get(key: String): Scheduled<T>
fun completed(key: String)
}
class SimpleSchedulerService(registry: Registry) : SchedulerService {
private val es = registry.get(EventStore::class.java)
private val schedules = ArrayList<Scheduled<Any>>()
override fun <T : Any> schedule(action: Scheduled<T>) {
// create a persistent event for recovery
es.store(ScheduledActionCreatedFactory.create(action))
schedules.add(action)
}
override fun <T : Any> get(key: String): Scheduled<T> {
@Suppress("UNCHECKED_CAST")
return schedules.single { it.key == key } as Scheduled<T>
}
override fun completed(key: String) {
schedules.removeIf { it.key == key }
}
}
class SimpleContinuation(
private val exceptionStrategy: ContinuationExceptionStrategy = RetryNTimesExceptionStrategy(),
schedulerFactory: SchedulerFactory
) : Continuation {
constructor(registry: Registry) : this(
registry.getOrElse(ContinuationExceptionStrategy::class.java, RetryNTimesExceptionStrategy()),
registry.get(SchedulerFactory::class.java)
)
private val scheduler = schedulerFactory.get(this)
private val lookup = HashMap<String, Any>()
override fun <T : Any> execBlock(
key: String,
clazz: KClass<out T>,
block: (ctx: ContinuationContext) -> T,
ctx: ContinuationContext
): T {
if (!lookup.containsKey(key)) {
// step has not run successfully before
try {
val result = block.invoke(ctx)
lookup[key] = result
return result
} catch (ex: Exception) {
val retry = exceptionStrategy.handle(ctx, ex)
when (retry) {
is ImmediateRetry -> {
if (retry.maxAttempts >= retry.newContext().attempts) {
return this.execBlock(key, clazz, block, retry.newContext())
}
}
is DelayedRetry -> {
if (retry.maxAttempts >= retry.newContext().attempts) {
val scheduled = Scheduled(key, retry.newContext(), clazz, block)
scheduler.schedule(scheduled)
return scheduler.waitFor(key)
}
}
is DontRetry -> {
println("opps")
// todo - log before throwing
throw ex
}
}
// what to do here ?
throw ex
}
} else {
// step has run successfully and can be skipped
@Suppress("UNCHECKED_CAST")
return lookup[key] as T
}
}
}
/**
* Create a SimpleContinuation
*/
class SimpleContinuationFactory(registry: Registry) : ContinuationFactory {
private val registry = registry.clone() // make a clean copy as registry is mutable
private val schedulerFactory = registry.get(SchedulerFactory::class.java)
private val lookup = HashMap<ContinuationId, SimpleContinuation>()
override fun get(continuationId: ContinuationId): Continuation {
lookup.putIfAbsent(continuationId, SimpleContinuation(exceptionStrategy(), schedulerFactory))
return lookup[continuationId]!!
}
override fun exceptionStrategy(): ContinuationExceptionStrategy {
return registry.getOrElse(ContinuationExceptionStrategy::class.java, RetryNTimesExceptionStrategy())
}
}