[GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620) (#6150)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20240620)

* Fix Build due to ClickHouse/ClickHouse#61047

* fix style

* Use assertResult instead of assert, so the actual result is reported when an assertion fails (see the ScalaTest sketch before the diff below).

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
3 people authored Jun 20, 2024
1 parent 38d7e10 commit 79e1d58
Showing 5 changed files with 141 additions and 146 deletions.
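The bulk of the diff below mechanically rewrites assert(actual == expected) as assertResult(expected)(actual). What follows is a minimal ScalaTest sketch of the difference, not part of this commit; the suite name and values are illustrative and assume a ScalaTest 3.x AnyFunSuite.

import org.scalatest.funsuite.AnyFunSuite

// Illustrative only; not from the Gluten code base.
class AssertResultDemoSuite extends AnyFunSuite {
  test("assertResult reports expected vs actual on failure") {
    val addFilesSize = 2 // pretend the scan produced 2 parts instead of the expected 1

    // assert(addFilesSize == 1) only checks a Boolean; how much of the expression is
    // echoed back on failure depends on ScalaTest's assertion macro.
    // assertResult states the expected value up front and always reports both sides,
    // e.g. "Expected 1, but got 2", which is the rationale given in the commit message:
    assertResult(1)(addFilesSize)
  }
}

The expected values in the suite are unchanged; only the assertion form differs.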
@@ -57,6 +57,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
+// .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test
}

override protected def beforeEach(): Unit = {
@@ -139,7 +140,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
-assert(scanExec.size == 1)
+assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
@@ -151,8 +152,8 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-assert(addFiles.size == 1)
-assert(addFiles.head.rows == 600572)
+assertResult(1)(addFiles.size)
+assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_hdfs")
}
@@ -224,32 +225,30 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
-assert(scanExec.size == 1)
+assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-assert(
+assertResult("l_shipdate,l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
.mkString(",")
.equals("l_shipdate,l_orderkey"))
assert(
.mkString(","))
assertResult("l_shipdate")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
.mkString(",")
.equals("l_shipdate"))
.mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
-assert(addFiles.size == 1)
-assert(addFiles.head.rows == 600572)
+assertResult(1)(addFiles.size)
+assertResult(600572)(addFiles.head.rows)
}
spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
}
@@ -386,51 +385,49 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
df =>
val result = df.collect()
-assert(result.length == 4)
-assert(result(0).getString(0).equals("A"))
-assert(result(0).getString(1).equals("F"))
-assert(result(0).getDouble(2) == 7578058.0)
+assertResult(4)(result.length)
+assertResult("A")(result(0).getString(0))
+assertResult("F")(result(0).getString(1))
+assertResult(7578058.0)(result(0).getDouble(2))

-assert(result(2).getString(0).equals("N"))
-assert(result(2).getString(1).equals("O"))
-assert(result(2).getDouble(2) == 7454519.0)
+assertResult("N")(result(2).getString(0))
+assertResult("O")(result(2).getString(1))
+assertResult(7454519.0)(result(2).getDouble(2))

val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
-assert(scanExec.size == 1)
+assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
-assert(mergetreeScan.metrics("numFiles").value == 6)
+assertResult(6)(mergetreeScan.metrics("numFiles").value)

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-assert(
+assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
.mkString(",")
.equals("l_orderkey"))
assert(
.mkString(","))
assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.primaryKeyOption
.get
.mkString(",")
.equals("l_orderkey"))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
assert(
.mkString(","))
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
-.partitionColumns(0)
-.equals("l_returnflag"))
+.partitionColumns
+.head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])

-assert(addFiles.size == 6)
-assert(addFiles.map(_.rows).sum == 750735)
+assertResult(6)(addFiles.size)
+assertResult(750735)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}
@@ -503,36 +500,35 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
-assert(scanExec.size == 1)
+assertResult(1)(scanExec.size)

-val mergetreeScan = scanExec(0)
+val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
if (sparkVersion.equals("3.2")) {
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
} else {
-assert(
+assertResult("l_partkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
.mkString(",")
.equals("l_partkey"))
.mkString(","))
}
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
-assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-assert(
+assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
-.partitionColumns(0)
-.equals("l_returnflag"))
+.partitionColumns
+.head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])

-assert(addFiles.size == 12)
-assert(addFiles.map(_.rows).sum == 600572)
+assertResult(12)(addFiles.size)
+assertResult(600572)(addFiles.map(_.rows).sum)
}
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}
@@ -585,39 +581,38 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
-assert(scanExec.size == 1)
+assertResult(1)(scanExec.size)

-val mergetreeScan = scanExec(0)
+val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
-assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
-assert(
+assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+assertResult("l_orderkey")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
.orderByKeyOption
.get
.mkString(",")
.equals("l_orderkey"))
.mkString(","))
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
-assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
-assert(
+assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+assertResult("l_returnflag")(
ClickHouseTableV2
.getTable(fileIndex.deltaLog)
-.partitionColumns(0)
-.equals("l_returnflag"))
+.partitionColumns
+.head)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])

-assert(addFiles.size == 12)
-assert(addFiles.map(_.rows).sum == 600572)
+assertResult(12)(addFiles.size)
+assertResult(600572)(addFiles.map(_.rows).sum)
}

val result = spark.read
.format("clickhouse")
.load(dataPath)
.count()
-assert(result == 600572)
+assertResult(600572)(result)
}
}
// scalastyle:off line.size.limit