Skip to content

Commit 6bc0051

Browse files
committed
[FLINK-37840] [table] Row writer should honor null uncompact BigDecimal and Timestamp
1 parent 6642879 commit 6bc0051

File tree

5 files changed

+56
-13
lines changed

5 files changed

+56
-13
lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,11 @@ public void setDecimal(int pos, DecimalData value, int precision) {
372372

373373
if (DecimalData.isCompact(precision)) {
374374
// compact format
375-
setLong(pos, value.toUnscaledLong());
375+
if (value == null) {
376+
setNullAt(pos);
377+
} else {
378+
setLong(pos, value.toUnscaledLong());
379+
}
376380
} else {
377381
int fieldOffset = getElementOffset(pos, 8);
378382
int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);
@@ -401,7 +405,11 @@ public void setTimestamp(int pos, TimestampData value, int precision) {
401405
assertIndexIsValid(pos);
402406

403407
if (TimestampData.isCompact(precision)) {
404-
setLong(pos, value.getMillisecond());
408+
if (value == null) {
409+
setNullAt(pos);
410+
} else {
411+
setLong(pos, value.getMillisecond());
412+
}
405413
} else {
406414
int fieldOffset = getElementOffset(pos, 8);
407415
int cursor = (int) (BinarySegmentUtils.getLong(segments, fieldOffset) >>> 32);

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,11 @@ public void setDecimal(int pos, DecimalData value, int precision) {
199199

200200
if (DecimalData.isCompact(precision)) {
201201
// compact format
202-
setLong(pos, value.toUnscaledLong());
202+
if (value == null) {
203+
setNullAt(pos);
204+
} else {
205+
setLong(pos, value.toUnscaledLong());
206+
}
203207
} else {
204208
int fieldOffset = getFieldOffset(pos);
205209
int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);
@@ -229,7 +233,11 @@ public void setTimestamp(int pos, TimestampData value, int precision) {
229233
assertIndexIsValid(pos);
230234

231235
if (TimestampData.isCompact(precision)) {
232-
setLong(pos, value.getMillisecond());
236+
if (value == null) {
237+
setNullAt(pos);
238+
} else {
239+
setLong(pos, value.getMillisecond());
240+
}
233241
} else {
234242
int fieldOffset = getFieldOffset(pos);
235243
int cursor = (int) (segments[0].getLong(fieldOffset) >>> 32);

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,11 @@ public void writeDecimal(int pos, DecimalData value, int precision) {
165165
assert value == null || (value.precision() == precision);
166166

167167
if (DecimalData.isCompact(precision)) {
168-
assert value != null;
169-
writeLong(pos, value.toUnscaledLong());
168+
if (value == null) {
169+
setNullBit(pos);
170+
} else {
171+
writeLong(pos, value.toUnscaledLong());
172+
}
170173
} else {
171174
// grow the global buffer before writing data.
172175
ensureCapacity(16);
@@ -198,7 +201,11 @@ public void writeDecimal(int pos, DecimalData value, int precision) {
198201
@Override
199202
public void writeTimestamp(int pos, TimestampData value, int precision) {
200203
if (TimestampData.isCompact(precision)) {
201-
writeLong(pos, value.getMillisecond());
204+
if (value == null) {
205+
setNullBit(pos);
206+
} else {
207+
writeLong(pos, value.getMillisecond());
208+
}
202209
} else {
203210
// store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond
204211
ensureCapacity(8);

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ void testToArray() {
445445
void testDecimal() {
446446

447447
BinaryArrayData array = new BinaryArrayData();
448-
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
448+
BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
449449

450450
// 1.compact
451451
{
@@ -454,12 +454,16 @@ void testDecimal() {
454454
writer.reset();
455455
writer.writeDecimal(0, DecimalData.fromUnscaledLong(5, precision, scale), precision);
456456
writer.setNullAt(1);
457+
writer.writeDecimal(2, null, precision);
457458
writer.complete();
458459

459460
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.05");
460461
assertThat(array.isNullAt(1)).isTrue();
462+
assertThat(array.isNullAt(2)).isTrue();
461463
array.setDecimal(0, DecimalData.fromUnscaledLong(6, precision, scale), precision);
462464
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("0.06");
465+
array.setDecimal(1, null, precision);
466+
assertThat(array.isNullAt(1)).isTrue();
463467
}
464468

465469
// 2.not compact
@@ -480,6 +484,8 @@ void testDecimal() {
480484
assertThat(array.isNullAt(1)).isTrue();
481485
array.setDecimal(0, decimal2, precision);
482486
assertThat(array.getDecimal(0, precision, scale).toString()).isEqualTo("6.55000");
487+
array.setDecimal(1, null, precision);
488+
assertThat(array.isNullAt(1)).isTrue();
483489
}
484490
}
485491

@@ -533,20 +539,24 @@ void testBinary() {
533539
@Test
534540
void testTimestampData() {
535541
BinaryArrayData array = new BinaryArrayData();
536-
BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8);
542+
BinaryArrayWriter writer = new BinaryArrayWriter(array, 3, 8);
537543

538544
// 1. compact
539545
{
540546
final int precision = 3;
541547
writer.reset();
542548
writer.writeTimestamp(0, TimestampData.fromEpochMillis(123L), precision);
543549
writer.setNullAt(1);
550+
writer.writeTimestamp(2, null, precision);
544551
writer.complete();
545552

546553
assertThat(array.getTimestamp(0, 3).toString()).isEqualTo("1970-01-01T00:00:00.123");
547554
assertThat(array.isNullAt(1)).isTrue();
555+
assertThat(array.isNullAt(2)).isTrue();
548556
array.setTimestamp(0, TimestampData.fromEpochMillis(-123L), precision);
549557
assertThat(array.getTimestamp(0, 3).toString()).isEqualTo("1969-12-31T23:59:59.877");
558+
array.setTimestamp(1, null, precision);
559+
assertThat(array.isNullAt(2)).isTrue();
550560
}
551561

552562
// 2. not compact

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -414,23 +414,27 @@ void testHeader() {
414414

415415
@Test
416416
void testDecimal() {
417-
// 1.compact
417+
// 1. compact
418418
{
419419
int precision = 4;
420420
int scale = 2;
421-
BinaryRowData row = new BinaryRowData(2);
421+
BinaryRowData row = new BinaryRowData(3);
422422
BinaryRowWriter writer = new BinaryRowWriter(row);
423423
writer.writeDecimal(0, DecimalData.fromUnscaledLong(5, precision, scale), precision);
424424
writer.setNullAt(1);
425+
writer.writeDecimal(2, null, precision);
425426
writer.complete();
426427

427428
assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("0.05");
428429
assertThat(row.isNullAt(1)).isTrue();
430+
assertThat(row.isNullAt(2)).isTrue();
429431
row.setDecimal(0, DecimalData.fromUnscaledLong(6, precision, scale), precision);
430432
assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("0.06");
433+
row.setDecimal(1, null, precision);
434+
assertThat(row.isNullAt(1)).isTrue();
431435
}
432436

433-
// 2.not compact
437+
// 2. not compact
434438
{
435439
int precision = 25;
436440
int scale = 5;
@@ -449,6 +453,8 @@ void testDecimal() {
449453
assertThat(row.isNullAt(1)).isTrue();
450454
row.setDecimal(0, decimal2, precision);
451455
assertThat(row.getDecimal(0, precision, scale).toString()).isEqualTo("6.55000");
456+
row.setDecimal(1, null, precision);
457+
assertThat(row.isNullAt(1)).isTrue();
452458
}
453459
}
454460

@@ -1060,16 +1066,20 @@ void testTimestampData() {
10601066
// 1. compact
10611067
{
10621068
final int precision = 3;
1063-
BinaryRowData row = new BinaryRowData(2);
1069+
BinaryRowData row = new BinaryRowData(3);
10641070
BinaryRowWriter writer = new BinaryRowWriter(row);
10651071
writer.writeTimestamp(0, TimestampData.fromEpochMillis(123L), precision);
10661072
writer.setNullAt(1);
1073+
writer.writeTimestamp(2, null, precision);
10671074
writer.complete();
10681075

10691076
assertThat(row.getTimestamp(0, 3).toString()).isEqualTo("1970-01-01T00:00:00.123");
10701077
assertThat(row.isNullAt(1)).isTrue();
1078+
assertThat(row.isNullAt(2)).isTrue();
10711079
row.setTimestamp(0, TimestampData.fromEpochMillis(-123L), precision);
10721080
assertThat(row.getTimestamp(0, 3).toString()).isEqualTo("1969-12-31T23:59:59.877");
1081+
row.setTimestamp(1, null, precision);
1082+
assertThat(row.isNullAt(2)).isTrue();
10731083
}
10741084

10751085
// 2. not compact

0 commit comments

Comments
 (0)