[GLUTEN-8492][CH] Offload RangeExec #8518

Open · wants to merge 11 commits into main (changes from 6 commits shown)
ExtensionTableBuilder.java
@@ -16,16 +16,40 @@
*/
package org.apache.spark.sql.execution.datasources.clickhouse;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.sql.execution.datasources.mergetree.MetaSerializer;
import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import scala.collection.Seq;
import scala.collection.Seq$;

public class ExtensionTableBuilder {
private ExtensionTableBuilder() {}

public static ExtensionTableNode makeExtensionTable(
Long start, Long end, Long step, Integer numSlices, Integer sliceIndex)
throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode json = mapper.createObjectNode();
json.put("start", start);
json.put("end", end);
json.put("step", step);
json.put("numSlices", numSlices);
json.put("sliceIndex", sliceIndex);

String result = mapper.writeValueAsString(json);
return new ExtensionTableNode(
Collections.emptyList(), result, (Seq<String>) Seq$.MODULE$.<String>empty());
}

public static ExtensionTableNode makeExtensionTable(
String database,
String tableName,
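A minimal usage sketch of the new overload above (the literal values and the variable name are illustrative, not taken from the PR); the JSON keys are exactly the five fields written by the Java code:

// Hypothetical call for slice 0 of a range 0 until 1000 with step 1, split into 4 slices.
val split = ExtensionTableBuilder.makeExtensionTable(0L, 1000L, 1L, 4, 0)
// The serializerResult passed to ExtensionTableNode is roughly:
// {"start":0,"end":1000,"step":1,"numSlices":4,"sliceIndex":0}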
ExtensionTableNode.java
@@ -35,9 +35,9 @@ public class ExtensionTableNode implements SplitInfo {
List<String> preferredLocations,
String serializerResult,
scala.collection.Seq<String> pathList) {
this.pathList = pathList;
this.preferredLocations.addAll(preferredLocations);
this.serializerResult = serializerResult;
this.pathList = pathList;
}

@Override
CHBackendSettings.scala
@@ -445,4 +445,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = true

override def supportRangeExec(): Boolean = true
}
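The new supportRangeExec flag is the switch that lets the planner consider this offload at all. A hedged sketch of the gating call site (simplified pseudocode following Gluten's usual BackendsApiManager pattern, not the exact rule from this PR):

// If the active backend reports range support, the vanilla RangeExec is
// replaced by the backend's ColumnarRangeBaseExec implementation.
if (BackendsApiManager.getSettings.supportRangeExec()) {
  // offload: plan a columnar range transformer instead of RangeExec
}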
CHIteratorApi.scala
@@ -230,16 +230,18 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
override def genPartitions(
wsCtx: WholeStageTransformContext,
splitInfos: Seq[Seq[SplitInfo]],
scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = {
leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = {
// Only serialize the plan once; this saves a lot of time when the plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
splitInfos.zipWithIndex.map {
case (splits, index) =>
val splitInfosByteArray = splits.zipWithIndex.map {
case (split, i) =>
split match {
case filesNode: LocalFilesNode =>
setFileSchemaForLocalFiles(filesNode, scans(i))
case filesNode: LocalFilesNode if leafs(i).isInstanceOf[BasicScanExecTransformer] =>
setFileSchemaForLocalFiles(
filesNode,
leafs(i).asInstanceOf[BasicScanExecTransformer])
filesNode.toProtobuf.toByteArray
case extensionTableNode: ExtensionTableNode =>
extensionTableNode.toProtobuf.toByteArray
CHSparkPlanExecApi.scala
@@ -940,6 +940,5 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): ColumnarRangeBaseExec =
throw new GlutenNotSupportException("ColumnarRange is not supported in ch backend.")

CHRangeExecTransformer(start, end, step, numSlices, numElements, outputAttributes, child)
}
CHRangeExecTransformer.scala (new file)
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.{CHRangeMetricsUpdater, MetricsUpdater}
import org.apache.gluten.substrait.`type`._
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct

import scala.collection.JavaConverters;

case class CHRangeExecTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan])
extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child)
with LeafTransformSupport {

override def output: Seq[Attribute] = outputAttributes

override def getSplitInfos: Seq[SplitInfo] = {
(0 until numSlices).map {
sliceIndex =>
ExtensionTableBuilder.makeExtensionTable(start, end, step, numSlices, sliceIndex)
}
}

override def getPartitions: Seq[InputPartition] = {
(0 until numSlices).map {
sliceIndex => GlutenRangeExecPartition(start, end, step, numSlices, sliceIndex)
}
}

@transient
override lazy val metrics: Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
"outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
)

override def metricsUpdater(): MetricsUpdater = new CHRangeMetricsUpdater(metrics)

// No need to do native validation for CH backend. It is validated in [[doTransform]].
override protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded

override def doTransform(context: SubstraitContext): TransformContext = {
val output = outputAttributes
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = JavaConverters
.seqAsJavaListConverter(
output.map(attr => new ColumnTypeNode(NamedStruct.ColumnType.NORMAL_COL)))
.asJava

val optimizationContent = s"isRange=1\n"
val optimization =
BackendsApiManager.getTransformerApiInstance.packPBMessage(
StringValue.newBuilder.setValue(optimizationContent).build)
val extensionNode = ExtensionBuilder.makeAdvancedExtension(optimization, null)
val readNode = RelBuilder.makeReadRel(
typeNodes,
nameList,
columnTypeNodes,
null,
extensionNode,
context,
context.nextOperatorId(this.nodeName))

TransformContext(output, readNode)
}
}
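Taken together with the CHBackendSettings and CHSparkPlanExecApi changes above, the intended end-to-end effect is that a plain range query is planned as CHRangeExecTransformer. A hedged sketch (not a test from this PR; assumes a Spark session running with the Gluten ClickHouse backend enabled):

val df = spark.range(start = 0L, end = 1000L, step = 1L, numPartitions = 4)
df.collect()
// The leaf of the columnar plan should be a CHRangeExecTransformer whose
// getSplitInfos returns 4 ExtensionTableNodes, one per sliceIndex in [0, 4).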
GlutenRangeExecPartition.scala (new file)
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.substrait.plan.PlanBuilder

case class GlutenRangeExecPartition(
start: Long,
end: Long,
step: Long,
numSlices: Int,
index: Int,
plan: Array[Byte] = PlanBuilder.EMPTY_PLAN)
extends BaseGlutenPartition {}
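On the driver side, each slice also maps to one GlutenRangeExecPartition. A hedged construction sketch (illustrative values; it is an assumption that the substrait plan bytes, defaulting to PlanBuilder.EMPTY_PLAN, are filled in later when the whole-stage plan is serialized):

// One partition per slice of the range.
val part = GlutenRangeExecPartition(start = 0L, end = 1000L, step = 1L, numSlices = 4, index = 0)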
CHRangeMetricsUpdater.scala (new file)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class CHRangeMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
if (!operatorMetrics.metricsList.isEmpty) {
val metricsData = operatorMetrics.metricsList.get(0)
metrics("totalTime") += (metricsData.time / 1000L).toLong
metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong
metrics("outputVectors") += metricsData.outputVectors

MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
metrics("numOutputRows"),
metrics("outputBytes"),
metrics("numInputRows"),
metrics("inputBytes"),
CHRangeMetricsUpdater.INCLUDING_PROCESSORS,
CHRangeMetricsUpdater.CH_PLAN_NODE_NAME
)
}
}
}

}

object CHRangeMetricsUpdater {
val INCLUDING_PROCESSORS: Array[String] = Array("SourceFromRange")
val CH_PLAN_NODE_NAME: Array[String] = Array("SourceFromRange")
}
GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -632,7 +632,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos().size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1586,7 +1586,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(conf._2)(plans.head.getSplitInfos().size)
assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1610,7 +1610,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
assertResult(1)(plans.head.getSplitInfos().size)
assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1718,7 +1718,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
assertResult(conf._2)(scanExec(1).getSplitInfos().size)
assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
VeloxIteratorApi.scala
@@ -91,7 +91,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
override def genPartitions(
wsCtx: WholeStageTransformContext,
splitInfos: Seq[Seq[SplitInfo]],
scans: Seq[BasicScanExecTransformer]): Seq[BaseGlutenPartition] = {
leafs: Seq[LeafTransformSupport]): Seq[BaseGlutenPartition] = {
// Only serialize the plan once; this saves a lot of time when the plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
