diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index e34f571961d69..fa6863b8774ea 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT 4.0.0 diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 139b8b3131ff0..21ae342a22637 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.delta.DeltaLogFileIndex import org.apache.spark.sql.delta.rules.CHOptimizeMetadataOnlyDeltaQuery import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.V2CommandExec import org.apache.spark.util.SparkPlanRules @@ -132,6 +133,7 @@ object CHRuleApi { c => intercept( SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session))) + injector.injectPost(c => GlutenNoopWriterRule.apply(c.session)) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala index 1ebdded5e1892..dc01cf7c409c2 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala @@ -3362,5 +3362,20 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr val sql = "select * from test_filter where (c1, c2) in (('a1', 'b1'), ('a2', 'b2'))" compareResultsAgainstVanillaSpark(sql, true, { _ => }) } + + test("GLUTEN-8343: Cast number to decimal") { + val create_table_sql = "create table test_tbl_8343(id bigint, d bigint, f double) using parquet" + val insert_data_sql = + "insert into test_tbl_8343 values(1, 55, 55.12345), (2, 137438953483, 137438953483.12345), (3, -12, -12.123), (4, 0, 0.0001), (5, NULL, NULL), (6, %d, NULL), (7, %d, NULL)" + .format(Double.MaxValue.longValue(), Double.MinValue.longValue()) + val query_sql = + "select cast(d as decimal(1, 0)), cast(d as decimal(9, 1)), cast((f-55.12345) as decimal(9,1)), cast(f as decimal(4,2)), " + + "cast(f as decimal(32, 3)), cast(f as decimal(2, 1)), cast(d as decimal(38,3)) from test_tbl_8343" + spark.sql(create_table_sql); + spark.sql(insert_data_sql); + compareResultsAgainstVanillaSpark(query_sql, true, { _ => }) + spark.sql("drop table test_tbl_8343") + } + } // scalastyle:on line.size.limit diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 240b2218641cd..5c96fa6246b36 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT 4.0.0 diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 
4f19375e85ce1..f3c75cd983187 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -36,6 +36,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.WriteFilesExec +import org.apache.spark.sql.execution.datasources.noop.GlutenNoopWriterRule import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.Exchange import org.apache.spark.sql.execution.joins.BaseJoinExec @@ -110,6 +111,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => each(c.session))) injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPost(c => GlutenNoopWriterRule(c.session)) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) @@ -188,6 +190,7 @@ object VeloxRuleApi { .getExtendedColumnarPostRules() .foreach(each => injector.injectPostTransform(c => each(c.session))) injector.injectPostTransform(c => ColumnarCollapseTransformStages(c.glutenConf)) + injector.injectPostTransform(c => GlutenNoopWriterRule(c.session)) injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session)) injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectPostTransform(_ => RemoveFallbackTagRule()) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala index 056d19e55070f..435fd239b3643 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala @@ -30,6 +30,7 @@ import org.apache.gluten.vectorized.{ArrowColumnarRow, ArrowWritableColumnVector import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, NamedExpression, NaNvl, ScalaUDF} +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.hive.HiveUdfUtil @@ -75,6 +76,14 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)( override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute) + override def doCanonicalize(): ColumnarPartialProjectExec = { + val canonicalized = original.canonicalized.asInstanceOf[ProjectExec] + this.copy( + original = canonicalized, + child = child.canonicalized + )(replacedAliasUdf.map(QueryPlan.normalizeExpressions(_, child.output))) + } + override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType override def rowType0(): Convention.RowType = Convention.RowType.None diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 3e28ad21e9ad6..79ca4eb69bb15 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ 
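Note: the `doCanonicalize` override added to `ColumnarPartialProjectExec` above matters because Spark's reuse machinery (exchange/subquery reuse, AQE stage reuse) compares canonicalized plans, in which auto-generated expression IDs are normalized against the child's output. A small illustrative sketch of how that comparison surfaces through the public API; the query and names below are illustrative, not taken from this PR:

```scala
import org.apache.spark.sql.SparkSession

object CanonicalizationDemo {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder().master("local[1]").appName("canonicalization-demo").getOrCreate()
    import spark.implicits._

    // Build the same query twice: the two plans differ only in auto-generated expression IDs.
    val q1 = Seq(1, 2, 3).toDF("x").selectExpr("x + 1 AS y")
    val q2 = Seq(1, 2, 3).toDF("x").selectExpr("x + 1 AS y")

    // sameResult compares the canonicalized plans; operators that carry their own expressions
    // (like replacedAliasUdf above) need a doCanonicalize that normalizes those expressions,
    // otherwise raw expression IDs leak into the comparison.
    println(q1.queryExecution.executedPlan.sameResult(q2.queryExecution.executedPlan))

    spark.stop()
  }
}
```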
CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20250107 -CH_COMMIT=01d2a08fb01 +CH_BRANCH=rebase_ch/20250110 +CH_COMMIT=eafc5ef70b3 diff --git a/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp b/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp index 8b5a7eff65dbd..73fc7b5d4e401 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp +++ b/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace DB { @@ -34,6 +35,7 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_COLUMN; extern const int TYPE_MISMATCH; +extern const int NOT_IMPLEMENTED; } } @@ -78,7 +80,7 @@ class FunctionCheckDecimalOverflow : public IFunction DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override { - if (!isDecimal(arguments[0].type) || !isInteger(arguments[1].type) || !isInteger(arguments[2].type)) + if ((!isDecimal(arguments[0].type) && !isNativeNumber(arguments[0].type)) || !isInteger(arguments[1].type) || !isInteger(arguments[2].type)) throw Exception( ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} {} {} of argument of function {}", @@ -86,17 +88,14 @@ class FunctionCheckDecimalOverflow : public IFunction arguments[1].type->getName(), arguments[2].type->getName(), getName()); - UInt32 precision = extractArgument(arguments[1]); UInt32 scale = extractArgument(arguments[2]); - auto return_type = createDecimal(precision, scale); if constexpr (exception_mode == CheckExceptionMode::Null) { if (!arguments[0].type->isNullable()) return std::make_shared(return_type); } - return return_type; } @@ -113,19 +112,15 @@ class FunctionCheckDecimalOverflow : public IFunction using Types = std::decay_t; using FromDataType = typename Types::LeftType; using ToDataType = typename Types::RightType; - - if constexpr (IsDataTypeDecimal) + if constexpr (IsDataTypeDecimal || IsDataTypeNumber) { using FromFieldType = typename FromDataType::FieldType; - using ColVecType = ColumnDecimal; - - if (const ColVecType * col_vec = checkAndGetColumn(src_column.column.get())) + if (const ColumnVectorOrDecimal * col_vec = checkAndGetColumn>(src_column.column.get())) { - executeInternal(*col_vec, result_column, input_rows_count, precision, scale); + executeInternal(*col_vec, result_column, input_rows_count, precision, scale); return true; } } - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal column while execute function {}", getName()); }; @@ -146,17 +141,28 @@ class FunctionCheckDecimalOverflow : public IFunction } private: - template + template + requires(IsDataTypeDecimal && (IsDataTypeDecimal || IsDataTypeNumber)) static void executeInternal( - const ColumnDecimal & col_source, ColumnPtr & result_column, size_t input_rows_count, UInt32 precision, UInt32 scale_to) + const ColumnVectorOrDecimal & col_source, ColumnPtr & result_column, size_t input_rows_count, UInt32 precision, UInt32 scale_to) { using ToFieldType = typename ToDataType::FieldType; using ToColumnType = typename ToDataType::ColumnType; + using T = typename FromDataType::FieldType; ColumnUInt8::MutablePtr col_null_map_to; ColumnUInt8::Container * vec_null_map_to [[maybe_unused]] = nullptr; - auto scale_from = col_source.getScale(); - + UInt32 scale_from = 0; + using ToFieldNativeType = typename ToFieldType::NativeType; + ToFieldNativeType decimal_int_part_max = 0; + ToFieldNativeType decimal_int_part_min = 0; + if constexpr 
(IsDataTypeDecimal) + scale_from = col_source.getScale(); + else + { + decimal_int_part_max = DecimalUtils::scaleMultiplier(precision - scale_to) - 1; + decimal_int_part_min = 1 - DecimalUtils::scaleMultiplier(precision - scale_to); + } if constexpr (exception_mode == CheckExceptionMode::Null) { col_null_map_to = ColumnUInt8::create(input_rows_count, false); @@ -170,17 +176,17 @@ class FunctionCheckDecimalOverflow : public IFunction auto & datas = col_source.getData(); for (size_t i = 0; i < input_rows_count; ++i) { - // bool overflow = outOfDigits(datas[i], precision, scale_from, scale_to); ToFieldType result; - bool success = convertToDecimalImpl(datas[i], precision, scale_from, scale_to, result); - - if (success) + bool success = convertToDecimalImpl(datas[i], precision, scale_from, scale_to, decimal_int_part_max, decimal_int_part_min, result); + if constexpr (exception_mode == CheckExceptionMode::Null) + { vec_to[i] = static_cast(result); + (*vec_null_map_to)[i] = !success; + } else { - vec_to[i] = static_cast(0); - if constexpr (exception_mode == CheckExceptionMode::Null) - (*vec_null_map_to)[i] = static_cast(1); + if (success) + vec_to[i] = static_cast(result); else throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "Decimal value is overflow."); } @@ -192,20 +198,50 @@ class FunctionCheckDecimalOverflow : public IFunction result_column = std::move(col_to); } - template + template requires(IsDataTypeDecimal) static bool convertToDecimalImpl( - const FromFieldType & decimal, UInt32 precision_to, UInt32 scale_from, UInt32 scale_to, typename ToDataType::FieldType & result) + const FromDataType::FieldType & value, + UInt32 precision_to, + UInt32 scale_from, + UInt32 scale_to, + typename ToDataType::FieldType::NativeType decimal_int_part_max, + typename ToDataType::FieldType::NativeType decimal_int_part_min, + typename ToDataType::FieldType & result) { + using FromFieldType = typename FromDataType::FieldType; if constexpr (std::is_same_v) - return convertDecimalsImpl, ToDataType>(decimal, precision_to, scale_from, scale_to, result); - + return convertDecimalsImpl, ToDataType>(value, precision_to, scale_from, scale_to, result); else if constexpr (std::is_same_v) - return convertDecimalsImpl, ToDataType>(decimal, precision_to, scale_from, scale_to, result); + return convertDecimalsImpl, ToDataType>(value, precision_to, scale_from, scale_to, result); else if constexpr (std::is_same_v) - return convertDecimalsImpl, ToDataType>(decimal, precision_to, scale_from, scale_to, result); + return convertDecimalsImpl, ToDataType>(value, precision_to, scale_from, scale_to, result); + else if constexpr (std::is_same_v) + return convertDecimalsImpl, ToDataType>(value, precision_to, scale_from, scale_to, result); + else if constexpr (IsDataTypeNumber && !std::is_same_v) + return convertNumberToDecimalImpl, ToDataType>(value, scale_to, decimal_int_part_max, decimal_int_part_min, result); else - return convertDecimalsImpl, ToDataType>(decimal, precision_to, scale_from, scale_to, result); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Convert from {} type to decimal type is not implemented.", typeid(value).name()); + } + + template + requires(IsDataTypeNumber && IsDataTypeDecimal) + static inline bool convertNumberToDecimalImpl( + const typename FromDataType::FieldType & value, + UInt32 scale, + typename ToDataType::FieldType::NativeType decimal_int_part_max, + typename ToDataType::FieldType::NativeType decimal_int_part_min, + typename ToDataType::FieldType & result) + { + using FromFieldType = typename 
FromDataType::FieldType; + using ToFieldNativeType = typename ToDataType::FieldType::NativeType; + ToFieldNativeType int_part = 0; + if constexpr (std::is_same_v || std::is_same_v) + int_part = static_cast(value); + else + int_part = value; + + return int_part >= decimal_int_part_min && int_part <= decimal_int_part_max && tryConvertToDecimal(value, scale, result); } template diff --git a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp index e88db198953e3..b5ac3aedf5482 100644 --- a/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp +++ b/cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp @@ -964,10 +964,10 @@ jobject create(JNIEnv * env, const SparkRowInfo & spark_row_info) { auto * offsets_arr = env->NewLongArray(spark_row_info.getNumRows()); const auto * offsets_src = spark_row_info.getOffsets().data(); - env->SetLongArrayRegion(offsets_arr, 0, spark_row_info.getNumRows(), static_cast(offsets_src)); + env->SetLongArrayRegion(offsets_arr, 0, spark_row_info.getNumRows(), reinterpret_cast(offsets_src)); auto * lengths_arr = env->NewLongArray(spark_row_info.getNumRows()); const auto * lengths_src = spark_row_info.getLengths().data(); - env->SetLongArrayRegion(lengths_arr, 0, spark_row_info.getNumRows(), static_cast(lengths_src)); + env->SetLongArrayRegion(lengths_arr, 0, spark_row_info.getNumRows(), reinterpret_cast(lengths_src)); int64_t address = reinterpret_cast(spark_row_info.getBufferAddress()); int64_t column_number = spark_row_info.getNumCols(); int64_t total_size = spark_row_info.getTotalBytes(); diff --git a/cpp-ch/local-engine/Parser/ExpressionParser.cpp b/cpp-ch/local-engine/Parser/ExpressionParser.cpp index 400d8c28df40d..a50590aaf7782 100644 --- a/cpp-ch/local-engine/Parser/ExpressionParser.cpp +++ b/cpp-ch/local-engine/Parser/ExpressionParser.cpp @@ -313,7 +313,6 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG & DataTypePtr denull_input_type = removeNullable(input_type); DataTypePtr output_type = TypeParser::parseType(substrait_type); DataTypePtr denull_output_type = removeNullable(output_type); - const ActionsDAG::Node * result_node = nullptr; if (substrait_type.has_binary()) { @@ -336,11 +335,15 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG & String function_name = "sparkCastFloatTo" + denull_output_type->getName(); result_node = toFunctionNode(actions_dag, function_name, args); } - else if ((isDecimal(denull_input_type) && substrait_type.has_decimal())) + else if ((isDecimal(denull_input_type) || isNativeNumber(denull_input_type)) && substrait_type.has_decimal()) { - args.emplace_back(addConstColumn(actions_dag, std::make_shared(), substrait_type.decimal().precision())); - args.emplace_back(addConstColumn(actions_dag, std::make_shared(), substrait_type.decimal().scale())); - result_node = toFunctionNode(actions_dag, "checkDecimalOverflowSparkOrNull", args); + int decimal_precision = substrait_type.decimal().precision(); + if (decimal_precision) + { + args.emplace_back(addConstColumn(actions_dag, std::make_shared(), decimal_precision)); + args.emplace_back(addConstColumn(actions_dag, std::make_shared(), substrait_type.decimal().scale())); + result_node = toFunctionNode(actions_dag, "checkDecimalOverflowSparkOrNull", args); + } } else if (isMap(denull_input_type) && isString(denull_output_type)) { diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index 
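Note: the ExpressionParser and checkDecimalOverflow changes above route casts from native integer/floating-point columns to decimal through `checkDecimalOverflowSparkOrNull`, so values whose integral part does not fit the target precision become NULL instead of raising an error, matching the vanilla Spark (non-ANSI) behavior that the GLUTEN-8343 test at the top of this diff compares against. A minimal sketch of that expected semantics in a plain Spark session; the object name is illustrative and the literals are borrowed from the test data:

```scala
import org.apache.spark.sql.SparkSession

object CastToDecimalDemo {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder().master("local[1]").appName("cast-to-decimal-demo").getOrCreate()

    // With spark.sql.ansi.enabled=false (the default), a cast whose integral part does not
    // fit the target DECIMAL(precision, scale) yields NULL rather than throwing.
    spark
      .sql(
        "SELECT CAST(55 AS DECIMAL(9, 1)) AS fits, " +
          "CAST(137438953483 AS DECIMAL(1, 0)) AS overflows_to_null")
      .show()

    spark.stop()
  }
}
```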
b4e0bb9ee8a3d..3cc9cb69db937 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -469,24 +469,22 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeDataWriter::writeTempPart( new_data_part->uuid = UUIDHelpers::generateV4(); SyncGuardPtr sync_guard; - if (new_data_part->isStoredOnDisk()) - { - /// The name could be non-unique in case of stale files from previous runs. - String full_path = new_data_part->getDataPartStorage().getFullPath(); - if (new_data_part->getDataPartStorage().exists()) - { - // LOG_WARNING(log, "Removing old temporary directory {}", full_path); - data_part_storage->removeRecursive(); - } + /// The name could be non-unique in case of stale files from previous runs. + String full_path = new_data_part->getDataPartStorage().getFullPath(); - data_part_storage->createDirectories(); + if (new_data_part->getDataPartStorage().exists()) + { + LOG_WARNING(log, "Removing old temporary directory {}", full_path); + data_part_storage->removeRecursive(); + } - if ((*data.getSettings())[MergeTreeSetting::fsync_part_directory]) - { - const auto disk = data_part_volume->getDisk(); - sync_guard = disk->getDirectorySyncGuard(full_path); - } + data_part_storage->createDirectories(); + + if ((*data.getSettings())[MergeTreeSetting::fsync_part_directory]) + { + const auto disk = data_part_volume->getDisk(); + sync_guard = disk->getDirectorySyncGuard(full_path); } /// This effectively chooses minimal compression method: diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 855d2b8371ef4..6f3df78fb61f0 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1039,7 +1039,7 @@ JNIEXPORT jobject Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn local_engine::BlockStripes bs = local_engine::BlockStripeSplitter::split(*block, partition_col_indice_vec, hasBucket, reserve_); auto * addresses = env->NewLongArray(bs.block_addresses.size()); - env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), static_cast(bs.block_addresses.data())); + env->SetLongArrayRegion(addresses, 0, bs.block_addresses.size(), reinterpret_cast(bs.block_addresses.data())); auto * indices = env->NewIntArray(bs.heading_row_indice.size()); env->SetIntArrayRegion(indices, 0, bs.heading_row_indice.size(), bs.heading_row_indice.data()); diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 5a5eeac354805..d52ad74b99120 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -120,7 +120,6 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/RandomPartitioner.cc shuffle/RoundRobinPartitioner.cc shuffle/ShuffleMemoryPool.cc - shuffle/ShuffleReader.cc shuffle/ShuffleWriter.cc shuffle/SinglePartitioner.cc shuffle/Spill.cc diff --git a/cpp/core/benchmarks/CompressionBenchmark.cc b/cpp/core/benchmarks/CompressionBenchmark.cc index e1aa69ff39a7f..e465562145296 100644 --- a/cpp/core/benchmarks/CompressionBenchmark.cc +++ b/cpp/core/benchmarks/CompressionBenchmark.cc @@ -48,7 +48,9 @@ void printTrace(void) { for (i = 0; i < size; i++) printf(" %s\n", strings[i]); puts(""); - free(strings); + if (strings != nullptr) { + free(strings); + } } using arrow::RecordBatchReader; @@ -58,7 +60,7 @@ using gluten::ShuffleWriterOptions; namespace gluten { -#define ALIGNMENT 2 * 1024 * 1024 +#define ALIGNMENT (2 * 1024 * 1024) const int32_t kQatGzip = 0; const int32_t kQatZstd = 1; diff --git a/cpp/core/jni/JniWrapper.cc 
b/cpp/core/jni/JniWrapper.cc index b4359bab41609..4e1d1f09fd1e1 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1132,7 +1132,6 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrapper jlong shuffleReaderHandle) { JNI_METHOD_START auto reader = ObjectStore::retrieve(shuffleReaderHandle); - GLUTEN_THROW_NOT_OK(reader->close()); ObjectStore::release(shuffleReaderHandle); JNI_METHOD_END() } diff --git a/cpp/core/memory/MemoryAllocator.cc b/cpp/core/memory/MemoryAllocator.cc index dd5fb8e1974b7..84708962d01c7 100644 --- a/cpp/core/memory/MemoryAllocator.cc +++ b/cpp/core/memory/MemoryAllocator.cc @@ -157,6 +157,7 @@ bool StdMemoryAllocator::reallocate(void* p, int64_t size, int64_t newSize, void } bool StdMemoryAllocator::reallocateAligned(void* p, uint64_t alignment, int64_t size, int64_t newSize, void** out) { + GLUTEN_CHECK(p != nullptr, "reallocate with nullptr"); if (newSize <= 0) { return false; } @@ -179,6 +180,7 @@ bool StdMemoryAllocator::reallocateAligned(void* p, uint64_t alignment, int64_t } bool StdMemoryAllocator::free(void* p, int64_t size) { + GLUTEN_CHECK(p != nullptr, "free with nullptr"); std::free(p); bytes_ -= size; return true; diff --git a/cpp/core/shuffle/ShuffleReader.cc b/cpp/core/shuffle/ShuffleReader.cc deleted file mode 100644 index ced80b3de13f4..0000000000000 --- a/cpp/core/shuffle/ShuffleReader.cc +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include "ShuffleReader.h" -#include "arrow/ipc/reader.h" -#include "arrow/record_batch.h" -#include "utils/Macros.h" - -#include - -#include "ShuffleSchema.h" - -namespace gluten { - -ShuffleReader::ShuffleReader(std::unique_ptr factory) : factory_(std::move(factory)) {} - -std::shared_ptr ShuffleReader::readStream(std::shared_ptr in) { - return std::make_shared(factory_->createDeserializer(in)); -} - -arrow::Status ShuffleReader::close() { - return arrow::Status::OK(); -} - -arrow::MemoryPool* ShuffleReader::getPool() const { - return factory_->getPool(); -} - -int64_t ShuffleReader::getDecompressTime() const { - return factory_->getDecompressTime(); -} - -ShuffleWriterType ShuffleReader::getShuffleWriterType() const { - return factory_->getShuffleWriterType(); -} - -int64_t ShuffleReader::getDeserializeTime() const { - return factory_->getDeserializeTime(); -} - -} // namespace gluten diff --git a/cpp/core/shuffle/ShuffleReader.h b/cpp/core/shuffle/ShuffleReader.h index 0f985c7da9395..6e2b079fc73b7 100644 --- a/cpp/core/shuffle/ShuffleReader.h +++ b/cpp/core/shuffle/ShuffleReader.h @@ -17,63 +17,22 @@ #pragma once -#include "memory/ColumnarBatch.h" - -#include -#include - -#include "Options.h" #include "compute/ResultIterator.h" -#include "utils/Compression.h" namespace gluten { -class DeserializerFactory { - public: - virtual ~DeserializerFactory() = default; - - virtual std::unique_ptr createDeserializer(std::shared_ptr in) = 0; - - virtual arrow::MemoryPool* getPool() = 0; - - virtual int64_t getDecompressTime() = 0; - - virtual int64_t getDeserializeTime() = 0; - - virtual ShuffleWriterType getShuffleWriterType() = 0; -}; - class ShuffleReader { public: - explicit ShuffleReader(std::unique_ptr factory); - virtual ~ShuffleReader() = default; // FIXME iterator should be unique_ptr or un-copyable singleton - virtual std::shared_ptr readStream(std::shared_ptr in); - - arrow::Status close(); - - int64_t getDecompressTime() const; - - int64_t getIpcTime() const; - - int64_t getDeserializeTime() const; - - arrow::MemoryPool* getPool() const; - - ShuffleWriterType getShuffleWriterType() const; + virtual std::shared_ptr readStream(std::shared_ptr in) = 0; - protected: - arrow::MemoryPool* pool_; - int64_t decompressTime_ = 0; - int64_t deserializeTime_ = 0; + virtual int64_t getDecompressTime() const = 0; - ShuffleWriterType shuffleWriterType_; + virtual int64_t getDeserializeTime() const = 0; - private: - std::shared_ptr schema_; - std::unique_ptr factory_; + virtual arrow::MemoryPool* getPool() const = 0; }; } // namespace gluten diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 20c3dec939a0c..2a2ea929c1ce1 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -251,7 +251,7 @@ std::shared_ptr VeloxRuntime::createShuffleReader( auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend); auto ctxVeloxPool = memoryManager()->getLeafMemoryPool(); auto veloxCompressionType = facebook::velox::common::stringToCompressionKind(options.compressionTypeStr); - auto deserializerFactory = std::make_unique( + auto deserializerFactory = std::make_unique( schema, std::move(codec), veloxCompressionType, diff --git a/cpp/velox/memory/VeloxMemoryManager.cc b/cpp/velox/memory/VeloxMemoryManager.cc index 597ce1c9da372..46c5186f5b206 100644 --- a/cpp/velox/memory/VeloxMemoryManager.cc +++ b/cpp/velox/memory/VeloxMemoryManager.cc @@ -126,7 +126,7 @@ class ListenableArbitrator : public 
velox::memory::MemoryArbitrator { uint64_t shrinkCapacity(uint64_t targetBytes, bool allowSpill, bool allowAbort) override { velox::memory::ScopedMemoryArbitrationContext ctx{}; facebook::velox::exec::MemoryReclaimer::Stats status; - velox::memory::MemoryPool* pool; + velox::memory::MemoryPool* pool = nullptr; { std::unique_lock guard{mutex_}; VELOX_CHECK_EQ(candidates_.size(), 1, "ListenableArbitrator should only be used within a single root pool"); @@ -178,7 +178,7 @@ class ListenableArbitrator : public velox::memory::MemoryArbitrator { return freeBytes; } - gluten::AllocationListener* listener_; + gluten::AllocationListener* listener_ = nullptr; const uint64_t memoryPoolInitialCapacity_; // FIXME: Unused. const uint64_t memoryPoolTransferCapacity_; const uint64_t memoryReclaimMaxWaitMs_; diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index 0407be736a703..3aba7cf0fc3c1 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -15,13 +15,13 @@ * limitations under the License. */ -#include "VeloxShuffleReader.h" -#include "GlutenByteStream.h" +#include "shuffle/VeloxShuffleReader.h" #include #include #include "memory/VeloxColumnarBatch.h" +#include "shuffle/GlutenByteStream.h" #include "shuffle/Payload.h" #include "shuffle/Utils.h" #include "utils/Common.h" @@ -576,7 +576,7 @@ std::shared_ptr VeloxRssSortShuffleReaderDeserializer::next() { return std::make_shared(std::move(rowVector)); } -VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( +VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory( const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::common::CompressionKind veloxCompressionType, @@ -598,7 +598,7 @@ VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory( initFromSchema(); } -std::unique_ptr VeloxColumnarBatchDeserializerFactory::createDeserializer( +std::unique_ptr VeloxShuffleReaderDeserializerFactory::createDeserializer( std::shared_ptr in) { switch (shuffleWriterType_) { case ShuffleWriterType::kHashShuffle: @@ -635,23 +635,19 @@ std::unique_ptr VeloxColumnarBatchDeserializerFactory::cr } } -arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() { +arrow::MemoryPool* VeloxShuffleReaderDeserializerFactory::getPool() { return memoryPool_; } -ShuffleWriterType VeloxColumnarBatchDeserializerFactory::getShuffleWriterType() { - return shuffleWriterType_; -} - -int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() { +int64_t VeloxShuffleReaderDeserializerFactory::getDecompressTime() { return decompressTime_; } -int64_t VeloxColumnarBatchDeserializerFactory::getDeserializeTime() { +int64_t VeloxShuffleReaderDeserializerFactory::getDeserializeTime() { return deserializeTime_; } -void VeloxColumnarBatchDeserializerFactory::initFromSchema() { +void VeloxShuffleReaderDeserializerFactory::initFromSchema() { GLUTEN_ASSIGN_OR_THROW(auto arrowColumnTypes, toShuffleTypeId(schema_->fields())); isValidityBuffer_.reserve(arrowColumnTypes.size()); for (size_t i = 0; i < arrowColumnTypes.size(); ++i) { @@ -681,7 +677,23 @@ void VeloxColumnarBatchDeserializerFactory::initFromSchema() { } } -VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) - : ShuffleReader(std::move(factory)) {} +VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr factory) + : factory_(std::move(factory)) {} + +std::shared_ptr VeloxShuffleReader::readStream(std::shared_ptr in) { + return 
std::make_shared(factory_->createDeserializer(in)); +} + +arrow::MemoryPool* VeloxShuffleReader::getPool() const { + return factory_->getPool(); +} + +int64_t VeloxShuffleReader::getDecompressTime() const { + return factory_->getDecompressTime(); +} + +int64_t VeloxShuffleReader::getDeserializeTime() const { + return factory_->getDeserializeTime(); +} } // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h index af35f977127f1..8ebdbf2bacabe 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.h +++ b/cpp/velox/shuffle/VeloxShuffleReader.h @@ -17,16 +17,14 @@ #pragma once -#include "operators/serializer/VeloxColumnarBatchSerializer.h" #include "shuffle/Payload.h" #include "shuffle/ShuffleReader.h" #include "shuffle/VeloxSortShuffleWriter.h" -#include "utils/Timer.h" + +#include "velox/serializers/PrestoSerializer.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" -#include - namespace gluten { class VeloxHashShuffleReaderDeserializer final : public ColumnarBatchIterator { @@ -134,9 +132,9 @@ class VeloxRssSortShuffleReaderDeserializer : public ColumnarBatchIterator { std::shared_ptr in_; }; -class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { +class VeloxShuffleReaderDeserializerFactory { public: - VeloxColumnarBatchDeserializerFactory( + VeloxShuffleReaderDeserializerFactory( const std::shared_ptr& schema, const std::shared_ptr& codec, const facebook::velox::common::CompressionKind veloxCompressionType, @@ -147,15 +145,13 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { std::shared_ptr veloxPool, ShuffleWriterType shuffleWriterType); - std::unique_ptr createDeserializer(std::shared_ptr in) override; - - arrow::MemoryPool* getPool() override; + std::unique_ptr createDeserializer(std::shared_ptr in); - int64_t getDecompressTime() override; + arrow::MemoryPool* getPool(); - int64_t getDeserializeTime() override; + int64_t getDecompressTime(); - ShuffleWriterType getShuffleWriterType() override; + int64_t getDeserializeTime(); private: void initFromSchema(); @@ -180,6 +176,17 @@ class VeloxColumnarBatchDeserializerFactory : public DeserializerFactory { class VeloxShuffleReader final : public ShuffleReader { public: - VeloxShuffleReader(std::unique_ptr factory); + VeloxShuffleReader(std::unique_ptr factory); + + std::shared_ptr readStream(std::shared_ptr in) override; + + int64_t getDecompressTime() const override; + + int64_t getDeserializeTime() const override; + + arrow::MemoryPool* getPool() const override; + + private: + std::unique_ptr factory_; }; } // namespace gluten diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 9331a72078584..4fcab8f24271c 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -360,7 +360,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam( + auto deserializerFactory = std::make_unique( schema, std::move(codec), veloxCompressionType, diff --git a/dev/info.sh b/dev/info.sh index 1ced70db3e7c4..fe259ea4389e1 100755 --- a/dev/info.sh +++ b/dev/info.sh @@ -14,7 +14,7 @@ # limitations under the License. set -e -version='1.3.0-SNAPSHOT' +version='1.4.0-SNAPSHOT' cb='```' if [ ! 
-x "$(command -v cmake)" ]; then diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index c59d6be3a6314..e274d5e39fc91 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -361,7 +361,11 @@ Developers can use `--debug-mode` command line flag to turn on debug mode when n ## Enable HDFS support -After enabling the dynamic loading of libhdfs.so at runtime to support HDFS, if you run the benchmark with an HDFS file, you need to set the classpath for Hadoop. You can do this by running `export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob``. Otherwise, the HDFS connection will fail. If you have replaced ${HADOOP_HOME}/lib/native/libhdfs.so with libhdfs3.so, there is no need to set the `CLASSPATH`. +After enabling the dynamic loading of libhdfs.so at runtime to support HDFS, if you run the benchmark with an HDFS file, you need to set the classpath for Hadoop. You can do this by running +``` +export CLASSPATH=`$HADOOP_HOME/bin/hdfs classpath --glob` +``` +Otherwise, the HDFS connection will fail. If you have replaced ${HADOOP_HOME}/lib/native/libhdfs.so with libhdfs3.so, there is no need to set the `CLASSPATH`. ## Simulate write tasks diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 676c062c1bf44..06ce5b96068cb 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2025_01_09 +VELOX_BRANCH=2025_01_12 VELOX_HOME="" OS=`uname -s` diff --git a/gluten-arrow/pom.xml b/gluten-arrow/pom.xml index ffba2682e9dc1..8aa3cd9d489c0 100644 --- a/gluten-arrow/pom.xml +++ b/gluten-arrow/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-celeborn/pom.xml b/gluten-celeborn/pom.xml index 0eca5da979e1c..6b8d5ea2304a7 100755 --- a/gluten-celeborn/pom.xml +++ b/gluten-celeborn/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index 5e077a8b7db56..977977098ab23 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -4,7 +4,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT 4.0.0 diff --git a/gluten-core/src/main/java/org/apache/gluten/utils/ResourceUtil.java b/gluten-core/src/main/java/org/apache/gluten/utils/ResourceUtil.java index 692a91af2667a..89f48d4984f62 100644 --- a/gluten-core/src/main/java/org/apache/gluten/utils/ResourceUtil.java +++ b/gluten-core/src/main/java/org/apache/gluten/utils/ResourceUtil.java @@ -16,15 +16,20 @@ */ package org.apache.gluten.utils; +import org.apache.gluten.exception.GlutenException; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.net.URL; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipException; @@ -36,44 +41,73 @@ * and then modified for Gluten's use. */ public class ResourceUtil { - private static final Logger LOG = LoggerFactory.getLogger(ResourceUtil.class); /** - * Get a collection of resource paths by the input RegEx pattern. + * Get a collection of resource paths by the input RegEx pattern in a certain container folder. 
* - * @param pattern The pattern to match. + * @param container The container folder. E.g., `META-INF`. Should not be left empty, because + * the classloader requires a meaningful file name to search inside the loaded jar files. + * @param pattern The pattern to match on the file names. * @return The relative resource paths in the order they are found. */ - public static List getResources(final Pattern pattern) { + public static List getResources(final String container, final Pattern pattern) { + Preconditions.checkArgument( + !container.isEmpty(), + "Resource search should only be used under a certain container folder"); + Preconditions.checkArgument( + !container.startsWith("/") && !container.endsWith("/"), + "Resource container should not start or end with \"/\""); final List buffer = new ArrayList<>(); - final String classPath = System.getProperty("java.class.path"); - final String[] classPathElements = classPath.split(File.pathSeparator); - for (final String element : classPathElements) { - getResources(element, pattern, buffer); + final Enumeration containerUrls; + try { + containerUrls = Thread.currentThread().getContextClassLoader().getResources(container); + } catch (IOException e) { + throw new GlutenException(e); + } + while (containerUrls.hasMoreElements()) { + final URL containerUrl = containerUrls.nextElement(); + getResources(containerUrl, pattern, buffer); } return Collections.unmodifiableList(buffer); } private static void getResources( - final String element, final Pattern pattern, final List buffer) { - final File file = new File(element); - if (!file.exists()) { - LOG.info("Skip non-existing classpath: {}", element); - return; - } - if (file.isDirectory()) { - getResourcesFromDirectory(file, file, pattern, buffer); - } else { - getResourcesFromJarFile(file, pattern, buffer); + final URL containerUrl, final Pattern pattern, final List buffer) { + final String protocol = containerUrl.getProtocol(); + switch (protocol) { + case "file": + final File fileContainer = new File(containerUrl.getPath()); + Preconditions.checkState( + fileContainer.exists() && fileContainer.isDirectory(), + "Specified file container " + containerUrl + " is not an existing directory"); + getResourcesFromDirectory(fileContainer, fileContainer, pattern, buffer); + break; + case "jar": + final String jarContainerPath = containerUrl.getPath(); + final Pattern jarContainerPattern = Pattern.compile("file:([^!]+)!/(.+)"); + final Matcher m = jarContainerPattern.matcher(jarContainerPath); + if (!m.matches()) { + throw new GlutenException("Illegal Jar container URL: " + containerUrl); + } + final String jarPath = m.group(1); + final File jarFile = new File(jarPath); + Preconditions.checkState( + jarFile.exists() && jarFile.isFile(), + "Specified Jar container " + containerUrl + " is not a Jar file"); + final String dir = m.group(2); + getResourcesFromJarFile(jarFile, dir, pattern, buffer); + break; + default: + throw new GlutenException("Unrecognizable resource protocol: " + protocol); } } private static void getResourcesFromJarFile( - final File file, final Pattern pattern, final List buffer) { - ZipFile zf; + final File jarFile, final String dir, final Pattern pattern, final List buffer) { + final ZipFile zf; try { - zf = new ZipFile(file); + zf = new ZipFile(jarFile); } catch (final ZipException e) { throw new RuntimeException(e); } catch (final IOException e) { throw new @@ -83,9 +117,14 @@ private static void getResourcesFromJarFile( while (e.hasMoreElements()) { final ZipEntry ze = (ZipEntry) 
e.nextElement(); final String fileName = ze.getName(); - final boolean accept = pattern.matcher(fileName).matches(); + if (!fileName.startsWith(dir)) { + continue; + } + final String relativeFileName = + new File(dir).toURI().relativize(new File(fileName).toURI()).getPath(); + final boolean accept = pattern.matcher(relativeFileName).matches(); if (accept) { - buffer.add(fileName); + buffer.add(relativeFileName); } } try { diff --git a/gluten-core/src/main/scala/org/apache/gluten/component/Discovery.scala b/gluten-core/src/main/scala/org/apache/gluten/component/Discovery.scala index 2b8f060a69f7a..ffa2ba9b77068 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/component/Discovery.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/component/Discovery.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.gluten.component import org.apache.gluten.exception.GlutenException @@ -26,11 +25,8 @@ import org.apache.spark.util.SparkReflectionUtil import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex - - - - // format: off + /** * Gluten's global discovery to find all [[Component]] definitions in the classpath. * @@ -54,12 +50,12 @@ import scala.util.matching.Regex // format: on private object Discovery extends Logging { private val container: String = "META-INF/gluten-components" - private val componentFilePattern: Regex = s"^$container/(.+)$$".r + private val componentFilePattern: Regex = s"^(.+)$$".r def discoverAll(): Seq[Component] = { logInfo("Start discovering components in the current classpath... ") val prev = System.currentTimeMillis() - val allFiles = ResourceUtil.getResources(componentFilePattern.pattern).asScala + val allFiles = ResourceUtil.getResources(container, componentFilePattern.pattern).asScala val duration = System.currentTimeMillis() - prev logInfo(s"Discovered component files: ${allFiles.mkString(", ")}. Duration: $duration ms.") val deDup = mutable.Set[String]() diff --git a/gluten-core/src/test/java/org/apache/gluten/util/ResourceUtilTest.java b/gluten-core/src/test/java/org/apache/gluten/util/ResourceUtilTest.java new file mode 100644 index 0000000000000..570e5a6e4d034 --- /dev/null +++ b/gluten-core/src/test/java/org/apache/gluten/util/ResourceUtilTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.util; + +import org.apache.gluten.utils.ResourceUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.regex.Pattern; + +public class ResourceUtilTest { + @Test + public void testFile() { + // Use the class file of this test to verify the sanity of ResourceUtil. 
+ List classes = + ResourceUtil.getResources( + "org", Pattern.compile("apache/gluten/util/ResourceUtilTest\\.class")); + Assert.assertEquals(1, classes.size()); + Assert.assertEquals("apache/gluten/util/ResourceUtilTest.class", classes.get(0)); + } + + @Test + public void testJar() { + // Use the class file of Spark code to verify the sanity of ResourceUtil. + List classes = + ResourceUtil.getResources("org", Pattern.compile("apache/spark/SparkContext\\.class")); + Assert.assertEquals(1, classes.size()); + Assert.assertEquals("apache/spark/SparkContext.class", classes.get(0)); + } +} diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml index cf2fdd7518fa4..ef30a4f5631f0 100755 --- a/gluten-delta/pom.xml +++ b/gluten-delta/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-hudi/pom.xml b/gluten-hudi/pom.xml index 5865f1f6ece8c..5d20f9e8d943f 100755 --- a/gluten-hudi/pom.xml +++ b/gluten-hudi/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-iceberg/pom.xml b/gluten-iceberg/pom.xml index c8c811674a548..b2bf5f9ebe078 100644 --- a/gluten-iceberg/pom.xml +++ b/gluten-iceberg/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ras/common/pom.xml b/gluten-ras/common/pom.xml index 6990a855508d7..761a1c0b6153e 100644 --- a/gluten-ras/common/pom.xml +++ b/gluten-ras/common/pom.xml @@ -5,7 +5,7 @@ org.apache.gluten gluten-ras - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-ras-common Gluten Ras Common diff --git a/gluten-ras/planner/pom.xml b/gluten-ras/planner/pom.xml index 652977451a757..61c215b0125b2 100644 --- a/gluten-ras/planner/pom.xml +++ b/gluten-ras/planner/pom.xml @@ -5,7 +5,7 @@ org.apache.gluten gluten-ras - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-ras-planner Gluten Ras Planner diff --git a/gluten-ras/pom.xml b/gluten-ras/pom.xml index 570b772235f67..50494e1e9d83c 100644 --- a/gluten-ras/pom.xml +++ b/gluten-ras/pom.xml @@ -17,7 +17,7 @@ org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-ras pom diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml index 8a790aa15337d..7e73b4850cbc3 100644 --- a/gluten-substrait/pom.xml +++ b/gluten-substrait/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT 4.0.0 diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala index 126417bf18a54..54b5a34639913 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec} -import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable} import org.apache.spark.sql.sources.DataSourceRegister import 
org.apache.spark.sql.vectorized.ColumnarBatch @@ -133,19 +132,33 @@ object GlutenWriterColumnarRules { } } - case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { + private[datasources] def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { + child match { + // if the child is columnar, we can just wrap & transfer the columnar data + case c2r: ColumnarToRowExecBase => + command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) + // If the child is aqe, we make aqe "support columnar", + // then aqe itself will guarantee to generate columnar outputs. + // So FakeRowAdaptor will always consumes columnar data, + // thus avoiding the case of c2r->aqe->r2c->writer + case aqe: AdaptiveSparkPlanExec => + command.withNewChildren( + Array( + FakeRowAdaptor( + AdaptiveSparkPlanExec( + aqe.inputPlan, + aqe.context, + aqe.preprocessingRules, + aqe.isSubquery, + supportsColumnar = true + )))) + case other => command.withNewChildren(Array(FakeRowAdaptor(other))) + } + } - private val NOOP_WRITE = "org.apache.spark.sql.execution.datasources.noop.NoopWrite$" + case class NativeWritePostRule(session: SparkSession) extends Rule[SparkPlan] { override def apply(p: SparkPlan): SparkPlan = p match { - case rc @ AppendDataExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) - case rc @ OverwriteByExpressionExec(_, _, write) - if write.getClass.getName == NOOP_WRITE && - BackendsApiManager.getSettings.enableNativeWriteFiles() => - injectFakeRowAdaptor(rc, rc.child) case rc @ DataWritingCommandExec(cmd, child) => // The same thread can set these properties in the last query submission. val fields = child.output.toStructType.fields @@ -165,30 +178,6 @@ object GlutenWriterColumnarRules { case plan: SparkPlan => plan.withNewChildren(plan.children.map(apply)) } - - private def injectFakeRowAdaptor(command: SparkPlan, child: SparkPlan): SparkPlan = { - child match { - // if the child is columnar, we can just wrap&transfer the columnar data - case c2r: ColumnarToRowExecBase => - command.withNewChildren(Array(FakeRowAdaptor(c2r.child))) - // If the child is aqe, we make aqe "support columnar", - // then aqe itself will guarantee to generate columnar outputs. - // So FakeRowAdaptor will always consumes columnar data, - // thus avoiding the case of c2r->aqe->r2c->writer - case aqe: AdaptiveSparkPlanExec => - command.withNewChildren( - Array( - FakeRowAdaptor( - AdaptiveSparkPlanExec( - aqe.inputPlan, - aqe.context, - aqe.preprocessingRules, - aqe.isSubquery, - supportsColumnar = true - )))) - case other => command.withNewChildren(Array(FakeRowAdaptor(other))) - } - } } def injectSparkLocalProperty(spark: SparkSession, format: Option[String]): Unit = { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala new file mode 100644 index 0000000000000..bedf006510d32 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/noop/GlutenNoopWriterRule.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.noop + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules.injectFakeRowAdaptor +import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec} + +/** + * A rule that injects a FakeRowAdaptor for NoopWrite. + * + * The current V2 Command does not support columnar. Therefore, when its child node is a + * ColumnarNode, Vanilla Spark inserts a ColumnarToRow conversion between V2 Command and its child. + * This rule replaces the inserted ColumnarToRow with a FakeRowAdaptor, effectively bypassing the + * ColumnarToRow operation for NoopWrite. Since NoopWrite does not actually perform any data + * operations, it can accept input data in either row-based or columnar format. + */ +case class GlutenNoopWriterRule(session: SparkSession) extends Rule[SparkPlan] { + override def apply(p: SparkPlan): SparkPlan = p match { + case rc @ AppendDataExec(_, _, NoopWrite) => + injectFakeRowAdaptor(rc, rc.child) + case rc @ OverwriteByExpressionExec(_, _, NoopWrite) => + injectFakeRowAdaptor(rc, rc.child) + case _ => p + } +} diff --git a/gluten-ui/pom.xml b/gluten-ui/pom.xml index a55d104a0b7a9..2c92daff85da8 100644 --- a/gluten-ui/pom.xml +++ b/gluten-ui/pom.xml @@ -6,7 +6,7 @@ org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-ui diff --git a/gluten-uniffle/pom.xml b/gluten-uniffle/pom.xml index efc8ce6555c5b..402f7a94d74f1 100644 --- a/gluten-uniffle/pom.xml +++ b/gluten-uniffle/pom.xml @@ -5,7 +5,7 @@ gluten-parent org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/common/pom.xml b/gluten-ut/common/pom.xml index a61a8c47d147b..11caaa4dd4ff0 100644 --- a/gluten-ut/common/pom.xml +++ b/gluten-ut/common/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml index 89d1ff9bc9aed..ca41d580aa463 100644 --- a/gluten-ut/pom.xml +++ b/gluten-ut/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/gluten-ut/spark32/pom.xml b/gluten-ut/spark32/pom.xml index 1ef95e0ea73d8..8d0d0f4fcae20 100644 --- a/gluten-ut/spark32/pom.xml +++ b/gluten-ut/spark32/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index b985347f20411..8c62e3b0fd9b1 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ 
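Note: `GlutenNoopWriterRule` above targets Spark's built-in no-op data source (registered as `noop` via `NoopDataSource`), which discards all rows and is commonly used to benchmark the scan/compute side of a plan without paying for a real write. A hedged sketch of the write pattern the rule optimizes, mirroring the AQE tests added later in this diff; the input path and object name are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

object NoopWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark =
      SparkSession.builder().master("local[1]").appName("noop-write-demo").getOrCreate()

    // The "noop" sink discards all rows; with GlutenNoopWriterRule in place the V2 write
    // command consumes Gluten's columnar output through FakeRowAdaptor instead of forcing
    // a ColumnarToRow conversion right below the write.
    spark.read
      .parquet("/path/to/some/table") // hypothetical input
      .write
      .format("noop")                 // resolves to NoopDataSource / NoopWrite
      .mode("overwrite")              // same mode the AQE tests in this diff use
      .save()

    spark.stop()
  }
}
```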
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1123,6 +1123,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") .exclude("SPARK-32717: AQEOptimizer should respect excludedRules configuration") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index f5071d2f3fc4f..62ab868363017 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -166,14 +166,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", // "SPARK-33933", "SPARK-31220", @@ -1053,7 +1051,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] enableSuite[GlutenBucketedReadWithoutHiveSupportSuite] // Exclude the following suite for plan changed from SMJ to SHJ. 
.exclude("avoid shuffle when join 2 bucketed tables") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 6d3c3e865d587..928dc38985ce4 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -42,7 +49,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1196,6 +1203,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + 
spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index f8b6092a46f7c..ce9513c8cc9bf 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import 
org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.log4j.Level @@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1175,6 +1182,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 10abca1c6dd31..0000000000000 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } - } -} diff --git a/gluten-ut/spark33/pom.xml b/gluten-ut/spark33/pom.xml index 539f60a63f1be..f8377195eec9f 100644 --- a/gluten-ut/spark33/pom.xml +++ b/gluten-ut/spark33/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 219eb0d0b97e7..f91841b991c76 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1119,6 +1119,9 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude("SPARK-32932: Do not use local shuffle read at final stage on write command") + .exclude( 
+ "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index d3bc3846d80fd..72b77ae1f95b8 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -70,7 +70,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenSupportsCatalogOptionsSuite] enableSuite[GlutenTableCapabilityCheckSuite] enableSuite[GlutenWriteDistributionAndOrderingSuite] - enableSuite[GlutenWriterColumnarRulesSuite] enableSuite[GlutenQueryCompilationErrorsDSv2Suite] @@ -191,14 +190,12 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", "SPARK-34533", "SPARK-34781", "SPARK-35585", - "SPARK-32932", "SPARK-33494", "SPARK-33933", "SPARK-31220", diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 441f3a60a3a9a..779d264114cb1 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformerBase} import org.apache.spark.SparkConf @@ -24,14 +25,20 @@ import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -40,7 +47,7 @@ class ClickHouseAdaptiveQueryExecSuite extends 
AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1193,6 +1200,86 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5a..f9f0723e00cc1 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala 
@@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,14 +26,20 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +48,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1186,86 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten("SPARK-32932: Do not use local shuffle read at final stage on write command") { + withSQLConf( + SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString, + SQLConf.SHUFFLE_PARTITIONS.key -> "5", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true" + ) { + val data = + for ( + i <- 1L to 10L; + j <- 1L to 3L + ) yield (i, j) + + val df = data.toDF("i", "j").repartition($"j") + var noLocalread: Boolean = false + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + qe.executedPlan match { + case plan @ (_: DataWritingCommandExec | _: V2TableWriteExec) => + noLocalread = collect(plan) { + case exec: AQEShuffleReadExec if exec.isLocalRead => exec + }.isEmpty + case _ => // ignore other events + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + + withTable("t") { + df.write.partitionBy("j").saveAsTable("t") + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + } + + // Test DataSource v2 + val format = classOf[NoopDataSource].getName + df.write.format(format).mode("overwrite").save() + sparkContext.listenerBus.waitUntilEmpty() + assert(noLocalread) + noLocalread = false + + spark.listenerManager.unregister(listener) + } + } + + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", 
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala deleted file mode 100644 index 10abca1c6dd31..0000000000000 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRulesSuite.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.datasources - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, SaveMode} -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.QueryExecutionListener - -class GlutenWriterColumnarRulesSuite extends GlutenSQLTestsBaseTrait { - - class WriterColumnarListener extends QueryExecutionListener { - var fakeRowAdaptor: Option[FakeRowAdaptor] = None - - override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { - fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } - } - - override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} - } - - testGluten("writing to noop") { - withTempDir { - dir => - withSQLConf(GlutenConfig.NATIVE_WRITER_ENABLED.key -> "true") { - spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) - val listener = new WriterColumnarListener - spark.listenerManager.register(listener) - try { - spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() - spark.sparkContext.listenerBus.waitUntilEmpty() - assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") - } finally { - spark.listenerManager.unregister(listener) - } - } - } - } -} diff --git a/gluten-ut/spark34/pom.xml b/gluten-ut/spark34/pom.xml index a9a2f1c8a51e8..e61142b79df82 100644 --- a/gluten-ut/spark34/pom.xml +++ b/gluten-ut/spark34/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 56c95ae1bd05e..9ebcadf531186 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index cc9746dcdb530..94d3a1f6e8906 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, GlutenQueryParsingErrorsSuite} import org.apache.spark.sql.execution.{FallbackStrategiesSuite, GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, GlutenExchangeSuite, 
GlutenLocalBroadcastExchangeSuite, GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite} import org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite -import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, GlutenFileFormatWriterSuite, GlutenFileIndexSuite, GlutenFileMetadataStructRowIndexSuite, GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, GlutenOrcV1AggregatePushDownSuite, GlutenOrcV2AggregatePushDownSuite, GlutenParquetCodecSuite, GlutenParquetReadSchemaSuite, GlutenParquetV1AggregatePushDownSuite, GlutenParquetV2AggregatePushDownSuite, GlutenPathFilterStrategySuite, GlutenPathFilterSuite, GlutenPruneFileSourcePartitionsSuite, GlutenV1WriteCommandSuite, GlutenVectorizedOrcReadSchemaSuite, GlutenVectorizedParquetReadSchemaSuite} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite import org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, GlutenCSVv1Suite, GlutenCSVv2Suite} import org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite @@ -182,7 +182,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 49d47fa65b1fc..2bd5a96dadee0 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import 
org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1182,6 +1187,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5a..6a3d6da27ceff 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import 
org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/pom.xml b/gluten-ut/spark35/pom.xml index 1750a5e278164..d95e4aeeee050 100644 --- a/gluten-ut/spark35/pom.xml +++ b/gluten-ut/spark35/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index bdb160ad24beb..f482ad921ee3d 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -983,6 +983,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("Change merge join to broadcast join without local shuffle read") .exclude( "Avoid changing merge join to broadcast join if too many empty partitions on build plan") + .exclude( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") .exclude("SPARK-37753: Allow changing outer join to broadcast join even if too many empty partitions on broadcast side") .exclude("SPARK-29544: adaptive skew join with different join types") .exclude("SPARK-34682: AQEShuffleReadExec operating on canonicalized plan") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 71786c91322b3..73c4d43ced531 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -185,7 +185,6 @@ class VeloxTestSettings extends BackendTestSettings { "SPARK-30403", "SPARK-30719", "SPARK-31384", - "SPARK-30953", "SPARK-31658", "SPARK-32717", "SPARK-32649", diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala index 2e5df7b859e34..bd941586d73c3 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/clickhouse/ClickHouseAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.clickhouse +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1197,6 +1202,37 @@ class ClickHouseAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with Glute } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + 
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 729a12f56cc5a..6a3d6da27ceff 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.sql.execution.adaptive.velox +import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer} import org.apache.spark.SparkConf @@ -25,6 +26,9 @@ import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.execution.datasources.noop.NoopDataSource +import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange._ import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter @@ -33,6 +37,7 @@ import org.apache.spark.sql.functions.when import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData.TestData import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.QueryExecutionListener import org.apache.logging.log4j.Level @@ -41,7 +46,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT override def sparkConf: SparkConf = { super.sparkConf - .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "false") + .set(GlutenConfig.COLUMNAR_FORCE_SHUFFLED_HASH_JOIN_ENABLED.key, "false") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5") } @@ -1179,6 +1184,37 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT } } + testGluten( + "SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of v2 write commands") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") { + var plan: SparkPlan = null + val listener = new QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + plan = qe.executedPlan + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + spark.listenerManager.register(listener) + withTable("t1") { + val format = classOf[NoopDataSource].getName + Seq((0, 1)).toDF("x", "y").write.format(format).mode("overwrite").save() + + sparkContext.listenerBus.waitUntilEmpty() + assert(plan.isInstanceOf[V2TableWriteExec]) + val childPlan = plan.asInstanceOf[V2TableWriteExec].child + assert(childPlan.isInstanceOf[FakeRowAdaptor]) + 
assert(childPlan.asInstanceOf[FakeRowAdaptor].child.isInstanceOf[AdaptiveSparkPlanExec]) + + spark.listenerManager.unregister(listener) + } + } + } + testGluten("SPARK-35650: Coalesce number of partitions by AEQ") { withSQLConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") { Seq("REPARTITION", "REBALANCE(key)") diff --git a/gluten-ut/test/pom.xml b/gluten-ut/test/pom.xml index fb637d5489374..a641a19b2fafb 100644 --- a/gluten-ut/test/pom.xml +++ b/gluten-ut/test/pom.xml @@ -5,7 +5,7 @@ gluten-ut org.apache.gluten - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala new file mode 100644 index 0000000000000..ebf17444e623b --- /dev/null +++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/datasources/GlutenNoopWriterRuleSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.datasources + +import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.utils.{BackendTestUtils, SystemParameters} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{GlutenQueryTest, SaveMode} +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.FakeRowAdaptor +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.util.QueryExecutionListener + +class GlutenNoopWriterRuleSuite extends GlutenQueryTest with SharedSparkSession { + + override def sparkConf: SparkConf = { + val conf = super.sparkConf + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.default.parallelism", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.ui.enabled", "false") + .set("spark.gluten.ui.enabled", "false") + if (BackendTestUtils.isCHBackendLoaded()) { + conf.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath) + } + conf + } + + class WriterColumnarListener extends QueryExecutionListener { + var fakeRowAdaptor: Option[FakeRowAdaptor] = None + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + fakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f } + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} + } + + test("writing to noop") { + withTempDir { + dir => + spark.range(0, 100).write.mode(SaveMode.Overwrite).parquet(dir.getPath) + val listener = new WriterColumnarListener + spark.listenerManager.register(listener) + try { + spark.read.parquet(dir.getPath).write.format("noop").mode(SaveMode.Overwrite).save() + spark.sparkContext.listenerBus.waitUntilEmpty() + assert(listener.fakeRowAdaptor.isDefined, "FakeRowAdaptor is not found.") + } finally { + spark.listenerManager.unregister(listener) + } + } + } +} diff --git a/package/pom.xml b/package/pom.xml index b9c114181bcde..a1df1edc4eb86 100644 --- a/package/pom.xml +++ b/package/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/pom.xml b/pom.xml index 9cdc3f2c492d1..8fa2f355030b1 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ 4.0.0 org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT pom Gluten Parent Pom diff --git a/shims/common/pom.xml b/shims/common/pom.xml index 92cee60b07bdc..f09f196b40115 100644 --- a/shims/common/pom.xml +++ b/shims/common/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten spark-sql-columnar-shims - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/shims/pom.xml b/shims/pom.xml index 4727cf474dcb0..9ca5048cd17fb 100644 --- a/shims/pom.xml +++ b/shims/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten gluten-parent - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/shims/spark32/pom.xml b/shims/spark32/pom.xml index 9c6322ac58919..fee18fdc60e2f 100644 --- a/shims/spark32/pom.xml +++ b/shims/spark32/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten spark-sql-columnar-shims - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/shims/spark33/pom.xml b/shims/spark33/pom.xml index edab2c8705a1c..1e1e1c537e760 100644 --- a/shims/spark33/pom.xml +++ b/shims/spark33/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten spark-sql-columnar-shims - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/shims/spark34/pom.xml b/shims/spark34/pom.xml index dd4b8d156f1b2..825ec2015b111 100644 --- a/shims/spark34/pom.xml +++ 
b/shims/spark34/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten spark-sql-columnar-shims - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/shims/spark35/pom.xml b/shims/spark35/pom.xml index ed1b599910638..47af80194f841 100644 --- a/shims/spark35/pom.xml +++ b/shims/spark35/pom.xml @@ -20,7 +20,7 @@ org.apache.gluten spark-sql-columnar-shims - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT ../pom.xml diff --git a/tools/gluten-it/common/pom.xml b/tools/gluten-it/common/pom.xml index e6ec5ffe19925..0e97be740b283 100644 --- a/tools/gluten-it/common/pom.xml +++ b/tools/gluten-it/common/pom.xml @@ -7,11 +7,11 @@ org.apache.gluten gluten-it - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-it-common - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT jar diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml index 6a11b987f12cd..9b42b4edf5ba0 100644 --- a/tools/gluten-it/package/pom.xml +++ b/tools/gluten-it/package/pom.xml @@ -5,7 +5,7 @@ org.apache.gluten gluten-it - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT gluten-it-package http://maven.apache.org diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 488ff1c54cd1f..89738668039a6 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -6,7 +6,7 @@ org.apache.gluten gluten-it - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT pom common @@ -23,7 +23,7 @@ 3 0.3.2-incubating 0.9.1 - 1.3.0-SNAPSHOT + 1.4.0-SNAPSHOT 32.0.1-jre 1.1 1.4 diff --git a/tools/workload/benchmark_velox/params.yaml.template b/tools/workload/benchmark_velox/params.yaml.template index 285afb11cb7c7..e206cc3b8ccf5 100644 --- a/tools/workload/benchmark_velox/params.yaml.template +++ b/tools/workload/benchmark_velox/params.yaml.template @@ -2,7 +2,7 @@ gluten_home: /home/sparkuser/gluten # Local path to gluten jar. -gluten_target_jar: /home/sparkuser/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.3.0-SNAPSHOT.jar +gluten_target_jar: /home/sparkuser/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.4.0-SNAPSHOT.jar # Spark app master. master: yarn diff --git a/tools/workload/benchmark_velox/sample/tpch_q1.html b/tools/workload/benchmark_velox/sample/tpch_q1.html index c401c35ec87cd..7782ca1982887 100644 --- a/tools/workload/benchmark_velox/sample/tpch_q1.html +++ b/tools/workload/benchmark_velox/sample/tpch_q1.html @@ -16208,7 +16208,7 @@

Config compare\n", " \n", " spark.executor.extraClassPath\n", - " file:///data0/home/sparkuser/jars/6600a164407ae0e4f5ea5b33dc4b902f23a27730/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.3.0-snapshot.jar\n", + " file:///data0/home/sparkuser/jars/6600a164407ae0e4f5ea5b33dc4b902f23a27730/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.4.0-snapshot.jar\n", " \n", " False\n", " \n", @@ -4673,7 +4673,7 @@ " 0851_0048 \\\n", "callSite.short collect at /tmp/ipykernel_265482/1936321720.py:117 \n", "spark.app.submitTime 1733464301669 \n", - "spark.executor.extraClassPath file:///data0/home/sparkuser/jars/6600a164407ae0e4f5ea5b33dc4b902f23a27730/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.3.0-snapshot.jar \n", + "spark.executor.extraClassPath file:///data0/home/sparkuser/jars/6600a164407ae0e4f5ea5b33dc4b902f23a27730/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.4.0-snapshot.jar \n", "spark.executor.extraJavaOptions -xx:+ignoreunrecognizedvmoptions --add-opens=java.base/java.lang=all-unnamed --add-opens=java.base/java.lang.invoke=all-unnamed --add-opens=java.base/java.lang.reflect=all-unnamed --add-opens=java.base/java.io=all-unnamed --add-opens=java.base/java.net=all-unnamed --add-opens=java.base/java.nio=all-unnamed --add-opens=java.base/java.util=all-unnamed --add-opens=java.base/java.util.concurrent=all-unnamed --add-opens=java.base/java.util.concurrent.atomic=all-unnamed --add-opens=java.base/sun.nio.ch=all-unnamed --add-opens=java.base/sun.nio.cs=all-unnamed --add-opens=java.base/sun.security.action=all-unnamed --add-opens=java.base/sun.util.calendar=all-unnamed --add-opens=java.security.jgss/sun.security.krb5=all-unnamed -xx:+useparalleloldgc -xx:parallelgcthreads=2 -xx:newratio=1 -xx:survivorratio=1 -xx:+usecompressedoops -verbose:gc -xx:+printgcdetails -xx:+printgctimestamps -xx:errorfile=/home/sparkuser/logs/java/hs_err_pid%p.log \n", "spark.executor.memory 10944m \n", "spark.gluten.memory.conservative.task.offHeap.size.in.bytes 10041163776 \n", @@ -4981,4 +4981,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/tools/workload/benchmark_velox/tpc_workload.ipynb b/tools/workload/benchmark_velox/tpc_workload.ipynb index 3f6a24e9b1c8c..601deb8eb1173 100644 --- a/tools/workload/benchmark_velox/tpc_workload.ipynb +++ b/tools/workload/benchmark_velox/tpc_workload.ipynb @@ -21,7 +21,7 @@ "gluten_home='/home/sparkuser/gluten'\n", "\n", "# Local path to gluten jar.\n", - "gluten_target_jar='/home/sparkuser/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.3.0-SNAPSHOT.jar'\n", + "gluten_target_jar='/home/sparkuser/gluten-velox-bundle-spark3.3_2.12-centos_7_x86_64-1.4.0-SNAPSHOT.jar'\n", "\n", "# Spark app master. e.g. 'yarn'\n", "master='yarn'\n", diff --git a/tools/workload/tpcds/run_tpcds/run_tpcds.sh b/tools/workload/tpcds/run_tpcds/run_tpcds.sh index 813184307eee5..cc7aec34f2101 100755 --- a/tools/workload/tpcds/run_tpcds/run_tpcds.sh +++ b/tools/workload/tpcds/run_tpcds/run_tpcds.sh @@ -37,4 +37,4 @@ cat tpcds_parquet.scala | ${SPARK_HOME}/bin/spark-shell \ # e.g. # --conf spark.gluten.loadLibFromJar=true \ # --jars /PATH_TO_GLUTEN_HOME/package/target/thirdparty-lib/gluten-thirdparty-lib-ubuntu-22.04-x86_64.jar, - # /PATH_TO_GLUTEN_HOME/package/target/gluten-velox-bundle-spark3.3_2.12-ubuntu_22.04_x86_64-1.3.0-SNAPSHOT.jar + # /PATH_TO_GLUTEN_HOME/package/target/gluten-velox-bundle-spark3.3_2.12-ubuntu_22.04_x86_64-1.x.x-SNAPSHOT.jar
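For reference, the effect of the new GlutenNoopWriterRule can be observed from application code in roughly the way the new GlutenNoopWriterRuleSuite does: write a DataFrame with the built-in noop format and look for a FakeRowAdaptor above the write in the executed plan. The sketch below is illustrative only; it assumes spark is a Gluten-enabled SparkSession whose child plan is columnar, and it uses a CountDownLatch in place of the suite's package-private listenerBus.waitUntilEmpty() call.

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.execution.datasources.FakeRowAdaptor
    import org.apache.spark.sql.util.QueryExecutionListener

    // Record whether the executed plan of the next query contains a FakeRowAdaptor.
    var sawFakeRowAdaptor = false
    val done = new CountDownLatch(1)
    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        sawFakeRowAdaptor = qe.executedPlan.collectFirst { case f: FakeRowAdaptor => f }.isDefined
        done.countDown()
      }
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        done.countDown()
    }

    spark.listenerManager.register(listener)
    try {
      // NoopWrite discards all rows, so the write succeeds regardless of the input format;
      // with GlutenNoopWriterRule active, a FakeRowAdaptor replaces the ColumnarToRow conversion.
      spark.range(100).write.format("noop").mode("overwrite").save()
      done.await(30, TimeUnit.SECONDS)
      assert(sawFakeRowAdaptor, "expected a FakeRowAdaptor above the noop write")
    } finally {
      spark.listenerManager.unregister(listener)
    }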