Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Support from_unixtime function #3176

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ static const std::unordered_set<std::string> kBlackList = {
"concat_ws",
"rand",
"json_array_length",
"from_unixtime",
"repeat",
"translate",
"add_months",
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

set -exu

VELOX_REPO=https://github.com/oap-project/velox.git
VELOX_BRANCH=main
VELOX_REPO=https://github.com/PHILO-HE/velox.git
VELOX_BRANCH=from_unixtime-oap
VELOX_HOME=""

#Set on run gluten on HDFS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ case class FromUnixTimeTransformer(
val secNode = sec.doTransform(args)
val formatNode = format.doTransform(args)

val dataTypes = if (timeZoneId != None) {
Seq(original.sec.dataType, original.format.dataType, StringType)
} else {
Seq(original.sec.dataType, original.format.dataType)
}
val dataTypes = Seq(original.sec.dataType, original.format.dataType)
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val functionId = ExpressionBuilder.newScalarFunction(
functionMap,
Expand All @@ -111,10 +107,6 @@ case class FromUnixTimeTransformer(
val expressionNodes = new java.util.ArrayList[ExpressionNode]()
expressionNodes.add(secNode)
expressionNodes.add(formatNode)
if (timeZoneId != None) {
expressionNodes.add(ExpressionBuilder.makeStringLiteral(timeZoneId.get))
}

val typeNode = ConverterUtils.getTypeNode(original.dataType, original.nullable)
ExpressionBuilder.makeScalarFunction(functionId, expressionNodes, typeNode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-34739,SPARK-35889: add a year-month interval to a timestamp")
.exclude("SPARK-34761,SPARK-35889: add a day-time interval to a timestamp")
.exclude("Gluten - TIMESTAMP_MICROS")
.exclude("Gluten - from_unixtime")
.exclude("Gluten - unix_timestamp")
.exclude("Gluten - to_unix_timestamp")
enableSuite[GlutenDecimalExpressionSuite].exclude("MakeDecimal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite because Spark collect causes long overflow.
.exclude("TIMESTAMP_MICROS")
// Replaced by a gluten test to pass timezone through config.
.exclude("from_unixtime")
// Replaced by a gluten test to pass timezone through config.
.exclude("unix_timestamp")
// Replaced by a gluten test to pass timezone through config.
.exclude("to_unix_timestamp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.{GlutenTestConstants, GlutenTestsTrait}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
import org.apache.spark.sql.GlutenTestsTrait
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, TimeZoneUTC}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType}
import org.apache.spark.sql.types.{DateType, IntegerType, LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

import java.sql.{Date, Timestamp}
Expand Down Expand Up @@ -60,7 +61,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
// checkResult(Int.MinValue.toLong - 100)
}

test(GlutenTestConstants.GLUTEN_TEST + "TIMESTAMP_MICROS") {
test(GLUTEN_TEST + "TIMESTAMP_MICROS") {
def testIntegralFunc(value: Number): Unit = {
checkEvaluation(MicrosToTimestamp(Literal(value)), value.longValue())
}
Expand Down Expand Up @@ -88,7 +89,78 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
"Europe/Brussels")
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)

test(GlutenTestConstants.GLUTEN_TEST + "unix_timestamp") {
// Time zone ids restricted to region/city form (e.g. "Asia/Hong_Kong").
// Offset-style ids (e.g. "-08:00") and the bare "UTC" id are deliberately
// excluded: per the notes in the from_unixtime test, Velox's corresponding
// function does not accept those forms.
val outstandingTimezonesIdsCityNameOnly: Seq[String] = Seq(
  // Velox doesn't support timezones like UTC.
  // "UTC",
  "Africa/Dakar",
  LA.getId,
  "Asia/Urumqi",
  "Asia/Hong_Kong",
  "Europe/Brussels")
// The same ids resolved to java.time.ZoneId values for use in test loops.
val outstandingZoneIdsCityNameOnly: Seq[ZoneId] =
  outstandingTimezonesIdsCityNameOnly.map(getZoneId)

// Mirrors Spark's DateExpressionsSuite "from_unixtime" test, adapted for this
// backend: the session time zone is passed through SQL config
// (SESSION_LOCAL_TIMEZONE) instead of relying on the JVM default, and only
// region/city-style zone ids are exercised because the Velox function rejects
// offset-style ids such as "-08:00".
test(GLUTEN_TEST + "from_unixtime") {
  withDefaultTimeZone(UTC) {
    // Unlike original Spark test, we moved the below statement here to make timezone
    // set in config.
    // Velox's corresponding function doesn't support timezone form like "-08:00", so
    // we only test outstandingZoneIdsCityNameOnly.
    for (zid <- outstandingZoneIdsCityNameOnly) {
      // Run under both legacy and corrected time-parser policies.
      Seq("legacy", "corrected").foreach {
        legacyParserPolicy =>
          withSQLConf(
            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy,
            SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
            // Reference formatters pinned to the zone under test; expected
            // strings are computed with SimpleDateFormat.
            val fmt1 = "yyyy-MM-dd HH:mm:ss"
            val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
            val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
            val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
            val timeZoneId = Option(zid.getId)
            val tz = TimeZone.getTimeZone(zid)
            sdf1.setTimeZone(tz)
            sdf2.setTimeZone(tz)

            // Basic formatting. Note FromUnixTime takes seconds while
            // java.sql.Timestamp takes milliseconds — hence the factor of
            // 1000 between each literal and its expected value.
            checkEvaluation(
              FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
              sdf1.format(new Timestamp(0)))
            checkEvaluation(
              FromUnixTime(Literal(1000L), Literal(fmt1), timeZoneId),
              sdf1.format(new Timestamp(1000000)))
            checkEvaluation(
              FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
              sdf2.format(new Timestamp(-1000000)))
            // Null propagation: a null in either the seconds or the format
            // argument yields a null result.
            checkEvaluation(
              FromUnixTime(
                Literal.create(null, LongType),
                Literal.create(null, StringType),
                timeZoneId),
              null)
            checkEvaluation(
              FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId),
              null)
            checkEvaluation(
              FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
              null)

            // SPARK-28072 The codegen path for non-literal input should also work
            checkEvaluation(
              expression = FromUnixTime(
                BoundReference(ordinal = 0, dataType = LongType, nullable = true),
                BoundReference(ordinal = 1, dataType = StringType, nullable = true),
                timeZoneId),
              expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
              inputRow = InternalRow(0L, UTF8String.fromString(fmt1))
            )
          }
      }
      // Test escaping of format: only checks that codegen succeeds for a
      // quote-only pattern; no result is asserted.
      GenerateUnsafeProjection.generate(FromUnixTime(Literal(0L), Literal("\""), UTC_OPT) :: Nil)
    }
  }
}

test(GLUTEN_TEST + "unix_timestamp") {
withDefaultTimeZone(UTC) {
for (zid <- outstandingZoneIds) {
Seq("legacy", "corrected").foreach {
Expand Down Expand Up @@ -189,7 +261,7 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
UnixTimestamp(Literal("2015-07-24"), Literal("\""), UTC_OPT) :: Nil)
}

test(GlutenTestConstants.GLUTEN_TEST + "to_unix_timestamp") {
test(GLUTEN_TEST + "to_unix_timestamp") {
withDefaultTimeZone(UTC) {
for (zid <- outstandingZoneIds) {
Seq("legacy", "corrected").foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SPARK-38195: add a quantity of interval units to a timestamp")
.exclude("SPARK-38284: difference between two timestamps in units")
.exclude("Gluten - TIMESTAMP_MICROS")
.exclude("Gluten - from_unixtime")
.exclude("Gluten - unix_timestamp")
.exclude("Gluten - to_unix_timestamp")
enableSuite[GlutenDecimalExpressionSuite].exclude("MakeDecimal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Has exception in fallback execution when we use resultDF.collect in evaluation.
.exclude("TIMESTAMP_MICROS")
// Replaced by a gluten test to pass timezone through config.
.exclude("from_unixtime")
// Replaced by a gluten test to pass timezone through config.
.exclude("unix_timestamp")
// Replaced by a gluten test to pass timezone through config.
.exclude("to_unix_timestamp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,77 @@ class GlutenDateExpressionsSuite extends DateExpressionsSuite with GlutenTestsTr
"Europe/Brussels")
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(getZoneId)

// Time zone ids restricted to region/city form (e.g. "Asia/Hong_Kong").
// Offset-style ids (e.g. "-08:00") and the bare "UTC" id are deliberately
// excluded: per the notes in the from_unixtime test, Velox's corresponding
// function does not accept those forms.
val outstandingTimezonesIdsCityNameOnly: Seq[String] = Seq(
  // Velox doesn't support timezones like UTC.
  // "UTC",
  "Africa/Dakar",
  LA.getId,
  "Asia/Urumqi",
  "Asia/Hong_Kong",
  "Europe/Brussels")
// The same ids resolved to java.time.ZoneId values for use in test loops.
val outstandingZoneIdsCityNameOnly: Seq[ZoneId] =
  outstandingTimezonesIdsCityNameOnly.map(getZoneId)

// Mirrors Spark's DateExpressionsSuite "from_unixtime" test, adapted for this
// backend: the session time zone is passed through SQL config
// (SESSION_LOCAL_TIMEZONE) instead of relying on the JVM default, and only
// region/city-style zone ids are exercised because the Velox function rejects
// offset-style ids such as "-08:00".
test(GLUTEN_TEST + "from_unixtime") {
  withDefaultTimeZone(UTC) {
    // Unlike original Spark test, we moved the below statement here to make timezone
    // set in config.
    // Velox's corresponding function doesn't support timezone form like "-08:00", so
    // we only test outstandingZoneIdsCityNameOnly.
    for (zid <- outstandingZoneIdsCityNameOnly) {
      // Run under both legacy and corrected time-parser policies.
      Seq("legacy", "corrected").foreach {
        legacyParserPolicy =>
          withSQLConf(
            SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy,
            SQLConf.SESSION_LOCAL_TIMEZONE.key -> zid.getId) {
            // Reference formatters pinned to the zone under test; expected
            // strings are computed with SimpleDateFormat.
            val fmt1 = "yyyy-MM-dd HH:mm:ss"
            val sdf1 = new SimpleDateFormat(fmt1, Locale.US)
            val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS"
            val sdf2 = new SimpleDateFormat(fmt2, Locale.US)
            val timeZoneId = Option(zid.getId)
            val tz = TimeZone.getTimeZone(zid)
            sdf1.setTimeZone(tz)
            sdf2.setTimeZone(tz)

            // Basic formatting. Note FromUnixTime takes seconds while
            // java.sql.Timestamp takes milliseconds — hence the factor of
            // 1000 between each literal and its expected value.
            checkEvaluation(
              FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId),
              sdf1.format(new Timestamp(0)))
            checkEvaluation(
              FromUnixTime(Literal(1000L), Literal(fmt1), timeZoneId),
              sdf1.format(new Timestamp(1000000)))
            checkEvaluation(
              FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId),
              sdf2.format(new Timestamp(-1000000)))
            // Null propagation: a null in either the seconds or the format
            // argument yields a null result.
            checkEvaluation(
              FromUnixTime(
                Literal.create(null, LongType),
                Literal.create(null, StringType),
                timeZoneId),
              null)
            checkEvaluation(
              FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId),
              null)
            checkEvaluation(
              FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId),
              null)

            // SPARK-28072 The codegen path for non-literal input should also work
            checkEvaluation(
              expression = FromUnixTime(
                BoundReference(ordinal = 0, dataType = LongType, nullable = true),
                BoundReference(ordinal = 1, dataType = StringType, nullable = true),
                timeZoneId),
              expected = UTF8String.fromString(sdf1.format(new Timestamp(0))),
              inputRow = InternalRow(0L, UTF8String.fromString(fmt1))
            )
          }
      }
      // Test escaping of format: only checks that codegen succeeds for a
      // quote-only pattern; no result is asserted.
      GenerateUnsafeProjection.generate(FromUnixTime(Literal(0L), Literal("\""), UTC_OPT) :: Nil)
    }
  }
}

test(GLUTEN_TEST + "unix_timestamp") {
Seq("legacy", "corrected").foreach {
legacyParserPolicy =>
Expand Down