diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java index 5f33844d60c8..22116283d121 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java @@ -289,7 +289,11 @@ private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object struc HiveDecimal dec = (HiveDecimal) fieldOI.getPrimitiveJavaObject(structFieldData); LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) schema.getLogicalType(); BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale()); - return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal); + if (schema.getType() == Schema.Type.BYTES) { + return HoodieAvroUtils.DECIMAL_CONVERSION.toBytes(bd, schema, decimal); + } else { + return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal); + } case CHAR: HiveChar ch = (HiveChar) fieldOI.getPrimitiveJavaObject(structFieldData); return new Utf8(ch.getStrippedValue()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index ecab6b2e11c1..6d98f0c8f52e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -167,6 +167,9 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean case STRING: return new Text(value.toString()); case BYTES: + if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { + return toHiveDecimalWritable(((ByteBuffer) value).array(), schema); + } return new BytesWritable(((ByteBuffer) value).array()); case INT: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) { @@ -248,11 +251,7 @@ public static Writable avroToArrayWritable(Object value, Schema schema, boolean } case FIXED: if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("decimal")) { - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); - HiveDecimalWritable writable = new HiveDecimalWritable(((GenericFixed) value).bytes(), - decimal.getScale()); - return HiveDecimalUtils.enforcePrecisionScale(writable, - new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + return toHiveDecimalWritable(((GenericFixed) value).bytes(), schema); } return new BytesWritable(((GenericFixed) value).bytes()); default: @@ -319,4 +318,11 @@ private static Schema appendNullSchemaFields(Schema schema, List newFiel } return appendFieldsToSchema(schema, newFields); } + + private static HiveDecimalWritable toHiveDecimalWritable(byte[] bytes, Schema schema) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) LogicalTypes.fromSchema(schema); + HiveDecimalWritable writable = new HiveDecimalWritable(bytes, decimal.getScale()); + return HiveDecimalUtils.enforcePrecisionScale(writable, + new DecimalTypeInfo(decimal.getPrecision(), decimal.getScale())); + } }