support range exec for CH
taiyang-li committed Jan 14, 2025
1 parent 5755368 commit 6a042ba
Showing 23 changed files with 588 additions and 94 deletions.
@@ -16,16 +16,40 @@
*/
package org.apache.spark.sql.execution.datasources.clickhouse;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.sql.execution.datasources.mergetree.MetaSerializer;
import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import scala.collection.Seq;
import scala.collection.Seq$;

public class ExtensionTableBuilder {
private ExtensionTableBuilder() {}

public static ExtensionTableNode makeExtensionTable(
Long start, Long end, Long step, Integer numSlices, Integer sliceIndex)
throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();
json.put("start", start);
json.put("end", end);
json.put("step", step);
json.put("numSlices", numSlices);
json.put("sliceIndex", sliceIndex);

String result;
result = mapper.writeValueAsString(json);
return new ExtensionTableNode(
Collections.emptyList(), result, (Seq<String>) Seq$.MODULE$.<String>empty());
}

public static ExtensionTableNode makeExtensionTable(
String database,
String tableName,
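
The new overload serializes the five range parameters into a small JSON payload and wraps it in an ExtensionTableNode with no preferred locations or path list. A minimal usage sketch with illustrative values, mirroring the call made from CHRangeExecTransformer.getSplitInfos further below (not taken from the commit itself):

import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder

// Illustrative call only: a range [0, 1000) with step 1, split into 4 slices, slice 0.
// The builder emits {"start":0,"end":1000,"step":1,"numSlices":4,"sliceIndex":0}.
val node = ExtensionTableBuilder.makeExtensionTable(0L, 1000L, 1L, 4, 0)
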
@@ -35,9 +35,9 @@ public class ExtensionTableNode implements SplitInfo {
List<String> preferredLocations,
String serializerResult,
scala.collection.Seq<String> pathList) {
this.pathList = pathList;
this.preferredLocations.addAll(preferredLocations);
this.serializerResult = serializerResult;
this.pathList = pathList;
}

@Override
@@ -234,16 +234,18 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
override def genPartitions(
wsCtx: WholeStageTransformContext,
splitInfos: Seq[Seq[SplitInfo]],
scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = {
leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = {
    // Only serialize the plan once; this saves a lot of time when the plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splits, index) =>
val (splitInfosByteArray, files) = splits.zipWithIndex.map {
case (split, i) =>
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
case filesNode: LocalFilesNode if leafs(i).isInstanceOf[BasicScanExecTransformer] =>
setFileSchemaForLocalFiles(
filesNode,
leafs(i).asInstanceOf[BasicScanExecTransformer])
(filesNode.toProtobuf.toByteArray, filesNode.getPaths.asScala.toSeq)
case extensionTableNode: ExtensionTableNode =>
(extensionTableNode.toProtobuf.toByteArray, extensionTableNode.getPartList)
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, CollectList, CollectSet}
import org.apache.spark.sql.catalyst.optimizer.BuildSide
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.{logical, JoinType}
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
@@ -931,4 +931,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

override def genRangeExecTransformer(range: logical.Range): SparkPlan =
CHRangeExecTransformer(range)
}
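
With this override in place, a plain range scan can be offloaded to the ClickHouse backend. A hypothetical query that should exercise the new transformer, assuming a Gluten-enabled SparkSession named spark (illustrative, not from the commit):

// spark.range produces a logical.Range node, which genRangeExecTransformer is
// expected to replace with CHRangeExecTransformer when the plan is offloaded.
val df = spark.range(0L, 1000L, 1L, 4).selectExpr("sum(id)")
df.collect()
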
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.{CHRangeMetricsUpdater, MetricsUpdater}
import org.apache.gluten.substrait.`type`._
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct

import scala.collection.JavaConverters

case class CHRangeExecTransformer(range: org.apache.spark.sql.catalyst.plans.logical.Range)
extends LeafTransformSupport {

override def output: Seq[Attribute] = range.output

override def getSplitInfos: Seq[SplitInfo] = {
val start = range.start
val end = range.end
val step = range.step
val numSlices = range.numSlices.getOrElse(1)

(0 until numSlices).map {
sliceIndex =>
ExtensionTableBuilder.makeExtensionTable(start, end, step, numSlices, sliceIndex)
}
}

override def getPartitions: Seq[InputPartition] = {
val start = range.start
val end = range.end
val step = range.step
val numSlices = range.numSlices.getOrElse(1)

(0 until numSlices).map {
sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, sliceIndex)
}
}

@transient
override lazy val metrics: Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
"outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
)

override def metricsUpdater(): MetricsUpdater = new CHRangeMetricsUpdater(metrics)

// No need to do native validation for CH backend. It is validated in [[doTransform]].
override protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded

override def doTransform(context: SubstraitContext): TransformContext = {
val output = range.output
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = JavaConverters
.seqAsJavaListConverter(
output.map(attr => new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)))
.asJava

val optimizationContent = s"isRange=1\n"
val optimization =
BackendsApiManager.getTransformerApiInstance.packPBMessage(
StringValue.newBuilder.setValue(optimizationContent).build)
val extensionNode = ExtensionBuilder.makeAdvancedExtension(optimization, null)
val readNode = RelBuilder.makeReadRel(
typeNodes,
nameList,
columnTypeNodes,
null,
extensionNode,
context,
context.nextOperatorId(this.nodeName))

TransformContext(output, readNode)
}
}
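
Each slice is described only by (start, end, step, numSlices, sliceIndex); the per-slice bounds are computed on the native side. A sketch of one plausible slicing, modeled on how Spark's own RangeExec divides a range; this is an assumption for illustration, not the ClickHouse implementation, and it assumes a positive step:

// Illustrative only: element-count slicing in the style of Spark's RangeExec.
def sliceBounds(start: Long, end: Long, step: Long, numSlices: Int, sliceIndex: Int): (Long, Long) = {
  val numElements = (end - start + step - 1) / step             // ceil, assuming step > 0
  val firstElement = numElements * sliceIndex / numSlices       // first element index in this slice
  val lastElement = numElements * (sliceIndex + 1) / numSlices  // exclusive upper element index
  (start + firstElement * step, start + lastElement * step)     // value range [inclusive, exclusive)
}

// e.g. sliceBounds(0, 1000, 1, 4, 0) == (0, 250) and sliceBounds(0, 1000, 1, 4, 3) == (750, 1000)
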
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.substrait.plan.PlanBuilder

case class GlutenRangeExecPartition(
start: Long,
end: Long,
step: Long,
numSlices: Int,
index: Int,
plan: Array[Byte] = PlanBuilder.EMPTY_PLAN)
extends BaseGlutenPartition {}
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class CHRangeMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
if (!operatorMetrics.metricsList.isEmpty) {
val metricsData = operatorMetrics.metricsList.get(0)
metrics("totalTime") += (metricsData.time / 1000L).toLong
metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong
metrics("outputVectors") += metricsData.outputVectors

MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
metrics("numOutputRows"),
metrics("outputBytes"),
metrics("numInputRows"),
metrics("inputBytes"),
          CHRangeMetricsUpdater.INCLUDING_PROCESSORS,
          CHRangeMetricsUpdater.CH_PLAN_NODE_NAME
)
}
}
}

}

object CHRangeMetricsUpdater {
val INCLUDING_PROCESSORS: Array[String] = Array("SourceFromRange")
val CH_PLAN_NODE_NAME: Array[String] = Array("SourceFromRange")
}
@@ -632,7 +632,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos().size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1586,7 +1586,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(conf._2)(plans.head.getSplitInfos().size)
assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1610,7 +1610,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos().size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1718,7 +1718,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
assertResult(conf._2)(scanExec(1).getSplitInfos().size)
assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
@@ -89,9 +89,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {

/** Generate native row partition. */
override def genPartitions(
wsCtx: WholeStageTransformContext,
splitInfos: Seq[Seq[SplitInfo]],
scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = {
wsCtx: WholeStageTransformContext,
splitInfos: Seq[Seq[SplitInfo]],
leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = {
    // Only serialize the plan once; this saves a lot of time when the plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
