Skip to content

Commit fdd6db5

Browse files
committed
Fixed BroadcastChannelMultiReceiveStressTest
1 parent 8348b8c commit fdd6db5

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ class BroadcastChannelMultiReceiveStressTest(
6060
val ctx = pool + coroutineContext[Job]!!
6161
val sender =
6262
launch(context = ctx + CoroutineName("Sender")) {
63+
var i = 0
6364
while (isActive) {
64-
broadcast.send(sentTotal.incrementAndGet())
65+
broadcast.send(++i)
66+
sentTotal.set(i) // set sentTotal only if `send` was not cancelled
6567
}
6668
}
6769
val receivers = mutableListOf<Job>()
@@ -84,7 +86,7 @@ class BroadcastChannelMultiReceiveStressTest(
8486
}
8587
}
8688
// wait a sec
87-
delay(1000)
89+
delay(100)
8890
// print progress
8991
println("${sec + 1}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
9092
}
@@ -93,13 +95,21 @@ class BroadcastChannelMultiReceiveStressTest(
9395
val total = sentTotal.get()
9496
println(" Sent $total events, waiting for receivers")
9597
stopOnReceive.set(total)
96-
withTimeout(5, TimeUnit.SECONDS) {
97-
receivers.forEachIndexed { index, receiver ->
98-
if (lastReceived[index].get() == total)
99-
receiver.cancel()
100-
else
101-
receiver.join()
98+
try {
99+
withTimeout(5, TimeUnit.SECONDS) {
100+
receivers.forEachIndexed { index, receiver ->
101+
if (lastReceived[index].get() == total)
102+
receiver.cancel()
103+
else
104+
receiver.join()
105+
}
106+
}
107+
} catch (e: Exception) {
108+
println("Failed: $e")
109+
receivers.indices.forEach { index ->
110+
println("lastReceived[$index] = ${lastReceived[index].get()}")
102111
}
112+
throw e
103113
}
104114
println(" Received ${receivedTotal.get()} events")
105115
}

0 commit comments

Comments
 (0)