diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala index 56b8f056bc25..572d0cd50a6e 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala @@ -57,6 +57,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite .set( "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert", "false") + // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test } override protected def beforeEach(): Unit = { @@ -139,7 +140,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -151,8 +152,8 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 1) - assert(addFiles.head.rows == 600572) + assertResult(1)(addFiles.size) + assertResult(600572)(addFiles.head.rows) } spark.sql("drop table lineitem_mergetree_hdfs") } @@ -224,7 +225,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -232,24 +233,22 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 1) - assert(addFiles.head.rows == 600572) + assertResult(1)(addFiles.size) + assertResult(600572)(addFiles.head.rows) } spark.sql("drop table lineitem_mergetree_orderbykey_hdfs") } @@ -386,51 +385,49 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.length == 4) - assert(result(0).getString(0).equals("A")) - assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 7578058.0) + assertResult(4)(result.length) + assertResult("A")(result(0).getString(0)) + assertResult("F")(result(0).getString(1)) + assertResult(7578058.0)(result(0).getDouble(2)) - assert(result(2).getString(0).equals("N")) - assert(result(2).getString(1).equals("O")) - assert(result(2).getDouble(2) == 7454519.0) + assertResult("N")(result(2).getString(0)) + assertResult("O")(result(2).getString(1)) + assertResult(7454519.0)(result(2).getDouble(2)) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - assert(mergetreeScan.metrics("numFiles").value == 6) + assertResult(6)(mergetreeScan.metrics("numFiles").value) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert( + .mkString(",")) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + .mkString(",")) + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 6) - assert(addFiles.map(_.rows).sum == 750735) + assertResult(6)(addFiles.size) + assertResult(750735)(addFiles.map(_.rows).sum) } spark.sql("drop table lineitem_mergetree_partition_hdfs") } @@ -503,36 +500,35 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined) if (sparkVersion.equals("3.2")) { assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assert( + assertResult("l_partkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_partkey")) + .mkString(",")) } assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 12) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(12)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } spark.sql("drop table lineitem_mergetree_bucket_hdfs") } @@ -585,39 +581,38 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 12) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(12)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } val result = spark.read .format("clickhouse") .load(dataPath) .count() - assert(result == 600572) + assertResult(600572)(result) } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala index c5dc3a23754e..30f443265cae 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala @@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "error") + // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/data") // for local test } override protected def beforeEach(): Unit = { @@ -152,7 +153,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -164,8 +165,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 1) - assert(addFiles.head.rows == 600572) + assertResult(1)(addFiles.size) + assertResult(600572)(addFiles.head.rows) } spark.sql("drop table lineitem_mergetree_s3") // clean up } @@ -237,7 +238,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) @@ -245,24 +246,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_shipdate,l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_shipdate,l_orderkey")) - assert( + .mkString(",")) + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_shipdate")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 1) - assert(addFiles.head.rows == 600572) + assertResult(1)(addFiles.size) + assertResult(600572)(addFiles.head.rows) } spark.sql("drop table lineitem_mergetree_orderbykey_s3") } @@ -399,51 +398,49 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite runTPCHQueryBySQL(1, sqlStr, compareResult = false) { df => val result = df.collect() - assert(result.length == 4) - assert(result(0).getString(0).equals("A")) - assert(result(0).getString(1).equals("F")) - assert(result(0).getDouble(2) == 7578058.0) + assertResult(4)(result.length) + assertResult("A")(result(0).getString(0)) + assertResult("F")(result(0).getString(1)) + assertResult(7578058.0)(result(0).getDouble(2)) - assert(result(2).getString(0).equals("N")) - assert(result(2).getString(1).equals("O")) - assert(result(2).getDouble(2) == 7454519.0) + assertResult("N")(result(2).getString(0)) + assertResult("O")(result(2).getString(1)) + assertResult(7454519.0)(result(2).getDouble(2)) val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) - assert(mergetreeScan.metrics("numFiles").value == 6) + assertResult(6)(mergetreeScan.metrics("numFiles").value) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert( + .mkString(",")) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .primaryKeyOption .get - .mkString(",") - .equals("l_orderkey")) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + .mkString(",")) + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 6) - assert(addFiles.map(_.rows).sum == 750735) + assertResult(6)(addFiles.size) + assertResult(750735)(addFiles.map(_.rows).sum) } spark.sql("drop table lineitem_mergetree_partition_s3") @@ -517,36 +514,35 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined) if (sparkVersion.equals("3.2")) { assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assert( + assertResult("l_partkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_partkey")) + .mkString(",")) } assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 12) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(12)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } spark.sql("drop table lineitem_mergetree_bucket_s3") } @@ -599,39 +595,38 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex] assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty) - assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty) - assert( + assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined) + assertResult("l_orderkey")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption .get - .mkString(",") - .equals("l_orderkey")) + .mkString(",")) assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty) - assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1) - assert( + assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) + assertResult("l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) - .partitionColumns(0) - .equals("l_returnflag")) + .partitionColumns + .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assert(addFiles.size == 12) - assert(addFiles.map(_.rows).sum == 600572) + assertResult(12)(addFiles.size) + assertResult(600572)(addFiles.map(_.rows).sum) } val result = spark.read .format("clickhouse") .load(dataPath) .count() - assert(result == 600572) + assertResult(600572)(result) } test("test mergetree insert with optimize basic") { @@ -639,8 +634,8 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite val dataPath = s"s3a://$BUCKET_NAME/$tableName" withSQLConf( - ("spark.databricks.delta.optimize.minFileSize" -> "200000000"), - ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true") + "spark.databricks.delta.optimize.minFileSize" -> "200000000", + "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true" ) { spark.sql(s""" |DROP TABLE IF EXISTS $tableName; @@ -654,7 +649,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite |""".stripMargin) val ret = spark.sql(s"select count(*) from $tableName").collect() - assert(ret.apply(0).get(0) == 600572) + assertResult(600572)(ret.apply(0).get(0)) assert( !new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists()) } @@ -713,22 +708,22 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite |""".stripMargin withSQLConf( - ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true")) { + "spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true") { runTPCHQueryBySQL(6, sqlStr) { df => val scanExec = collect(df.queryExecution.executedPlan) { case f: FileSourceScanExecTransformer => f } - assert(scanExec.size == 1) + assertResult(1)(scanExec.size) - val mergetreeScan = scanExec(0) + val mergetreeScan = scanExec.head assert(mergetreeScan.nodeName.startsWith("Scan mergetree")) val plans = collect(df.queryExecution.executedPlan) { case scanExec: BasicScanExecTransformer => scanExec } - assert(plans.size == 1) - assert(plans(0).getSplitInfos.size == 1) + assertResult(1)(plans.size) + assertResult(1)(plans.head.getSplitInfos.size) } } } diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 2bbb2945334b..1e3ac8d88ea9 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20240616 -CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da +CH_BRANCH=rebase_ch/20240620 +CH_COMMIT=f9c3886a767 diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp index 259af5698aa1..c1f2391a282c 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp @@ -87,8 +87,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter( metadata_snapshot = storage->getInMemoryMetadataPtr(); header = metadata_snapshot->getSampleBlock(); const DB::Settings & settings = context->getSettingsRef(); - squashing_transform - = std::make_unique(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); + squashing = std::make_unique(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes); if (!partition_dir.empty()) extractPartitionValues(partition_dir, partition_values); @@ -105,25 +104,33 @@ SparkMergeTreeWriter::SparkMergeTreeWriter( merge_limit_parts = limit_cnt_field.get() <= 0 ? merge_limit_parts : limit_cnt_field.get(); } -void SparkMergeTreeWriter::write(DB::Block & block) +void SparkMergeTreeWriter::write(const DB::Block & block) { auto new_block = removeColumnSuffix(block); if (auto converter = ActionsDAG::makeConvertingActions( new_block.getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), DB::ActionsDAG::MatchColumnsMode::Position)) ExpressionActions(converter).execute(new_block); - if (auto add_block = squashing_transform->add(new_block)) + bool has_part = chunkToPart(squashing->add({new_block.getColumns(), new_block.rows()})); + + if (has_part && merge_after_insert) + checkAndMerge(); +} + +bool SparkMergeTreeWriter::chunkToPart(Chunk && chunk) +{ + if (chunk.hasChunkInfo()) { - bool has_part = blockToPart(add_block); - if (has_part && merge_after_insert) - checkAndMerge(); + Chunk squash_chunk = DB::Squashing::squash(std::move(chunk)); + Block result = header.cloneWithColumns(squash_chunk.getColumns()); + return blockToPart(result); } + return false; } bool SparkMergeTreeWriter::blockToPart(Block & block) { - auto blocks_with_partition - = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context); + auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context); if (blocks_with_partition.empty()) return false; @@ -180,12 +187,7 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory) void SparkMergeTreeWriter::finalize() { - if (auto block = squashing_transform->add({})) - { - if (block.rows()) - blockToPart(block); - } - + chunkToPart(squashing->flush()); if (merge_after_insert) finalizeMerge(); diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h index 5c4b66403303..2b07521ede3a 100644 --- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h @@ -17,7 +17,7 @@ #pragma once #include -#include +#include #include #include #include @@ -59,13 +59,15 @@ class SparkMergeTreeWriter const String & partition_dir_ = "", const String & bucket_dir_ = ""); - void write(DB::Block & block); + void write(const DB::Block & block); void finalize(); std::vector getAllPartInfo(); private: - void - writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part, DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); + void writeTempPart( + MergeTreeDataWriter::TemporaryPart & temp_part, + DB::BlockWithPartition & block_with_partition, + const DB::StorageMetadataPtr & metadata_snapshot); DB::MergeTreeDataWriter::TemporaryPart writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot); void checkAndMerge(bool force = false); @@ -75,6 +77,7 @@ class SparkMergeTreeWriter void saveMetadata(); void commitPartToRemoteStorageIfNeeded(); void finalizeMerge(); + bool chunkToPart(Chunk && chunk); bool blockToPart(Block & block); CustomStorageMergeTreePtr storage = nullptr; @@ -87,7 +90,7 @@ class SparkMergeTreeWriter String bucket_dir; DB::ContextPtr context; - std::unique_ptr squashing_transform; + std::unique_ptr squashing; int part_num = 1; ConcurrentDeque new_parts; std::unordered_map partition_values;