Skip to content

Commit 84e362d

Browse files
committed
[FLINK-37820][table-planner] Support AsyncScalarFunction registration via CompiledPlan
1 parent 8348f7d commit 84e362d

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ private static SqlOperator deserializeFunctionClass(
442442

443443
switch (functionInstance.getKind()) {
444444
case SCALAR:
445+
case ASYNC_SCALAR:
445446
case TABLE:
446447
return BridgingSqlFunction.of(
447448
serdeContext.getFlinkContext(),

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.catalog.DataTypeFactory;
2929
import org.apache.flink.table.catalog.ObjectIdentifier;
3030
import org.apache.flink.table.catalog.UnresolvedIdentifier;
31+
import org.apache.flink.table.functions.AsyncScalarFunction;
3132
import org.apache.flink.table.functions.FunctionDefinition;
3233
import org.apache.flink.table.functions.FunctionIdentifier;
3334
import org.apache.flink.table.functions.FunctionKind;
@@ -80,6 +81,7 @@
8081
import java.util.Collections;
8182
import java.util.Optional;
8283
import java.util.Set;
84+
import java.util.concurrent.CompletableFuture;
8385
import java.util.stream.Stream;
8486

8587
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -104,15 +106,25 @@ public class RexNodeJsonSerdeTest {
104106
new FlinkTypeFactory(
105107
RexNodeJsonSerdeTest.class.getClassLoader(), FlinkTypeSystem.INSTANCE);
106108
private static final String FUNCTION_NAME = "MyFunc";
109+
private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
107110
private static final FunctionIdentifier FUNCTION_SYS_ID = FunctionIdentifier.of(FUNCTION_NAME);
108111
private static final FunctionIdentifier FUNCTION_CAT_ID =
109112
FunctionIdentifier.of(
110113
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, FUNCTION_NAME));
114+
private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
115+
FunctionIdentifier.of(
116+
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, ASYNC_FUNCTION_NAME));
111117
private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
112118
UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
119+
private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID =
120+
UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
113121
private static final SerializableScalarFunction SER_UDF_IMPL = new SerializableScalarFunction();
122+
private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
123+
new SerializableAsyncScalarFunction();
114124
private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
115125
SerializableScalarFunction.class;
126+
private static final Class<SerializableAsyncScalarFunction> SER_ASYNC_UDF_CLASS =
127+
SerializableAsyncScalarFunction.class;
116128
private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
117129
new OtherSerializableScalarFunction();
118130
private static final Class<OtherSerializableScalarFunction> SER_UDF_CLASS_OTHER =
@@ -140,6 +152,12 @@ public void testInlineFunction() throws IOException {
140152
createFunctionCall(serdeContext, ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
141153
RexNode.class);
142154

155+
// Serializable async function
156+
testJsonRoundTrip(
157+
createFunctionCall(
158+
serdeContext, ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
159+
RexNode.class);
160+
143161
// Non-serializable function due to fields
144162
assertThatThrownBy(
145163
() ->
@@ -732,6 +750,11 @@ private static SerdeContext serdeContextWithPermanentFunction(
732750
.getFlinkContext()
733751
.getFunctionCatalog()
734752
.registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS, false);
753+
serdeContext
754+
.getFlinkContext()
755+
.getFunctionCatalog()
756+
.registerCatalogFunction(
757+
UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, false);
735758
return serdeContext;
736759
}
737760

@@ -834,6 +857,15 @@ public boolean equals(Object obj) {
834857
}
835858
}
836859

860+
/** Serializable async function. */
861+
public static class SerializableAsyncScalarFunction extends AsyncScalarFunction {
862+
863+
@SuppressWarnings("unused")
864+
public void eval(CompletableFuture<String> res, Integer i) {
865+
throw new UnsupportedOperationException();
866+
}
867+
}
868+
837869
/** Non-serializable function. */
838870
public static class NonSerializableScalarFunction extends ScalarFunction {
839871
@SuppressWarnings({"FieldCanBeLocal", "unused"})

0 commit comments

Comments
 (0)