[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116) (#8544)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116)

* Fix ut due to ClickHouse/ClickHouse#73651

* In C++, the declaration order determines the construction order: member variables are constructed in the order they are declared, and the order of the initialization list does not affect this. The destruction order is the reverse of the construction order: the last constructed member is the first to be destroyed.

Since output depends on write_buffer, we need to declare write_buffer first.
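
A minimal sketch of that rule, using illustrative Buffer/Output stand-ins rather than the actual write_buffer/output members in OutputFormatFile.h:

```cpp
#include <iostream>

struct Buffer
{
    Buffer() { std::cout << "Buffer constructed\n"; }
    ~Buffer() { std::cout << "Buffer destroyed\n"; }
};

struct Output
{
    explicit Output(Buffer & buf) : buffer(buf) { std::cout << "Output constructed\n"; }
    ~Output() { std::cout << "Output destroyed\n"; } // may still flush through `buffer` here
    Buffer & buffer;
};

struct OutputFormat
{
    // Declaration order, not initializer-list order, drives lifetime:
    // write_buffer is constructed first and destroyed last, so output never
    // outlives the buffer it writes into.
    Buffer write_buffer;
    Output output{write_buffer};
};

int main()
{
    OutputFormat f;
    // Prints: Buffer constructed, Output constructed; on scope exit:
    // Output destroyed, then Buffer destroyed.
}
```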

* Another way to fix ClickHouse/ClickHouse#73651; this also fixes "write into hdfs" in Spark 3.5

* fix style

---------

Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
3 people authored Jan 16, 2025
1 parent 28e1ac5 commit ac8e03a
Showing 9 changed files with 100 additions and 24 deletions.
@@ -106,12 +106,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
updateInputMetrics,
updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull)

context.addTaskFailureListener(
(ctx, _) => {
if (ctx.isInterrupted()) {
iter.cancel()
}
})
context.addTaskFailureListener((ctx, _) => { iter.cancel() })

context.addTaskCompletionListener[Unit](_ => iter.close())
new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
}
@@ -36,6 +36,17 @@ object RuntimeSettings {
.doc("https://clickhouse.com/docs/en/operations/settings/query-complexity#settings-max_bytes_before_external_sort")
.longConf
.createWithDefault(0)

// TODO: support check value
val OUTPUT_FORMAT_COMPRESSION_LEVEL =
buildConf(runtimeSettings("output_format_compression_level"))
.doc(s"""https://clickhouse.com/docs/en/operations/settings/settings#output_format_compression_level
| Notes: we always use Snappy compression, and Snappy doesn't support compression level.
| Currently, we ONLY set it in UT.
|""".stripMargin)
.longConf
.createWithDefault(Integer.MIN_VALUE & 0xffffffffL)
// .checkValue(v => v >= 0, "COMPRESSION LEVEL must be greater than 0")
// scalastyle:on line.size.limit

/** Gluten Configuration */
@@ -104,7 +104,6 @@ class CHColumnarWriteFilesRDD(
* otherwise, we need to access ColumnarBatch row by row, which is not efficient.
*/
val writeResults = CHExecUtil.c2r(resultColumnarBatch).map(_.copy()).toSeq
// TODO: we need close iterator here before processing the result.
// TODO: task commit time
// TODO: get the schema from result ColumnarBatch and verify it.
assert(!iter.hasNext)
@@ -16,10 +16,11 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.GlutenException

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{functions, DataFrame, Row}
import org.apache.spark.sql.execution.LocalTableScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1464,16 +1465,14 @@ class GlutenClickHouseExcelFormatSuite
fileName
}

/** TODO: fix the issue and test in spark 3.5 */
testSparkVersionLE33("write into hdfs") {
test("write into hdfs") {

/**
* There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it doesn't
* close the HDFS file, and hence the file is not flushed. The HDFS file is closed when LocalExecutor
* is destroyed, but before that, the file is moved by the Spark committer.
*/
val tableName = "write_into_hdfs"
val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"
val format = "parquet"
val sql =
s"""
@@ -1485,4 +1484,51 @@
testFileFormatBase(tablePath, format, sql, df => {})
}
}

// TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
testWithSpecifiedSparkVersion(
"write failed if set wrong snappy compression codec level",
Some("3.5")) {
// TODO: remove duplicated test codes
val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/"
val format = "parquet"
val sql =
s"""
| select *
| from $format.`$tablePath`
| where long_field > 30
|""".stripMargin

withSQLConf(
(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"),
(
RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key,
RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.defaultValue.get.toString)
) {
testFileFormatBase(tablePath, format, sql, df => {})
}

// we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2
withSQLConf(
(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"),
(RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, "3")
) {
val sparkError = intercept[SparkException] {
testFileFormatBase(tablePath, format, sql, df => {})
}

// throw at org.apache.spark.sql.execution.CHColumnarWriteFilesRDD
val causeOuter = sparkError.getCause
assert(causeOuter.isInstanceOf[SparkException])
assert(causeOuter.getMessage.contains("Task failed while writing rows to output path: hdfs"))

// throw at the writing file
val causeInner = causeOuter.getCause
assert(causeInner.isInstanceOf[GlutenException])
assert(
causeInner.getMessage.contains(
"Invalid: Codec 'snappy' doesn't support setting a compression level"))
}

}
}
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20250115
CH_COMMIT=8e0d5eaf0fc
CH_BRANCH=rebase_ch/20250116
CH_COMMIT=a260339b40c
22 changes: 16 additions & 6 deletions cpp-ch/local-engine/Common/CHUtil.cpp
@@ -58,6 +58,7 @@
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
#include <Storages/Output/WriteBufferBuilder.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <arrow/util/compression.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <sys/resource.h>
#include <Poco/Logger.h>
@@ -91,6 +92,7 @@ extern const SettingsDouble max_bytes_ratio_before_external_sort;
extern const SettingsBool query_plan_merge_filters;
extern const SettingsBool compile_expressions;
extern const SettingsShortCircuitFunctionEvaluation short_circuit_function_evaluation;
extern const SettingsUInt64 output_format_compression_level;
}
namespace ErrorCodes
{
@@ -640,23 +642,31 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("input_format_parquet_enable_row_group_prefetch", false);
settings.set("output_format_parquet_use_custom_encoder", false);

// 1.
// TODO: we need to set Setting::max_threads to 1 by default, but currently we can't get correct metrics for some queries if we set it to 1.
// settings[Setting::max_threads] = 1;

/// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539
/// if true, we can't get correct metrics for the query
/// 2. After https://github.com/ClickHouse/ClickHouse/pull/71539
/// Set false to query_plan_merge_filters.
/// If true, we can't get correct metrics for the query
settings[Setting::query_plan_merge_filters] = false;

/// 3. After https://github.com/ClickHouse/ClickHouse/pull/70598.
/// Set false to compile_expressions to avoid dead loop.
/// TODO: FIXME set true again.
/// We now set BuildQueryPipelineSettings according to config.
// TODO: FIXME. Set false after https://github.com/ClickHouse/ClickHouse/pull/70598.
settings[Setting::compile_expressions] = false;
settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE;
///

// After https://github.com/ClickHouse/ClickHouse/pull/73422
// Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0
/// 4. After https://github.com/ClickHouse/ClickHouse/pull/73422
/// Since we already set max_bytes_before_external_sort, set max_bytes_ratio_before_external_sort to 0
settings[Setting::max_bytes_ratio_before_external_sort] = 0.;

/// 5. After https://github.com/ClickHouse/ClickHouse/pull/73651.
/// See the following settings: we always use Snappy compression for Parquet. However, after https://github.com/ClickHouse/ClickHouse/pull/73651,
/// output_format_compression_level defaults to 3, which is wrong, since Snappy does not support compression levels.
settings[Setting::output_format_compression_level] = arrow::util::kUseDefaultCompressionLevel;

for (const auto & [key, value] : spark_conf_map)
{
// Firstly apply spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to settings
12 changes: 11 additions & 1 deletion cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -371,14 +371,24 @@ class SubstraitFileSink final : public DB::SinkToStorage
}
void onFinish() override
{
if (output_format_) [[unlikely]]
if (output_format_)
{
output_format_->finalizeOutput();
/// We need to reset output_format_ here before returning to Spark, because the file is closed in ~WriteBufferFromHDFSImpl(),
/// so that the Spark commit protocol can move the file safely.
output_format_.reset();
assert(delta_stats_.row_count > 0);
if (stats_)
stats_->collectStats(relative_path_, partition_id_, delta_stats_);
}
}
void onCancel() noexcept override
{
if (output_format_) {
output_format_->cancel();
output_format_.reset();
}
}
};

class SparkPartitionedBaseSink : public DB::PartitionedSink
7 changes: 6 additions & 1 deletion cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
@@ -29,14 +29,19 @@ class OutputFormatFile
public:
struct OutputFormat
{
DB::OutputFormatPtr output;
std::unique_ptr<DB::WriteBuffer> write_buffer;
DB::OutputFormatPtr output;
void finalizeOutput() const
{
output->finalize();
output->flush();
write_buffer->finalize();
}
void cancel()
{
output.reset();
write_buffer->finalize();
}
};
using OutputFormatPtr = std::shared_ptr<OutputFormat>;

@@ -25,7 +25,6 @@
# include <Processors/Formats/Impl/ArrowBufferedStreams.h>
# include <Processors/Formats/Impl/CHColumnToArrowColumn.h>
# include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
# include <parquet/arrow/writer.h>

namespace local_engine
{