From 69f54960f36c6aee0873e8e1245a6f664c87efe1 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Fri, 28 Jul 2023 17:29:50 +0100
Subject: [PATCH] PARQUET-2171. Vector IO through self-contained shim classes.

Takes the vector IO shim code from the hadoop-shim library and
reimplements it through the DynMethods class.

* Adds low-level tests for the new feature; tests which are skipped
  on hadoop 2 builds
* Parameterizes some of the parquet IO tests to run with/without
  vectorization enabled. The vectorization=enabled run is not skipped
  on hadoop 2, to verify that downgrading to any release without
  vector support (hadoop < 3.3.5) is all good.
* Has had all of the parquet-hadoop tests run with vectorization
  enabled.

I'm trying to get the benchmark tests to work; I don't see the
evidence that they are getting the switch, based on timings and logs.
More research needed there.
---
 .../parquet/io/SeekableInputStream.java       |  14 +-
 .../org/apache/parquet/util/DynMethods.java   |  18 +-
 parquet-hadoop/README.md                      |   8 +
 parquet-hadoop/pom.xml                        |   4 +-
 .../parquet/hadoop/ParquetFileReader.java     |  25 +-
 .../parquet/hadoop/ParquetInputFormat.java    |  21 +-
 .../hadoop/util/H1SeekableInputStream.java    |  15 +-
 .../hadoop/util/H2SeekableInputStream.java    |  15 +-
 .../hadoop/util/ParquetVectoredIOUtil.java    |  64 --
 .../hadoop/util/vectorio/BindingUtils.java    | 211 +++++++
 .../hadoop/util/vectorio/FileRangeBridge.java | 264 ++++++++
 .../hadoop/util/vectorio/VectorIOBridge.java  | 298 +++++++++
 .../hadoop/util/vectorio/package-info.java    |  24 +
 .../TestInputFormatColumnProjection.java      |  23 +-
 .../parquet/hadoop/TestParquetFileWriter.java |  18 +-
 .../hadoop/example/TestInputOutputFormat.java |  25 +-
 .../util/vectorio/TestFileRangeBridge.java    | 105 ++++
 .../util/vectorio/TestVectorIOBridge.java     | 593 ++++++++++++++++++
 pom.xml                                       |   8 +-
 19 files changed, 1636 insertions(+), 117 deletions(-)
 delete mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ParquetVectoredIOUtil.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java

diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
index 25ecbe8652..5e05c4f3aa 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
@@ -109,11 +109,19 @@ public abstract class SeekableInputStream extends InputStream {
 
   /**
    * Read a set of file ranges in a vectored manner.
+   * @throws UnsupportedOperationException if not available in this class/runtime.
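+   * <p>
+   * A sketch of the intended call pattern (illustrative only, not part
+   * of the API contract):
+   * <pre>
+   *   if (in.readVectoredAvailable()) {
+   *     // hand the ranges off; futures on each range complete later
+   *     in.readVectored(ranges, ByteBuffer::allocate);
+   *   }
+   * </pre>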
   */
  public void readVectored(List<ParquetFileRange> ranges,
-                          IntFunction<ByteBuffer> allocate) throws IOException {
-    throw new IOException(String.format("Vectored io is not supported for %s .",
-        this.getClass().getCanonicalName()));
+                          IntFunction<ByteBuffer> allocate) throws IOException {
+
+    throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
+  }
+
+  /**
+   * Is the {@link #readVectored(List, IntFunction)} method available.
+   */
+  public boolean readVectoredAvailable() {
+    return false;
+  }
 }
diff --git a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
index 340baec02e..363afce335 100644
--- a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
+++ b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java
@@ -27,10 +27,15 @@
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.parquet.Exceptions.throwIfInstance;
 
 public class DynMethods {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);
+
   /**
    * Convenience wrapper class around {@link java.lang.reflect.Method}.
    *
@@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class<?>... argClasses)
       Class<?> targetClass = Class.forName(className, true, loader);
       impl(targetClass, methodName, argClasses);
     } catch (ClassNotFoundException e) {
-      // not the right implementation
+      // class not found on supplied classloader.
+      LOG.debug("failed to load class {}", className, e);
     }
     return this;
   }
@@ -281,6 +287,7 @@ public Builder impl(Class<?> targetClass, String methodName, Class<?>... argClas
           targetClass.getMethod(methodName, argClasses), name);
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
     }
     return this;
   }
@@ -311,6 +318,8 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load constructor arity {} from class {}",
+          argClasses.length, targetClass, e);
     }
     return this;
   }
@@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
           .buildChecked();
     } catch (NoSuchMethodException e) {
       // not the right implementation
+      LOG.debug("failed to load constructor arity {} from class {}",
+          argClasses.length, className, e);
+
     }
     return this;
   }
@@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class<?>... argCl
       Class<?> targetClass = Class.forName(className, true, loader);
       hiddenImpl(targetClass, methodName, argClasses);
     } catch (ClassNotFoundException e) {
-      // not the right implementation
+      // class not found on supplied classloader.
+      LOG.debug("failed to load class {}", className, e);
     }
     return this;
   }
@@ -388,6 +401,7 @@ public Builder hiddenImpl(Class<?> targetClass, String methodName, Class<?>...
a this.method = new UnboundMethod(hidden, name); } catch (SecurityException | NoSuchMethodException e) { // unusable or not the right implementation + LOG.debug("failed to load method {} from class {}", methodName, targetClass, e); } return this; } diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index c27c5f2fc1..156dd1b710 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -486,3 +486,11 @@ If `false`, key material is stored in separate new files, created in the same fo **Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits. **Default value:** `128` +--- + +**Property:** `parquet.hadoop.vectored.io.enabled` +**Description:** Flag to enable use of the FileSystem Vector IO API on Hadoop releases which support the feature. +If `true` then an attempt will be made to dynamically load the relevant classes; +if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases. +**Default value:** `false` + diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 8fa7574c7c..7598af4b44 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -79,11 +79,11 @@ ${hadoop.version} provided - + org.apache.parquet parquet-jackson diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 61ae41118c..c4a2b66ab7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -35,7 +35,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; import java.io.SequenceInputStream; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -57,14 +56,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.shim.functional.FutureIO; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; @@ -108,6 +105,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; +import org.apache.parquet.hadoop.util.vectorio.BindingUtils; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; @@ -995,9 +993,6 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE } // actually read all the chunks ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); - } readAllPartsVectoredOrNormal(allParts, builder); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); @@ -1075,14 +1070,16 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r } /** - * Read data in all parts via either vectored IO or 
serial IO.
+   * Read data in all parts via either vectored IO or serial IO.
    * @param allParts all parts to be read.
    * @param builder used to build chunk list to read the pages for the different columns.
    * @throws IOException any IOE.
    */
   private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts,
       ChunkListBuilder builder) throws IOException {
-    boolean isVectoredIO = options.useHadoopVectoredIO() && partsLengthValidForVectoredIO(allParts);
+    boolean isVectoredIO = options.useHadoopVectoredIO()
+        && f.readVectoredAvailable()
+        && partsLengthValidForVectoredIO(allParts);
     if (isVectoredIO) {
       readVectored(allParts, builder);
     } else {
@@ -1114,9 +1111,10 @@ private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts
    * @param builder used to build chunk list to read the pages for the different columns.
    * @throws IOException any IOE.
    */
-  private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
-      throws IOException {
-    List<ParquetFileRange> ranges = new ArrayList<>();
+  private void readVectored(List<ConsecutivePartList> allParts,
+      ChunkListBuilder builder) throws IOException {
+
+    List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
     for (ConsecutivePartList consecutiveChunks : allParts) {
       Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
           "Invalid length %s for vectored read operation. It must be less than max integer value.",
@@ -1125,6 +1123,7 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
     }
     LOG.debug("Doing vectored IO for ranges {}", ranges);
     ByteBufferAllocator allocator = options.getAllocator();
+    // blocking or asynchronous vectored read.
     f.readVectored(ranges, allocator::allocate);
     int k = 0;
     for (ConsecutivePartList consecutivePart : allParts) {
@@ -1949,8 +1948,10 @@ public void readFromVectoredRange(ParquetFileRange currRange,
       ByteBuffer buffer;
       try {
         LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
-        buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(),
+        buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
             HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        // report in a counter the data we just scanned
+        BenchmarkCounter.incrementBytesRead(currRange.getLength());
       } catch (TimeoutException e) {
         String error = String.format("Timeout while fetching result for %s", currRange);
         LOG.error(error, e);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index fc60bbafc5..c916b2c7ae 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -124,16 +124,6 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
 
-  /**
-   * Key to enable/disable vectored io while reading parquet files.
-   */
-  public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";
-
-  /**
-   * Default value of parquet.hadoop.vectored.io.enabled is false.
-   */
-  public static final boolean HADOOP_VECTORED_IO_DEFAULT = false;
-
   /**
    * key to configure whether row group dictionary filtering is enabled
    */
@@ -173,6 +163,17 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
 
   private static final int MIN_FOOTER_CACHE_SIZE = 100;
 
+  /**
+   * Key to enable/disable vectored io while reading parquet files:
+   * {@value}.
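+   * <p>
+   * A minimal enabling sketch (illustrative; the vectored path needs a
+   * Hadoop 3.3.5+ runtime, while older releases fall back to normal reads):
+   * <pre>
+   *   Configuration conf = new Configuration();
+   *   conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, true);
+   * </pre>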
+   */
+  public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled";
+
+  /**
+   * Default value of parquet.hadoop.vectored.io.enabled is {@value}.
+   */
+  public static final boolean HADOOP_VECTORED_IO_DEFAULT = false;
+
   public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) {
     ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index c09d6fee65..ccc0fa5841 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -20,6 +20,7 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.ParquetFileRange;
 
@@ -28,11 +29,14 @@
 import java.util.List;
 import java.util.function.IntFunction;
 
-import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;
-
 /**
  * SeekableInputStream implementation that implements read(ByteBuffer) for
  * Hadoop 1 FSDataInputStream.
+ * It implements {@link #readVectored(List, IntFunction)} by
+ * handing off to VectorIOBridge which uses reflection to offer the API if it is
+ * found.
+ * The return value of {@link #readVectoredAvailable()} reflects the availability of the
+ * API.
  */
 class H1SeekableInputStream extends DelegatingSeekableInputStream {
 
@@ -63,8 +67,13 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
     stream.readFully(bytes, start, len);
   }
 
+  @Override
+  public boolean readVectoredAvailable() {
+    return VectorIOBridge.bridgeAvailable();
+  }
+
   @Override
   public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {
-    readVectoredAndPopulate(ranges, allocate, stream);
+    VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index b8f1eb79da..f6f56650e2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -20,6 +20,7 @@
 package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
 import org.apache.parquet.io.DelegatingSeekableInputStream;
 import org.apache.parquet.io.ParquetFileRange;
 
@@ -29,11 +30,14 @@
 import java.util.List;
 import java.util.function.IntFunction;
 
-import static org.apache.parquet.hadoop.util.ParquetVectoredIOUtil.readVectoredAndPopulate;
-
 /**
  * SeekableInputStream implementation for FSDataInputStream that implements
  * ByteBufferReadable in Hadoop 2.
+ * It implements {@link #readVectored(List, IntFunction)} by
+ * handing off to VectorIOBridge which uses reflection to offer the API if it is
+ * found.
+ * The return value of {@link #readVectoredAvailable()} reflects the availability of the
+ * API.
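+ * <p>
+ * A sketch of the hand-off (illustrative only):
+ * <pre>
+ *   VectorIOBridge.readVectoredRanges(stream, ranges, allocate);
+ * </pre>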
*/ class H2SeekableInputStream extends DelegatingSeekableInputStream { @@ -88,9 +92,14 @@ public int read(ByteBuffer buf) throws IOException { } } + @Override + public boolean readVectoredAvailable() { + return VectorIOBridge.bridgeAvailable(); + } + @Override public void readVectored(List ranges, IntFunction allocate) throws IOException { - readVectoredAndPopulate(ranges, allocate, stream); + VectorIOBridge.readVectoredRanges(stream, ranges, allocate); } public static void readFully(Reader reader, ByteBuffer buf) throws IOException { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ParquetVectoredIOUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ParquetVectoredIOUtil.java deleted file mode 100644 index 44df3779b4..0000000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ParquetVectoredIOUtil.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.parquet.hadoop.util; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.function.IntFunction; -import java.util.stream.Collectors; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.shim.api.VectorFileRange; -import org.apache.parquet.io.ParquetFileRange; - -import static org.apache.hadoop.fs.shim.api.ShimFactory.shimFSDataInputStream; - -/** - * Utility class to perform vectored IO reads. - */ -public class ParquetVectoredIOUtil { - - /** - * Read data in a list of file ranges. - * - * @param ranges parquet file ranges. - * @param allocate allocate function to allocate memory to hold data. - * @param stream stream from where the data has to be read. - * - * @throws IOException any IOE. - */ - public static void readVectoredAndPopulate(final List ranges, - final IntFunction allocate, - final FSDataInputStream stream) throws IOException { - - // Setting the parquet range as a reference. - List fileRanges = ranges.stream() - .map(range -> VectorFileRange.createFileRange(range.getOffset(), range.getLength(), range)) - .collect(Collectors.toList()); - shimFSDataInputStream(stream).readVectoredRanges(fileRanges, allocate); - // As the range has been set above, there shouldn't be - // NPE or class cast issues. 
- fileRanges.forEach(fileRange -> { - ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference(); - parquetFileRange.setDataReadFuture(fileRange.getData()); - }); - } -} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java new file mode 100644 index 0000000000..176998bbc6 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.util.DynMethods; + +/** + * Package-private binding utils. + */ +public final class BindingUtils { + private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class); + + + private BindingUtils() { + } + + /** + * Given a future, evaluate it. + *
+   * Any exception generated in the future is
+   * extracted and rethrown.
+   * <p>
+   *
+   * @param future future to evaluate
+   * @param timeout timeout to wait
+   * @param unit time unit.
+   * @param <T> type of the result.
+   *
+   * @return the result, if all went well.
+   *
+   * @throws InterruptedIOException future was interrupted
+   * @throws IOException if something went wrong
+   * @throws RuntimeException any nested RTE thrown
+   * @throws TimeoutException the future timed out.
+   */
+  public static <T> T awaitFuture(final Future<T> future,
+      final long timeout,
+      final TimeUnit unit)
+      throws InterruptedIOException, IOException, RuntimeException,
+      TimeoutException {
+    try {
+      LOG.debug("Awaiting future");
+      return future.get(timeout, unit);
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException(e.toString())
+          .initCause(e);
+    } catch (ExecutionException e) {
+      return raiseInnerCause(e);
+    }
+  }
+
+  /**
+   * From the inner cause of an execution exception, extract the inner cause
+   * if it is an IOE or RTE.
+   * This will always raise an exception, either the inner IOException,
+   * an inner RuntimeException, or a new IOException wrapping the raised
+   * exception.
+   *
+   * @param e exception.
+   * @param <T> type of return value.
+   *
+   * @return nothing, ever.
+   *
+   * @throws IOException either the inner IOException, or a wrapper around
+   * any non-Runtime-Exception
+   * @throws RuntimeException if that is the inner cause.
+   */
+  public static <T> T raiseInnerCause(final ExecutionException e)
+      throws IOException {
+    throw unwrapInnerException(e);
+  }
+
+  /**
+   * From the inner cause of an execution exception, extract the inner cause
+   * to an IOException, raising RuntimeExceptions and Errors immediately.
+   * <ol>
+   *   <li>If it is an IOE: Return.</li>
+   *   <li>If it is a {@link UncheckedIOException}: return the cause.</li>
+   *   <li>Completion/Execution Exceptions: extract and repeat.</li>
+   *   <li>If it is an RTE or Error: throw.</li>
+   *   <li>Any other type: wrap in an IOE.</li>
+   * </ol>
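+   * <p>
+   * A sketch of the expected use, mirroring {@code awaitFuture} above
+   * (illustrative only):
+   * <pre>
+   *   try {
+   *     return future.get(timeout, unit);
+   *   } catch (ExecutionException e) {
+   *     return raiseInnerCause(e);
+   *   }
+   * </pre>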
+ * + * Recursively handles wrapped Execution and Completion Exceptions in + * case something very complicated has happened. + * + * @param e exception. + * + * @return an IOException extracted or built from the cause. + * + * @throws RuntimeException if that is the inner cause. + * @throws Error if that is the inner cause. + */ + @SuppressWarnings("ChainOfInstanceofChecks") + public static IOException unwrapInnerException(final Throwable e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + return (IOException) cause; + } else if (cause instanceof UncheckedIOException) { + // this is always an IOException + return ((UncheckedIOException) cause).getCause(); + } else if (cause instanceof CompletionException) { + return unwrapInnerException(cause); + } else if (cause instanceof ExecutionException) { + return unwrapInnerException(cause); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else if (cause != null) { + // other type: wrap with a new IOE + return new IOException(cause); + } else { + // this only happens if there was no cause. + return new IOException(e); + } + } + + /** + * Get an invocation from the source class, which will be unavailable() if + * the class is null or the method isn't found. + * + * @param return type + * @param source source. If null, the method is a no-op. + * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * + * @return the method or "unavailable" + */ + static DynMethods.UnboundMethod loadInvocation( + Class source, + Class returnType, String name, + Class... parameterTypes) { + + if (source != null) { + final DynMethods.UnboundMethod m = new DynMethods + .Builder(name) + .impl(source, name, parameterTypes) + .orNoop() + .build(); + if (m.isNoop()) { + // this is a sign of a mismatch between this class's expected + // signatures and actual ones. + // log at debug. + LOG.debug("Failed to load method {} from {}", name, source); + } else { + LOG.debug("Found method {} from {}", name, source); + } + return m; + } else { + return noop(name); + } + } + + /** + * Create a no-op method. + * + * @param name method name + * + * @return a no-op method. + */ + static DynMethods.UnboundMethod noop(final String name) { + return new DynMethods.Builder(name) + .orNoop().build(); + } + + /** + * Given a sequence of methods, verify that they are all available. + * + * @param methods methods + * + * @return true if they are all implemented + */ + static boolean implemented(DynMethods.UnboundMethod... methods) { + for (DynMethods.UnboundMethod method : methods) { + if (method.isNoop()) { + return false; + } + } + return true; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java new file mode 100644 index 0000000000..1937976626 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.implemented; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Class to bridge to the FileRange class through reflection. + * A singleton is created when the class is loaded, so there is no need + * to repeat the reflection process on every API call. + *
+ * The Hadoop FileRange class is an interface with getters and setters; + * to instantiate the static method {@code createFileRange()} is used. + */ +final class FileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(FileRangeBridge.class); + + /** Class to bridge to: {@value}. */ + public static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + /** + * The singleton instance of the bridge. + */ + private static final FileRangeBridge INSTANCE = new FileRangeBridge(); + + /** + * Is the bridge available? + */ + private final boolean available; + private final Class fileRangeInterface; + private final DynMethods.UnboundMethod _getData; + private final DynMethods.UnboundMethod _setData; + private final DynMethods.UnboundMethod _getLength; + private final DynMethods.UnboundMethod _getOffset; + private final DynMethods.UnboundMethod _getReference; + private final DynMethods.UnboundMethod createFileRange; + + /** + * Constructor. + * This loads the real FileRange and its methods, if present. + */ + FileRangeBridge() { + + // try to load the class + Class loadedClass; + try { + loadedClass = this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + LOG.debug("No {}", CLASSNAME, e); + loadedClass = null; + } + fileRangeInterface = loadedClass; + + // as loadInvocation returns a no-op if the class is null, this sequence + // will either construct in the list of operations or leave them with no-ops + + _getOffset = loadInvocation(loadedClass, long.class, "getOffset"); + _getLength = loadInvocation(loadedClass, int.class, "getLength"); + _getData = loadInvocation(loadedClass, null, "getData"); + _setData = loadInvocation(loadedClass, void.class, "setData", CompletableFuture.class); + _getReference = loadInvocation(loadedClass, Object.class, "getReference"); + // static interface method to create an instance. + createFileRange = loadInvocation(fileRangeInterface, + Object.class, "createFileRange", + long.class, + int.class, + Object.class); + + // we are available only if the class is present and all methods are found + // the checks for the method are extra paranoia, but harmless + available = loadedClass != null + && implemented( + createFileRange, + _getOffset, + _getLength, + _getData, + _setData, + _getReference); + + LOG.debug("FileRangeBridge availability: {}", available); + } + + /** + * Is the bridge available? + * + * @return true iff the bridge is present. + */ + public boolean available() { + return available; + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Interface " + CLASSNAME + " not found"); + } + } + + /** + * Get the file range class. + * + * @return the file range implementation class, if present. + */ + public Class getFileRangeInterface() { + return fileRangeInterface; + } + + /** + * Instantiate. + * + * @param offset offset in file + * @param length length of data to read. + * @param reference nullable reference to store in the range. + * + * @return a VectorFileRange wrapping a FileRange + * + * @throws RuntimeException if the range cannot be instantiated + * @throws IllegalStateException if the API is not available. 
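+   * <p>
+   * Illustrative call (the values are arbitrary):
+   * <pre>
+   *   WrappedFileRange r = FileRangeBridge.instance()
+   *       .createFileRange(0L, 1024, null);
+   * </pre>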
+ */ + public WrappedFileRange createFileRange( + final long offset, + final int length, + final Object reference) { + + checkAvailable(); + return new WrappedFileRange(createFileRange.invoke(null, offset, length, reference)); + } + + /** + * Build a WrappedFileRange from a ParquetFileRange; + * the reference field of the wrapped object refers + * back to the object passed in. + * + * @param in input. + * + * @return a wrapper around a FileRange instance + */ + public FileRangeBridge.WrappedFileRange toFileRange(final ParquetFileRange in) { + // create a new wrapped file range, fill in and then + // get the instance + return createFileRange(in.getOffset(), in.getLength(), in); + } + + @Override + public String toString() { + return "FileRangeBridge{" + + "available=" + available + + ", fileRangeInterface=" + fileRangeInterface + + ", _getOffset=" + _getOffset + + ", _getLength=" + _getLength + + ", _getData=" + _getData + + ", _setData=" + _setData + + ", _getReference=" + _getReference + + ", createFileRange=" + createFileRange + + '}'; + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static FileRangeBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Wraps a Vector {@code FileRange} instance through reflection. + */ + class WrappedFileRange { + + /** + * The wrapped {@code FileRange} instance. + */ + private final Object fileRange; + + /** + * Instantiate. + * + * @param fileRange non null {@code FileRange} instance. + */ + WrappedFileRange(final Object fileRange) { + this.fileRange = requireNonNull(fileRange); + } + + public long getOffset() { + return _getOffset.invoke(fileRange); + } + + public int getLength() { + return _getLength.invoke(fileRange); + } + + public CompletableFuture getData() { + return _getData.invoke(fileRange); + } + + public void setData(final CompletableFuture data) { + _setData.invoke(fileRange, data); + } + + public Object getReference() { + return _getReference.invoke(fileRange); + } + + /** + * Get the wrapped fileRange. + * + * @return the fileRange. + */ + public Object getFileRange() { + return fileRange; + } + + @Override + public String toString() { + return "WrappedFileRange{" + + "fileRange=" + fileRange + + '}'; + } + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java new file mode 100644 index 0000000000..a14be2a66e --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntFunction; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Vector IO bridge. + *
+ * This loads the {@code PositionedReadable} method:
+ * <pre>
+ *   void readVectored(List&lt;? extends FileRange&gt; ranges,
+ *     IntFunction&lt;ByteBuffer&gt; allocate) throws IOException
+ * </pre>
+ * It is made slightly easier because of type erasure; the signature of the
+ * method is actually {@code Void.class method(List.class, IntFunction.class)}.
+ * <p>
+ * There are some counters to aid in testing; the {@link #toString()} method + * will print them and the loaded method, for use in tests and debug logs. + */ +public final class VectorIOBridge { + + private static final Logger LOG = LoggerFactory.getLogger(VectorIOBridge.class); + + /** + * readVectored Method to look for. + */ + private static final String READ_VECTORED = "readVectored"; + + /** + * hasCapability Method name. + * {@code boolean hasCapability(String capability);} + */ + private static final String HAS_CAPABILITY = "hasCapability"; + + /** + * hasCapability() probe for vectored IO api implementation. + */ + static final String VECTOREDIO_CAPABILITY = "in:readvectored"; + + /** + * The singleton instance of the bridge. + */ + private static final VectorIOBridge INSTANCE = new VectorIOBridge(); + + /** + * readVectored() method. + */ + private final DynMethods.UnboundMethod readVectored; + + /** + * {@code boolean StreamCapabilities.hasCapability(String)}. + */ + private final DynMethods.UnboundMethod hasCapabilityMethod; + + /** + * How many vector read calls made. + */ + private final AtomicLong vectorReads = new AtomicLong(); + + /** + * How many blocks were read. + */ + private final AtomicLong blocksRead = new AtomicLong(); + + /** + * How many bytes were read. + */ + private final AtomicLong bytesRead = new AtomicLong(); + + /** + * Constructor. package private for testing. + */ + private VectorIOBridge() { + + readVectored = loadInvocation(PositionedReadable.class, + Void.TYPE, READ_VECTORED, List.class, IntFunction.class); + LOG.debug("Vector IO availability; ", available()); + + + // if readVectored is present, so is hasCapabilities(). + hasCapabilityMethod = loadInvocation(FSDataInputStream.class, + boolean.class, HAS_CAPABILITY, String.class); + + } + + /** + * Is the bridge available. + * + * @return true if readVectored() is available. + */ + public boolean available() { + return !readVectored.isNoop() && FileRangeBridge.bridgeAvailable(); + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Hadoop VectorIO not found"); + } + } + + /** + * Read data in a list of file ranges. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * The {@link ParquetFileRange} parameters all have their + * data read futures set to the range reads of the associated + * operations; callers must await these to complete. + * + * @param stream stream from where the data has to be read. + * @param ranges parquet file ranges. + * @param allocate allocate function to allocate memory to hold data. + * @throws UnsupportedOperationException if the API is not available. + */ + public static void readVectoredRanges( + final FSDataInputStream stream, + final List ranges, + final IntFunction allocate) { + + final VectorIOBridge bridge = availableInstance(); + final FileRangeBridge rangeBridge = FileRangeBridge.instance(); + // Setting the parquet range as a reference. + List fileRanges = ranges.stream() + .map(rangeBridge::toFileRange) + .collect(Collectors.toList()); + bridge.readWrappedRanges(stream, fileRanges, allocate); + + // copy back the completable futures from the scheduled + // vector reads to the ParquetFileRange entries passed in. 
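+    // assumption, noted for maintainers: each range's future completes
+    // only once the buffer supplied by allocate() has been fully populated.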
+ fileRanges.forEach(fileRange -> { + // toFileRange() sets up this back reference + ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference(); + parquetFileRange.setDataReadFuture(fileRange.getData()); + }); + } + + + /** + * Read data in a list of wrapped file ranges, extracting the inner + * instances and then executing. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * + * @param stream stream from where the data has to be read. + * @param ranges wrapped file ranges. + * @param allocate allocate function to allocate memory to hold data. + */ + private void readWrappedRanges( + final PositionedReadable stream, + final List ranges, + final IntFunction allocate) { + + // update the counters. + vectorReads.incrementAndGet(); + blocksRead.addAndGet(ranges.size()); + // extract the instances the wrapped ranges refer to; update the + // bytes read counter. + List instances = ranges.stream() + .map(r -> { + bytesRead.addAndGet(r.getLength()); + return r.getFileRange(); + }) + .collect(Collectors.toList()); + LOG.debug("readVectored with {} ranges on stream {}", + ranges.size(), stream); + readVectored.invoke(stream, instances, allocate); + } + + @Override + public String toString() { + return "VectorIOBridge{" + + "readVectored=" + readVectored + + ", vectorReads=" + vectorReads.get() + + ", blocksRead=" + blocksRead.get() + + ", bytesRead=" + bytesRead.get() + + '}'; + } + + /** + * Does a stream implement StreamCapability and, if so, is the capability + * available. + * If the method is not found, this predicate will return false. + * @param stream input stream to query. + * @param capability the capability to look for. + * @return true if the stream declares the capability is available. + */ + public boolean hasCapability(final FSDataInputStream stream, + final String capability) { + + if (hasCapabilityMethod.isNoop()) { + return false; + } else { + return hasCapabilityMethod.invoke(stream, capability); + } + } + + + /** + * How many vector read calls have been made? + * @return the count of vector reads. + */ + public long getVectorReads() { + return vectorReads.get(); + } + + /** + * How many blocks were read? + * @return the count of blocks read. + */ + public long getBlocksRead() { + return blocksRead.get(); + } + + /** + * How many bytes of data have been read. + * @return the count of bytes read. + */ + public long getBytesRead() { + return bytesRead.get(); + } + + /** + * Reset all counters; for testing. + * Non-atomic. + */ + void resetCounters() { + vectorReads.set(0); + blocksRead.set(0); + bytesRead.set(0); + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static VectorIOBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Get the instance verifying that the API is available. + * @return an available instance, always + * @throws UnsupportedOperationException if it is not. 
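+   * <p>
+   * Illustrative call sequence (not part of this patch):
+   * <pre>
+   *   VectorIOBridge bridge = VectorIOBridge.availableInstance();
+   * </pre>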
+ */ + public static VectorIOBridge availableInstance() { + final VectorIOBridge bridge = instance(); + bridge.checkAvailable(); + return bridge; + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java new file mode 100644 index 0000000000..6c927e7c08 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Vector IO support for Hadoop 3.3.5 runtimes. + * Uses reflect so will compile against older versions, + * but will not actually work. + */ +package org.apache.parquet.hadoop.util.vectorio; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java index a6d2732d1b..8ca40484e5 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java @@ -42,15 +42,21 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import static java.lang.Thread.sleep; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +@RunWith(Parameterized.class) public class TestInputFormatColumnProjection { public static final String FILE_CONTENT = "" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," + @@ -96,7 +102,19 @@ protected void map(Void key, Group value, Context context) @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; + + public TestInputFormatColumnProjection(boolean readType) { + this.readType = readType; + } @Test public void testProjectionSize() throws Exception { Assume.assumeTrue( // only run this test for Hadoop 2 @@ -115,6 +133,8 @@ public void testProjectionSize() throws Exception { outputFolder.delete(); Configuration conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); // set the projection schema conf.set("parquet.read.schema", Types.buildMessage() .required(BINARY).as(UTF8).named("char") @@ -164,7 +184,8 @@ public void testProjectionSize() throws Exception { bytesRead = Reader.bytesReadCounter.getValue(); } - Assert.assertTrue("Should read less than 10% of the input file size", + Assert.assertTrue("Should read (" + bytesRead + " bytes)" + + " less than 10% of the input file size (" + bytesWritten + ")", bytesRead < (bytesWritten / 10)); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 526b77c8c5..94ee413fd6 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -125,25 +125,27 @@ public class TestParquetFileWriter { private String writeSchema; - private final String readType; - @Rule public final TemporaryFolder temp = new TemporaryFolder(); - @Parameterized.Parameters(name = "Read type : {0}") - public static List params() { - return Arrays.asList("vectored", "normal"); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); } + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; - public TestParquetFileWriter(String readType) { + public TestParquetFileWriter(boolean readType) { this.readType = readType; } private Configuration getTestConfiguration() { Configuration conf = new Configuration(); - conf.set(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, - String.valueOf(readType.equals("vectored"))); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); return conf; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java index 188a79643d..80baaa68a4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -63,9 +64,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.schema.MessageTypeParser; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestInputOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class); @@ -82,9 +87,24 @@ public class TestInputOutputFormat { private Class> readMapperClass; private Class> writeMapperClass; + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + + /** + * Read type: true for vectored IO. + */ + private final boolean readType; + + public TestInputOutputFormat(boolean readType) { + this.readType = readType; + } @Before public void setUp() { conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); writeSchema = "message example {\n" + "required int32 line;\n" + "required binary content;\n" + @@ -335,8 +355,9 @@ public void testReadWriteWithCounter() throws Exception { assertTrue(value(readJob, "parquet", "bytesread") > 0L); assertTrue(value(readJob, "parquet", "bytestotal") > 0L); - assertTrue(value(readJob, "parquet", "bytesread") - == value(readJob, "parquet", "bytestotal")); + assertEquals("bytestotal != bytesread", + value(readJob, "parquet", "bytestotal"), + value(readJob, "parquet", "bytesread")); //not testing the time read counter since it could be zero due to the size of data is too small } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java new file mode 100644 index 0000000000..5119177553 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNoException; + +/** + * Test the {@link FileRangeBridge} class; requires + * Hadoop 3.3.5+ + */ +public class TestFileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(TestFileRangeBridge.class); + + /** + * Class required for tests to run: {@value }. + * Probed in setup. + */ + static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + @Before + public void setUp() { + + // look for the FileRange; if not found, skip + try { + this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + assumeNoException(e); + } + } + + /** + * Attempt to explicit instantiate the bridge; + * package private constructor. + */ + @Test + public void testInstantiate() throws Throwable { + new FileRangeBridge(); + } + + /** + * Is the shared bridge instance non-null, and does the + * {@link FileRangeBridge#bridgeAvailable()} predicate + * return true. + */ + @Test + public void testFileRangeBridgeAvailable() throws Throwable { + assertNotNull("FileRangeBridge instance null", FileRangeBridge.instance()); + assertTrue("Bridge not available", FileRangeBridge.bridgeAvailable()); + } + + /** + * Create a range and validate the getters return the original values. + */ + @Test + public void testCreateFileRange() { + Object reference = "backref"; + FileRangeBridge.WrappedFileRange range = FileRangeBridge.instance() + .createFileRange(512L, 16384, reference); + LOG.info("created range {}", range); + assertNotNull("null range", range); + assertNotNull("null range instance", range.getFileRange()); + assertEquals("offset of " + range, 512L, range.getOffset()); + assertEquals("length of " + range, 16384, range.getLength()); + assertSame("backref of " + range, reference, range.getReference()); + + // this isn't set until readVectored() is called + assertNull("non-null range future", range.getData()); + } + + /** + * Verify there is no parameter validation. 
+ */ + @Test + public void testCreateInvalidRange() { + FileRangeBridge.instance() + .createFileRange(-1, -1, null); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java new file mode 100644 index 0000000000..8b6c7347e8 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.IntFunction; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.io.ParquetFileRange; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.awaitFuture; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.VECTOREDIO_CAPABILITY; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.instance; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.readVectoredRanges; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Test the vector IO bridge. + * Much of this is lifted from hadoop-common test + * {@code AbstractContractVectoredReadTest}; + * with other utility methods from + * {@code org.apache.hadoop.fs.contract.ContractTestUtils}. + */ +public class TestVectorIOBridge { + private static final int DATASET_LEN = 64 * 1024; + private static final byte[] DATASET = dataset(DATASET_LEN, 'a', 32); + private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + + /** + * Timeout in seconds for vectored read operation in tests : {@value}. + */ + private static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60; + + /** + * relative vectored path. 
+
+  /**
+   * Relative path of the vectored test file.
+   */
+  private final Path vectoredPath = new Path("target/test/vectored");
+
+  private final HeapByteBufferAllocator bufferAllocator = new HeapByteBufferAllocator();
+  private final ElasticByteBufferPool pool =
+      new ElasticByteBufferPool();
+
+  private final IntFunction<ByteBuffer> allocate = value -> pool.getBuffer(false, value);
+
+  private FileSystem fileSystem;
+  private Path testFilePath;
+
+  public TestVectorIOBridge() {
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    // skip the tests if the FileRangeBridge does not load.
+    assumeTrue("Bridge not available", FileRangeBridge.bridgeAvailable());
+
+    fileSystem = FileSystem.getLocal(new Configuration());
+    testFilePath = fileSystem.makeQualified(vectoredPath);
+    createFile(fileSystem, testFilePath, DATASET);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (fileSystem != null) {
+      fileSystem.delete(testFilePath, false);
+    }
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  /**
+   * If the file range bridge is available, so must be the vector IO bridge.
+   */
+  @Test
+  public void testVectorIOBridgeAvailable() throws Throwable {
+    assertTrue("VectorIOBridge not available", VectorIOBridge.bridgeAvailable());
+  }
+
+  /**
+   * Create a dataset for use in the tests; all data is in the range
+   * base to (base + modulo - 1) inclusive.
+   *
+   * @param len length of data
+   * @param base base of the data
+   * @param modulo the modulo
+   *
+   * @return the newly generated dataset
+   */
+  private static byte[] dataset(int len, int base, int modulo) {
+    byte[] dataset = new byte[len];
+    for (int i = 0; i < len; i++) {
+      dataset[i] = (byte) (base + (i % modulo));
+    }
+    return dataset;
+  }
+
+  /**
+   * Utility to return buffers to the pool once all
+   * data has been read for each file range.
+   *
+   * @param fileRanges list of file ranges.
+   * @param pool buffer pool.
+   *
+   * @throws IOException any IOE
+   * @throws TimeoutException ideally this should never occur.
+   */
+  public static void returnBuffersToPoolPostRead(List<ParquetFileRange> fileRanges,
+      ByteBufferPool pool)
+      throws IOException, TimeoutException {
+    for (ParquetFileRange range : fileRanges) {
+      ByteBuffer buffer = awaitFuture(range.getDataReadFuture(),
+          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS);
+      pool.putBuffer(buffer);
+    }
+  }
+
+  /**
+   * Create a file.
+   *
+   * @param fs filesystem
+   * @param path path to write
+   * @param data source dataset. Can be null
+   *
+   * @throws IOException on any problem
+   */
+  public static void createFile(FileSystem fs,
+      Path path,
+      byte[] data) throws IOException {
+    try (FSDataOutputStream stream = fs.create(path, true)) {
+      if (data != null && data.length > 0) {
+        stream.write(data);
+      }
+    }
+  }
+
+  /**
+   * Open the test file.
+   * @return test file input stream
+   * @throws IOException failure to open
+   */
+  private FSDataInputStream openTestFile() throws IOException {
+    return getFileSystem().open(testFilePath);
+  }
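+
+  /*
+   * A note on the helper under test: readVectoredRanges(in, ranges, allocate)
+   * converts each ParquetFileRange into a Hadoop FileRange and reflectively
+   * invokes the stream's Hadoop 3.3.5+ vectored read, conceptually:
+   *
+   *   in.readVectored(hadoopRanges, allocate);
+   *
+   * then wires each Hadoop-side future back into the ParquetFileRange's
+   * data-read future, which the assertions below await.
+   */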
+
+  /**
+   * Read a list of ranges, all adjacent to each other.
+   */
+  @Test
+  public void testVectoredReadMultipleRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<ParquetFileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      fileRanges.add(range(i * 100, 100));
+    }
+    try (FSDataInputStream in = fs.open(testFilePath)) {
+      readVectoredRanges(in, fileRanges, allocate);
+      CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+      int i = 0;
+      for (ParquetFileRange res : fileRanges) {
+        completableFutures[i++] = res.getDataReadFuture();
+      }
+      CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+      combinedFuture.get();
+
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * Vector IO and readFully() can coexist.
+   */
+  @Test
+  public void testVectoredReadAndReadFully() throws Exception {
+    final int offset = 100;
+    final int length = 256;
+    List<ParquetFileRange> fileRanges = ranges(offset, length);
+
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      byte[] readFullRes = new byte[length];
+      in.readFully(offset, readFullRes);
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getDataReadFuture(),
+          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      assertDatasetEquals(0, "readFully",
+          vecRes, length, readFullRes);
+    } finally {
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4 * 1024, none of these
+   * ranges will be merged.
+   */
+  @Test
+  public void testDisjointRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        0, 100,
+        4_000 + 101, 100,
+        16_000 + 101, 100);
+
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * Verify that the stream really implements the readVectored API, rather
+   * than falling back to the base implementation.
+   */
+  @Test
+  public void testStreamImplementsReadVectored() throws Exception {
+
+    try (FSDataInputStream in = openTestFile()) {
+      final boolean streamDoesNativeVectorIO = instance().hasCapability(
+          in,
+          VECTOREDIO_CAPABILITY);
+      assertTrue("capability " + VECTOREDIO_CAPABILITY + " not supported by " + in,
+          streamDoesNativeVectorIO);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4 * 1024, all of these
+   * ranges will be merged into one.
+   */
+  @Test
+  public void testAllRangesMergedIntoOne() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        0, 100,
+        4_000 - 101, 100,
+        8_000 - 101, 100);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4 * 1024, the first three ranges
+   * (sorted by offset) will be merged into one; the other two
+   * will be read as they are.
+   */
+  @Test
+  public void testSomeRangesMergedSomeUnmerged() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        8 * 1024, 100,
+        14 * 1024, 100,
+        10 * 1024, 100,
+        2 * 1024 - 101, 100,
+        40 * 1024, 1024);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
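+
+  /*
+   * Worked example for the merge above, assuming a 4096-byte minimum seek:
+   * sorted by offset, the ranges start at 1947, 8192, 10240, 14336 and
+   * 40960, each 100 bytes long except the last (1024). The gaps
+   * 8292 -> 10240 (1948 bytes) and 10340 -> 14336 (3996 bytes) are below
+   * 4096, so those three ranges may coalesce into a single read; the gaps
+   * around the ranges at 1947 and 40960 are larger, so they stay separate.
+   */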
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = getSampleOverlappingRanges();
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testSameRanges() throws Exception {
+    // Identical ranges are a special case of overlapping ranges.
+    List<ParquetFileRange> fileRanges = getSampleSameRanges();
+
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testSomeRandomNonOverlappingRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        500, 100,
+        1000, 200,
+        50, 10,
+        10, 5);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testConsecutiveRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        500, 100,
+        600, 200,
+        800, 100);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testNegativeLengthRange() throws Exception {
+    verifyExceptionalVectoredRead(getFileSystem(), ranges(1, -50),
+        IllegalArgumentException.class);
+  }
+
+  /**
+   * Negative offsets are rejected; the inner cause is an
+   * {@code EOFException}.
+   */
+  @Test
+  public void testNegativeOffsetRange() throws Exception {
+    final RuntimeException ex = verifyExceptionalVectoredRead(
+        getFileSystem(), ranges(-1, 50),
+        RuntimeException.class);
+    if (!(ex.getCause() instanceof EOFException)) {
+      throw ex;
+    }
+  }
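+
+  /*
+   * Note on the unwrapping above: the bridge surfaces failures from the
+   * reflective invocation as unchecked exceptions, so a checked error such
+   * as the EOFException here arrives wrapped in a RuntimeException; the
+   * test inspects getCause() rather than catching EOFException directly.
+   */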
+
+  /**
+   * Classic seek()/read() after vectored IO.
+   */
+  @Test
+  public void testNormalReadAfterVectoredRead() throws Exception {
+    List<ParquetFileRange> fileRanges = getSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+
+      // read the first 200 bytes
+      byte[] res = new byte[200];
+      in.read(res, 0, 200);
+      ByteBuffer buffer = ByteBuffer.wrap(res);
+      assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
+      assertEquals("Vectored read shouldn't change the file pointer.", 200, in.getPos());
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * Vectored IO after classic seek()/read().
+   */
+  @Test
+  public void testVectoredReadAfterNormalRead() throws Exception {
+    List<ParquetFileRange> fileRanges = getSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openTestFile()) {
+      // read the first 200 bytes
+      byte[] res = new byte[200];
+      in.read(res, 0, 200);
+      ByteBuffer buffer = ByteBuffer.wrap(res);
+      assertDatasetEquals(0, "normal_read", buffer, 200, DATASET);
+      assertEquals("Normal read should have moved the file pointer.", 200, in.getPos());
+      readVectoredRanges(in, fileRanges, allocate);
+
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testMultipleVectoredReads() throws Exception {
+    List<ParquetFileRange> fileRanges1 = getSampleNonOverlappingRanges();
+    List<ParquetFileRange> fileRanges2 = getSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openTestFile()) {
+
+      readVectoredRanges(in, fileRanges1, allocate);
+      readVectoredRanges(in, fileRanges2, allocate);
+
+      validateVectoredReadResult(fileRanges2, DATASET);
+      validateVectoredReadResult(fileRanges1, DATASET);
+      returnBuffersToPoolPostRead(fileRanges1, pool);
+      returnBuffersToPoolPostRead(fileRanges2, pool);
+    }
+  }
+
+  /**
+   * Test to validate EOF ranges. The default implementation fails with an
+   * EOFException while reading the ranges; some implementations, such as
+   * S3A and checksum filesystems, fail fast as they already know
+   * the file length.
+   */
+/*
+  @Test
+  public void testEOFRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = ranges(DATASET_LEN, 100);
+    try (FSDataInputStream in = fs.open(testFilePath)) {
+      in.readVectored(fileRanges, allocate);
+      for (FileRange res : fileRanges) {
+        CompletableFuture<ByteBuffer> data = res.getData();
+        interceptFuture(EOFException.class,
+            "",
+            VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+            TimeUnit.SECONDS,
+            data);
+      }
+    }
+  }
+*/
+
+  /**
+   * Create a ParquetFileRange instance.
+   * @param offset offset in file
+   * @param length range length
+   * @return a range
+   */
+  private ParquetFileRange range(final long offset, final int length) {
+    return new ParquetFileRange(offset, length);
+  }
+
+  /**
+   * Create a list of ranges from arguments expected to be
+   * (offset, length) pairs of ints.
+   * @param args an even-length list of offset/length pairs.
+   * @return a list of ranges.
+   */
+  private List<ParquetFileRange> ranges(int... args) {
+    final int len = args.length;
+    assertTrue("range argument length of " + len + " is not even",
+        (len % 2) == 0);
+    List<ParquetFileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < len; i += 2) {
+      fileRanges.add(range(args[i], args[i + 1]));
+    }
+    return fileRanges;
+  }
+
+  protected List<ParquetFileRange> getSampleNonOverlappingRanges() {
+    return ranges(0, 100, 110, 50);
+  }
+
+  protected List<ParquetFileRange> getSampleOverlappingRanges() {
+    return ranges(
+        100, 500,
+        400, 500);
+  }
+
+  protected List<ParquetFileRange> getConsecutiveRanges() {
+    return ranges(
+        100, 500,
+        600, 500);
+  }
+
+  protected List<ParquetFileRange> getSampleSameRanges() {
+    return ranges(
+        8_000, 1000,
+        8_000, 1000,
+        8_000, 1000);
+  }
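+
+  // Example of the pairs convention: ranges(500, 100, 600, 200) builds two
+  // ParquetFileRange instances, [offset=500, length=100] and
+  // [offset=600, length=200]; even positions are offsets, odd are lengths.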
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   *
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   * @param originalData original data.
+   */
+  public static void assertDatasetEquals(
+      final int readOffset,
+      final String operation,
+      final ByteBuffer data,
+      int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+              + ": data[" + i + "] != DATASET[" + o + "]",
+          originalData[o], data.get());
+    }
+  }
+
+  /**
+   * Utility to validate vectored read results.
+   *
+   * @param fileRanges input ranges.
+   * @param originalData original data.
+   *
+   * @throws IOException any IOE.
+   * @throws TimeoutException if the reads do not complete in time.
+   */
+  public static void validateVectoredReadResult(List<ParquetFileRange> fileRanges,
+      byte[] originalData)
+      throws IOException, TimeoutException {
+    CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
+    int i = 0;
+    for (ParquetFileRange res : fileRanges) {
+      completableFutures[i++] = res.getDataReadFuture();
+    }
+    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
+    awaitFuture(combinedFuture,
+        VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+        TimeUnit.SECONDS);
+
+    for (ParquetFileRange res : fileRanges) {
+      CompletableFuture<ByteBuffer> data = res.getDataReadFuture();
+      ByteBuffer buffer = awaitFuture(data,
+          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS);
+      assertDatasetEquals((int) res.getOffset(), "vecRead",
+          buffer, res.getLength(), originalData);
+    }
+  }
+
+  /**
+   * Validate that an exception is thrown during a vectored
+   * read operation with specific input ranges.
+   *
+   * @param fs FileSystem instance.
+   * @param fileRanges input file ranges.
+   * @param clazz type of exception expected.
+   *
+   * @throws Exception any other exception raised.
+   */
+  protected <T extends Exception> T verifyExceptionalVectoredRead(
+      FileSystem fs,
+      List<ParquetFileRange> fileRanges,
+      Class<T> clazz) throws Exception {
+
+    try (FSDataInputStream in = fs.open(testFilePath)) {
+      readVectoredRanges(in, fileRanges, allocate);
+      fail("expected error reading " + in);
+      // for the compiler
+      return null;
+    } catch (AssertionError e) {
+      throw e;
+    } catch (Exception e) {
+      if (!clazz.isAssignableFrom(e.getClass())) {
+        throw e;
+      }
+      return clazz.cast(e);
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index e09eca1b25..0f6e3af55b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@
     0.14.2
     shaded.parquet
     3.3.5
-    <hadoop-api-shim.version>1.0-SNAPSHOT</hadoop-api-shim.version>
     2.9.0
     1.13.0
     thrift
@@ -213,12 +212,6 @@
-
-      <dependency>
-        <groupId>org.apache.hadoop.extensions</groupId>
-        <artifactId>hadoop-api-shim</artifactId>
-        <version>${hadoop-api-shim.version}</version>
-      </dependency>
@@ -445,6 +438,7 @@
        <artifactId>maven-surefire-plugin</artifactId>
          <argLine>${surefire.argLine} ${extraJavaTestArgs}</argLine>
+         <trimStackTrace>false</trimStackTrace>
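
Usage sketch (illustrative, not part of the patch): with the bridge in place,
code holding a Parquet SeekableInputStream can issue a vectored read through
the new API roughly as below; the "stream" variable is hypothetical, and
production code would pool buffers and handle the future's exceptions.

    List<ParquetFileRange> ranges = new ArrayList<>();
    ranges.add(new ParquetFileRange(0, 100));      // offset 0, length 100
    ranges.add(new ParquetFileRange(4_096, 500));  // offset 4096, length 500
    if (stream.readVectoredAvailable()) {
      stream.readVectored(ranges, ByteBuffer::allocate);
      ByteBuffer first = ranges.get(0).getDataReadFuture().get();  // await data
    }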