Skip to content

Commit a6a7e4a

Browse files
mskapilks authored and marin-ma committed
Add support to read plain encoded INT96 timestamp from Parquet file (#456)
1 parent be32086 commit a6a7e4a

File tree

8 files changed

+93
-14
lines changed

8 files changed

+93
-14
lines changed

velox/dwio/common/DirectDecoder.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,17 @@ class DirectDecoder : public IntDecoder<isSigned> {
9898
} else if constexpr (std::is_same_v<
9999
typename Visitor::DataType,
100100
int128_t>) {
101-
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
101+
if (super::numBytes == 12 /* INT96 */) {
102+
int128_t encoded = super::template readInt<int128_t>();
103+
int32_t days = encoded & ((1ULL << 32) - 1);
104+
uint64_t nanos = static_cast<uint64_t>(encoded >> 32);
105+
106+
auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);
107+
toSkip =
108+
visitor.process(*reinterpret_cast<int128_t*>(&timestamp), atEnd);
109+
} else {
110+
toSkip = visitor.process(super::template readInt<int128_t>(), atEnd);
111+
}
102112
} else {
103113
toSkip = visitor.process(super::template readInt<int64_t>(), atEnd);
104114
}

velox/dwio/common/IntDecoder.h

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ class IntDecoder {
167167
template <typename T>
168168
T readInt();
169169

170+
template <typename T>
171+
T readInt96();
172+
170173
template <typename T>
171174
T readVInt();
172175

@@ -453,12 +456,44 @@ inline T IntDecoder<isSigned>::readInt() {
453456
return readLittleEndianFromBigEndian<T>();
454457
} else {
455458
if constexpr (std::is_same_v<T, int128_t>) {
456-
VELOX_NYI();
459+
if (numBytes == 12) {
460+
// TODO:: Do we need to handle useVInts case?
461+
return readInt96<T>();
462+
} else {
463+
VELOX_NYI();
464+
}
457465
}
458466
return readLongLE();
459467
}
460468
}
461469

470+
template <bool isSigned>
471+
template <typename T>
472+
inline T IntDecoder<isSigned>::readInt96() {
473+
int64_t offset = 0;
474+
unsigned char ch;
475+
476+
// read unsigned byte 64
477+
uint64_t part1 = 0;
478+
for (uint32_t i = 0; i < 8; ++i) {
479+
ch = readByte();
480+
part1 |= (ch & BASE_256_MASK) << offset;
481+
offset += 8;
482+
}
483+
484+
// read signed byte 32
485+
int32_t part2 = 0;
486+
offset = 0;
487+
for (uint32_t i = 0; i < 4; ++i) {
488+
ch = readByte();
489+
part2 |= (ch & BASE_256_MASK) << offset;
490+
offset += 8;
491+
}
492+
493+
int128_t result = part1;
494+
return (result << 32) | part2;
495+
}
496+
462497
template <bool isSigned>
463498
template <typename T>
464499
inline T IntDecoder<isSigned>::readVInt() {

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
414414
// We start from the end to allow in-place expansion.
415415
auto values = dictionary_.values->asMutable<Timestamp>();
416416
auto parquetValues = dictionary_.values->asMutable<char>();
417-
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
418-
static constexpr int64_t kSecondsPerDay = 86400LL;
419-
static constexpr int64_t kNanosPerSecond =
420-
Timestamp::kNanosecondsInMillisecond *
421-
Timestamp::kMillisecondsInSecond;
417+
422418
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
423419
// Convert the timestamp into seconds and nanos since the Unix epoch,
424420
// 00:00:00.000000 on 1 January 1970.
@@ -432,12 +428,8 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
432428
&days,
433429
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
434430
sizeof(int32_t));
435-
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
436-
if (nanos > Timestamp::kMaxNanos) {
437-
seconds += nanos / kNanosPerSecond;
438-
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
439-
}
440-
values[i] = Timestamp(seconds, nanos);
431+
432+
values[i] = Timestamp::fromDaysAndNanos(days, nanos);
441433
}
442434
break;
443435
}
Binary file not shown.
Binary file not shown.

velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,33 @@ TEST_F(ParquetTableScanTest, timestampFilter) {
625625
"testInt128() is not supported");
626626
}
627627

628+
// Checks that a TIMESTAMP column read from each of the two INT96 example
// files (dictionary- and plain-encoded, judging by the file names) matches
// the two expected values Timestamp(1, 0) and Timestamp(2, 0).
TEST_F(ParquetTableScanTest, timestampINT96) {
  auto expectedTimes =
      makeFlatVector<Timestamp>({Timestamp(1, 0), Timestamp(2, 0)});
  auto expectedRows = makeRowVector({"time"}, {expectedTimes});
  createDuckDbTable("expected", {expectedRows});

  // The same vector is handed to loadData() for both files.
  auto placeholder = makeArrayVector<Timestamp>({{}});

  for (const char* fileName :
       {"timestamp_dict_int96.parquet", "timestamp_plain_int96.parquet"}) {
    loadData(
        getExampleFilePath(fileName),
        ROW({"time"}, {TIMESTAMP()}),
        makeRowVector({"time"}, {placeholder}));
    assertSelect({"time"}, "SELECT time from expected");
  }
}
654+
628655
int main(int argc, char** argv) {
629656
testing::InitGoogleTest(&argc, argv);
630657
folly::init(&argc, &argv, false);

velox/functions/sparksql/tests/DecimalUtilTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class DecimalUtilTest : public testing::Test {
3030
R expectedResult,
3131
bool expectedOverflow) {
3232
R r;
33-
bool overflow;
33+
bool overflow = false;
3434
DecimalUtil::divideWithRoundUp<R, A, B>(r, a, b, aRescale, overflow);
3535
ASSERT_EQ(overflow, expectedOverflow);
3636
ASSERT_EQ(r, expectedResult);

velox/type/Timestamp.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,21 @@ struct Timestamp {
9696
VELOX_USER_DCHECK_LE(nanos, kMaxNanos, "Timestamp nanos out of range");
9797
}
9898

99+
/// Builds a Timestamp from a day number and a nanosecond count.
///
/// @param days Day number; 2440588 is subtracted from it before converting to
/// seconds, i.e. the value is treated as a Julian day and rebased to the Unix
/// epoch.
/// @param nanos Nanoseconds to add on top of the day. Values above
/// Timestamp::kMaxNanos are split into whole seconds, which are carried into
/// the seconds field, and a sub-second remainder.
/// @return Timestamp holding the resulting seconds and remaining nanoseconds.
static Timestamp fromDaysAndNanos(int32_t days, uint64_t nanos) {
  constexpr int64_t kUnixEpochJulianDay = 2440588LL;
  constexpr int64_t kSecondsInDay = 86400LL;
  constexpr int64_t kNanosInSecond =
      Timestamp::kNanosecondsInMillisecond * Timestamp::kMillisecondsInSecond;

  const int64_t daysSinceUnixEpoch = days - kUnixEpochJulianDay;
  int64_t seconds = daysSinceUnixEpoch * kSecondsInDay;

  // Carry any whole seconds contained in 'nanos' over to 'seconds'.
  if (nanos > Timestamp::kMaxNanos) {
    const uint64_t wholeSeconds = nanos / kNanosInSecond;
    seconds += wholeSeconds;
    nanos -= wholeSeconds * kNanosInSecond;
  }

  return Timestamp(seconds, nanos);
}
113+
99114
// Returns the current unix timestamp (ms precision).
100115
static Timestamp now();
101116

0 commit comments

Comments
 (0)