Commit 1c926f4: cleanup
ArnavBalyan committed Jan 16, 2025
1 parent ff89539 commit 1c926f4
Showing 13 changed files with 378 additions and 0 deletions.
@@ -931,4 +931,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

override def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): RangeExecBaseTransformer =
throw new GlutenNotSupportException("ColumnarRange is not supported in the CH backend.")

}
@@ -547,4 +547,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true

override def needPreComputeRangeFrameBoundary(): Boolean = true

override def supportRangeExec(): Boolean = true

}
@@ -827,4 +827,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
}

override def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): RangeExecBaseTransformer =
ColumnarRangeExec(start, end, step, numSlices, numElements, outputAttributes, child)

}
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.vectorized.ArrowWritableColumnVector

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* ColumnarRangeExec is a concrete implementation of RangeExecBaseTransformer that executes the
* Range operation and supports columnar processing. It generates columnar batches for the specified
* range.
*
* @param start
* Starting value of the range.
* @param end
* Ending value of the range.
* @param step
* Step size for the range.
* @param numSlices
* Number of slices for partitioning the range.
* @param numElements
* Total number of elements in the range.
* @param outputAttributes
* Attributes defining the output schema of the operator.
* @param child
* Child SparkPlan nodes for this operator, if any.
*/
case class ColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]
) extends RangeExecBaseTransformer(
start,
end,
step,
numSlices,
numElements,
outputAttributes,
child) {

override def batchType(): Convention.BatchType = {
ArrowJavaBatch
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
// Empty range: start equals end, or the step's sign points away from end
// ((start < end) XOR (0 < step)).
if (start == end || (start < end ^ 0 < step)) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex {
(partitionIndex, _) =>
val batchSize = 1000
// Boundaries mirror vanilla Spark's Range: numElements is divided evenly
// across numSlices, computed in BigInt space so overflow can be clamped below.
val safePartitionStart = partitionIndex * numElements / numSlices * step + start
val safePartitionEnd = (partitionIndex + 1) * numElements / numSlices * step + start

// Clamp a BigInt boundary into the Long value range.
def getSafeMargin(value: BigInt): Long =
if (value.isValidLong) value.toLong
else if (value > 0) Long.MaxValue
else Long.MinValue

val partitionStart = getSafeMargin(safePartitionStart)
val partitionEnd = getSafeMargin(safePartitionEnd)

/**
* Generates the columnar batches for the specified range. Each batch contains a subset
* of the range values, managed using Arrow column vectors.
*/
val iterator = new Iterator[ColumnarBatch] {
var current = safePartitionStart

override def hasNext: Boolean = {
if (step > 0) {
current < safePartitionEnd
} else {
current > safePartitionEnd
}
}

override def next(): ColumnarBatch = {
val numRows = math.min(
((safePartitionEnd - current) / step).toInt.max(1),
batchSize
)

val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema)

for (i <- 0 until numRows) {
val value = current + i * step
vectors(0).putLong(i, getSafeMargin(value))
}
vectors.foreach(_.setValueCount(numRows))
current += numRows * step

val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
batch
}
}
Iterators
.wrap(iterator)
.recyclePayload(
batch => {
batch.close()
})
.create()

}
}
}

override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("doExecute is not supported for this operator")
}
}
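
The slice boundaries above mirror vanilla Spark's Range partitioning: each boundary is computed in BigInt space and only clamped into the Long range afterwards. A standalone Scala sketch (not part of the commit; the object name is invented) tracing the arithmetic for spark.range(0, 10, 2) split into 3 slices, i.e. start = 0, step = 2, numElements = 5:

// Illustrative only: replays the boundary math from doExecuteColumnar above.
object RangeBoundarySketch extends App {
  val start = 0L
  val step = 2L
  val numSlices = 3
  val numElements = BigInt(5)

  for (partitionIndex <- 0 until numSlices) {
    val partitionStart = partitionIndex * numElements / numSlices * step + start
    val partitionEnd = (partitionIndex + 1) * numElements / numSlices * step + start
    println(s"partition $partitionIndex: [$partitionStart, $partitionEnd)")
  }
  // partition 0: [0, 2)   -> row 0
  // partition 1: [2, 6)   -> rows 2, 4
  // partition 2: [6, 10)  -> rows 6, 8
}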
@@ -139,4 +139,8 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false

def needPreComputeRangeFrameBoundary(): Boolean = false

def supportRangeExec(): Boolean = false
}
@@ -694,4 +694,14 @@ trait SparkPlanExecApi {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

def genColumnarRangeExec(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): RangeExecBaseTransformer

}
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}

/**
* Base class for the RangeExec transformation, to be implemented by supported backends.
* Currently only the Velox backend supports it.
*/
abstract class RangeExecBaseTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan])
extends LeafExecNode
with ValidatablePlan {

override def output: Seq[Attribute] = {
outputAttributes
}

override protected def doValidateInternal(): ValidationResult = {
val isSupported = BackendsApiManager.getSettings.supportRangeExec()

if (!isSupported) {
return ValidationResult.failed(
s"RangeExec is not supported by the current backend."
)
}
ValidationResult.succeeded
}

override def rowType0(): Convention.RowType = Convention.RowType.None

override protected def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("This operator doesn't support doExecute().")
}
}

/**
* Companion object for RangeExecBaseTransformer; provides a factory method to create an
* instance from an existing RangeExec plan.
*/
object RangeExecBaseTransformer {
def from(rangeExec: RangeExec): RangeExecBaseTransformer = {
BackendsApiManager.getSparkPlanExecApiInstance
.genColumnarRangeExec(
rangeExec.start,
rangeExec.end,
rangeExec.step,
rangeExec.numSlices,
rangeExec.numElements,
rangeExec.output,
rangeExec.children
)
}
}
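
A backend opts into this operator through the two extension points the commit adds: BackendSettingsApi.supportRangeExec, which gates doValidateInternal above, and SparkPlanExecApi.genColumnarRangeExec, which builds the concrete node. A hedged sketch of the wiring; the MyBackend* names are invented, and it assumes the traits' other members have defaults, as the ones visible in this diff do:

// Hypothetical backend settings (illustrative only). Without this override,
// doValidateInternal above fails and the plan falls back to vanilla Spark's
// row-based RangeExec.
object MyBackendSettings extends BackendSettingsApi {
  override def supportRangeExec(): Boolean = true
}

// genColumnarRangeExec is abstract in SparkPlanExecApi, so every backend must
// implement it: Velox returns ColumnarRangeExec, while the CH backend throws
// GlutenNotSupportException. A third backend would override it the same way,
// e.g. (other trait members elided):
//
//   override def genColumnarRangeExec(
//       start: Long, end: Long, step: Long, numSlices: Int,
//       numElements: BigInt, outputAttributes: Seq[Attribute],
//       child: Seq[SparkPlan]): RangeExecBaseTransformer =
//     MyColumnarRangeExec(start, end, step, numSlices, numElements,
//       outputAttributes, child)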
@@ -324,6 +324,17 @@ object OffloadOthers {
child,
plan.evalType)
}
case plan: RangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
plan.start,
plan.end,
plan.step,
plan.numSlices,
plan.numElements,
plan.output,
plan.children
)
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
@@ -135,6 +135,8 @@ class VeloxTestSettings extends BackendTestSettings {
)
// Double precision loss: https://github.com/facebookincubator/velox/pull/6051#issuecomment-1731028215.
.exclude("SPARK-22271: mean overflows and returns null for some decimal variables")
// Rewrite this test since it checks the physical operator which is changed in Gluten
.exclude("SPARK-27439: Explain result should match collected result after view change")

enableSuite[GlutenDataFrameNaFunctionsSuite]
.exclude(
@@ -1046,6 +1046,8 @@ class VeloxTestSettings extends BackendTestSettings {
// The describe issue is just fixed by https://github.com/apache/spark/pull/40914.
// We can enable the below test for spark 3.4 and higher versions.
.excludeGlutenTest("describe")
// Rewrite this test since it checks the physical operator which is changed in Gluten
.exclude("SPARK-27439: Explain result should match collected result after view change")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFunctionsSuite]
@@ -1187,6 +1189,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
enableSuite[GlutenSQLRangeExecSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
}