|
16 | 16 | */ |
17 | 17 | package org.apache.gluten.functions |
18 | 18 |
|
19 | | -import org.apache.gluten.execution.ProjectExecTransformer |
| 19 | +import org.apache.gluten.execution.{BatchScanExecTransformer, ProjectExecTransformer} |
20 | 20 |
|
21 | 21 | import org.apache.spark.sql.execution.ProjectExec |
22 | | -import org.apache.spark.sql.types.Decimal |
| 22 | +import org.apache.spark.sql.internal.SQLConf.TimestampTypes |
| 23 | +import org.apache.spark.sql.types.{DataType, Decimal, StructType} |
23 | 24 |
|
24 | 25 | import java.sql.Timestamp |
25 | 26 |
|
@@ -489,4 +490,78 @@ class DateFunctionsValidateSuite extends FunctionsValidateSuite { |
489 | 490 | } |
490 | 491 | } |
491 | 492 | } |
| 493 | + |
| 494 | + testWithMinSparkVersion("cast string to timestamp_ntz", "3.4") { |
| 495 | + val inputs: Seq[String] = Seq( |
| 496 | + "1970-01-01", |
| 497 | + "1970-01-01 00:00:00-02:00", |
| 498 | + "1970-01-01 00:00:00 +02:00", |
| 499 | + "2000-01-01", |
| 500 | + "1970-01-01 00:00:00", |
| 501 | + "2000-01-01 12:21:56", |
| 502 | + "2015-03-18T12:03:17Z", |
| 503 | + "2015-03-18 12:03:17", |
| 504 | + "2015-03-18T12:03:17", |
| 505 | + "2015-03-18 12:03:17.123", |
| 506 | + "2015-03-18T12:03:17.123", |
| 507 | + "2015-03-18T12:03:17.456", |
| 508 | + "2015-03-18 12:03:17.456" |
| 509 | + ) |
| 510 | + |
| 511 | + inputs.foreach { |
| 512 | + s => |
| 513 | + val query = s"select cast('$s' as timestamp_ntz)" |
| 514 | + runQueryAndCompare(query) { |
| 515 | + checkGlutenPlan[ProjectExecTransformer] |
| 516 | + } |
| 517 | + } |
| 518 | + } |
| 519 | + |
| 520 | + testWithMinSparkVersion("read as timestamp_ntz", "3.4") { |
| 521 | + val inputs: Seq[String] = Seq( |
| 522 | + "1970-01-01", |
| 523 | + "1970-01-01 00:00:00-02:00", |
| 524 | + "1970-01-01 00:00:00 +02:00", |
| 525 | + "2000-01-01", |
| 526 | + "1970-01-01 00:00:00", |
| 527 | + "2000-01-01 12:21:56", |
| 528 | + "2015-03-18T12:03:17Z", |
| 529 | + "2015-03-18 12:03:17", |
| 530 | + "2015-03-18T12:03:17", |
| 531 | + "2015-03-18 12:03:17.123", |
| 532 | + "2015-03-18T12:03:17.123", |
| 533 | + "2015-03-18T12:03:17.456", |
| 534 | + "2015-03-18 12:03:17.456" |
| 535 | + ) |
| 536 | + |
| 537 | + withTempPath { |
| 538 | + dir => |
| 539 | + withSQLConf("spark.sql.timestampType" -> TimestampTypes.TIMESTAMP_NTZ.toString) { |
| 540 | + val path = dir.getAbsolutePath |
| 541 | + val inputDF = spark.createDataset(inputs).toDF("input") |
| 542 | + val df = inputDF.selectExpr("cast(input as timestamp_ntz) as ts") |
| 543 | + // TODO: The Parquet writer creates TIMESTAMP(MICROS,true), but for timestamp_ntz type, |
| 544 | + // the 'isAdjustedToUTC' should be false. Spark will fail to read this file as |
| 545 | + // timestamp_ntz values. |
| 546 | + df.coalesce(1).write.mode("overwrite").parquet(path) |
| 547 | + |
| 548 | + val dataType = Class |
| 549 | + .forName("org.apache.spark.sql.types.TimestampNTZType$") |
| 550 | + .getField("MODULE$") |
| 551 | + .get(null) |
| 552 | + .asInstanceOf[DataType] |
| 553 | + val schema = new StructType().add("ts", dataType) |
| 554 | + val readDf = spark.read.schema(schema).parquet(path) |
| 555 | + readDf.collect() |
| 556 | + assert( |
| 557 | + readDf.queryExecution.executedPlan.exists( |
| 558 | + f => f.isInstanceOf[BatchScanExecTransformer])) |
| 559 | + |
| 560 | + // Ensures the fallback of unsupported function works. |
| 561 | + readDf.createOrReplaceTempView("view") |
| 562 | + val testDf = spark.sql("select hour(ts) from view") |
| 563 | + testDf.collect() |
| 564 | + } |
| 565 | + } |
| 566 | + } |
492 | 567 | } |
0 commit comments