PARQUET-2171. Vector IO through self-contained shim classes.
Takes the vector IO shim code from the hadoop-shim library and
reimplements it through the DynMethods class.

* Adds low-level tests for the new feature; tests which are
  skipped on hadoop 2 builds
* Parameterizes some of the parquet IO tests to run with/without
  vectorization enabled. The vectorization=enabled run is not
  skipped on hadoop 2, to verify that downgrading to a release
  without vector support (hadoop < 3.3.5) still works.
* Has had all of the parquet-hadoop tests run with vectorization
  enabled.

I'm still trying to get the benchmark tests to work; based on the
timings and logs, I don't see evidence that they are picking up the
switch. More research needed there.
steveloughran committed Aug 22, 2023
1 parent 9be37b0 commit d38a227
Showing 19 changed files with 1,636 additions and 117 deletions.
@@ -109,11 +109,19 @@ public abstract class SeekableInputStream extends InputStream {

/**
* Read a set of file ranges in a vectored manner.
* @throws UnsupportedOperationException if not available in this class/runtime.
*/
public void readVectored(List<ParquetFileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
throw new IOException(String.format("Vectored io is not supported for %s .",
this.getClass().getCanonicalName()));
IntFunction<ByteBuffer> allocate) throws IOException {

throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
}

/**
* Is the {@link #readVectored(List, IntFunction)} method available.
*/
public boolean readVectoredAvailable() {
return false;
}

}
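
A brief usage sketch (editorial, not part of this diff): a caller can probe the new readVectoredAvailable() flag and fall back to classic positioned reads when the API is missing. The range accessors and the fallback delivery below are illustrative assumptions.

    // Sketch only: use vectored IO when the stream supports it, otherwise
    // read each range serially with seek + readFully.
    void readRanges(SeekableInputStream in,
                    List<ParquetFileRange> ranges,
                    IntFunction<ByteBuffer> allocate) throws IOException {
      if (in.readVectoredAvailable()) {
        // hand the whole list to the stream; each range's future completes
        // as its data arrives
        in.readVectored(ranges, allocate);
      } else {
        for (ParquetFileRange range : ranges) {
          ByteBuffer buffer = allocate.apply((int) range.getLength());  // assumed accessor
          in.seek(range.getOffset());                                   // assumed accessor
          in.readFully(buffer);
          buffer.flip();
          // deliver the buffer to the caller by whatever mechanism it expects
        }
      }
    }
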
@@ -27,10 +27,15 @@
import java.security.PrivilegedAction;
import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.parquet.Exceptions.throwIfInstance;

public class DynMethods {

private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);

/**
* Convenience wrapper class around {@link java.lang.reflect.Method}.
*
@@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class<?>... argClasses)
Class<?> targetClass = Class.forName(className, true, loader);
impl(targetClass, methodName, argClasses);
} catch (ClassNotFoundException e) {
// not the right implementation
// class not found on supplied classloader.
LOG.debug("failed to load class {}", className, e);
}
return this;
}
@@ -281,6 +287,7 @@ public Builder impl(Class<?> targetClass, String methodName, Class<?>... argClas
targetClass.getMethod(methodName, argClasses), name);
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
}
return this;
}
@@ -311,6 +318,8 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, targetClass, e);
}
return this;
}
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, className, e);

}
return this;
}
@@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class<?>... argCl
Class<?> targetClass = Class.forName(className, true, loader);
hiddenImpl(targetClass, methodName, argClasses);
} catch (ClassNotFoundException e) {
// not the right implementation
// class not found on supplied classloader.
LOG.debug("failed to load class {}", className, e);
}
return this;
}
@@ -388,6 +401,7 @@ public Builder hiddenImpl(Class<?> targetClass, String methodName, Class<?>... a
this.method = new UnboundMethod(hidden, name);
} catch (SecurityException | NoSuchMethodException e) {
// unusable or not the right implementation
LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
}
return this;
}
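
To make the binding pattern concrete, here is a hedged sketch (editorial; not the VectorIOBridge/BindingUtils code in this commit) of binding one optional method through DynMethods and probing it at runtime. The target method FSDataInputStream.hasCapability and the package of DynMethods (org.apache.parquet.util) are assumptions for illustration.

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.parquet.util.DynMethods;

    // Sketch only: the method is bound once; null means it is absent in
    // this Hadoop release, which is reported as "unavailable", not an error.
    final class OptionalMethodProbe {
      private static final DynMethods.UnboundMethod HAS_CAPABILITY = load();

      private static DynMethods.UnboundMethod load() {
        try {
          // buildChecked() throws NoSuchMethodException if no impl() matched.
          return new DynMethods.Builder("hasCapability")
              .impl(FSDataInputStream.class, "hasCapability", String.class)
              .buildChecked();
        } catch (NoSuchMethodException e) {
          return null;
        }
      }

      static boolean available() {
        return HAS_CAPABILITY != null;
      }

      static boolean hasCapability(FSDataInputStream in, String capability) {
        return available() && (Boolean) HAS_CAPABILITY.invoke(in, capability);
      }
    }

The debug-level logging added above keeps these probes quiet in normal use while still leaving a trail when a binding unexpectedly fails to load.
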
8 changes: 8 additions & 0 deletions parquet-hadoop/README.md
@@ -486,3 +486,11 @@ If `false`, key material is stored in separate new files, created in the same fo
**Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits.
**Default value:** `128`

---

**Property:** `parquet.hadoop.vectored.io.enabled`
**Description:** Flag to enable use of the FileSystem Vector IO API on Hadoop releases which support the feature.
If `true`, an attempt will be made to dynamically load the relevant classes;
if they are not found, the library falls back to classic non-vectored reads, so it is safe to enable this option on older releases.
**Default value:** `false`
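
For example (a hedged sketch; the property name comes from this table, the surrounding setup is illustrative):

    Configuration conf = new Configuration();
    // Enable vectored IO; on Hadoop releases without the API the reader
    // silently falls back to classic reads.
    conf.setBoolean("parquet.hadoop.vectored.io.enabled", true);

The same key can equally be set in the cluster configuration files rather than in code.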

4 changes: 2 additions & 2 deletions parquet-hadoop/pom.xml
@@ -79,11 +79,11 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<!-- <dependency>
<groupId>org.apache.hadoop.extensions</groupId>
<artifactId>hadoop-api-shim</artifactId>
<scope>provided</scope>
</dependency>
</dependency>-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-jackson</artifactId>
@@ -35,7 +35,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -57,14 +56,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shim.functional.FutureIO;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
@@ -108,6 +105,7 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.hadoop.util.vectorio.BindingUtils;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
@@ -995,9 +993,6 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
}
// actually read all the chunks
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
for (ConsecutivePartList consecutiveChunks : allParts) {
consecutiveChunks.readAll(f, builder);
}
readAllPartsVectoredOrNormal(allParts, builder);
for (Chunk chunk : builder.build()) {
readChunkPages(chunk, block, rowGroup);
@@ -1075,14 +1070,16 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
}

/**
* Read data in all parts via either vectored IO or serial IO.
* Read data in all parts via either vectored IO or serial IO.
* @param allParts all parts to be read.
* @param builder used to build chunk list to read the pages for the different columns.
* @throws IOException any IOE.
*/
private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
boolean isVectoredIO = options.useHadoopVectoredIO() && partsLengthValidForVectoredIO(allParts);
boolean isVectoredIO = options.useHadoopVectoredIO()
&& f.readVectoredAvailable()
&& partsLengthValidForVectoredIO(allParts);
if (isVectoredIO) {
readVectored(allParts, builder);
} else {
@@ -1114,9 +1111,10 @@ private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts
* @param builder used to build chunk list to read the pages for the different columns.
* @throws IOException any IOE.
*/
private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
List<ParquetFileRange> ranges = new ArrayList<>();
private void readVectored(List<ConsecutivePartList> allParts,
ChunkListBuilder builder) throws IOException {

List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
for (ConsecutivePartList consecutiveChunks : allParts) {
Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
"Invalid length %s for vectored read operation. It must be less than max integer value.",
@@ -1125,6 +1123,7 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
}
LOG.debug("Doing vectored IO for ranges {}", ranges);
ByteBufferAllocator allocator = options.getAllocator();
//blocking or asynchronous vectored read.
f.readVectored(ranges, allocator::allocate);
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
@@ -1949,8 +1948,10 @@ public void readFromVectoredRange(ParquetFileRange currRange,
ByteBuffer buffer;
try {
LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(),
buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(currRange.getLength());
} catch (TimeoutException e) {
String error = String.format("Timeout while fetching result for %s", currRange);
LOG.error(error, e);
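
As an editorial aside, waiting on a single range's future amounts to the following in plain java.util.concurrent terms (a hedged sketch; BindingUtils.awaitFuture may unwrap and translate exceptions differently):

    // Sketch only; needs java.io.InterruptedIOException, java.nio.ByteBuffer,
    // java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}.
    static ByteBuffer awaitRange(ParquetFileRange range) throws IOException {
      try {
        return range.getDataReadFuture()
            .get(HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        throw new IOException("Timeout while fetching result for " + range, e);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new InterruptedIOException("Interrupted while reading " + range);
      } catch (ExecutionException e) {
        throw new IOException("Vectored read failed for " + range, e.getCause());
      }
    }
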
@@ -124,16 +124,6 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
*/
public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";

/**
* Key to enable/disable vectored io while reading parquet files.
*/
public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";

/**
* Default value of parquet.hadoop.vectored.io.enabled is false.
*/
public static final boolean HADOOP_VECTORED_IO_DEFAULT = false;

/**
* key to configure whether row group dictionary filtering is enabled
*/
@@ -173,6 +163,17 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {

private static final int MIN_FOOTER_CACHE_SIZE = 100;

/**
* Key to enable/disable vectored io while reading parquet files:
* {@value}.
*/
public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";

/**
* Default value of parquet.hadoop.vectored.io.enabled is {@value}.
*/
public static final boolean HADOOP_VECTORED_IO_DEFAULT = false;

public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
}
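
A short usage sketch (editorial; mirrors the setTaskSideMetaData helper above, the job setup around it is assumed):

    // Sketch only: enable vectored IO for this job's parquet reads.
    ContextUtil.getConfiguration(job)
        .setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);
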
@@ -20,6 +20,7 @@
package org.apache.parquet.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.ParquetFileRange;

@@ -28,11 +29,14 @@
import java.util.List;
import java.util.function.IntFunction;

import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;

/**
* SeekableInputStream implementation that implements read(ByteBuffer) for
* Hadoop 1 FSDataInputStream.
* It implements {@link #readVectored(List, IntFunction)} by
* handing off to VectorIOBridge which uses reflection to offer the API if it is
* found.
* The return value of {@link #readVectoredAvailable()} reflects the availability of the
* API.
*/
class H1SeekableInputStream extends DelegatingSeekableInputStream {

@@ -63,8 +67,13 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
stream.readFully(bytes, start, len);
}

@Override
public boolean readVectoredAvailable() {
return VectorIOBridge.bridgeAvailable();
}

@Override
public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {
readVectoredAndPopulate(ranges, allocate, stream);
VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
}
}
@@ -20,6 +20,7 @@
package org.apache.parquet.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.ParquetFileRange;

@@ -29,11 +30,14 @@
import java.util.List;
import java.util.function.IntFunction;

import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;

/**
* SeekableInputStream implementation for FSDataInputStream that implements
* ByteBufferReadable in Hadoop 2.
* It implements {@link #readVectored(List, IntFunction)} by
* handing off to VectorIOBridge which uses reflection to offer the API if it is
* found.
* The return value of {@link #readVectoredAvailable()} reflects the availability of the
* API.
*/
class H2SeekableInputStream extends DelegatingSeekableInputStream {

@@ -88,9 +92,14 @@ public int read(ByteBuffer buf) throws IOException {
}
}

@Override
public boolean readVectoredAvailable() {
return VectorIOBridge.bridgeAvailable();
}

@Override
public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {
readVectoredAndPopulate(ranges, allocate, stream);
VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
}

public static void readFully(Reader reader, ByteBuffer buf) throws IOException {

This file was deleted.
