Skip to content

Commit d3d335b

Browse files
committed
BroadcastChannelMultiReceiveStressTest streamlined,
consistent stdout headers in all stress-tests
1 parent 7f1380a commit d3d335b

9 files changed

+34
-23
lines changed

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class BroadcastChannelMultiReceiveStressTest(
4040
}
4141

4242
private val nReceivers = if (isStressTest) 10 else 5
43-
private val nSeconds = 5 * stressTestMultiplier
43+
private val nSeconds = 3 * stressTestMultiplier
4444

4545
private val broadcast = kind.create<Long>()
4646
private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest")
@@ -57,6 +57,7 @@ class BroadcastChannelMultiReceiveStressTest(
5757

5858
@Test
5959
fun testStress() = runBlocking {
60+
println("--- BroadcastChannelMultiReceiveStressTest $kind with nReceivers=$nReceivers")
6061
val ctx = pool + coroutineContext[Job]!!
6162
val sender =
6263
launch(context = ctx + CoroutineName("Sender")) {
@@ -67,31 +68,35 @@ class BroadcastChannelMultiReceiveStressTest(
6768
}
6869
}
6970
val receivers = mutableListOf<Job>()
70-
repeat(nSeconds) { sec ->
71-
// launch new receiver up to max
72-
if (receivers.size < nReceivers) {
73-
val receiverIndex = receivers.size
74-
val name = "Receiver$receiverIndex"
75-
println("$sec: Launching $name")
76-
receivers += launch(ctx + CoroutineName(name)) {
77-
broadcast.openSubscription().use { sub ->
78-
when (receiverIndex % 5) {
79-
0 -> doReceive(sub, receiverIndex)
80-
1 -> doReceiveOrNull(sub, receiverIndex)
81-
2 -> doIterator(sub, receiverIndex)
82-
3 -> doReceiveSelect(sub, receiverIndex)
83-
4 -> doReceiveSelectOrNull(sub, receiverIndex)
84-
}
71+
fun printProgress() {
72+
println("Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
73+
}
74+
// ramp up receivers
75+
repeat(nReceivers) {
76+
delay(100) // wait 0.1 sec
77+
val receiverIndex = receivers.size
78+
val name = "Receiver$receiverIndex"
79+
println("Launching $name")
80+
receivers += launch(ctx + CoroutineName(name)) {
81+
broadcast.openSubscription().use { sub ->
82+
when (receiverIndex % 5) {
83+
0 -> doReceive(sub, receiverIndex)
84+
1 -> doReceiveOrNull(sub, receiverIndex)
85+
2 -> doIterator(sub, receiverIndex)
86+
3 -> doReceiveSelect(sub, receiverIndex)
87+
4 -> doReceiveSelectOrNull(sub, receiverIndex)
8588
}
8689
}
8790
}
88-
// wait a sec
89-
delay(100)
90-
// print progress
91-
println("${sec + 1}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}, receivers=${receivers.size}")
91+
printProgress()
92+
}
93+
// wait
94+
repeat(nSeconds) { sec ->
95+
delay(1000)
96+
printProgress()
9297
}
9398
sender.cancelAndJoin()
94-
println("Tested with nReceivers=$nReceivers")
99+
println("Tested $kind with nReceivers=$nReceivers")
95100
val total = sentTotal.get()
96101
println(" Sent $total events, waiting for receivers")
97102
stopOnReceive.set(total)

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelSubStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class BroadcastChannelSubStressTest(
4747

4848
@Test
4949
fun testStress() = runBlocking {
50+
println("--- BroadcastChannelSubStressTest $kind")
5051
val ctx = coroutineContext + CommonPool
5152
val sender =
5253
launch(context = ctx + CoroutineName("Sender")) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelAtomicCancelStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class ChannelAtomicCancelStressTest(val kind: TestChannelKind) : TestBase() {
7272

7373
@Test
7474
fun testAtomicCancelStress() = runBlocking<Unit> {
75+
println("--- ChannelAtomicCancelStressTest $kind")
7576
val deadline = System.currentTimeMillis() + TEST_DURATION
7677
launchSender()
7778
launchReceiver()

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelSendReceiveStressTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ class ChannelSendReceiveStressTest(
5959

6060
@Test
6161
fun testSendReceiveStress() = runBlocking {
62-
println("-------------------------------------")
63-
println("Testing $kind with nSenders=$nSenders, nReceivers=$nReceivers")
62+
println("--- ChannelSendReceiveStressTest $kind with nSenders=$nSenders, nReceivers=$nReceivers")
6463
val receivers = List(nReceivers) { receiverIndex ->
6564
// different event receivers use different code
6665
launch(CommonPool + CoroutineName("receiver$receiverIndex")) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelNotifyStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
3737

3838
@Test
3939
fun testStressNotify()= runBlocking<Unit> {
40+
println("--- ConflatedBroadcastChannelNotifyStressTest")
4041
val senders = List(nSenders) { senderId ->
4142
launch(CommonPool + CoroutineName("Sender$senderId")) {
4243
repeat(nEvents) { i ->

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class ConflatedChannelCloseStressTest : TestBase() {
4141

4242
@Test
4343
fun testStressClose() = runBlocking<Unit> {
44+
println("--- ConflatedChannelCloseStressTest with nSenders=$nSenders")
4445
val senderJobs = List(nSenders) { Job() }
4546
val senders = List(nSenders) { senderId ->
4647
launch(pool) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListLongStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class LockFreeLinkedListLongStressTest : TestBase() {
4343

4444
@Test
4545
fun testStress() {
46+
println("--- LockFreeLinkedListLongStressTest")
4647
for (j in 0 until nAddThreads)
4748
threads += thread(start = false, name = "adder-$j") {
4849
for (i in j until nAdded step nAddThreads) {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedListShortStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class LockFreeLinkedListShortStressTest : TestBase() {
4646

4747
@Test
4848
fun testStress() {
49+
println("--- LockFreeLinkedListShortStressTest")
4950
val deadline = System.currentTimeMillis() + TEST_DURATION
5051
repeat(nAdderThreads) { threadId ->
5152
threads += thread(start = false, name = "adder-$threadId") {

core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectPhilosophersStressTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class SelectPhilosophersStressTest : TestBase() {
4949

5050
@Test
5151
fun testPhilosophers() = runBlocking<Unit> {
52+
println("--- SelectPhilosophersStressTest")
5253
val timeLimit = System.currentTimeMillis() + TEST_DURATION
5354
val philosophers = List<Deferred<Int>>(n) { id ->
5455
async(CommonPool) {

0 commit comments

Comments
 (0)