Skip to content
This repository was archived by the owner on Dec 30, 2020. It is now read-only.

Commit 16d8315

Browse files
author
Mikolaj Leszczynski
authored
Merge pull request #82 from babylonhealth/orbit-2-performance
[PDT-159] Orbit2 performance improvements
2 parents edb9bac + 3698cf3 commit 16d8315

File tree

13 files changed

+210
-139
lines changed

13 files changed

+210
-139
lines changed

.idea/jarRepositories.xml

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

buildSrc/src/main/kotlin/DependencyManagement.kt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ object Versions {
2525

2626
const val kotlin = "1.3.72"
2727
const val coroutines = "1.3.5"
28-
const val coroutineExtensions = "0.0.4"
2928

3029
const val androidLifecycles = "2.2.0"
3130
const val androidLifecyclesSavedState = "2.2.0"
@@ -62,8 +61,6 @@ object ProjectDependencies {
6261
"org.jetbrains.kotlinx:kotlinx-coroutines-core:${Versions.coroutines}"
6362
const val kotlinCoroutinesRx2 =
6463
"org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${Versions.coroutines}"
65-
const val kotlinCoroutineExtensions =
66-
"com.github.akarnokd:kotlin-flow-extensions:${Versions.coroutineExtensions}"
6764
const val kotlinTest = "org.jetbrains.kotlin:kotlin-test:${Versions.kotlin}"
6865

6966
// Android libraries

orbit-2-core/orbit-2-core_build.gradle.kts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ plugins {
2222
dependencies {
2323
implementation(kotlin("stdlib-jdk8"))
2424
implementation(ProjectDependencies.kotlinCoroutines)
25-
implementation(ProjectDependencies.kotlinCoroutineExtensions)
2625

2726
// Testing
2827
GroupedDependencies.testsImplementationJUnit5.forEach { testImplementation(it) }

orbit-2-core/src/main/java/com/babylon/orbit2/BasePlugin.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,21 @@ object BasePlugin : OrbitPlugin {
7979
}
8080
is SideEffect<*, *, *> -> flow.onEach {
8181
with(operator as SideEffect<S, SE, E>) {
82-
createContext(it).let {
82+
createContext(it).let { context ->
8383
SideEffectContext(
84-
it.state,
85-
it.event,
84+
context.state,
85+
context.event,
8686
containerContext.postSideEffect
8787
)
8888
}
8989
.block()
9090
}
9191
}
92-
is Reduce -> flow.onEach {
92+
is Reduce -> flow.onEach { event ->
9393
with(operator) {
94-
containerContext.setState {
95-
createContext(it).block() as S
96-
}
94+
containerContext.setState.send(
95+
createContext(event).block() as S
96+
)
9797
}
9898
}
9999
else -> flow

orbit-2-core/src/main/java/com/babylon/orbit2/OrbitPlugin.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.babylon.orbit2
1818

1919
import kotlinx.coroutines.CoroutineDispatcher
20+
import kotlinx.coroutines.channels.SendChannel
2021
import kotlinx.coroutines.flow.Flow
2122

2223
interface OrbitPlugin {
@@ -29,7 +30,7 @@ interface OrbitPlugin {
2930

3031
class ContainerContext<S : Any, SE : Any>(
3132
val backgroundDispatcher: CoroutineDispatcher,
32-
val setState: suspend (() -> S) -> Unit,
33+
val setState: SendChannel<S>,
3334
val postSideEffect: (SE) -> Unit
3435
)
3536
}

orbit-2-core/src/main/java/com/babylon/orbit2/RealContainer.kt

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,80 +16,67 @@
1616

1717
package com.babylon.orbit2
1818

19-
import hu.akarnokd.kotlin.flow.replay
2019
import kotlinx.coroutines.CoroutineDispatcher
2120
import kotlinx.coroutines.CoroutineScope
2221
import kotlinx.coroutines.Dispatchers
22+
import kotlinx.coroutines.ExperimentalCoroutinesApi
2323
import kotlinx.coroutines.FlowPreview
2424
import kotlinx.coroutines.asCoroutineDispatcher
2525
import kotlinx.coroutines.channels.Channel
2626
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
2727
import kotlinx.coroutines.flow.Flow
28-
import kotlinx.coroutines.flow.asFlow
2928
import kotlinx.coroutines.flow.collect
30-
import kotlinx.coroutines.flow.distinctUntilChanged
3129
import kotlinx.coroutines.flow.flowOf
3230
import kotlinx.coroutines.launch
3331
import kotlinx.coroutines.sync.Mutex
3432
import kotlinx.coroutines.sync.withLock
3533
import java.util.concurrent.Executors
3634

35+
@ExperimentalCoroutinesApi
3736
@FlowPreview
3837
open class RealContainer<STATE : Any, SIDE_EFFECT : Any>(
3938
initialState: STATE,
4039
settings: Container.Settings,
4140
orbitDispatcher: CoroutineDispatcher = DEFAULT_DISPATCHER,
4241
backgroundDispatcher: CoroutineDispatcher = Dispatchers.IO
4342
) : Container<STATE, SIDE_EFFECT> {
44-
override val currentState: STATE
45-
get() = stateChannel.value
43+
private val scope = CoroutineScope(orbitDispatcher)
4644
private val stateChannel = ConflatedBroadcastChannel(initialState)
4745
private val sideEffectChannel = Channel<SIDE_EFFECT>(Channel.RENDEZVOUS)
48-
private val scope = CoroutineScope(orbitDispatcher)
49-
private val stateMutex = Mutex()
5046
private val sideEffectMutex = Mutex()
47+
private val pluginContext = OrbitPlugin.ContainerContext(
48+
backgroundDispatcher = backgroundDispatcher,
49+
setState = stateChannel,
50+
postSideEffect = { event: SIDE_EFFECT ->
51+
scope.launch {
52+
// Ensure side effect ordering
53+
sideEffectMutex.withLock {
54+
sideEffectChannel.send(event)
55+
}
56+
}
57+
}
58+
)
59+
60+
override val currentState: STATE
61+
get() = stateChannel.value
5162

52-
override val stateStream: Stream<STATE> =
53-
stateChannel.asFlow().distinctUntilChanged().replay(1) { it }.asStream()
63+
override val stateStream = stateChannel.asStateStream { currentState }
5464

5565
override val sideEffectStream: Stream<SIDE_EFFECT> =
5666
if (settings.sideEffectCaching) {
5767
sideEffectChannel.asCachingStream(scope)
5868
} else {
59-
sideEffectChannel.asStream(scope)
69+
sideEffectChannel.asNonCachingStream()
6070
}
6171

62-
override fun orbit(
63-
init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>
64-
) {
72+
override fun orbit(init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>) {
6573
scope.launch {
6674
collectFlow(init)
6775
}
6876
}
6977

70-
private val pluginContext = OrbitPlugin.ContainerContext<STATE, SIDE_EFFECT>(
71-
backgroundDispatcher = backgroundDispatcher,
72-
setState = {
73-
scope.launch {
74-
stateMutex.withLock {
75-
val reduced = it()
76-
stateChannel.send(reduced)
77-
}
78-
}.join()
79-
},
80-
postSideEffect = { event: SIDE_EFFECT ->
81-
scope.launch {
82-
sideEffectMutex.withLock {
83-
sideEffectChannel.send(event)
84-
}
85-
}
86-
}
87-
)
88-
8978
@Suppress("UNCHECKED_CAST")
90-
suspend fun collectFlow(
91-
init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>
92-
) {
79+
suspend fun collectFlow(init: Builder<STATE, SIDE_EFFECT, Unit>.() -> Builder<STATE, SIDE_EFFECT, *>) {
9380
Builder<STATE, SIDE_EFFECT, Unit>()
9481
.init().stack.fold(flowOf(Unit)) { flow: Flow<Any>, operator: Operator<STATE, *> ->
9582
Orbit.plugins.fold(flow) { flow2: Flow<Any>, plugin: OrbitPlugin ->

orbit-2-core/src/main/java/com/babylon/orbit2/StreamExtensions.kt

Lines changed: 31 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,93 +19,79 @@ package com.babylon.orbit2
1919
import kotlinx.coroutines.CoroutineScope
2020
import kotlinx.coroutines.CoroutineStart
2121
import kotlinx.coroutines.Dispatchers
22-
import kotlinx.coroutines.cancel
22+
import kotlinx.coroutines.ExperimentalCoroutinesApi
2323
import kotlinx.coroutines.channels.BroadcastChannel
2424
import kotlinx.coroutines.channels.Channel
25-
import kotlinx.coroutines.channels.ReceiveChannel
2625
import kotlinx.coroutines.channels.broadcast
27-
import kotlinx.coroutines.flow.Flow
28-
import kotlinx.coroutines.flow.collect
29-
import kotlinx.coroutines.isActive
3026
import kotlinx.coroutines.launch
31-
import kotlinx.coroutines.runBlocking
32-
import kotlinx.coroutines.sync.Mutex
33-
import kotlinx.coroutines.sync.withLock
3427
import java.io.Closeable
28+
import java.util.concurrent.atomic.AtomicInteger
3529

36-
internal fun <T> Flow<T>.asStream(): Stream<T> {
30+
@ExperimentalCoroutinesApi
31+
internal fun <T> BroadcastChannel<T>.asStateStream(initial: () -> T): Stream<T> {
3732
return object : Stream<T> {
3833
override fun observe(lambda: (T) -> Unit): Closeable {
39-
val scope = CoroutineScope(Dispatchers.Unconfined)
40-
scope.launch {
41-
this@asStream.collect {
42-
lambda(it)
34+
val sub = this@asStateStream.openSubscription()
35+
36+
CoroutineScope(Dispatchers.Unconfined).launch {
37+
var lastState = initial()
38+
lambda(lastState)
39+
40+
for (state in sub) {
41+
if (state != lastState) {
42+
lastState = state
43+
lambda(state)
44+
}
4345
}
4446
}
45-
return Closeable { scope.cancel() }
47+
return Closeable { sub.cancel() }
4648
}
4749
}
4850
}
4951

50-
internal fun <T> Channel<T>.asStream(originalScope: CoroutineScope): Stream<T> {
51-
return object : Stream<T> {
52-
private val broadcastChannel = originalScope.broadcast(
53-
capacity = 1024,
54-
start = CoroutineStart.DEFAULT
55-
) {
56-
for (item in this@asStream) {
57-
if (isActive) {
58-
send(item)
59-
} else {
60-
break
61-
}
62-
}
63-
}
52+
@ExperimentalCoroutinesApi
53+
internal fun <T> Channel<T>.asNonCachingStream(): Stream<T> {
54+
val broadcastChannel = this.broadcast(start = CoroutineStart.DEFAULT)
6455

56+
return object : Stream<T> {
6557
override fun observe(lambda: (T) -> Unit): Closeable {
66-
val scope = CoroutineScope(Dispatchers.Unconfined)
6758
val receiveChannel = broadcastChannel.openSubscription()
68-
scope.launch {
59+
CoroutineScope(Dispatchers.Unconfined).launch {
6960
for (item in receiveChannel) {
7061
lambda(item)
7162
}
7263
}
7364
return Closeable {
7465
receiveChannel.cancel()
75-
scope.cancel()
7666
}
7767
}
7868
}
7969
}
8070

71+
@ExperimentalCoroutinesApi
8172
internal fun <T> Channel<T>.asCachingStream(originalScope: CoroutineScope): Stream<T> {
8273
return object : Stream<T> {
83-
private val channels = mutableSetOf<ReceiveChannel<T>>()
74+
private val subCount = AtomicInteger(0)
8475
private val buffer = mutableListOf<T>()
85-
private val bufferMutex = Mutex()
8676
private val channel = BroadcastChannel<T>(Channel.BUFFERED)
8777

8878
init {
8979
originalScope.launch {
9080
for (item in this@asCachingStream) {
91-
bufferMutex.withLock {
92-
if (channels.isEmpty()) {
93-
buffer.add(item)
94-
} else {
95-
channel.send(item)
96-
}
81+
if (subCount.get() == 0) {
82+
buffer.add(item)
83+
} else {
84+
channel.send(item)
9785
}
9886
}
9987
}
10088
}
10189

10290
override fun observe(lambda: (T) -> Unit): Closeable {
103-
val scope = CoroutineScope(Dispatchers.Unconfined)
10491
val receiveChannel = channel.openSubscription()
10592

106-
scope.launch {
107-
bufferMutex.withLock {
108-
channels += receiveChannel
93+
CoroutineScope(Dispatchers.Unconfined).launch {
94+
if (subCount.compareAndSet(0, 1)) {
10995
buffer.forEach { buffered ->
11096
channel.send(buffered)
11197
}
@@ -116,12 +102,8 @@ internal fun <T> Channel<T>.asCachingStream(originalScope: CoroutineScope): Stre
116102
}
117103
}
118104
return Closeable {
119-
runBlocking {
120-
bufferMutex.withLock {
121-
channels.remove(receiveChannel)
122-
receiveChannel.cancel()
123-
}
124-
}
105+
receiveChannel.cancel()
106+
subCount.decrementAndGet()
125107
}
126108
}
127109
}

0 commit comments

Comments
 (0)