Skip to content

[FLINK-37820][table-planner] Support AsyncScalarFunction registration via CompiledPlan #26586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ private static SqlOperator deserializeFunctionClass(

switch (functionInstance.getKind()) {
case SCALAR:
case ASYNC_SCALAR:
case TABLE:
return BridgingSqlFunction.of(
serdeContext.getFlinkContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
Expand All @@ -105,15 +107,25 @@ public class RexNodeJsonSerdeTest {
new FlinkTypeFactory(
RexNodeJsonSerdeTest.class.getClassLoader(), FlinkTypeSystem.INSTANCE);
private static final String FUNCTION_NAME = "MyFunc";
private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
private static final FunctionIdentifier FUNCTION_SYS_ID = FunctionIdentifier.of(FUNCTION_NAME);
private static final FunctionIdentifier FUNCTION_CAT_ID =
FunctionIdentifier.of(
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, FUNCTION_NAME));
private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
FunctionIdentifier.of(
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, ASYNC_FUNCTION_NAME));
private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID =
UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
private static final SerializableScalarFunction SER_UDF_IMPL = new SerializableScalarFunction();
private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
new SerializableAsyncScalarFunction();
private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
SerializableScalarFunction.class;
private static final Class<SerializableAsyncScalarFunction> SER_ASYNC_UDF_CLASS =
SerializableAsyncScalarFunction.class;
private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
new OtherSerializableScalarFunction();
private static final Class<OtherSerializableScalarFunction> SER_UDF_CLASS_OTHER =
Expand Down Expand Up @@ -141,6 +153,12 @@ public void testInlineFunction() throws IOException {
createFunctionCall(serdeContext, ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
RexNode.class);

// Serializable async function
testJsonRoundTrip(
createFunctionCall(
serdeContext, ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
RexNode.class);

// Non-serializable function due to fields
assertThatThrownBy(
() ->
Expand Down Expand Up @@ -743,6 +761,11 @@ private static SerdeContext serdeContextWithPermanentFunction(
.getFlinkContext()
.getFunctionCatalog()
.registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS, false);
serdeContext
.getFlinkContext()
.getFunctionCatalog()
.registerCatalogFunction(
UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, false);
return serdeContext;
}

Expand Down Expand Up @@ -845,6 +868,15 @@ public boolean equals(Object obj) {
}
}

/** Serializable async function. */
public static class SerializableAsyncScalarFunction extends AsyncScalarFunction {

@SuppressWarnings("unused")
public void eval(CompletableFuture<String> res, Integer i) {
throw new UnsupportedOperationException();
}
}

/** Non-serializable function. */
public static class NonSerializableScalarFunction extends ScalarFunction {
@SuppressWarnings({"FieldCanBeLocal", "unused"})
Expand Down