PARQUET-2171. Support vectored IO in Hadoop stream reads
HADOOP-18103 introduced a high-performance vectored read API in
Hadoop filesystems, including the local filesystem and S3A.

This patch supports the vectored IO API in ParquetFileReader,
switching to it when the Hadoop version supports it and the Parquet
library has it explicitly enabled.

To use vectored IO, set
parquet.hadoop.vectored.io.enabled=true

The API is invoked through reflection, so that on Hadoop
releases with the feature (3.3.5+) the vectored IO operations
are handed off to the Hadoop input stream.

On older versions the feature is not available; the
Parquet library will compile and run as normal.

Some existing tests have been parameterized to verify that
behavior is consistent with and without vector IO enabled;
on hadoop2 profile test runs, this implicitly verifies that
compatibility is maintained.
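
As an illustration of how a client might opt in, here is a minimal sketch that enables the property through the Hadoop configuration and opens a file with ParquetFileReader; the file path handling and the use of HadoopInputFile/HadoopReadOptions are assumptions about a typical caller, not code from this patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Opt in to vectored IO; on Hadoop releases before 3.3.5 the library
    // silently falls back to classic reads.
    conf.setBoolean("parquet.hadoop.vectored.io.enabled", true);

    Path path = new Path(args[0]);  // hypothetical input file
    ParquetReadOptions options = HadoopReadOptions.builder(conf, path).build();
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(path, conf), options)) {
      // Row-group reads issued by the reader use the vectored path
      // when the underlying stream supports it.
      System.out.println("Row groups: " + reader.getRowGroups().size());
    }
  }
}
```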
mukund-thakur authored and steveloughran committed Sep 13, 2023
1 parent f8465a2 commit cb4eee0
Showing 20 changed files with 1,866 additions and 25 deletions.
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.io;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
 * Defines a byte range within a Parquet file and holds the future
 * that will supply the data for any ongoing read of that range.
 */
public class ParquetFileRange {

/**
* Start position in file.
*/
private final long offset;

/**
* Length of data to be read from position.
*/
private final int length;

/**
 * Future holding the result of an ongoing read of this range.
 */
private CompletableFuture<ByteBuffer> dataReadFuture;

public ParquetFileRange(long offset, int length) {
this.offset = offset;
this.length = length;
}

public long getOffset() {
return offset;
}

public int getLength() {
return length;
}

public CompletableFuture<ByteBuffer> getDataReadFuture() {
return dataReadFuture;
}

public void setDataReadFuture(CompletableFuture<ByteBuffer> dataReadFuture) {
this.dataReadFuture = dataReadFuture;
}

@Override
public String toString() {
return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")";
}
}
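
A short sketch of how the class above is meant to be used by a caller of a vectored read: one range per chunk, with each range's future consulted once the read has been submitted. The offsets, lengths, and helper methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.parquet.io.ParquetFileRange;

class ParquetFileRangeExample {

  /** Two non-contiguous chunks of a file; offsets and lengths are arbitrary. */
  static List<ParquetFileRange> exampleRanges() {
    return Arrays.asList(
        new ParquetFileRange(4L, 1024),
        new ParquetFileRange(8192L, 512));
  }

  /** Waits for the data of already-submitted ranges and reports their sizes. */
  static void printRanges(List<ParquetFileRange> ranges) {
    for (ParquetFileRange range : ranges) {
      // join() blocks until the read for this range has completed.
      ByteBuffer data = range.getDataReadFuture().join();
      System.out.println(range + " -> " + data.remaining() + " bytes");
    }
  }
}
```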
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;

/**
* {@code SeekableInputStream} is an interface with the methods needed by
@@ -105,4 +107,21 @@ public abstract class SeekableInputStream extends InputStream {
*/
public abstract void readFully(ByteBuffer buf) throws IOException;

/**
 * Read a set of file ranges in a vectored manner.
 * @param ranges file ranges to read; implementations populate each range's data read future.
 * @param allocate function to allocate a buffer of the requested size.
 * @throws UnsupportedOperationException if not available in this class/runtime.
 * @throws IOException if a read fails.
 */
public void readVectored(List<ParquetFileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {

throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
}

/**
 * Is the {@link #readVectored(List, IntFunction)} method available?
 * @return true if vectored IO is supported by this stream.
 */
public boolean readVectoredAvailable() {
return false;
}

}
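
A hedged usage sketch of the new API: a caller probes readVectoredAvailable() and either issues a single vectored call or falls back to sequential readFully() calls. The helper class and the fallback logic are assumptions for illustration, not the ParquetFileReader implementation.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

import org.apache.parquet.io.ParquetFileRange;
import org.apache.parquet.io.SeekableInputStream;

class VectoredReadHelper {

  /** Reads all ranges, using the vectored path when the stream supports it. */
  static void readRanges(SeekableInputStream stream, List<ParquetFileRange> ranges)
      throws IOException {
    IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;  // heap buffers
    if (stream.readVectoredAvailable()) {
      // One call; the stream fills in each range's data read future asynchronously.
      stream.readVectored(ranges, allocate);
    } else {
      // Fallback: read each range sequentially with the classic API.
      for (ParquetFileRange range : ranges) {
        ByteBuffer buffer = allocate.apply(range.getLength());
        stream.seek(range.getOffset());
        stream.readFully(buffer);
        buffer.flip();
        range.setDataReadFuture(CompletableFuture.completedFuture(buffer));
      }
    }
  }
}
```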
@@ -27,10 +27,15 @@
import java.security.PrivilegedAction;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.parquet.Exceptions.throwIfInstance;

public class DynMethods {

private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);

/**
* Convenience wrapper class around {@link java.lang.reflect.Method}.
*
@@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class<?>... argClasses)
Class<?> targetClass = Class.forName(className, true, loader);
impl(targetClass, methodName, argClasses);
} catch (ClassNotFoundException e) {
// not the right implementation
// class not found on supplied classloader.
LOG.debug("failed to load class {}", className, e);
}
return this;
}
@@ -281,6 +287,7 @@ public Builder impl(Class<?> targetClass, String methodName, Class<?>... argClas
targetClass.getMethod(methodName, argClasses), name);
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
}
return this;
}
@@ -311,6 +318,8 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, targetClass, e);
}
return this;
}
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, className, e);

}
return this;
}
@@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class<?>... argCl
Class<?> targetClass = Class.forName(className, true, loader);
hiddenImpl(targetClass, methodName, argClasses);
} catch (ClassNotFoundException e) {
// not the right implementation
// class not found on supplied classloader.
LOG.debug("failed to load class {}", className, e);
}
return this;
}
@@ -388,6 +401,7 @@ public Builder hiddenImpl(Class<?> targetClass, String methodName, Class<?>... a
this.method = new UnboundMethod(hidden, name);
} catch (SecurityException | NoSuchMethodException e) {
// unusable or not the right implementation
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
}
return this;
8 changes: 8 additions & 0 deletions parquet-hadoop/README.md
@@ -486,3 +486,11 @@ If `false`, key material is stored in separate new files, created in the same fo
**Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.
**Default value:** `128`

---

**Property:** `parquet.hadoop.vectored.io.enabled`
**Description:** Flag to enable use of the FileSystem vector IO API on Hadoop releases that support the feature.
If `true`, an attempt is made to dynamically load the relevant classes;
if they are not found, the library falls back to classic non-vectored reads, so it is safe to enable this option on older releases.
**Default value:** `false`

5 changes: 5 additions & 0 deletions parquet-hadoop/pom.xml
@@ -79,6 +79,11 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<!-- <dependency>
<groupId>org.apache.hadoop.extensions</groupId>
<artifactId>hadoop-api-shim</artifactId>
<scope>provided</scope>
</dependency>-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-jackson</artifactId>
@@ -27,6 +27,7 @@
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.util.HadoopCodecs;

import java.util.Map;
@@ -37,6 +38,7 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
@@ -54,6 +56,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax,
boolean usePageChecksumVerification,
boolean useBloomFilter,
boolean useOffHeapDecryptBuffer,
boolean useHadoopVectoredIO,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -64,7 +67,7 @@
FileDecryptionProperties fileDecryptionProperties) {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator,
usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIO, recordFilter, metadataFilter, codecFactory, allocator,
maxAllocationSize, properties, fileDecryptionProperties
);
this.conf = conf;
@@ -111,6 +114,7 @@ public Builder(Configuration conf, Path filePath) {
usePageChecksumVerification));
useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true));
useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false));
useHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT));
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -128,7 +132,7 @@ public ParquetReadOptions build() {
}
return new HadoopReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties, conf, fileDecryptionProperties);
}
}
@@ -41,6 +41,7 @@ public class ParquetReadOptions {
private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
private static final boolean HADOOP_VECTORED_IO_ENABLED_DEFAULT = false;
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
@@ -54,6 +55,7 @@ public class ParquetReadOptions {
private final boolean usePageChecksumVerification;
private final boolean useBloomFilter;
private final boolean useOffHeapDecryptBuffer;
private final boolean useHadoopVectoredIO;
private final FilterCompat.Filter recordFilter;
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
private final CompressionCodecFactory codecFactory;
@@ -70,6 +72,7 @@ public class ParquetReadOptions {
boolean usePageChecksumVerification,
boolean useBloomFilter,
boolean useOffHeapDecryptBuffer,
boolean useHadoopVectoredIO,
FilterCompat.Filter recordFilter,
ParquetMetadataConverter.MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -85,6 +88,7 @@ public class ParquetReadOptions {
this.usePageChecksumVerification = usePageChecksumVerification;
this.useBloomFilter = useBloomFilter;
this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer;
this.useHadoopVectoredIO = useHadoopVectoredIO;
this.recordFilter = recordFilter;
this.metadataFilter = metadataFilter;
this.codecFactory = codecFactory;
@@ -126,6 +130,10 @@ public boolean usePageChecksumVerification() {
return usePageChecksumVerification;
}

public boolean useHadoopVectoredIO() {
return useHadoopVectoredIO;
}

public FilterCompat.Filter getRecordFilter() {
return recordFilter;
}
@@ -173,6 +181,7 @@ public static class Builder {
protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
protected boolean useHadoopVectoredIo = HADOOP_VECTORED_IO_ENABLED_DEFAULT;
protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT;
@@ -226,6 +235,11 @@ public Builder useRecordFilter() {
return this;
}

public Builder useHadoopVectoredIo(boolean useHadoopVectoredIo) {
this.useHadoopVectoredIo = useHadoopVectoredIo;
return this;
}

public Builder useColumnIndexFilter(boolean useColumnIndexFilter) {
this.useColumnIndexFilter = useColumnIndexFilter;
return this;
@@ -320,6 +334,7 @@ public Builder copy(ParquetReadOptions options) {
useDictionaryFilter(options.useDictionaryFilter);
useRecordFilter(options.useRecordFilter);
withRecordFilter(options.recordFilter);
useHadoopVectoredIo(options.useHadoopVectoredIO);
withMetadataFilter(options.metadataFilter);
withCodecFactory(options.codecFactory);
withAllocator(options.allocator);
@@ -338,7 +353,7 @@ public ParquetReadOptions build() {

return new ParquetReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter,
codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties);
}
}
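
For completeness, a small sketch of enabling the feature programmatically through the new builder option instead of the Hadoop configuration; the wrapper class is assumed.

```java
import org.apache.parquet.ParquetReadOptions;

class VectoredOptionsExample {
  static ParquetReadOptions vectoredOptions() {
    // Programmatic equivalent of setting parquet.hadoop.vectored.io.enabled=true.
    return ParquetReadOptions.builder()
        .useHadoopVectoredIo(true)
        .build();
  }
}
```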