@@ -51,36 +51,14 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
51
51
override suspend fun suspendExecute (plan : ExecutionPlan , variables : VariablesJson , context : Context ): String {
52
52
val root = jsonNodeFactory.objectNode()
53
53
val data = root.putObject(" data" )
54
- val channel = Channel <Pair <Execution , JsonNode >>()
55
- val jobs = plan
56
- .map { execution ->
57
- launch(dispatcher) {
58
- try {
59
- val writeOperation = writeOperation(
60
- ctx = ExecutionContext (Variables (schema, variables, execution.variables), context),
61
- node = execution,
62
- operation = execution.field as Field .Function <* , * >
63
- )
64
- channel.send(execution to writeOperation)
65
- } catch (e: Exception ) {
66
- channel.close(e)
67
- }
68
- }
69
- }
70
- .toList()
71
54
72
- // intermediate data structure necessary to preserve ordering
73
- val resultMap = mutableMapOf<Execution , JsonNode >()
74
- repeat(plan.size) {
75
- try {
76
- val (execution, jsonNode) = channel.receive()
77
- resultMap.put(execution, jsonNode)
78
- } catch (e: Exception ) {
79
- jobs.forEach { it.cancel() }
80
- throw e
81
- }
55
+ val resultMap = listToMapAsync(plan) {
56
+ writeOperation(
57
+ ctx = ExecutionContext (Variables (schema, variables, it.variables), context),
58
+ node = it,
59
+ operation = it.field as Field .Function <* , * >
60
+ )
82
61
}
83
- channel.close()
84
62
85
63
for (operation in plan) {
86
64
data.set(operation.aliasOrKey, resultMap[operation])
@@ -93,6 +71,34 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
93
71
suspendExecute(plan, variables, context)
94
72
}
95
73
74
+ private suspend fun <T , R > listToMapAsync (data : Collection <T >, block : suspend (T ) -> R ): Map <T , R > {
75
+ val channel = Channel <Pair <T , R >>()
76
+ val jobs = data.map { item ->
77
+ launch(dispatcher) {
78
+ try {
79
+ val res = block(item)
80
+ channel.send(item to res)
81
+ } catch (e: Exception ) {
82
+ channel.close(e)
83
+ }
84
+ }
85
+ }
86
+
87
+ val resultMap = mutableMapOf<T , R >()
88
+ repeat(data.size) {
89
+ try {
90
+ val (item, result) = channel.receive()
91
+ resultMap[item] = result
92
+ } catch (e: Exception ) {
93
+ jobs.forEach(Job ::cancel)
94
+ throw e
95
+ }
96
+ }
97
+ channel.close()
98
+
99
+ return resultMap
100
+ }
101
+
96
102
private suspend fun <T > writeOperation (ctx : ExecutionContext , node : Execution .Node , operation : FunctionWrapper <T >): JsonNode {
97
103
node.field.checkAccess(null , ctx.requestContext)
98
104
val operationResult: T ? = operation.invoke(
@@ -134,9 +140,12 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
134
140
// check value, not returnType, because this method can be invoked with element value
135
141
value is Collection <* > -> {
136
142
if (returnType.isList()) {
137
- val arrayNode = jsonNodeFactory.arrayNode(value.size)
138
- value.forEach { element -> arrayNode.add(createNode(ctx, element, node, returnType.unwrapList())) }
139
- arrayNode
143
+ val valuesMap = listToMapAsync(value) {
144
+ createNode(ctx, it, node, returnType.unwrapList())
145
+ }
146
+ value.fold(jsonNodeFactory.arrayNode(value.size)) { array, v ->
147
+ array.add(valuesMap[v])
148
+ }
140
149
} else {
141
150
throw ExecutionException (" Invalid collection value for non collection property" )
142
151
}
0 commit comments