Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall
import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.table.planner.codegen.Indenter.toISC
import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
import org.apache.flink.table.planner.codegen.calls.{BridgingFunctionGenUtil, ScalarOperatorGens}
import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil.{verifyFunctionAwareOutputType, DefaultExpressionEvaluatorFactory}
import org.apache.flink.table.planner.delegation.PlannerBase
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
Expand All @@ -44,7 +44,8 @@ import org.apache.flink.table.types.extraction.ExtractionUtils
import org.apache.flink.table.types.inference.TypeInferenceUtil
import org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo
import org.apache.flink.table.types.logical.LogicalType
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks
import org.apache.flink.table.types.logical.utils.{LogicalTypeCasts, LogicalTypeChecks}
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isCompositeType
import org.apache.flink.types.Row

import org.apache.calcite.rex.{RexCall, RexNode}
Expand Down Expand Up @@ -385,8 +386,12 @@ object ProcessTableRunnerGenerator {
val inputRowTerm = "inputRow"
val collectorTerm = "evalCollector"
val callOperands = generateEvalOperands(ctx, call, inputIndexTerm, inputRowTerm)
val castedOperands = castOperandsIfNeeded(ctx, callOperands, enrichedArgumentDataTypes)
val externalCallOperands =
BridgingFunctionGenUtil.prepareExternalOperands(ctx, callOperands, enrichedArgumentDataTypes)
BridgingFunctionGenUtil.prepareExternalOperands(
ctx,
castedOperands,
enrichedArgumentDataTypes)
val allExternalOperands = externalStateOperands ++ externalCallOperands
val allDataTypes = stateDataTypes ++ enrichedArgumentDataTypes

Expand Down Expand Up @@ -456,4 +461,40 @@ object ProcessTableRunnerGenerator {

functionCallCode
}

/**
* Casts operands to match target data types if needed. This handles cases where Calcite didn't
* insert CAST nodes (e.g., for PTF arguments). Only applies to basic/primitive types; composite
* types (ROW, ARRAY, MAP, etc.) are skipped.
*
* @param ctx
* Code generator context
* @param operands
* The operands to potentially cast
* @param targetDataTypes
* The target data types to cast to
* @return
* Operands with casts inserted where needed
*/
def castOperandsIfNeeded(
ctx: CodeGeneratorContext,
operands: Seq[GeneratedExpression],
targetDataTypes: Seq[DataType]): Seq[GeneratedExpression] = {
operands.zip(targetDataTypes).map {
case (operand, targetDataType) =>
val targetType = targetDataType.getLogicalType
// Only cast basic types, not composite types (ROW, ARRAY, MAP, etc.)
if (
!isCompositeType(operand.resultType) &&
!isCompositeType(targetType) &&
operand.resultType != targetType &&
LogicalTypeCasts.supportsImplicitCast(operand.resultType, targetType)
) {
// Need an explicit cast for basic types
ScalarOperatorGens.generateCast(ctx, operand, targetType, nullOnFailure = false)
} else {
operand
}
}
}
}
Loading