@@ -18,11 +18,8 @@ import com.github.pgutkowski.kgraphql.schema.scalar.serializeScalar
18
18
import com.github.pgutkowski.kgraphql.schema.structure2.Field
19
19
import com.github.pgutkowski.kgraphql.schema.structure2.InputValue
20
20
import com.github.pgutkowski.kgraphql.schema.structure2.Type
21
- import kotlinx.coroutines.CoroutineScope
22
- import kotlinx.coroutines.Job
21
+ import kotlinx.coroutines.*
23
22
import kotlinx.coroutines.channels.Channel
24
- import kotlinx.coroutines.launch
25
- import kotlinx.coroutines.runBlocking
26
23
import kotlin.coroutines.CoroutineContext
27
24
import kotlin.reflect.KProperty1
28
25
@@ -51,36 +48,14 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
51
48
override suspend fun suspendExecute (plan : ExecutionPlan , variables : VariablesJson , context : Context ): String {
52
49
val root = jsonNodeFactory.objectNode()
53
50
val data = root.putObject(" data" )
54
- val channel = Channel <Pair <Execution , JsonNode >>()
55
- val jobs = plan
56
- .map { execution ->
57
- launch(dispatcher) {
58
- try {
59
- val writeOperation = writeOperation(
60
- ctx = ExecutionContext (Variables (schema, variables, execution.variables), context),
61
- node = execution,
62
- operation = execution.field as Field .Function <* , * >
63
- )
64
- channel.send(execution to writeOperation)
65
- } catch (e: Exception ) {
66
- channel.close(e)
67
- }
68
- }
69
- }
70
- .toList()
71
51
72
- // intermediate data structure necessary to preserve ordering
73
- val resultMap = mutableMapOf<Execution , JsonNode >()
74
- repeat(plan.size) {
75
- try {
76
- val (execution, jsonNode) = channel.receive()
77
- resultMap.put(execution, jsonNode)
78
- } catch (e: Exception ) {
79
- jobs.forEach { it.cancel() }
80
- throw e
81
- }
52
+ val resultMap = plan.toMapAsync() {
53
+ writeOperation(
54
+ ctx = ExecutionContext (Variables (schema, variables, it.variables), context),
55
+ node = it,
56
+ operation = it.field as Field .Function <* , * >
57
+ )
82
58
}
83
- channel.close()
84
59
85
60
for (operation in plan) {
86
61
data.set(operation.aliasOrKey, resultMap[operation])
@@ -93,6 +68,33 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
93
68
suspendExecute(plan, variables, context)
94
69
}
95
70
71
+ private suspend fun <T , R > Collection<T>.toMapAsync (block : suspend (T ) -> R ): Map <T , R > = coroutineScope {
72
+ val channel = Channel <Pair <T , R >>()
73
+ val jobs = map { item ->
74
+ launch(dispatcher) {
75
+ try {
76
+ val res = block(item)
77
+ channel.send(item to res)
78
+ } catch (e: Exception ) {
79
+ channel.close(e)
80
+ }
81
+ }
82
+ }
83
+ val resultMap = mutableMapOf<T , R >()
84
+ repeat(size) {
85
+ try {
86
+ val (item, result) = channel.receive()
87
+ resultMap[item] = result
88
+ } catch (e: Exception ) {
89
+ jobs.forEach(Job ::cancel)
90
+ throw e
91
+ }
92
+ }
93
+
94
+ channel.close()
95
+ resultMap
96
+ }
97
+
96
98
private suspend fun <T > writeOperation (ctx : ExecutionContext , node : Execution .Node , operation : FunctionWrapper <T >): JsonNode {
97
99
node.field.checkAccess(null , ctx.requestContext)
98
100
val operationResult: T ? = operation.invoke(
@@ -134,9 +136,12 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
134
136
// check value, not returnType, because this method can be invoked with element value
135
137
value is Collection <* > -> {
136
138
if (returnType.isList()) {
137
- val arrayNode = jsonNodeFactory.arrayNode(value.size)
138
- value.forEach { element -> arrayNode.add(createNode(ctx, element, node, returnType.unwrapList())) }
139
- arrayNode
139
+ val valuesMap = value.toMapAsync {
140
+ createNode(ctx, it, node, returnType.unwrapList())
141
+ }
142
+ value.fold(jsonNodeFactory.arrayNode(value.size)) { array, v ->
143
+ array.add(valuesMap[v])
144
+ }
140
145
} else {
141
146
throw ExecutionException (" Invalid collection value for non collection property" )
142
147
}
0 commit comments