
Commit da3db9b

PARQUET-2171. Vector IO: review comments

1 parent 5b750f1 commit da3db9b

5 files changed: 38 additions, 26 deletions

parquet-hadoop/pom.xml

Lines changed: 0 additions & 5 deletions
@@ -79,11 +79,6 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
-    <!-- <dependency>
-      <groupId>org.apache.hadoop.extensions</groupId>
-      <artifactId>hadoop-api-shim</artifactId>
-      <scope>provided</scope>
-    </dependency>-->
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-jackson</artifactId>

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java

Lines changed: 2 additions & 4 deletions
@@ -33,7 +33,7 @@
 import org.apache.parquet.util.DynMethods;

 /**
- * Package-private binding utils.
+ * Binding utils.
  */
 public final class BindingUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);

@@ -127,9 +127,7 @@ public static IOException unwrapInnerException(final Throwable e) {
     } else if (cause instanceof UncheckedIOException) {
       // this is always an IOException
       return ((UncheckedIOException) cause).getCause();
-    } else if (cause instanceof CompletionException) {
-      return unwrapInnerException(cause);
-    } else if (cause instanceof ExecutionException) {
+    } else if (cause instanceof CompletionException || cause instanceof ExecutionException) {
       return unwrapInnerException(cause);
     } else if (cause instanceof RuntimeException) {
       throw (RuntimeException) cause;
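
A minimal standalone sketch, not part of the commit, of why the merged branch is equivalent to the two it replaces: both CompletionException and ExecutionException are wrappers whose getCause() carries the real failure, so both recurse through unwrapInnerException identically.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapDemo {
  public static void main(String[] args) {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      // simulate an IO failure inside an async read
      throw new UncheckedIOException(new IOException("simulated read failure"));
    });
    try {
      future.join(); // join() wraps the failure in a CompletionException
    } catch (CompletionException e) {
      // unwrapInnerException recurses through the wrappers:
      // CompletionException -> UncheckedIOException -> IOException
      System.out.println("root cause: " + e.getCause().getCause());
    }
  }
}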

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java

Lines changed: 0 additions & 3 deletions
@@ -168,11 +168,8 @@ public static void readVectoredRanges(
       ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference();
       parquetFileRange.setDataReadFuture(fileRange.getData());
     });
-    //throw new RuntimeException("readVectoredRanges");
-
   }

-
   /**
    * Read data in a list of wrapped file ranges, extracting the inner
    * instances and then executing.
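
A hypothetical caller-side sketch of the pattern above (assumptions: ParquetFileRange exposes a getDataReadFuture() accessor paired with the setDataReadFuture() call shown in the diff, and lives in org.apache.parquet.io; the decode step is a placeholder). Once readVectoredRanges() returns, a consumer blocks on each populated range:

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;

import org.apache.parquet.io.ParquetFileRange;

final class RangeConsumer {
  // Block on each range's future; BindingUtils.unwrapInnerException (previous
  // file) is how the bridge turns the ExecutionException wrapper back into
  // the underlying IOException.
  static void awaitAll(List<ParquetFileRange> ranges)
      throws InterruptedException, ExecutionException {
    for (ParquetFileRange range : ranges) {
      ByteBuffer buf = range.getDataReadFuture().get();
      decode(buf);
    }
  }

  private static void decode(ByteBuffer buf) {
    // placeholder for the page-reading/decoding stage
  }
}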

parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java

Lines changed: 35 additions & 14 deletions
@@ -95,6 +95,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+/**
+ * Verify that data can be written and read back again.
+ * The test suite is parameterized on vector IO being disabled/enabled.
+ * This verifies that the vector IO code path is correct, and that
+ * the default path continues to work.
+ */
 @RunWith(Parameterized.class)
 public class TestParquetFileWriter {

@@ -136,16 +142,24 @@ public static List<Boolean> params() {
   /**
    * Read type: true for vectored IO.
    */
-  private final boolean readType;
+  private final boolean vectoredRead;

-  public TestParquetFileWriter(boolean readType) {
-    this.readType = readType;
+  /**
+   * Instantiate.
+   * @param vectoredRead use vector IO for reading.
+   */
+  public TestParquetFileWriter(boolean vectoredRead) {
+    this.vectoredRead = vectoredRead;
   }

+  /**
+   * Get the configuration for the tests.
+   * @return a configuration which may have vector IO set.
+   */
   private Configuration getTestConfiguration() {
     Configuration conf = new Configuration();
     // set the vector IO option
-    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
+    conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, vectoredRead);
     return conf;
   }
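
For context, a sketch of the parameter factory named in the hunk header, assuming JUnit 4's Parameterized runner (which the @RunWith annotation above uses); the actual body of params() is elsewhere in the file and not shown in this diff:

import java.util.Arrays;
import java.util.List;

import org.junit.runners.Parameterized;

// Each returned value becomes one constructor invocation, so every test
// method runs twice: once with vector IO disabled, once with it enabled.
@Parameterized.Parameters(name = "vectoredRead={0}")
public static List<Boolean> params() {
  return Arrays.asList(false, true);
}
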

@@ -277,7 +291,7 @@ public void testWriteReadWithRecordReader() throws Exception {
     testFile.delete();

     Path path = new Path(testFile.toURI());
-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();

     ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
     w.start();
@@ -371,7 +385,7 @@ public void testBloomFilterWriteRead() throws Exception {
     File testFile = temp.newFile();
     testFile.delete();
     Path path = new Path(testFile.toURI());
-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();
     configuration.set("parquet.bloom.filter.column.names", "foo");
     String[] colPath = {"foo"};
     ColumnDescriptor col = schema.getColumnDescription(colPath);
@@ -406,7 +420,7 @@ public void testWriteReadDataPageV2() throws Exception {
     testFile.delete();

     Path path = new Path(testFile.toURI());
-    Configuration configuration = new Configuration();
+    Configuration configuration = getTestConfiguration();

     ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
     w.start();
@@ -511,14 +525,15 @@ public void testAlignmentWithPadding() throws Exception {
     FileSystem fs = path.getFileSystem(conf);
     long fileLen = fs.getFileStatus(path).getLen();

-    FSDataInputStream data = fs.open(path);
-    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long footerLen;
+    try (FSDataInputStream data = fs.open(path)) {
+      data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+      footerLen = BytesUtils.readIntLittleEndian(data);
+    }
     long startFooter = fileLen - footerLen - 8;

     assertEquals("Footer should start after second row group without padding",
         secondRowGroupEnds, startFooter);
-
     ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path);
     assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
     assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
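
The try-with-resources change guarantees the stream is closed even when the read throws, which the previous bare fs.open() did not. The read itself relies on the standard Parquet tail layout; a hypothetical helper (using the same FileSystem, Path, FSDataInputStream and BytesUtils classes as the test) capturing the pattern:

// Every Parquet file ends with:
//   ... body ... | footer (footerLen bytes) | footerLen (4 bytes, little-endian) | "PAR1"
// so seeking to fileLen - 8 positions the stream on the length field.
static long readFooterLength(FileSystem fs, Path path, long fileLen) throws IOException {
  try (FSDataInputStream data = fs.open(path)) {
    data.seek(fileLen - 8);
    return BytesUtils.readIntLittleEndian(data);
  }
}
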
@@ -581,6 +596,8 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
     Configuration conf = getTestConfiguration();
     // Disable writing out checksums as hardcoded byte offsets in assertions below expect it
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    // close any filesystems to ensure that the FS used by the writer picks up the configuration
+    FileSystem.closeAll();

     // uses the test constructor
     ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
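
An illustrative fragment, not from the commit, of why the cache must be cleared: Hadoop caches FileSystem instances by scheme, authority and user rather than by the full Configuration, so path.getFileSystem(conf) can return an instance created by an earlier test that never saw the vector IO setting.

Configuration conf = new Configuration();
conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);
FileSystem.closeAll();                    // drop all cached FileSystem instances
FileSystem fs = path.getFileSystem(conf); // rebuilt from conf; the option is now visible
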
@@ -620,9 +637,11 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
     FileSystem fs = path.getFileSystem(conf);
     long fileLen = fs.getFileStatus(path).getLen();

-    FSDataInputStream data = fs.open(path);
-    data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-    long footerLen = BytesUtils.readIntLittleEndian(data);
+    long footerLen;
+    try (FSDataInputStream data = fs.open(path)) {
+      data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+      footerLen = BytesUtils.readIntLittleEndian(data);
+    }
     long startFooter = fileLen - footerLen - 8;

     assertEquals("Footer should start after second row group without padding",
@@ -855,6 +874,8 @@ public void testWriteReadStatisticsAllNulls() throws Exception {
     configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
     GroupWriteSupport.setSchema(schema, configuration);

+    // close any filesystems to ensure that the FS used by the writer picks up the configuration
+    FileSystem.closeAll();
     ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport());

     Group r1 = new SimpleGroup(schema);

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -549,6 +549,7 @@
       <exclude>org.apache.parquet.conf.PlainParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
       <exclude>org.apache.parquet.conf.ParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
       <exclude>org.apache.parquet.hadoop.util.SerializationUtil#readObjectFromConfAsBase64(java.lang.String,org.apache.parquet.conf.ParquetConfiguration)</exclude>
+      <exclude>org.apache.parquet.hadoop.util.vectorio.BindingUtils#awaitFuture(java.util.concurrent.Future,long,java.util.concurrent.TimeUnit)</exclude>
       <exclude>org.apache.parquet.conf.HadoopParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
       <exclude>org.apache.parquet.avro.AvroParquetReader#builder(org.apache.parquet.io.InputFile,org.apache.parquet.conf.ParquetConfiguration)</exclude>
       <exclude>org.apache.parquet.hadoop.thrift.TBaseWriteSupport#setThriftClass(org.apache.parquet.conf.ParquetConfiguration,java.lang.Class)</exclude>
