PARQUET-2171. Vector IO: review comments
steveloughran committed Jan 15, 2024
1 parent cc2c9df commit a064890
Showing 5 changed files with 38 additions and 25 deletions.
parquet-hadoop/pom.xml (5 changes: 0 additions & 5 deletions)
@@ -79,11 +79,6 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
-<!-- <dependency>
-  <groupId>org.apache.hadoop.extensions</groupId>
-  <artifactId>hadoop-api-shim</artifactId>
-  <scope>provided</scope>
-</dependency>-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-jackson</artifactId>
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java
@@ -33,7 +33,7 @@
import org.apache.parquet.util.DynMethods;

/**
- * Package-private binding utils.
+ * Binding utils.
*/
public final class BindingUtils {
private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);
@@ -127,9 +127,7 @@ public static IOException unwrapInnerException(final Throwable e) {
} else if (cause instanceof UncheckedIOException) {
// this is always an IOException
return ((UncheckedIOException) cause).getCause();
-} else if (cause instanceof CompletionException) {
-  return unwrapInnerException(cause);
-} else if (cause instanceof ExecutionException) {
+} else if (cause instanceof CompletionException || cause instanceof ExecutionException) {
return unwrapInnerException(cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
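
The merged branch works because CompletionException and ExecutionException are both wrappers whose getCause() holds the next layer of the real failure, so a single recursive call handles either. A minimal self-contained sketch of the pattern (the class and method names here are illustrative; the real BindingUtils method also rethrows RuntimeException and Error):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutionException;

    final class UnwrapSketch {
      // Strip future/stream wrappers until an IOException is found,
      // recursing because CompletionException/ExecutionException can nest.
      static IOException unwrap(final Throwable e) {
        final Throwable cause = e.getCause();
        if (cause instanceof IOException) {
          return (IOException) cause;
        } else if (cause instanceof UncheckedIOException) {
          // an UncheckedIOException always wraps an IOException
          return ((UncheckedIOException) cause).getCause();
        } else if (cause instanceof CompletionException
            || cause instanceof ExecutionException) {
          // recurse on the wrapper; its getCause() is the next layer down
          return unwrap(cause);
        }
        // fallback: wrap whatever was raised
        return new IOException(e);
      }
    }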
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIoBridge.java
@@ -168,11 +168,8 @@ public static void readVectoredRanges(
ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference();
parquetFileRange.setDataReadFuture(fileRange.getData());
});
-//throw new RuntimeException("readVectoredRanges");
}

/**
* Read data in a list of wrapped file ranges, extracting the inner
* instances and then executing.
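
The wrapper above hands each Hadoop range's data future back to the ParquetFileRange it was built from, via the range's reference field. For orientation, a rough sketch of calling the underlying Hadoop API directly, assuming Hadoop 3.3.5+ (where FileRange and PositionedReadable.readVectored() exist) and a hypothetical local file:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class VectoredReadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path path = new Path("/tmp/example.parquet");  // hypothetical path
        // two disjoint ranges; a store may coalesce or parallelize them
        List<FileRange> ranges = Arrays.asList(
            FileRange.createFileRange(0, 1024),
            FileRange.createFileRange(4096, 1024));
        try (FSDataInputStream in = fs.open(path)) {
          in.readVectored(ranges, ByteBuffer::allocate);
          for (FileRange range : ranges) {
            ByteBuffer data = range.getData().get();  // block per range
            // ... consume data ...
          }
        }
      }
    }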
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -110,6 +110,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+/**
+ * Verify that data can be written and read back again.
+ * The test suite is parameterized on vector IO being disabled/enabled.
+ * This verifies that the vector IO code path is correct, and that
+ * the default path continues to work.
+ */
@RunWith(Parameterized.class)
public class TestParquetFileWriter {

@@ -152,16 +158,24 @@ public static List<Boolean> params() {
/**
* Read type: true for vectored IO.
*/
-private final boolean readType;
+private final boolean vectoredRead;

-public TestParquetFileWriter(boolean readType) {
-  this.readType = readType;
+/**
+ * Instantiate.
+ * @param vectoredRead use vector IO for reading.
+ */
+public TestParquetFileWriter(boolean vectoredRead) {
+  this.vectoredRead = vectoredRead;
}

+/**
+ * Get the configuration for the tests.
+ * @return a configuration which may have vector IO set.
+ */
private Configuration getTestConfiguration() {
Configuration conf = new Configuration();
// set the vector IO option
-conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType);
+conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, vectoredRead);
return conf;
}
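
The constructor and getTestConfiguration() above are driven by the params() factory visible in the hunk header. A sketch of the full JUnit 4 wiring, with an illustrative class name and name pattern (neither is from the commit):

    import java.util.Arrays;
    import java.util.List;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class VectoredIoParamSketch {
      // run the whole suite twice: vector IO disabled, then enabled
      @Parameterized.Parameters(name = "vectored={0}")
      public static List<Boolean> params() {
        return Arrays.asList(false, true);
      }

      private final boolean vectoredRead;

      public VectoredIoParamSketch(boolean vectoredRead) {
        this.vectoredRead = vectoredRead;
      }

      @Test
      public void example() {
        // a real test would build a Configuration with the vectored IO
        // option set to vectoredRead, then write and read a file
      }
    }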

@@ -297,7 +311,7 @@ public void testWriteReadWithRecordReader() throws Exception {
testFile.delete();

Path path = new Path(testFile.toURI());
-Configuration configuration = new Configuration();
+Configuration configuration = getTestConfiguration();

ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
w.start();
@@ -395,7 +409,7 @@ public void testBloomFilterWriteRead() throws Exception {
File testFile = temp.newFile();
testFile.delete();
Path path = new Path(testFile.toURI());
-Configuration configuration = new Configuration();
+Configuration configuration = getTestConfiguration();
configuration.set("parquet.bloom.filter.column.names", "foo");
String[] colPath = {"foo"};
ColumnDescriptor col = schema.getColumnDescription(colPath);
@@ -436,7 +450,7 @@ public void testWriteReadDataPageV2() throws Exception {
testFile.delete();

Path path = new Path(testFile.toURI());
-Configuration configuration = new Configuration();
+Configuration configuration = getTestConfiguration();

ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
w.start();
@@ -592,9 +606,11 @@ public void testAlignmentWithPadding() throws Exception {
FileSystem fs = path.getFileSystem(conf);
long fileLen = fs.getFileStatus(path).getLen();

-FSDataInputStream data = fs.open(path);
-data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-long footerLen = BytesUtils.readIntLittleEndian(data);
+long footerLen;
+try (FSDataInputStream data = fs.open(path)) {
+  data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+  footerLen = BytesUtils.readIntLittleEndian(data);
+}
long startFooter = fileLen - footerLen - 8;

assertEquals("Footer should start after second row group without padding", secondRowGroupEnds, startFooter);
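
The change wraps the stream in try-with-resources so it is closed even when an assertion fails. The arithmetic relies on the Parquet file tail layout: the serialized footer, then a 4-byte little-endian footer length, then the 4-byte magic "PAR1". A sketch of the same lookup as a standalone helper (the class and method names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.bytes.BytesUtils;

    final class FooterSketch {
      // Returns the offset at which the footer starts.
      // Tail layout: [footer][4-byte little-endian length]["PAR1"]
      static long footerStart(FileSystem fs, Path path) throws IOException {
        long fileLen = fs.getFileStatus(path).getLen();
        long footerLen;
        try (FSDataInputStream in = fs.open(path)) {
          in.seek(fileLen - 8);  // 4-byte length word + 4-byte magic
          footerLen = BytesUtils.readIntLittleEndian(in);
        }
        return fileLen - footerLen - 8;
      }
    }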
@@ -677,6 +693,8 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
Configuration conf = getTestConfiguration();
// Disable writing out checksums as hardcoded byte offsets in assertions below expect it
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+// close any filesystems to ensure that the FS used by the writer picks up the configuration
+FileSystem.closeAll();

// uses the test constructor
ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
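
FileSystem.closeAll() is needed because Hadoop caches FileSystem instances keyed by scheme, authority and user: without clearing the cache, the writer could be handed an instance created earlier under different settings. A sketch of the effect; the per-scheme cache-disable flag in the comment is standard Hadoop behaviour, not something this commit touches:

    // clear the cache so the next lookup builds a fresh instance
    FileSystem.closeAll();
    FileSystem fs = FileSystem.get(path.toUri(), conf);  // sees this conf

    // alternative: disable caching for one scheme, e.g.
    // conf.setBoolean("fs.file.impl.disable.cache", true);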
@@ -716,9 +734,11 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception {
FileSystem fs = path.getFileSystem(conf);
long fileLen = fs.getFileStatus(path).getLen();

-FSDataInputStream data = fs.open(path);
-data.seek(fileLen - 8); // 4-byte offset + "PAR1"
-long footerLen = BytesUtils.readIntLittleEndian(data);
+long footerLen;
+try (FSDataInputStream data = fs.open(path)) {
+  data.seek(fileLen - 8); // 4-byte offset + "PAR1"
+  footerLen = BytesUtils.readIntLittleEndian(data);
+}
long startFooter = fileLen - footerLen - 8;

assertEquals("Footer should start after second row group without padding", secondRowGroupEnds, startFooter);
@@ -975,6 +995,8 @@ public void testWriteReadStatisticsAllNulls() throws Exception {
configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
GroupWriteSupport.setSchema(schema, configuration);

+// close any filesystems to ensure that the FS used by the writer picks up the configuration
+FileSystem.closeAll();
ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport());

Group r1 = new SimpleGroup(schema);
pom.xml (1 change: 1 addition & 0 deletions)
@@ -589,6 +589,7 @@
<exclude>org.apache.parquet.conf.PlainParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
<exclude>org.apache.parquet.conf.ParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
<exclude>org.apache.parquet.hadoop.util.SerializationUtil#readObjectFromConfAsBase64(java.lang.String,org.apache.parquet.conf.ParquetConfiguration)</exclude>
+<exclude>org.apache.parquet.hadoop.util.vectorio.BindingUtils#awaitFuture(java.util.concurrent.Future,long,java.util.concurrent.TimeUnit)</exclude>
<exclude>org.apache.parquet.conf.HadoopParquetConfiguration#getClass(java.lang.String,java.lang.Class,java.lang.Class)</exclude>
<exclude>org.apache.parquet.avro.AvroParquetReader#builder(org.apache.parquet.io.InputFile,org.apache.parquet.conf.ParquetConfiguration)</exclude>
<exclude>org.apache.parquet.hadoop.thrift.TBaseWriteSupport#setThriftClass(org.apache.parquet.conf.ParquetConfiguration,java.lang.Class)</exclude>