Skip to content

Commit

Permalink
[HUDI-7443] Fix decimal conversion with legacy bytes type (#10756)
Browse files Browse the repository at this point in the history
  • Loading branch information
stream2000 authored Feb 27, 2024
1 parent d3ccd09 commit 20e2348
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,11 @@ private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object struc
HiveDecimal dec = (HiveDecimal) fieldOI.getPrimitiveJavaObject(structFieldData);
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) schema.getLogicalType();
BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale());
return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal);
if (schema.getType() == Schema.Type.BYTES) {
return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, decimal);
} else {
return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal);
}
case CHAR:
HiveChar ch = (HiveChar) fieldOI.getPrimitiveJavaObject(structFieldData);
return new Utf8(ch.getStrippedValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean
case STRING:
return new Text(value.toString());
case BYTES:
if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
return toHiveDecimalWritable(((ByteBuffer) value).array(), schema);
}
return new BytesWritable(((ByteBuffer) value).array());
case INT:
if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) {
Expand Down Expand Up @@ -248,11 +251,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean
}
case FIXED:
if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(),
decimal.getScale());
return HiveDecimalUtils.enforcePrecisionScale(writable,
new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
return toHiveDecimalWritable(((GenericFixed) value).bytes(), schema);
}
return new BytesWritable(((GenericFixed) value).bytes());
default:
Expand Down Expand Up @@ -319,4 +318,11 @@ private static Schema appendNullSchemaFields(Schema schema, List<String> newFiel
}
return appendFieldsToSchema(schema, newFields);
}

private static HiveDecimalWritable toHiveDecimalWritable(byte[] bytes, Schema schema) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema);
HiveDecimalWritable writable = new HiveDecimalWritable(bytes, decimal.getScale());
return HiveDecimalUtils.enforcePrecisionScale(writable,
new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale()));
}
}

0 comments on commit 20e2348

Please sign in to comment.