Async Function Support in Compiled plan #1234

Open
mbroecheler opened this issue May 18, 2025 · 1 comment

It looks like the compiled plan does not yet support async functions.
Once that is fixed, we need to remove the `"compilePlan": false` configuration from the `package.json` of the udf and openai use-case tests.
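For reference, the temporary workaround in the affected tests looks roughly like this. Only the `"compilePlan": false` flag is taken from this issue; the surrounding `package.json` structure shown here (placing it under a `compiler` section) is an assumption and may differ from the actual test configuration:

```json
{
  "compiler": {
    "compilePlan": false
  }
}
```

With the flag disabled, the planner skips serializing the job to a compiled plan, which avoids the `ASYNC_SCALAR` deserialization failure shown below.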

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot load Plan:
{
  "flinkVersion" : "1.19",
  "nodes" : [ {
    "id" : 1,
    "type" : "stream-exec-values_1",
    "tuples" : [ [ {
      "kind" : "LITERAL",
      "value" : "1",
      "type" : "CHAR(1) NOT NULL"
    }, {
      "kind" : "LITERAL",
      "value" : 1,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : "2",
      "type" : "CHAR(1) NOT NULL"
    }, {
      "kind" : "LITERAL",
      "value" : 2,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : "3",
      "type" : "CHAR(1) NOT NULL"
    }, {
      "kind" : "LITERAL",
      "value" : 3,
      "type" : "INT NOT NULL"
    } ] ],
    "outputType" : "ROW<`val` CHAR(1) NOT NULL, `ival` INT NOT NULL>",
    "description" : "Values(tuples=[[{ '1', 1 }, { '2', 2 }, { '3', 3 }]])",
    "inputProperties" : [ ]
  }, {
    "id" : 2,
    "type" : "stream-exec-async-calc_1",
    "configuration" : {
      "table.exec.async-scalar.buffer-capacity" : "10",
      "table.exec.async-scalar.max-attempts" : "3",
      "table.exec.async-scalar.retry-delay" : "100 ms",
      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
      "table.exec.async-scalar.timeout" : "3 min"
    },
    "projection" : [ {
      "kind" : "INPUT_REF",
      "inputIndex" : 0,
      "type" : "CHAR(1) NOT NULL"
    }, {
      "kind" : "CALL",
      "catalogName" : "`default_catalog`.`default_database`.`MyAsyncScalarFunction`",
      "class" : "com.myudf.MyAsyncScalarFunction",
      "operands" : [ {
        "kind" : "INPUT_REF",
        "inputIndex" : 0,
        "type" : "CHAR(1) NOT NULL"
      }, {
        "kind" : "INPUT_REF",
        "inputIndex" : 1,
        "type" : "INT NOT NULL"
      } ],
      "type" : "VARCHAR(2147483647)"
    } ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` CHAR(1) NOT NULL, `myFnc` VARCHAR(2147483647)>",
    "description" : "AsyncCalc(select=[val, MyAsyncScalarFunction(val, ival) AS myFnc])"
  }, {
    "id" : 3,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`MyAsyncTable_1`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "val",
              "dataType" : "CHAR(1) NOT NULL"
            }, {
              "name" : "myFnc",
              "dataType" : "VARCHAR(2147483647)"
            } ],
            "watermarkSpecs" : [ ],
            "primaryKey" : {
              "name" : "PK_val",
              "type" : "PRIMARY_KEY",
              "columns" : [ "val" ]
            }
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "jdbc-sqrl",
            "driver" : "org.postgresql.Driver",
            "password" : "postgres",
            "table-name" : "MyAsyncTable_1",
            "url" : "jdbc:postgresql://localhost:5432/datasqrl",
            "username" : "postgres"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` CHAR(1) NOT NULL, `myFnc` VARCHAR(2147483647)>",
    "description" : "Sink(table=[default_catalog.default_database.MyAsyncTable_1], fields=[val, myFnc])"
  }, {
    "id" : 4,
    "type" : "stream-exec-values_1",
    "tuples" : [ [ {
      "kind" : "LITERAL",
      "value" : 1,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 2,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 3,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 4,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 5,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 6,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 7,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 8,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 9,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 10,
      "type" : "INT NOT NULL"
    } ] ],
    "outputType" : "ROW<`val` INT NOT NULL>",
    "description" : "Values(tuples=[[{ 1 }, { 2 }, { 3 }, { 4 }, { 5 }, { 6 }, { 7 }, { 8 }, { 9 }, { 10 }]])",
    "inputProperties" : [ ]
  }, {
    "id" : 5,
    "type" : "stream-exec-calc_1",
    "projection" : [ {
      "kind" : "INPUT_REF",
      "inputIndex" : 0,
      "type" : "INT NOT NULL"
    }, {
      "kind" : "CALL",
      "catalogName" : "`default_catalog`.`default_database`.`MyScalarFunction`",
      "class" : "com.myudf.MyScalarFunction",
      "operands" : [ {
        "kind" : "CALL",
        "syntax" : "SPECIAL",
        "internalName" : "$CAST$1",
        "operands" : [ {
          "kind" : "INPUT_REF",
          "inputIndex" : 0,
          "type" : "INT NOT NULL"
        } ],
        "type" : "BIGINT"
      }, {
        "kind" : "CALL",
        "syntax" : "SPECIAL",
        "internalName" : "$CAST$1",
        "operands" : [ {
          "kind" : "INPUT_REF",
          "inputIndex" : 0,
          "type" : "INT NOT NULL"
        } ],
        "type" : "BIGINT"
      } ],
      "type" : "BIGINT"
    } ],
    "condition" : null,
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` INT NOT NULL, `myFnc` BIGINT>",
    "description" : "Calc(select=[val, MyScalarFunction(CAST(val AS BIGINT), CAST(val AS BIGINT)) AS myFnc])"
  }, {
    "id" : 6,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`MyTable_2`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "val",
              "dataType" : "INT NOT NULL"
            }, {
              "name" : "myFnc",
              "dataType" : "BIGINT"
            } ],
            "watermarkSpecs" : [ ],
            "primaryKey" : {
              "name" : "PK_val",
              "type" : "PRIMARY_KEY",
              "columns" : [ "val" ]
            }
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "jdbc-sqrl",
            "driver" : "org.postgresql.Driver",
            "password" : "postgres",
            "table-name" : "MyTable_2",
            "url" : "jdbc:postgresql://localhost:5432/datasqrl",
            "username" : "postgres"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` INT NOT NULL, `myFnc` BIGINT>",
    "description" : "Sink(table=[default_catalog.default_database.MyTable_2], fields=[val, myFnc])"
  }, {
    "id" : 7,
    "type" : "stream-exec-values_1",
    "tuples" : [ [ {
      "kind" : "LITERAL",
      "value" : 1,
      "type" : "INT NOT NULL"
    } ], [ {
      "kind" : "LITERAL",
      "value" : 2,
      "type" : "INT NOT NULL"
    } ] ],
    "outputType" : "ROW<`val` INT NOT NULL>",
    "description" : "Values(tuples=[[{ 1 }, { 2 }]])",
    "inputProperties" : [ ]
  }, {
    "id" : 8,
    "type" : "stream-exec-calc_1",
    "projection" : [ {
      "kind" : "INPUT_REF",
      "inputIndex" : 0,
      "type" : "INT NOT NULL"
    }, {
      "kind" : "CALL",
      "catalogName" : "`default_catalog`.`default_database`.`AnotherFunction`",
      "class" : "com.myudf.MyScalarFunction",
      "operands" : [ {
        "kind" : "CALL",
        "syntax" : "SPECIAL",
        "internalName" : "$CAST$1",
        "operands" : [ {
          "kind" : "INPUT_REF",
          "inputIndex" : 0,
          "type" : "INT NOT NULL"
        } ],
        "type" : "BIGINT"
      }, {
        "kind" : "CALL",
        "syntax" : "SPECIAL",
        "internalName" : "$CAST$1",
        "operands" : [ {
          "kind" : "INPUT_REF",
          "inputIndex" : 0,
          "type" : "INT NOT NULL"
        } ],
        "type" : "BIGINT"
      } ],
      "type" : "BIGINT"
    } ],
    "condition" : null,
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` INT NOT NULL, `myFnc` BIGINT>",
    "description" : "Calc(select=[val, AnotherFunction(CAST(val AS BIGINT), CAST(val AS BIGINT)) AS myFnc])"
  }, {
    "id" : 9,
    "type" : "stream-exec-sink_1",
    "configuration" : {
      "table.exec.sink.keyed-shuffle" : "AUTO",
      "table.exec.sink.not-null-enforcer" : "ERROR",
      "table.exec.sink.rowtime-inserter" : "ENABLED",
      "table.exec.sink.type-length-enforcer" : "IGNORE",
      "table.exec.sink.upsert-materialize" : "AUTO"
    },
    "dynamicTableSink" : {
      "table" : {
        "identifier" : "`default_catalog`.`default_database`.`MyTableAnother_3`",
        "resolvedTable" : {
          "schema" : {
            "columns" : [ {
              "name" : "val",
              "dataType" : "INT NOT NULL"
            }, {
              "name" : "myFnc",
              "dataType" : "BIGINT"
            } ],
            "watermarkSpecs" : [ ],
            "primaryKey" : {
              "name" : "PK_val",
              "type" : "PRIMARY_KEY",
              "columns" : [ "val" ]
            }
          },
          "partitionKeys" : [ ],
          "options" : {
            "connector" : "jdbc-sqrl",
            "driver" : "org.postgresql.Driver",
            "password" : "postgres",
            "table-name" : "MyTableAnother_3",
            "url" : "jdbc:postgresql://localhost:5432/datasqrl",
            "username" : "postgres"
          }
        }
      }
    },
    "inputChangelogMode" : [ "INSERT" ],
    "inputProperties" : [ {
      "requiredDistribution" : {
        "type" : "UNKNOWN"
      },
      "damBehavior" : "PIPELINED",
      "priority" : 0
    } ],
    "outputType" : "ROW<`val` INT NOT NULL, `myFnc` BIGINT>",
    "description" : "Sink(table=[default_catalog.default_database.MyTableAnother_3], fields=[val, myFnc])"
  } ],
  "edges" : [ {
    "source" : 1,
    "target" : 2,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 2,
    "target" : 3,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 4,
    "target" : 5,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 5,
    "target" : 6,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 7,
    "target" : 8,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  }, {
    "source" : 8,
    "target" : 9,
    "shuffle" : {
      "type" : "FORWARD"
    },
    "shuffleMode" : "PIPELINED"
  } ]
}.
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.loadPlan(TableEnvironmentImpl.java:760)
        at com.datasqrl.DatasqrlRun.runFlinkJob(DatasqrlRun.java:257)
        at com.datasqrl.DatasqrlRun.run(DatasqrlRun.java:114)
        at com.datasqrl.DatasqrlTest.run(DatasqrlTest.java:115)
        at com.datasqrl.DatasqrlTest.main(DatasqrlTest.java:71)
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: Unsupported anonymous function kind 'ASYNC_SCALAR' for class 'com.myudf.MyAsyncScalarFunction'. (through reference chain: org.apache.flink.table.planner.plan.nodes.exec.serde.JsonPlanGraph["nodes"]->java.util.ArrayList[1]->org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAsyncCalc["projection"]->java.util.ArrayList[1])
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:373)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:375)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:564)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:220)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:187)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:144)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:110)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:361)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:564)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:439)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:352)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.readValue(DeserializationContext.java:971)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.readValue(DeserializationContext.java:958)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.ExecNodeGraphJsonDeserializer.deserialize(ExecNodeGraphJsonDeserializer.java:50)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.ExecNodeGraphJsonDeserializer.deserialize(ExecNodeGraphJsonDeserializer.java:37)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2105)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1546)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1566)
        at org.apache.flink.table.planner.delegation.StreamPlanner.loadPlan(StreamPlanner.scala:179)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.loadPlan(TableEnvironmentImpl.java:758)
        ... 4 more
Caused by: org.apache.flink.table.api.TableException: Unsupported anonymous function kind 'ASYNC_SCALAR' for class 'com.myudf.MyAsyncScalarFunction'.
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserializeFunctionClass(RexNodeJsonDeserializer.java:457)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserializeCatalogFunction(RexNodeJsonDeserializer.java:480)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserializeSqlOperator(RexNodeJsonDeserializer.java:351)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserializeCall(RexNodeJsonDeserializer.java:318)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserialize(RexNodeJsonDeserializer.java:148)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserialize(RexNodeJsonDeserializer.java:130)
        at org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer.deserialize(RexNodeJsonDeserializer.java:117)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:359)
        ... 35 more
@mbroecheler
Contributor Author

Addressed in Flink:
apache/flink#26586

mbroecheler added this to the v0.6.x milestone May 22, 2025