diff --git a/pom.xml b/pom.xml
index ddcac283d4..4dd60284f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -738,14 +738,14 @@ under the License.
scala-maven-plugin
- -deprecation
- -unchecked
- -feature
- -Xlint:_
- -Ywarn-dead-code
+
+
+
+
+
-Ywarn-numeric-widen
- -Ywarn-value-discard
- -Ywarn-unused:imports,patvars,privates,locals,params,-implicits
+
+
-Xfatal-warnings
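
Build change: the surviving context lines in this hunk show that -Ywarn-numeric-widen and -Xfatal-warnings remain enabled, so any implicit Int-to-Long (or Byte/Short-to-Int) widening now fails the build. Every Scala change below adds the corresponding explicit conversion at the call site. A minimal sketch of the pattern, using a hypothetical Long accumulator rather than Comet's actual SQLMetric:

    object WideningSketch {
      var numOutputRows: Long = 0L            // stand-in for a Long-typed SQL metric
      def record(batchRows: Int): Unit = {
        // numOutputRows += batchRows         // flagged by -Ywarn-numeric-widen, fatal under -Xfatal-warnings
        numOutputRows += batchRows.toLong     // explicit widening, identical runtime behavior
      }
    }
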
diff --git a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
index ac6a89ca3b..6618a7ddb0 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/SourceFilterSerde.scala
@@ -80,8 +80,8 @@ object SourceFilterSerde extends Logging {
// refer to org.apache.spark.sql.catalyst.CatalystTypeConverters.CatalystTypeConverter#toScala
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
- case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
- case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+ case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+ case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala b/spark/src/main/scala/org/apache/comet/serde/literals.scala
index 312f12a4c5..d75b68c23b 100644
--- a/spark/src/main/scala/org/apache/comet/serde/literals.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala
@@ -72,8 +72,8 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging {
exprBuilder.setIsNull(false)
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
- case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
- case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
+ case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte].toInt)
+ case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short].toInt)
case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType | _: TimestampType | _: TimestampNTZType =>
exprBuilder.setLongVal(value.asInstanceOf[Long])
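
Serde change: protobuf has no 8- or 16-bit scalar types, so the generated setByteVal/setShortVal builder methods take an Int (inferred from the diff itself); the added .toInt spells out the Byte/Short widening instead of relying on the implicit conversion. A sketch with a hypothetical builder trait standing in for the generated code:

    trait ExprBuilderSketch {                 // hypothetical stand-in for the generated protobuf builder
      def setByteVal(v: Int): ExprBuilderSketch
      def setShortVal(v: Int): ExprBuilderSketch
    }

    def setSmallIntLiteral(b: ExprBuilderSketch, value: Any, isByte: Boolean): ExprBuilderSketch =
      if (isByte) b.setByteVal(value.asInstanceOf[Byte].toInt)    // Byte -> Int made explicit
      else b.setShortVal(value.asInstanceOf[Short].toInt)         // Short -> Int made explicit
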
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
index 8c779e8dc9..a6feee4eea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala
@@ -66,7 +66,7 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
override def next(): ColumnarBatch = {
val batch = batches.next()
- numOutputRows += batch.numRows()
+ numOutputRows += batch.numRows().toLong
batch
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
index 95770592fd..8da97271a2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometBroadcastExchangeExec.scala
@@ -146,7 +146,7 @@ case class CometBroadcastExchangeExec(
longMetric("numOutputRows") += numRows
if (numRows >= maxBroadcastRows) {
throw QueryExecutionErrors.cannotBroadcastTableOverMaxTableRowsError(
- maxBroadcastRows,
+ maxBroadcastRows.toLong,
numRows)
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
index 09794e8e26..5dd45a3345 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometCollectLimitExec.scala
@@ -88,7 +88,7 @@ case class CometCollectLimitExec(
outputPartitioning,
serializer,
metrics)
- metrics("numPartitions").set(dep.partitioner.numPartitions)
+ metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
new CometShuffledBatchRDD(dep, readMetrics)
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
index d965a6ff7b..c6f1dd14d5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
@@ -82,7 +82,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
- numOutputRows += batch.numRows()
+ numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}
}
@@ -120,7 +120,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
.flatMap(CometUtils.decodeBatches(_, this.getClass.getSimpleName))
.flatMap { batch =>
numInputBatches += 1
- numOutputRows += batch.numRows()
+ numOutputRows += batch.numRows().toLong
batch.rowIterator().asScala.map(toUnsafe)
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
index d4cb11ac62..70cd6e26a5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala
@@ -242,7 +242,7 @@ case class CometScanExec(
driverMetrics("staticFilesSize") = filesSize
}
if (relation.partitionSchema.nonEmpty) {
- driverMetrics("numPartitions") = partitions.length
+ driverMetrics("numPartitions") = partitions.length.toLong
}
}
@@ -284,7 +284,7 @@ case class CometScanExec(
override def next(): ColumnarBatch = {
val batch = batches.next()
- numOutputRows += batch.numRows()
+ numOutputRows += batch.numRows().toLong
batch
}
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index bcf8918575..630a33172f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -83,7 +83,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
val startNs = System.nanoTime()
val batch = iter.next()
conversionTime += System.nanoTime() - startNs
- numInputRows += batch.numRows()
+ numInputRows += batch.numRows().toLong
numOutputBatches += 1
batch
}
@@ -123,7 +123,7 @@ case class CometSparkToColumnarExec(child: SparkPlan)
CometArrowConverters.rowToArrowBatchIter(
sparkBatches,
schema,
- maxRecordsPerBatch,
+ maxRecordsPerBatch.toLong,
timeZoneId,
context)
createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index aa89dec137..027aacb602 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -96,7 +96,7 @@ case class CometTakeOrderedAndProjectExec(
outputPartitioning,
serializer,
metrics)
- metrics("numPartitions").set(dep.partitioner.numPartitions)
+ metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
new CometShuffledBatchRDD(dep, readMetrics)
}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
index 1283a745a6..0a9c0bed12 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala
@@ -111,7 +111,7 @@ class CometBlockStoreShuffleReader[K, C](
// Update the context task metrics for each record read.
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
- readMetrics.incRecordsRead(record._2.numRows())
+ readMetrics.incRecordsRead(record._2.numRows().toLong)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
index 1f7d37a108..14a6dbe589 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
@@ -137,7 +137,7 @@ case class CometShuffleExchangeExec(
outputPartitioning,
serializer,
metrics)
- metrics("numPartitions").set(dep.partitioner.numPartitions)
+ metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -151,7 +151,7 @@ case class CometShuffleExchangeExec(
outputPartitioning,
serializer,
metrics)
- metrics("numPartitions").set(dep.partitioner.numPartitions)
+ metrics("numPartitions").set(dep.partitioner.numPartitions.toLong)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(
sparkContext,
@@ -385,7 +385,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
// end up being almost the same regardless of the index. substantially scrambling the
// seed by hashing will help. Refer to SPARK-21782 for more details.
val partitionId = TaskContext.get().partitionId()
- var position = new XORShiftRandom(partitionId).nextInt(numPartitions)
+ var position = new XORShiftRandom(partitionId.toLong).nextInt(numPartitions)
(_: InternalRow) => {
// The HashPartitioner will handle the `mod` by the number of partitions
position += 1
@@ -432,7 +432,7 @@ object CometShuffleExchangeExec extends ShimCometShuffleExchangeExec {
row: InternalRow): UnsafeExternalRowSorter.PrefixComputer.Prefix = {
// The hashcode generated from the binary form of a [[UnsafeRow]] should not be null.
result.isNull = false
- result.value = row.hashCode()
+ result.value = row.hashCode().toLong
result
}
}
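
Shuffle change: the two remaining widenings in CometShuffleExchangeExec are argument and field positions rather than metric updates: Spark's XORShiftRandom takes a Long seed, and the sort-prefix value field is a Long, while partitionId and hashCode() are Ints. A minimal sketch of the pattern (XORShiftRandom is private[spark], so this only compiles from a package under org.apache.spark; names follow the hunks above):

    import org.apache.spark.util.random.XORShiftRandom

    // Explicit Int -> Long widening at a Long-typed constructor parameter.
    def startPosition(partitionId: Int, numPartitions: Int): Int =
      new XORShiftRandom(partitionId.toLong).nextInt(numPartitions)

    // Same idea for the prefix computer: Prefix.value is a long field, hashCode() returns an Int.
    // result.value = row.hashCode().toLong
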
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 27c9124f11..e4b6c12b13 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1162,7 +1162,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
spark
- .range(num)
+ .range(num.toLong)
.map(_ % div)
// Parquet doesn't allow column names with spaces, have to add an alias here.
// Minus 500 here so that negative decimals are also tested.
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 04203f7545..73273c786a 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1801,7 +1801,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
- .range(numRows)
+ .range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
@@ -1838,7 +1838,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
- .range(numRows)
+ .range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
@@ -1869,7 +1869,7 @@ class CometExecSuite extends CometTestBase {
withTable("t1") {
val numRows = 10
spark
- .range(numRows)
+ .range(numRows.toLong)
.selectExpr("if (id % 2 = 0, null, id) AS a", s"$numRows - id AS b")
.repartition(3) // Force repartition to test data will come to single partition
.write
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index f21b59ac1a..8f54da2d29 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -416,15 +416,15 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
+ record.add(1, i.toByte.toInt)
+ record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
+ record.add(8, (-i).toByte.toInt)
+ record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -639,8 +639,8 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
+ record.add(1, i.toByte.toInt)
+ record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
@@ -1575,15 +1575,15 @@ abstract class ParquetReadSuite extends CometTestBase {
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
+ record.add(1, i.toByte.toInt)
+ record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
+ record.add(8, (-i).toByte.toInt)
+ record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -1672,7 +1672,7 @@ abstract class ParquetReadSuite extends CometTestBase {
val record = new SimpleGroup(schema)
opt match {
case Some(i) =>
- record.add(0, i.toShort)
+ record.add(0, i.toShort.toInt)
record.add(1, i)
record.add(2, i.toLong)
case _ =>
@@ -1765,7 +1765,7 @@ abstract class ParquetReadSuite extends CometTestBase {
}
private def withId(id: Int) =
- new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+ new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id.toLong).build()
// Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct")
test("array of nested struct with and without field id") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index d4b7b029a3..06381a7dfb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -21,6 +21,7 @@ package org.apache.spark.sql
import java.util.concurrent.atomic.AtomicInteger
+import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@@ -425,6 +426,7 @@ abstract class CometTestBase
case None => f(spark.read.format("parquet").load(path))
}
+ @nowarn("cat=deprecation")
protected def createParquetWriter(
schema: MessageType,
path: Path,
@@ -434,7 +436,6 @@ abstract class CometTestBase
pageRowCountLimit: Int = ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT,
rowGroupSize: Long = 1024 * 1024L): ParquetWriter[Group] = {
val hadoopConf = spark.sessionState.newHadoopConf()
-
ExampleParquetWriter
.builder(path)
.withDictionaryEncoding(dictionaryEnabled)
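
Test-base change: createParquetWriter evidently calls a deprecated API, which would now fail the build under -Xfatal-warnings, so the new @nowarn("cat=deprecation") suppresses deprecation warnings for that one method instead of relaxing the compiler flags globally. A self-contained sketch of the annotation (hypothetical method names):

    import scala.annotation.nowarn

    object NowarnSketch {
      @deprecated("use newApi instead", "1.0")
      def oldApi(): Int = 1

      @nowarn("cat=deprecation")              // silences only deprecation warnings inside this method
      def stillCompilesUnderFatalWarnings(): Int = oldApi()
    }
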
@@ -557,15 +558,15 @@ abstract class CometTestBase
opt match {
case Some(i) =>
record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
+ record.add(1, i.toByte.toInt)
+ record.add(2, i.toShort.toInt)
record.add(3, i)
record.add(4, i.toLong)
record.add(5, i.toFloat)
record.add(6, i.toDouble)
record.add(7, i.toString * 48)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
+ record.add(8, (-i).toByte.toInt)
+ record.add(9, (-i).toShort.toInt)
record.add(10, -i)
record.add(11, (-i).toLong)
record.add(12, i.toString)
@@ -586,15 +587,15 @@ abstract class CometTestBase
val i = rand.nextLong()
val record = new SimpleGroup(schema)
record.add(0, i % 2 == 0)
- record.add(1, i.toByte)
- record.add(2, i.toShort)
+ record.add(1, i.toByte.toInt)
+ record.add(2, i.toShort.toInt)
record.add(3, i.toInt)
record.add(4, i)
record.add(5, java.lang.Float.intBitsToFloat(i.toInt))
record.add(6, java.lang.Double.longBitsToDouble(i))
record.add(7, i.toString * 24)
- record.add(8, (-i).toByte)
- record.add(9, (-i).toShort)
+ record.add(8, (-i).toByte.toInt)
+ record.add(9, (-i).toShort.toInt)
record.add(10, (-i).toInt)
record.add(11, -i)
record.add(12, i.toString)
@@ -643,7 +644,7 @@ abstract class CometTestBase
if (rand.nextBoolean()) {
None
} else {
- Some(getValue(i, div))
+ Some(getValue(i.toLong, div.toLong))
}
}
expected.foreach { opt =>
@@ -697,7 +698,7 @@ abstract class CometTestBase
if (rand.nextBoolean()) {
None
} else {
- Some(getValue(i, div))
+ Some(getValue(i.toLong, div.toLong))
}
}
expected.foreach { opt =>
@@ -875,7 +876,7 @@ abstract class CometTestBase
val div = if (dictionaryEnabled) 10 else n // maps value to a small range for dict to kick in
val expected = (0 until n).map { i =>
- Some(getValue(i, div))
+ Some(getValue(i.toLong, div.toLong))
}
expected.foreach { opt =>
val timestampFormats = List(
@@ -923,7 +924,7 @@ abstract class CometTestBase
def makeDecimalRDD(num: Int, decimal: DecimalType, useDictionary: Boolean): DataFrame = {
val div = if (useDictionary) 5 else num // narrow the space to make it dictionary encoded
spark
- .range(num)
+ .range(num.toLong)
.map(_ % div)
// Parquet doesn't allow column names with spaces, have to add an alias here.
// Minus 500 here so that negative decimals are also tested.
@@ -1103,8 +1104,8 @@ abstract class CometTestBase
val record = new SimpleGroup(schema)
opt match {
case Some(i) =>
- record.add(0, i.toByte)
- record.add(1, i.toShort)
+ record.add(0, i.toByte.toInt)
+ record.add(1, i.toShort.toInt)
record.add(2, i)
record.add(3, i.toLong)
record.add(4, rand.nextFloat())
diff --git a/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala b/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala
index e25d4e51e4..4087bbd28c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/GenTPCHData.scala
@@ -65,7 +65,9 @@ object GenTPCHData {
// Install the data generators in all nodes
// TODO: think a better way to install on each worker node
// such as https://stackoverflow.com/a/40876671
- spark.range(0, workers, 1, workers).foreach(worker => installDBGEN(baseDir)(worker))
+ spark
+ .range(0L, workers.toLong, 1L, workers)
+ .foreach(worker => installDBGEN(baseDir)(worker))
s"${baseDir}/dbgen"
} else {
config.dbgenDir
@@ -91,7 +93,7 @@ object GenTPCHData {
// Clean up
if (defaultDbgenDir != null) {
- spark.range(0, workers, 1, workers).foreach { _ =>
+ spark.range(0L, workers.toLong, 1L, workers).foreach { _ =>
val _ = FileUtils.deleteQuietly(defaultDbgenDir)
}
}
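
GenTPCHData change: SparkSession.range(start, end, step, numPartitions) takes Long for the first three parameters but Int for numPartitions, so only the bounds and step are widened here and the fourth workers argument stays an Int. A sketch, assuming a spark session and an Int workers value in scope:

    val workers: Int = 4
    spark
      .range(0L, workers.toLong, 1L, workers)   // start/end/step are Long, numPartitions stays Int
      .foreach(_ => ())                         // placeholder for the per-worker install step
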
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
index 1efd3974ed..872e5e9e73 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
@@ -66,7 +66,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
new Benchmark(
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
s"single aggregate ${aggregateFunction.toString}",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -104,7 +104,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
new Benchmark(
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
s"single aggregate ${aggregateFunction.toString} on decimal",
- values,
+ values.toLong,
output = output)
val df = makeDecimalDataFrame(values, dataType, false);
@@ -145,7 +145,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
new Benchmark(
s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
s"single aggregate ${aggregateFunction.toString}",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -186,7 +186,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
new Benchmark(
s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
s"multiple aggregates ${aggregateFunction.toString}",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
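
Benchmark changes: org.apache.spark.benchmark.Benchmark declares valuesPerIteration as a Long, so the Int values passed to each benchmark constructor is widened explicitly; the same applies to spark.range(values) in the benchmark bases further down. A sketch, assuming the surrounding output and spark members of CometBenchmarkBase:

    val values: Int = 1024 * 1024 * 10
    val benchmark = new Benchmark("Expr example", values.toLong, output = output)  // Long valuesPerIteration
    spark.range(values.toLong).createOrReplaceTempView("parquetV1Table")           // range takes a Long end
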
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
index c6fe55b56b..af6648241a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArithmeticBenchmark.scala
@@ -37,7 +37,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
val dataType = IntegerType
val benchmark = new Benchmark(
s"Binary op ${dataType.sql}, dictionary = $useDictionary",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -78,7 +78,7 @@ object CometArithmeticBenchmark extends CometBenchmarkBase {
useDictionary: Boolean): Unit = {
val benchmark = new Benchmark(
s"Binary op ${dataType.sql}, dictionary = $useDictionary",
- values,
+ values.toLong,
output = output)
val df = makeDecimalDataFrame(values, dataType, useDictionary)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
index 1cbe27be91..22bb5b0b0b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala
@@ -81,7 +81,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
withTempTable(tbl) {
import spark.implicits._
spark
- .range(values)
+ .range(values.toLong)
.map(_ => if (useDictionary) Random.nextLong % 5 else Random.nextLong)
.createOrReplaceTempView(tbl)
runBenchmark(benchmarkName)(f(values))
@@ -168,7 +168,7 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
val div = if (useDictionary) 5 else values
spark
- .range(values)
+ .range(values.toLong)
.map(_ % div)
.select((($"value" - 500) / 100.0) cast decimal as Symbol("dec"))
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
index 0dddfb36a5..4495c9d075 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometConditionalExpressionBenchmark.scala
@@ -32,7 +32,7 @@ import org.apache.comet.CometConf
object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
def caseWhenExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Case When Expr", values, output = output)
+ val benchmark = new Benchmark("Case When Expr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -65,7 +65,7 @@ object CometConditionalExpressionBenchmark extends CometBenchmarkBase {
}
def ifExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("If Expr", values, output = output)
+ val benchmark = new Benchmark("If Expr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
index 0af1ecade5..d9e49a6ca3 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala
@@ -39,7 +39,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl"))
Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
- runWithComet(s"Date Truncate $isDictionary - $level", values) {
+ runWithComet(s"Date Truncate $isDictionary - $level", values.toLong) {
spark.sql(s"select trunc(dt, '$level') from parquetV1Table").noop()
}
}
@@ -68,7 +68,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase {
"WEEK",
"QUARTER").foreach { level =>
val isDictionary = if (useDictionary) "(Dictionary)" else ""
- runWithComet(s"Timestamp Truncate $isDictionary - $level", values) {
+ runWithComet(s"Timestamp Truncate $isDictionary - $level", values.toLong) {
spark.sql(s"select date_trunc('$level', ts) from parquetV1Table").noop()
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
index 3ee37bd668..dd557ff3f0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
@@ -71,7 +71,10 @@ object CometExecBenchmark extends CometBenchmarkBase {
def numericFilterExecBenchmark(values: Int, fractionOfZeros: Double): Unit = {
val percentageOfZeros = fractionOfZeros * 100
val benchmark =
- new Benchmark(s"Project + Filter Exec ($percentageOfZeros% zeros)", values, output = output)
+ new Benchmark(
+ s"Project + Filter Exec ($percentageOfZeros% zeros)",
+ values.toLong,
+ output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -115,7 +118,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
}
def subqueryExecBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Subquery", values, output = output)
+ val benchmark = new Benchmark("Subquery", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -155,7 +158,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
}
def sortExecBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Sort Exec", values, output = output)
+ val benchmark = new Benchmark("Sort Exec", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -185,7 +188,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
}
def expandExecBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expand Exec", values, output = output)
+ val benchmark = new Benchmark("Expand Exec", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -232,7 +235,7 @@ object CometExecBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"BloomFilterAggregate Exec (cardinality $cardinality)",
- values,
+ values.toLong,
output = output)
val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
index 2ca924821c..43971ea9cc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometPredicateExpressionBenchmark.scala
@@ -32,7 +32,7 @@ import org.apache.comet.CometConf
object CometPredicateExpressionBenchmark extends CometBenchmarkBase {
def inExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("in Expr", values, output = output)
+ val benchmark = new Benchmark("in Expr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
index a5db4f290d..536d02f663 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometReadBenchmark.scala
@@ -52,7 +52,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
// Benchmarks running through spark sql.
val sqlBenchmark =
- new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values, output = output)
+ new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -101,7 +101,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def encryptedScanBenchmark(values: Int, dataType: DataType): Unit = {
// Benchmarks running through spark sql.
val sqlBenchmark =
- new Benchmark(s"SQL Single ${dataType.sql} Encrypted Column Scan", values, output = output)
+ new Benchmark(
+ s"SQL Single ${dataType.sql} Encrypted Column Scan",
+ values.toLong,
+ output = output)
val encoder = Base64.getEncoder
val footerKey =
@@ -189,7 +192,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def decimalScanBenchmark(values: Int, precision: Int, scale: Int): Unit = {
val sqlBenchmark = new Benchmark(
s"SQL Single Decimal(precision: $precision, scale: $scale) Column Scan",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -237,7 +240,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def readerBenchmark(values: Int, dataType: DataType): Unit = {
val sqlBenchmark =
- new Benchmark(s"Parquet reader benchmark for $dataType", values, output = output)
+ new Benchmark(s"Parquet reader benchmark for $dataType", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -318,7 +321,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def numericFilterScanBenchmark(values: Int, fractionOfZeros: Double): Unit = {
val percentageOfZeros = fractionOfZeros * 100
val benchmark =
- new Benchmark(s"Numeric Filter Scan ($percentageOfZeros% zeros)", values, output = output)
+ new Benchmark(
+ s"Numeric Filter Scan ($percentageOfZeros% zeros)",
+ values.toLong,
+ output = output)
withTempPath { dir =>
withTempTable("parquetV1Table", "parquetV2Table") {
@@ -365,7 +371,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def stringWithDictionaryScanBenchmark(values: Int): Unit = {
val sqlBenchmark =
- new Benchmark("String Scan with Dictionary Encoding", values, output = output)
+ new Benchmark("String Scan with Dictionary Encoding", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table", "parquetV2Table") {
@@ -424,7 +430,10 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
val percentageOfNulls = fractionOfNulls * 100
val benchmark =
- new Benchmark(s"String with Nulls Scan ($percentageOfNulls%)", values, output = output)
+ new Benchmark(
+ s"String with Nulls Scan ($percentageOfNulls%)",
+ values.toLong,
+ output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -483,7 +492,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
def columnsBenchmark(values: Int, width: Int): Unit = {
val benchmark =
- new Benchmark(s"Single Column Scan from $width columns", values, output = output)
+ new Benchmark(s"Single Column Scan from $width columns", values.toLong, output = output)
withTempPath { dir =>
withTempTable("t1", "parquetV1Table") {
@@ -533,7 +542,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"Large String Filter Scan ($percentageOfZeros% zeros)",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -584,7 +593,7 @@ class CometReadBaseBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"Sorted Lg Str Filter Scan ($percentageOfZeros% zeros)",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
index 103866331c..3e11285284 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala
@@ -71,7 +71,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL ${dataType.sql} shuffle on array ($partitionNum Partition)",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -123,7 +123,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL ${dataType.sql} shuffle on struct ($partitionNum Partition)",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -182,7 +182,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL ${dataType.sql} Dictionary Shuffle($partitionNum Partition)",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -273,7 +273,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL Single ${dataType.sql} Shuffle($partitionNum Partition) $randomTitle",
- values,
+ values.toLong,
output = output)
withTempPath { dir =>
@@ -359,7 +359,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL Wide ($width cols) ${dataType.sql} Shuffle($partitionNum Partition)",
- values,
+ values.toLong,
output = output)
val projection = (1 to width)
@@ -429,7 +429,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
val benchmark =
new Benchmark(
s"SQL Wide ($width cols) ${dataType.sql} Range Partition Shuffle($partitionNum Partition)",
- values,
+ values.toLong,
output = output)
val projection = (1 to width)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
index 0546c91738..3e42a2ece0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
@@ -32,7 +32,7 @@ import org.apache.comet.CometConf
object CometStringExpressionBenchmark extends CometBenchmarkBase {
def subStringExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Substring Expr", values, output = output)
+ val benchmark = new Benchmark("Substring Expr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -62,7 +62,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def stringSpaceExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("StringSpace Expr", values, output = output)
+ val benchmark = new Benchmark("StringSpace Expr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -92,7 +92,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def asciiExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr ascii", values, output = output)
+ val benchmark = new Benchmark("Expr ascii", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -122,7 +122,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def bitLengthExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr bit_length", values, output = output)
+ val benchmark = new Benchmark("Expr bit_length", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -152,7 +152,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def octetLengthExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr octet_length", values, output = output)
+ val benchmark = new Benchmark("Expr octet_length", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -182,7 +182,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def upperExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr upper", values, output = output)
+ val benchmark = new Benchmark("Expr upper", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -213,7 +213,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def lowerExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr lower", values, output = output)
+ val benchmark = new Benchmark("Expr lower", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -243,7 +243,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def chrExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr chr", values, output = output)
+ val benchmark = new Benchmark("Expr chr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -273,7 +273,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def initCapExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr initCap", values, output = output)
+ val benchmark = new Benchmark("Expr initCap", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -303,7 +303,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def trimExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr trim", values, output = output)
+ val benchmark = new Benchmark("Expr trim", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -333,7 +333,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def concatwsExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr concatws", values, output = output)
+ val benchmark = new Benchmark("Expr concatws", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -363,7 +363,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def lengthExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr length", values, output = output)
+ val benchmark = new Benchmark("Expr length", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -393,7 +393,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def repeatExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr repeat", values, output = output)
+ val benchmark = new Benchmark("Expr repeat", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -423,7 +423,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def reverseExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr reverse", values, output = output)
+ val benchmark = new Benchmark("Expr reverse", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -453,7 +453,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def instrExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr instr", values, output = output)
+ val benchmark = new Benchmark("Expr instr", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -483,7 +483,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def replaceExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr replace", values, output = output)
+ val benchmark = new Benchmark("Expr replace", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {
@@ -513,7 +513,7 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
}
def translateExprBenchmark(values: Int): Unit = {
- val benchmark = new Benchmark("Expr translate", values, output = output)
+ val benchmark = new Benchmark("Expr translate", values.toLong, output = output)
withTempPath { dir =>
withTempTable("parquetV1Table") {