Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-pick upstream PR for concat_ws #433

Closed
wants to merge 10 commits into from
Closed
12 changes: 7 additions & 5 deletions velox/common/base/Exceptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ template <typename Exception, typename StringType>
static_assert(
!std::is_same_v<StringType, std::string>,
"BUG: we should not pass std::string by value to veloxCheckFail");
LOG(ERROR) << "Line: " << args.file << ":" << args.line
<< ", Function:" << args.function
<< ", Expression: " << args.expression << " " << s
<< ", Source: " << args.errorSource
<< ", ErrorCode: " << args.errorCode;
if constexpr (!std::is_same_v<Exception, VeloxUserError>) {
LOG(ERROR) << "Line: " << args.file << ":" << args.line
<< ", Function:" << args.function
<< ", Expression: " << args.expression << " " << s
<< ", Source: " << args.errorSource
<< ", ErrorCode: " << args.errorCode;
}

throw Exception(
args.file,
Expand Down
24 changes: 21 additions & 3 deletions velox/docs/functions/spark/math.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,29 @@ Mathematical Functions

.. spark:function:: rand() -> double

Returns a random value with independent and identically distributed uniformly distributed values in [0, 1). ::
Returns a random value uniformly distributed in [0, 1). ::

SELECT rand(); -- 0.9629742951434543
SELECT rand(0); -- 0.7604953758285915
SELECT rand(null); -- 0.7604953758285915

.. spark:function:: rand(seed, partitionIndex) -> double

Returns a random value uniformly distributed in [0, 1), using a seed formed
by combining the user-specified ``seed`` with the framework-provided ``partitionIndex``.
The framework is responsible for deterministically partitioning the data and for assigning
a unique ``partitionIndex`` to each thread (also in a deterministic way).
``seed`` must be constant. A NULL ``seed`` is treated the same as a ``seed`` of zero.
``partitionIndex`` cannot be NULL. ::

SELECT rand(0); -- 0.5488135024422883
SELECT rand(NULL); -- 0.5488135024422883

.. spark:function:: random() -> double

An alias for ``rand()``.

.. spark:function:: random(seed, partitionIndex) -> double

An alias for ``rand(seed, partitionIndex)``.

.. spark:function:: remainder(n, m) -> [same as n]

Expand Down
22 changes: 16 additions & 6 deletions velox/expression/CastExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ template <TypeKind ToKind, TypeKind FromKind, bool Truncate, bool AllowDecimal>
void applyCastKernel(
vector_size_t row,
const SimpleVector<typename TypeTraits<FromKind>::NativeType>* input,
FlatVector<typename TypeTraits<ToKind>::NativeType>* result) {
FlatVector<typename TypeTraits<ToKind>::NativeType>* result,
const std::string& sessionTzName) {
if constexpr (ToKind == TypeKind::VARCHAR || ToKind == TypeKind::VARBINARY) {
std::string output;
if (input->type()->isDecimal()) {
Expand All @@ -61,6 +62,11 @@ void applyCastKernel(
auto writer = exec::StringWriter<>(result, row);
writer.copy_from(output);
writer.finalize();
} else if constexpr (
FromKind == TypeKind::TIMESTAMP && ToKind == TypeKind::DATE) {
auto output = util::Converter<ToKind, void, Truncate, AllowDecimal>::cast(
input->valueAt(row), sessionTzName);
result->set(row, output);
} else {
if (input->type()->isDecimal()) {
auto output = util::Converter<ToKind, void, Truncate, AllowDecimal>::cast(
Expand Down Expand Up @@ -221,17 +227,21 @@ void applyCastPrimitives(
const auto& queryConfig = context.execCtx()->queryCtx()->queryConfig();
const bool isCastIntAllowDecimal = queryConfig.isCastIntAllowDecimal();
auto* inputSimpleVector = input.as<SimpleVector<From>>();
std::string sessionTzName = "";
if (queryConfig.adjustTimestampToTimezone()) {
sessionTzName = queryConfig.sessionTimezone();
}

if (!queryConfig.isCastToIntByTruncate()) {
context.applyToSelectedNoThrow(rows, [&](int row) {
try {
// Passing a false truncate flag
if (isCastIntAllowDecimal) {
applyCastKernel<ToKind, FromKind, false, true>(
row, inputSimpleVector, resultFlatVector);
row, inputSimpleVector, resultFlatVector, sessionTzName);
} else {
applyCastKernel<ToKind, FromKind, false, false>(
row, inputSimpleVector, resultFlatVector);
row, inputSimpleVector, resultFlatVector, sessionTzName);
}
} catch (const VeloxRuntimeError& re) {
VELOX_FAIL(
Expand All @@ -253,10 +263,10 @@ void applyCastPrimitives(
// Passing a true truncate flag
if (isCastIntAllowDecimal) {
applyCastKernel<ToKind, FromKind, true, true>(
row, inputSimpleVector, resultFlatVector);
row, inputSimpleVector, resultFlatVector, sessionTzName);
} else {
applyCastKernel<ToKind, FromKind, true, false>(
row, inputSimpleVector, resultFlatVector);
row, inputSimpleVector, resultFlatVector, sessionTzName);
}
} catch (const VeloxRuntimeError& re) {
VELOX_FAIL(
Expand All @@ -279,7 +289,7 @@ void applyCastPrimitives(
if constexpr (ToKind == TypeKind::TIMESTAMP) {
// If user explicitly asked us to adjust the timezone.
if (queryConfig.adjustTimestampToTimezone()) {
auto sessionTzName = queryConfig.sessionTimezone();
// auto sessionTzName = queryConfig.sessionTimezone();
if (!sessionTzName.empty()) {
// locate_zone throws runtime_error if the timezone couldn't be found
// (so we're safe to dereference the pointer).
Expand Down
1 change: 0 additions & 1 deletion velox/expression/ComplexWriterTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,6 @@ class MapWriter {

std::tuple<PrimitiveWriter<K, false>, PrimitiveWriter<V>> operator[](
vector_size_t index) {
static_assert(std_interface, "operator [] not allowed for this map");
VELOX_DCHECK_LT(index, length_, "out of bound access");
return {
PrimitiveWriter<K, false>{keysVector_, innerOffset_ + index},
Expand Down
6 changes: 5 additions & 1 deletion velox/expression/tests/SparkExpressionFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ int main(int argc, char** argv) {
"chr",
"replace",
"might_contain",
"unix_timestamp"};
"unix_timestamp",
// Skip concat_ws as it triggers a test failure due to an incorrect
// expression generation from fuzzer:
// https://github.com/facebookincubator/velox/issues/6590
"concat_ws"};
return FuzzerRunner::run(
FLAGS_only, FLAGS_seed, skipFunctions, FLAGS_special_forms);
}
30 changes: 5 additions & 25 deletions velox/functions/lib/DateTimeFormatter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,27 +214,6 @@ std::string padContent(
}
}

// Raises a user error when `timePoint` lies outside the calendar range that
// date::year can express; returns normally otherwise.
void validateTimePoint(const std::chrono::time_point<
                       std::chrono::system_clock,
                       std::chrono::milliseconds>& timePoint) {
  // std::chrono / date::year limit the representable dates to
  // [-32767-01-01, 32767-12-31].
  const auto earliest = date::sys_days{
      date::year_month_day(date::year::min(), date::month(1), date::day(1))};
  const auto latest = date::sys_days{
      date::year_month_day(date::year::max(), date::month(12), date::day(31))};

  const bool withinRange = timePoint >= earliest && timePoint <= latest;
  if (withinRange) {
    return;
  }

  VELOX_USER_FAIL(
      "Cannot format time out of range of [{}-{}-{}, {}-{}-{}]",
      static_cast<int>(date::year::min()),
      "01",
      "01",
      static_cast<int>(date::year::max()),
      "12",
      "31");
}

size_t countOccurence(const std::string_view& base, const std::string& target) {
int occurrences = 0;
std::string::size_type pos = 0;
Expand Down Expand Up @@ -952,10 +931,11 @@ void parseFromPattern(
std::string DateTimeFormatter::format(
const Timestamp& timestamp,
const date::time_zone* timezone) const {
const std::chrono::
time_point<std::chrono::system_clock, std::chrono::milliseconds>
timePoint(std::chrono::milliseconds(timestamp.toMillis()));
validateTimePoint(timePoint);
Timestamp t = timestamp;
if (timezone != nullptr) {
t.toTimezone(*timezone);
}
const auto timePoint = t.toTimePoint();
const auto daysTimePoint = date::floor<date::days>(timePoint);

const auto durationInTheDay = date::make_time(timePoint - daysTimePoint);
Expand Down
36 changes: 29 additions & 7 deletions velox/functions/prestosql/tests/DateTimeFunctionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ TEST_F(DateTimeFunctionsTest, hour) {

EXPECT_EQ(std::nullopt, hour(std::nullopt));
EXPECT_EQ(13, hour(Timestamp(0, 0)));
EXPECT_EQ(12, hour(Timestamp(-1, Timestamp::kMaxNanos)));
EXPECT_EQ(13, hour(Timestamp(-1, Timestamp::kMaxNanos)));
// Disabled for now because the TZ for Pacific/Apia in 2096 varies between
// systems.
// EXPECT_EQ(21, hour(Timestamp(4000000000, 0)));
Expand Down Expand Up @@ -2529,12 +2529,12 @@ TEST_F(DateTimeFunctionsTest, formatDateTime) {

// Multi-specifier and literal formats
EXPECT_EQ(
"AD 19 1970 4 Thu 1970 1 1 1 AM 2 2 2 2 33 11 5 Asia/Kolkata",
"AD 19 1970 4 Thu 1970 1 1 1 AM 8 8 8 8 3 11 5 Asia/Kolkata",
formatDatetime(
fromTimestampString("1970-01-01 02:33:11.5"),
"G C Y e E y D M d a K h H k m s S zzzz"));
EXPECT_EQ(
"AD 19 1970 4 asdfghjklzxcvbnmqwertyuiop Thu ' 1970 1 1 1 AM 2 2 2 2 33 11 5 1234567890\\\"!@#$%^&*()-+`~{}[];:,./ Asia/Kolkata",
"AD 19 1970 4 asdfghjklzxcvbnmqwertyuiop Thu ' 1970 1 1 1 AM 8 8 8 8 3 11 5 1234567890\\\"!@#$%^&*()-+`~{}[];:,./ Asia/Kolkata",
formatDatetime(
fromTimestampString("1970-01-01 02:33:11.5"),
"G C Y e 'asdfghjklzxcvbnmqwertyuiop' E '' y D M d a K h H k m s S 1234567890\\\"!@#$%^&*()-+`~{}[];:,./ zzzz"));
Expand Down Expand Up @@ -2787,21 +2787,43 @@ TEST_F(DateTimeFunctionsTest, dateFormat) {
EXPECT_EQ("z", dateFormat(fromTimestampString("1970-01-01"), "%z"));
EXPECT_EQ("g", dateFormat(fromTimestampString("1970-01-01"), "%g"));

// With timezone
// With timezone. Indian Standard Time (IST) UTC+5:30.
setQueryTimeZone("Asia/Kolkata");

EXPECT_EQ(
"1970-01-01", dateFormat(fromTimestampString("1970-01-01"), "%Y-%m-%d"));
EXPECT_EQ(
"2000-02-29 12:00:00 AM",
"2000-02-29 05:30:00 AM",
dateFormat(
fromTimestampString("2000-02-29 00:00:00.987"), "%Y-%m-%d %r"));
EXPECT_EQ(
"2000-02-29 00:00:00.987000",
"2000-02-29 05:30:00.987000",
dateFormat(
fromTimestampString("2000-02-29 00:00:00.987"),
"%Y-%m-%d %H:%i:%s.%f"));
EXPECT_EQ(
"-2000-02-29 00:00:00.987000",
"-2000-02-29 05:53:29.987000",
dateFormat(
fromTimestampString("-2000-02-29 00:00:00.987"),
"%Y-%m-%d %H:%i:%s.%f"));

// Same timestamps with a different timezone. Pacific Standard Time (North
// America) PST UTC-8:00.
setQueryTimeZone("America/Los_Angeles");

EXPECT_EQ(
"1969-12-31", dateFormat(fromTimestampString("1970-01-01"), "%Y-%m-%d"));
EXPECT_EQ(
"2000-02-28 04:00:00 PM",
dateFormat(
fromTimestampString("2000-02-29 00:00:00.987"), "%Y-%m-%d %r"));
EXPECT_EQ(
"2000-02-28 16:00:00.987000",
dateFormat(
fromTimestampString("2000-02-29 00:00:00.987"),
"%Y-%m-%d %H:%i:%s.%f"));
EXPECT_EQ(
"-2000-02-28 16:07:03.987000",
dateFormat(
fromTimestampString("-2000-02-29 00:00:00.987"),
"%Y-%m-%d %H:%i:%s.%f"));
Expand Down
126 changes: 126 additions & 0 deletions velox/functions/sparksql/DateTimeFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,90 @@ struct UnixTimestampParseWithFormatFunction
bool invalidFormat_{false};
};

/// Formats a unix time, given in whole seconds, as a string using the supplied
/// time format pattern and the time zone obtained from the query config.
template <typename T>
struct FromUnixtimeFunction {
  VELOX_DEFINE_FUNCTION_TYPES(T);

  // Value returned by getTimeZoneFromConfig() in initialize(); handed to
  // DateTimeFormatter::format() on every call.
  const date::time_zone* sessionTimeZone_ = nullptr;
  // Formatter for the current pattern. NOTE: despite the name, it is built
  // with buildJodaDateTimeFormatter().
  std::shared_ptr<DateTimeFormatter> mysqlDateTime_;
  // True when initialize() received a non-null format, in which case the
  // formatter is built once and reused by call().
  bool isConstantTimeFormat = false;

  /// Captures the session time zone and, when the format argument is available
  /// at initialization time (non-null), pre-builds the formatter.
  FOLLY_ALWAYS_INLINE void initialize(
      const core::QueryConfig& config,
      const arg_type<int64_t>* /*unixtime*/,
      const arg_type<Varchar>* timeFormat) {
    sessionTimeZone_ = getTimeZoneFromConfig(config);
    if (timeFormat != nullptr) {
      isConstantTimeFormat = true;
      mysqlDateTime_ = buildJodaDateTimeFormatter(
          std::string_view(timeFormat->data(), timeFormat->size()));
    }
  }

  /// Writes the formatted representation of `second` into `result`.
  /// @param result Output string; resized to the formatted length.
  /// @param second Unix time in seconds.
  /// @param timeFormat Format pattern; only parsed here when it was not
  /// available in initialize().
  FOLLY_ALWAYS_INLINE void call(
      out_type<Varchar>& result,
      const arg_type<int64_t> second,
      const arg_type<Varchar> timeFormat) {
    if (!isConstantTimeFormat) {
      mysqlDateTime_ = buildJodaDateTimeFormatter(
          std::string_view(timeFormat.data(), timeFormat.size()));
    }
    // Construct the timestamp directly from seconds. Computing
    // `1000 * second` first would be signed integer overflow (undefined
    // behavior) for large-magnitude inputs; for in-range inputs the resulting
    // timestamp is the same (whole seconds, zero nanoseconds).
    const Timestamp timestamp(second, 0);
    auto formattedResult = mysqlDateTime_->format(timestamp, sessionTimeZone_);
    auto resultSize = formattedResult.size();
    result.resize(resultSize);
    if (resultSize != 0) {
      std::memcpy(result.data(), formattedResult.data(), resultSize);
    }
  }
};

/// Parses an input string into a Timestamp using the supplied format pattern,
/// then converts the parsed value to GMT using the time zone found in the
/// input or, failing that, the session time zone.
template <typename T>
struct GetTimestampFunction {
  VELOX_DEFINE_FUNCTION_TYPES(T);

  // Formatter for the current pattern; built once when the format is constant.
  std::shared_ptr<DateTimeFormatter> formatter_;
  // True when initialize() received a non-null format.
  bool isConstantTimeFormat_ = false;
  // Session time zone ID; empty when the config has no session time zone name.
  std::optional<int64_t> sessionTzID_;

  /// Picks the time zone ID used to convert a parsed timestamp to GMT.
  int16_t getTimezoneId(const DateTimeResult& result) {
    // A parsed time zone (anything other than -1) takes precedence.
    if (result.timezoneId != -1) {
      return result.timezoneId;
    }
    // Otherwise use the session time zone, or 0 (GMT) when there is none.
    return sessionTzID_.value_or(0);
  }

  /// Resolves the session time zone ID and, when the format argument is
  /// available at initialization time (non-null), pre-builds the formatter.
  FOLLY_ALWAYS_INLINE void initialize(
      const core::QueryConfig& config,
      const arg_type<Varchar>* /*input*/,
      const arg_type<Varchar>* format) {
    const auto tzName = config.sessionTimezone();
    if (!tzName.empty()) {
      sessionTzID_ = util::getTimeZoneID(tzName);
    }

    if (format == nullptr) {
      return;
    }
    formatter_ = buildJodaDateTimeFormatter(
        std::string_view(format->data(), format->size()));
    isConstantTimeFormat_ = true;
  }

  /// Parses `input` with the active formatter and stores the GMT-adjusted
  /// timestamp in `result`.
  FOLLY_ALWAYS_INLINE void call(
      out_type<Timestamp>& result,
      const arg_type<Varchar>& input,
      const arg_type<Varchar>& format) {
    // A non-constant format has to be re-parsed for every row.
    if (!isConstantTimeFormat_) {
      formatter_ = buildJodaDateTimeFormatter(
          std::string_view(format.data(), format.size()));
    }

    const std::string_view text(input.data(), input.size());
    auto parsed = formatter_->parse(text);
    parsed.timestamp.toGMT(getTimezoneId(parsed));
    result = parsed.timestamp;
  }
};

template <typename T>
struct MakeDateFunction {
VELOX_DEFINE_FUNCTION_TYPES(T);
Expand Down Expand Up @@ -253,4 +337,46 @@ struct DateDiffFunction {
}
};

/// Formats a Timestamp as a string using the supplied format pattern and the
/// time zone obtained from the query config.
template <typename T>
struct DateFormatFunction {
  VELOX_DEFINE_FUNCTION_TYPES(T);

  // Value returned by getTimeZoneFromConfig() in initialize().
  const date::time_zone* sessionTimeZone_ = nullptr;
  // Formatter for the current pattern; built once when the format is constant.
  std::shared_ptr<DateTimeFormatter> formatter_;
  // True once setFormatter() has been given a non-null format.
  bool isConstFormat_ = false;

  /// Builds the formatter from `formatString` and marks the format as
  /// constant. Does nothing when `formatString` is null.
  FOLLY_ALWAYS_INLINE void setFormatter(const arg_type<Varchar>* formatString) {
    if (formatString == nullptr) {
      return;
    }
    formatter_ = buildJodaDateTimeFormatter(
        std::string_view(formatString->data(), formatString->size()));
    isConstFormat_ = true;
  }

  /// Captures the session time zone and pre-builds the formatter when the
  /// format argument is available at initialization time.
  FOLLY_ALWAYS_INLINE void initialize(
      const core::QueryConfig& config,
      const arg_type<Timestamp>* /*timestamp*/,
      const arg_type<Varchar>* formatString) {
    sessionTimeZone_ = getTimeZoneFromConfig(config);
    setFormatter(formatString);
  }

  /// Writes the formatted representation of `timestamp` into `result`.
  FOLLY_ALWAYS_INLINE void call(
      out_type<Varchar>& result,
      const arg_type<Timestamp>& timestamp,
      const arg_type<Varchar>& formatString) {
    // A non-constant format has to be re-parsed for every row.
    if (!isConstFormat_) {
      formatter_ = buildJodaDateTimeFormatter(
          std::string_view(formatString.data(), formatString.size()));
    }

    const auto text = formatter_->format(timestamp, sessionTimeZone_);
    const auto length = text.size();
    result.resize(length);
    if (length == 0) {
      // Nothing to copy for an empty result.
      return;
    }
    std::memcpy(result.data(), text.data(), length);
  }
};

} // namespace facebook::velox::functions::sparksql
Loading
Loading