Skip to content

Commit 19ea653

Browse files
committed
Added parallelReduce
1 parent f07b34f commit 19ea653

File tree

7 files changed

+232
-33
lines changed

7 files changed

+232
-33
lines changed

transformations/src/main/kotlin/ParallelMap.kt

Lines changed: 0 additions & 25 deletions
This file was deleted.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.lukaskusik.coroutines.transformations
2+
3+
import kotlinx.coroutines.async
4+
import kotlinx.coroutines.coroutineScope
5+
6+
/**
7+
* Performs map transformation on the iterable using coroutines.
8+
*/
9+
suspend fun <T, R> Iterable<T>.mapParallel(
10+
transform: (T) -> R
11+
): List<R> = coroutineScope {
12+
map { async { transform(it) } }.map { it.await() }
13+
}
14+
15+
/**
16+
* Performs map transformation on the iterable using coroutines.
17+
* The chunkSize parameter is used to run multiple transformations on a single coroutine.
18+
*
19+
* @param chunkSize Size of each sub-collection that will be reduced in each coroutine.
20+
*/
21+
suspend fun <T, R> Iterable<T>.mapParallelChunked(
22+
chunkSize: Int,
23+
transform: (T) -> R
24+
): List<R> = coroutineScope {
25+
chunked(chunkSize).map { subChunk ->
26+
async {
27+
subChunk.map(transform)
28+
}
29+
}.flatMap {
30+
it.await()
31+
}
32+
}
33+
34+
/**
35+
* Performs map transformation on the iterable using coroutines.
36+
*
37+
* It can split the collection into multiple chunks using the chunksCount parameter.
38+
* Each chunk will then run on a single coroutine, minimizing thread management, etc.
39+
* The default and recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8.
40+
*
41+
* @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors.
42+
*
43+
*/
44+
suspend fun <T, E> Collection<T>.mapParallelChunked(
45+
chunksCount: Int = Runtime.getRuntime().availableProcessors(),
46+
transform: (T) -> E
47+
): List<E> {
48+
val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt()
49+
return asIterable().mapParallelChunked(chunkSize, transform)
50+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.lukaskusik.coroutines.transformations
2+
3+
import kotlinx.coroutines.async
4+
import kotlinx.coroutines.coroutineScope
5+
6+
7+
/**
8+
* The reduce operation must be associative, since the reduce will be most likely out of order.
9+
* This method splits the original collection into chunks and reduces each of the chunks separately on their own coroutines and then reduces the result again.
10+
*
11+
* @param chunkSize Size of each sub-collection that will be reduced in each coroutine.
12+
*/
13+
suspend fun <T> Iterable<T>.reduceParallel(
14+
chunkSize: Int,
15+
operation: (T, T) -> T
16+
): T = coroutineScope {
17+
chunked(chunkSize).map { subChunk ->
18+
async {
19+
subChunk.reduce(operation)
20+
}
21+
}.map { it.await() }.reduce(operation)
22+
}
23+
24+
/**
25+
* The operation must be associative, since the reduce will be most likely out of order.
26+
* This method splits the original collection into chunks and reduces each of the chunks separately on their own coroutines and then reduces the result again.
27+
* The recommended chunksCount for multithreading is the number of CPU threads, e.g. 4 or 8.
28+
*
29+
* @param chunksCount How many chunks should the collection be split into. Defaults to the number of available processors.
30+
*/
31+
suspend fun <T> Collection<T>.reduceParallel(
32+
chunksCount: Int = Runtime.getRuntime().availableProcessors(),
33+
operation: (T, T) -> T
34+
): T {
35+
val chunkSize = Math.ceil(size / chunksCount.toDouble()).toInt()
36+
return asIterable().reduceParallel(chunkSize, operation)
37+
}

transformations/src/test/kotlin/ParallelMapBenchmark.kt renamed to transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/benchmark/ParallelMapBenchmark.kt

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
1-
package com.lukaskusik.coroutines.transformations.test
1+
package com.lukaskusik.coroutines.transformations.benchmark
22

33
import com.carrotsearch.junitbenchmarks.AbstractBenchmark
4+
import com.carrotsearch.junitbenchmarks.annotation.AxisRange
5+
import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart
46
import com.lukaskusik.coroutines.transformations.mapParallel
7+
import com.lukaskusik.coroutines.transformations.mapParallelChunked
8+
import com.lukaskusik.coroutines.transformations.test.ParallelMapTest
59
import kotlinx.coroutines.Dispatchers
610
import kotlinx.coroutines.runBlocking
711
import org.junit.Test
812

913
class ParallelMapBenchmark : AbstractBenchmark() {
1014

1115
companion object {
12-
const val LIST_SIZE = 100
16+
const val LIST_SIZE = 1000
1317
}
1418

1519
private val list = ParallelMapTest.getRandomListOfSize(LIST_SIZE)
@@ -34,11 +38,19 @@ class ParallelMapBenchmark : AbstractBenchmark() {
3438
}
3539
}
3640

37-
// @Test
38-
// fun parallelJustThreads() {
39-
// runBlocking(Dispatchers.Default) {
40-
// list.mapParallel(LIST_SIZE, sumClosure)
41-
// }
42-
// }
41+
@Test
42+
fun coroutineOnThreadPoolChunked4() {
43+
runBlocking(Dispatchers.Default) {
44+
list.mapParallelChunked(4) { Thread.sleep(1); it / 2 }
45+
}
46+
}
47+
48+
@Test
49+
fun coroutineOnThreadPoolChunked8() {
50+
runBlocking(Dispatchers.Default) {
51+
list.mapParallelChunked(8) { Thread.sleep(1); it / 2 }
52+
}
53+
}
54+
4355

4456
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.lukaskusik.coroutines.transformations.benchmark
2+
3+
import com.carrotsearch.junitbenchmarks.AbstractBenchmark
4+
import com.lukaskusik.coroutines.transformations.reduceParallel
5+
import com.lukaskusik.coroutines.transformations.test.ParallelMapTest
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.runBlocking
8+
import org.junit.Test
9+
10+
class ParallelReduceBenchmark : AbstractBenchmark() {
11+
12+
companion object {
13+
const val LIST_SIZE = 1000
14+
}
15+
16+
private val list = ParallelMapTest.getRandomListOfSize(LIST_SIZE)
17+
18+
private val operation = { acc: Int, i: Int -> Thread.sleep(1); acc + i }
19+
20+
@Test
21+
fun sequential() {
22+
list.reduce(operation)
23+
}
24+
25+
@Test
26+
fun coroutineOnMain() {
27+
runBlocking {
28+
list.reduceParallel(1, operation)
29+
}
30+
}
31+
32+
@Test
33+
fun coroutineOnThreadPool() {
34+
runBlocking(Dispatchers.Default) {
35+
list.reduceParallel(1, operation)
36+
}
37+
}
38+
39+
@Test
40+
fun coroutineOnThreadPoolChunked4() {
41+
runBlocking(Dispatchers.Default) {
42+
list.reduceParallel(4, operation)
43+
}
44+
}
45+
46+
@Test
47+
fun coroutineOnThreadPoolChunked8() {
48+
runBlocking(Dispatchers.Default) {
49+
list.reduceParallel(8, operation)
50+
}
51+
}
52+
53+
54+
}

transformations/src/test/kotlin/ParallelMapTest.kt renamed to transformations/src/test/kotlin/com/lukaskusik/coroutines/transformations/test/ParallelMapTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.lukaskusik.coroutines.transformations.test
22

33
import com.lukaskusik.coroutines.transformations.mapParallel
4+
import com.lukaskusik.coroutines.transformations.mapParallelChunked
45
import kotlinx.coroutines.runBlocking
56
import org.junit.Assert
67
import org.junit.Test
@@ -33,5 +34,17 @@ class ParallelMapTest {
3334
Assert.assertEquals(listSequential, listParallel)
3435
}
3536

37+
@Test
38+
fun parallelMap4Chunks() {
39+
var listSequential = listOf(1, 3, 3, 4, 5)
40+
var listParallel = listSequential.toList()
3641

42+
listSequential = listSequential.map { it * 2 }
43+
runBlocking {
44+
listParallel = listParallel.mapParallelChunked(4) { it * 2 }
45+
}
46+
47+
48+
Assert.assertEquals(listSequential, listParallel)
49+
}
3750
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.lukaskusik.coroutines.transformations.test
2+
3+
import com.lukaskusik.coroutines.transformations.reduceParallel
4+
import kotlinx.coroutines.runBlocking
5+
import org.junit.Assert
6+
import org.junit.Test
7+
import kotlin.random.Random
8+
9+
class ParallelReduceTest {
10+
companion object {
11+
fun getList() = listOf(1, 2, 3, 4, 5)
12+
13+
fun getRandomListOfSize(listSize: Int): List<Int> {
14+
val random = Random(648)
15+
val list = ArrayList<Int>(listSize)
16+
repeat(listSize) {
17+
list.add(random.nextInt())
18+
}
19+
return list
20+
}
21+
}
22+
23+
private fun theTest(chunks: Int) {
24+
val listSequential = getList()
25+
val listParallel = listSequential.toList()
26+
val operation = { acc: Int, i: Int -> acc + i }
27+
28+
val sequentialResult = listSequential.reduce(operation)
29+
var parallelResult: Int? = null
30+
runBlocking {
31+
parallelResult =
32+
listParallel.reduceParallel(chunks, operation)
33+
}
34+
35+
Assert.assertEquals(sequentialResult, parallelResult)
36+
}
37+
38+
@Test
39+
fun parallelReduceNoChunks() {
40+
theTest(1)
41+
}
42+
43+
@Test
44+
fun parallelReduce4Chunks() {
45+
theTest(4)
46+
}
47+
48+
49+
@Test(expected = IllegalArgumentException::class)
50+
fun parallelReduce0ChunksError() {
51+
theTest(0)
52+
}
53+
54+
@Test(expected = IllegalArgumentException::class)
55+
fun parallelReduceNegativeChunksError() {
56+
theTest(-10)
57+
}
58+
}

0 commit comments

Comments
 (0)