diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java index c5111219283ed..9966f50155175 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java @@ -442,6 +442,7 @@ private static SqlOperator deserializeFunctionClass( switch (functionInstance.getKind()) { case SCALAR: + case ASYNC_SCALAR: case TABLE: return BridgingSqlFunction.of( serdeContext.getFlinkContext(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java index 489f10a6123c4..c8b618823d73e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.AsyncScalarFunction; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.FunctionKind; @@ -80,6 +81,7 @@ import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -104,15 +106,25 @@ public class RexNodeJsonSerdeTest { new FlinkTypeFactory( RexNodeJsonSerdeTest.class.getClassLoader(), FlinkTypeSystem.INSTANCE); private static final String FUNCTION_NAME = "MyFunc"; + private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc"; private static final FunctionIdentifier FUNCTION_SYS_ID = FunctionIdentifier.of(FUNCTION_NAME); private static final FunctionIdentifier FUNCTION_CAT_ID = FunctionIdentifier.of( ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, FUNCTION_NAME)); + private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID = + FunctionIdentifier.of( + ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, ASYNC_FUNCTION_NAME)); private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID = UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList()); + private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID = + UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList()); private static final SerializableScalarFunction SER_UDF_IMPL = new SerializableScalarFunction(); + private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL = + new SerializableAsyncScalarFunction(); private static final Class SER_UDF_CLASS = SerializableScalarFunction.class; + private static final Class SER_ASYNC_UDF_CLASS = + SerializableAsyncScalarFunction.class; private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER = new OtherSerializableScalarFunction(); private static final Class SER_UDF_CLASS_OTHER = @@ -140,6 +152,12 @@ public void testInlineFunction() throws IOException { createFunctionCall(serdeContext, ContextResolvedFunction.anonymous(SER_UDF_IMPL)), RexNode.class); + // Serializable async function + testJsonRoundTrip( + createFunctionCall( + serdeContext, ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)), + RexNode.class); + // Non-serializable function due to fields assertThatThrownBy( () -> @@ -732,6 +750,11 @@ private static SerdeContext serdeContextWithPermanentFunction( .getFlinkContext() .getFunctionCatalog() .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS, false); + serdeContext + .getFlinkContext() + .getFunctionCatalog() + .registerCatalogFunction( + UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, false); return serdeContext; } @@ -834,6 +857,15 @@ public boolean equals(Object obj) { } } + /** Serializable async function. */ + public static class SerializableAsyncScalarFunction extends AsyncScalarFunction { + + @SuppressWarnings("unused") + public void eval(CompletableFuture res, Integer i) { + throw new UnsupportedOperationException(); + } + } + /** Non-serializable function. */ public static class NonSerializableScalarFunction extends ScalarFunction { @SuppressWarnings({"FieldCanBeLocal", "unused"})