From 312a15f53a011d1dc4863df196c0169bdf6db629 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Wed, 21 Aug 2024 03:39:59 -0400 Subject: [PATCH] GH-2992: Gate LocalTimestamp references in AvroSchemaConverter (#2993) * PARQUET-2992: Gate LocalTimestamp references in AvroSchemaConverter * GH-2992: Test logical type conversion for different Avro versions --- .../parquet/avro/AvroSchemaConverter.java | 26 ++++++-- .../parquet/avro/TestAvroSchemaConverter.java | 61 ++++++++++++++++++- 2 files changed, 81 insertions(+), 6 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index ffaa07683c..033a80d8fd 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -22,6 +22,7 @@ import static java.util.Optional.of; import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED; import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT; +import static org.apache.parquet.avro.AvroRecordConverter.getRuntimeAvroVersion; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; @@ -488,15 +489,20 @@ private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) { return timeType(true, MICROS); } else if (logicalType instanceof LogicalTypes.TimestampMillis) { return timestampType(true, MILLIS); - } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { - return timestampType(false, MILLIS); } else if (logicalType instanceof LogicalTypes.TimestampMicros) { return timestampType(true, MICROS); - } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { - return timestampType(false, MICROS); } else if (logicalType.getName().equals(LogicalTypes.uuid().getName()) && writeParquetUUID) { return uuidType(); } + + if (avroVersionSupportsLocalTimestampTypes()) { + if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { + return timestampType(false, MILLIS); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { + return timestampType(false, MICROS); + } + } + return null; } @@ -538,7 +544,7 @@ public Optional visit( LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit(); boolean isAdjustedToUTC = timestampLogicalType.isAdjustedToUTC(); - if (isAdjustedToUTC) { + if (isAdjustedToUTC || !avroVersionSupportsLocalTimestampTypes()) { switch (unit) { case MILLIS: return of(LogicalTypes.timestampMillis()); @@ -605,4 +611,14 @@ private static String namespace(String name, Map names) { Integer nameCount = names.merge(name, 1, (oldValue, value) -> oldValue + 1); return nameCount > 1 ? name + nameCount : null; } + + /* Avro <= 1.9 does not support conversions to LocalTimestamp{Micros, Millis} classes */ + private static boolean avroVersionSupportsLocalTimestampTypes() { + final String avroVersion = getRuntimeAvroVersion(); + + return avroVersion == null + || !(avroVersion.startsWith("1.7.") + || avroVersion.startsWith("1.8.") + || avroVersion.startsWith("1.9.")); + } } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 6965c92ff4..077e9cecd5 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -44,7 +44,9 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.io.Resources; import java.util.Arrays; @@ -53,19 +55,34 @@ import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; - +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(AvroRecordConverter.class) public class TestAvroSchemaConverter { private static final Configuration NEW_BEHAVIOR = new Configuration(false); + @Before + public void setupMockito() { + PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS); + } + @BeforeClass public static void setupConf() { NEW_BEHAVIOR.setBoolean("parquet.avro.add-list-element-records", false); @@ -665,6 +682,27 @@ public void testTimestampMillisType() throws Exception { testRoundTripConversion( expected, "message myrecord {\n" + " required int64 timestamp (TIMESTAMP(MILLIS,true));\n" + "}\n"); + // Test that conversions for timestamp types only use APIs that are available in the user's Avro version + for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) { + Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion); + final Schema converted = new AvroSchemaConverter() + .convert(Types.buildMessage() + .addField(Types.primitive(INT64, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.timestampType( + false, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .length(1) + .named("timestamp_type")) + .named("TestAvro")); + + assertEquals( + avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-millis" : "local-timestamp-millis", + converted + .getField("timestamp_type") + .schema() + .getLogicalType() + .getName()); + } + for (PrimitiveTypeName primitive : new PrimitiveTypeName[] {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) { final PrimitiveType type; @@ -729,6 +767,27 @@ public void testTimestampMicrosType() throws Exception { IllegalArgumentException.class, () -> new AvroSchemaConverter().convert(message(type))); } + + // Test that conversions for timestamp types only use APIs that are available in the user's Avro version + for (String avroVersion : ImmutableSet.of("1.7.0", "1.8.0", "1.9.0", "1.10.0", "1.11.0")) { + Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn(avroVersion); + final Schema converted = new AvroSchemaConverter() + .convert(Types.buildMessage() + .addField(Types.primitive(INT64, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.timestampType( + false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .length(1) + .named("timestamp_type")) + .named("TestAvro")); + + assertEquals( + avroVersion.matches("1\\.[789]\\.\\d+") ? "timestamp-micros" : "local-timestamp-micros", + converted + .getField("timestamp_type") + .schema() + .getLogicalType() + .getName()); + } } @Test