diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java index bfec6ac64b29..adb588bd9939 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java @@ -16,16 +16,40 @@ */ package org.apache.spark.sql.execution.datasources.clickhouse; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.spark.sql.execution.datasources.mergetree.MetaSerializer; import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer; import org.apache.spark.sql.types.StructType; +import java.util.Collections; import java.util.List; import java.util.Map; +import scala.collection.Seq; +import scala.collection.Seq$; + public class ExtensionTableBuilder { private ExtensionTableBuilder() {} + public static ExtensionTableNode makeExtensionTable( + Long start, Long end, Long step, Integer numSlices, Integer sliceIndex) + throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode json = mapper.createObjectNode(); + json.put("start", start); + json.put("end", end); + json.put("step", step); + json.put("numSlices", numSlices); + json.put("sliceIndex", sliceIndex); + + String result; + result = mapper.writeValueAsString(json); + return new ExtensionTableNode( + Collections.emptyList(), result, (Seq) Seq$.MODULE$.empty()); + } + public static ExtensionTableNode makeExtensionTable( String database, String tableName, diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java index bb04652be440..7e5de6eb4636 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableNode.java @@ -35,9 +35,9 @@ public class ExtensionTableNode implements SplitInfo { List preferredLocations, String serializerResult, scala.collection.Seq pathList) { - this.pathList = pathList; this.preferredLocations.addAll(preferredLocations); this.serializerResult = serializerResult; + this.pathList = pathList; } @Override diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index c6ec95ded55f..a74779ba5d16 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -234,7 +234,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { override def genPartitions( wsCtx: WholeStageTransformContext, splitInfos: Seq[Seq[SplitInfo]], - scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = { + leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = { // Only serialize plan once, save lots time when plan is complex. 
val planByteArray = wsCtx.root.toProtobuf.toByteArray splitInfos.zipWithIndex.map { @@ -242,8 +242,10 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { val (splitInfosByteArray, files) = splits.zipWithIndex.map { case (split, i) => split match { - case filesNode: LocalFilesNode => - setFileSchemaForLocalFiles(filesNode, scans(i)) + case filesNode: LocalFilesNode if leafs(i).isInstanceOf[BasicScanExecTransformer] => + setFileSchemaForLocalFiles( + filesNode, + leafs(i).asInstanceOf[BasicScanExecTransformer]) (filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq) case extensionTableNode: ExtensionTableNode => (extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 679ac4435c72..5141d59cc81f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet} import org.apache.spark.sql.catalyst.optimizer.BuildSide -import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.{logical, JoinType} import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning} import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution._ @@ -931,4 +931,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { limitExpr: ExpressionTransformer, original: StringSplit): ExpressionTransformer = CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original) + + override def genRangeExecTransformer(range: logical.Range): SparkPlan = + CHRangeExecTransformer(range) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala new file mode 100644 index 000000000000..816a8c5c8b01 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.expression.ConverterUtils +import org.apache.gluten.extension.ValidationResult +import org.apache.gluten.metrics.{CHRangeMetricsUpdater, MetricsUpdater} +import org.apache.gluten.substrait.`type`._ +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.extensions.ExtensionBuilder +import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo} + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.read.InputPartition +import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} + +import com.google.protobuf.StringValue +import io.substrait.proto.NamedStruct + +import scala.collection.JavaConverters; + +case class CHRangeExecTransformer(range: org.apache.spark.sql.catalyst.plans.logical.Range) + extends LeafTransformSupport { + + override def output: Seq[Attribute] = range.output + + override def getSplitInfos: Seq[SplitInfo] = { + val start = range.start + val end = range.end + val step = range.step + val numSlices = range.numSlices.getOrElse(1) + + (0 until numSlices).map { + sliceIndex => + ExtensionTableBuilder.makeExtensionTable(start, end, step, numSlices, sliceIndex) + } + } + + override def getPartitions: Seq[InputPartition] = { + val start = range.start + val end = range.end + val step = range.step + val numSlices = range.numSlices.getOrElse(1) + + (0 until numSlices).map { + sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, sliceIndex) + } + } + + @transient + override lazy val metrics: Map[String, SQLMetric] = + Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), + "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), + "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), + "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), + "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"), + "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time") + ) + + override def metricsUpdater(): MetricsUpdater = new CHRangeMetricsUpdater(metrics) + + // No need to do native validation for CH backend. It is validated in [[doTransform]]. 
+ override protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded + + override def doTransform(context: SubstraitContext): TransformContext = { + val output = range.output + val typeNodes = ConverterUtils.collectAttributeTypeNodes(output) + val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output) + val columnTypeNodes = JavaConverters + .seqAsJavaListConverter( + output.map(attr => new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL))) + .asJava + + val optimizationContent = s"isRange=1\n" + val optimization = + BackendsApiManager.getTransformerApiInstance.packPBMessage( + StringValue.newBuilder.setValue(optimizationContent).build) + val extensionNode = ExtensionBuilder.makeAdvancedExtension(optimization, null) + val readNode = RelBuilder.makeReadRel( + typeNodes, + nameList, + columnTypeNodes, + null, + extensionNode, + context, + context.nextOperatorId(this.nodeName)) + + TransformContext(output, readNode) + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenRangeExecPartition.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenRangeExecPartition.scala new file mode 100644 index 000000000000..f17eaba13ef4 --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenRangeExecPartition.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.substrait.plan.PlanBuilder + +case class GlutenRangeExecPartition( + start: Long, + end: Long, + step: Long, + numSlices: Int, + index: Int, + plan: Array[Byte] = PlanBuilder.EMPTY_PLAN) + extends BaseGlutenPartition {} diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/CHRangeMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/CHRangeMetricsUpdater.scala new file mode 100644 index 000000000000..519abc51a61f --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/CHRangeMetricsUpdater.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.metrics + +import org.apache.spark.sql.execution.metric.SQLMetric + +class CHRangeMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { + override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { + if (opMetrics != null) { + val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] + if (!operatorMetrics.metricsList.isEmpty) { + val metricsData = operatorMetrics.metricsList.get(0) + metrics("totalTime") += (metricsData.time / 1000L).toLong + metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong + metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong + metrics("outputVectors") += metricsData.outputVectors + + MetricsUtil.updateExtraTimeMetric( + metricsData, + metrics("extraTime"), + metrics("numOutputRows"), + metrics("outputBytes"), + metrics("numInputRows"), + metrics("inputBytes"), + GenerateMetricsUpdater.INCLUDING_PROCESSORS, + GenerateMetricsUpdater.CH_PLAN_NODE_NAME + ) + } + } + } + +} + +object CHRangeMetricsUpdater { + val INCLUDING_PROCESSORS: Array[String] = Array("SourceFromRange") + val CH_PLAN_NODE_NAME: Array[String] = Array("SourceFromRange") +} diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index 9a5e0bad9f49..e78073076c44 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -632,7 +632,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos().size) + assertResult(1)(plans.head.getSplitInfos.size) } } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index d3393730bc9e..c00bf7a2b8b6 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -1586,7 +1586,7 @@ class GlutenClickHouseMergeTreeWriteSuite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(conf._2)(plans.head.getSplitInfos().size) + assertResult(conf._2)(plans.head.getSplitInfos.size) } } }) @@ -1610,7 +1610,7 @@ class GlutenClickHouseMergeTreeWriteSuite case scanExec: BasicScanExecTransformer => scanExec } assertResult(1)(plans.size) - assertResult(1)(plans.head.getSplitInfos().size) + assertResult(1)(plans.head.getSplitInfos.size) } } } @@ -1718,7 +1718,7 @@ class GlutenClickHouseMergeTreeWriteSuite case f: BasicScanExecTransformer => f } assertResult(2)(scanExec.size) - assertResult(conf._2)(scanExec(1).getSplitInfos().size) + assertResult(conf._2)(scanExec(1).getSplitInfos.size) } } }) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index c06a8cfc9880..95e154a3fa9c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -89,9 +89,9 @@ class VeloxIteratorApi extends IteratorApi with Logging { /** Generate native row partition. */ override def genPartitions( - wsCtx: WholeStageTransformContext, - splitInfos: Seq[Seq[SplitInfo]], - scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = { + wsCtx: WholeStageTransformContext, + splitInfos: Seq[Seq[SplitInfo]], + leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = { // Only serialize plan once, save lots time when plan is complex. val planByteArray = wsCtx.root.toProtobuf.toByteArray diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp index 2e4dd4af1b82..6dc74179b7c0 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp @@ -29,14 +29,17 @@ #include #include #include +#include #include #include +#include namespace DB { namespace Setting { extern const SettingsMaxThreads max_threads; +extern const SettingsUInt64 max_block_size; } namespace ErrorCodes { @@ -50,15 +53,36 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra { if (query_plan) throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node's input plan should be null"); + const auto & read = rel.read(); - if (read.has_local_files() || (!read.has_extension_table() && !isReadFromMergeTree(read))) + + if (isReadRelFromMergeTree(read)) + { + substrait::ReadRel::ExtensionTable extension_table; + if (read.has_extension_table()) + extension_table = read.extension_table(); + else + { + extension_table = BinaryToMessage(split_info); + debug::dumpMessage(extension_table, "extension_table"); + } + + MergeTreeRelParser merge_tree_parser(parser_context, getContext()); + query_plan = merge_tree_parser.parseReadRel(std::make_unique(), read, extension_table); + steps = merge_tree_parser.getSteps(); + } + else if (isReadRelFromLocalFile(read) || isReadRelFromJavaIter(read) || isReadRelFromRange(read)) { - assert(read.has_base_schema()); + chassert(read.has_base_schema()); DB::QueryPlanStepPtr read_step; - if (isReadRelFromJava(read)) + + if (isReadRelFromJavaIter(read)) read_step = parseReadRelWithJavaIter(read); - else + else if (isReadRelFromRange(read)) + read_step = parseReadRelWithRange(read); + else if (isReadRelFromLocalFile(read)) read_step = parseReadRelWithLocalFile(read); + query_plan = std::make_unique(); steps.emplace_back(read_step.get()); query_plan->addStep(std::move(read_step)); @@ -71,40 +95,57 @@ DB::QueryPlanPtr ReadRelParser::parse(DB::QueryPlanPtr query_plan, const substra } } else - { - substrait::ReadRel::ExtensionTable extension_table; - if (read.has_extension_table()) - extension_table = read.extension_table(); - else - { - extension_table = BinaryToMessage(split_info); - debug::dumpMessage(extension_table, "extension_table"); - } - MergeTreeRelParser mergeTreeParser(parser_context, getContext()); - query_plan = mergeTreeParser.parseReadRel(std::make_unique(), read, extension_table); - steps = mergeTreeParser.getSteps(); - } + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown read rel:{}", read.ShortDebugString()); + return query_plan; } -bool 
ReadRelParser::isReadRelFromJava(const substrait::ReadRel & rel) +bool ReadRelParser::isReadRelFromJavaIter(const substrait::ReadRel & rel) { return rel.has_local_files() && rel.local_files().items().size() == 1 && rel.local_files().items().at(0).uri_file().starts_with("iterator"); } -bool ReadRelParser::isReadFromMergeTree(const substrait::ReadRel & rel) +bool ReadRelParser::isReadRelFromLocalFile(const substrait::ReadRel & rel) +{ + if (rel.has_local_files()) + return !isReadRelFromJavaIter(rel); + else + return !rel.has_extension_table() && !isReadRelFromMergeTree(rel) && !isReadRelFromRange(rel); +} + +bool ReadRelParser::isReadRelFromMergeTree(const substrait::ReadRel & rel) { - assert(rel.has_advanced_extension()); - bool is_read_from_merge_tree; + if (!rel.has_advanced_extension()) + return false; + google::protobuf::StringValue optimization; optimization.ParseFromString(rel.advanced_extension().optimization().value()); ReadBufferFromString in(optimization.value()); if (!checkString("isMergeTree=", in)) return false; - readBoolText(is_read_from_merge_tree, in); + + bool is_merge_tree = false; + readBoolText(is_merge_tree, in); + assertChar('\n', in); + return is_merge_tree; +} + +bool ReadRelParser::isReadRelFromRange(const substrait::ReadRel & rel) +{ + if (!rel.has_advanced_extension()) + return false; + + google::protobuf::StringValue optimization; + optimization.ParseFromString(rel.advanced_extension().optimization().value()); + ReadBufferFromString in(optimization.value()); + if (!checkString("isRange=", in)) + return false; + + bool is_range = false; + readBoolText(is_range, in); assertChar('\n', in); - return is_read_from_merge_tree; + return is_range; } DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::ReadRel & rel) @@ -126,6 +167,7 @@ DB::QueryPlanStepPtr ReadRelParser::parseReadRelWithJavaIter(const substrait::Re QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadRel & rel) { auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + substrait::ReadRel::LocalFiles local_files; if (rel.has_local_files()) local_files = rel.local_files(); @@ -134,10 +176,12 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR local_files = BinaryToMessage(split_info); debug::dumpMessage(local_files, "local_files"); } + auto source = std::make_shared(getContext(), header, local_files); auto source_pipe = Pipe(source); auto source_step = std::make_unique(getContext(), std::move(source_pipe), "substrait local files"); source_step->setStepDescription("read local files"); + if (rel.has_filter()) { DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)}; @@ -149,6 +193,41 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR return source_step; } +QueryPlanStepPtr ReadRelParser::parseReadRelWithRange(const substrait::ReadRel & rel) +{ + substrait::ReadRel::ExtensionTable extension_table; + if (rel.has_extension_table()) + extension_table = rel.extension_table(); + else + { + extension_table = BinaryToMessage(split_info); + debug::dumpMessage(extension_table, "extension_table"); + } + + chassert(extension_table.has_detail()); + std::string str_range_info = toString(extension_table.detail()); + std::cout << "range_info:" << str_range_info << std::endl; + + rapidjson::Document document; + document.Parse(str_range_info.c_str()); + if (!document.HasMember("start") || !document.HasMember("end") || !document.HasMember("step") || !document.HasMember("numSlices") + || 
!document.HasMember("sliceIndex")) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Missing required fields in range info"); + + Int64 start = document["start"].GetInt64(); + Int64 end = document["end"].GetInt64(); + Int64 step = document["step"].GetInt64(); + Int32 num_slices = document["numSlices"].GetInt(); + Int32 slice_index = document["sliceIndex"].GetInt(); + + auto header = TypeParser::buildBlockFromNamedStruct(rel.base_schema()); + size_t max_block_size = getContext()->getSettingsRef()[Setting::max_block_size]; + auto source = std::make_shared(header, start, end, step, num_slices, slice_index, max_block_size); + QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); + source_step->setStepDescription("Read From Range Exec"); + return source_step; +} + void registerReadRelParser(RelParserFactory & factory) { auto builder = [](ParserContextPtr parser_context) { return std::make_unique(parser_context); }; diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h index 7f84a89ed49c..0bc095a3e03e 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h +++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.h @@ -38,8 +38,10 @@ class ReadRelParser : public RelParser // This is source node, there is no input std::optional getSingleInput(const substrait::Rel & rel) override { return {}; } - bool isReadRelFromJava(const substrait::ReadRel & rel); - bool isReadFromMergeTree(const substrait::ReadRel & rel); + bool isReadRelFromLocalFile(const substrait::ReadRel & rel); + bool isReadRelFromJavaIter(const substrait::ReadRel & rel); + bool isReadRelFromMergeTree(const substrait::ReadRel & rel); + bool isReadRelFromRange(const substrait::ReadRel & rel); void setInputIter(jobject input_iter_, bool is_materialze) { @@ -47,13 +49,15 @@ class ReadRelParser : public RelParser is_input_iter_materialize = is_materialze; } - void setSplitInfo(String split_info_) { split_info = split_info_; } + void setSplitInfo(String split_info_) { split_info = std::move(split_info_); } private: + DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel); + DB::QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel); + DB::QueryPlanStepPtr parseReadRelWithRange(const substrait::ReadRel & rel); + jobject input_iter; bool is_input_iter_materialize; String split_info; - DB::QueryPlanStepPtr parseReadRelWithJavaIter(const substrait::ReadRel & rel); - DB::QueryPlanStepPtr parseReadRelWithLocalFile(const substrait::ReadRel & rel); }; } diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index e1733e915a8a..10422e6df9b0 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -235,36 +235,39 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list rel_stack.pop_back(); - // source node is special + /// Sperical process for read relation because it may be incomplete when reading from scans/mergetrees/ranges. 
if (rel.rel_type_case() == substrait::Rel::RelTypeCase::kRead) { - assert(all_input_rels.empty()); + chassert(all_input_rels.empty()); auto read_rel_parser = std::dynamic_pointer_cast(rel_parser); const auto & read = rel.read(); - if (read.has_local_files()) + + if (read_rel_parser->isReadRelFromJavaIter(read)) { - if (read_rel_parser->isReadRelFromJava(read)) - { - auto iter = read.local_files().items().at(0).uri_file(); - auto pos = iter.find(':'); - auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); - auto [input_iter, materalize_input] = getInputIter(static_cast(iter_index)); - read_rel_parser->setInputIter(input_iter, materalize_input); - } + /// If read from java iter, local_files is guranteed to be set in read rel. + auto iter = read.local_files().items().at(0).uri_file(); + auto pos = iter.find(':'); + auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); + auto [input_iter, materalize_input] = getInputIter(static_cast(iter_index)); + read_rel_parser->setInputIter(input_iter, materalize_input); } - else if (read_rel_parser->isReadFromMergeTree(read)) + else if (read_rel_parser->isReadRelFromMergeTree(read)) { if (!read.has_extension_table()) - { read_rel_parser->setSplitInfo(nextSplitInfo()); - } } - else if (!read.has_local_files() && !read.has_extension_table()) + else if (read_rel_parser->isReadRelFromRange(read)) { - // read from split files - auto split_info = nextSplitInfo(); - read_rel_parser->setSplitInfo(split_info); + if (!read.has_extension_table()) + read_rel_parser->setSplitInfo(nextSplitInfo()); } + else if (read_rel_parser->isReadRelFromLocalFile(read)) + { + if (!read.has_local_files()) + read_rel_parser->setSplitInfo(nextSplitInfo()); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read rel:{}", read.ShortDebugString()); } DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); diff --git a/cpp-ch/local-engine/Storages/SourceFromRange.cpp b/cpp-ch/local-engine/Storages/SourceFromRange.cpp new file mode 100644 index 000000000000..36a01fb80961 --- /dev/null +++ b/cpp-ch/local-engine/Storages/SourceFromRange.cpp @@ -0,0 +1,132 @@ +#include "SourceFromRange.h" + +#include +#include +#include +#include + +using namespace DB; + +namespace DB::ErrorCodes +{ +extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; +extern const int TYPE_MISMATCH; +} + +namespace local_engine +{ +SourceFromRange::SourceFromRange(const DB::Block & header_, Int64 start_, Int64 end_, Int64 step_, Int32 num_slices_, Int32 slice_index_, size_t max_block_size_) + : DB::ISource(header_) + , start(start_) + , end(end_) + , step(step_) + , num_slices(num_slices_) + , slice_index(slice_index_) + , max_block_size(max_block_size_) + , num_elements(getNumElements()) + , is_empty_range(start == end ) +{ + const auto & header = getOutputs().front().getHeader(); + if (header.columns() != 1) + throw Exception(ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH, "Expected 1 column, got {}", header.columns()); + if (!header.getByPosition(0).type->equals(DataTypeInt64())) + throw Exception(ErrorCodes::TYPE_MISMATCH, "Expected Int64 column, got {}", header.getByPosition(0).type->getName()); + if (step == 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Step cannot be zero"); + + + Int128 partition_start = (slice_index * num_elements) / num_slices * step + start; + Int128 partition_end = (((slice_index + 1) * num_elements) / num_slices) * step + start; + + auto get_safe_margin = [](Int128 bi) -> Int64 + { + if (bi <= std::numeric_limits::max() && bi >= 
std::numeric_limits::min()) + return static_cast(bi); + else if (bi > 0) + return std::numeric_limits::max(); + else + return std::numeric_limits::min(); + }; + + safe_partition_start = get_safe_margin(partition_start); + safe_partition_end = get_safe_margin(partition_end); + current = safe_partition_start; + previous = 0; + overflow = false; + + /* + // Print all member variables + std::cout << "start: " << start << std::endl; + std::cout << "end: " << end << std::endl; + std::cout << "step: " << step << std::endl; + std::cout << "num_slices: " << num_slices << std::endl; + std::cout << "slice_index: " << slice_index << std::endl; + std::cout << "max_block_size: " << max_block_size << std::endl; + // std::cout << "num_elements: " << num_elements << std::endl; + std::cout << "is_empty_range: " << is_empty_range << std::endl; + std::cout << "safe_partition_start: " << safe_partition_start << std::endl; + std::cout << "safe_partition_end: " << safe_partition_end << std::endl; + std::cout << "current: " << current << std::endl; + std::cout << "previous: " << previous << std::endl; + std::cout << "overflow: " << overflow << std::endl; + */ +} + +Int128 SourceFromRange::getNumElements() const +{ + const auto safe_start = static_cast(start); + const auto safe_end = static_cast(end); + if ((safe_end - safe_start) % step == 0 || (safe_end > safe_start) != (step > 0)) + { + return (safe_end - safe_start) / step; + } + else + { + // the remainder has the same sign with range, could add 1 more + return (safe_end - safe_start) / step + 1; + } +} + + +DB::Chunk SourceFromRange::generate() +{ + if (is_empty_range) + return {}; + + if (overflow || (step > 0 && current >= safe_partition_end) || (step < 0 && current <= safe_partition_end)) + return {}; + + + auto column = DB::ColumnInt64::create(); + auto & data = column->getData(); + data.resize_exact(max_block_size); + + size_t row_i = 0; + if (step > 0) + { + for (; current < safe_partition_end && !overflow && row_i < max_block_size; ++row_i) + { + previous = current; + data[row_i] = current; + current += step; + overflow = current < previous; + } + } + else + { + for (; current > safe_partition_end && !overflow && row_i < max_block_size; ++row_i) + { + previous = current; + data[row_i] = current; + current += step; + overflow = current > previous; + } + } + data.resize_exact(row_i); + + // std::cout << "gen rows:" << column->size() << std::endl; + DB::Columns columns; + columns.push_back(std::move(column)); + return DB::Chunk(std::move(columns), row_i); +} +} diff --git a/cpp-ch/local-engine/Storages/SourceFromRange.h b/cpp-ch/local-engine/Storages/SourceFromRange.h new file mode 100644 index 000000000000..1a50d99c6645 --- /dev/null +++ b/cpp-ch/local-engine/Storages/SourceFromRange.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include + +namespace local_engine +{ +class SourceFromRange : public DB::ISource +{ +public: + SourceFromRange( + const DB::Block & header, + Int64 start_, + Int64 end_, + Int64 step_, + Int32 num_slices_, + Int32 slice_index_, + size_t max_block_size_ = 8192); + ~SourceFromRange() override = default; + + String getName() const override { return "SourceFromRange"; } + + +private: + DB::Chunk generate() override; + + Int128 getNumElements() const; + + const Int64 start; + const Int64 end; + const Int64 step; + const Int32 num_slices; + const Int32 slice_index; + const size_t max_block_size; + const Int128 num_elements; + const bool is_empty_range; + + Int64 safe_partition_start; + Int64 safe_partition_end; + 
Int64 current; + Int64 previous; + bool overflow; +}; +} + diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp index ef350472b64c..90cf0ff2ff09 100644 --- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp +++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.cpp @@ -159,9 +159,11 @@ bool SubstraitFileSource::tryPrepareReader() } else file_reader = std::make_unique(current_file, context, to_read_header, output_header); + input_file_name_parser.setFileName(current_file->getURIPath()); input_file_name_parser.setBlockStart(current_file->getStartOffset()); input_file_name_parser.setBlockLength(current_file->getLength()); + file_reader->applyKeyCondition(key_condition, column_index_filter); return true; } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 6f3df78fb61f..b80427ae3c06 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -315,6 +315,7 @@ JNIEXPORT jstring Java_org_apache_gluten_vectorized_BatchIterator_nativeFetchMet const local_engine::LocalExecutor * executor = reinterpret_cast(executor_address); const auto metric = executor->getMetric(); const String metrics_json = metric ? local_engine::RelMetricSerializer::serializeRelMetric(metric) : ""; + // std::cout << "metrics_json: " << metrics_json << std::endl; return local_engine::charTojstring(env, metrics_json.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 4e9b939debe5..2ae59db8429c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -17,7 +17,7 @@ package org.apache.gluten.backendsapi import org.apache.gluten.config.GlutenNumaBindingInfo -import org.apache.gluten.execution.{BaseGlutenPartition, BasicScanExecTransformer, WholeStageTransformContext} +import org.apache.gluten.execution.{BaseGlutenPartition, LeafTransformSupport, WholeStageTransformContext} import org.apache.gluten.metrics.IMetrics import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat @@ -43,7 +43,7 @@ trait IteratorApi { def genPartitions( wsCtx: WholeStageTransformContext, splitInfos: Seq[Seq[SplitInfo]], - scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] + leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] /** * Inject the task attempt temporary path for native write files, this method should be called diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 7432bc0af9e8..d0c07a4c1396 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -694,4 +694,9 @@ trait SparkPlanExecApi { limitExpr: ExpressionTransformer, original: StringSplit): ExpressionTransformer = GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original) + + def genRangeExecTransformer( + range: org.apache.spark.sql.catalyst.plans.logical.Range): SparkPlan = { + throw 
new GlutenNotSupportException("RangeExec is not supported") + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index 3b3ad4aa5c61..b5d3c03fc6ef 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -63,7 +63,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource def getProperties: Map[String, String] = Map.empty /** Returns the split infos that will be processed by the underlying native engine. */ - def getSplitInfos(): Seq[SplitInfo] = { + override def getSplitInfos: Seq[SplitInfo] = { getSplitInfosFromPartitions(getPartitions) } @@ -74,7 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource _, getPartitionSchema, fileFormat, - getMetadataColumns.map(_.name), + getMetadataColumns().map(_.name), getProperties)) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala index a54bbfad4e54..45cb0e1e3e48 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/GlutenWholeStageColumnarRDD.scala @@ -39,7 +39,7 @@ case class GlutenPartition( splitInfosByteArray: Array[Array[Byte]] = Array.empty[Array[Byte]], locations: Array[String] = Array.empty[String], files: Array[String] = - Array.empty[String] // touched files, for implementing UDF input_file_names + Array.empty[String] // touched files, for implementing UDF input_file_name ) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = locations diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala index 7a06709da097..e351afe71d1f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala @@ -186,6 +186,8 @@ trait TransformSupport extends ValidatablePlan { trait LeafTransformSupport extends TransformSupport with LeafExecNode { final override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = Seq.empty + def getSplitInfos: Seq[SplitInfo] + def getPartitions: Seq[InputPartition] } trait UnaryTransformSupport extends TransformSupport with UnaryExecNode { @@ -322,33 +324,34 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f } /** Find all BasicScanExecTransformer in one WholeStageTransformer */ - private def findAllScanTransformers(): Seq[BasicScanExecTransformer] = { - val basicScanExecTransformers = new mutable.ListBuffer[BasicScanExecTransformer]() + private def findAllLeafTransformers(): Seq[LeafTransformSupport] = { + val allLeafTransformers = new mutable.ListBuffer[LeafTransformSupport]() def transformChildren( plan: SparkPlan, - basicScanExecTransformers: mutable.ListBuffer[BasicScanExecTransformer]): Unit = { + leafTransformers: mutable.ListBuffer[LeafTransformSupport]): Unit = { if (plan != null && plan.isInstanceOf[TransformSupport]) { plan match { - case transformer: BasicScanExecTransformer => - 
basicScanExecTransformers.append(transformer) + case transformer: LeafTransformSupport => + leafTransformers.append(transformer) case _ => } + // according to the substrait plan order // SHJ may include two scans in a whole stage. plan match { case shj: HashJoinLikeExecTransformer => - transformChildren(shj.streamedPlan, basicScanExecTransformers) - transformChildren(shj.buildPlan, basicScanExecTransformers) + transformChildren(shj.streamedPlan, leafTransformers) + transformChildren(shj.buildPlan, leafTransformers) case t: TransformSupport => t.children - .foreach(transformChildren(_, basicScanExecTransformers)) + .foreach(transformChildren(_, leafTransformers)) } } } - transformChildren(child, basicScanExecTransformers) - basicScanExecTransformers.toSeq + transformChildren(child, allLeafTransformers) + allLeafTransformers.toSeq } override def doExecuteColumnar(): RDD[ColumnarBatch] = { @@ -363,22 +366,21 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f GlutenConfig.get.substraitPlanLogLevel, s"$nodeName generating the substrait plan took: $t ms.")) val inputRDDs = new ColumnarInputRDDsWrapper(columnarInputRDDs) - // Check if BatchScan exists. - val basicScanExecTransformers = findAllScanTransformers() - if (basicScanExecTransformers.nonEmpty) { + val leafTransformers = findAllLeafTransformers() + if (leafTransformers.nonEmpty) { /** * If containing scan exec transformer this "whole stage" generates a RDD which itself takes * care of SCAN there won't be any other RDD for SCAN. As a result, genFirstStageIterator * rather than genFinalStageIterator will be invoked */ - val allScanPartitions = basicScanExecTransformers.map(_.getPartitions.toIndexedSeq) - val allScanSplitInfos = - getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions) + val allInputPartitions = leafTransformers.map(_.getPartitions.toIndexedSeq) + val allSplitInfos = getSplitInfosFromPartitions(leafTransformers) + if (GlutenConfig.get.enableHdfsViewfs) { val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty - allScanSplitInfos.foreach { + allSplitInfos.foreach { splitInfos => splitInfos.foreach { case splitInfo: LocalFilesNode => @@ -394,8 +396,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f val inputPartitions = BackendsApiManager.getIteratorApiInstance.genPartitions( wsCtx, - allScanSplitInfos, - basicScanExecTransformers) + allSplitInfos, + leafTransformers) val rdd = new GlutenWholeStageColumnarRDD( sparkContext, @@ -410,9 +412,10 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f wsCtx.substraitContext.registeredAggregationParams ) ) - (0 until allScanPartitions.head.size).foreach( + + allInputPartitions.head.indices.foreach( i => { - val currentPartitions = allScanPartitions.map(_(i)) + val currentPartitions = allInputPartitions.map(_(i)) currentPartitions.indices.foreach( i => currentPartitions(i) match { @@ -421,6 +424,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f case _ => }) }) + rdd } else { @@ -481,8 +485,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f copy(child = newChild, materializeInput = materializeInput)(transformStageId) private def getSplitInfosFromPartitions( - basicScanExecTransformers: Seq[BasicScanExecTransformer], - allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = { + leafTransformers: Seq[LeafTransformSupport]): Seq[Seq[SplitInfo]] = { // If these are two scan 
transformers, they must have same partitions, // otherwise, exchange will be inserted. We should combine the two scan // transformers' partitions with same index, and set them together in @@ -498,17 +501,14 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f // p14 | p24 // ... // p1n | p2n => substraitContext.setSplitInfo([p1n, p2n]) - val allScanSplitInfos = - allScanPartitions.zip(basicScanExecTransformers).map { - case (partition, transformer) => - transformer.getSplitInfosFromPartitions(partition) - } - val partitionLength = allScanSplitInfos.head.size - if (allScanSplitInfos.exists(_.size != partitionLength)) { + val allSplitInfos = leafTransformers.map(_.getSplitInfos) + val partitionLength = allSplitInfos.head.size + if (allSplitInfos.exists(_.size != partitionLength)) { throw new GlutenException( "The partition length of all the scan transformer are not the same.") } - allScanSplitInfos.transpose + + allSplitInfos.transpose } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index 8d0abcaa686a..b62127393153 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -333,6 +333,9 @@ object OffloadOthers { plan.withReplacement, plan.seed, child) + case plan: RangeExec => + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + BackendsApiManager.getSparkPlanExecApiInstance.genRangeExecTransformer(plan.range) case p if !p.isInstanceOf[GlutenPlan] => logDebug(s"Transformation for ${p.getClass} is currently not supported.") p
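
---

For readers following the JVM-to-native handoff in this patch: a range leaf is identified on the native side by the "isRange=" marker inside the read rel's advanced extension, and the per-slice parameters travel as a small JSON document in the ExtensionTable detail (built by `ExtensionTableBuilder.makeExtensionTable` and read back by `ReadRelParser::parseReadRelWithRange`, which requires all five keys). A minimal Scala sketch of both payloads, assuming jackson-databind on the classpath; the object and method names below are illustrative, not Gluten APIs:

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// Illustrative only (not Gluten code): the two pieces of metadata the JVM side
// attaches to a range read rel, and that the native ReadRelParser looks for.
object RangeReadRelPayloads {
  // Goes into AdvancedExtension.optimization as a StringValue; the native side
  // matches the "isRange=" prefix, reads a bool, then expects a trailing '\n'.
  val optimizationMarker: String = "isRange=1\n"

  // Goes into ExtensionTable.detail; parseReadRelWithRange requires all five keys.
  def splitInfoJson(start: Long, end: Long, step: Long, numSlices: Int, sliceIndex: Int): String = {
    val mapper = new ObjectMapper()
    val node = mapper.createObjectNode()
    node.put("start", start)
    node.put("end", end)
    node.put("step", step)
    node.put("numSlices", numSlices)
    node.put("sliceIndex", sliceIndex)
    mapper.writeValueAsString(node)
  }

  def main(args: Array[String]): Unit = {
    // e.g. {"start":0,"end":1000,"step":3,"numSlices":4,"sliceIndex":0}
    println(splitInfoJson(0L, 1000L, 3L, 4, 0))
  }
}
```

Keeping the five field names in sync between the Java builder and the rapidjson reader is the whole contract; there is no schema beyond those keys.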
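The per-slice bounds computed in `SourceFromRange.cpp` mirror Spark's Range partitioning: slice `i` covers element indices `[i*n/numSlices, (i+1)*n/numSlices)` of the n-element sequence, with the arithmetic carried out in Int128 and the resulting bounds clamped to the Int64 range. A standalone Scala sketch of the same arithmetic, with BigInt standing in for Int128 (illustrative only, not Gluten code):

```scala
object RangeSliceMath {
  /** Number of elements in [start, end) with the given step, computed in BigInt
   *  to avoid Long overflow (same idea as getNumElements in SourceFromRange.cpp). */
  def numElements(start: Long, end: Long, step: Long): BigInt = {
    require(step != 0, "step cannot be zero")
    val span = BigInt(end) - BigInt(start)
    if (span % step == 0 || (span > 0) != (step > 0)) span / step
    else span / step + 1 // remainder has the same sign as the range, so one more element
  }

  /** Inclusive-start / exclusive-end bounds of one slice, clamped to the Long range. */
  def sliceBounds(start: Long, end: Long, step: Long, numSlices: Int, sliceIndex: Int): (Long, Long) = {
    val n = numElements(start, end, step)
    def bound(i: Int): Long = {
      val b = (BigInt(i) * n / numSlices) * step + start
      if (b > Long.MaxValue) Long.MaxValue
      else if (b < Long.MinValue) Long.MinValue
      else b.toLong
    }
    (bound(sliceIndex), bound(sliceIndex + 1))
  }

  /** Values produced by one slice, the way the native source walks from lo towards hi. */
  def sliceValues(start: Long, end: Long, step: Long, numSlices: Int, sliceIndex: Int): Seq[Long] = {
    val (lo, hi) = sliceBounds(start, end, step, numSlices, sliceIndex)
    if (step > 0) Iterator.iterate(lo)(_ + step).takeWhile(_ < hi).toList
    else Iterator.iterate(lo)(_ + step).takeWhile(_ > hi).toList
  }

  def main(args: Array[String]): Unit = {
    // All slices together must reproduce spark.range(0, 10, 3) => 0, 3, 6, 9.
    val all = (0 until 4).flatMap(sliceValues(0L, 10L, 3L, 4, _))
    assert(all == Seq(0L, 3L, 6L, 9L), s"unexpected: $all")
    println(all.mkString(", "))
  }
}
```

Concatenating the slices reproduces the full range, which is why `CHRangeExecTransformer` can emit one independent `ExtensionTableNode` per `sliceIndex` and let each task generate its slice locally.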
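A quick smoke test of the end-to-end path, assuming a SparkSession already configured with the Gluten ClickHouse backend (configuration omitted here); whether the leaf actually offloads still depends on plan validation, so treat this as a check rather than a guarantee:

```scala
val df = spark.range(0L, 1000L, 3L, 4).selectExpr("sum(id)", "count(id)")
df.collect()
// When the offload applies, the executed plan should show the new leaf
// (CHRangeExecTransformer) inside a WholeStageTransformer instead of RangeExec.
println(df.queryExecution.executedPlan.treeString)
```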