Skip to content

Commit 3b697a1

Browse files
committed
\pgutkowski#38: Added support for suspendable property resolvers
1 parent e3a02a2 commit 3b697a1

File tree

4 files changed

+146
-37
lines changed

4 files changed

+146
-37
lines changed

src/main/kotlin/com/github/pgutkowski/kgraphql/schema/dsl/PropertyDSL.kt

+18
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,24 @@ class PropertyDSL<T : Any, R>(val name : String, block : PropertyDSL<T, R>.() ->
4040
fun <E, W, Q, A, S>resolver(function: (T, E, W, Q, A, S) -> R)
4141
= resolver(FunctionWrapper.on(function, true))
4242

43+
fun suspendResolver(function: suspend (T) -> R)
44+
= resolver(FunctionWrapper.onSuspend(function, true))
45+
46+
fun <E>suspendResolver(function: suspend (T, E) -> R)
47+
= resolver(FunctionWrapper.onSuspend(function, true))
48+
49+
fun <E, W>suspendResolver(function: suspend (T, E, W) -> R)
50+
= resolver(FunctionWrapper.onSuspend(function, true))
51+
52+
fun <E, W, Q>suspendResolver(function: suspend (T, E, W, Q) -> R)
53+
= resolver(FunctionWrapper.onSuspend(function, true))
54+
55+
fun <E, W, Q, A>suspendResolver(function: suspend (T, E, W, Q, A) -> R)
56+
= resolver(FunctionWrapper.onSuspend(function, true))
57+
58+
fun <E, W, Q, A, S>suspendResolver(function: suspend (T, E, W, Q, A, S) -> R)
59+
= resolver(FunctionWrapper.onSuspend(function, true))
60+
4361
fun accessRule(rule: (T, Context) -> Exception?){
4462

4563
val accessRuleAdapter: (T?, Context) -> Exception? = { parent, ctx ->

src/main/kotlin/com/github/pgutkowski/kgraphql/schema/execution/ParallelRequestExecutor.kt

+40-31
Original file line numberDiff line numberDiff line change
@@ -51,36 +51,14 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
5151
override suspend fun suspendExecute(plan: ExecutionPlan, variables: VariablesJson, context: Context): String {
5252
val root = jsonNodeFactory.objectNode()
5353
val data = root.putObject("data")
54-
val channel = Channel<Pair<Execution, JsonNode>>()
55-
val jobs = plan
56-
.map { execution ->
57-
launch(dispatcher) {
58-
try {
59-
val writeOperation = writeOperation(
60-
ctx = ExecutionContext(Variables(schema, variables, execution.variables), context),
61-
node = execution,
62-
operation = execution.field as Field.Function<*, *>
63-
)
64-
channel.send(execution to writeOperation)
65-
} catch (e: Exception) {
66-
channel.close(e)
67-
}
68-
}
69-
}
70-
.toList()
7154

72-
//intermediate data structure necessary to preserve ordering
73-
val resultMap = mutableMapOf<Execution, JsonNode>()
74-
repeat(plan.size) {
75-
try {
76-
val (execution, jsonNode) = channel.receive()
77-
resultMap.put(execution, jsonNode)
78-
} catch (e: Exception) {
79-
jobs.forEach { it.cancel() }
80-
throw e
81-
}
55+
val resultMap = listToMapAsync(plan) {
56+
writeOperation(
57+
ctx = ExecutionContext(Variables(schema, variables, it.variables), context),
58+
node = it,
59+
operation = it.field as Field.Function<*, *>
60+
)
8261
}
83-
channel.close()
8462

8563
for (operation in plan) {
8664
data.set(operation.aliasOrKey, resultMap[operation])
@@ -93,6 +71,34 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
9371
suspendExecute(plan, variables, context)
9472
}
9573

74+
private suspend fun <T, R> listToMapAsync(data: Collection<T>, block: suspend (T) -> R): Map<T, R> {
75+
val channel = Channel<Pair<T, R>>()
76+
val jobs = data.map { item ->
77+
launch(dispatcher) {
78+
try {
79+
val res = block(item)
80+
channel.send(item to res)
81+
} catch (e: Exception) {
82+
channel.close(e)
83+
}
84+
}
85+
}
86+
87+
val resultMap = mutableMapOf<T, R>()
88+
repeat(data.size) {
89+
try {
90+
val (item, result) = channel.receive()
91+
resultMap[item] = result
92+
} catch (e: Exception) {
93+
jobs.forEach(Job::cancel)
94+
throw e
95+
}
96+
}
97+
channel.close()
98+
99+
return resultMap
100+
}
101+
96102
private suspend fun <T> writeOperation(ctx: ExecutionContext, node: Execution.Node, operation: FunctionWrapper<T>): JsonNode {
97103
node.field.checkAccess(null, ctx.requestContext)
98104
val operationResult: T? = operation.invoke(
@@ -134,9 +140,12 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor, Coro
134140
//check value, not returnType, because this method can be invoked with element value
135141
value is Collection<*> -> {
136142
if (returnType.isList()) {
137-
val arrayNode = jsonNodeFactory.arrayNode(value.size)
138-
value.forEach { element -> arrayNode.add(createNode(ctx, element, node, returnType.unwrapList())) }
139-
arrayNode
143+
val valuesMap = listToMapAsync(value) {
144+
createNode(ctx, it, node, returnType.unwrapList())
145+
}
146+
value.fold(jsonNodeFactory.arrayNode(value.size)) { array, v ->
147+
array.add(valuesMap[v])
148+
}
140149
} else {
141150
throw ExecutionException("Invalid collection value for non collection property")
142151
}

src/test/kotlin/com/github/pgutkowski/kgraphql/integration/ParallelExecutionTest.kt

+38-6
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,71 @@
11
package com.github.pgutkowski.kgraphql.integration
22

33
import com.github.pgutkowski.kgraphql.KGraphQL
4-
import com.github.pgutkowski.kgraphql.assertNoErrors
54
import com.github.pgutkowski.kgraphql.extract
65
import com.github.pgutkowski.kgraphql.deserialize
76
import kotlinx.coroutines.delay
87
import org.hamcrest.CoreMatchers
98
import org.hamcrest.MatcherAssert
109
import org.junit.Test
10+
import kotlin.random.Random
1111

1212
class ParallelExecutionTest {
1313

14+
data class AType(val id: Int)
15+
1416
val syncResolversSchema = KGraphQL.schema {
1517
repeat(1000) {
16-
query("automated-${it}") {
18+
query("automated-$it") {
1719
resolver { ->
1820
Thread.sleep(3)
19-
"${it}"
21+
"$it"
2022
}
2123
}
2224
}
2325
}
2426

2527
val suspendResolverSchema = KGraphQL.schema {
2628
repeat(1000) {
27-
query("automated-${it}") {
29+
query("automated-$it") {
2830
suspendResolver { ->
2931
delay(3)
30-
"${it}"
32+
"$it"
33+
}
34+
}
35+
}
36+
}
37+
38+
val suspendPropertySchema = KGraphQL.schema {
39+
query("getAll") {
40+
resolver { -> (0..999).map { AType(it) } }
41+
}
42+
type<AType> {
43+
property<List<AType>>("children") {
44+
suspendResolver { parent ->
45+
(0..50).map {
46+
delay(Random.nextLong(1, 100))
47+
AType((parent.id * 10) + it)
48+
}
3149
}
3250
}
3351
}
3452
}
3553

36-
val query = "{ " + (0..999).map { "automated-${it}" }.joinToString(", ") + " }"
54+
@Test
55+
fun `Suspendable property resolvers`() {
56+
val query = "{getAll{id,children{id}}}"
57+
val map = deserialize(suspendPropertySchema.execute(query))
58+
59+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[0]/id"), CoreMatchers.equalTo(0))
60+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[500]/id"), CoreMatchers.equalTo(500))
61+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[766]/id"), CoreMatchers.equalTo(766))
62+
63+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[5]/children[5]/id"), CoreMatchers.equalTo(55))
64+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[75]/children[9]/id"), CoreMatchers.equalTo(759))
65+
MatcherAssert.assertThat(map.extract<Int>("data/getAll[888]/children[50]/id"), CoreMatchers.equalTo(8930))
66+
}
67+
68+
val query = "{ " + (0..999).map { "automated-$it" }.joinToString(", ") + " }"
3769

3870
@Test
3971
fun `1000 synchronous resolvers sleeping with Thread sleep`(){

src/test/kotlin/com/github/pgutkowski/kgraphql/specification/language/ArgumentsSpecificationTest.kt

+50
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,30 @@ class ArgumentsSpecificationTest {
3030
}.take(size)
3131
}
3232
}
33+
property<Int>("none") {
34+
suspendResolver { actor -> actor.age }
35+
}
36+
property<Int>("one") {
37+
suspendResolver {actor, one: Int -> actor.age + one }
38+
}
39+
property<Int>("two") {
40+
suspendResolver { actor, one: Int, two: Int -> actor.age + one + two }
41+
}
42+
property<Int>("three") {
43+
suspendResolver { actor, one: Int, two: Int, three: Int ->
44+
actor.age + one + two + three
45+
}
46+
}
47+
property<Int>("four") {
48+
suspendResolver { actor, one: Int, two: Int, three: Int, four: Int ->
49+
actor.age + one + two + three + four
50+
}
51+
}
52+
property<Int>("five") {
53+
suspendResolver { actor, one: Int, two: Int, three: Int, four: Int, five: Int ->
54+
actor.age + one + two + three + four + five
55+
}
56+
}
3357
}
3458
}
3559

@@ -50,5 +74,31 @@ class ArgumentsSpecificationTest {
5074
)
5175
}
5276

77+
@Test
78+
fun `all arguments to suspendResolvers`() {
79+
val request = """
80+
{
81+
actor {
82+
none
83+
one(one: 1)
84+
two(one: 2, two: 3)
85+
three(one: 4, two: 5, three: 6)
86+
four(one: 7, two: 8, three: 9, four: 10)
87+
five(one: 11, two: 12, three: 13, four: 14, five: 15)
88+
}
89+
}
90+
""".trimIndent()
91+
val response = deserialize(schema.execute(request)) as Map<String, Any>
92+
assertThat(response, equalTo(mapOf<String, Any>(
93+
"data" to mapOf("actor" to mapOf(
94+
"none" to age,
95+
"one" to age + 1,
96+
"two" to age + 2 + 3,
97+
"three" to age + 4 + 5 + 6,
98+
"four" to age + 7 + 8 + 9 + 10,
99+
"five" to age + 11 + 12 + 13 + 14 + 15
100+
))
101+
)))
102+
}
53103

54104
}

0 commit comments

Comments
 (0)