Commit 55c065b

[GLUTEN-3620][VL] Support Range operator for Velox Backend (#8161)
Closes #3620
ArnavBalyan authored Jan 23, 2025
1 parent 2e27a52 commit 55c065b
Showing 15 changed files with 356 additions and 1 deletion.
@@ -931,4 +931,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

override def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): ColumnarRangeBaseExec =
throw new GlutenNotSupportException("ColumnarRange is not supported in ch backend.")

}
@@ -576,4 +576,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true

override def needPreComputeRangeFrameBoundary(): Boolean = true

override def supportRangeExec(): Boolean = true

}
@@ -160,7 +160,8 @@ object VeloxRuleApi {
RasOffload.from[LimitExec](OffloadOthers()),
RasOffload.from[GenerateExec](OffloadOthers()),
RasOffload.from[EvalPythonExec](OffloadOthers()),
RasOffload.from[SampleExec](OffloadOthers())
RasOffload.from[SampleExec](OffloadOthers()),
RasOffload.from[RangeExec](OffloadOthers())
)
offloads.foreach(
offload =>
@@ -838,4 +838,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
}

override def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): ColumnarRangeBaseExec =
ColumnarRangeExec(start, end, step, numSlices, numElements, outputAttributes, child)

}
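
With the Velox implementation wired in, a range query plans into the columnar operator. A minimal sketch, assuming a SparkSession `spark` running with the Gluten plugin enabled on the Velox backend (the session setup is an assumption, not part of this commit):

// Hypothetical session `spark` with Gluten + Velox enabled.
val df = spark.range(start = 0L, end = 1000L, step = 1L, numPartitions = 4)
df.explain() // the physical plan should show ColumnarRangeExec rather than RangeExec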
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.vectorized.ArrowWritableColumnVector

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* ColumnarRangeExec is a concrete implementation of ColumnarRangeBaseExec that executes the Range
* operation and supports columnar processing. It generates columnar batches for the specified
* range.
*
* @param start
* Starting value of the range.
* @param end
* Ending value of the range.
* @param step
* Step size for the range.
* @param numSlices
* Number of slices for partitioning the range.
* @param numElements
* Total number of elements in the range.
* @param outputAttributes
* Attributes defining the output schema of the operator.
* @param child
* Child SparkPlan nodes for this operator, if any.
*/
case class ColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]
) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child) {

override def batchType(): Convention.BatchType = {
ArrowJavaBatch
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
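// The range is empty when start == end, or when the sign of step disagrees with
// the direction from start to end (the XOR catches exactly that case).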
if (start == end || (start < end ^ 0 < step)) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex {
(partitionIndex, _) =>
val batchSize = 1000
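// Compute this slice's half-open [start, end) boundaries in BigInt, so the
// multiplications below cannot overflow Long for extreme ranges.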
val safePartitionStart = (partitionIndex) * numElements / numSlices * step + start
val safePartitionEnd = (partitionIndex + 1) * numElements / numSlices * step + start

// Clamp a BigInt bound into the Long domain, saturating at Long.MinValue / Long.MaxValue.
def getSafeMargin(value: BigInt): Long =
if (value.isValidLong) value.toLong
else if (value > 0) Long.MaxValue
else Long.MinValue

val partitionStart = getSafeMargin(safePartitionStart)
val partitionEnd = getSafeMargin(safePartitionEnd)

/**
* Generates the columnar batches for the specified range. Each batch contains a subset
* of the range values, managed using Arrow column vectors.
*/
val iterator = new Iterator[ColumnarBatch] {
var current = safePartitionStart

override def hasNext: Boolean = {
if (step > 0) {
current < safePartitionEnd
} else {
current > safePartitionEnd
}
}

override def next(): ColumnarBatch = {
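// Rows in this batch: values remaining in this slice, capped at batchSize.
// Slice bounds are exact multiples of step away from start, so the BigInt
// division is exact; the max(1) is a defensive guard.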
val numRows = math.min(
((safePartitionEnd - current) / step).toInt.max(1),
batchSize
)

val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema)

for (i <- 0 until numRows) {
val value = current + i * step
vectors(0).putLong(i, getSafeMargin(value))
}
vectors.foreach(_.setValueCount(numRows))
current += numRows * step

val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
batch
}
}
Iterators
.wrap(iterator)
.recyclePayload(
batch => {
batch.close()
})
.create()

}
}
}

override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("doExecute is not supported for this operator")
}
}
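
To make the slice arithmetic in doExecuteColumnar concrete, here is a small standalone sketch (illustration only, not part of the commit) of the boundary expressions for range(0, 10, step = 3) split across two slices:

// range(start = 0, end = 10, step = 3) yields numElements = 4 values: 0, 3, 6, 9.
val (start, step, numSlices) = (BigInt(0), BigInt(3), 2)
val numElements = BigInt(4)
// Slice i covers [i * numElements / numSlices * step + start,
//                 (i + 1) * numElements / numSlices * step + start).
val bounds = (0 until numSlices).map {
  i =>
    (i * numElements / numSlices * step + start,
     (i + 1) * numElements / numSlices * step + start)
}
// bounds == Vector((0, 6), (6, 12)): slice 0 emits 0 and 3, slice 1 emits 6 and 9.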
@@ -56,30 +56,41 @@ trait BackendSettingsApi {
options: Map[String, String]): ValidationResult = ValidationResult.succeeded

def supportNativeWrite(fields: Array[StructField]): Boolean = true

def supportNativeMetadataColumns(): Boolean = false

def supportNativeRowIndexColumn(): Boolean = false

def supportExpandExec(): Boolean = false

def supportSortExec(): Boolean = false

def supportSortMergeJoinExec(): Boolean = true

def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
false
}

def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = {
false
}

def supportColumnarShuffleExec(): Boolean = {
GlutenConfig.get.enableColumnarShuffle
}

def enableJoinKeysRewrite(): Boolean = true

def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
case _: InnerLike | RightOuter | FullOuter => true
case _ => false
}

def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
case _: InnerLike | LeftOuter | FullOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}

def supportStructType(): Boolean = false

def structFieldToLowerCase(): Boolean = true
@@ -90,6 +101,7 @@ trait BackendSettingsApi {
def recreateJoinExecOnFallback(): Boolean = false

def excludeScanExecFromCollapsedStage(): Boolean = false

def rescaleDecimalArithmetic: Boolean = false

def allowDecimalArithmetic: Boolean = true
@@ -140,4 +152,7 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false

def needPreComputeRangeFrameBoundary(): Boolean = false

def supportRangeExec(): Boolean = false

}
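
The flag defaults to false, so Range stays on vanilla Spark unless a backend opts in, as VeloxBackendSettings does above. A minimal sketch with a hypothetical backend (illustration only; the trait's other members are elided):

// Hypothetical backend settings object, for illustration.
object MyBackendSettings extends BackendSettingsApi {
  // Opting in lets Validators accept RangeExec for offload (see the Validators hunk below).
  override def supportRangeExec(): Boolean = true
  // ... other overrides elided for brevity.
}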
@@ -694,4 +694,14 @@ trait SparkPlanExecApi {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): ColumnarRangeBaseExec

}
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}

/**
* Base class for the RangeExec transformation; it can be implemented by supported backends.
* Currently, the Velox backend is supported.
*/
abstract class ColumnarRangeBaseExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan])
extends LeafExecNode
with ValidatablePlan {

override def output: Seq[Attribute] = {
outputAttributes
}

override def rowType0(): Convention.RowType = Convention.RowType.None

override protected def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
}

/**
* Companion object for ColumnarRangeBaseExec; provides a factory method to create an instance
* from an existing RangeExec plan.
*/
object ColumnarRangeBaseExec {
def from(rangeExec: RangeExec): ColumnarRangeBaseExec = {
BackendsApiManager.getSparkPlanExecApiInstance
.genColumnarRangeExec(
rangeExec.start,
rangeExec.end,
rangeExec.step,
rangeExec.numSlices,
rangeExec.numElements,
rangeExec.output,
rangeExec.children
)
}
}
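
Callers never construct a backend class directly; they go through the active backend's SparkPlanExecApi, which is what both this factory and the OffloadOthers case below do. A minimal sketch, assuming `rangeExec` is a RangeExec node taken from a compiled physical plan:

// `rangeExec: RangeExec` is assumed to come from an existing physical plan.
val columnarRange: ColumnarRangeBaseExec = ColumnarRangeBaseExec.from(rangeExec)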
@@ -324,6 +324,17 @@ object OffloadOthers {
child,
plan.evalType)
}
case plan: RangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
plan.start,
plan.end,
plan.step,
plan.numSlices,
plan.numElements,
plan.output,
plan.children
)
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
@@ -136,6 +136,7 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec() => fail(p)
case p: TakeOrderedAndProjectExec if !settings.supportColumnarShuffleExec() => fail(p)
case p: RangeExec if !settings.supportRangeExec() => fail(p)
case _ => pass()
}
}
@@ -137,6 +137,8 @@ class VeloxTestSettings extends BackendTestSettings {
)
// Double precision loss: https://github.com/facebookincubator/velox/pull/6051#issuecomment-1731028215.
.exclude("SPARK-22271: mean overflows and returns null for some decimal variables")
// Rewrite this test since it checks the physical operator, which is changed in Gluten.
.exclude("SPARK-27439: Explain result should match collected result after view change")

enableSuite[GlutenDataFrameNaFunctionsSuite]
.exclude(
@@ -1036,6 +1036,8 @@ class VeloxTestSettings extends BackendTestSettings {
// The describe issue is just fixed by https://github.com/apache/spark/pull/40914.
// We can enable the below test for spark 3.4 and higher versions.
.excludeGlutenTest("describe")
// Rewrite this test since it checks the physical operator, which is changed in Gluten.
.exclude("SPARK-27439: Explain result should match collected result after view change")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFunctionsSuite]
@@ -1177,6 +1179,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
enableSuite[GlutenSQLRangeExecSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}