From cb4eee0e9e9938520037b923a5c7e86a07b03c43 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Thu, 13 Oct 2022 15:37:03 -0500 Subject: [PATCH] PARQUET-2171. Support vectored IO in Hadoop stream reads HADOOP-18103 introduced high performance vectored read API in hadoop filesystems, including the local filesystem and S3A. This patch supports the vectored IO API in ParquetFileReader, switching to it when the hadoop version supports it and the parquet library has it explicitly enabled. To use vectored IO, set parquet.hadoop.vectored.io.enabled=true The API is invoked through reflection so that on Hadoop releases with the feature (3.3.5+) the Vectored IO operations are handed off to the Hadoop input stream. On older versions the feature is not available -the parquet library will compile and run as normal. Some existing tests have been parameterized to verify that behavior is consistent with/without vector IO enabled; On hadoop2 profile test runs, this implicitly verifies that compatibility is maintained. --- .../apache/parquet/io/ParquetFileRange.java | 71 +++ .../parquet/io/SeekableInputStream.java | 19 + .../org/apache/parquet/util/DynMethods.java | 18 +- parquet-hadoop/README.md | 8 + parquet-hadoop/pom.xml | 5 + .../org/apache/parquet/HadoopReadOptions.java | 8 +- .../apache/parquet/ParquetReadOptions.java | 17 +- .../parquet/hadoop/ParquetFileReader.java | 107 +++- .../parquet/hadoop/ParquetInputFormat.java | 11 + .../hadoop/util/H1SeekableInputStream.java | 20 + .../hadoop/util/H2SeekableInputStream.java | 19 + .../hadoop/util/vectorio/BindingUtils.java | 211 +++++++ .../hadoop/util/vectorio/FileRangeBridge.java | 264 ++++++++ .../hadoop/util/vectorio/VectorIOBridge.java | 300 +++++++++ .../hadoop/util/vectorio/package-info.java | 24 + .../TestInputFormatColumnProjection.java | 23 +- .../parquet/hadoop/TestParquetFileWriter.java | 43 +- .../hadoop/example/TestInputOutputFormat.java | 25 +- .../util/vectorio/TestFileRangeBridge.java | 105 ++++ .../util/vectorio/TestVectorIOBridge.java | 593 ++++++++++++++++++ 20 files changed, 1866 insertions(+), 25 deletions(-) create mode 100644 parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java diff --git a/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java b/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java new file mode 100644 index 0000000000..fea9a9d926 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/ParquetFileRange.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * Class to define a file range for a parquet file and to + * hold future data for any ongoing read for that range. + */ +public class ParquetFileRange { + + /** + * Start position in file. + */ + private final long offset; + + /** + * Length of data to be read from position. + */ + private final int length; + + /** + * A future object to hold future for ongoing read. + */ + private CompletableFuture dataReadFuture; + + public ParquetFileRange(long offset, int length) { + this.offset = offset; + this.length = length; + } + + public long getOffset() { + return offset; + } + + public int getLength() { + return length; + } + + public CompletableFuture getDataReadFuture() { + return dataReadFuture; + } + + public void setDataReadFuture(CompletableFuture dataReadFuture) { + this.dataReadFuture = dataReadFuture; + } + + @Override + public String toString() { + return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")"; + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java index ed20163613..5e05c4f3aa 100644 --- a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * {@code SeekableInputStream} is an interface with the methods needed by @@ -105,4 +107,21 @@ public abstract class SeekableInputStream extends InputStream { */ public abstract void readFully(ByteBuffer buf) throws IOException; + /** + * Read a set of file ranges in a vectored manner. + * @throws UnsupportedOperationException if not available in this class/runtime. + */ + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + + throw new UnsupportedOperationException("Vectored IO is not supported for " + this); + } + + /** + * Is the {@link #readVectored(List, IntFunction)} method available. 
+ */ + public boolean readVectoredAvailable() { + return false; + } + } diff --git a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java index 340baec02e..363afce335 100644 --- a/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java +++ b/parquet-common/src/main/java/org/apache/parquet/util/DynMethods.java @@ -27,10 +27,15 @@ import java.security.PrivilegedAction; import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.parquet.Exceptions.throwIfInstance; public class DynMethods { + private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class); + /** * Convenience wrapper class around {@link java.lang.reflect.Method}. * @@ -243,7 +248,8 @@ public Builder impl(String className, String methodName, Class... argClasses) Class targetClass = Class.forName(className, true, loader); impl(targetClass, methodName, argClasses); } catch (ClassNotFoundException e) { - // not the right implementation + // class not found on supplied classloader. + LOG.debug("failed to load class {}", className, e); } return this; } @@ -281,6 +287,7 @@ public Builder impl(Class targetClass, String methodName, Class... argClas targetClass.getMethod(methodName, argClasses), name); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load method {} from class {}", methodName, targetClass, e); } return this; } @@ -311,6 +318,8 @@ public Builder ctorImpl(Class targetClass, Class... argClasses) { .buildChecked(); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load constructor arity {} from class {}", + argClasses.length, targetClass, e); } return this; } @@ -327,6 +336,9 @@ public Builder ctorImpl(String className, Class... argClasses) { .buildChecked(); } catch (NoSuchMethodException e) { // not the right implementation + LOG.debug("failed to load constructor arity {} from class {}", + argClasses.length, className, e); + } return this; } @@ -349,7 +361,8 @@ public Builder hiddenImpl(String className, String methodName, Class... argCl Class targetClass = Class.forName(className, true, loader); hiddenImpl(targetClass, methodName, argClasses); } catch (ClassNotFoundException e) { - // not the right implementation + // class not found on supplied classloader. + LOG.debug("failed to load class {}", className, e); } return this; } @@ -388,6 +401,7 @@ public Builder hiddenImpl(Class targetClass, String methodName, Class... a this.method = new UnboundMethod(hidden, name); } catch (SecurityException | NoSuchMethodException e) { // unusable or not the right implementation + LOG.debug("failed to load method {} from class {}", methodName, targetClass, e); } return this; } diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md index c27c5f2fc1..156dd1b710 100644 --- a/parquet-hadoop/README.md +++ b/parquet-hadoop/README.md @@ -486,3 +486,11 @@ If `false`, key material is stored in separate new files, created in the same fo **Description:** Length of key encryption keys (KEKs), randomly generated by parquet key management tools. Can be 128, 192 or 256 bits. **Default value:** `128` +--- + +**Property:** `parquet.hadoop.vectored.io.enabled` +**Description:** Flag to enable use of the FileSystem Vector IO API on Hadoop releases which support the feature. 
+If `true` then an attempt will be made to dynamically load the relevant classes; +if not found then the library will use the classic non-vectored reads: it is safe to enable this option on older releases. +**Default value:** `false` + diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index e96830cc28..7598af4b44 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -79,6 +79,11 @@ ${hadoop.version} provided + org.apache.parquet parquet-jackson diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index 8f0d8d8933..e0118d8b7d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -27,6 +27,7 @@ import org.apache.parquet.crypto.FileDecryptionProperties; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.util.HadoopCodecs; import java.util.Map; @@ -37,6 +38,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED; import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY; @@ -54,6 +56,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, + boolean useHadoopVectoredIO, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -64,7 +67,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, FileDecryptionProperties fileDecryptionProperties) { super( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, codecFactory, allocator, + usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIO, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties ); this.conf = conf; @@ -111,6 +114,7 @@ public Builder(Configuration conf, Path filePath) { usePageChecksumVerification)); useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); useOffHeapDecryptBuffer(conf.getBoolean(OFF_HEAP_DECRYPT_BUFFER_ENABLED, false)); + useHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, ParquetInputFormat.HADOOP_VECTORED_IO_DEFAULT)); withCodecFactory(HadoopCodecs.newFactory(conf, 0)); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); @@ -128,7 +132,7 @@ public ParquetReadOptions build() { } return new HadoopReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, 
useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, conf, fileDecryptionProperties); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index dc130ee8d2..495d8eb7ec 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -41,6 +41,7 @@ public class ParquetReadOptions { private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true; private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true; private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true; + private static final boolean HADOOP_VECTORED_IO_ENABLED_DEFAULT = false; private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; @@ -54,6 +55,7 @@ public class ParquetReadOptions { private final boolean usePageChecksumVerification; private final boolean useBloomFilter; private final boolean useOffHeapDecryptBuffer; + private final boolean useHadoopVectoredIO; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -70,6 +72,7 @@ public class ParquetReadOptions { boolean usePageChecksumVerification, boolean useBloomFilter, boolean useOffHeapDecryptBuffer, + boolean useHadoopVectoredIO, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -85,6 +88,7 @@ public class ParquetReadOptions { this.usePageChecksumVerification = usePageChecksumVerification; this.useBloomFilter = useBloomFilter; this.useOffHeapDecryptBuffer = useOffHeapDecryptBuffer; + this.useHadoopVectoredIO = useHadoopVectoredIO; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; this.codecFactory = codecFactory; @@ -126,6 +130,10 @@ public boolean usePageChecksumVerification() { return usePageChecksumVerification; } + public boolean useHadoopVectoredIO() { + return useHadoopVectoredIO; + } + public FilterCompat.Filter getRecordFilter() { return recordFilter; } @@ -173,6 +181,7 @@ public static class Builder { protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT; protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT; protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT; + protected boolean useHadoopVectoredIo = HADOOP_VECTORED_IO_ENABLED_DEFAULT; protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT; protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; @@ -226,6 +235,11 @@ public Builder useRecordFilter() { return this; } + public Builder useHadoopVectoredIo(boolean useHadoopVectoredIo) { + this.useHadoopVectoredIo = useHadoopVectoredIo; + return this; + } + public Builder useColumnIndexFilter(boolean useColumnIndexFilter) { this.useColumnIndexFilter = useColumnIndexFilter; return this; @@ -320,6 +334,7 @@ public Builder copy(ParquetReadOptions options) { useDictionaryFilter(options.useDictionaryFilter); useRecordFilter(options.useRecordFilter); withRecordFilter(options.recordFilter); + 
useHadoopVectoredIo(options.useHadoopVectoredIO); withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); @@ -338,7 +353,7 @@ public ParquetReadOptions build() { return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, recordFilter, metadataFilter, + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useOffHeapDecryptBuffer, useHadoopVectoredIo, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index f63367022e..c4a2b66ab7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -53,8 +53,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; @@ -63,6 +64,8 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.Preconditions; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; @@ -102,6 +105,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; +import org.apache.parquet.hadoop.util.vectorio.BindingUtils; import org.apache.parquet.internal.column.columnindex.ColumnIndex; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; @@ -110,6 +114,7 @@ import org.apache.parquet.internal.hadoop.metadata.IndexReference; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.ParquetFileRange; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -126,6 +131,8 @@ public class ParquetFileReader implements Closeable { public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism"; + public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300; + private final ParquetMetadataConverter converter; private final CRC32 crc; @@ -986,9 +993,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE } // actually read all the chunks ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); - } + readAllPartsVectoredOrNormal(allParts, builder); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1064,6 +1069,69 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r return internalReadFilteredRowGroup(block, 
rowRanges, getColumnIndexStore(blockIndex)); } + /** + * Read data in all parts via either vectored IO or serial IO. + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + */ + private void readAllPartsVectoredOrNormal(List allParts, ChunkListBuilder builder) + throws IOException { + boolean isVectoredIO = options.useHadoopVectoredIO() + && f.readVectoredAvailable() + && partsLengthValidForVectoredIO(allParts); + if (isVectoredIO) { + readVectored(allParts, builder); + } else { + for (ConsecutivePartList consecutiveChunks : allParts) { + consecutiveChunks.readAll(f, builder); + } + } + } + + /** + * Vectored IO doesn't support reading ranges of size greater than + * Integer.MAX_VALUE. + * @param allParts all parts to read. + * @return true or false. + */ + private boolean partsLengthValidForVectoredIO(List allParts) { + for (ConsecutivePartList consecutivePart : allParts) { + if (consecutivePart.length >= Integer.MAX_VALUE) { + LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length); + return false; + } + } + return true; + } + + /** + * Read all parts through vectored IO. + * @param allParts all parts to be read. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException any IOE. + */ + private void readVectored(List allParts, + ChunkListBuilder builder) throws IOException { + + List ranges = new ArrayList<>(allParts.size()); + for (ConsecutivePartList consecutiveChunks : allParts) { + Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE, + "Invalid length %s for vectored read operation. It must be less than max integer value.", + consecutiveChunks.length); + ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length)); + } + LOG.debug("Doing vectored IO for ranges {}", ranges); + ByteBufferAllocator allocator = options.getAllocator(); + //blocking or asynchronous vectored read. + f.readVectored(ranges, allocator::allocate); + int k = 0; + for (ConsecutivePartList consecutivePart : allParts) { + ParquetFileRange currRange = ranges.get(k++); + consecutivePart.readFromVectoredRange(currRange, builder); + } + } + /** * Reads all the columns requested from the row group at the current file position. It may skip specific pages based * on the column indexes according to the actual filter. As the rows are not aligned among the pages of the different @@ -1141,10 +1209,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData bloc } } } - // actually read all the chunks - for (ConsecutivePartList consecutiveChunks : allParts) { - consecutiveChunks.readAll(f, builder); - } + readAllPartsVectoredOrNormal(allParts, builder); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1872,6 +1937,32 @@ public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOEx } } + /** + * Populate data in a parquet file range from vectored range. + * @param currRange range to populated. + * @param builder used to build chunk list to read the pages for the different columns. + * @throws IOException if there is an error while reading from the stream. 
+ */ + public void readFromVectoredRange(ParquetFileRange currRange, + ChunkListBuilder builder) throws IOException { + ByteBuffer buffer; + try { + LOG.debug("Waiting for vectored read to finish for range {} ", currRange); + buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(), + HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS); + // report in a counter the data we just scanned + BenchmarkCounter.incrementBytesRead(currRange.getLength()); + } catch (TimeoutException e) { + String error = String.format("Timeout while fetching result for %s", currRange); + LOG.error(error, e); + throw new IOException(error, e); + } + ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer); + for (ChunkDescriptor descriptor : chunks) { + builder.add(descriptor, stream.sliceBuffers(descriptor.size), f); + } + } + /** * @return the position following the last byte of these chunks */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index 1bfd4b20f0..c916b2c7ae 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -163,6 +163,17 @@ public class ParquetInputFormat extends FileInputFormat { private static final int MIN_FOOTER_CACHE_SIZE = 100; + /** + * Key to enable/disable vectored io while reading parquet files: + * {@value}. + */ + public static final String HADOOP_VECTORED_IO_ENABLED = "parquet.hadoop.vectored.io.enabled"; + + /** + * Default value of parquet.hadoop.vectored.io.enabled is {@value}. + */ + public static final boolean HADOOP_VECTORED_IO_DEFAULT = false; + public static void setTaskSideMetaData(Job job, boolean taskSideMetadata) { ContextUtil.getConfiguration(job).setBoolean(TASK_SIDE_METADATA, taskSideMetadata); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java index 0f8cdbb551..ccc0fa5841 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java @@ -20,12 +20,23 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge; import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.ParquetFileRange; + import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * SeekableInputStream implementation that implements read(ByteBuffer) for * Hadoop 1 FSDataInputStream. + * It implements {@link #readVectored(List, IntFunction)}) by + * handing off to VectorIOBridge which uses reflection to offer the API if it is + * found. + * The return value of {@link #readVectoredAvailable()} reflects the availability of the + * API. 
*/ class H1SeekableInputStream extends DelegatingSeekableInputStream { @@ -56,4 +67,13 @@ public void readFully(byte[] bytes, int start, int len) throws IOException { stream.readFully(bytes, start, len); } + @Override + public boolean readVectoredAvailable() { + return VectorIOBridge.bridgeAvailable(); + } + + @Override + public void readVectored(List ranges, IntFunction allocate) throws IOException { + VectorIOBridge.readVectoredRanges(stream, ranges, allocate); + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java index 4bbbb8ed1f..f6f56650e2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java @@ -20,15 +20,24 @@ package org.apache.parquet.hadoop.util; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge; import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.ParquetFileRange; import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; /** * SeekableInputStream implementation for FSDataInputStream that implements * ByteBufferReadable in Hadoop 2. + * It implements {@link #readVectored(List, IntFunction)}) by + * handing off to VectorIOBridge which uses reflection to offer the API if it is + * found. + * The return value of {@link #readVectoredAvailable()} reflects the availability of the + * API. */ class H2SeekableInputStream extends DelegatingSeekableInputStream { @@ -83,6 +92,16 @@ public int read(ByteBuffer buf) throws IOException { } } + @Override + public boolean readVectoredAvailable() { + return VectorIOBridge.bridgeAvailable(); + } + + @Override + public void readVectored(List ranges, IntFunction allocate) throws IOException { + VectorIOBridge.readVectoredRanges(stream, ranges, allocate); + } + public static void readFully(Reader reader, ByteBuffer buf) throws IOException { // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java new file mode 100644 index 0000000000..176998bbc6 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/BindingUtils.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.util.DynMethods; + +/** + * Package-private binding utils. + */ +public final class BindingUtils { + private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class); + + + private BindingUtils() { + } + + /** + * Given a future, evaluate it. + *

+ * Any exception generated in the future is + * extracted and rethrown. + *
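+ * <p>
+ * A minimal usage sketch (mirroring how {@code ParquetFileReader} awaits a
+ * vectored range read elsewhere in this patch, where {@code range} is a
+ * ParquetFileRange and the timeout constant belongs to that caller):
+ * <pre>{@code
+ *   ByteBuffer buffer = BindingUtils.awaitFuture(
+ *       range.getDataReadFuture(),
+ *       HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ * }</pre>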

+ * + * @param future future to evaluate + * @param timeout timeout to wait + * @param unit time unit. + * @param type of the result. + * + * @return the result, if all went well. + * + * @throws InterruptedIOException future was interrupted + * @throws IOException if something went wrong + * @throws RuntimeException any nested RTE thrown + * @throws TimeoutException the future timed out. + */ + public static T awaitFuture(final Future future, + final long timeout, + final TimeUnit unit) + throws InterruptedIOException, IOException, RuntimeException, + TimeoutException { + try { + LOG.debug("Awaiting future"); + return future.get(timeout, unit); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException(e.toString()) + .initCause(e); + } catch (ExecutionException e) { + return raiseInnerCause(e); + } + } + + /** + * From the inner cause of an execution exception, extract the inner cause + * if it is an IOE or RTE. + * This will always raise an exception, either the inner IOException, + * an inner RuntimeException, or a new IOException wrapping the raised + * exception. + * + * @param e exception. + * @param type of return value. + * + * @return nothing, ever. + * + * @throws IOException either the inner IOException, or a wrapper around + * any non-Runtime-Exception + * @throws RuntimeException if that is the inner cause. + */ + public static T raiseInnerCause(final ExecutionException e) + throws IOException { + throw unwrapInnerException(e); + } + + /** + * From the inner cause of an execution exception, extract the inner cause + * to an IOException, raising RuntimeExceptions and Errors immediately. + *
+ * <ol>
+ *   <li>If it is an IOE: return it.</li>
+ *   <li>If it is a {@link UncheckedIOException}: return the cause.</li>
+ *   <li>Completion/Execution Exceptions: extract and repeat.</li>
+ *   <li>If it is an RTE or Error: throw.</li>
+ *   <li>Any other type: wrap in an IOE.</li>
+ * </ol>
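+ * <p>
+ * A sketch of the calling pattern (essentially what {@code awaitFuture} and
+ * {@code raiseInnerCause} above do):
+ * <pre>{@code
+ *   try {
+ *     return future.get(timeout, unit);
+ *   } catch (ExecutionException e) {
+ *     throw unwrapInnerException(e); // rethrows RTE/Error, else yields an IOException
+ *   }
+ * }</pre>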
+ * + * Recursively handles wrapped Execution and Completion Exceptions in + * case something very complicated has happened. + * + * @param e exception. + * + * @return an IOException extracted or built from the cause. + * + * @throws RuntimeException if that is the inner cause. + * @throws Error if that is the inner cause. + */ + @SuppressWarnings("ChainOfInstanceofChecks") + public static IOException unwrapInnerException(final Throwable e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + return (IOException) cause; + } else if (cause instanceof UncheckedIOException) { + // this is always an IOException + return ((UncheckedIOException) cause).getCause(); + } else if (cause instanceof CompletionException) { + return unwrapInnerException(cause); + } else if (cause instanceof ExecutionException) { + return unwrapInnerException(cause); + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else if (cause != null) { + // other type: wrap with a new IOE + return new IOException(cause); + } else { + // this only happens if there was no cause. + return new IOException(e); + } + } + + /** + * Get an invocation from the source class, which will be unavailable() if + * the class is null or the method isn't found. + * + * @param return type + * @param source source. If null, the method is a no-op. + * @param returnType return type class (unused) + * @param name method name + * @param parameterTypes parameters + * + * @return the method or "unavailable" + */ + static DynMethods.UnboundMethod loadInvocation( + Class source, + Class returnType, String name, + Class... parameterTypes) { + + if (source != null) { + final DynMethods.UnboundMethod m = new DynMethods + .Builder(name) + .impl(source, name, parameterTypes) + .orNoop() + .build(); + if (m.isNoop()) { + // this is a sign of a mismatch between this class's expected + // signatures and actual ones. + // log at debug. + LOG.debug("Failed to load method {} from {}", name, source); + } else { + LOG.debug("Found method {} from {}", name, source); + } + return m; + } else { + return noop(name); + } + } + + /** + * Create a no-op method. + * + * @param name method name + * + * @return a no-op method. + */ + static DynMethods.UnboundMethod noop(final String name) { + return new DynMethods.Builder(name) + .orNoop().build(); + } + + /** + * Given a sequence of methods, verify that they are all available. + * + * @param methods methods + * + * @return true if they are all implemented + */ + static boolean implemented(DynMethods.UnboundMethod... methods) { + for (DynMethods.UnboundMethod method : methods) { + if (method.isNoop()) { + return false; + } + } + return true; + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java new file mode 100644 index 0000000000..1937976626 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/FileRangeBridge.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static java.util.Objects.requireNonNull; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.implemented; +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Class to bridge to the FileRange class through reflection. + * A singleton is created when the class is loaded, so there is no need + * to repeat the reflection process on every API call. + *

+ * The Hadoop FileRange class is an interface with getters and setters; + * to instantiate the static method {@code createFileRange()} is used. + */ +final class FileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(FileRangeBridge.class); + + /** Class to bridge to: {@value}. */ + public static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + /** + * The singleton instance of the bridge. + */ + private static final FileRangeBridge INSTANCE = new FileRangeBridge(); + + /** + * Is the bridge available? + */ + private final boolean available; + private final Class fileRangeInterface; + private final DynMethods.UnboundMethod _getData; + private final DynMethods.UnboundMethod _setData; + private final DynMethods.UnboundMethod _getLength; + private final DynMethods.UnboundMethod _getOffset; + private final DynMethods.UnboundMethod _getReference; + private final DynMethods.UnboundMethod createFileRange; + + /** + * Constructor. + * This loads the real FileRange and its methods, if present. + */ + FileRangeBridge() { + + // try to load the class + Class loadedClass; + try { + loadedClass = this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + LOG.debug("No {}", CLASSNAME, e); + loadedClass = null; + } + fileRangeInterface = loadedClass; + + // as loadInvocation returns a no-op if the class is null, this sequence + // will either construct in the list of operations or leave them with no-ops + + _getOffset = loadInvocation(loadedClass, long.class, "getOffset"); + _getLength = loadInvocation(loadedClass, int.class, "getLength"); + _getData = loadInvocation(loadedClass, null, "getData"); + _setData = loadInvocation(loadedClass, void.class, "setData", CompletableFuture.class); + _getReference = loadInvocation(loadedClass, Object.class, "getReference"); + // static interface method to create an instance. + createFileRange = loadInvocation(fileRangeInterface, + Object.class, "createFileRange", + long.class, + int.class, + Object.class); + + // we are available only if the class is present and all methods are found + // the checks for the method are extra paranoia, but harmless + available = loadedClass != null + && implemented( + createFileRange, + _getOffset, + _getLength, + _getData, + _setData, + _getReference); + + LOG.debug("FileRangeBridge availability: {}", available); + } + + /** + * Is the bridge available? + * + * @return true iff the bridge is present. + */ + public boolean available() { + return available; + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Interface " + CLASSNAME + " not found"); + } + } + + /** + * Get the file range class. + * + * @return the file range implementation class, if present. + */ + public Class getFileRangeInterface() { + return fileRangeInterface; + } + + /** + * Instantiate. + * + * @param offset offset in file + * @param length length of data to read. + * @param reference nullable reference to store in the range. + * + * @return a VectorFileRange wrapping a FileRange + * + * @throws RuntimeException if the range cannot be instantiated + * @throws IllegalStateException if the API is not available. 
+ */ + public WrappedFileRange createFileRange( + final long offset, + final int length, + final Object reference) { + + checkAvailable(); + return new WrappedFileRange(createFileRange.invoke(null, offset, length, reference)); + } + + /** + * Build a WrappedFileRange from a ParquetFileRange; + * the reference field of the wrapped object refers + * back to the object passed in. + * + * @param in input. + * + * @return a wrapper around a FileRange instance + */ + public FileRangeBridge.WrappedFileRange toFileRange(final ParquetFileRange in) { + // create a new wrapped file range, fill in and then + // get the instance + return createFileRange(in.getOffset(), in.getLength(), in); + } + + @Override + public String toString() { + return "FileRangeBridge{" + + "available=" + available + + ", fileRangeInterface=" + fileRangeInterface + + ", _getOffset=" + _getOffset + + ", _getLength=" + _getLength + + ", _getData=" + _getData + + ", _setData=" + _setData + + ", _getReference=" + _getReference + + ", createFileRange=" + createFileRange + + '}'; + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static FileRangeBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Wraps a Vector {@code FileRange} instance through reflection. + */ + class WrappedFileRange { + + /** + * The wrapped {@code FileRange} instance. + */ + private final Object fileRange; + + /** + * Instantiate. + * + * @param fileRange non null {@code FileRange} instance. + */ + WrappedFileRange(final Object fileRange) { + this.fileRange = requireNonNull(fileRange); + } + + public long getOffset() { + return _getOffset.invoke(fileRange); + } + + public int getLength() { + return _getLength.invoke(fileRange); + } + + public CompletableFuture getData() { + return _getData.invoke(fileRange); + } + + public void setData(final CompletableFuture data) { + _setData.invoke(fileRange, data); + } + + public Object getReference() { + return _getReference.invoke(fileRange); + } + + /** + * Get the wrapped fileRange. + * + * @return the fileRange. + */ + public Object getFileRange() { + return fileRange; + } + + @Override + public String toString() { + return "WrappedFileRange{" + + "fileRange=" + fileRange + + '}'; + } + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java new file mode 100644 index 0000000000..ba94c2e976 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/VectorIOBridge.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntFunction; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.parquet.io.ParquetFileRange; +import org.apache.parquet.util.DynMethods; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.loadInvocation; + +/** + * Vector IO bridge. + *

+ * This loads the {@code PositionedReadable} method:
+ * <pre>{@code
+ *   void readVectored(List<? extends FileRange> ranges,
+ *     IntFunction<ByteBuffer> allocate) throws IOException
+ * }</pre>
+ * It is made slightly easier because of type erasure; the signature of the
+ * function is actually {@code Void.class method(List.class, IntFunction.class)}.
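+ * <p>
+ * A sketch of how the stream wrappers in this patch hand off to the bridge
+ * (compare {@code H1SeekableInputStream}/{@code H2SeekableInputStream}, which
+ * expose availability separately through {@code readVectoredAvailable()}):
+ * <pre>{@code
+ *   if (VectorIOBridge.bridgeAvailable()) {
+ *     VectorIOBridge.readVectoredRanges(stream, parquetRanges, allocate);
+ *   }
+ * }</pre>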

+ * There are some counters to aid in testing; the {@link #toString()} method + * will print them and the loaded method, for use in tests and debug logs. + */ +public final class VectorIOBridge { + + private static final Logger LOG = LoggerFactory.getLogger(VectorIOBridge.class); + + /** + * readVectored Method to look for. + */ + private static final String READ_VECTORED = "readVectored"; + + /** + * hasCapability Method name. + * {@code boolean hasCapability(String capability);} + */ + private static final String HAS_CAPABILITY = "hasCapability"; + + /** + * hasCapability() probe for vectored IO api implementation. + */ + static final String VECTOREDIO_CAPABILITY = "in:readvectored"; + + /** + * The singleton instance of the bridge. + */ + private static final VectorIOBridge INSTANCE = new VectorIOBridge(); + + /** + * readVectored() method. + */ + private final DynMethods.UnboundMethod readVectored; + + /** + * {@code boolean StreamCapabilities.hasCapability(String)}. + */ + private final DynMethods.UnboundMethod hasCapabilityMethod; + + /** + * How many vector read calls made. + */ + private final AtomicLong vectorReads = new AtomicLong(); + + /** + * How many blocks were read. + */ + private final AtomicLong blocksRead = new AtomicLong(); + + /** + * How many bytes were read. + */ + private final AtomicLong bytesRead = new AtomicLong(); + + /** + * Constructor. package private for testing. + */ + private VectorIOBridge() { + + readVectored = loadInvocation(PositionedReadable.class, + Void.TYPE, READ_VECTORED, List.class, IntFunction.class); + LOG.debug("Vector IO availability; ", available()); + + + // if readVectored is present, so is hasCapabilities(). + hasCapabilityMethod = loadInvocation(FSDataInputStream.class, + boolean.class, HAS_CAPABILITY, String.class); + + } + + /** + * Is the bridge available. + * + * @return true if readVectored() is available. + */ + public boolean available() { + return !readVectored.isNoop() && FileRangeBridge.bridgeAvailable(); + } + + /** + * Check that the bridge is available. + * + * @throws UnsupportedOperationException if it is not. + */ + private void checkAvailable() { + if (!available()) { + throw new UnsupportedOperationException("Hadoop VectorIO not found"); + } + } + + /** + * Read data in a list of file ranges. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * The {@link ParquetFileRange} parameters all have their + * data read futures set to the range reads of the associated + * operations; callers must await these to complete. + * + * @param stream stream from where the data has to be read. + * @param ranges parquet file ranges. + * @param allocate allocate function to allocate memory to hold data. + * @throws UnsupportedOperationException if the API is not available. + */ + public static void readVectoredRanges( + final FSDataInputStream stream, + final List ranges, + final IntFunction allocate) { + + final VectorIOBridge bridge = availableInstance(); + final FileRangeBridge rangeBridge = FileRangeBridge.instance(); + // Setting the parquet range as a reference. + List fileRanges = ranges.stream() + .map(rangeBridge::toFileRange) + .collect(Collectors.toList()); + bridge.readWrappedRanges(stream, fileRanges, allocate); + + // copy back the completable futures from the scheduled + // vector reads to the ParquetFileRange entries passed in. 
+ fileRanges.forEach(fileRange -> { + // toFileRange() sets up this back reference + ParquetFileRange parquetFileRange = (ParquetFileRange) fileRange.getReference(); + parquetFileRange.setDataReadFuture(fileRange.getData()); + }); + //throw new RuntimeException("readVectoredRanges"); + + } + + + /** + * Read data in a list of wrapped file ranges, extracting the inner + * instances and then executing. + * Returns when the data reads are active; possibly executed + * in a blocking sequence of reads, possibly scheduled on different + * threads. + * + * @param stream stream from where the data has to be read. + * @param ranges wrapped file ranges. + * @param allocate allocate function to allocate memory to hold data. + */ + private void readWrappedRanges( + final PositionedReadable stream, + final List ranges, + final IntFunction allocate) { + + // update the counters. + vectorReads.incrementAndGet(); + blocksRead.addAndGet(ranges.size()); + // extract the instances the wrapped ranges refer to; update the + // bytes read counter. + List instances = ranges.stream() + .map(r -> { + bytesRead.addAndGet(r.getLength()); + return r.getFileRange(); + }) + .collect(Collectors.toList()); + LOG.debug("readVectored with {} ranges on stream {}", + ranges.size(), stream); + readVectored.invoke(stream, instances, allocate); + } + + @Override + public String toString() { + return "VectorIOBridge{" + + "readVectored=" + readVectored + + ", vectorReads=" + vectorReads.get() + + ", blocksRead=" + blocksRead.get() + + ", bytesRead=" + bytesRead.get() + + '}'; + } + + /** + * Does a stream implement StreamCapability and, if so, is the capability + * available. + * If the method is not found, this predicate will return false. + * @param stream input stream to query. + * @param capability the capability to look for. + * @return true if the stream declares the capability is available. + */ + public boolean hasCapability(final FSDataInputStream stream, + final String capability) { + + if (hasCapabilityMethod.isNoop()) { + return false; + } else { + return hasCapabilityMethod.invoke(stream, capability); + } + } + + + /** + * How many vector read calls have been made? + * @return the count of vector reads. + */ + public long getVectorReads() { + return vectorReads.get(); + } + + /** + * How many blocks were read? + * @return the count of blocks read. + */ + public long getBlocksRead() { + return blocksRead.get(); + } + + /** + * How many bytes of data have been read. + * @return the count of bytes read. + */ + public long getBytesRead() { + return bytesRead.get(); + } + + /** + * Reset all counters; for testing. + * Non-atomic. + */ + void resetCounters() { + vectorReads.set(0); + blocksRead.set(0); + bytesRead.set(0); + } + + /** + * Get the singleton instance. + * + * @return instance. + */ + public static VectorIOBridge instance() { + return INSTANCE; + } + + /** + * Is the bridge available for use? + * + * @return true if the vector IO API is available. + */ + public static boolean bridgeAvailable() { + return instance().available(); + } + + /** + * Get the instance verifying that the API is available. + * @return an available instance, always + * @throws UnsupportedOperationException if it is not. 
+ */ + public static VectorIOBridge availableInstance() { + final VectorIOBridge bridge = instance(); + bridge.checkAvailable(); + return bridge; + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java new file mode 100644 index 0000000000..6c927e7c08 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/vectorio/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Vector IO support for Hadoop 3.3.5 runtimes. + * Uses reflect so will compile against older versions, + * but will not actually work. + */ +package org.apache.parquet.hadoop.util.vectorio; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java index a6d2732d1b..8ca40484e5 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInputFormatColumnProjection.java @@ -42,15 +42,21 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import static java.lang.Thread.sleep; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +@RunWith(Parameterized.class) public class TestInputFormatColumnProjection { public static final String FILE_CONTENT = "" + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ," + @@ -96,7 +102,19 @@ protected void map(Void key, Group value, Context context) @Rule public TemporaryFolder temp = new TemporaryFolder(); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; + + public TestInputFormatColumnProjection(boolean readType) { + this.readType = readType; + } @Test public void testProjectionSize() throws Exception { Assume.assumeTrue( // only run this test for Hadoop 2 @@ -115,6 +133,8 @@ public void testProjectionSize() throws Exception { outputFolder.delete(); Configuration conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); // set the projection schema conf.set("parquet.read.schema", Types.buildMessage() .required(BINARY).as(UTF8).named("char") @@ -164,7 +184,8 @@ public void testProjectionSize() throws Exception { bytesRead = Reader.bytesReadCounter.getValue(); } - Assert.assertTrue("Should read less than 10% of the input file size", + Assert.assertTrue("Should read (" + bytesRead + " bytes)" + + " less than 10% of the input file size (" + bytesWritten + ")", bytesRead < (bytesWritten / 10)); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 8dcbf4acf4..94ee413fd6 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -89,10 +89,13 @@ import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestParquetFileWriter { private static final Logger LOG = LoggerFactory.getLogger(TestParquetFileWriter.class); @@ -125,13 +128,34 @@ public class TestParquetFileWriter { @Rule public final TemporaryFolder temp = new TemporaryFolder(); + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + + /** + * Read type: true for vectored IO. 
+ */ + private final boolean readType; + + public TestParquetFileWriter(boolean readType) { + this.readType = readType; + } + + private Configuration getTestConfiguration() { + Configuration conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); + return conf; + } + @Test public void testWriteMode() throws Exception { File testFile = temp.newFile(); MessageType schema = MessageTypeParser.parseMessageType( "message m { required group a {required binary b;} required group " + "c { required int64 d; }}"); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); ParquetFileWriter writer = null; boolean exceptionThrown = false; @@ -160,7 +184,7 @@ public void testWriteRead() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); w.start(); @@ -444,10 +468,11 @@ public void testAlignmentWithPadding() throws Exception { File testFile = temp.newFile(); Path path = new Path(testFile.toURI()); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); // Disable writing out checksums as hardcoded byte offsets in assertions below expect it conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); + // uses the test constructor ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60); @@ -553,7 +578,7 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { File testFile = temp.newFile(); Path path = new Path(testFile.toURI()); - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); // Disable writing out checksums as hardcoded byte offsets in assertions below expect it conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); @@ -686,7 +711,7 @@ public void testWriteReadStatistics() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b (UTF8);} required group c { required int64 d; }}"); @@ -774,7 +799,7 @@ public void testMetaDataFile() throws Exception { File testDir = temp.newFolder(); Path testDirPath = new Path(testDir.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); final FileSystem fs = testDirPath.getFileSystem(configuration); enforceEmptyDir(configuration, testDirPath); @@ -826,7 +851,7 @@ public void testWriteReadStatisticsAllNulls() throws Exception { Path path = new Path(testFile.toURI()); MessageType schema = MessageTypeParser.parseMessageType(writeSchema); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); GroupWriteSupport.setSchema(schema, configuration); @@ -1000,7 +1025,7 @@ public void testMergeFooters() { */ @Test public void testWriteMetadataFileWithRelativeOutputPath() throws IOException { - Configuration conf = new Configuration(); + Configuration conf = getTestConfiguration(); FileSystem fs = FileSystem.get(conf); Path relativeRoot = new 
Path("target/_test_relative"); Path qualifiedRoot = fs.makeQualified(relativeRoot); @@ -1026,7 +1051,7 @@ public void testColumnIndexWriteRead() throws Exception { testFile.delete(); Path path = new Path(testFile.toURI()); - Configuration configuration = new Configuration(); + Configuration configuration = getTestConfiguration(); ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); w.start(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java index 188a79643d..80baaa68a4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -63,9 +64,13 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.schema.MessageTypeParser; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) public class TestInputOutputFormat { private static final Logger LOG = LoggerFactory.getLogger(TestInputOutputFormat.class); @@ -82,9 +87,24 @@ public class TestInputOutputFormat { private Class> readMapperClass; private Class> writeMapperClass; + @Parameterized.Parameters(name = "vectored : {0}") + public static List params() { + return Arrays.asList(true, false); + } + + /** + * Read type: true for vectored IO. + */ + private final boolean readType; + + public TestInputOutputFormat(boolean readType) { + this.readType = readType; + } @Before public void setUp() { conf = new Configuration(); + // set the vector IO option + conf.setBoolean(ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED, readType); writeSchema = "message example {\n" + "required int32 line;\n" + "required binary content;\n" + @@ -335,8 +355,9 @@ public void testReadWriteWithCounter() throws Exception { assertTrue(value(readJob, "parquet", "bytesread") > 0L); assertTrue(value(readJob, "parquet", "bytestotal") > 0L); - assertTrue(value(readJob, "parquet", "bytesread") - == value(readJob, "parquet", "bytestotal")); + assertEquals("bytestotal != bytesread", + value(readJob, "parquet", "bytestotal"), + value(readJob, "parquet", "bytesread")); //not testing the time read counter since it could be zero due to the size of data is too small } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java new file mode 100644 index 0000000000..5119177553 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestFileRangeBridge.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNoException; + +/** + * Test the {@link FileRangeBridge} class; requires + * Hadoop 3.3.5+ + */ +public class TestFileRangeBridge { + + private static final Logger LOG = LoggerFactory.getLogger(TestFileRangeBridge.class); + + /** + * Class required for tests to run: {@value }. + * Probed in setup. + */ + static final String CLASSNAME = "org.apache.hadoop.fs.FileRange"; + + @Before + public void setUp() { + + // look for the FileRange; if not found, skip + try { + this.getClass().getClassLoader().loadClass(CLASSNAME); + } catch (ReflectiveOperationException e) { + assumeNoException(e); + } + } + + /** + * Attempt to explicit instantiate the bridge; + * package private constructor. + */ + @Test + public void testInstantiate() throws Throwable { + new FileRangeBridge(); + } + + /** + * Is the shared bridge instance non-null, and does the + * {@link FileRangeBridge#bridgeAvailable()} predicate + * return true. + */ + @Test + public void testFileRangeBridgeAvailable() throws Throwable { + assertNotNull("FileRangeBridge instance null", FileRangeBridge.instance()); + assertTrue("Bridge not available", FileRangeBridge.bridgeAvailable()); + } + + /** + * Create a range and validate the getters return the original values. + */ + @Test + public void testCreateFileRange() { + Object reference = "backref"; + FileRangeBridge.WrappedFileRange range = FileRangeBridge.instance() + .createFileRange(512L, 16384, reference); + LOG.info("created range {}", range); + assertNotNull("null range", range); + assertNotNull("null range instance", range.getFileRange()); + assertEquals("offset of " + range, 512L, range.getOffset()); + assertEquals("length of " + range, 16384, range.getLength()); + assertSame("backref of " + range, reference, range.getReference()); + + // this isn't set until readVectored() is called + assertNull("non-null range future", range.getData()); + } + + /** + * Verify there is no parameter validation. + */ + @Test + public void testCreateInvalidRange() { + FileRangeBridge.instance() + .createFileRange(-1, -1, null); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java new file mode 100644 index 0000000000..8b6c7347e8 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/vectorio/TestVectorIOBridge.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop.util.vectorio; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.IntFunction; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.io.ParquetFileRange; + +import static org.apache.parquet.hadoop.util.vectorio.BindingUtils.awaitFuture; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.VECTOREDIO_CAPABILITY; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.instance; +import static org.apache.parquet.hadoop.util.vectorio.VectorIOBridge.readVectoredRanges; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; + +/** + * Test the vector IO bridge. + * Much of this is lifted from hadoop-common test + * {@code AbstractContractVectoredReadTest}; + * with other utility methods from + * {@code org.apache.hadoop.fs.contract.ContractTestUtils}. + */ +public class TestVectorIOBridge { + private static final int DATASET_LEN = 64 * 1024; + private static final byte[] DATASET = dataset(DATASET_LEN, 'a', 32); + private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + + /** + * Timeout in seconds for vectored read operation in tests : {@value}. + */ + private static final int VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS = 5 * 60; + + /** + * relative vectored path. + */ + private final Path vectoredPath = new Path("target/test/vectored"); + + private final HeapByteBufferAllocator bufferAllocator = new HeapByteBufferAllocator(); + private final ElasticByteBufferPool pool = + new ElasticByteBufferPool(); + + private final IntFunction allocate = value -> { + return pool.getBuffer(false, value); + }; + + private FileSystem fileSystem; + private Path testFilePath; + + public TestVectorIOBridge() { + + } + + @Before + public void setUp() throws IOException { + // skip the tests if the FileRangeBridge goes not load. 
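+    // (on Hadoop versions without org.apache.hadoop.fs.FileRange the bridge reports
+    // itself as unavailable, so assumeTrue() below skips these tests instead of failing them)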
+ assumeTrue("Bridge not available", FileRangeBridge.bridgeAvailable()); + + fileSystem = FileSystem.getLocal(new Configuration()); + testFilePath = fileSystem.makeQualified(vectoredPath); + createFile(fileSystem, testFilePath, DATASET); + } + + @After + public void tearDown() throws IOException { + if (fileSystem != null) { + fileSystem.delete(testFilePath, false); + } + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + /** + * If the file range bridge is available, so must be the vector io bridge. + */ + @Test + public void testVectorIOBridgeAvailable() throws Throwable { + assertTrue("VectorIOBridge not available", VectorIOBridge.bridgeAvailable()); + } + + /** + * Create a dataset for use in the tests; all data is in the range + * base to (base+modulo-1) inclusive. + * + * @param len length of data + * @param base base of the data + * @param modulo the modulo + * + * @return the newly generated dataset + */ + private static byte[] dataset(int len, int base, int modulo) { + byte[] dataset = new byte[len]; + for (int i = 0; i < len; i++) { + dataset[i] = (byte) (base + (i % modulo)); + } + return dataset; + } + + /** + * Utility to return buffers back to the pool once all + * data has been read for each file range. + * + * @param fileRanges list of file range. + * @param pool buffer pool. + * + * @throws IOException any IOE + * @throws TimeoutException ideally this should never occur. + */ + public static void returnBuffersToPoolPostRead(List fileRanges, + ByteBufferPool pool) + throws IOException, TimeoutException { + for (ParquetFileRange range : fileRanges) { + ByteBuffer buffer = awaitFuture(range.getDataReadFuture(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + pool.putBuffer(buffer); + } + } + + /** + * Create a file. + * + * @param fs filesystem + * @param path path to write + * @param data source dataset. Can be null + * + * @throws IOException on any problem + */ + public static void createFile(FileSystem fs, + Path path, + byte[] data) throws IOException { + try (FSDataOutputStream stream = fs.create(path, true)) { + if (data != null && data.length > 0) { + stream.write(data); + } + } + } + + /** + * Open the test file. + * @return test file input stream + * @throws IOException failure to open + */ + private FSDataInputStream openTestFile() throws IOException { + return getFileSystem().open(testFilePath); + } + + /** + * Read a list of ranges, all adjacent. + */ + @Test + public void testVectoredReadMultipleRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + fileRanges.add(range(i * 100, 100)); + } + try (FSDataInputStream in = fs.open(testFilePath)) { + readVectoredRanges(in, fileRanges, allocate); + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (ParquetFileRange res : fileRanges) { + completableFutures[i++] = res.getDataReadFuture(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + combinedFuture.get(); + + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * VectorIO and readFully() can coexist. 
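+   * The same bytes must come back from the vectored read and from readFully() on the same range.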
+   */
+  @Test
+  public void testVectoredReadAndReadFully() throws Exception {
+    final int offset = 100;
+    final int length = 256;
+    List<ParquetFileRange> fileRanges = ranges(offset, length);
+
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      byte[] readFullRes = new byte[length];
+      in.readFully(offset, readFullRes);
+      ByteBuffer vecRes = awaitFuture(fileRanges.get(0).getDataReadFuture(),
+          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      final byte[] array = vecRes.array();
+      assertDatasetEquals(0, "readFully",
+          vecRes, length, readFullRes);
+    } finally {
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, none of the test ranges
+   * will get merged.
+   */
+  @Test
+  public void testDisjointRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        0, 100,
+        4_000 + 101, 100,
+        16_000 + 101, 100);
+
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * Verify the stream really implements the readVectored API, rather
+   * than falling back to the base implementation.
+   */
+  @Test
+  public void testStreamImplementsReadVectored() throws Exception {
+
+    try (FSDataInputStream in = openTestFile()) {
+      final boolean streamDoesNativeVectorIO = instance().hasCapability(
+          in,
+          VECTOREDIO_CAPABILITY);
+      assertTrue("capability " + VECTOREDIO_CAPABILITY + " not supported by " + in,
+          streamDoesNativeVectorIO);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, all the below ranges
+   * will get merged into one.
+   */
+  @Test
+  public void testAllRangesMergedIntoOne() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        0, 100,
+        4_000 + 101, 100,
+        16_000 + 101, 100);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  /**
+   * As the minimum seek value is 4*1024, the first three ranges will be
+   * merged into one and the other two will remain as they are.
+   */
+  @Test
+  public void testSomeRangesMergedSomeUnmerged() throws Exception {
+    List<ParquetFileRange> fileRanges = ranges(
+        8 * 1024, 100,
+        14 * 1024, 100,
+        10 * 1024, 100,
+        2 * 1024 - 101, 100,
+        40 * 1024, 1024);
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    List<ParquetFileRange> fileRanges = getSampleOverlappingRanges();
+    try (FSDataInputStream in = openTestFile()) {
+      readVectoredRanges(in, fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testSameRanges() throws Exception {
+    // Same ranges are a special case of overlapping ranges.
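+    // each copy of the range must still yield the full dataset slice for that offset.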
+ List fileRanges = getSampleSameRanges(); + + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testSomeRandomNonOverlappingRanges() throws Exception { + List fileRanges = ranges( + 500, 100, + 1000, 200, + 50, 10, + 10, 5); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testConsecutiveRanges() throws Exception { + List fileRanges = ranges( + 500, 100, + 600, 200, + 800, 100); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testNegativeLengthRange() throws Exception { + verifyExceptionalVectoredRead(getFileSystem(), ranges(1, -50), + IllegalArgumentException.class); + } + + /** + * Negative ranges are rejected; the inner cause is an + * {@code EOFException}. + */ + @Test + public void testNegativeOffsetRange() throws Exception { + final RuntimeException ex = verifyExceptionalVectoredRead( + getFileSystem(), ranges(-1, 50), + RuntimeException.class); + if (!(ex.getCause() instanceof EOFException)) { + throw ex; + } + } + + /** + * Classic seek/read read after vectored IO. + */ + @Test + public void testNormalReadAfterVectoredRead() throws Exception { + List fileRanges = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + readVectoredRanges(in, fileRanges, allocate); + + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + assertEquals("Vectored read shouldn't change file pointer.", 200, in.getPos()); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + /** + * Vectored IO after Classic seek/read. + */ + @Test + public void testVectoredReadAfterNormalRead() throws Exception { + List fileRanges = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + assertEquals("Vectored read shouldn't change file pointer.", 200, in.getPos()); + readVectoredRanges(in, fileRanges, allocate); + + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); + } + } + + @Test + public void testMultipleVectoredReads() throws Exception { + List fileRanges1 = getSampleNonOverlappingRanges(); + + List fileRanges2 = getSampleNonOverlappingRanges(); + try (FSDataInputStream in = openTestFile()) { + + readVectoredRanges(in, fileRanges1, allocate); + readVectoredRanges(in, fileRanges2, allocate); + + validateVectoredReadResult(fileRanges2, DATASET); + validateVectoredReadResult(fileRanges1, DATASET); + returnBuffersToPoolPostRead(fileRanges1, pool); + returnBuffersToPoolPostRead(fileRanges2, pool); + } + } + + /** + * Test to validate EOF ranges. Default implementation fails with EOFException + * while reading the ranges. 
Some implementations, such as S3 and checksum filesystems, fail fast
+ * as they already know the file length.
+ *
+ * @throws Exception
+ */
+/*
+@Test
+public void testEOFRanges() throws Exception {
+  FileSystem fs = getFileSystem();
+  List fileRanges = ranges(DATASET_LEN, 100);
+  try (FSDataInputStream in = fs.open(testFilePath)) {
+    in.readVectored(fileRanges, allocate);
+    readVectoredRanges(in, fileRanges, allocate);
+    for (FileRange res : fileRanges) {
+      CompletableFuture data = res.getData();
+      interceptFuture(EOFException.class,
+          "",
+          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+          TimeUnit.SECONDS,
+          data);
+    }
+  }
+}*/
+
+  /**
+   * Create a ParquetFileRange instance.
+   * @param offset offset in file
+   * @param length range length
+   * @return a range
+   */
+  private ParquetFileRange range(final long offset, final int length) {
+    return new ParquetFileRange(offset, length);
+  }
+
+  /**
+   * Create a list of ranges where the arguments are expected to
+   * be pairs of int offset and length.
+   * @param args an even-numbered list.
+   * @return a list of ranges.
+   */
+  private List<ParquetFileRange> ranges(int... args) {
+    final int len = args.length;
+    assertTrue("range argument length of " + len + " is not even",
+        (len % 2) == 0);
+    List<ParquetFileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < len; i += 2) {
+      fileRanges.add(range(args[i], args[i + 1]));
+    }
+    return fileRanges;
+  }
+
+  protected List<ParquetFileRange> getSampleNonOverlappingRanges() {
+    return ranges(0, 100, 110, 50);
+  }
+
+  protected List<ParquetFileRange> getSampleOverlappingRanges() {
+    return ranges(
+        100, 500,
+        400, 500);
+  }
+
+  protected List<ParquetFileRange> getConsecutiveRanges() {
+    return ranges(
+        100, 500,
+        600, 500);
+  }
+
+  protected List<ParquetFileRange> getSampleSameRanges() {
+    return ranges(
+        8_000, 1000,
+        8_000, 1000,
+        8_000, 1000);
+  }
+
+  /**
+   * Assert that the data read matches the dataset at the given offset.
+   * This helps verify that the seek process is moving the read pointer
+   * to the correct location in the file.
+   *
+   * @param readOffset the offset in the file where the read began.
+   * @param operation operation name for the assertion.
+   * @param data data read in.
+   * @param length length of data to check.
+   * @param originalData original data.
+   */
+  public static void assertDatasetEquals(
+      final int readOffset,
+      final String operation,
+      final ByteBuffer data,
+      int length, byte[] originalData) {
+    for (int i = 0; i < length; i++) {
+      int o = readOffset + i;
+      assertEquals(operation + " with read offset " + readOffset
+          + ": data[" + i + "] != DATASET[" + o + "]",
+          originalData[o], data.get());
+    }
+  }
+
+  /**
+   * Utility to validate vectored read results.
+   *
+   * @param fileRanges input ranges.
+   * @param originalData original data.
+   *
+   * @throws IOException any IOE.
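+   * @throws TimeoutException if a range read does not complete within the timeout.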
+ */ + public static void validateVectoredReadResult(List fileRanges, + byte[] originalData) + throws IOException, TimeoutException { + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (ParquetFileRange res : fileRanges) { + completableFutures[i++] = res.getDataReadFuture(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + awaitFuture(combinedFuture, + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + + for (ParquetFileRange res : fileRanges) { + CompletableFuture data = res.getDataReadFuture(); + ByteBuffer buffer = awaitFuture(data, + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + assertDatasetEquals((int) res.getOffset(), "vecRead", + buffer, res.getLength(), originalData); + } + } + + /** + * Validate that exceptions must be thrown during a vectored + * read operation with specific input ranges. + * + * @param fs FileSystem instance. + * @param fileRanges input file ranges. + * @param clazz type of exception expected. + * + * @throws Exception any other IOE. + */ + protected T verifyExceptionalVectoredRead( + FileSystem fs, + List fileRanges, + Class clazz) throws Exception { + + try (FSDataInputStream in = fs.open(testFilePath)) { + readVectoredRanges(in, fileRanges, allocate); + fail("expected error reading " + in); + // for the compiler + return null; + } catch (AssertionError e) { + throw e; + } catch (Exception e) { + if (!clazz.isAssignableFrom(e.getClass())) { + throw e; + } + return (T) e; + } + } +}
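A minimal sketch of how a client can opt in to the new code path, assuming only the parquet.hadoop.vectored.io.enabled key added by this change and the existing HadoopInputFile / ParquetFileReader entry points; the class name and file path below are illustrative, not part of the patch.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.HadoopReadOptions;
  import org.apache.parquet.ParquetReadOptions;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.util.HadoopInputFile;

  public class VectoredReadExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // opt in; on Hadoop releases before 3.3.5 the reader is expected to
      // fall back to the classic readFully() path
      conf.setBoolean("parquet.hadoop.vectored.io.enabled", true);

      Path file = new Path(args[0]);
      ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
      try (ParquetFileReader reader = ParquetFileReader.open(
          HadoopInputFile.fromPath(file, conf), options)) {
        // column chunks are fetched through the vectored IO bridge when the
        // underlying stream advertises the capability
        System.out.println("row groups: " + reader.getRowGroups().size());
      }
    }
  }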