[Gluten-301] Correct metrics time (oap-project#304)
Correct the scan time of the metrics
zzcclp authored Aug 1, 2022
1 parent 99ff947 commit 5c6b803
Showing 3 changed files with 19 additions and 11 deletions.
@@ -32,11 +32,15 @@ class CloseableCHColumnBatchIterator(itr: Iterator[ColumnarBatch],
     pipelineTime: Option[SQLMetric] = None
 ) extends Iterator[ColumnarBatch] with Logging {
   var cb: ColumnarBatch = null
+  var scanTime = 0L
 
   override def hasNext: Boolean = {
     val beforeTime = System.nanoTime()
     val res = itr.hasNext
-    pipelineTime.map(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
+    if (!res) {
+      pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(scanTime))
+    }
     res
   }

@@ -49,7 +53,7 @@ class CloseableCHColumnBatchIterator(itr: Iterator[ColumnarBatch],
     val beforeTime = System.nanoTime()
     closeCurrentBatch()
     cb = itr.next()
-    pipelineTime.map(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
     cb
   }

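The same accumulate-then-flush pattern appears in CloseableCHColumnBatchIterator above and in CloseableColumnBatchIterator at the end of this commit: elapsed nanoseconds from each hasNext/next call are summed into scanTime, and pipelineTime is updated exactly once, when the wrapped iterator reports exhaustion (the side-effecting Option.map is also replaced with the more idiomatic foreach). A minimal, self-contained sketch of that pattern, with TimedIterator and LongMetric as hypothetical stand-ins for the Gluten iterator and Spark's SQLMetric:

import java.util.concurrent.TimeUnit

// Hypothetical stand-in for Spark's SQLMetric; not part of the patch.
final class LongMetric {
  var value = 0L
  def +=(v: Long): Unit = value += v
}

// Sketch of the corrected pattern: accumulate nanoseconds per call,
// flush to the metric once, in milliseconds, when the source is exhausted.
class TimedIterator[T](itr: Iterator[T], pipelineTime: Option[LongMetric])
  extends Iterator[T] {
  private var scanTime = 0L

  override def hasNext: Boolean = {
    val beforeTime = System.nanoTime()
    val res = itr.hasNext
    scanTime += System.nanoTime() - beforeTime
    if (!res) {
      // Exhausted: report the total once instead of adding a truncated
      // per-call value on every hasNext/next. (As in the patch, calling
      // hasNext again after exhaustion would flush again.)
      pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(scanTime))
    }
    res
  }

  override def next(): T = {
    val beforeTime = System.nanoTime()
    val res = itr.next()
    scanTime += System.nanoTime() - beforeTime
    res
  }
}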
@@ -70,18 +70,18 @@ object DSV2BenchmarkTest {
 
     val sessionBuilderTmp = SparkSession
       .builder()
-      .appName("Gluten-Benchmark")
+      .appName("Gluten-TPCH-Benchmark")
 
     val libPath = "/home/myubuntu/Works/c_cpp_projects/Kyligence-ClickHouse-1/" +
       "cmake-build-release/utils/local-engine/libch.so"
     val sessionBuilder = if (!configed) {
       val sessionBuilderTmp1 = sessionBuilderTmp
-        .master("local[8]")
+        .master("local[6]")
         .config("spark.driver.memory", "30G")
         .config("spark.driver.memoryOverhead", "10G")
         .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
         .config("spark.default.parallelism", 1)
-        .config("spark.sql.shuffle.partitions", 8)
+        .config("spark.sql.shuffle.partitions", 6)
         .config("spark.sql.adaptive.enabled", "false")
         .config("spark.sql.files.maxPartitionBytes", 1024 << 10 << 10) // default is 128M
         .config("spark.sql.files.openCostInBytes", 1024 << 10 << 10) // default is 4M
@@ -119,13 +119,14 @@ object DSV2BenchmarkTest {
         .config("spark.gluten.sql.columnar.iterator", "true")
         .config("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
         .config("spark.gluten.sql.enable.native.validation", "false")
+        .config("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true")
         // .config("spark.gluten.sql.columnar.extension.scan.rdd", "false")
         // .config("spark.gluten.sql.columnar.sort", "false")
         // .config("spark.sql.codegen.wholeStage", "false")
         .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
         .config("spark.sql.exchange.reuse", "true")
         .config("spark.gluten.sql.columnar.forceshuffledhashjoin", "true")
-        .config("spark.gluten.sql.columnar.coalesce.batches", "true")
+        .config("spark.gluten.sql.columnar.coalesce.batches", "false")
         // .config("spark.gluten.sql.columnar.filescan", "true")
         // .config("spark.sql.optimizeNullAwareAntiJoin", "false")
         // .config("spark.sql.join.preferSortMergeJoin", "false")
@@ -198,9 +199,9 @@ object DSV2BenchmarkTest {
     // createTempView(spark, "/data1/test_output/tpch-data-sf10", "parquet")
     // createGlobalTempView(spark)
     // testJoinIssue(spark)
-    testTPCHOne(spark, executedCnt)
+    // testTPCHOne(spark, executedCnt)
     // testSepScanRDD(spark, executedCnt)
-    // testTPCHAll(spark)
+    testTPCHAll(spark)
     // benchmarkTPCH(spark, executedCnt)
 
     System.out.println("waiting for finishing")
@@ -232,7 +233,6 @@ object DSV2BenchmarkTest {
         | AND l_shipdate < date'1994-01-01' + interval 1 year
         | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
         | AND l_quantity < 24;
-        |
         |""".stripMargin) // .show(30, false)
     df.explain(false)
     val plan = df.queryExecution.executedPlan
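Condensed, the benchmark changes in this file amount to a handful of builder calls: a renamed app, fewer local cores and shuffle partitions, the new separate-scan-RDD flag for ClickHouse, batch coalescing turned off, and testTPCHAll swapped in for testTPCHOne. An illustrative, trimmed-down builder with only the settings this commit touches (real runs also set the memory, serializer, and libch.so options shown above):

import org.apache.spark.sql.SparkSession

// Values mirror the new right-hand side of the diff; adjust for your machine.
val spark = SparkSession
  .builder()
  .appName("Gluten-TPCH-Benchmark")
  .master("local[6]")                                                   // was local[8]
  .config("spark.sql.shuffle.partitions", 6)                            // was 8
  .config("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true") // newly enabled
  .config("spark.gluten.sql.columnar.coalesce.batches", "false")        // was true
  .getOrCreate()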
@@ -34,11 +34,15 @@ class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
   extends Iterator[ColumnarBatch]
   with Logging {
   var cb: ColumnarBatch = _
+  var scanTime = 0L
 
   override def hasNext: Boolean = {
     val beforeTime = System.nanoTime()
     val res = itr.hasNext
-    pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
+    if (!res) {
+      pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(scanTime))
+    }
     res
   }

@@ -50,7 +54,7 @@ class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
     val beforeTime = System.nanoTime()
     closeCurrentBatch()
     cb = itr.next()
-    pipelineTime.foreach(t => t += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeTime))
+    scanTime += System.nanoTime() - beforeTime
     cb
   }

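A plausible reading of why the old code undercounted, beyond reporting on every call: TimeUnit.NANOSECONDS.toMillis truncates, so converting each sub-millisecond interval to milliseconds before adding contributes 0 to the metric on every call. Accumulating nanoseconds and converting once preserves the total. An illustrative calculation, not code from the patch:

import java.util.concurrent.TimeUnit

// 1,000 scan calls of 0.4 ms each.
val perCallNanos = 400000L // 0.4 ms
val calls = 1000

// Old behavior: convert each interval to millis before adding.
// Each 0.4 ms interval truncates to 0 ms, so the metric stays at 0.
val oldTotal = (1 to calls).map(_ => TimeUnit.NANOSECONDS.toMillis(perCallNanos)).sum
// oldTotal == 0

// New behavior: accumulate nanoseconds, convert once at the end.
val newTotal = TimeUnit.NANOSECONDS.toMillis(perCallNanos * calls)
// newTotal == 400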
