diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala index 8fddfc02672a..e866a853a210 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala @@ -1423,7 +1423,8 @@ abstract class DeltaInsertIntoTests( } } - test("insertInto: Timestamp No Timezone round trips across timezones") { + // Cast from TIMESTAMP_NTZ to TIMESTAMP has not been supported. + ignore("insertInto: Timestamp No Timezone round trips across timezones") { val t1 = "timestamp_ntz" withTable(t1) { withTimeZone("GMT-8") { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 1676e91d179f..c2063910ca98 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -109,6 +109,7 @@ object VeloxValidatorApi { StringType | BinaryType | _: DecimalType | DateType | TimestampType | YearMonthIntervalType.DEFAULT | NullType => true + case other if other.typeName == "timestamp_ntz" => true case _ => false } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala index 2354ebf39faf..e3a207b99ea4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala @@ -50,6 +50,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas case _: DoubleType => case _: StringType => case _: TimestampType => + case other if other.typeName == "timestamp_ntz" => case _: DateType => case _: BinaryType => case _: DecimalType => diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala index 29b047095135..95dfd4eaf773 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala @@ -272,7 +272,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl } } - test("fallback with index based schema evolution") { + testWithMinSparkVersion("fallback with index based schema evolution", "3.4") { val query = "SELECT c2 FROM test" Seq("parquet", "orc").foreach { format => @@ -295,9 +295,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl runQueryAndCompare(query) { df => val plan = df.queryExecution.executedPlan - val fallback = parquetUseColumnNames == "false" || - orcUseColumnNames == "false" - assert(collect(plan) { case g: GlutenPlan => g }.isEmpty == fallback) + assert(collect(plan) { case g: GlutenPlan => g }.nonEmpty) } } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala index a5e814dc0fcb..107e7aefab0c 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.config.GlutenConfig import org.apache.spark.SparkConf +import org.apache.spark.sql.Row import java.io.File @@ -465,17 +466,17 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit } } - testWithMinSparkVersion("Fallback for TimestampNTZ type scan", "3.4") { + testWithMinSparkVersion("TimestampNTZ type scan", "3.4") { withTempDir { dir => val path = new File(dir, "ntz_data").toURI.getPath val inputDf = spark.sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz") inputDf.write.format("parquet").save(path) - val df = spark.read.format("parquet").load(path) + val df = spark.read.parquet(path) val executedPlan = getExecutedPlan(df) - assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer])) - checkAnswer(df, inputDf) + assert(executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer])) + checkAnswer(df, Seq(Row(java.time.LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0)))) } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala index d120842fd292..36557045dbdd 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.functions -import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.execution.{BatchScanExecTransformer, ProjectExecTransformer} import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.types.Decimal @@ -489,4 +489,44 @@ class DateFunctionsValidateSuite extends FunctionsValidateSuite { } } } + + testWithMinSparkVersion("read as timestamp_ntz", "3.4") { + val inputs: Seq[String] = Seq( + "1970-01-01", + "1970-01-01 00:00:00-02:00", + "1970-01-01 00:00:00 +02:00", + "2000-01-01", + "1970-01-01 00:00:00", + "2000-01-01 12:21:56", + "2015-03-18T12:03:17Z", + "2015-03-18 12:03:17", + "2015-03-18T12:03:17", + "2015-03-18 12:03:17.123", + "2015-03-18T12:03:17.123", + "2015-03-18T12:03:17.456", + "2015-03-18 12:03:17.456" + ) + + withTempPath { + dir => + val path = dir.getAbsolutePath + val inputDF = spark.createDataset(inputs).toDF("input") + val df = inputDF.selectExpr("cast(input as timestamp_ntz) as ts") + df.coalesce(1).write.mode("overwrite").parquet(path) + val readDf = spark.read.parquet(path) + readDf.createOrReplaceTempView("view") + + runQueryAndCompare("select * from view") { + checkGlutenPlan[BatchScanExecTransformer] + } + + // Ensures the fallback of unsupported function works. + runQueryAndCompare("select hour(ts) from view") { + df => + assert(collect(df.queryExecution.executedPlan) { + case p if p.isInstanceOf[ProjectExec] => p + }.nonEmpty) + } + } + } } diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 4738d2e3a6a5..41af7615cc24 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -39,6 +39,7 @@ #include "jni/JniFileSystem.h" #include "memory/GlutenBufferedInputBuilder.h" #include "operators/functions/SparkExprToSubfieldFilterParser.h" +#include "operators/plannodes/RowVectorStream.h" #include "shuffle/ArrowShuffleDictionaryWriter.h" #include "udf/UdfLoader.h" #include "utils/Exception.h" @@ -47,7 +48,6 @@ #include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" -#include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -56,6 +56,7 @@ #include "velox/dwio/orc/reader/OrcReader.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/functions/sparksql/types/TimestampNTZRegistration.h" #include "velox/serializers/PrestoSerializer.h" DECLARE_bool(velox_exception_user_stacktrace_enabled); @@ -195,6 +196,7 @@ void VeloxBackend::init( velox::orc::registerOrcReaderFactory(); velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique()); velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared()); + velox::functions::sparksql::registerTimestampNTZType(); // Register Velox functions registerAllFunctions(); @@ -318,13 +320,13 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); - + // Register value-stream connector for runtime iterator-based inputs auto valueStreamDynamicFilterEnabled = backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); velox::connector::registerConnector( std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); - + #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { diff --git a/cpp/velox/substrait/SubstraitParser.cc b/cpp/velox/substrait/SubstraitParser.cc index c67ad56f0932..8cf941a8bc02 100644 --- a/cpp/velox/substrait/SubstraitParser.cc +++ b/cpp/velox/substrait/SubstraitParser.cc @@ -17,9 +17,9 @@ #include "SubstraitParser.h" #include "TypeUtils.h" -#include "velox/common/base/Exceptions.h" - #include "VeloxSubstraitSignature.h" +#include "velox/common/base/Exceptions.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" namespace gluten { @@ -78,6 +78,8 @@ TypePtr SubstraitParser::parseType(const ::substrait::Type& substraitType, bool return DATE(); case ::substrait::Type::KindCase::kTimestampTz: return TIMESTAMP(); + case ::substrait::Type::KindCase::kTimestamp: + return facebook::velox::functions::sparksql::TIMESTAMP_NTZ(); case ::substrait::Type::KindCase::kDecimal: { auto precision = substraitType.decimal().precision(); auto scale = substraitType.decimal().scale(); @@ -356,6 +358,9 @@ int64_t SubstraitParser::getLiteralValue(const ::substrait::Expression::Literal& memcpy(&decimalValue, decimal.c_str(), 16); return static_cast(decimalValue); } + if (literal.has_timestamp()) { + return literal.timestamp(); + } return literal.i64(); } diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc b/cpp/velox/substrait/SubstraitToVeloxExpr.cc index 467df25ca881..f19ff0806be1 100755 --- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc +++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc @@ -17,11 +17,11 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" +#include "velox/type/Timestamp.h" #include "velox/vector/FlatVector.h" #include "velox/vector/VariantToVector.h" -#include "velox/type/Timestamp.h" - using namespace facebook::velox; namespace { @@ -133,6 +133,8 @@ TypePtr getScalarType(const ::substrait::Expression::Literal& literal) { return DATE(); case ::substrait::Expression_Literal::LiteralTypeCase::kTimestampTz: return TIMESTAMP(); + case ::substrait::Expression_Literal::LiteralTypeCase::kTimestamp: + return facebook::velox::functions::sparksql::TIMESTAMP_NTZ(); case ::substrait::Expression_Literal::LiteralTypeCase::kString: return VARCHAR(); case ::substrait::Expression_Literal::LiteralTypeCase::kVarChar: diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index adb7fc5f45b6..ad9f72491967 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -24,10 +24,12 @@ #include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/exec/TableWriter.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" #include "velox/type/Type.h" #include "utils/ConfigExtractor.h" #include "utils/ObjectStore.h" +#include "utils/VeloxArrowUtils.h" #include "utils/VeloxWriterUtils.h" #include "config.pb.h" @@ -1497,6 +1499,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // The columns present in the table, if not available default to the baseSchema. auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + // Spark's TimestampNTZ type is stored as TIMESTAMP in file. + if (tableSchema) { + tableSchema = asRowType(replaceTimestampNTZ(tableSchema, TIMESTAMP())); + } + connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; auto connectorId = kHiveConnectorId; diff --git a/cpp/velox/utils/VeloxArrowUtils.cc b/cpp/velox/utils/VeloxArrowUtils.cc index 7c9583562571..838132d192b8 100644 --- a/cpp/velox/utils/VeloxArrowUtils.cc +++ b/cpp/velox/utils/VeloxArrowUtils.cc @@ -21,6 +21,7 @@ #include "memory/VeloxColumnarBatch.h" #include "utils/Common.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/arrow/Bridge.h" @@ -28,13 +29,48 @@ namespace gluten { using namespace facebook; +velox::TypePtr replaceTimestampNTZ(const velox::TypePtr& type, const velox::TypePtr& replacementType) { + if (velox::functions::sparksql::isTimestampNTZType(type)) { + return replacementType; + } + + switch (type->kind()) { + case velox::TypeKind::ROW: { + auto rowType = velox::asRowType(type); + std::vector names = rowType->names(); + std::vector types; + for (size_t i = 0; i < rowType->size(); ++i) { + types.push_back(replaceTimestampNTZ(rowType->childAt(i), replacementType)); + } + return ROW(std::move(names), std::move(types)); + } + case velox::TypeKind::ARRAY: { + auto arrayType = std::dynamic_pointer_cast(type); + auto rewrittenElement = replaceTimestampNTZ(arrayType->elementType(), replacementType); + return ARRAY(std::move(rewrittenElement)); + } + case velox::TypeKind::MAP: { + auto mapType = std::dynamic_pointer_cast(type); + auto rewrittenKey = replaceTimestampNTZ(mapType->keyType(), replacementType); + auto rewrittenValue = replaceTimestampNTZ(mapType->valueType(), replacementType); + return MAP(std::move(rewrittenKey), std::move(rewrittenValue)); + } + default: + return type; + } +} + void toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool, struct ArrowSchema* out) { exportToArrow(velox::BaseVector::create(rowType, 0, pool), *out, ArrowUtils::getBridgeOptions()); } std::shared_ptr toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool) { ArrowSchema arrowSchema; - toArrowSchema(rowType, pool, &arrowSchema); + // Arrow does not provide a standard representation for TimestampNTZ type. + // Use BIGINT type to ensure the correct byte size. + velox::TypePtr tableSchema = replaceTimestampNTZ(rowType, velox::BIGINT()); + + toArrowSchema(tableSchema, pool, &arrowSchema); GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema)); return outputSchema; } diff --git a/cpp/velox/utils/VeloxArrowUtils.h b/cpp/velox/utils/VeloxArrowUtils.h index bacfbbd2b8ca..d2f3a21622e5 100644 --- a/cpp/velox/utils/VeloxArrowUtils.h +++ b/cpp/velox/utils/VeloxArrowUtils.h @@ -51,6 +51,11 @@ std::shared_ptr toArrowSchema( const facebook::velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool); +// Rewrites TimestampNTZ to replacementType recursively in Row/Array/Map types. +facebook::velox::TypePtr replaceTimestampNTZ( + const facebook::velox::TypePtr& type, + const facebook::velox::TypePtr& replacementType); + facebook::velox::TypePtr fromArrowSchema(const std::shared_ptr& schema); arrow::Result> toArrowBuffer(facebook::velox::BufferPtr buffer, arrow::MemoryPool* pool); diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 1e3f71c3582e..b9a3e632feb3 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,8 +17,8 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_03_24 +VELOX_REPO=https://github.com/rui-mo/velox-dev.git +VELOX_BRANCH=ts_ntz_gluten VELOX_ENHANCED_BRANCH=ibm-2026_03_24 VELOX_HOME="" RUN_SETUP_SCRIPT=ON diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala index 0e2ce5e30931..2537d1c1bd59 100644 --- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala +++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala @@ -51,7 +51,9 @@ object SparkArrowUtil { new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC") } case dt if dt.catalogString == "timestamp_ntz" => - new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) + // TODO: There is no standard representation for this type in Arrow. + // Use bigint to ensure the correct byte size. + new ArrowType.Int(8 * 8, true) case YearMonthIntervalType.DEFAULT => new ArrowType.Interval(IntervalUnit.YEAR_MONTH) case _: ArrayType => ArrowType.List.INSTANCE @@ -74,17 +76,6 @@ object SparkArrowUtil { case ArrowType.Binary.INSTANCE => BinaryType case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType - case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null => - // TimestampNTZType is only available in Spark 3.4+ - try { - Class - .forName("org.apache.spark.sql.types.TimestampNTZType$") - .getField("MODULE$") - .get(null) - .asInstanceOf[DataType] - } catch { - case _: ClassNotFoundException => TimestampType - } case _: ArrowType.Timestamp => TimestampType case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType.DEFAULT @@ -168,8 +159,6 @@ object SparkArrowUtil { }.asJava) } - // TimestampNTZ is not supported for native computation, but the Arrow type mapping is needed - // for row-to-columnar transitions when the fallback validator tags NTZ operators. def checkSchema(schema: StructType): Boolean = { try { SparkSchemaUtil.toArrowSchema(schema) diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala index 8b4d7b374d5b..4d845e89298d 100644 --- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala +++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala @@ -340,13 +340,13 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { // TIMESTAMP_NTZ was introduced in Spark 3.4 / Delta 2.4 testWithMinSparkVersion( - "delta: create table with TIMESTAMP_NTZ should fallback and return correct results", + "delta: create table with TIMESTAMP_NTZ and return correct results", "3.4") { withTable("delta_ntz") { spark.sql("CREATE TABLE delta_ntz(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA") spark.sql("""INSERT INTO delta_ntz VALUES |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin) - val df = runQueryAndCompare("select * from delta_ntz", noFallBack = false) { _ => } + val df = runQueryAndCompare("select * from delta_ntz") { _ => } checkAnswer( df, Row( diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java new file mode 100644 index 000000000000..1475378a4a3c --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.expression; + +import org.apache.gluten.substrait.type.TimestampTypeNode; +import org.apache.gluten.substrait.type.TypeNode; + +import io.substrait.proto.Expression.Literal.Builder; + +public class TimestampNTZLiteralNode extends LiteralNodeWithValue { + public TimestampNTZLiteralNode(Long value) { + super(value, new TimestampTypeNode(true)); + } + + public TimestampNTZLiteralNode(Long value, TypeNode typeNode) { + super(value, typeNode); + } + + @Override + protected void updateLiteralBuilder(Builder literalBuilder, Long value) { + literalBuilder.setTimestamp(value); + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java new file mode 100644 index 000000000000..83d27cfb09f2 --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.type; + +import io.substrait.proto.Type; + +public class TimestampNTZTypeNode extends TypeNode { + + public TimestampNTZTypeNode(Boolean nullable) { + super(nullable); + } + + @Override + public Type toProtobuf() { + Type.Timestamp.Builder timestampBuilder = Type.Timestamp.newBuilder(); + if (nullable) { + timestampBuilder.setNullability(Type.Nullability.NULLABILITY_NULLABLE); + } else { + timestampBuilder.setNullability(Type.Nullability.NULLABILITY_REQUIRED); + } + + Type.Builder builder = Type.newBuilder(); + builder.setTimestamp(timestampBuilder.build()); + return builder.build(); + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java index 28cb10be27d6..bd7e3566eb5f 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java @@ -81,6 +81,10 @@ public static TypeNode makeTimestamp(Boolean nullable) { return new TimestampTypeNode(nullable); } + public static TypeNode makeTimestampNTZ(Boolean nullable) { + return new TimestampNTZTypeNode(nullable); + } + public static TypeNode makeStruct(Boolean nullable, List types, List names) { return new StructNode(nullable, types, names); } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala index 6db1f188d8bc..4ca05105580d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala @@ -160,6 +160,19 @@ object ConverterUtils extends Logging { (StringType, isNullable(substraitType.getString.getNullability)) case Type.KindCase.BINARY => (BinaryType, isNullable(substraitType.getBinary.getNullability)) + case Type.KindCase.TIMESTAMP => + try { + ( + Class + .forName("org.apache.spark.sql.types.TimestampNTZType$") + .getField("MODULE$") + .get(null) + .asInstanceOf[DataType], + isNullable(substraitType.getTimestamp.getNullability)) + } catch { + case _: ClassNotFoundException => + throw new GlutenNotSupportException(s"Type $substraitType not supported.") + } case Type.KindCase.TIMESTAMP_TZ => (TimestampType, isNullable(substraitType.getTimestampTz.getNullability)) case Type.KindCase.DATE => @@ -226,6 +239,8 @@ object ConverterUtils extends Logging { TypeBuilder.makeDecimal(nullable, precision, scale) case TimestampType => TypeBuilder.makeTimestamp(nullable) + case other if other.typeName == "timestamp_ntz" => + TypeBuilder.makeTimestampNTZ(nullable) case m: MapType => TypeBuilder.makeMap( nullable, @@ -399,6 +414,7 @@ object ConverterUtils extends Logging { case DoubleType => "fp64" case DateType => "date" case TimestampType => "ts" + case other if other.typeName == "timestamp_ntz" => "ts_ntz" case StringType => "str" case BinaryType => "vbin" case DecimalType() => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 714821436584..6d3933f481d5 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -227,9 +227,15 @@ object Validators { case mt: MapType => containsNTZ(mt.keyType) || containsNTZ(mt.valueType) case _ => false } + val isScan = plan match { + case _: BatchScanExec => true + case _: FileSourceScanExec => true + case p if HiveTableScanExecTransformer.isHiveTableScan(p) => true + case _ => false + } val hasNTZ = plan.output.exists(a => containsNTZ(a.dataType)) || plan.children.exists(_.output.exists(a => containsNTZ(a.dataType))) - if (hasNTZ) { + if (!isScan && hasNTZ) { fail(s"${plan.nodeName} has TimestampNTZType in input/output schema") } else { pass() diff --git a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala index a87e2f5431ff..1405a5eea6ad 100644 --- a/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala +++ b/gluten-ut/common/src/test/scala/org/apache/spark/sql/GlutenSQLTestsBaseTrait.scala @@ -94,6 +94,8 @@ object GlutenSQLTestsBaseTrait { .set("spark.sql.warehouse.dir", warehouse) .set("spark.ui.enabled", "false") .set(GlutenConfig.GLUTEN_UI_ENABLED.key, "false") + .set("spark.driver.host", "127.0.0.1") + .set("spark.driver.bindAddress", "127.0.0.1") // Avoid static evaluation by spark catalyst. But there are some UT issues // coming from spark, e.g., expecting SparkException is thrown, but the wrapped // exception is thrown. diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 9c51dee8a6f3..56c5e100ee9d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -78,6 +78,8 @@ class VeloxTestSettings extends BackendTestSettings { // NEW SUITE: disable as it expects exception which doesn't happen when offloaded to gluten .exclude( "INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("UNSUPPORTED_FEATURE - SPARK-36346: can't read Timestamp as TimestampNTZ") enableSuite[GlutenQueryParsingErrorsSuite] enableSuite[GlutenAnsiCastSuiteWithAnsiModeOff] .exclude( @@ -463,6 +465,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV2QuerySuite] // Unsupport spark.sql.files.ignoreCorruptFiles. .exclude("Enabling/disabling ignoreCorruptFiles") @@ -471,6 +475,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] @@ -618,6 +624,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("InMemoryRelation statistics") // Extra ColumnarToRow is needed to transform vanilla columnar data to gluten columnar data. .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") + // Rewrite for different cache size. + .exclude("SPARK-36120: Support cache/uncache table with TimestampNTZ type") enableSuite[GlutenFileSourceCharVarcharTestSuite] // Following test is excluded as it is overridden in Gluten test suite.. // The overridden tests assert against Velox-specific error messages for char/varchar diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala index f8b61bed484e..0f8ffbb6872f 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala @@ -22,6 +22,8 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.columnar.InMemoryRelation +import java.time.LocalDateTime + class GlutenCachedTableSuite extends CachedTableSuite with GlutenSQLTestsTrait @@ -40,4 +42,17 @@ class GlutenCachedTableSuite assert(cached.stats.sizeInBytes === 1132) } } + + testGluten("SPARK-36120: Support cache/uncache table with TimestampNTZ type") { + val tableName = "ntzCache" + withTable(tableName) { + sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") + checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) + spark.table(tableName).queryExecution.withCachedData.collect { + case cached: InMemoryRelation => + assert(cached.stats.sizeInBytes === 52) + } + sql(s"UNCACHE TABLE $tableName") + } + } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index c5455b6c6b40..0fbf7ef69f16 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -85,6 +85,8 @@ class VeloxTestSettings extends BackendTestSettings { "INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates") // Doesn't support unhex with failOnError=true. .exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("UNSUPPORTED_FEATURE - SPARK-36346: can't read Timestamp as TimestampNTZ") enableSuite[GlutenQueryParsingErrorsSuite] enableSuite[GlutenArithmeticExpressionSuite] .exclude("SPARK-45786: Decimal multiply, divide, remainder, quot") @@ -470,6 +472,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV2QuerySuite] .exclude("row group skipping doesn't overflow when reading into larger type") // Unsupport spark.sql.files.ignoreCorruptFiles. @@ -479,6 +483,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] @@ -504,6 +510,8 @@ class VeloxTestSettings extends BackendTestSettings { // file:/opt/spark331/sql/core/src/test/resources/test-data/timestamp-nanos.parquet // May require for newer spark.test.home .excludeByPrefix("SPARK-40819") + // Different exceptions between Spark and Velox. + .exclude("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") enableSuite[GlutenParquetThriftCompatibilitySuite] // Rewrite for file locating. .exclude("Read Parquet file generated by parquet-thrift") @@ -651,6 +659,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("InMemoryRelation statistics") // Extra ColumnarToRow is needed to transform vanilla columnar data to gluten columnar data. .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") + // Rewrite for different cache size. + .exclude("SPARK-36120: Support cache/uncache table with TimestampNTZ type") enableSuite[GlutenFileSourceCharVarcharTestSuite] .exclude("length check for input string values: nested in array") .exclude("length check for input string values: nested in array") diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala index f8b61bed484e..0f8ffbb6872f 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala @@ -22,6 +22,8 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.columnar.InMemoryRelation +import java.time.LocalDateTime + class GlutenCachedTableSuite extends CachedTableSuite with GlutenSQLTestsTrait @@ -40,4 +42,17 @@ class GlutenCachedTableSuite assert(cached.stats.sizeInBytes === 1132) } } + + testGluten("SPARK-36120: Support cache/uncache table with TimestampNTZ type") { + val tableName = "ntzCache" + withTable(tableName) { + sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") + checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) + spark.table(tableName).queryExecution.withCachedData.collect { + case cached: InMemoryRelation => + assert(cached.stats.sizeInBytes === 52) + } + sql(s"UNCACHE TABLE $tableName") + } + } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 6d30450e626c..4eecb9cf36df 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -85,6 +85,8 @@ class VeloxTestSettings extends BackendTestSettings { "INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates") // Doesn't support unhex with failOnError=true. .exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("UNSUPPORTED_FEATURE - SPARK-36346: can't read Timestamp as TimestampNTZ") enableSuite[GlutenQueryParsingErrorsSuite] enableSuite[GlutenArithmeticExpressionSuite] .exclude("SPARK-45786: Decimal multiply, divide, remainder, quot") @@ -413,6 +415,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV2QuerySuite] .exclude("row group skipping doesn't overflow when reading into larger type") // Unsupport spark.sql.files.ignoreCorruptFiles. @@ -422,6 +426,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] @@ -447,6 +453,8 @@ class VeloxTestSettings extends BackendTestSettings { // file:/opt/spark331/sql/core/src/test/resources/test-data/timestamp-nanos.parquet // May require for newer spark.test.home .excludeByPrefix("SPARK-40819") + // Different exceptions between Spark and Velox. + .exclude("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") enableSuite[GlutenParquetThriftCompatibilitySuite] // Rewrite for file locating. .exclude("Read Parquet file generated by parquet-thrift") @@ -604,6 +612,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("InMemoryRelation statistics") // Extra ColumnarToRow is needed to transform vanilla columnar data to gluten columnar data. .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") + // Rewrite for different cache size. + .exclude("SPARK-36120: Support cache/uncache table with TimestampNTZ type") enableSuite[GlutenFileSourceCharVarcharTestSuite] .exclude("length check for input string values: nested in array") .exclude("length check for input string values: nested in array") diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala index 0afabae6e5fd..3244f8cc2e16 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.internal.SQLConf +import java.time.LocalDateTime + class GlutenCachedTableSuite extends CachedTableSuite with GlutenSQLTestsTrait @@ -155,4 +157,17 @@ class GlutenCachedTableSuite uncacheTable("t2") } } + + testGluten("SPARK-36120: Support cache/uncache table with TimestampNTZ type") { + val tableName = "ntzCache" + withTable(tableName) { + sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") + checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) + spark.table(tableName).queryExecution.withCachedData.collect { + case cached: InMemoryRelation => + assert(cached.stats.sizeInBytes === 52) + } + sql(s"UNCACHE TABLE $tableName") + } + } } diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index d0716932b756..75bfab56c574 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -271,6 +271,8 @@ class VeloxTestSettings extends BackendTestSettings { "INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates") // Doesn't support unhex with failOnError=true. .exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("UNSUPPORTED_FEATURE - SPARK-36346: can't read Timestamp as TimestampNTZ") enableSuite[GlutenQueryParsingErrorsSuite] enableSuite[GlutenQueryContextSuite] enableSuite[GlutenQueryExecutionAnsiErrorsSuite] @@ -597,6 +599,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV2QuerySuite] .exclude("row group skipping doesn't overflow when reading into larger type") // Unsupport spark.sql.files.ignoreCorruptFiles. @@ -606,6 +610,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] @@ -631,6 +637,8 @@ class VeloxTestSettings extends BackendTestSettings { .excludeByPrefix("SPARK-40819") .excludeByPrefix("SPARK-46056") // TODO: fix in Spark-4.0 .exclude("CANNOT_MERGE_SCHEMAS: Failed merging schemas") + // Different exceptions between Spark and Velox. + .exclude("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") enableSuite[GlutenParquetThriftCompatibilitySuite] // Rewrite for file locating. .exclude("Read Parquet file generated by parquet-thrift") @@ -873,6 +881,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") // Rewritten because native raise_error throws Spark exception .exclude("SPARK-52684: Atomicity of cache table on error") + // Rewrite for different cache size. + .exclude("SPARK-36120: Support cache/uncache table with TimestampNTZ type") enableSuite[GlutenFileSourceCharVarcharTestSuite] enableSuite[GlutenDSV2CharVarcharTestSuite] enableSuite[GlutenColumnExpressionSuite] diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala index f43881265940..0124a5c1de8c 100644 --- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala +++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.internal.SQLConf +import java.time.LocalDateTime + class GlutenCachedTableSuite extends CachedTableSuite with GlutenSQLTestsTrait @@ -164,4 +166,17 @@ class GlutenCachedTableSuite uncacheTable("t2") } } + + testGluten("SPARK-36120: Support cache/uncache table with TimestampNTZ type") { + val tableName = "ntzCache" + withTable(tableName) { + sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") + checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) + spark.table(tableName).queryExecution.withCachedData.collect { + case cached: InMemoryRelation => + assert(cached.stats.sizeInBytes === 52) + } + sql(s"UNCACHE TABLE $tableName") + } + } } diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 47a1ff3d66e7..e31eb4ef0a69 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -282,6 +282,8 @@ class VeloxTestSettings extends BackendTestSettings { "INCONSISTENT_BEHAVIOR_CROSS_VERSION: compatibility with Spark 2.4/3.2 in reading/writing dates") // Doesn't support unhex with failOnError=true. .exclude("CONVERSION_INVALID_INPUT: to_binary conversion function hex") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("UNSUPPORTED_FEATURE - SPARK-36346: can't read Timestamp as TimestampNTZ") enableSuite[GlutenQueryParsingErrorsSuite] enableSuite[GlutenQueryContextSuite] enableSuite[GlutenQueryExecutionAnsiErrorsSuite] @@ -564,6 +566,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV2QuerySuite] .exclude("row group skipping doesn't overflow when reading into larger type") // Unsupport spark.sql.files.ignoreCorruptFiles. @@ -573,6 +577,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite because the filter after datasource is not needed. .exclude( "SPARK-26677: negated null-safe equality comparison should not filter matched row groups") + // Velox currently does not distinguish `isAdjustedToUTC` in Parquet. + .exclude("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") enableSuite[GlutenParquetV1SchemaPruningSuite] enableSuite[GlutenParquetV2SchemaPruningSuite] enableSuite[GlutenParquetRebaseDatetimeV1Suite] @@ -598,6 +604,8 @@ class VeloxTestSettings extends BackendTestSettings { .excludeByPrefix("SPARK-40819") .excludeByPrefix("SPARK-46056") // TODO: fix in Spark-4.0 .exclude("CANNOT_MERGE_SCHEMAS: Failed merging schemas") + // Different exceptions between Spark and Velox. + .exclude("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") enableSuite[GlutenParquetThriftCompatibilitySuite] // Rewrite for file locating. .exclude("Read Parquet file generated by parquet-thrift") @@ -847,6 +855,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on InMemoryTableScan") // Rewritten because native raise_error throws Spark exception .exclude("SPARK-52684: Atomicity of cache table on error") + // Rewrite for different cache size. + .exclude("SPARK-36120: Support cache/uncache table with TimestampNTZ type") enableSuite[GlutenCacheTableInKryoSuite] enableSuite[GlutenFileSourceCharVarcharTestSuite] enableSuite[GlutenDSV2CharVarcharTestSuite] diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala index f43881265940..0124a5c1de8c 100644 --- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala +++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.internal.SQLConf +import java.time.LocalDateTime + class GlutenCachedTableSuite extends CachedTableSuite with GlutenSQLTestsTrait @@ -164,4 +166,17 @@ class GlutenCachedTableSuite uncacheTable("t2") } } + + testGluten("SPARK-36120: Support cache/uncache table with TimestampNTZ type") { + val tableName = "ntzCache" + withTable(tableName) { + sql(s"CACHE TABLE $tableName AS SELECT TIMESTAMP_NTZ'2021-01-01 00:00:00'") + checkAnswer(spark.table(tableName), Row(LocalDateTime.parse("2021-01-01T00:00:00"))) + spark.table(tableName).queryExecution.withCachedData.collect { + case cached: InMemoryRelation => + assert(cached.stats.sizeInBytes === 52) + } + sql(s"UNCACHE TABLE $tableName") + } + } }