Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ public List<Integer> getAllBinaryInputOrdinalsWithOffset(int offset) {
.collect(Collectors.toList());
}

public List<TemporalTableSourceSpec> getAllBinaryInputTableSourceSpecs() {
return IntStream.range(0, binaryInputInfos.size())
.mapToObj(
i ->
new TemporalTableSourceSpec(
binaryInputInfos.get(i).tableScan.getTable()))
.collect(Collectors.toList());
}

/**
* Returns a concise summary of all source-to-target association pairs in both {@link
* #binary2BinaryJoinAssociation} and {@link #compositeBinary2BinaryJoinAssociation}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,5 +157,29 @@ public static Node of(
deltaJoinSpec,
joinType);
}

/**
* Return whether all source ordinals are in left side, and the lookup ordinal is in right
* side.
*/
@JsonIgnore
public boolean isLeftLookupRight() {
if (Arrays.stream(inputTableBinaryInputOrdinals)
.allMatch(i -> i < lookupTableBinaryInputOrdinal)) {
return true;
} else if (Arrays.stream(inputTableBinaryInputOrdinals)
.allMatch(i -> i > lookupTableBinaryInputOrdinal)) {
return false;
} else {
// should not happen
throw new IllegalStateException(
String.format(
"Could not judge the direction of this lookup.\n"
+ "All input ordinals in the stream side is %s\n"
+ "The input ordinals in the lookup side is '%d'",
Arrays.toString(inputTableBinaryInputOrdinals),
lookupTableBinaryInputOrdinal));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@

package org.apache.flink.table.planner.plan.nodes.exec.spec;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.FilterCodeGenerator;
import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedFilterCondition;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinRuntimeTree;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -50,6 +61,7 @@

import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.combineOutputRowType;
import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.splitProjectionAndFilter;
import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;

/**
* A delta join tree used to describe the relationships among one or more joins in the input.
Expand Down Expand Up @@ -189,6 +201,66 @@ private RowType getOutputTypeOnNodeInternal(
return getOutputTypeOnNodeInternal(caresInputOrdinals, joinNode.right, typeFactory);
}

/** Convert this {@link DeltaJoinTree} to {@link DeltaJoinRuntimeTree}. */
public DeltaJoinRuntimeTree convert2RuntimeTree(PlannerBase planner, ExecNodeConfig config) {
return new DeltaJoinRuntimeTree(convert2RuntimeTreeInternal(planner, config, root));
}

private DeltaJoinRuntimeTree.Node convert2RuntimeTreeInternal(
PlannerBase planner, ExecNodeConfig config, Node node) {
ClassLoader classLoader = planner.getFlinkContext().getClassLoader();
FlinkTypeFactory typeFactory = unwrapTypeFactory(planner);
RowType rowTypeBeforeCalc = node.getRowTypeBeforeCalc(typeFactory);
RowType rowTypePassThroughCalc = node.getRowTypeAfterCalc(typeFactory);

String generatedCalcName =
node instanceof BinaryInputNode
? "BinaryInputNodeCalcFunction"
: "JoinNodeCalcFunction";
GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
Optional.ofNullable(node.projection)
.map(
projection ->
LookupJoinCodeGenerator.generateCalcMapFunction(
config,
classLoader,
JavaScalaConversionUtil.toScala(node.projection),
node.filter,
rowTypePassThroughCalc,
rowTypeBeforeCalc,
generatedCalcName))
.orElse(null);

if (node instanceof BinaryInputNode) {
return new DeltaJoinRuntimeTree.BinaryInputNode(
((BinaryInputNode) node).inputOrdinal,
generatedCalc,
InternalSerializers.create(rowTypePassThroughCalc));
}
JoinNode joinNode = (JoinNode) node;

DeltaJoinRuntimeTree.Node newLeft =
convert2RuntimeTreeInternal(planner, config, joinNode.left);
DeltaJoinRuntimeTree.Node newRight =
convert2RuntimeTreeInternal(planner, config, joinNode.right);

GeneratedFilterCondition generatedJoinCondition =
FilterCodeGenerator.generateFilterCondition(
config,
classLoader,
joinNode.condition,
joinNode.getRowTypeBeforeCalc(typeFactory),
"JoinCondition");

return new DeltaJoinRuntimeTree.JoinNode(
joinNode.joinType,
generatedJoinCondition,
generatedCalc,
newLeft,
newRight,
InternalSerializers.create(rowTypePassThroughCalc));
}

private static List<Integer> getAllInputOrdinals(Node node) {
List<Integer> collector = new ArrayList<>();
collectAllInputOrdinals(node, collector);
Expand Down
Loading