diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 58aa594d19..64f0f514b7 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -79,11 +79,6 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${project.version}</version>
-    </dependency>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java
index 176998bbc6..4eecac01a3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java
@@ -33,7 +33,7 @@
 import org.apache.parquet.util.DynMethods;

 /**
- * Package-private binding utils.
+ * Binding utils.
  */
 public final class BindingUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);
@@ -127,9 +127,7 @@ public static IOException unwrapInnerException(final Throwable e) {
     } else if (cause instanceof UncheckedIOException) {
       // this is always an IOException
       return ((UncheckedIOException) cause).getCause();
-    } else if (cause instanceof CompletionException) {
-      return unwrapInnerException(cause);
-    } else if (cause instanceof ExecutionException) {
+    } else if (cause instanceof CompletionException || cause instanceof ExecutionException) {
       return unwrapInnerException(cause);
     } else if (cause instanceof RuntimeException) {
       throw (RuntimeException) cause;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java
index ba94c2e976..86f6063330 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java
@@ -168,11 +168,8 @@
       ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference();
       parquetFileRange.setDataReadFuture(fileRange.getData());
     });
-    //throw new RuntimeException("readVectoredRanges");
-
   }
-
   /**
    * Read data in a list of wrapped file ranges, extracting the inner
    * instances and then executing.
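The two vectorio changes above work together: readVectoredRanges() attaches each Hadoop FileRange's data future to its ParquetFileRange, and unwrapInnerException() now collapses the CompletionException and ExecutionException branches into one recursive case. Below is a minimal sketch of a caller that awaits such a future and rethrows the root IOException; the RangeAwaiter helper is hypothetical, not part of this patch, and assumes ParquetFileRange exposes getDataReadFuture(), the counterpart of the setter used above.

// Hypothetical helper, not part of this patch.
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;

import org.apache.parquet.hadoop.util.vectorio.BindingUtils;
import org.apache.parquet.io.ParquetFileRange;

final class RangeAwaiter {
  private RangeAwaiter() {
  }

  /** Block for one range's data, converting async wrapper exceptions to IOExceptions. */
  static ByteBuffer awaitData(ParquetFileRange range) throws IOException {
    try {
      // future populated by VectorIOBridge.readVectoredRanges()
      return range.getDataReadFuture().get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new InterruptedIOException("interrupted while awaiting range data");
    } catch (ExecutionException e) {
      // recursively strips the CompletionException/ExecutionException layers around the real failure
      throw BindingUtils.unwrapInnerException(e);
    }
  }
}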
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 74965cbbc6..a21e1afd70 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -110,6 +110,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+/**
+ * Verify that data can be written and read back again.
+ * The test suite is parameterized on vector IO being disabled/enabled.
+ * This verifies that the vector IO code path is correct, and that
+ * the default path continues to work.
+ */
 @RunWith(Parameterized.class)
 public class TestParquetFileWriter {

@@ -152,16 +158,24 @@ public static List params() {
   /**
    * Read type: true for vectored IO.
    */
-  private final boolean readType;
+  private final boolean vectoredRead;

-  public TestParquetFileWriter(boolean readType) {
-    this.readType = readType;
+  /**
+   * Instantiate.
+   * @param vectoredRead use vector IO for reading.
+   */
+  public TestParquetFileWriter(boolean vectoredRead) {
+    this.vectoredRead = vectoredRead;
   }

+  /**
+   * Get the configuration for the tests.
+   * @return a configuration which may have vector IO set.
+   */
   private Configuration getTestConfiguration() {
     Configuration conf = new Configuration();
     // set the vector IO option
-    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
+    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, vectoredRead);
     return conf;
   }
@@ -297,7 +311,7 @@ public void testWriteReadWithRecordReader() throws Exception {
     testFile.delete();
     Path path = new Path(testFile.toURI());

-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();

     ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
     w.start();
@@ -395,7 +409,7 @@ public void testBloomFilterWriteRead() throws Exception {
     File testFile = temp.newFile();
     testFile.delete();
     Path path = new Path(testFile.toURI());
-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();
     configuration.set("parquet.bloom.filter.column.names", "foo");
     String[] colPath = {"foo"};
     ColumnDescriptor col = schema.getColumnDescription(colPath);
@@ -436,7 +450,7 @@ public void testWriteReadDataPageV2() throws Exception {
     testFile.delete();
     Path path = new Path(testFile.toURI());

-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();

     ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
     w.start();
@@ -592,9 +606,11 @@ public void testAlignmentWithPadding() throws Exception {
     FileSystem fs = path.getFileSystem(conf);
     long fileLen = fs.getFileStatus(path).getLen();

-    FSDataInputStream data = fs.open(path);
-    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long footerLen;
+    try (FSDataInputStream data = fs.open(path)) {
+      data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+      footerLen = BytesUtils.readIntLittleEndian(data);
+    }
     long startFooter = fileLen - footerLen - 8;
     assertEquals("Footer should start after second row group without padding",
         secondRowGroupEnds, startFooter);
@@ -677,6 +693,8 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
     Configuration conf = getTestConfiguration();
     // Disable writing out checksums as hardcoded byte offsets in assertions below expect it
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    // close any filesystems to ensure that the FS used by the writer picks up the configuration
+    FileSystem.closeAll();

     // uses the test constructor
     ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
@@ -716,9 +734,11 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
     FileSystem fs = path.getFileSystem(conf);
     long fileLen = fs.getFileStatus(path).getLen();

-    FSDataInputStream data = fs.open(path);
-    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long footerLen;
+    try (FSDataInputStream data = fs.open(path)) {
+      data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+      footerLen = BytesUtils.readIntLittleEndian(data);
+    }
     long startFooter = fileLen - footerLen - 8;
     assertEquals("Footer should start after second row group without padding",
         secondRowGroupEnds, startFooter);
@@ -975,6 +995,8 @@ public void testWriteReadStatisticsAllNulls() throws Exception {
     configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
     GroupWriteSupport.setSchema(schema, configuration);

+    // close any filesystems to ensure that the FS used by the writer picks up the configuration
+    FileSystem.closeAll();
     ParquetWriter writer = new ParquetWriter(path, configuration, new GroupWriteSupport());

     Group r1 = new SimpleGroup(schema);
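The params() factory itself is untouched by this patch and does not appear in the hunks above. For reference, the usual JUnit 4 pattern that feeds both values of vectoredRead into the constructor looks roughly like the sketch below (imports assumed: java.util.Arrays, java.util.List, org.junit.runners.Parameterized); the exact body in the test may differ.

  // Sketch only: run every test once with vector IO disabled and once enabled.
  @Parameterized.Parameters(name = "vectored IO: {0}")
  public static List<Object[]> params() {
    return Arrays.asList(new Object[][] { {false}, {true} });
  }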
diff --git a/pom.xml b/pom.xml
index 0b945076f7..9bfad4120d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -589,6 +589,7 @@
               <exclude>org.apache.parquet.conf.PlainParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
               <exclude>org.apache.parquet.conf.ParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
               <exclude>org.apache.parquet.hadoop.util.SerializationUtil#readObjectFromConfAsBase64(java.lang.String,org.apache.parquet.conf.ParquetConfiguration)</exclude>
+              <exclude>org.apache.parquet.hadoop.util.vectorio.BindingUtils#awaitFuture(java.util.concurrent.Future,long,java.util.concurrent.TimeUnit)</exclude>
               <exclude>org.apache.parquet.conf.HadoopParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
               <exclude>org.apache.parquet.avro.AvroParquetReader#builder(org.apache.parquet.io.InputFile,org.apache.parquet.conf.ParquetConfiguration)</exclude>
               <exclude>org.apache.parquet.hadoop.thrift.TBaseWriteSupport#setThriftClass(org.apache.parquet.conf.ParquetConfiguration,java.lang.Class)</exclude>
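The japicmp entry added above simply excludes the internal BindingUtils#awaitFuture(Future, long, TimeUnit) signature from the compatibility check. To actually exercise the vectored read path outside the test suite, a reader only needs the same configuration switch the parameterized tests toggle. A minimal sketch, assuming a local file path argument and the standard example Group read support:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // the option the parameterized tests set through getTestConfiguration()
    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);
    try (ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path(args[0]))
        .withConf(conf)
        .build()) {
      Group record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}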